# Provenance: uploaded by Abigail99216 via huggingface_hub (commit f43af3c, verified)
"""
评论罗伯特事件分词器 - 支持语义特征、偏差特征等
扩展EventTokenizer以支持多模态特征的padding和批处理
"""
from typing import Optional, Union, Dict, Any
import numpy as np
import torch
from easy_tpp.preprocess.event_tokenizer import EventTokenizer, BatchEncoding
from easy_tpp.utils import PaddingStrategy
class RobertEventTokenizer(EventTokenizer):
    """Event tokenizer for the comment/Robert event data with extra feature channels.

    Extends ``EventTokenizer`` with padding and batching support for:

    - ``semantic_vectors``: per-event semantic embeddings (padded with zeros)
    - ``deviation_features``: per-event deviation features (padded with zeros)
    - ``is_spontaneous``: spontaneous-vs-mentioned flag (padded with -1, meaning
      "not applicable")
    """

    def __init__(self, config, use_semantic=False, use_deviation=False, semantic_dim=768):
        """Initialize the tokenizer.

        Args:
            config: configuration object forwarded to ``EventTokenizer``.
            use_semantic: whether semantic vectors are part of each batch.
            use_deviation: whether deviation features are part of each batch.
            semantic_dim: dimensionality of one semantic vector.
        """
        super().__init__(config)
        self.use_semantic = use_semantic
        self.use_deviation = use_deviation
        self.semantic_dim = semantic_dim
        # Work on a copy before appending: if the parent exposes
        # ``model_input_names`` as a class-level list, in-place appends would
        # leak the custom names into every other instance and duplicate them
        # each time a tokenizer is constructed.
        self.model_input_names = list(self.model_input_names)
        # Standard order: time_seqs, time_delta_seqs, type_seqs,
        # batch_non_pad_mask, attention_mask; custom features go after them.
        if self.use_semantic:
            self.model_input_names.append('semantic_vectors')
        if self.use_deviation:
            self.model_input_names.append('deviation_features')
        # is_spontaneous is always carried through the batch.
        self.model_input_names.append('is_spontaneous')

    def _pad(
        self,
        encoded_inputs: Union[Dict[str, Any], BatchEncoding],
        max_length: Optional[int] = None,
        padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
        return_attention_mask: Optional[bool] = None,
    ) -> dict:
        """Pad the encoded inputs, including the custom feature channels.

        Args:
            encoded_inputs: mapping from model input name to a batch of sequences.
            max_length: target length (used for MAX_LENGTH padding).
            padding_strategy: how to pad the batch.
            return_attention_mask: whether to build the attention mask; defaults
                to whether ``attention_mask`` appears in ``model_input_names``.

        Returns:
            dict: batch with the five standard fields plus any enabled custom
            features.
        """
        # Standard fields are re-padded here rather than delegating to the
        # parent so that the custom features can share the same max_length.
        required_input = encoded_inputs[self.model_input_names[0]]
        if padding_strategy == PaddingStrategy.LONGEST:
            max_length = max(len(seq) for seq in required_input)
            padding_strategy = PaddingStrategy.MAX_LENGTH

        seq_lens = np.array([len(seq) for seq in required_input])
        is_all_seq_equal_max_length = np.all(seq_lens == max_length)
        # Plain boolean `not` instead of numpy bitwise `~` on an np.bool_.
        needs_to_be_padded = (
            padding_strategy != PaddingStrategy.DO_NOT_PAD
            and not is_all_seq_equal_max_length
        )

        batch_output = dict()
        # --- standard fields: time_seqs, time_delta_seqs, type_seqs ---
        if needs_to_be_padded:
            for idx in (0, 1):
                batch_output[self.model_input_names[idx]] = self.make_pad_sequence(
                    encoded_inputs[self.model_input_names[idx]],
                    self.pad_token_id,
                    padding_side=self.padding_side,
                    max_len=max_length,
                )
            batch_output[self.model_input_names[2]] = self.make_pad_sequence(
                encoded_inputs[self.model_input_names[2]],
                self.pad_token_id,
                padding_side=self.padding_side,
                max_len=max_length,
                dtype=np.int64,
            )
        else:
            # NOTE(review): with ragged sequences under DO_NOT_PAD these
            # np.array calls fail on modern numpy; assumed callers only skip
            # padding for equal-length batches — confirm against callers.
            for idx, dtype in ((0, np.float32), (1, np.float32), (2, np.int64)):
                batch_output[self.model_input_names[idx]] = np.array(
                    encoded_inputs[self.model_input_names[idx]], dtype=dtype
                )

        # --- batch_non_pad_mask: True on real events, False on padding ---
        seq_pad_mask = np.full_like(
            batch_output[self.model_input_names[2]], fill_value=True, dtype=bool
        )
        for i, seq_len in enumerate(seq_lens):
            seq_pad_mask[i, seq_len:] = False
        batch_output[self.model_input_names[3]] = seq_pad_mask

        # --- attention_mask ---
        if return_attention_mask is None:
            return_attention_mask = "attention_mask" in self.model_input_names
        if return_attention_mask:
            batch_output[self.model_input_names[4]] = self.make_attn_mask_for_pad_sequence(
                batch_output[self.model_input_names[2]],
                self.pad_token_id,
            )
        else:
            batch_output[self.model_input_names[4]] = []

        # --- custom features ---
        batch_size = len(required_input)
        # Length of the constant fallback arrays built when a feature is
        # enabled but absent from the inputs.
        target_len = max_length if needs_to_be_padded else int(seq_lens.max())

        if self.use_semantic:
            if 'semantic_vectors' in encoded_inputs:
                batch_output['semantic_vectors'] = self._pad_or_cast(
                    encoded_inputs['semantic_vectors'],
                    needs_to_be_padded, max_length,
                    pad_value=0.0, feature_dim=self.semantic_dim,
                )
            else:
                # Enabled but not provided: fall back to zero vectors.
                batch_output['semantic_vectors'] = np.zeros(
                    (batch_size, target_len, self.semantic_dim), dtype=np.float32
                )

        if self.use_deviation:
            if 'deviation_features' in encoded_inputs:
                batch_output['deviation_features'] = self._pad_or_cast(
                    encoded_inputs['deviation_features'],
                    needs_to_be_padded, max_length,
                    pad_value=0.0, feature_dim=3,
                )
            else:
                # Enabled but not provided: fall back to zero vectors.
                batch_output['deviation_features'] = np.zeros(
                    (batch_size, target_len, 3), dtype=np.float32
                )

        if 'is_spontaneous' in encoded_inputs:
            # -1 marks "not applicable" positions.
            batch_output['is_spontaneous'] = self._pad_or_cast(
                encoded_inputs['is_spontaneous'],
                needs_to_be_padded, max_length,
                pad_value=-1.0, feature_dim=1,
            )
        else:
            # Not provided at all: the whole channel is "not applicable".
            batch_output['is_spontaneous'] = np.full(
                (batch_size, target_len), -1.0, dtype=np.float32
            )
        return batch_output

    def _pad_or_cast(self, values, needs_padding, max_length, pad_value, feature_dim):
        """Pad ``values`` to ``max_length`` when the batch needs padding, else
        just cast them to a float32 array."""
        if needs_padding:
            return self.make_pad_sequence_for_features(
                values, pad_value=pad_value, max_len=max_length, feature_dim=feature_dim
            )
        return np.array(values, dtype=np.float32)

    def make_pad_sequence_for_features(self, seqs, pad_value, max_len, feature_dim=None, dtype=np.float32):
        """Pad (or truncate) each feature sequence in the batch to ``max_len``.

        Args:
            seqs: batch of per-event feature sequences; each item is a 1-D
                sequence of scalars or a 2-D sequence of feature vectors
                (nested list, tuple or ``np.ndarray``).
            pad_value: value written into the padded tail.
            max_len: target sequence length.
            feature_dim: width of one feature vector; takes precedence over
                ``len(seq[0])`` for nested lists and is the only way to decide
                the pad shape when a sequence is empty.
            dtype: dtype of the returned array.

        Returns:
            np.ndarray: stacked batch of shape ``(batch, max_len, ...)``.
        """
        padded_seqs = []
        for seq in seqs:
            seq_len = len(seq)
            if seq_len >= max_len:
                # Already long enough: truncate to the target length.
                padded_seqs.append(seq[:max_len])
                continue
            pad_len = max_len - seq_len
            if isinstance(seq, np.ndarray):
                # ndarray of any rank: the pad block mirrors the trailing
                # dims (empty tuple for a 1-D array).
                pad = np.full((pad_len,) + seq.shape[1:], pad_value, dtype=dtype)
                padded_seqs.append(np.concatenate([seq, pad], axis=0))
            else:
                # Plain Python sequence. An empty sequence has no first
                # element to inspect (the previous version raised IndexError
                # here), so fall back to feature_dim for the pad shape.
                if seq_len > 0 and isinstance(seq[0], (list, np.ndarray, tuple)):
                    width = feature_dim if feature_dim else len(seq[0])
                    pad = [[pad_value] * width] * pad_len
                elif seq_len == 0 and feature_dim and feature_dim > 1:
                    pad = [[pad_value] * feature_dim] * pad_len
                else:
                    pad = [pad_value] * pad_len
                # list(seq) also tolerates tuples, which ``seq + pad`` did not.
                padded_seqs.append(list(seq) + pad)
        return np.array(padded_seqs, dtype=dtype)