File size: 21,859 Bytes
9361148
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2f01cc6
9361148
 
 
 
 
 
 
 
 
 
 
 
2f01cc6
 
 
 
 
 
 
 
 
9361148
86371bb
 
 
 
 
2f01cc6
86371bb
9361148
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86371bb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2f01cc6
86371bb
2f01cc6
 
 
86371bb
2f01cc6
 
 
86371bb
2f01cc6
 
 
 
 
 
 
86371bb
2f01cc6
86371bb
9361148
 
 
 
 
 
 
 
2f01cc6
 
9361148
 
 
 
2f01cc6
 
 
 
9361148
 
 
 
 
 
 
 
86371bb
2f01cc6
86371bb
9361148
 
86371bb
 
9361148
 
 
 
86371bb
 
9361148
 
 
 
 
2f01cc6
9361148
 
 
 
 
2f01cc6
 
9361148
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86371bb
 
9361148
38e8ba6
 
 
 
 
86371bb
 
 
38e8ba6
9361148
 
 
 
 
 
 
 
 
 
 
 
38e8ba6
 
 
86371bb
38e8ba6
 
 
86371bb
38e8ba6
9361148
38e8ba6
 
 
 
 
 
 
 
 
86371bb
 
38e8ba6
 
 
 
 
86371bb
 
 
38e8ba6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86371bb
38e8ba6
 
 
 
 
 
9361148
38e8ba6
9361148
 
38e8ba6
 
 
9361148
38e8ba6
 
 
 
 
 
 
 
 
 
 
 
 
9361148
 
38e8ba6
 
9361148
86371bb
 
 
 
 
38e8ba6
 
9361148
38e8ba6
 
 
9361148
86371bb
 
38e8ba6
9361148
38e8ba6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86371bb
 
38e8ba6
 
 
 
 
86371bb
 
 
38e8ba6
 
 
 
 
 
 
 
 
9361148
38e8ba6
 
 
 
 
 
86371bb
38e8ba6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86371bb
 
 
 
 
38e8ba6
 
 
 
 
 
 
86371bb
 
38e8ba6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9361148
 
 
 
 
 
 
2f01cc6
 
9361148
 
 
 
2f01cc6
9361148
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2f01cc6
 
 
9361148
2f01cc6
 
 
 
 
9361148
 
 
 
 
 
 
2f01cc6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9361148
 
 
 
 
 
2f01cc6
9361148
 
2f01cc6
 
 
9361148
2f01cc6
9361148
2f01cc6
9361148
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
# -*- coding: utf-8 -*-
"""
简单单字导出插件

从TextGrid提取分词片段,按质量评分排序导出
"""

import glob
import json
import logging
import os
import shutil
from typing import Any, Dict, List, Optional, Tuple

from .base import ExportPlugin, PluginOption, OptionType

logger = logging.getLogger(__name__)


class SimpleExportPlugin(ExportPlugin):
    """Simple single-character export plugin.

    Extracts per-character (or per-syllable) clips from aligned
    TextGrids and exports the best-scoring samples for each pinyin.
    """
    
    # Plugin metadata shown to users — keep these strings as-is.
    name = "简单单字导出"
    description = "从TextGrid提取分词片段,按时长排序导出"
    version = "1.1.0"
    author = "内置"
    
    def get_options(self) -> List[PluginOption]:
        """Declare the configurable options exposed by this plugin."""
        opts: List[PluginOption] = []
        # Cap on how many clips are kept per pinyin (best-scored first).
        opts.append(PluginOption(
            key="max_samples",
            label="每个拼音最大样本数",
            option_type=OptionType.NUMBER,
            default=10,
            min_value=1,
            max_value=1000,
            description="按质量评分排序,保留最佳的N个"
        ))
        # Which quality dimensions participate in the ranking.
        opts.append(PluginOption(
            key="quality_metrics",
            label="质量评估维度",
            option_type=OptionType.COMBO,
            default="duration",
            choices=["duration", "duration+rms", "duration+f0", "all"],
            description="duration=仅时长, +rms=音量稳定性, +f0=音高稳定性。选择 all 可能耗时较长"
        ))
        # Per-side padding added around each clip when cutting.
        opts.append(PluginOption(
            key="extend_duration",
            label="头尾拓展(秒)",
            option_type=OptionType.TEXT,
            default="0",
            description="裁剪时头尾各拓展的时长,最大0.5秒。若一边到达边界,另一边继续拓展"
        ))
        # Filename template for exported clips.
        opts.append(PluginOption(
            key="naming_rule",
            label="命名规则",
            option_type=OptionType.TEXT,
            default="%p%%n%",
            description="变量: %p%=拼音, %n%=序号。示例: %p%_%n% → ba_1.wav"
        ))
        # Optional dedicated template for the best (first) clip.
        opts.append(PluginOption(
            key="first_naming_rule",
            label="首个样本命名规则",
            option_type=OptionType.TEXT,
            default="%p%",
            description="第0个样本的特殊规则,留空则使用通用规则。示例: %p% → ba.wav"
        ))
        # Whether to delete the intermediate segments directory afterwards.
        opts.append(PluginOption(
            key="clean_temp",
            label="导出后清理临时文件",
            option_type=OptionType.SWITCH,
            default=True,
            description="删除临时的segments目录"
        ))
        return opts
    
    def _apply_extend(
        self,
        start_time: float,
        end_time: float,
        extend_duration: float,
        audio_duration: float
    ) -> Tuple[float, float]:
        """
        应用头尾拓展
        
        头尾各拓展 extend_duration 秒,若一边到达边界则另一边继续拓展
        """
        if extend_duration <= 0:
            return start_time, end_time
        
        total_extend = extend_duration * 2
        
        # 先尝试两边各拓展
        new_start = max(0, start_time - extend_duration)
        new_end = min(audio_duration, end_time + extend_duration)
        
        # 计算实际拓展量,剩余量补偿到另一边
        used = (start_time - new_start) + (new_end - end_time)
        remaining = total_extend - used
        
        if remaining > 0:
            # 优先补偿到尾部,再补偿到头部
            extra_end = min(remaining, audio_duration - new_end)
            new_end += extra_end
            remaining -= extra_end
            if remaining > 0:
                new_start = max(0, new_start - remaining)
        
        return new_start, new_end
    
    def export(
        self,
        source_name: str,
        bank_dir: str,
        options: Dict[str, Any]
    ) -> Tuple[bool, str]:
        """Run the simple per-character export pipeline.

        Steps: extract per-character segments from the TextGrids, rank
        them by quality, copy the best ones out, then optionally clean
        up the scratch directory.
        """
        try:
            # Resolve configuration; language comes from the bank metadata.
            language = self.load_language_from_meta(bank_dir, source_name)
            opt_max = int(options.get("max_samples", 10))
            opt_rule = options.get("naming_rule", "%p%_%n%")
            opt_first_rule = options.get("first_naming_rule", "")
            opt_clean = options.get("clean_temp", True)
            opt_metrics = options.get("quality_metrics", "duration")

            metric_list = self.parse_quality_metrics(opt_metrics)

            paths = self.get_source_paths(bank_dir, source_name)
            export_dir = self.get_export_dir(bank_dir, source_name, "simple_export")

            # Scratch area holding the intermediate per-pinyin segments.
            temp_base = os.path.join(bank_dir, ".temp_segments")
            segments_dir = os.path.join(temp_base, source_name)

            # Edge-extension is clamped to at most 0.5s per side.
            extend_duration = min(float(options.get("extend_duration", 0)), 0.5)

            # Step 1: cut the per-character segments.
            self._log("【提取分词片段】")
            if extend_duration > 0:
                self._log(f"头尾拓展: {extend_duration}s(单边到达边界时另一边继续拓展)")
            ok, message, _counts = self._extract_segments(
                paths["slices_dir"],
                paths["textgrid_dir"],
                segments_dir,
                language,
                extend_duration
            )
            if not ok:
                return False, message

            # Step 2: rank by quality and copy the winners out.
            self._log(f"\n【排序导出】评估维度: {metric_list}")
            ok, message = self._sort_and_export(
                segments_dir,
                export_dir,
                opt_max,
                opt_rule,
                opt_first_rule,
                metric_list
            )
            if not ok:
                return False, message

            # Remove the scratch directory (and its parent once empty).
            if opt_clean and os.path.exists(segments_dir):
                self._log(f"\n清理临时目录: {segments_dir}")
                shutil.rmtree(segments_dir)
                if os.path.exists(temp_base) and not os.listdir(temp_base):
                    shutil.rmtree(temp_base)

            return True, f"导出完成: {export_dir}"

        except Exception as e:
            logger.error(f"简单单字导出失败: {e}", exc_info=True)
            return False, str(e)
    
    def _extract_segments(
        self,
        slices_dir: str,
        textgrid_dir: str,
        segments_dir: str,
        language: str,
        extend_duration: float = 0.0
    ) -> Tuple[bool, str, Dict[str, int]]:
        """Dispatch segment extraction by language.

        Chinese: cut on the words tier, one clip per character, named
        via char_to_pinyin. Japanese: cut on the phones tier with
        consonant+vowel pairs merged into syllables.

        Args:
            extend_duration: per-side edge extension (seconds); when one
                side hits the clip boundary the other keeps extending.
        """
        try:
            # Imported up-front so missing optional deps fail fast.
            import textgrid
            import soundfile as sf

            os.makedirs(segments_dir, exist_ok=True)

            tg_files = glob.glob(os.path.join(textgrid_dir, '*.TextGrid'))
            if not tg_files:
                return False, "未找到TextGrid文件", {}

            self._log(f"处理 {len(tg_files)} 个TextGrid文件")

            # Route to the language-specific extractor.
            if language in ("japanese", "ja", "jp"):
                return self._extract_japanese_segments(
                    tg_files, slices_dir, segments_dir, extend_duration
                )
            return self._extract_chinese_segments(
                tg_files, slices_dir, segments_dir, language, extend_duration
            )

        except Exception as e:
            logger.error(f"提取分词失败: {e}", exc_info=True)
            return False, str(e), {}
    
    def _extract_chinese_segments(
        self,
        tg_files: List[str],
        slices_dir: str,
        segments_dir: str,
        language: str,
        extend_duration: float = 0.0
    ) -> Tuple[bool, str, Dict[str, int]]:
        """
        Extract per-character clips from Chinese audio.

        Uses the time boundaries of the words tier, splits each word
        evenly per character, and names output folders via char_to_pinyin.

        Args:
            tg_files: TextGrid file paths to process.
            slices_dir: directory containing the matching wav slices.
            segments_dir: output root; one subfolder per pinyin.
            language: language code forwarded to the text processor.
            extend_duration: per-side edge extension (seconds); when one
                side hits the clip boundary the other keeps extending.

        Returns:
            (success, message, mapping of pinyin -> clip count)
        """
        import textgrid
        import soundfile as sf
        from src.text_processor import char_to_pinyin, is_valid_char
        
        pinyin_counts: Dict[str, int] = {}
        
        for tg_path in tg_files:
            # A TextGrid pairs with the wav slice of the same basename.
            basename = os.path.basename(tg_path).replace('.TextGrid', '.wav')
            wav_path = os.path.join(slices_dir, basename)
            
            if not os.path.exists(wav_path):
                self._log(f"警告: 找不到 {basename}")
                continue
            
            tg = textgrid.TextGrid.fromFile(tg_path)
            audio, sr = sf.read(wav_path, dtype='float32')
            audio_duration = len(audio) / sr
            
            # The words tier is assumed to be the first tier.
            words_tier = tg[0]
            
            for interval in words_tier:
                word_text = interval.mark.strip()
                
                # Skip silence / noise / unknown markers.
                if not word_text or word_text in ['', 'SP', 'AP', '<unk>', 'spn', 'sil']:
                    continue
                
                start_time = interval.minTime
                end_time = interval.maxTime
                duration = end_time - start_time
                
                # Keep only characters the text processor recognizes.
                chars = list(word_text)
                valid_chars = [c for c in chars if is_valid_char(c, language)]
                
                if not valid_chars:
                    continue
                
                # Split the word's duration evenly across its characters.
                char_duration = duration / len(valid_chars)
                
                for i, char in enumerate(valid_chars):
                    pinyin = char_to_pinyin(char, language)
                    if not pinyin:
                        continue
                    
                    char_start = start_time + i * char_duration
                    char_end = char_start + char_duration
                    
                    # Apply edge extension; if one side hits a boundary the
                    # other side keeps extending (see _apply_extend).
                    actual_start, actual_end = self._apply_extend(
                        char_start, char_end, extend_duration, audio_duration
                    )
                    
                    pinyin_dir = os.path.join(segments_dir, pinyin)
                    os.makedirs(pinyin_dir, exist_ok=True)
                    
                    # Reserve a 1-based index for this clip.
                    current_count = pinyin_counts.get(pinyin, 0)
                    index = current_count + 1
                    pinyin_counts[pinyin] = index
                    
                    start_sample = int(round(actual_start * sr))
                    end_sample = int(round(actual_end * sr))
                    segment = audio[start_sample:end_sample]
                    
                    # Empty slice: roll the counter back and skip it.
                    if len(segment) == 0:
                        pinyin_counts[pinyin] = current_count
                        continue
                    
                    output_path = os.path.join(pinyin_dir, f'{index}.wav')
                    sf.write(output_path, segment, sr, subtype='PCM_16')
        
        total = sum(pinyin_counts.values())
        self._log(f"提取完成: {len(pinyin_counts)} 个拼音,共 {total} 个片段")
        
        return True, f"提取完成: {len(pinyin_counts)} 个拼音", pinyin_counts
    
    def _extract_japanese_segments(
        self,
        tg_files: List[str],
        slices_dir: str,
        segments_dir: str,
        extend_duration: float = 0.0
    ) -> Tuple[bool, str, Dict[str, int]]:
        """
        Extract syllable clips from Japanese audio.

        Uses the phones tier and merges consonant+vowel phone pairs into
        syllables before cutting.

        Args:
            tg_files: TextGrid file paths to process.
            slices_dir: directory containing the matching wav slices.
            segments_dir: output root; one subfolder per syllable.
            extend_duration: per-side edge extension (seconds); when one
                side hits the clip boundary the other keeps extending.

        Returns:
            (success, message, mapping of syllable -> clip count)
        """
        import textgrid
        import soundfile as sf
        
        phone_counts: Dict[str, int] = {}
        
        for tg_path in tg_files:
            # A TextGrid pairs with the wav slice of the same basename.
            basename = os.path.basename(tg_path).replace('.TextGrid', '.wav')
            wav_path = os.path.join(slices_dir, basename)
            
            if not os.path.exists(wav_path):
                self._log(f"警告: 找不到 {basename}")
                continue
            
            tg = textgrid.TextGrid.fromFile(tg_path)
            audio, sr = sf.read(wav_path, dtype='float32')
            audio_duration = len(audio) / sr
            
            # Locate the phones tier by name ...
            phones_tier = None
            for tier in tg:
                if tier.name.lower() in ('phones', 'phone'):
                    phones_tier = tier
                    break
            
            # ... falling back to the second tier when not named.
            if phones_tier is None and len(tg) >= 2:
                phones_tier = tg[1]
            
            if phones_tier is None:
                self._log(f"警告: {basename} 未找到phones层,跳过")
                continue
            
            # Merge consonant+vowel phone pairs into syllables.
            syllables = self._merge_japanese_phones(phones_tier)
            
            for syllable, start_time, end_time in syllables:
                if not syllable:
                    continue
                
                # Map IPA symbols to an ASCII (romaji) folder name.
                normalized = self._normalize_japanese_phone(syllable)
                if not normalized:
                    continue
                
                # Apply edge extension; if one side hits a boundary the
                # other side keeps extending (see _apply_extend).
                actual_start, actual_end = self._apply_extend(
                    start_time, end_time, extend_duration, audio_duration
                )
                
                phone_dir = os.path.join(segments_dir, normalized)
                os.makedirs(phone_dir, exist_ok=True)
                
                # Reserve a 1-based index for this clip.
                current_count = phone_counts.get(normalized, 0)
                index = current_count + 1
                phone_counts[normalized] = index
                
                start_sample = int(round(actual_start * sr))
                end_sample = int(round(actual_end * sr))
                segment = audio[start_sample:end_sample]
                
                # Empty slice: roll the counter back and skip it.
                if len(segment) == 0:
                    phone_counts[normalized] = current_count
                    continue
                
                output_path = os.path.join(phone_dir, f'{index}.wav')
                sf.write(output_path, segment, sr, subtype='PCM_16')
        
        total = sum(phone_counts.values())
        self._log(f"提取完成: {len(phone_counts)} 个音节,共 {total} 个片段")
        
        return True, f"提取完成: {len(phone_counts)} 个音节", phone_counts
    
    def _merge_japanese_phones(self, phones_tier) -> List[Tuple[str, float, float]]:
        """
        日语音素合并
        
        规则:辅音 + 元音 合并为一个音节
        """
        # 元音集合
        vowels = {'a', 'e', 'i', 'o', 'u', 'ɯ'}
        skip_marks = {'', 'SP', 'AP', '<unk>', 'spn', 'sil'}
        
        syllables = []
        pending_consonant = None
        pending_start = None
        
        for interval in phones_tier:
            phone = interval.mark.strip()
            
            if phone in skip_marks:
                pending_consonant = None
                pending_start = None
                continue
            
            # 移除长音符号判断元音
            base_phone = phone.rstrip('ː')
            is_vowel = base_phone in vowels
            
            if is_vowel:
                if pending_consonant is not None:
                    syllable = pending_consonant + phone
                    syllables.append((syllable, pending_start, interval.maxTime))
                    pending_consonant = None
                    pending_start = None
                else:
                    syllables.append((phone, interval.minTime, interval.maxTime))
            else:
                if pending_consonant is not None:
                    syllables.append((pending_consonant, pending_start, interval.minTime))
                pending_consonant = phone
                pending_start = interval.minTime
        
        if pending_consonant is not None:
            syllables.append((pending_consonant, pending_start, phones_tier[-1].maxTime))
        
        return syllables
    
    def _normalize_japanese_phone(self, phone: str) -> str:
        """
        日语音素标准化为ASCII
        """
        # IPA到罗马音的映射
        ipa_map = {
            # 元音
            'ɯ': 'u',
            'ɯː': 'u',
            'aː': 'a',
            'eː': 'e',
            'iː': 'i',
            'oː': 'o',
            'uː': 'u',
            # 辅音
            'ɲ': 'n',
            'ŋ': 'n',
            'ɕ': 'sh',
            'ʑ': 'j',
            'dʑ': 'j',
            'tɕ': 'ch',
            'ɡ': 'g',
            'ː': '',
        }
        
        result = phone
        
        # 按长度降序处理映射
        for ipa in sorted(ipa_map.keys(), key=len, reverse=True):
            if ipa in result:
                result = result.replace(ipa, ipa_map[ipa])
        
        # 移除非ASCII字符
        result = ''.join(c for c in result if c.isascii() and c.isalnum())
        
        return result.lower() if result else None
    

    
    def _sort_and_export(
        self,
        segments_dir: str,
        export_dir: str,
        max_samples: int,
        naming_rule: str,
        first_naming_rule: str,
        enabled_metrics: List[str]
    ) -> Tuple[bool, str]:
        """Rank extracted segments by quality and copy the best per pinyin.

        Args:
            segments_dir: root of per-pinyin subfolders containing clips.
            export_dir: destination directory (stale files are removed).
            max_samples: keep at most this many clips per pinyin.
            naming_rule: filename template (%p%=pinyin, %n%=index).
            first_naming_rule: optional template for the best (0th) clip.
            enabled_metrics: quality dimensions ("duration", "rms", "f0").

        Returns:
            (success, message)
        """
        try:
            import soundfile as sf
            from src.quality_scorer import QualityScorer, duration_score
            
            os.makedirs(export_dir, exist_ok=True)
            
            # Clear any previous export so stale files never linger.
            for f in os.listdir(export_dir):
                fp = os.path.join(export_dir, f)
                if os.path.isfile(fp):
                    os.remove(fp)
            
            wav_files = glob.glob(
                os.path.join(segments_dir, '**', '*.wav'),
                recursive=True
            )
            
            if not wav_files:
                return False, "未找到分字片段"
            
            self._log(f"扫描到 {len(wav_files)} 个片段")
            
            # rms/f0 metrics require decoding the audio; duration alone
            # can be read from the file header.
            need_audio_scoring = any(m in enabled_metrics for m in ["rms", "f0"])
            
            # pinyin -> [(path, duration, score)]
            stats: Dict[str, List[Tuple[str, float, float]]] = {}
            
            if need_audio_scoring:
                scorer = QualityScorer(enabled_metrics=enabled_metrics)
            
            for path in wav_files:
                # The pinyin is the first path component under segments_dir.
                rel_path = os.path.relpath(path, segments_dir)
                parts = rel_path.split(os.sep)
                if len(parts) >= 2:
                    pinyin = parts[0]
                    if pinyin not in stats:
                        stats[pinyin] = []
                    
                    try:
                        info = sf.info(path)
                        duration = info.duration
                        
                        if need_audio_scoring:
                            # Load audio and compute the combined score.
                            audio, sr = sf.read(path)
                            # Downmix to mono before scoring.
                            if len(audio.shape) > 1:
                                audio = audio.mean(axis=1)
                            scores = scorer.score(audio, sr, duration)
                            quality_score = scores.get("combined", 0.5)
                        else:
                            # Duration-only scoring, no decode needed.
                            quality_score = duration_score(duration)
                        
                        stats[pinyin].append((path, duration, quality_score))
                    except Exception as e:
                        # A single unreadable clip must not abort the export.
                        logger.warning(f"处理文件失败 {path}: {e}")
                        continue
            
            self._log(f"统计到 {len(stats)} 个拼音")
            self._log(f"命名规则: {naming_rule}")
            if first_naming_rule:
                self._log(f"首个样本规则: {first_naming_rule}")
            
            # Keep the top-scoring clips per pinyin and copy them out.
            exported = 0
            for pinyin, files in stats.items():
                sorted_files = sorted(files, key=lambda x: -x[2])  # best first
                for idx, (src_path, _, _) in enumerate(sorted_files[:max_samples]):
                    # The best (0th) sample may use a dedicated template.
                    if idx == 0 and first_naming_rule:
                        filename = self.apply_naming_rule(first_naming_rule, pinyin, idx)
                    else:
                        filename = self.apply_naming_rule(naming_rule, pinyin, idx)
                    
                    # BUG FIX: the computed name was previously discarded —
                    # every clip was written to the same literal
                    # '(unknown).wav', so only one file survived.
                    dst_path = os.path.join(export_dir, f'{filename}.wav')
                    shutil.copyfile(src_path, dst_path)
                    exported += 1
            
            self._log(f"导出完成: {exported} 个文件")
            return True, f"导出完成: {len(stats)} 个拼音,{exported} 个文件"
            
        except Exception as e:
            logger.error(f"排序导出失败: {e}", exc_info=True)
            return False, str(e)