基于Seq2Seq模型的机器翻译

引入Attention的Seq2Seq模型-机器翻译

Transformer

首先,可以思考为什么会需要Transformer?

  1. 大部分的机器翻译等序列生成任务都是基于Encoder-Decoder的模式,而Encoder和Decoder一般是由RNN、LSTM、GRU其中一种,它们的特点就是:每个时刻的输出是根据上一时刻隐藏层state和当前时刻的输入,由这样的方式来实现产生序列。

    那这样的特点就会导致无法并行计算,因为每次计算都需要等待上一时刻的计算完成。

  2. 每个时刻的输出只与上一时刻的输出和当前时刻的输入相关,这会导致两个问题:

    a. 对于长序列,这样一层一层地传递,前面时刻的信息到了后面基本就消失了;

    b. 对于文本问题,每个时刻的输出不仅与上一时刻相关,还可能与前n个时刻相关,循环神经网络难以学习相对位置较远的文本之间的关系。

网络结构

在这里插入图片描述

与以往的Encoder-Decoder模型不同的是:

  1. 完全没有RNN等循环神经网络;
  2. 由于没有循环神经网络,所以加入位置编码,来代表不同位置的信息;
  3. 用Attention来代替循环神经网络,并且是多头注意力Multi-Head Attention。

位置编码 Positional Encoding

在这里插入图片描述

首先,正如上述,由于没有循环神经网络,所以加入位置编码,来代表不同位置的信息;

另外:

  1. 论文解释了选择这个位置编码函数的原因:这个函数让模型更容易学习到相对位置的信息,因为任何位置下的 P E p o s + k PE_{pos+k} PEpos+k都可以用 P E p o s PE_pos PEpos来线性表示;
  2. Google也试过用位置embedding来表示位置编码,两者的效果接近,但是上述方法可以接受超过训练集最大长度的序列。
def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates


def positional_encoding(position, d_model):
    """
    生成0-position的位置编码
    :param position: 最大的位置id
    :param d_model:
    :return:
    """
    angle_rads = get_angles(np.arange(position)[:, np.newaxis],
                            np.arange(d_model)[np.newaxis, :],
                            d_model)

    # 将 sin 应用于数组中的偶数索引(indices);2i
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])

    # 将 cos 应用于数组中的奇数索引;2i+1
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

    pos_encoding = angle_rads[np.newaxis, ...]

    return tf.cast(pos_encoding, dtype=tf.float32)

Encoder

从上图可以看出,左边的Encoder由Multi-Head Attention -->> 残差连接(Add) & LayerNorm -->> 前馈网络(Feed Forward)–>> 残差连接(Add) & LayerNorm -->> output

Multi-Head Attention

首先,来解析Multi-Head Attention的结构,如下图,

  1. 输入由Queries、Keys、Values组成,
  2. 然后经过一层Linear层,其实就是Dense,并且被拆分为多头。
  3. 接着又经过一层Scaled Dot-Product Attention;
  4. 再跟着,将多头Scaled Dot-Product Attention输出进行拼接;
  5. 最后,经过一层Linear层,得到Multi-Head Attention的输出

在这里插入图片描述

class MultiHeadAttention(tf.keras.layers.Layer):
    """
    多头注意力模型
    """
    def __init__(self, d_model, num_heads):
        """
        :param d_model: 模型输出维度
        :param num_heads: 注意力的头数
        """
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        # 将d_model拆分为num_heads个深度为depth的注意力头,d_model = num_heads*depth
        assert d_model % self.num_heads == 0

        self.wq = tf.keras.layers.Dense(d_model)
        self.wk = tf.keras.layers.Dense(d_model)
        self.wv = tf.keras.layers.Dense(d_model)

        self.dense = tf.keras.layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """
        将d_model拆分为num_heads个深度为depth的注意力头
        分拆最后一个维度到 (num_heads, depth).
        转置结果使得形状为 (batch_size, num_heads, seq_len, depth)
        """
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)  # (batch_size, seq_len, d_model)
        k = self.wk(k)  # (batch_size, seq_len, d_model)
        v = self.wv(v)  # (batch_size, seq_len, d_model)

        q = self.split_heads(q, batch_size)  # (batch_size, num_heads, seq_len_q, depth)
        k = self.split_heads(k, batch_size)  # (batch_size, num_heads, seq_len_k, depth)
        v = self.split_heads(v, batch_size)  # (batch_size, num_heads, seq_len_v, depth)

        # scaled_attention.shape == (batch_size, num_heads, seq_len_q, depth)
        # attention_weights.shape == (batch_size, num_heads, seq_len_q, seq_len_k)
        scaled_attention, attention_weights = scaled_dot_product_attention(
            q, k, v, mask)

        scaled_attention = tf.transpose(scaled_attention,
                                        perm=[0, 2, 1, 3])  # (batch_size, seq_len_q, num_heads, depth)

        concat_attention = tf.reshape(scaled_attention,
                                      (batch_size, -1, self.d_model))  # (batch_size, seq_len_q, d_model)

        output = self.dense(concat_attention)  # (batch_size, seq_len_q, d_model)

        return output, attention_weights

Scaled Dot-Product Attention

那么,Scaled Dot-Product Attention又是怎么样的结构呢?

  1. 首先,输入就是上面提到的,经过拆分多头的Q、K、V;
  2. 接着,Q和K进行矩阵相乘,并进行缩放;
  3. 然后,加入Mask,再使用SoftMax函数;
  4. 最后,与V矩阵相乘得到输出。

在这里插入图片描述

具体公式如下:

在这里插入图片描述

在这有两个疑点:

  1. 为什么Q与K相乘之后需要缩放呢,并且是 d k d_k dk,即K的深度?

    google在对比试验发现缩放的效果比未缩放的效果好,怀疑是由于随着 d k d_k dk越大,会使得SoftMax层的梯度变得比较小。

  2. 为什么需要在这里接入SoftMax层?

    因为接入Mask之后,被遮挡的部分会变为绝对值非常大的负数,那么SoftMax可以将其归零。

def scaled_dot_product_attention(q, k, v, mask):
    """
    在Encoder中,q、k和v都是source词向量
    在Decoder中,q和k是Encoder的输出,v是target词向量经过多注意力等网络的输出
    :param q: query, [..., seq_len_q, depth]
    :param k: keys, [..., seq_len_k, depth]
    :param v: values, [..., seq_len_v, depth]
    :param mask: [..., seq_len_q, seq_len_k]
    :return: output,attention_weights
    """
    matmul_qk = tf.matmul(q, k, transpose_b=True)  # [..., seq_len_q, seq_len_k]

    # 缩放matmul_qk
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # 加入mask
    scaled_attention_logits += (mask * -1e9)

    # softmax目的一是归一化得到attention_weights,
    # 二是加上mask之后,mask的位置是绝对值很大的负数,softmax可以将其清零
    # [..., seq_len_q, seq_len_k]
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)

    output = tf.matmul(attention_weights, v)  # [..., seq_len_q, depth_v]

    return output, attention_weights

在这里,要提下mask的形式,mask:[…, seq_len_q, seq_len_k]

接入mask的matmul_qk:[…, seq_len_q, seq_len_k]

因为mask其实就是在文本长度这个维度上进行mask,那么Q对应的维度为seq_len_q,K对应的维度为seq_len_k,相乘之后的matmul_qk即为[…, seq_len_q, seq_len_k]。

所以,mask的形式必须是[…, seq_len_q, seq_len_k],才能对matmul_qk完成对Q和K的遮挡。

Mask

这跟大部分的NLP任务一样,需要对不定长的文本进行填充,所以对填充的部分进行Mask,结合Scaled Dot-Product Attention的需要,填充的部分mask值为1,其他的为0。

用于Encoder,以及Decoder的第二个Attention中。

def create_padding_mask(seq):
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)

    return seq[:, tf.newaxis, tf.newaxis, :]  # (batch_size, 1, 1, seq_len)

其此之外,还需要多增加另外一种Mask,因为Transformer中,由于其结构的特点,会同时使用所有位置的输入,但这在翻译任务中是不合理,我们只能基于第一个词预测第二个词,基于第一个词和第二个词预测第三个词…

用于Decoder的第一个Attention中

def create_look_ahead_mask(size):
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mask  # (seq_len, seq_len)
x = tf.random.uniform((1, 3))
temp = create_look_ahead_mask(x.shape[1])
temp

<tf.Tensor: shape=(3, 3), dtype=float32, numpy=
array([[0., 1., 1.],
       [0., 0., 1.],
       [0., 0., 0.]], dtype=float32)>
def create_masks(inp, tar):
    # 编码器填充遮挡
    enc_padding_mask = create_padding_mask(inp)

    # 在解码器的第二个注意力模块使用。
    # 该填充遮挡用于遮挡编码器的输出。
    dec_padding_mask = create_padding_mask(inp)

    # 在解码器的第一个注意力模块使用。
    # 用于填充(pad)和遮挡(mask)解码器获取到的输入的后续标记(future tokens)。
    look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
    dec_target_padding_mask = create_padding_mask(tar)
    combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)

    return enc_padding_mask, combined_mask, dec_padding_mask

前馈网络(Feed Forward)

前馈网络就比较简单吗由两个全连接层组成,然后第一个全连接层有一个relu激活函数。

def point_wise_feed_forward_network(d_model, dff):
  return tf.keras.Sequential([
      tf.keras.layers.Dense(dff, activation='relu'),  # (batch_size, seq_len, dff)
      tf.keras.layers.Dense(d_model)  # (batch_size, seq_len, d_model)
  ])

残差连接

请注意,Multi-Head Attention不是简单经过LayerNorm,然后传递给前馈网络,而是还加了一步残差连接的操作:残差连接有助于避免深度网络中的梯度消失问题。

在这里插入图片描述

Encoder Layer

这样,我们的Encoder就全部组成完成了。

class EncoderLayer(tf.keras.layers.Layer):
    """
    Encoder网络层。
    source输入q,k,v -->> 多头注意力层 -->> residual connection & LayerNormalization -->>
        前馈网络 -->> residual connection & LayerNormalization
    """
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        attn_output, _ = self.mha(x, x, x, mask)  # (batch_size, input_seq_len, d_model)
        attn_output = self.dropout1(attn_output, training=training)
        # a residual connection
        out1 = self.layernorm1(x + attn_output)  # (batch_size, input_seq_len, d_model)

        ffn_output = self.ffn(out1)  # (batch_size, input_seq_len, d_model)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)  # (batch_size, input_seq_len, d_model)

        return out2
   

class Encoder(tf.keras.layers.Layer):
    """
    词向量+位置编码 -->> N个EncoderLayer -->> 最后一层EncoderLayer的输出
    """
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 maximum_position_encoding, rate=0.1):
        super(Encoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding,
                                                self.d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        seq_len = tf.shape(x)[1]

        # 将嵌入和位置编码相加。
        x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x  # (batch_size, input_seq_len, d_model)

Decoder

了解了Encoder的结构之后,Decoder其实基本上是一样的,不同的就是:Decoder包含两个Multi-Head Attention,第一个是接收target words embedding,第二个是接收Encoder的输出和Decoder第一个Attention的输出。

class DecoderLayer(tf.keras.layers.Layer):
    """
    Decoder网络层。
    target输入v -->> 多头注意力层 -->> residual connection & LayerNormalization -->>
        output & Encoder_output -->> 多头注意力层 -->> residual connection & LayerNormalization -->> 
        前馈网络 -->> residual connection & LayerNormalization
         -->> Linear -->> softmax
    """
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(DecoderLayer, self).__init__()

        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        self.dropout1 = tf.keras.layers.Dropout(rate)
        self.dropout2 = tf.keras.layers.Dropout(rate)
        self.dropout3 = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training,
             look_ahead_mask, padding_mask):
        # enc_output.shape == (batch_size, input_seq_len, d_model)

        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)  # (batch_size, target_seq_len, d_model)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(attn1 + x)

        attn2, attn_weights_block2 = self.mha2(
            enc_output, enc_output, out1, padding_mask)  # (batch_size, target_seq_len, d_model)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(attn2 + out1)  # (batch_size, target_seq_len, d_model)

        ffn_output = self.ffn(out2)  # (batch_size, target_seq_len, d_model)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(ffn_output + out2)  # (batch_size, target_seq_len, d_model)

        return out3, attn_weights_block1, attn_weights_block2
        
      
class Decoder(tf.keras.layers.Layer):
    """
    target词向量+位置编码 -->> N个DecoderLayer -->> 最后一层DecoderLayer的输出
    """
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size,
                 maximum_position_encoding, rate=0.1):
        super(Decoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)

        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training,
             look_ahead_mask, padding_mask):
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)  # (batch_size, target_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training,
                                                   look_ahead_mask, padding_mask)

            attention_weights['decoder_layer{}_block1'.format(i + 1)] = block1
            attention_weights['decoder_layer{}_block2'.format(i + 1)] = block2

        # x.shape == (batch_size, target_seq_len, d_model)
        return x, attention_weights

Transformer

最后,将包含Multi-Head Attention的Encoder-Decoder组合起来,就是Transformer了。

class Transformer(tf.keras.Model):
    """
    Encoder输出 -->> Decoder输出 -->> Linear层输出
    """
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 target_vocab_size, pe_input, pe_target, rate=0.1):
        """

        :param num_layers: EncoderLayer和DecoderLayer的层数
        :param d_model: Encoder和Decoder的输出深度
        :param num_heads: 多头注意力的头数
        :param dff: 前馈网络的中间层深度
        :param input_vocab_size: source词表的大小
        :param target_vocab_size: target词表的大小
        :param pe_input: source词位置id的最大值
        :param pe_target: target词位置id的最大值
        :param rate: 学习率
        """
        super(Transformer, self).__init__()

        self.encoder = Encoder(num_layers, d_model, num_heads, dff,
                               input_vocab_size, pe_input, rate)

        self.decoder = Decoder(num_layers, d_model, num_heads, dff,
                               target_vocab_size, pe_target, rate)

        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inp, tar, training, enc_padding_mask,
             look_ahead_mask, dec_padding_mask):
        enc_output = self.encoder(inp, training, enc_padding_mask)  # (batch_size, inp_seq_len, d_model)

        # dec_output.shape == (batch_size, tar_seq_len, d_model)
        dec_output, attention_weights = self.decoder(
            tar, enc_output, training, look_ahead_mask, dec_padding_mask)

        final_output = self.final_layer(dec_output)  # (batch_size, tar_seq_len, target_vocab_size)

        return final_output, attention_weights

优化器

Transformer在优化器这一块对学习率作了独特的处理:

在这里插入图片描述

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  def __init__(self, d_model, warmup_steps=4000):
    super(CustomSchedule, self).__init__()
    
    self.d_model = d_model
    self.d_model = tf.cast(self.d_model, tf.float32)

    self.warmup_steps = warmup_steps
    
  def __call__(self, step):
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps ** -1.5)
    
    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

模型训练

参考上两篇博客,同样是采用Teacher Forcing的方法

模型推理

其实与上两篇博客也是类似。

  1. Encoder输入还是一样,为source words即待翻译文本;
  2. Decoder初始输入为开始标记<s>,然后预测得到一个单词;
  3. 将之前的输入与当前预测得到单词进行拼接,重新输入Decoder进行预测下一个单词;
  4. 以此循环,直到预测单词为结束标记,或者超过最大长度限制。
Logo

尧米是由西云算力与CSDN联合运营的AI算力和模型开源社区品牌,为基于DaModel智算平台的AI应用企业和泛AI开发者提供技术交流与成果转化平台。

更多推荐