一、简单介绍
transformer也是基于编码器和解码器构造而成的,transformer完全摒弃了递归结构,这意味着不存在梯度消失或梯度爆炸的问题,同时也使得模型能够更容易学习长距离的依赖关系。它的结构树如下:
粉色部分是只能使用编码器encoder的部分模型,可用于特征提取、图像理解;
绿色部分为是同时使用了编码器(Encoder)和解码器(Decoder)部分的模型,这一类模型通常用于机器翻译;
蓝灰色部分是只能使用解码器decoder的部分模型,可用于补全对话、文本生成。
1.1核心思想
利用自注意力机制来捕获输入序列中各元素之间的全局依赖关系,无论这些元素隔得有多远,它都能一并学习。
(1)自注意力机制:模型在处理序列中每个元素时,会考虑所有其他元素,依次捕捉他们之间的关系和依赖。弥补了RNN循环神经网络难以捕捉长距离依赖的缺点。
(2)多头注意力:通过并行地使用多个注意力机制,在不同的子空间捕捉序列的不同特征。
(3)位置编码:通过添加位置编码到输入序列,使模型能够利用序列中元素的位置信息。
1.2优点
(1)并行处理:传统的神经网络是递归处理即串行处理,transformer的处理方式为并行,可以一次性处理整个序列;
(2)长距离依赖捕捉能力:transformer有自注意力机制这一模块(机制),使得模型能够拥有长距离依赖捕捉的能力;
(3)灵活性和适用性:transformer可以被运用在多个领域,如语言理解、机器翻译、视觉处理等。
1.3应用
自然语言处理:如机器翻译、文本摘要、情感分析、问答系统等。
语音和音频处理:如语音识别、音乐生成等。
图像处理和计算机视觉:通过将图像切割成序列化的片段,Transformer也被应用于图像分类、物体检测等任务。
跨模态任务:如图像字幕生成、视觉问答等,Transformer可以处理来自不同模态(如文本和图像)的数据。
二、Transformer架构
transformer整体结构:
简单理解:
(1)输入处理:Embedding + 位置编码;
(2)编码器(Encoder)模块:理解输入内容
(3)解码器(Decoder)模块:生成输出内容
2.1输入
输入部分结构图:
直白翻译input就是输入,embedding是我们很熟悉的词嵌入,就是把文本变成向量的工具。
positional encoding就是位置编码,因为transformer是没有顺序处理能力的,它需要位置编码给它带路。
简单小结理解:
每个词(或图像块) → 转换成向量(Embedding)
再加上位置信息(Positional Encoding),因为 Transformer 本身没有顺序感
2.1.1词嵌入层
nn.Embedding(seq_len,embed_dim)
解释:
seq_len:词的数量,词汇量;
embed_dim:每个词(字)的嵌入维度,简单理解,比如一个“你”字,我输入的维度为3,则说明我需要用包含三个值的向量来表示这一个字。
如:你 -> [0.2, 0.5, -0.3]
2.1.2位置编码
transformer没有处理序列顺序的能力(因为它要考虑所有元素的语义关系,而不是所有元素之间的顺序)。
位置编码通过在每一个字(词)的嵌入向量旁边添加一个与嵌入向量维度一样的与位置相关的向量相加。
公式:
pos:参数位置的索引;
i:i是维度的索引;
dmodel是维度。
dmodel是多少,那么一个字词的向量中就包含几个值。
注意:位置参数是预先设定好的,根据每个字词的所在位置根据数学公式固定了的,不会随着参数更新而改变。
用代码实现位置编码:
(1)简单版本
import numpy as np
d_model = 4
seq_len = 5
n = 10000
PE = np.zeros((seq_len,d_model))
for pos in range(seq_len):
for i in np.arange(d_model//2):
PE[pos,2*i] = np.sin(pos/n**(2*i/d_model))
PE[pos,2*i+1] = np.cos(pos/n**(2*i/d_model))
print(PE)
(2)通用版本
import math
import torch.nn as nn
import torch
class PE(nn.Module):
def __init__(self,d_model,dropout=0.1,max_len=500):
super(PE,self).__init__()
self.dropout = nn.Dropout(p = dropout)
pe = torch.zeros(max_len,d_model)
pos = torch.arange(0,max_len,dtype = torch.float).unsqueeze(1)
div = torch.exp(torch.arange(0,d_model,2).float() * (-math.log(10000.0)/d_model))
pe[:,0::2] = torch.sin(pos*div)
pe[:,1::2] = torch.cos(pos*div)
pe = pe.unsqueeze(0)
self.register_buffer('pe',pe)
def forward(self,x):
x = x+self.pe[:,:x.size(1),:]
return self.dropout(x)
if __name__ == '__main__':
pe = PositionalEncoding(d_model=512)
embed = torch.randn(5, 10, 512)
out = pe(embed)
# print(out)
print(out.shape)
结果:torch.Size([5, 10, 512])
2.2编码器
编码器结构:
2.2.1编码器层
编码器层主要包括两个子层:
(1)自注意力机制子层:模型考虑序列的所有位置,捕捉不同单词之间的内部关系,可并行处理优化计算效率;
(2)前馈神经网络子层:FFN就是一个全连接神经网络,对自注意力层的输出进行进一步的变换。
两个子层周围还有一个残差连接和层归一化,有助于解决模型中梯度消失的问题。
2.2.2注意力机制
注意力机制:提取最有用的,最关键的信息,进行语义提取。
核心原理:
三个非常重要的向量:Q、K、V,其中Q(查询向量)K(键向量)V(值向量)
三个向量的意义:实现序列中各元素之间的信息交互和依赖建模。
Q、K、V与注意力的公式:
这里的QKV分别等于------>Q = q*Wq K = k*Wk V = v*Wv
注意力分数:softmax里面的内容就是注意力分数,QK越相似值越大
注意力权重:将注意力分数进行softmax之后的值,将输出转为概率的矩阵就是注意力权重矩阵。
最后,将注意力权重与真实值V相乘,所得到的结果就是注意力结果。
2.2.3自注意力机制
自注意力机制是注意力机制的特殊形式,它让模型在处理序列的每一个元素是,关注序列中的其他元素。
自注意力机制和注意力机制的区别:
自注意力机制需要结合一个序列的所有元素,注意力机制关注不同序列之间的关系。
自注意机制计算流程:
(1)初始化输入矩阵,可随机;
(2)初始化权重矩阵;
(3)计算QKV;
(4)计算注意力得分;
(5)应用softmax函数,得到注意力权重;
(6)计算attention,加权求和。
代码实现:
import torch
import torch.nn as nn
class SelfAttn(nn.Module):
def __init__(self):
super(SelfAttn,self).__init__()
#初始化权重矩阵
self.w_q = torch.eye(2)
self.w_k = torch.eye(2)
self.w_v = torch.eye(2)
def forward(self,x):
#计算QKV
Q = x @ self.w_q
K = x @ self.w_k
V = x @ self.w_v
#计算注意力分数
d_model = Q.shape[1]
scores = (Q @ K.T)/torch.sqrt(torch.tensor(d_model,dtype=torch.float32))
#计算注意力权重
weights = nn.Softmax(dim=-1)(scores)
#weights = torch.Softmax(scores,dim=-1)
#计算输出
out = weights @ V
return out
if __name__ == "__main__":
x = torch.randn(3,2)
self_attn = SelfAttn()
out = self_attn(x)
print(out.shape)
结果:
torch.Size([3, 2])
注:注意力机制不会改变原来输入的形状。
2.2.4多头注意力机制
多头注意力机制是自注意力机制的扩展,允许模型同时从不同子空间中获取信息。拆分成number_heads个头数,每一个头进行独立的注意力计算,将每一个头得到的结果进行拼接,拼接后再做一次线性变换,整合不同头捕获的各种信息。
多头注意力机制计算流程:
(1)定义头数;
(2)初始化权重矩阵;
(3)分头计算Q、K、V;
(4)分头计算注意力得分和输出;
(5)合并各头的输出;
(6)最后的线性变换。
代码实现:
注意:拆分多头、维度转换、合并多头
import math
import torch
import torch.nn as nn
class MHA(nn.Module):
def __init__(self,d_model,num_heads):
super(MHA,self).__init__()
self.d_model = d_model
self.num_heads = num_heads
self.h_dmodel = d_model // num_heads
self.w_q = nn.Linear(d_model,d_model)
self.w_k = nn.Linear(d_model,d_model)
self.w_v = nn.Linear(d_model,d_model)
self.w_o = nn.Linear(d_model,d_model)
self.dropout = nn.Dropout(0.1)
def forward(self,x):
batch_size = x.shape[0]
q = self.w_q(x)
k = self.w_k(x)
v = self.w_v(x)
#将qkv分头,并转置[b,s,h,d]----->[b,h,s,d]
q = q.view(batch_size,-1,self.num_heads,self.h_dmodel).transpose(1,2)
k = k.view(batch_size,-1,self.num_heads,self.h_dmodel).transpose(1,2)
v = v.view(batch_size,-1,self.num_heads,self.h_dmodel).transpose(1,2)
#计算注意力权重
attention_scores = torch.matmul(q,k.transpose(2,3)) / math.sqrt(self.h_dmodel)
attention_weights = torch.softmax(attention_scores,dim=-1)
#计算输出
out = torch.matmul(attention_weights,v)
#合并多头,将[b,h,s,d]----->[b,s,h,d]----->[b,s,d]
out = out.transpose(1,2).contiguous().view(batch_size,-1,self.d_model)
out = self.w_o(out)
out = self.dropout(out)
return out
if __name__ == '__main__':
x = torch.randn(2,10,512) #[batch_size,seq_len,d_model]
mha = MHA(d_model=512,num_heads=8)
out = mha(x)
print(out.shape)
结果:
torch.Size([2, 10, 512])
代码实现二:----->更贴合与encoder的MHA,即添加了填充掩码padding_mask的代码,就是在计算完注意力得分之后添加mask。
#带掩码的多头注意力机制
import torch
import torch.nn as nn
class Mask_Mha(nn.Module):
def __init__(self,d_model,num_heads):
super(Mask_Mha,self).__init__()
self.d_model = d_model
self.num_heads = num_heads
self.h_dmodel = d_model//num_heads
#初始化权重矩阵
self.wq = nn.Linear(d_model,d_model)
self.wk = nn.Linear(d_model,d_model)
self.wv = nn.Linear(d_model,d_model)
self.wo = nn.Linear(d_model,d_model)
self.layer_norm = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(0.1)
def forward(self,x,mask=None):
batch_size = x.size(0)
#计算QKV
q = self.wq(x)
k = self.wk(x)
v = self.wv(x)
#QKV分头、转置
q = q.view(batch_size,-1,self.num_heads,self.h_dmodel).transpose(1,2)
k = k.view(batch_size,-1,self.num_heads,self.h_dmodel).transpose(1,2)
v = v.view(batch_size,-1,self.num_heads,self.h_dmodel).transpose(1,2)
#计算QK注意力分数
attn_scores = torch.matmul(q,k.transpose(-2,-1))/torch.sqrt(torch.tensor(self.h_dmodel,dtype=torch.float32))
#计算mask
if mask is not None:
mask = mask.unsqueeze(1).unsqueeze(2) #batch_size,1,1,seq_len
attn_scores = attn_scores.masked_fill(mask == 0,float('-1e9'))
#计算softmax
attn_weights = torch.softmax(attn_scores,dim=-1)
#计算V的转置点积
attn = torch.matmul(attn_weights,v)
#多头拼接
attn = attn.transpose(1,2).contiguous().view(batch_size,-1,self.d_model)
#计算输出
out = self.wo(attn)
out = self.layer_norm(out)
out = self.dropout(out)
#返回输出
return out
if __name__ == '__main__':
x = torch.randn(64,10,512) #batch_size,seq_len,d_model
mha = Mask_Mha(d_model=512,num_heads=8)
mask = torch.zeros(64,10,dtype=torch.bool) #batch_size,seq_len
mask[0,8:] = 0
out = mha(x,mask)
print(out.shape)
结果:
torch.Size([64, 10, 512])
2.2.5残差和归一化
add指的是残差连接,norm指的是层归一化。网络越深,这样的操作更有助于改善训练深层网络的稳定性和效率。主要包括两个全连接,中间穿插一个激活函数。
残差连接有助于解决梯度消失问题,将子层的输入直接加到输出上。
层归一化有助于稳定和加速神经网络的训练。
代码实现:
import torch.nn as nn
import torch
class Add_Norm(nn.Module):
def __init__(self, d_model):
super(Add_Norm,self).__init__()
self.norm = nn.LayerNorm(d_model)
def forward(self, x,sublayer_output):
out = self.norm(x + sublayer_output)
return out
2.2.6FFN前馈神经网络
前馈神经网络层的主要作用:对自注意层的输出做进一步的处理,增加模型的表达能力。
这一层主要包括两个线性变换以及一个激活函数ReLU或者GELU。然后后边是一个残差连接和一个层归一化。
实现代码:
import torch.nn as nn
import torch
# class Add_Norm(nn.Module):
# def __init__(self, d_model):
# super(Add_Norm,self).__init__()
# self.norm = nn.LayerNorm(d_model)
# def forward(self, x,sublayer_output):
# out = self.norm(x + sublayer_output)
# return out
class FFN(nn.Module):
def __init__(self,d_model,d_ff):
super(FFN,self).__init__()
self.norm1 = nn.LayerNorm(d_model)
self.linear1 = nn.Linear(d_model,d_ff)
self.relu = nn.ReLU()
self.linear2 = nn.Linear(d_ff,d_model)
self.norm2 = nn.LayerNorm(d_model)
def forward(self,x):
res = x
x = self.norm1(x) #----add和norm中的norm
x = self.linear1(x) #----ffn
x = self.relu(x) #----ffn
x = self.linear2(x) #----ffn
out = self.norm2(x + res)#----add和norm中的norm
return out
2.3解码器
解码器的主要结构:
主要包括一个带pad和seq掩码的多头注意力机制,和带pad的多头交叉注意力机制。
2.3.1输入层
解码器的输入为编码器最后一个隐藏层的信息,它的作用是生成序列。但是为了防止模型提取看到答案而说出答案,因此我们进行一个右移操作shifted right:通过将输入序列向右移动一个位置,模型在训练时只能基于先前生成的 token 来预测下一个 token,从而避免了数据泄漏的问题,保证了模型的训练和推理的准确性。
同时,解码器的输入也需要进行词嵌入和位置编码。
2.3.2带掩码的MHA
掩码的作用:
(1)对序列进行填充;
(2)对输出序列进行未来词掩盖。
主要针对第二个作用进行讲解,掩码是如何通过上三角矩阵来掩盖为了信息的:
这里面的-inf为看不到的序列信息,掩码将当前位置之后的信息屏蔽掉,确保模型只能依赖已经生成的部分序列进行预测,而不会受到未来信息的干扰。原输入与上三角矩阵相乘之后,对应位置的信息变为0,此时就无法查看未来信息。
在多头注意力基础上添加遮掩操作以实现遮掩多头注意力的步骤:
(1)创建上三角矩阵torch.triu();
(2)在计算完注意力分数之后,注意力权重之前将掩码加到注意力分数矩阵上;
(3)进行剩余操作:softmax,attn,拼接各头的输出结果。
填充掩码代码:
import torch
def make_padding_mask():
padding_mask = padding_mask.unsqueeze(1).unsqueeze(2)
return padding_mask
def apply_padding_mask():
padding_mask =make_padding_mask(padding_mask)
scores = scores.masked_fill(padding_mask, float('-inf'))
return scores
序列掩码代码:
import torch
def Seq_mask(x):
seq_len = x.size(1)
seq_mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1).bool()
return seq_mask
def apply_seq_mask(x):
seq_mask = Seq_mask(x)
seq_mask = seq_mask.unsqueeze(0).unsqueeze(0)
scores = scores.masked_fill(seq_mask, float('-inf'))
return scores
带序列掩码的MHA代码:
import torch
from torch import nn
def seq_mask(x):
# 创建序列掩码,遮挡未来位置
seq_len = x.size(1)
device = x.device
mask = torch.triu(torch.ones((seq_len, seq_len), device=device), diagonal=1).bool()
return mask
class Seq_mask_MHA(nn.Module):
def __init__(self, d_model, num_heads):
super(Seq_mask_MHA, self).__init__()
self.d_model = d_model
self.num_heads = num_heads
self.h_dmodel = d_model // num_heads
# 线性映射
self.wq = nn.Linear(d_model, d_model)
self.wk = nn.Linear(d_model, d_model)
self.wv = nn.Linear(d_model, d_model)
self.wo = nn.Linear(d_model, d_model)
self.drop = nn.Dropout(0.1)
def forward(self, x, mask=None):
batch_size, seq_len, _ = x.size()
# 计算QKV
q = self.wq(x) # [batch, seq_len, d_model]
k = self.wk(x)
v = self.wv(x)
# 分头操作,reshape后交换维度方便矩阵乘法
q = q.view(batch_size, seq_len, self.num_heads, self.h_dmodel).transpose(1, 2) # [batch, heads, seq_len, head_dim]
k = k.view(batch_size, seq_len, self.num_heads, self.h_dmodel).transpose(1, 2)
v = v.view(batch_size, seq_len, self.num_heads, self.h_dmodel).transpose(1, 2)
# 计算注意力分数
scores = torch.matmul(q, k.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.h_dmodel, dtype=torch.float32))
# 生成或使用传入的掩码
if mask is None:
mask = seq_mask(x) # [seq_len, seq_len]
scores = scores.masked_fill(mask.unsqueeze(0).unsqueeze(0), float('-inf')) # 扩展维度匹配scores
# 注意力权重
weights = torch.softmax(scores, dim=-1)
# 加权求和
attn = torch.matmul(weights, v) # [batch, heads, seq_len, head_dim]
# 合并多头
attn = attn.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model) # [batch, seq_len, d_model]
out = self.wo(attn)
out = self.drop(out)
return out
if __name__ == '__main__':
x = torch.randn(3, 10, 256) # batch=3, seq_len=10, d_model=256
seq_mask_mha = Seq_mask_MHA(d_model=256, num_heads=8)
out = seq_mask_mha(x) # 传入输入张量x
print(out.shape) # 应该是 torch.Size([3, 10, 256])
2.3.3交叉意力
交叉注意力中Q来自于解码器,KV来自于编码器。
代码实现:
import torch
from torch import nn
class CrossMultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads, dropout=0.1):
super(CrossMultiHeadAttention, self).__init__()
self.d_model = d_model
self.num_heads = num_heads
self.h_dmodel = d_model // num_heads
self.wq = nn.Linear(d_model, d_model)
self.wk = nn.Linear(d_model, d_model)
self.wv = nn.Linear(d_model, d_model)
self.wo = nn.Linear(d_model, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, query, key_value, mask=None):
"""
query: [batch_size, query_len, d_model]
key_value: [batch_size, kv_len, d_model]
mask: [query_len, kv_len] 或者 [batch_size, 1, query_len, kv_len](可选)
"""
batch_size, query_len, _ = query.size()
_, kv_len, _ = key_value.size()
# 线性变换
Q = self.wq(query) # [batch, query_len, d_model]
K = self.wk(key_value) # [batch, kv_len, d_model]
V = self.wv(key_value) # [batch, kv_len, d_model]
# 分头处理并转置维度方便矩阵乘法
Q = Q.view(batch_size, query_len, self.num_heads, self.h_dmodel).transpose(1, 2) # [batch, heads, query_len, head_dim]
K = K.view(batch_size, kv_len, self.num_heads, self.h_dmodel).transpose(1, 2) # [batch, heads, kv_len, head_dim]
V = V.view(batch_size, kv_len, self.num_heads, self.h_dmodel).transpose(1, 2) # [batch, heads, kv_len, head_dim]
# 计算注意力分数
scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.h_dmodel, dtype=torch.float32))
# 应用掩码
if mask is not None:
# 这里假设mask形状是[query_len, kv_len],需要扩展到[batch, heads, query_len, kv_len]
if mask.dim() == 2:
mask = mask.unsqueeze(0).unsqueeze(0) # [1,1,query_len, kv_len]
scores = scores.masked_fill(mask == True, float('-inf'))
# 计算注意力权重
weights = torch.softmax(scores, dim=-1)
weights = self.dropout(weights)
# 注意力加权求和
attn = torch.matmul(weights, V) # [batch, heads, query_len, head_dim]
# 合并多头
attn = attn.transpose(1, 2).contiguous().view(batch_size, query_len, self.d_model) # [batch, query_len, d_model]
out = self.wo(attn)
out = self.dropout(out)
return out
# 简单测试
if __name__ == '__main__':
batch_size = 2
query_len = 5
kv_len = 8
d_model = 256
num_heads = 8
cross_attn = CrossMultiHeadAttention(d_model=d_model, num_heads=num_heads)
query = torch.randn(batch_size, query_len, d_model)
key_value = torch.randn(batch_size, kv_len, d_model)
# 可选掩码,遮挡某些key-value位置,示例全False无遮挡
mask = torch.zeros(query_len, kv_len).bool()
out = cross_attn(query, key_value, mask)
print(out.shape) # 期望: torch.Size([2, 5, 256])
注意:交叉的注意力机制和自注意机制的区别:
自注意力计算QKV:
Q = self.wq(x) K = self.wk(x) V = self.wv(x)
交叉注意力计算QKV:
Q = self.wq(query) K = self.wk(key_value) V = self.wv(key_value)
2.3.4FFN
这一段代码的实现与编码器的FFN一致:
代码实现:
'''
就是一个norm,一个线性,一个激活
然后线性、norm
forward部分有一个残差连接
'''
from torch import nn
class FFN():
def __init__(self,d_model,d_ff):
super(FFN,self).__init__()
self.norm1 = nn.LayerNorm(d_model)
self.linear1 = nn.Linear(d_model,d_ff)
self.gelu = nn.GELU()
self.linear2 = nn.Linear(d_ff,d_model)
self.norm2 = nn.LayerNorm(d_model)
def forward(self,x):
res = x
x = self.norm1(x)
x = self.linear1(x)
x = self.gelu(x)
x = self.linear2(x)
x = x + res
out = self.norm2(x)
return out
2.3.5输出层
代码实现:
import torch.nn as nn
class Output(nn.Module):
def __init__(self,d_model,vocab_size):
super(Output,self).__init__()
self.linear = nn.Linear(d_model,vocab_size)
self.softmax = nn.Softmax()
#如果损失函数为交叉熵,就不要softmax层
def forward(self,x):
out = self.linear(x)
return out
三、小结
主要学习和实践了transformer的基础和搭建,transformer在大模型应用众多,是大模型的基础,要熟练掌握MHA的搭建流程,搞清楚参数形状长什么样子。