"""
Comment-Robert event tokenizer - supports semantic features, deviation features, etc.

Extends EventTokenizer so that the additional multi-modal per-event features
are padded and batched together with the standard event-sequence fields.
"""

from typing import Optional, Union, Dict, Any

import numpy as np
import torch

from easy_tpp.preprocess.event_tokenizer import EventTokenizer, BatchEncoding
from easy_tpp.utils import PaddingStrategy


class RobertEventTokenizer(EventTokenizer):
    """
    Event tokenizer that also pads per-event side features.

    On top of the standard EventTokenizer fields it handles:
        - semantic_vectors: semantic embedding of each event
        - deviation_features: deviation features of each event
        - is_spontaneous: spontaneous / @-mentioned flag of each event
    """

    def __init__(self, config, use_semantic=False, use_deviation=False,
                 semantic_dim=768, deviation_dim=3):
        """
        Initialise the tokenizer.

        Args:
            config: configuration object forwarded to EventTokenizer.
            use_semantic: whether semantic vectors are part of the model inputs.
            use_deviation: whether deviation features are part of the model inputs.
            semantic_dim: width of one semantic vector.
            deviation_dim: width of one deviation feature vector.
        """
        super().__init__(config)

        self.use_semantic = use_semantic
        self.use_deviation = use_deviation
        self.semantic_dim = semantic_dim
        self.deviation_dim = deviation_dim

        # Work on a private copy: the list handed back by the parent may be
        # shared (e.g. a class-level default), in which case appending in place
        # would leak the custom names into every other tokenizer instance and
        # duplicate them on each construction.
        input_names = list(self.model_input_names)

        # Expected standard order: time_seqs, time_delta_seqs, type_seqs,
        # batch_non_pad_mask, attention_mask. Custom features go after them.
        custom_names = []
        if self.use_semantic:
            custom_names.append('semantic_vectors')
        if self.use_deviation:
            custom_names.append('deviation_features')
        # is_spontaneous is always registered.
        custom_names.append('is_spontaneous')

        for name in custom_names:
            if name not in input_names:
                input_names.append(name)
        self.model_input_names = input_names

    def _pad(
            self,
            encoded_inputs: Union[Dict[str, Any], BatchEncoding],
            max_length: Optional[int] = None,
            padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
            return_attention_mask: Optional[bool] = None,
    ) -> dict:
        """
        Pad the encoded inputs, including the custom features.

        Args:
            encoded_inputs: mapping from input name to a batch of sequences.
            max_length: target sequence length; recomputed from the batch when
                the strategy is PaddingStrategy.LONGEST.
            padding_strategy: padding strategy to apply.
            return_attention_mask: whether to build the attention mask; when
                None it is built iff "attention_mask" is a model input name.

        Returns:
            dict: the padded batch, keyed by model input name.
        """
        # The first five model input names are addressed by position, as the
        # original implementation did.
        time_key, delta_key, type_key, mask_key, attn_key = self.model_input_names[:5]
        required_input = encoded_inputs[time_key]

        if padding_strategy == PaddingStrategy.LONGEST:
            # default=0 keeps an empty batch from raising in max().
            max_length = max((len(seq) for seq in required_input), default=0)
            padding_strategy = PaddingStrategy.MAX_LENGTH

        seq_lens = np.array([len(seq) for seq in required_input])
        all_at_max_length = bool(np.all(seq_lens == max_length))
        needs_to_be_padded = (
            padding_strategy != PaddingStrategy.DO_NOT_PAD and not all_at_max_length
        )

        batch_output = dict()

        # Standard fields. The two time fields rely on make_pad_sequence's own
        # default dtype when padded; only type_seqs forces int64.
        standard_fields = (
            (time_key, {}, np.float32),
            (delta_key, {}, np.float32),
            (type_key, {'dtype': np.int64}, np.int64),
        )
        for key, pad_kwargs, array_dtype in standard_fields:
            if needs_to_be_padded:
                batch_output[key] = self.make_pad_sequence(
                    encoded_inputs[key],
                    self.pad_token_id,
                    padding_side=self.padding_side,
                    max_len=max_length,
                    **pad_kwargs
                )
            else:
                batch_output[key] = np.array(encoded_inputs[key], dtype=array_dtype)

        # Non-pad mask derived from the original sequence lengths.
        # NOTE(review): this marks the trailing positions as padding, i.e. it
        # assumes right padding - confirm before using padding_side == "left".
        seq_pad_mask = np.full_like(batch_output[type_key], fill_value=True, dtype=bool)
        for i, seq_len in enumerate(seq_lens):
            seq_pad_mask[i, seq_len:] = False
        batch_output[mask_key] = seq_pad_mask

        # Attention mask.
        if return_attention_mask is None:
            return_attention_mask = "attention_mask" in self.model_input_names

        if return_attention_mask:
            batch_output[attn_key] = self.make_attn_mask_for_pad_sequence(
                batch_output[type_key],
                self.pad_token_id
            )
        else:
            batch_output[attn_key] = []

        # Custom features. When a feature is enabled but absent from the
        # inputs, a constant placeholder of the batch's shape is emitted.
        batch_size = len(required_input)
        if needs_to_be_padded:
            target_len = max_length
        else:
            target_len = int(seq_lens.max()) if seq_lens.size else 0

        if self.use_semantic:
            batch_output['semantic_vectors'] = self._collate_feature(
                encoded_inputs,
                'semantic_vectors',
                pad_value=0.0,
                feature_dim=self.semantic_dim,
                missing_shape=(batch_size, target_len, self.semantic_dim),
                needs_padding=needs_to_be_padded,
                max_length=max_length,
            )

        if self.use_deviation:
            batch_output['deviation_features'] = self._collate_feature(
                encoded_inputs,
                'deviation_features',
                pad_value=0.0,
                feature_dim=self.deviation_dim,
                missing_shape=(batch_size, target_len, self.deviation_dim),
                needs_padding=needs_to_be_padded,
                max_length=max_length,
            )

        # is_spontaneous is always emitted; -1 means "not applicable".
        batch_output['is_spontaneous'] = self._collate_feature(
            encoded_inputs,
            'is_spontaneous',
            pad_value=-1.0,
            feature_dim=1,  # scalar per event
            missing_shape=(batch_size, target_len),
            needs_padding=needs_to_be_padded,
            max_length=max_length,
        )

        return batch_output

    def _collate_feature(self, encoded_inputs, name, *, pad_value, feature_dim,
                         missing_shape, needs_padding, max_length):
        """Return the batched float32 array for one custom feature, or a placeholder filled with pad_value if it is absent."""
        if name not in encoded_inputs:
            return np.full(missing_shape, pad_value, dtype=np.float32)

        values = encoded_inputs[name]
        if not needs_padding:
            return np.array(values, dtype=np.float32)

        return self.make_pad_sequence_for_features(
            values,
            pad_value=pad_value,
            max_len=max_length,
            feature_dim=feature_dim,
            # Keep the features aligned with the standard fields, which are
            # padded on self.padding_side.
            padding_side=self.padding_side,
        )

    def make_pad_sequence_for_features(self, seqs, pad_value, max_len, feature_dim=None,
                                       dtype=np.float32, padding_side="right"):
        """
        Pad (or truncate) a batch of per-event feature sequences to max_len.

        Each sequence may be a list, tuple or np.ndarray holding either scalars
        or fixed-width vectors per event. Sequences longer than max_len keep
        their first max_len events.

        Args:
            seqs: batch of feature sequences.
            pad_value: value written into the padded positions.
            max_len: target length of every sequence.
            feature_dim: width of one feature vector. Only used to shape the
                padding when no sequence in the batch has any event; otherwise
                the shape of the data itself is used.
            dtype: dtype of the returned array.
            padding_side: "left" pads in front of the events; any other value
                pads behind them.

        Returns:
            np.ndarray: array of shape (batch, max_len, ...) with the given dtype.
        """
        # Per-event shape used for sequences that contain no event at all:
        # taken from the first non-empty sequence, else from feature_dim.
        fallback_item_shape = None
        for seq in seqs:
            if len(seq) > 0:
                fallback_item_shape = np.shape(seq[0])
                break
        if fallback_item_shape is None:
            fallback_item_shape = (feature_dim,) if feature_dim and feature_dim > 1 else ()

        pad_left = padding_side == "left"
        padded_seqs = []
        for seq in seqs:
            events = np.asarray(seq, dtype=dtype)
            if events.shape[0] == 0:
                # An empty sequence carries no per-event shape of its own.
                events = events.reshape((0,) + tuple(fallback_item_shape))

            pad_len = max_len - events.shape[0]
            if pad_len > 0:
                pad = np.full((pad_len,) + events.shape[1:], pad_value, dtype=dtype)
                pieces = [pad, events] if pad_left else [events, pad]
                padded_seqs.append(np.concatenate(pieces, axis=0))
            else:
                padded_seqs.append(events[:max_len])

        return np.array(padded_seqs, dtype=dtype)