In a multivariate time-series forecasting task, how can multi-head attention be used to obtain the correlations between the variables?
For example, the input has shape (100, 4, 12), where 12 is the number of variables. I want to compute the correlations between these variables, but the code below seems to compute correlations between time steps instead. Any pointers would be appreciated.
class MultiHeadSelfAttention(Layer):
def __init__(self, embed_dim, num_heads):
super(MultiHeadSelfAttention, self).__init__()
self.embed_dim = embed_dim
self.num_heads = num_heads
if embed_dim % num_heads != 0:
raise ValueError(
f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
)
self.projection_dim = embed_dim // num_heads
self.query_dense = Dense(embed_dim)
self.key_dense = Dense(embed_dim)
self.value_dense = Dense(embed_dim)
self.output_dense = Dense(embed_dim)
# self.attention=MultiHeadAttention()
def attention(self, query, key, value):
score = tf.matmul(query, key, transpose_b=True)
dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
scaled_score = score / tf.math.sqrt(dim_key)
weights = tf.nn.softmax(scaled_score, axis=-1)
out = tf.matmul(weights, value)
return out, weights
def separate_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, inputs):
# x.shape = [batch_size, seq_len, embedding_dim]
batch_size = tf.shape(inputs)[0]
# Query
# Dense layer to project and split embed_dim to num_heads*projection_dim
query = self.query_dense(inputs) # (batch_size, seq_len, embed_dim)
query = self.separate_heads(
query, batch_size
) # (batch_size, num_heads, seq_len, projection_dim)
# Key
# Dense layer to project and split embed_dim to num_heads*projection_dim
key = self.key_dense(inputs) # (batch_size, seq_len, embed_dim)
key = self.separate_heads(
key, batch_size
) # (batch_size, num_heads, seq_len, projection_dim)
# Value
# Dense layer to project and split embed_dim to num_heads*projection_dim
value = self.value_dense(inputs) # (batch_size, seq_len, embed_dim)
value = self.separate_heads(
value, batch_size
) # (batch_size, num_heads, seq_len, projection_dim)
# Attention
        attention, weights = self.attention(query, key, value)
attention = tf.transpose(
attention, perm=[0, 2, 1, 3]
) # (batch_size, seq_len, num_heads, projection_dim)
# Combine back to num_heads*projection_dim
concat_attention = tf.reshape(
attention, (batch_size, -1, self.embed_dim)
) # (batch_size, seq_len, embed_dim)
# Apply a final Dense layer
out = self.output_dense(
concat_attention
) # (batch_size, seq_len, embed_dim)
return out
Answer quoted from New Bing:
In a multivariate time-series forecasting task, obtaining the correlations between variables with multi-head attention requires some reshaping of the input. Assume the input has shape (batch_size, seq_len, num_variables), where num_variables is the number of variables. The steps are:
1 Transpose the input so that the variable dimension becomes the second axis, giving a new shape of (batch_size, num_variables, seq_len).
2 Feed the transposed data into the multi-head attention layer. The query/key computation then yields an attention matrix of shape (batch_size, num_heads, num_variables, num_variables), because attention is now computed across variables rather than across time steps.
3 Aggregate the attention matrix over the heads, e.g. by summing the per-head matrices and dividing by num_heads, which gives a tensor of shape (batch_size, num_variables, num_variables).
4 This head-averaged matrix can be read directly as the attention-based relevance of each variable to every other variable.
Here is the code implementation:
class MultiHeadAttention(Layer):
def __init__(self, embed_dim, num_heads):
super(MultiHeadAttention, self).__init__()
self.embed_dim = embed_dim
self.num_heads = num_heads
if embed_dim % num_heads != 0:
raise ValueError(
f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
)
self.projection_dim = embed_dim // num_heads
self.query_dense = Dense(embed_dim)
self.key_dense = Dense(embed_dim)
self.value_dense = Dense(embed_dim)
self.output_dense = Dense(embed_dim)
def attention(self, query, key, value):
score = tf.matmul(query, key, transpose_b=True)
dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
scaled_score = score / tf.math.sqrt(dim_key)
weights = tf.nn.softmax(scaled_score, axis=-1)
out = tf.matmul(weights, value)
return out, weights
def separate_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, inputs):
# x.shape = [batch_size, seq_len, num_variables]
batch_size = tf.shape(inputs)[0]
# Transpose inputs to (batch_size, num_variables, seq_len)
inputs = tf.transpose(inputs, perm=[0, 2, 1])
# Query
query = self.query_dense(inputs)
query = self.separate_heads(query, batch_size)
# Key
key = self.key_dense(inputs)
key = self.separate_heads(key, batch_size)
# Value
value = self.value_dense(inputs)
value = self.separate_heads(value, batch_size)
# Attention
query_attention, weights = self.attention(query, key, value)
# Concatenate heads
query_attention = tf.transpose(query_attention, perm=[0, 2, 1, 3])
concat_attention = tf.reshape(query_attention, (batch_size, -1, self.embed_dim))
# Output
output = self.output_dense(concat_attention)
return output
Because the inputs are transposed inside call, the final output tensor has shape [batch_size, num_variables, embed_dim], i.e. one embed_dim-dimensional representation per variable. Note that this layer only returns the attended features; to obtain the variable-to-variable matrix described in steps 2-4, the attention weights also need to be returned and averaged over the heads, as sketched below.
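For reference, here is a minimal sketch (not part of the original answer) of how the head-averaged weights could be exposed. It assumes the MultiHeadAttention class defined above and the usual TensorFlow imports; the subclass name and the variable avg_weights are illustrative:
import tensorflow as tf

class MultiHeadAttentionWithWeights(MultiHeadAttention):
    """Same projections as above, but also returns the head-averaged attention weights."""
    def call(self, inputs):
        # inputs.shape = [batch_size, seq_len, num_variables]
        batch_size = tf.shape(inputs)[0]
        # Put the variables on the "sequence" axis: (batch_size, num_variables, seq_len)
        inputs = tf.transpose(inputs, perm=[0, 2, 1])
        query = self.separate_heads(self.query_dense(inputs), batch_size)
        key = self.separate_heads(self.key_dense(inputs), batch_size)
        value = self.separate_heads(self.value_dense(inputs), batch_size)
        # weights: (batch_size, num_heads, num_variables, num_variables)
        attention, weights = self.attention(query, key, value)
        avg_weights = tf.reduce_mean(weights, axis=1)  # (batch_size, num_variables, num_variables)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        concat = tf.reshape(attention, (batch_size, -1, self.embed_dim))
        return self.output_dense(concat), avg_weights
Note that avg_weights comes from a row-wise softmax, so each row sums to 1; it measures attention strength rather than a symmetric Pearson correlation coefficient.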
The following answer is quoted from ChatGPT:
In a multivariate time-series forecasting task, multi-head attention can be used to compute the relevance between different variables. One way to obtain the correlation coefficients between variables with multi-head attention is as follows:
First, split the input data along its last dimension and rearrange it so that the variables form the second axis, giving a tensor of shape (batch_size, num_vars, seq_len).
Then, pass this tensor through a fully connected layer to obtain a tensor of shape (batch_size, num_vars, embed_dim).
Next, feed this tensor into a multi-head attention layer (the number of heads is configurable). The output of the attention layer is again a tensor of shape (batch_size, num_vars, embed_dim).
Finally, compute the correlation coefficients between the variables from the attention output, essentially the (normalized) product of the output tensor with its own transpose, which yields a correlation matrix of shape (batch_size, num_vars, num_vars).
Here is the corresponding Python implementation:
class MultiHeadAttention(Layer):
def __init__(self, embed_dim, num_heads):
super(MultiHeadAttention, self).__init__()
self.embed_dim = embed_dim
self.num_heads = num_heads
if embed_dim % num_heads != 0:
raise ValueError(
f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
)
self.projection_dim = embed_dim // num_heads
self.query_dense = Dense(embed_dim)
self.key_dense = Dense(embed_dim)
self.value_dense = Dense(embed_dim)
self.output_dense = Dense(embed_dim)
def call(self, inputs):
# inputs.shape = [batch_size, num_vars, seq_len]
batch_size = tf.shape(inputs)[0]
# Project to embedding space
        x = self.query_dense(inputs)  # (batch_size, num_vars, embed_dim)
        y = self.key_dense(inputs)  # (batch_size, num_vars, embed_dim)
        z = self.value_dense(inputs)  # (batch_size, num_vars, embed_dim)
# Split embedding space to multiple heads
x = tf.reshape(x, [batch_size, -1, self.num_heads, self.projection_dim])
y = tf.reshape(y, [batch_size, -1, self.num_heads, self.projection_dim])
z = tf.reshape(z, [batch_size, -1, self.num_heads, self.projection_dim])
# Transpose for attention dot product
x = tf.transpose(x, [0, 2, 1, 3])
y = tf.transpose(y, [0, 2, 1, 3])
z = tf.transpose(z, [0, 2, 1, 3])
# Attention weights calculation
query_key_dot_product = tf.matmul(x, y, transpose_b=True)
        scaled_query_key_dot_product = query_key_dot_product / (self.projection_dim ** 0.5)
attention_weights = tf.nn.softmax(scaled_query_key_dot_product, axis=-1)
# Attention output calculation
attention_output = tf.matmul(attention_weights, z)
# Reshape to original shape
attention_output = tf.transpose(attention_output, [0, 2, 1, 3])
attention_output = tf.reshape(attention_output, [batch_size, -1, self.embed_dim])
# Project to output embedding space
attention_output = self.output_dense(attention_output)
return attention_output
class MultiHeadAttentionWithCorrelation(Layer):
def __init__(self, embed_dim, num_heads):
super(MultiHeadAttentionWithCorrelation, self).__init__()
self.multi_head_attention = MultiHeadAttention(embed_dim, num_heads)
    def correlation(self, x, y):
        # x.shape = y.shape = (batch_size, num_vars, embed_dim)
        mean_x = tf.reduce_mean(x, axis=-1, keepdims=True)  # (batch_size, num_vars, 1)
        mean_y = tf.reduce_mean(y, axis=-1, keepdims=True)  # (batch_size, num_vars, 1)
        diff_x = x - mean_x  # (batch_size, num_vars, embed_dim)
        diff_y = y - mean_y  # (batch_size, num_vars, embed_dim)
        cov = tf.matmul(diff_x, diff_y, transpose_b=True)  # (batch_size, num_vars, num_vars)
        norm_x = tf.norm(diff_x, axis=-1, keepdims=True)  # (batch_size, num_vars, 1)
        norm_y = tf.norm(diff_y, axis=-1, keepdims=True)  # (batch_size, num_vars, 1)
        corr = cov / (norm_x * tf.transpose(norm_y, perm=[0, 2, 1]) + 1e-8)
        return corr
    def call(self, inputs):
        # inputs.shape = [batch_size, num_vars, seq_len]
        # Multi-head attention over the variable axis
        attn_output = self.multi_head_attention(inputs)  # (batch_size, num_vars, embed_dim)
        # Pearson-style correlation between the num_vars variable embeddings
        attn_output_corr = self.correlation(attn_output, attn_output)  # (batch_size, num_vars, num_vars)
        return attn_output_corr
This code implements a MultiHeadAttentionWithCorrelation class, which combines a multi-head attention layer with a method that computes correlation coefficients between variables. The input is a tensor of shape (batch_size, num_vars, seq_len) and the output is a correlation matrix of shape (batch_size, num_vars, num_vars). The hyperparameters embed_dim and num_heads can be adjusted as needed.
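As a usage sketch (not from the original answer), the layer can be applied to the question's (100, 4, 12) input after transposing it so that the 12 variables sit on the second axis; embed_dim = 32 and num_heads = 4 are arbitrary illustrative choices:
import numpy as np
import tensorflow as tf

# Toy batch shaped like the question: (batch, seq_len, num_vars) = (100, 4, 12)
x = tf.constant(np.random.randn(100, 4, 12), dtype=tf.float32)
# The layer expects (batch, num_vars, seq_len), so transpose first
x_vars_first = tf.transpose(x, perm=[0, 2, 1])  # (100, 12, 4)

layer = MultiHeadAttentionWithCorrelation(embed_dim=32, num_heads=4)
corr = layer(x_vars_first)
print(corr.shape)  # (100, 12, 12): per-sample variable-to-variable correlation matrix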
The following is a standard multi-head attention layer whose call takes query, key, value and an optional mask packed in a dict:
class MultiHeadAttention(Layer):
def __init__(self, num_heads, d_model, dropout=0.1):
super(MultiHeadAttention, self).__init__()
self.num_heads = num_heads
self.d_model = d_model
self.depth = d_model // num_heads
self.query_dense = Dense(d_model)
self.key_dense = Dense(d_model)
self.value_dense = Dense(d_model)
self.dropout = Dropout(dropout)
self.final_dense = Dense(d_model)
def split_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, inputs):
query, key, value, mask = inputs['query'], inputs['key'], inputs['value'], inputs['mask']
batch_size = tf.shape(query)[0]
query = self.query_dense(query) # (batch_size, seq_len, d_model)
key = self.key_dense(key) # (batch_size, seq_len, d_model)
value = self.value_dense(value) # (batch_size, seq_len, d_model)
query = self.split_heads(query, batch_size) # (batch_size, num_heads, seq_len, depth)
key = self.split_heads(key, batch_size) # (batch_size, num_heads, seq_len, depth)
value = self.split_heads(value, batch_size) # (batch_size, num_heads, seq_len, depth)
scaled_attention, attention_weights = self.scaled_dot_product_attention(query, key, value, mask)
scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3]) # (batch_size, seq_len, num_heads, depth)
concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model)) # (batch_size, seq_len, d_model)
output = self.final_dense(concat_attention) # (batch_size, seq_len, d_model)
return output
def scaled_dot_product_attention(self, query, key, value, mask):
matmul_qk = tf.matmul(query, key, transpose_b=True)
depth = tf.cast(tf.shape(key)[-1], tf.float32)
logits = matmul_qk / tf.math.sqrt(depth)
if mask is not None:
logits += (mask * -1e9)
attention_weights = tf.nn.softmax(logits, axis=-1)
if self.dropout is not None:
attention_weights = self.dropout(attention_weights)
output = tf.matmul(attention_weights, value)
return output, attention_weights
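A brief usage sketch for the dict-style call signature above (a minimal example under assumed parameters; d_model = 12 and num_heads = 4 are arbitrary, and the all-zeros mask masks nothing):
import numpy as np
import tensorflow as tf

x = tf.constant(np.random.randn(100, 4, 12), dtype=tf.float32)  # (batch, seq_len, features)
mha = MultiHeadAttention(num_heads=4, d_model=12)
mask = tf.zeros((100, 1, 1, 4))  # broadcasts against the (batch, heads, seq_len, seq_len) logits
out = mha({'query': x, 'key': x, 'value': x, 'mask': mask})
print(out.shape)  # (100, 4, 12)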
Next, a multi-variable variant that keeps a separate variable axis, so each variable's time series gets its own attention heads:
class MultiVarMultiHeadAttention(Layer):
def __init__(self, num_heads, d_model, num_vars, dropout=0.1):
super(MultiVarMultiHeadAttention, self).__init__()
self.num_heads = num_heads
self.d_model = d_model
self.depth = d_model // num_heads
self.num_vars = num_vars
self.query_dense = Dense(d_model * num_vars)
self.key_dense = Dense(d_model * num_vars)
self.value_dense = Dense(d_model * num_vars)
self.dropout = Dropout(dropout)
self.final_dense = Dense(d_model)
def split_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, self.num_vars, -1, self.num_heads, self.depth))
return tf.transpose(x, perm=[0, 1, 3, 2, 4])
def call(self, inputs):
query, key, value, mask = inputs['query'], inputs['key'], inputs['value'], inputs['mask']
batch_size = tf.shape(query)[0]
        query = self.query_dense(query)  # (batch_size, seq_len, num_vars*d_model)
        key = self.key_dense(key)  # (batch_size, seq_len, num_vars*d_model)
        value = self.value_dense(value)  # (batch_size, seq_len, num_vars*d_model)
query = self.split_heads(query, batch_size) # (batch_size, num_vars, num_heads, seq_len, depth)
key = self.split_heads(key, batch_size) # (batch_size, num_vars, num_heads, seq_len, depth)
value = self.split_heads(value, batch_size) # (batch_size, num_vars, num_heads, seq_len, depth)
scaled_attention, attention_weights = self.scaled_dot_product_attention(query, key, value, mask)
scaled_attention = tf.transpose(scaled_attention, perm=[0, 1, 3, 2, 4]) # (batch_size, num_vars, seq_len, num_heads, depth)
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model * self.num_vars))  # (batch_size, seq_len, num_vars*d_model)
        output = self.final_dense(concat_attention)  # (batch_size, seq_len, d_model)
return output
def scaled_dot_product_attention(self, query, key, value, mask):
matmul_qk = tf.matmul(query, key, transpose_b=True)
depth = tf.cast(tf.shape(key)[-1], tf.float32)
logits = matmul_qk / tf.math.sqrt(depth)
if mask is not None:
logits += (mask * -1e9)
attention_weights = tf.nn.softmax(logits, axis=-1)
if self.dropout is not None:
attention_weights = self.dropout(attention_weights)
output = tf.matmul(attention_weights, value)
return output, attention_weights
A temporal self-attention layer that attends over the time axis of a single (batch_size, seq_len, d_model) tensor:
class TemporalAttention(Layer):
def __init__(self, d_model, num_heads, dropout=0.1):
super(TemporalAttention, self).__init__()
self.num_heads = num_heads
self.d_model = d_model
self.depth = d_model // num_heads
self.query_dense = Dense(d_model)
self.key_dense = Dense(d_model)
self.value_dense = Dense(d_model)
self.dropout = Dropout(dropout)
self.final_dense = Dense(d_model)
def split_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, x):
query = key = value = x
batch_size, seq_len, d_model = tf.shape(x)[0], tf.shape(x)[1], tf.shape(x)[2]
query = self.query_dense(query) # (batch_size, seq_len, d_model)
key = self.key_dense(key) # (batch_size, seq_len, d_model)
value = self.value_dense(value) # (batch_size, seq_len, d_model)
query = self.split_heads(query, batch_size) # (batch_size, num_heads, seq_len, depth)
key = self.split_heads(key, batch_size) # (batch_size, num_heads, seq_len, depth)
value = self.split_heads(value, batch_size) # (batch_size, num_heads, seq_len, depth)
scaled_attention, attention_weights = self.scaled_dot_product_attention(query, key, value, None)
scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3]) # (batch_size, seq_len, num_heads, depth)
concat_attention = tf.reshape(scaled_attention, (batch_size, seq_len, -1)) # (batch_size, seq_len, num_heads * depth)
output = self.final_dense(concat_attention) # (batch_size, seq_len, d_model)
return output
def scaled_dot_product_attention(self, query, key, value, mask):
matmul_qk = tf.matmul(query, key, transpose_b=True)
depth = tf.cast(tf.shape(key)[-1], tf.float32)
logits = matmul_qk / tf.math.sqrt(depth)
if mask is not None:
logits += (mask * -1e9)
attention_weights = tf.nn.softmax(logits, axis=-1)
if self.dropout is not None:
attention_weights = self.dropout(attention_weights)
output = tf.matmul(attention_weights, value)
return output, attention_weights
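TemporalAttention attends over whatever axis sits in the seq_len position, so feeding the transposed input makes it mix the 12 variables instead of the 4 time steps. A small sketch (layer sizes are illustrative; separate instances are needed because the Dense kernels are built for a fixed input width):
import numpy as np
import tensorflow as tf

x = tf.constant(np.random.randn(100, 4, 12), dtype=tf.float32)  # (batch, time, vars)

# Attend over the 4 time steps (original orientation)
temporal = TemporalAttention(d_model=16, num_heads=4)
out_time = temporal(x)  # (100, 4, 16)

# Attend over the 12 variables: put them on the "sequence" axis
variable_attn = TemporalAttention(d_model=16, num_heads=4)
out_vars = variable_attn(tf.transpose(x, perm=[0, 2, 1]))  # (100, 12, 16)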
Finally, a per-variable variant that splits the input along the variable axis and lets each variable's series attend over the full input:
class MultiHeadAttention(Layer):
def __init__(self, embed_dim, num_heads):
super(MultiHeadAttention, self).__init__()
self.embed_dim = embed_dim
self.num_heads = num_heads
if embed_dim % num_heads != 0:
raise ValueError(
f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
)
self.projection_dim = embed_dim // num_heads
self.query_dense = Dense(embed_dim)
self.key_dense = Dense(embed_dim)
self.value_dense = Dense(embed_dim)
self.output_dense = Dense(embed_dim)
    def call(self, inputs):
        # inputs.shape = [batch_size, seq_len, num_vars]
        batch_size = tf.shape(inputs)[0]
        # Split the input tensor into num_vars tensors along the last axis
        input_list = tf.unstack(inputs, axis=-1)
        # Build a list of outputs, one per input variable
        output_list = []
        for i, input_var in enumerate(input_list):
            # (batch_size, seq_len) -> (batch_size, seq_len, 1) so the Dense layers project per time step
            input_var = tf.expand_dims(input_var, axis=-1)
# Query
query = self.query_dense(input_var) # (batch_size, seq_len, embed_dim)
query = self.separate_heads(query, batch_size) # (batch_size, num_heads, seq_len, projection_dim)
# Key
key = self.key_dense(inputs) # (batch_size, seq_len, embed_dim)
key = self.separate_heads(key, batch_size) # (batch_size, num_heads, seq_len, projection_dim)
# Value
value = self.value_dense(inputs) # (batch_size, seq_len, embed_dim)
value = self.separate_heads(value, batch_size) # (batch_size, num_heads, seq_len, projection_dim)
# Attention
attention, weights = self.attention(query, key, value)
attention = tf.transpose(
attention, perm=[0, 2, 1, 3]
) # (batch_size, seq_len, num_heads, projection_dim)
# Combine back to num_heads*projection_dim
concat_attention = tf.reshape(
attention, (batch_size, -1, self.embed_dim)
) # (batch_size, seq_len, embed_dim)
# Apply a final Dense layer
out = self.output_dense(
concat_attention
) # (batch_size, seq_len, embed_dim)
output_list.append(out)
# combine output for each variable back into a single tensor
outputs = tf.stack(output_list, axis=-1)
return outputs
def separate_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
return tf.transpose(x, perm=[0, 2, 1, 3])
def attention(self, query, key, value):
score = tf.matmul(query, key, transpose_b=True)
dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
scaled_score = score / tf.math.sqrt(dim_key)
weights = tf.nn.softmax(scaled_score, axis=-1)
out = tf.matmul(weights, value)
return out, weights
The following answer was co-written by the GPT-3.5 model and the blogger 波罗歌:
In a multivariate time-series forecasting task, multi-head attention can be used to compute the relevance between the variables. Concretely, the input is projected and split into several heads, attention is computed on each head, and the heads' outputs are concatenated. For an input of shape (100, 4, 12), for example, the 12 variables at each time step are projected, split into heads, attended over, and the heads' outputs concatenated, giving an output tensor of shape (100, 4, embed_dim) in which each time step's representation incorporates information from all variables.
Below is a TensorFlow 2 implementation:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Layer
class MultiHeadAttention(Layer):
def __init__(self, embed_dim, num_heads):
super(MultiHeadAttention, self).__init__()
self.embed_dim = embed_dim
self.num_heads = num_heads
if embed_dim % num_heads != 0:
raise ValueError(
f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"已黑化的小白 )
self.projection_dim = embed_dim // num_heads
self.query_dense = Dense(embed_dim)
self.key_dense = Dense(embed_dim)
self.value_dense = Dense(embed_dim)
self.output_dense = Dense(embed_dim)
def attention(self, query, key, value):
score = tf.matmul(query, key, transpose_b=True)
dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
scaled_score = score / tf.math.sqrt(dim_key)
weights = tf.nn.softmax(scaled_score, axis=-1)
out = tf.matmul(weights, value)
return out, weights
def separate_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, inputs):
batch_size = tf.shape(inputs)[0]
        # The inputs are first projected by the Dense layers below; the heads are
        # split afterwards in separate_heads, so no reshape of the raw inputs is needed here.
# Query
query = self.query_dense(inputs)
query = self.separate_heads(query, batch_size)
# Key
key = self.key_dense(inputs)
key = self.separate_heads(key, batch_size)
# Value
value = self.value_dense(inputs)
value = self.separate_heads(value, batch_size)
# Attention
attention, weights = self.attention(query, key, value)
# Merge attention outputs
attention = tf.transpose(attention, perm=[0, 2, 1, 3])
concat_attention = tf.reshape(attention, (batch_size, -1, self.embed_dim))
# Apply final dense layer
out = self.output_dense(concat_attention)
return out
Here, the input tensor inputs has shape (batch_size, time_steps, features), where features is the number of input variables and time_steps is the number of time steps. In the MultiHeadAttention class, attention is computed by projecting the input and splitting it into several heads; the attention computation itself is implemented in the attention method. Finally, the outputs of all heads are concatenated to produce the final output tensor out.
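To tie this back to the original question (correlations between the 12 variables rather than attention over the 4 time steps), one option is to transpose the input so the variables sit on the sequence axis and then compute a Pearson-style correlation between the resulting per-variable embeddings. This is only a sketch under assumed settings (embed_dim = 32 and num_heads = 4 are arbitrary) and uses the MultiHeadAttention class defined in this answer:
import numpy as np
import tensorflow as tf

# Question's input: (batch, time_steps, features) = (100, 4, 12)
x = tf.constant(np.random.randn(100, 4, 12), dtype=tf.float32)

# Put the 12 variables on the sequence axis so attention mixes variables, not time steps
x_vars = tf.transpose(x, perm=[0, 2, 1])  # (100, 12, 4)

layer = MultiHeadAttention(embed_dim=32, num_heads=4)
emb = layer(x_vars)  # (100, 12, 32): one embedding per variable

# Pearson-style correlation between the 12 variable embeddings
emb_centered = emb - tf.reduce_mean(emb, axis=-1, keepdims=True)
norm = tf.norm(emb_centered, axis=-1, keepdims=True)
corr = tf.matmul(emb_centered, emb_centered, transpose_b=True) / (
    norm * tf.transpose(norm, perm=[0, 2, 1]) + 1e-8
)
print(corr.shape)  # (100, 12, 12)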
If my answer solved your problem, please accept it!