1NEYRON1 committed on
Commit bd9de4f · verified · 1 Parent(s): 5e8590b

Create configuration_mosnet.py

Files changed (1)
  1. configuration_mosnet.py +21 -312
configuration_mosnet.py CHANGED
@@ -1,315 +1,24 @@
- from typing import Any, List, Tuple

- from einops import rearrange
- import librosa
- import numpy as np

- from src.models.base_model import BaseModel, BaseMultimodalModel
-
- import torch
- import torch.nn.functional as f
- from torch import nn
-
- from transformers import BertModel, BertTokenizer
-
-
- class TimeDistributed(nn.Module):
-     def __init__(self, module: nn.Module, batch_first: bool) -> None:
-         super().__init__()
-         self.module = module
-         self.batch_first = batch_first
-
-     def forward(self, input_seq: torch.Tensor) -> torch.Tensor:
-         assert len(input_seq.size()) > 2
-         reshaped_input = input_seq.contiguous().view(-1, input_seq.size(-1))
-         output = self.module(reshaped_input)
-         if self.batch_first:
-             output = output.contiguous().view(input_seq.size(0), -1, output.size(-1))
-         else:
-             output = output.contiguous().view(-1, input_seq.size(1), output.size(-1))
-         return output
-
-
- class CnnBlstmMbnet2(nn.Module):
-     def __init__(self, dropout: float = 0.3) -> None:
-         super().__init__()
-         self.conv1 = nn.Sequential(
-             nn.Conv2d(1, 16, (3, 3), (1, 1), padding=1),
-             nn.ReLU(),
-             nn.Conv2d(16, 16, (3, 3), (1, 1), 1),
-             nn.ReLU(),
-             nn.Conv2d(16, 16, (3, 3), (1, 3), 1),
-             nn.ReLU(),
-             nn.BatchNorm2d(16),
-             nn.Dropout(dropout),
-         )
-         self.conv2 = nn.Sequential(
-             nn.Conv2d(16, 32, (3, 3), (1, 1), 1),
-             nn.ReLU(),
-             nn.Conv2d(32, 32, (3, 3), (1, 1), 1),
-             nn.ReLU(),
-             nn.Conv2d(32, 32, (3, 3), (1, 3), 1),
-             nn.ReLU(),
-             nn.BatchNorm2d(32),
-             nn.Dropout(dropout),
-         )
-         self.conv3 = nn.Sequential(
-             nn.Conv2d(32, 64, (3, 3), (1, 1), 1),
-             nn.ReLU(),
-             nn.Conv2d(64, 64, (3, 3), (1, 1), 1),
-             nn.ReLU(),
-             nn.Conv2d(64, 64, (3, 3), (1, 3), 1),
-             nn.ReLU(),
-             nn.BatchNorm2d(64),
-             nn.Dropout(dropout),
-         )
-         self.conv4 = nn.Sequential(
-             nn.Conv2d(64, 128, (3, 3), (1, 1), 1),
-             nn.ReLU(),
-             nn.Conv2d(128, 128, (3, 3), (1, 1), 1),
-             nn.ReLU(),
-             nn.Conv2d(128, 128, (3, 3), (1, 3), 1),
-             nn.ReLU(),
-             nn.BatchNorm2d(128),
-             nn.Dropout(dropout),
-         )
-         self.blstm1 = nn.LSTM(512, 128, bidirectional=True, batch_first=True)
-         self.dropout = nn.Dropout(dropout)
-         self.flatten = TimeDistributed(nn.Flatten(), batch_first=True)
-         self.dense1 = nn.Sequential(
-             TimeDistributed(
-                 nn.Sequential(
-                     nn.Linear(256, 128),
-                     nn.ReLU(),
-                 ),
-                 batch_first=True,
-             ),
-             nn.Dropout(dropout),
-         )
-         self.frame_layer = TimeDistributed(nn.Linear(128, 1), batch_first=True)
-         self.average_layer = nn.AdaptiveAvgPool1d(1)
-
-     def forward(self, forward_input: torch.Tensor, mask: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
-         conv1_output = self.conv1(forward_input)
-         conv2_output = self.conv2(conv1_output)
-         conv3_output = self.conv3(conv2_output)
-         conv4_output = self.conv4(conv3_output)
-         conv4_output = conv4_output.permute(0, 2, 1, 3)
-         conv4_output = torch.reshape(conv4_output, (conv4_output.shape[0], conv4_output.shape[1], 4 * 128))
-         blstm_output, _ = self.blstm1(conv4_output)
-         blstm_output = self.dropout(blstm_output)
-         flatten_output = self.flatten(blstm_output)
-         fc_output = self.dense1(flatten_output)
-         frame_score = self.frame_layer(fc_output)
-         frame_score = frame_score.squeeze(-1) * mask
-         valid_sum = torch.sum(frame_score, dim=1)
-         valid_count = torch.sum(mask, dim=1)
-         avg_score = valid_sum / (valid_count + 1e-8)
-         return avg_score.unsqueeze(-1), frame_score
-
-
- class SwiGLU(nn.Module):
-     def forward(self, x: torch.Tensor) -> torch.Tensor:
-         x_, gate = x.chunk(2, dim=-1)
-         return f.silu(gate) * x_
-
-
- class RotaryEmbedding(nn.Module):
-     def __init__(self, dim: int, scale_base: int = 512, use_xpos: bool = True) -> None:
-         super().__init__()
-         inv_freq = 1.0 / (10000 ** (torch.arange(0, dim, 2).float() / dim))
-         self.register_buffer("inv_freq", inv_freq)
-         self.use_xpos = use_xpos
-         self.scale_base = scale_base
-         scale = (torch.arange(0, dim, 2) + 0.4 * dim) / (1.4 * dim)
-         self.register_buffer('scale', scale)
-
-     def forward(self, seq_len: int, device: torch.device) -> Tuple[torch.Tensor, torch.Tensor]:
-         t = torch.arange(seq_len, device=device).type_as(self.inv_freq)
-         freqs = torch.einsum('i , j -> i j', t, self.inv_freq)
-         freqs = torch.cat((freqs, freqs), dim=-1)
-         if not self.use_xpos:
-             return freqs, torch.ones(1, device=device)
-         power = (t - (seq_len // 2)) / self.scale_base
-         scale = self.scale ** rearrange(power, 'n -> n 1')
-         scale = torch.cat((scale, scale), dim=-1)
-         return freqs, scale
-
-
- def rotate_half(x: torch.Tensor) -> torch.Tensor:
-     x1, x2 = x.chunk(2, dim=-1)
-     return torch.cat((-x2, x1), dim=-1)
-
-
- def apply_rotary_pos_emb(pos: torch.Tensor, t: torch.Tensor, scale: float = 1.) -> torch.Tensor:
-     return (t * pos.cos() * scale) + (rotate_half(t) * pos.sin() * scale)
-
-
- def l2norm(t: torch.Tensor) -> torch.Tensor:
-     return f.normalize(t, dim=-1)
-
-
- class TransformerBlock(nn.Module):
-     def __init__(self, dim_head: int = 64, heads: int = 8, dropout: float = 0.2, forward_expansion: int = 2, device: str = "cpu") -> None:
-         super().__init__()
-         self.heads = heads
-         self.dim_head = dim_head
-         self.embed_dim = heads * dim_head
-         self.device = device
-
-         self.qkv = nn.Linear(dim_head * heads, dim_head * heads * 3)
-         self.q_scale = nn.Parameter(torch.ones(dim_head))
-         self.k_scale = nn.Parameter(torch.ones(dim_head))
-         self.rotary_emb = RotaryEmbedding(dim_head)
-         self.norm = nn.LayerNorm(dim_head * heads)
-         self.feed_forward = nn.Sequential(
-             nn.Linear(dim_head * heads, forward_expansion * dim_head * heads * 2),  # *2 for SwiGLU
-             SwiGLU(),
-             nn.Dropout(dropout),
-             nn.Linear(forward_expansion * dim_head * heads, dim_head * heads),
-         )
-
-     def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor) -> torch.Tensor:
-         n, seq_length, _ = q.shape
-         qkv_proj = self.qkv(q)
-         qkv_proj = qkv_proj.reshape(n, seq_length, self.heads, 3 * self.dim_head)
-         qkv = qkv_proj.permute(0, 2, 1, 3)
-         q_, k_, v_ = qkv.chunk(3, dim=-1)
-         q_, k_ = map(l2norm, (q_, k_))
-         q_ = q_ * self.q_scale
-         k_ = k_ * self.k_scale
-         positions, scale = self.rotary_emb(seq_length, self.device)
-         q_ = apply_rotary_pos_emb(positions, q_, scale)
-         k_ = apply_rotary_pos_emb(positions, k_, scale ** -1)
-         attn_output = f.scaled_dot_product_attention(q_, k_, v_)
-         attn_output = attn_output.permute(0, 2, 1, 3).reshape(n, seq_length, self.embed_dim)
-         attn_output = self.norm(attn_output)
-         forward_output = self.feed_forward(attn_output)
-         return attn_output + forward_output
-
-
- class AudioFeatureExtractor(nn.Module):
-     def __init__(self) -> None:
-         super().__init__()
-         self.conv1 = nn.Sequential(
-             nn.Conv2d(1, 16, (3, 3), (1, 1), padding=1), nn.ReLU(),
-             nn.Conv2d(16, 16, (3, 3), (1, 1), padding=1), nn.ReLU(),
-             nn.Conv2d(16, 16, (3, 3), (1, 3), padding=1), nn.ReLU()
-         )
-         self.conv2 = nn.Sequential(
-             nn.Conv2d(16, 32, (3, 3), (1, 1), padding=1), nn.ReLU(),
-             nn.Conv2d(32, 32, (3, 3), (1, 1), padding=1), nn.ReLU(),
-             nn.Conv2d(32, 32, (3, 3), (1, 3), padding=1), nn.ReLU()
-         )
-         self.conv3 = nn.Sequential(
-             nn.Conv2d(32, 64, (3, 3), (1, 1), padding=1), nn.ReLU(),
-             nn.Conv2d(64, 64, (3, 3), (1, 1), padding=1), nn.ReLU(),
-             nn.Conv2d(64, 64, (3, 3), (1, 3), padding=1), nn.ReLU()
-         )
-         self.conv4 = nn.Sequential(
-             nn.Conv2d(64, 128, (3, 3), (1, 1), padding=1), nn.ReLU(),
-             nn.Conv2d(128, 128, (3, 3), (1, 1), padding=1), nn.ReLU(),
-             nn.Conv2d(128, 128, (3, 3), (1, 3), padding=1), nn.ReLU()
-         )
-
-     def forward(self, x: torch.Tensor) -> torch.Tensor:
-         x = self.conv1(x)
-         x = self.conv2(x)
-         x = self.conv3(x)
-         x = self.conv4(x)
-         x = x.permute(0, 2, 1, 3)
-         x = torch.reshape(x, (x.shape[0], x.shape[1], -1))
-         return x
-
-
- class MultiModalMosNet():
-     config_class = MosNetConfig
-
-     def __init__(self, config: MosNetConfig) -> None:
-         super().__init__(config)
-         self.config = config
-
-         self.sample_rate = self.config.sample_rate
-         self.fft_size = self.config.fft_size
-         self.hop_length = self.config.hop_length
-         self.win_length = self.config.win_length
-         self.dropout = self.config.dropout
-
-         self.audio_extractor = AudioFeatureExtractor()
-
-         self.text_projection = nn.Linear(768, self.win_length)
-
-         self.cross_attention = TransformerBlock(dim_head=64, heads=8)
-
-         self.fc1 = nn.Sequential(
-             nn.Linear(self.fft_size, 128),
-             nn.ReLU(),
-             nn.Dropout(self.dropout),
-         )
-         self.frame_layer = nn.Linear(128, 1)
-         self.average_layer = nn.AdaptiveAvgPool1d(1)
-
-     def forward(
          self,
-         audio_input: torch.Tensor,
-         text_embeddings: torch.Tensor,
-     ) -> Tuple[torch.Tensor, torch.Tensor]:
-         """audio_input shape: (B, 1, T, F)
-         text_embeddings shape: (B, 768)
-         """
-         # Audio branch
-         audio_features = self.audio_extractor(audio_input)  # (B, T, 512)
-
-         # Text branch
-         text_proj = self.text_projection(text_embeddings)  # (B, 512)
-         text_proj = text_proj.unsqueeze(1)  # (B, 1, 512)
-
-         # Cross-attention
-         cross_out = self.cross_attention(audio_features, text_proj, text_proj)  # (B, T, 512)
-
-         # Head
-         fc_out = self.fc1(cross_out)  # (B, T, 128)
-         frame_score = self.frame_layer(fc_out)  # (B, T, 1)
-
-         # Aggregate
-         avg_score = self.average_layer(frame_score.permute(0, 2, 1))  # (B, 1, 1)
-         return avg_score.reshape(avg_score.size(0), -1)
-
-     def preprocess_audio(self, audios: List[np.ndarray]) -> torch.Tensor:
-         tensors = []
-         for audio in audios:
-             y = torch.tensor(audio, dtype=torch.float32, device=self.device)
-             spec = torch.stft(y, n_fft=512, hop_length=256, win_length=512, return_complex=False)
-             mag = torch.sqrt(spec[..., 0] ** 2 + spec[..., 1] ** 2)
-             mag = mag.permute(1, 0).unsqueeze(0)
-             tensors.append(mag)
-         max_len = max(t.shape[1] for t in tensors)
-         padded = torch.zeros(len(tensors), 1, max_len, tensors[0].shape[2], device=self.device)
-         for i, t in enumerate(tensors):
-             padded[i, :, :t.shape[1], :] = t
-         return padded
-
-     def preprocess_text(self, texts: List[str]) -> dict:
-         tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
-         with torch.no_grad():
-             inputs = tokenizer(texts, return_tensors="pt", padding=True, truncation=True, max_length=512)
-             inputs = {k: v.to(self.device) for k, v in inputs.items()}
-         return inputs
-
-     def predict(self, audios: List[np.ndarray], texts: List[str] = None) -> List[float]:
-         with torch.no_grad():
-             audios_tensor = self.preprocess_audio(audios)
-             inputs = self.preprocess_text(texts)
-             model = BertModel.from_pretrained("bert-base-uncased").to(self.device)
-             model.eval()
-             outputs = model(**inputs)
-             texts_tensor = outputs.last_hidden_state[:, 0, :]
-             preds = self.forward(audios_tensor, texts_tensor)
-         result = preds.squeeze().cpu().tolist()
-         if isinstance(result, float):
-             return [result]
-         return [float(x) for x in result]
-
-
- AutoConfig.register("mosnet", MosNetConfig)
- AutoModel.register(MosNetConfig, MultiModalMosNet)

+ from transformers import PretrainedConfig

+ class MosNetConfig(PretrainedConfig):
+     """
+     Configuration class for the MosNet model.
+     It stores the parameters that define the model architecture.
+     """
+     model_type = "mosnet"

+     def __init__(
          self,
+         sample_rate: int = 16000,
+         fft_size: int = 512,
+         hop_length: int = 256,
+         win_length: int = 512,
+         dropout: float = 0.3,
+         **kwargs,
+     ):
+         super().__init__(**kwargs)
+         self.sample_rate = sample_rate
+         self.fft_size = fft_size
+         self.hop_length = hop_length
+         self.win_length = win_length
+         self.dropout = dropout
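
For reference, a minimal usage sketch of the new configuration class. This is an illustration only: the checkpoint directory name is hypothetical, and the import assumes this file is available locally as configuration_mosnet.py.

from configuration_mosnet import MosNetConfig

# Instantiate with the defaults defined above, overriding one field.
config = MosNetConfig(dropout=0.2)
assert config.model_type == "mosnet"
assert config.fft_size == 512

# PretrainedConfig provides JSON round-tripping out of the box.
config.save_pretrained("./mosnet-checkpoint")  # writes config.json (hypothetical path)
reloaded = MosNetConfig.from_pretrained("./mosnet-checkpoint")
assert reloaded.dropout == 0.2

The removed file registered the pair via AutoConfig.register("mosnet", MosNetConfig) and AutoModel.register(MosNetConfig, MultiModalMosNet); once an accompanying model file restores that registration, the same config also becomes loadable through the Auto classes.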