File size: 10,653 Bytes
f43af3c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
"""
评论罗伯特事件分词器 - 支持语义特征、偏差特征等

扩展EventTokenizer以支持多模态特征的padding和批处理
"""

from typing import Optional, Union, Dict, Any
import numpy as np
import torch
from easy_tpp.preprocess.event_tokenizer import EventTokenizer, BatchEncoding
from easy_tpp.utils import PaddingStrategy


class RobertEventTokenizer(EventTokenizer):
    """
    支持语义特征、偏差特征等的事件分词器
    
    扩展EventTokenizer以支持:
    - semantic_vectors: 语义向量padding
    - deviation_features: 偏差特征padding
    - is_spontaneous: 自发/被@标记padding
    """
    
    def __init__(self, config, use_semantic=False, use_deviation=False, semantic_dim=768):
        """
        初始化分词器
        
        Args:
            config: 配置对象
            use_semantic: 是否使用语义特征
            use_deviation: 是否使用偏差特征
            semantic_dim: 语义向量维度
        """
        super(RobertEventTokenizer, self).__init__(config)
        
        self.use_semantic = use_semantic
        self.use_deviation = use_deviation
        self.semantic_dim = semantic_dim
        
        # 添加自定义特征到model_input_names
        # 标准顺序:time_seqs, time_delta_seqs, type_seqs, batch_non_pad_mask, attention_mask
        # 自定义特征添加在后面
        if self.use_semantic:
            self.model_input_names.append('semantic_vectors')
        if self.use_deviation:
            self.model_input_names.append('deviation_features')
        
        # is_spontaneous总是添加(如果使用)
        self.model_input_names.append('is_spontaneous')
    
    def _pad(
        self,
        encoded_inputs: Union[Dict[str, Any], BatchEncoding],
        max_length: Optional[int] = None,
        padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
        return_attention_mask: Optional[bool] = None,
    ) -> dict:
        """
        填充编码输入(包括自定义特征)
        
        Args:
            encoded_inputs: 编码后的输入
            max_length: 最大长度
            padding_strategy: 填充策略
            return_attention_mask: 是否返回注意力掩码
        
        Returns:
            dict: 填充后的批次数据
        """
        # 先处理标准字段(调用父类方法)
        # 但我们需要重写以添加自定义特征的处理
        required_input = encoded_inputs[self.model_input_names[0]]
        
        if padding_strategy == PaddingStrategy.LONGEST:
            max_length = max(len(seq) for seq in required_input)
            padding_strategy = PaddingStrategy.MAX_LENGTH
        
        # 获取序列长度
        seq_lens = np.array([len(seq) for seq in required_input])
        is_all_seq_equal_max_length = np.all(seq_lens == max_length)
        needs_to_be_padded = padding_strategy != PaddingStrategy.DO_NOT_PAD and ~is_all_seq_equal_max_length
        
        batch_output = dict()
        
        # 处理标准字段(time_seqs, time_delta_seqs, type_seqs)
        if needs_to_be_padded:
            batch_output[self.model_input_names[0]] = self.make_pad_sequence(
                encoded_inputs[self.model_input_names[0]],
                self.pad_token_id,
                padding_side=self.padding_side,
                max_len=max_length
            )
            batch_output[self.model_input_names[1]] = self.make_pad_sequence(
                encoded_inputs[self.model_input_names[1]],
                self.pad_token_id,
                padding_side=self.padding_side,
                max_len=max_length
            )
            batch_output[self.model_input_names[2]] = self.make_pad_sequence(
                encoded_inputs[self.model_input_names[2]],
                self.pad_token_id,
                padding_side=self.padding_side,
                max_len=max_length,
                dtype=np.int64
            )
        else:
            batch_output[self.model_input_names[0]] = np.array(
                encoded_inputs[self.model_input_names[0]], dtype=np.float32
            )
            batch_output[self.model_input_names[1]] = np.array(
                encoded_inputs[self.model_input_names[1]], dtype=np.float32
            )
            batch_output[self.model_input_names[2]] = np.array(
                encoded_inputs[self.model_input_names[2]], dtype=np.int64
            )
        
        # non_pad_mask
        seq_pad_mask = np.full_like(batch_output[self.model_input_names[2]], fill_value=True, dtype=bool)
        for i, seq_len in enumerate(seq_lens):
            seq_pad_mask[i, seq_len:] = False
        batch_output[self.model_input_names[3]] = seq_pad_mask
        
        # attention_mask
        if return_attention_mask is None:
            return_attention_mask = "attention_mask" in self.model_input_names
        
        if return_attention_mask:
            batch_output[self.model_input_names[4]] = self.make_attn_mask_for_pad_sequence(
                batch_output[self.model_input_names[2]],
                self.pad_token_id
            )
        else:
            batch_output[self.model_input_names[4]] = []
        
        # 处理自定义特征
        # 处理语义向量
        if self.use_semantic and 'semantic_vectors' in encoded_inputs:
            semantic_vectors = encoded_inputs['semantic_vectors']
            if needs_to_be_padded:
                batch_output['semantic_vectors'] = self.make_pad_sequence_for_features(
                    semantic_vectors,
                    pad_value=0.0,
                    max_len=max_length,
                    feature_dim=self.semantic_dim
                )
            else:
                batch_output['semantic_vectors'] = np.array(semantic_vectors, dtype=np.float32)
        elif self.use_semantic:
            # 如果没有提供但需要,创建零向量
            batch_size = len(required_input)
            if needs_to_be_padded:
                batch_output['semantic_vectors'] = np.zeros(
                    (batch_size, max_length, self.semantic_dim), dtype=np.float32
                )
            else:
                # 使用最大长度
                max_seq_len = int(seq_lens.max())
                batch_output['semantic_vectors'] = np.zeros(
                    (batch_size, max_seq_len, self.semantic_dim), dtype=np.float32
                )
        
        # 处理偏差特征
        if self.use_deviation and 'deviation_features' in encoded_inputs:
            deviation_features = encoded_inputs['deviation_features']
            if needs_to_be_padded:
                batch_output['deviation_features'] = self.make_pad_sequence_for_features(
                    deviation_features,
                    pad_value=0.0,
                    max_len=max_length,
                    feature_dim=3
                )
            else:
                batch_output['deviation_features'] = np.array(deviation_features, dtype=np.float32)
        elif self.use_deviation:
            # 如果没有提供但需要,创建零向量
            batch_size = len(required_input)
            if needs_to_be_padded:
                batch_output['deviation_features'] = np.zeros(
                    (batch_size, max_length, 3), dtype=np.float32
                )
            else:
                max_seq_len = int(seq_lens.max())
                batch_output['deviation_features'] = np.zeros(
                    (batch_size, max_seq_len, 3), dtype=np.float32
                )
        
        # 处理is_spontaneous
        if 'is_spontaneous' in encoded_inputs:
            is_spontaneous = encoded_inputs['is_spontaneous']
            if needs_to_be_padded:
                batch_output['is_spontaneous'] = self.make_pad_sequence_for_features(
                    is_spontaneous,
                    pad_value=-1.0,  # -1表示不适用
                    max_len=max_length,
                    feature_dim=1  # 标量
                )
            else:
                batch_output['is_spontaneous'] = np.array(is_spontaneous, dtype=np.float32)
        else:
            # 如果没有提供,创建-1向量(不适用)
            batch_size = len(required_input)
            if needs_to_be_padded:
                batch_output['is_spontaneous'] = np.full(
                    (batch_size, max_length), -1.0, dtype=np.float32
                )
            else:
                max_seq_len = int(seq_lens.max())
                batch_output['is_spontaneous'] = np.full(
                    (batch_size, max_seq_len), -1.0, dtype=np.float32
                )
        
        return batch_output
    
    def make_pad_sequence_for_features(self, seqs, pad_value, max_len, feature_dim=None, dtype=np.float32):
        """
        为特征序列创建padding(辅助方法)
        
        Args:
            seqs: 序列列表
            pad_value: padding值
            max_len: 最大长度
            feature_dim: 特征维度(如果是多维特征)
            dtype: 数据类型
        
        Returns:
            np.ndarray: 填充后的数组
        """
        padded_seqs = []
        for seq in seqs:
            seq_len = len(seq)
            if seq_len < max_len:
                pad_len = max_len - seq_len
                if isinstance(seq, np.ndarray):
                    if seq.ndim == 1:
                        # 一维数组
                        pad = np.full(pad_len, pad_value, dtype=dtype)
                        padded_seq = np.concatenate([seq, pad], axis=0)
                    else:
                        # 多维数组
                        pad_shape = (pad_len,) + seq.shape[1:]
                        pad = np.full(pad_shape, pad_value, dtype=dtype)
                        padded_seq = np.concatenate([seq, pad], axis=0)
                else:
                    # 列表
                    if isinstance(seq[0], (list, np.ndarray, tuple)):
                        # 嵌套列表(多维)
                        if feature_dim:
                            pad = [[pad_value] * feature_dim] * pad_len
                        else:
                            pad = [[pad_value] * len(seq[0])] * pad_len
                        padded_seq = seq + pad
                    else:
                        # 一维列表
                        pad = [pad_value] * pad_len
                        padded_seq = seq + pad
                padded_seqs.append(padded_seq)
            else:
                padded_seqs.append(seq[:max_len])
        
        return np.array(padded_seqs, dtype=dtype)