|
|
""" |
|
|
评论罗伯特事件分词器 - 支持语义特征、偏差特征等 |
|
|
|
|
|
扩展EventTokenizer以支持多模态特征的padding和批处理 |
|
|
""" |
|
|
|
|
|
from typing import Optional, Union, Dict, Any |
|
|
import numpy as np |
|
|
import torch |
|
|
from easy_tpp.preprocess.event_tokenizer import EventTokenizer, BatchEncoding |
|
|
from easy_tpp.utils import PaddingStrategy |
|
|
|
|
|
|
|
|
class RobertEventTokenizer(EventTokenizer):
    """Event tokenizer supporting semantic features, deviation features, etc.

    Extends ``EventTokenizer`` so that padding / batching also covers:

    - ``semantic_vectors``: semantic vectors, padded with 0.0
    - ``deviation_features``: deviation features, padded with 0.0
    - ``is_spontaneous``: spontaneous vs. mentioned (@) flag, padded with -1.0
    """

    def __init__(self, config, use_semantic=False, use_deviation=False, semantic_dim=768):
        """Initialize the tokenizer.

        Args:
            config: configuration object forwarded to the base ``EventTokenizer``.
            use_semantic (bool): whether ``semantic_vectors`` is a model input.
            use_deviation (bool): whether ``deviation_features`` is a model input.
            semantic_dim (int): dimensionality of each semantic vector.
        """
        super(RobertEventTokenizer, self).__init__(config)

        self.use_semantic = use_semantic
        self.use_deviation = use_deviation
        self.semantic_dim = semantic_dim

        # Copy before appending: if the base class exposes ``model_input_names``
        # as a *class* attribute, appending in place would leak the extra names
        # into every instance and duplicate them on re-instantiation.
        self.model_input_names = list(self.model_input_names)
        if self.use_semantic and 'semantic_vectors' not in self.model_input_names:
            self.model_input_names.append('semantic_vectors')
        if self.use_deviation and 'deviation_features' not in self.model_input_names:
            self.model_input_names.append('deviation_features')
        if 'is_spontaneous' not in self.model_input_names:
            self.model_input_names.append('is_spontaneous')

    def _pad(
            self,
            encoded_inputs: Union[Dict[str, Any], BatchEncoding],
            max_length: Optional[int] = None,
            padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
            return_attention_mask: Optional[bool] = None,
    ) -> dict:
        """Pad the encoded inputs, including the custom feature channels.

        Args:
            encoded_inputs: mapping from input name to a list of per-sequence values.
            max_length: target length (resolved from the batch when strategy is LONGEST).
            padding_strategy: padding strategy.
            return_attention_mask: whether to build the attention mask; when None,
                defaults to whether 'attention_mask' is in ``model_input_names``.

        Returns:
            dict: the padded batch.
        """
        required_input = encoded_inputs[self.model_input_names[0]]

        # LONGEST is resolved to MAX_LENGTH using the batch's longest sequence.
        if padding_strategy == PaddingStrategy.LONGEST:
            max_length = max(len(seq) for seq in required_input)
            padding_strategy = PaddingStrategy.MAX_LENGTH

        seq_lens = np.array([len(seq) for seq in required_input])
        is_all_seq_equal_max_length = bool(np.all(seq_lens == max_length))
        # Logical ``not`` instead of the original bitwise ``~``: on a plain
        # Python bool, ``~True`` is -2 (truthy) and would invert the decision;
        # the original only worked because ``np.all`` returns ``np.bool_``.
        needs_to_be_padded = (padding_strategy != PaddingStrategy.DO_NOT_PAD
                              and not is_all_seq_equal_max_length)

        batch_output = dict()

        # Base channels: [0] and [1] are float sequences, [2] is the
        # integer event-type sequence.
        if needs_to_be_padded:
            batch_output[self.model_input_names[0]] = self.make_pad_sequence(
                encoded_inputs[self.model_input_names[0]],
                self.pad_token_id,
                padding_side=self.padding_side,
                max_len=max_length
            )
            batch_output[self.model_input_names[1]] = self.make_pad_sequence(
                encoded_inputs[self.model_input_names[1]],
                self.pad_token_id,
                padding_side=self.padding_side,
                max_len=max_length
            )
            batch_output[self.model_input_names[2]] = self.make_pad_sequence(
                encoded_inputs[self.model_input_names[2]],
                self.pad_token_id,
                padding_side=self.padding_side,
                max_len=max_length,
                dtype=np.int64
            )
        else:
            batch_output[self.model_input_names[0]] = np.array(
                encoded_inputs[self.model_input_names[0]], dtype=np.float32
            )
            batch_output[self.model_input_names[1]] = np.array(
                encoded_inputs[self.model_input_names[1]], dtype=np.float32
            )
            batch_output[self.model_input_names[2]] = np.array(
                encoded_inputs[self.model_input_names[2]], dtype=np.int64
            )

        # Boolean mask: True on real events, False on padded positions.
        seq_pad_mask = np.full_like(batch_output[self.model_input_names[2]],
                                    fill_value=True, dtype=bool)
        for i, seq_len in enumerate(seq_lens):
            seq_pad_mask[i, seq_len:] = False
        batch_output[self.model_input_names[3]] = seq_pad_mask

        if return_attention_mask is None:
            return_attention_mask = "attention_mask" in self.model_input_names

        if return_attention_mask:
            batch_output[self.model_input_names[4]] = self.make_attn_mask_for_pad_sequence(
                batch_output[self.model_input_names[2]],
                self.pad_token_id
            )
        else:
            batch_output[self.model_input_names[4]] = []

        batch_size = len(required_input)
        # Length of the default (missing-feature) fillers: the padded length
        # when padding happened, otherwise the batch's existing common length.
        if needs_to_be_padded:
            out_len = max_length
        elif len(seq_lens):
            out_len = int(seq_lens.max())
        else:
            out_len = 0  # empty batch: avoid max() on an empty array

        # --- semantic vectors: (batch, L, semantic_dim), padded with 0.0 ---
        if self.use_semantic:
            if 'semantic_vectors' in encoded_inputs:
                batch_output['semantic_vectors'] = self._pad_feature_channel(
                    encoded_inputs['semantic_vectors'], 0.0, max_length,
                    self.semantic_dim, needs_to_be_padded
                )
            else:
                # Feature requested but not supplied: fall back to all-zeros.
                batch_output['semantic_vectors'] = np.zeros(
                    (batch_size, out_len, self.semantic_dim), dtype=np.float32
                )

        # --- deviation features: (batch, L, 3), padded with 0.0 ---
        if self.use_deviation:
            if 'deviation_features' in encoded_inputs:
                batch_output['deviation_features'] = self._pad_feature_channel(
                    encoded_inputs['deviation_features'], 0.0, max_length,
                    3, needs_to_be_padded
                )
            else:
                batch_output['deviation_features'] = np.zeros(
                    (batch_size, out_len, 3), dtype=np.float32
                )

        # --- is_spontaneous flag: always emitted, padded with -1.0 ---
        # NOTE(review): -1.0 appears to act as the pad/missing marker for this
        # flag — confirm against the model that consumes it.
        if 'is_spontaneous' in encoded_inputs:
            batch_output['is_spontaneous'] = self._pad_feature_channel(
                encoded_inputs['is_spontaneous'], -1.0, max_length,
                1, needs_to_be_padded
            )
        else:
            batch_output['is_spontaneous'] = np.full(
                (batch_size, out_len), -1.0, dtype=np.float32
            )

        return batch_output

    def _pad_feature_channel(self, values, pad_value, max_len, feature_dim, needs_padding):
        """Pad one optional feature channel, or just cast when no padding is needed."""
        if needs_padding:
            return self.make_pad_sequence_for_features(
                values,
                pad_value=pad_value,
                max_len=max_len,
                feature_dim=feature_dim
            )
        return np.array(values, dtype=np.float32)

    def make_pad_sequence_for_features(self, seqs, pad_value, max_len, feature_dim=None, dtype=np.float32):
        """Pad (or truncate) feature sequences to a common length (helper).

        Args:
            seqs: list of per-sequence features; each item is a 1-D/2-D
                ``np.ndarray`` or a (possibly nested) list/tuple.
            pad_value: value used to fill padded positions.
            max_len: target length; longer sequences are truncated.
            feature_dim: per-step feature width, used when it cannot be
                inferred from the data (e.g. an empty list sequence).
            dtype: dtype of the returned array.

        Returns:
            np.ndarray: stacked array of shape (batch, max_len[, feature_dim]).
        """
        padded_seqs = []
        for seq in seqs:
            seq_len = len(seq)
            if seq_len >= max_len:
                # Already long enough: truncate (no-op when exactly max_len).
                padded_seqs.append(seq[:max_len])
                continue
            pad_len = max_len - seq_len
            if isinstance(seq, np.ndarray):
                if seq.ndim == 1:
                    # 1-D feature (scalar per time step).
                    pad = np.full(pad_len, pad_value, dtype=dtype)
                else:
                    # Multi-dim feature: keep the trailing feature shape.
                    pad = np.full((pad_len,) + seq.shape[1:], pad_value, dtype=dtype)
                padded_seqs.append(np.concatenate([seq, pad], axis=0))
            elif seq_len and isinstance(seq[0], (list, np.ndarray, tuple)):
                # Nested (per-step vector) sequence: pad with full rows.
                width = feature_dim if feature_dim else len(seq[0])
                padded_seqs.append(list(seq) + [[pad_value] * width] * pad_len)
            elif seq_len == 0:
                # Empty list sequence: the original probed seq[0] here and
                # raised IndexError. Infer the pad shape from feature_dim
                # (width None or 1 is treated as a flat scalar sequence,
                # matching how is_spontaneous is batched).
                if feature_dim and feature_dim > 1:
                    padded_seqs.append([[pad_value] * feature_dim] * pad_len)
                else:
                    padded_seqs.append([pad_value] * pad_len)
            else:
                # Flat scalar sequence.
                padded_seqs.append(list(seq) + [pad_value] * pad_len)

        return np.array(padded_seqs, dtype=dtype)
|
|
|
|
|
|