import torch
import torch.nn as nn
from typing import Tuple, Union, Optional, List
from modules.wenet_extractor.squeezeformer.subsampling import (
    DepthwiseConv2dSubsampling4,
    TimeReductionLayer1D,
    TimeReductionLayer2D,
    TimeReductionLayerStream,
)
from modules.wenet_extractor.squeezeformer.encoder_layer import (
    SqueezeformerEncoderLayer,
)
from modules.wenet_extractor.transformer.embedding import RelPositionalEncoding
from modules.wenet_extractor.transformer.attention import MultiHeadedAttention
from modules.wenet_extractor.squeezeformer.attention import (
    RelPositionMultiHeadedAttention,
)
from modules.wenet_extractor.squeezeformer.positionwise_feed_forward import (
    PositionwiseFeedForward,
)
from modules.wenet_extractor.squeezeformer.convolution import ConvolutionModule
from modules.wenet_extractor.utils.mask import make_pad_mask, add_optional_chunk_mask
from modules.wenet_extractor.utils.common import get_activation


class SqueezeformerEncoder(nn.Module):
    def __init__(
        self,
        input_size: int = 80,
        encoder_dim: int = 256,
        output_size: int = 256,
        attention_heads: int = 4,
        num_blocks: int = 12,
        reduce_idx: Optional[Union[int, List[int]]] = 5,
        recover_idx: Optional[Union[int, List[int]]] = 11,
        feed_forward_expansion_factor: int = 4,
        dw_stride: bool = False,
        input_dropout_rate: float = 0.1,
        pos_enc_layer_type: str = "rel_pos",
        time_reduction_layer_type: str = "conv1d",
        do_rel_shift: bool = True,
        feed_forward_dropout_rate: float = 0.1,
        attention_dropout_rate: float = 0.1,
        cnn_module_kernel: int = 31,
        cnn_norm_type: str = "batch_norm",
        dropout: float = 0.1,
        causal: bool = False,
        adaptive_scale: bool = True,
        activation_type: str = "swish",
        init_weights: bool = True,
        global_cmvn: Optional[torch.nn.Module] = None,
        normalize_before: bool = False,
        use_dynamic_chunk: bool = False,
        concat_after: bool = False,
        static_chunk_size: int = 0,
        use_dynamic_left_chunk: bool = False,
    ):
| | """Construct SqueezeformerEncoder |
| | |
| | Args: |
| | input_size to use_dynamic_chunk, see in Transformer BaseEncoder. |
| | encoder_dim (int): The hidden dimension of encoder layer. |
| | output_size (int): The output dimension of final projection layer. |
| | attention_heads (int): Num of attention head in attention module. |
| | num_blocks (int): Num of encoder layers. |
| | reduce_idx Optional[Union[int, List[int]]]: |
| | reduce layer index, from 40ms to 80ms per frame. |
| | recover_idx Optional[Union[int, List[int]]]: |
| | recover layer index, from 80ms to 40ms per frame. |
| | feed_forward_expansion_factor (int): Enlarge coefficient of FFN. |
| | dw_stride (bool): Whether do depthwise convolution |
| | on subsampling module. |
| | input_dropout_rate (float): Dropout rate of input projection layer. |
| | pos_enc_layer_type (str): Self attention type. |
| | time_reduction_layer_type (str): Conv1d or Conv2d reduction layer. |
| | do_rel_shift (bool): Whether to do relative shift |
| | operation on rel-attention module. |
| | cnn_module_kernel (int): Kernel size of CNN module. |
| | activation_type (str): Encoder activation function type. |
| | use_cnn_module (bool): Whether to use convolution module. |
| | cnn_module_kernel (int): Kernel size of convolution module. |
| | adaptive_scale (bool): Whether to use adaptive scale. |
| | init_weights (bool): Whether to initialize weights. |
| | causal (bool): whether to use causal convolution or not. |
| | """ |
        super(SqueezeformerEncoder, self).__init__()
        self.global_cmvn = global_cmvn
        self.reduce_idx: Optional[Union[int, List[int]]] = (
            [reduce_idx] if isinstance(reduce_idx, int) else reduce_idx
        )
        self.recover_idx: Optional[Union[int, List[int]]] = (
            [recover_idx] if isinstance(recover_idx, int) else recover_idx
        )
        self.check_ascending_list()
        if reduce_idx is None:
            self.time_reduce = None
        else:
            if recover_idx is None:
                # "normal": downsample the sequence and keep the reduced rate.
                self.time_reduce = "normal"
            else:
                # "recover": downsample, then restore the resolution later.
                self.time_reduce = "recover"
                assert len(self.reduce_idx) == len(self.recover_idx)
            self.reduce_stride = 2
        self._output_size = output_size
        self.normalize_before = normalize_before
        self.static_chunk_size = static_chunk_size
        self.use_dynamic_chunk = use_dynamic_chunk
        self.use_dynamic_left_chunk = use_dynamic_left_chunk
        self.pos_enc_layer_type = pos_enc_layer_type
        activation = get_activation(activation_type)

        # self-attention module definition; the attention dimension must
        # match encoder_dim, the width the encoder layers operate on
        if pos_enc_layer_type != "rel_pos":
            encoder_selfattn_layer = MultiHeadedAttention
            encoder_selfattn_layer_args = (
                attention_heads,
                encoder_dim,
                attention_dropout_rate,
            )
        else:
            encoder_selfattn_layer = RelPositionMultiHeadedAttention
            encoder_selfattn_layer_args = (
                attention_heads,
                encoder_dim,
                attention_dropout_rate,
                do_rel_shift,
                adaptive_scale,
                init_weights,
            )

        # feed-forward module definition
        positionwise_layer = PositionwiseFeedForward
        positionwise_layer_args = (
            encoder_dim,
            encoder_dim * feed_forward_expansion_factor,
            feed_forward_dropout_rate,
            activation,
            adaptive_scale,
            init_weights,
        )

        # convolution module definition
        convolution_layer = ConvolutionModule
        convolution_layer_args = (
            encoder_dim,
            cnn_module_kernel,
            activation,
            cnn_norm_type,
            causal,
            True,  # bias
            adaptive_scale,
            init_weights,
        )

        self.embed = DepthwiseConv2dSubsampling4(
            1,
            encoder_dim,
            RelPositionalEncoding(encoder_dim, dropout_rate=0.1),
            dw_stride,
            input_size,
            input_dropout_rate,
            init_weights,
        )

        self.preln = nn.LayerNorm(encoder_dim)
        self.encoders = torch.nn.ModuleList(
            [
                SqueezeformerEncoderLayer(
                    encoder_dim,
                    encoder_selfattn_layer(*encoder_selfattn_layer_args),
                    positionwise_layer(*positionwise_layer_args),
                    convolution_layer(*convolution_layer_args),
                    positionwise_layer(*positionwise_layer_args),
                    normalize_before,
                    dropout,
                    concat_after,
                )
                for _ in range(num_blocks)
            ]
        )
        if time_reduction_layer_type == "conv1d":
            time_reduction_layer = TimeReductionLayer1D
            time_reduction_layer_args = {
                "channel": encoder_dim,
                "out_dim": encoder_dim,
            }
        elif time_reduction_layer_type == "stream":
            time_reduction_layer = TimeReductionLayerStream
            time_reduction_layer_args = {
                "channel": encoder_dim,
                "out_dim": encoder_dim,
            }
        else:
            time_reduction_layer = TimeReductionLayer2D
            time_reduction_layer_args = {"encoder_dim": encoder_dim}

        self.time_reduction_layer = time_reduction_layer(**time_reduction_layer_args)
        self.time_recover_layer = nn.Linear(encoder_dim, encoder_dim)
        self.final_proj = None
        if output_size != encoder_dim:
            self.final_proj = nn.Linear(encoder_dim, output_size)

    def output_size(self) -> int:
        return self._output_size

    def forward(
        self,
        xs: torch.Tensor,
        xs_lens: torch.Tensor,
        decoding_chunk_size: int = 0,
        num_decoding_left_chunks: int = -1,
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Encode padded input sequences.

        Args:
            xs (torch.Tensor): padded input tensor (B, T, input_size)
            xs_lens (torch.Tensor): input lengths (B)
            decoding_chunk_size (int): decoding chunk size, used when
                chunk-based masking is enabled
            num_decoding_left_chunks (int): number of left chunks for
                attention; < 0 means use all history

        Returns:
            encoder output tensor and its subsampled mask
        """
        T = xs.size(1)
        masks = ~make_pad_mask(xs_lens, T).unsqueeze(1)
        if self.global_cmvn is not None:
            xs = self.global_cmvn(xs)
        xs, pos_emb, masks = self.embed(xs, masks)
        mask_pad = masks
        chunk_masks = add_optional_chunk_mask(
            xs,
            masks,
            self.use_dynamic_chunk,
            self.use_dynamic_left_chunk,
            decoding_chunk_size,
            self.static_chunk_size,
            num_decoding_left_chunks,
        )
        xs_lens = mask_pad.squeeze(1).sum(1)
        xs = self.preln(xs)
        recover_activations: List[
            Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]
        ] = []
        index = 0
        for i, layer in enumerate(self.encoders):
            if self.reduce_idx is not None:
                if self.time_reduce is not None and i in self.reduce_idx:
                    # Save the pre-reduction activations so the resolution
                    # can be recovered through a residual connection later.
                    recover_activations.append((xs, chunk_masks, pos_emb, mask_pad))
                    xs, xs_lens, chunk_masks, mask_pad = self.time_reduction_layer(
                        xs, xs_lens, chunk_masks, mask_pad
                    )
                    pos_emb = pos_emb[:, ::2, :]
                    index += 1

            if self.recover_idx is not None:
                if self.time_reduce == "recover" and i in self.recover_idx:
                    index -= 1
                    (
                        recover_tensor,
                        recover_chunk_masks,
                        recover_pos_emb,
                        recover_mask_pad,
                    ) = recover_activations[index]
                    # Upsample by repeating each frame, project, and add the
                    # saved pre-reduction activations as a residual.
                    xs = xs.unsqueeze(2).repeat(1, 1, 2, 1).flatten(1, 2)
                    xs = self.time_recover_layer(xs)
                    recovered_t = recover_tensor.size(1)
                    xs = recover_tensor + xs[:, :recovered_t, :].contiguous()
                    chunk_masks = recover_chunk_masks
                    pos_emb = recover_pos_emb
                    mask_pad = recover_mask_pad
                    xs = xs.masked_fill(~mask_pad[:, 0, :].unsqueeze(-1), 0.0)

            xs, chunk_masks, _, _ = layer(xs, chunk_masks, pos_emb, mask_pad)

        if self.final_proj is not None:
            xs = self.final_proj(xs)
        return xs, masks

    def check_ascending_list(self):
        if self.reduce_idx is not None:
            assert self.reduce_idx == sorted(
                self.reduce_idx
            ), "reduce_idx should be int or ascending list"
        if self.recover_idx is not None:
            assert self.recover_idx == sorted(
                self.recover_idx
            ), "recover_idx should be int or ascending list"

    def calculate_downsampling_factor(self, i: int) -> int:
        if self.reduce_idx is None:
            return 1
        else:
            reduce_exp, recover_exp = 0, 0
            for exp, rd_idx in enumerate(self.reduce_idx):
                if i >= rd_idx:
                    reduce_exp = exp + 1
            if self.recover_idx is not None:
                for exp, rc_idx in enumerate(self.recover_idx):
                    if i >= rc_idx:
                        recover_exp = exp + 1
            return int(2 ** (reduce_exp - recover_exp))

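    # A worked example (illustration only, assuming the constructor defaults
    # reduce_idx=[5] and recover_idx=[11]): at layer i=7 one reduce boundary
    # has been passed (reduce_exp=1) and no recover boundary (recover_exp=0),
    # so the factor is 2 ** (1 - 0) = 2; at layer i=11 the recover boundary
    # has also been passed, so the factor returns to 2 ** (1 - 1) = 1.
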
    def forward_chunk(
        self,
        xs: torch.Tensor,
        offset: int,
        required_cache_size: int,
        att_cache: torch.Tensor = torch.zeros(0, 0, 0, 0),
        cnn_cache: torch.Tensor = torch.zeros(0, 0, 0, 0),
        att_mask: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool),
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
| | """ Forward just one chunk |
| | |
| | Args: |
| | xs (torch.Tensor): chunk input, with shape (b=1, time, mel-dim), |
| | where `time == (chunk_size - 1) * subsample_rate + \ |
| | subsample.right_context + 1` |
| | offset (int): current offset in encoder output time stamp |
| | required_cache_size (int): cache size required for next chunk |
| | compuation |
| | >=0: actual cache size |
| | <0: means all history cache is required |
| | att_cache (torch.Tensor): cache tensor for KEY & VALUE in |
| | transformer/conformer attention, with shape |
| | (elayers, head, cache_t1, d_k * 2), where |
| | `head * d_k == hidden-dim` and |
| | `cache_t1 == chunk_size * num_decoding_left_chunks`. |
| | cnn_cache (torch.Tensor): cache tensor for cnn_module in conformer, |
| | (elayers, b=1, hidden-dim, cache_t2), where |
| | `cache_t2 == cnn.lorder - 1` |
| | |
| | Returns: |
| | torch.Tensor: output of current input xs, |
| | with shape (b=1, chunk_size, hidden-dim). |
| | torch.Tensor: new attention cache required for next chunk, with |
| | dynamic shape (elayers, head, ?, d_k * 2) |
| | depending on required_cache_size. |
| | torch.Tensor: new conformer cnn cache required for next chunk, with |
| | same shape as the original cnn_cache. |
| | |
| | """ |
        assert xs.size(0) == 1

        # tmp_masks is just for the interface compatibility of self.embed
        tmp_masks = torch.ones(1, xs.size(1), device=xs.device, dtype=torch.bool)
        tmp_masks = tmp_masks.unsqueeze(1)
        if self.global_cmvn is not None:
            xs = self.global_cmvn(xs)

        # after embed: xs is (b=1, chunk_size, hidden-dim)
        xs, pos_emb, _ = self.embed(xs, tmp_masks, offset)

        elayers, cache_t1 = att_cache.size(0), att_cache.size(2)
        chunk_size = xs.size(1)
        attention_key_size = cache_t1 + chunk_size
        pos_emb = self.embed.position_encoding(
            offset=offset - cache_t1, size=attention_key_size
        )
        if required_cache_size < 0:
            next_cache_start = 0
        elif required_cache_size == 0:
            next_cache_start = attention_key_size
        else:
            next_cache_start = max(attention_key_size - required_cache_size, 0)

        r_att_cache = []
        r_cnn_cache = []

        mask_pad = torch.ones(1, xs.size(1), device=xs.device, dtype=torch.bool)
        mask_pad = mask_pad.unsqueeze(1)
        max_att_len: int = 0
        recover_activations: List[
            Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]
        ] = []
        index = 0
        xs_lens = torch.tensor([xs.size(1)], device=xs.device, dtype=torch.int)
        xs = self.preln(xs)
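        # Streaming cache scheme (descriptive note): att_cache holds each
        # layer's attention KEY/VALUE history at the full post-subsampling
        # frame rate. Layers running at a reduced rate (factor > 1) read the
        # cache strided by `factor` and write it back repeated `factor`
        # times, keeping a common time base across layers and chunks.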
        for i, layer in enumerate(self.encoders):
            if self.reduce_idx is not None:
                if self.time_reduce is not None and i in self.reduce_idx:
                    recover_activations.append((xs, att_mask, pos_emb, mask_pad))
                    xs, xs_lens, att_mask, mask_pad = self.time_reduction_layer(
                        xs, xs_lens, att_mask, mask_pad
                    )
                    pos_emb = pos_emb[:, ::2, :]
                    index += 1

            if self.recover_idx is not None:
                if self.time_reduce == "recover" and i in self.recover_idx:
                    index -= 1
                    (
                        recover_tensor,
                        recover_att_mask,
                        recover_pos_emb,
                        recover_mask_pad,
                    ) = recover_activations[index]
                    # Upsample by repeating each frame, project, and add the
                    # saved pre-reduction activations as a residual.
                    xs = xs.unsqueeze(2).repeat(1, 1, 2, 1).flatten(1, 2)
                    xs = self.time_recover_layer(xs)
                    recovered_t = recover_tensor.size(1)
                    xs = recover_tensor + xs[:, :recovered_t, :].contiguous()
                    att_mask = recover_att_mask
                    pos_emb = recover_pos_emb
                    mask_pad = recover_mask_pad
                    if att_mask.size(1) != 0:
                        xs = xs.masked_fill(~att_mask[:, 0, :].unsqueeze(-1), 0.0)

            factor = self.calculate_downsampling_factor(i)

            xs, _, new_att_cache, new_cnn_cache = layer(
                xs,
                att_mask,
                pos_emb,
                att_cache=(
                    att_cache[i : i + 1][:, :, ::factor, :][
                        :, :, : pos_emb.size(1) - xs.size(1), :
                    ]
                    if elayers > 0
                    else att_cache[:, :, ::factor, :]
                ),
                cnn_cache=cnn_cache[i] if cnn_cache.size(0) > 0 else cnn_cache,
            )
            cached_att = new_att_cache[:, :, next_cache_start // factor :, :]
            cached_cnn = new_cnn_cache.unsqueeze(0)
            cached_att = (
                cached_att.unsqueeze(3).repeat(1, 1, 1, factor, 1).flatten(2, 3)
            )
            if i == 0:
                # record the attention cache length of the first block
                max_att_len = cached_att.size(2)
            r_att_cache.append(cached_att[:, :, :max_att_len, :])
            r_cnn_cache.append(cached_cnn)

        # r_att_cache: (elayers, head, ?, d_k * 2), where ? depends on
        # required_cache_size
        r_att_cache = torch.cat(r_att_cache, dim=0)
        # r_cnn_cache: (elayers, b=1, hidden-dim, cache_t2)
        r_cnn_cache = torch.cat(r_cnn_cache, dim=0)

        if self.final_proj is not None:
            xs = self.final_proj(xs)
        return (xs, r_att_cache, r_cnn_cache)

    def forward_chunk_by_chunk(
        self,
        xs: torch.Tensor,
        decoding_chunk_size: int,
        num_decoding_left_chunks: int = -1,
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Forward the input chunk by chunk, with a fixed chunk size, in a
        streaming fashion.

        Here we should pay special attention to the computation cache in
        this streaming-style forward. Three things need to be cached in the
        current network:
            1. transformer/conformer encoder layer output
            2. convolution in conformer
            3. convolution in subsampling

        However, we do not implement a subsampling cache because:
            1. We can make the subsampling module output the right result by
               overlapping the input instead of caching left context, even
               though this wastes some computation; subsampling accounts for
               only a very small fraction of the whole model's computation.
            2. Typically there are several convolution layers with
               subsampling inside the subsampling module, so it is tricky
               and complicated to handle caches across convolution layers
               with different subsampling rates.
            3. Currently, nn.Sequential is used to stack all the convolution
               layers in the subsampling module; we would need to rewrite it
               to make it work with a cache, which is not preferred.

        Args:
            xs (torch.Tensor): (1, max_len, dim)
            decoding_chunk_size (int): decoding chunk size
            num_decoding_left_chunks (int): number of left chunks used for
                attention; < 0 means use all history
        """
        assert decoding_chunk_size > 0
        # The model should be trained with static or dynamic chunks.
        assert self.static_chunk_size > 0 or self.use_dynamic_chunk
        subsampling = self.embed.subsampling_rate
        context = self.embed.right_context + 1  # add the current frame
        stride = subsampling * decoding_chunk_size
        decoding_window = (decoding_chunk_size - 1) * subsampling + context
        num_frames = xs.size(1)
        att_cache: torch.Tensor = torch.zeros((0, 0, 0, 0), device=xs.device)
        cnn_cache: torch.Tensor = torch.zeros((0, 0, 0, 0), device=xs.device)
        outputs = []
        offset = 0
        required_cache_size = decoding_chunk_size * num_decoding_left_chunks

        # Feed overlapping input windows forward step by step.
        for cur in range(0, num_frames - context + 1, stride):
            end = min(cur + decoding_window, num_frames)
            chunk_xs = xs[:, cur:end, :]
            (y, att_cache, cnn_cache) = self.forward_chunk(
                chunk_xs, offset, required_cache_size, att_cache, cnn_cache
            )
            outputs.append(y)
            offset += y.size(1)
        ys = torch.cat(outputs, 1)
        masks = torch.ones((1, 1, ys.size(1)), device=ys.device, dtype=torch.bool)
        return ys, masks
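
# A minimal usage sketch (illustration only, not part of the original
# module). It assumes 80-dim log-mel features and the constructor defaults
# above; the shapes, chunk settings, and randomly initialized weights are
# examples, not requirements.
if __name__ == "__main__":
    encoder = SqueezeformerEncoder(input_size=80, encoder_dim=256, num_blocks=12)
    feats = torch.randn(2, 200, 80)  # (batch, frames, mel-dim)
    feats_lens = torch.tensor([200, 160], dtype=torch.long)

    # Offline (full-utterance) encoding.
    out, masks = encoder(feats, feats_lens)
    print(out.shape)  # e.g. (2, 49, 256): time is subsampled by 4

    # Streaming encoding; assumes a model trained with static or dynamic
    # chunks (static_chunk_size > 0 or use_dynamic_chunk=True).
    stream_encoder = SqueezeformerEncoder(input_size=80, static_chunk_size=16)
    ys, ys_masks = stream_encoder.forward_chunk_by_chunk(
        feats[:1], decoding_chunk_size=16, num_decoding_left_chunks=-1
    )
    print(ys.shape)  # (1, decoding_chunk_size * num_chunks, 256)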