# -*- coding: utf-8 -*-
"""

RVC 推理管道 - 端到端 AI 翻唱

"""
import os
import gc
import torch
import numpy as np
import faiss
from pathlib import Path
from typing import Optional, Tuple, Union
from scipy import signal as sp_signal

from lib.audio import load_audio, save_audio, normalize_audio, soft_clip
from lib.device import get_device, empty_device_cache, supports_fp16
from lib.logger import log
from infer.f0_extractor import get_f0_extractor, shift_f0, F0Method

# 48 Hz high-pass Butterworth filter (matches the official pipeline; removes low-frequency rumble)
_bh, _ah = sp_signal.butter(N=5, Wn=48, btype="high", fs=16000)


class VoiceConversionPipeline:
    """RVC 推理管道"""

    def __init__(self, device: str = "cuda"):
        """

        初始化管道



        Args:

            device: 计算设备 ("cuda" 或 "cpu")

        """
        self.device = get_device(device)
        self.hubert_model = None
        self.hubert_model_type = None
        self.hubert_layer = 12
        self.voice_model = None
        self.index = None
        self.f0_extractor = None
        self.spk_count = 1
        self.model_version = "v2"  # default is v2 (768-dim features)

        # Default parameters
        self.sample_rate = 16000  # HuBERT input sample rate
        self.output_sr = 48000    # output sample rate

    def unload_hubert(self):
        """卸载 HuBERT 模型释放显存"""
        if self.hubert_model is not None:
            self.hubert_model.cpu()
            del self.hubert_model
            self.hubert_model = None
            self.hubert_model_type = None
        gc.collect()
        empty_device_cache(self.device)

    def unload_f0_extractor(self):
        """卸载 F0 提取器释放显存"""
        if self.f0_extractor is not None:
            # RMVPEExtractor.model 是 RMVPE 包装类,内部有 model 和 mel_extractor
            if hasattr(self.f0_extractor, 'model') and self.f0_extractor.model is not None:
                rmvpe = self.f0_extractor.model
                # 卸载内部的 E2E 模型
                if hasattr(rmvpe, 'model') and rmvpe.model is not None:
                    rmvpe.model.cpu()
                    del rmvpe.model
                    rmvpe.model = None
                # 卸载 mel_extractor
                if hasattr(rmvpe, 'mel_extractor') and rmvpe.mel_extractor is not None:
                    rmvpe.mel_extractor.cpu()
                    del rmvpe.mel_extractor
                    rmvpe.mel_extractor = None
                del self.f0_extractor.model
                self.f0_extractor.model = None
            del self.f0_extractor
            self.f0_extractor = None
        gc.collect()
        empty_device_cache(self.device)

    def unload_voice_model(self):
        """卸载语音模型释放显存"""
        if self.voice_model is not None:
            self.voice_model.cpu()
            del self.voice_model
            self.voice_model = None
        gc.collect()
        empty_device_cache(self.device)

    def unload_all(self):
        """卸载所有模型"""
        self.unload_hubert()
        self.unload_f0_extractor()
        self.unload_voice_model()
        self.index = None

    def load_hubert(self, model_path: str):
        """

        加载 HuBERT 模型



        Args:

            model_path: HuBERT 模型路径(可以是本地 .pt 文件或 Hugging Face 模型名)

        """
        # 优先使用 fairseq 兼容实现(官方实现)
        if os.path.isfile(model_path):
            try:
                from fairseq import checkpoint_utils

                models, _, _ = checkpoint_utils.load_model_ensemble_and_task(
                    [model_path],
                    suffix=""
                )
                model = models[0]
                model = model.to(self.device).eval()
                self.hubert_model = model
                self.hubert_model_type = "fairseq"
                log.info(f"HuBERT 模型已加载: {model_path} ({self.device})")
                return
            except Exception as e:
                log.warning(f"fairseq 加载失败,尝试 torchaudio: {e}")

        try:
            import torchaudio

            bundle = torchaudio.pipelines.HUBERT_BASE
            model = bundle.get_model()
            model = model.to(self.device).eval()
            self.hubert_model = model
            self.hubert_model_type = "torchaudio"
            log.info(
                f"HuBERT model loaded: torchaudio HUBERT_BASE ({self.device})"
            )
            return
        except Exception as e:
            log.warning(f"torchaudio 加载失败,尝试 transformers: {e}")

        from transformers import HubertModel

        if os.path.isfile(model_path):
            log.info("检测到本地模型文件,将使用 Hugging Face 预训练模型替代")
            model_name = "facebook/hubert-base-ls960"
        else:
            model_name = model_path

        try:
            self.hubert_model = HubertModel.from_pretrained(model_name)
        except Exception as e:
            log.warning(f"从网络加载失败,尝试使用本地缓存: {e}")
            self.hubert_model = HubertModel.from_pretrained(
                model_name,
                local_files_only=True
            )
        self.hubert_model = self.hubert_model.to(self.device).eval()
        self.hubert_model_type = "transformers"
        log.info(f"HuBERT 模型已加载: {model_name} ({self.device})")

    def load_voice_model(self, model_path: str) -> dict:
        """

        加载语音模型



        Args:

            model_path: 模型文件路径 (.pth)



        Returns:

            dict: 模型信息

        """
        log.debug(f"正在加载语音模型: {model_path}")
        cpt = torch.load(model_path, map_location="cpu", weights_only=False)

        log.debug(f"模型文件 keys: {cpt.keys()}")

        # 提取模型配置
        config = cpt.get("config", [])
        self.output_sr = cpt.get("sr", 48000)

        log.debug(f"config 类型: {type(config)}, 内容: {config}")
        log.debug(f"采样率: {self.output_sr}")

        # Handle list-style config (the standard RVC v2 format)
        if isinstance(config, list) and len(config) >= 18:
            model_config = {
                "spec_channels": config[0],
                "segment_size": config[1],
                "inter_channels": config[2],
                "hidden_channels": config[3],
                "filter_channels": config[4],
                "n_heads": config[5],
                "n_layers": config[6],
                "kernel_size": config[7],
                "p_dropout": config[8],
                "resblock": config[9],
                "resblock_kernel_sizes": config[10],
                "resblock_dilation_sizes": config[11],
                "upsample_rates": config[12],
                "upsample_initial_channel": config[13],
                "upsample_kernel_sizes": config[14],
                "spk_embed_dim": config[15],
                "gin_channels": config[16],
            }
            # Use the sample rate from config if present
            if len(config) > 17:
                self.output_sr = config[17]
        elif isinstance(config, dict):
            # Also accept dict-style config
            model_config = config
        else:
            # Fall back to defaults
            log.warning("Could not parse config, using defaults")
            model_config = {}

        log.debug(f"Parsed config: {model_config}")

        # Pick the right synthesizer.
        # v1 models: gin_channels=256, 256-dim HuBERT features
        # v2 models: gin_channels=256, 768-dim HuBERT features
        # Detection: check the checkpoint's 'version' field, or infer from the weight shapes
        gin_channels = model_config.get("gin_channels", 256)

        # Version-detection priority:
        # 1. the 'version' field
        # 2. the shape of enc_p.emb_phone.weight
        # 3. default to v2
        model_version = None

        if "version" in cpt:
            model_version = cpt["version"]
            log.debug(f"Detected from the version field: {model_version}")
        elif "weight" in cpt and "enc_p.emb_phone.weight" in cpt["weight"]:
            # Check the shape of enc_p.emb_phone.weight
            # v1: [hidden_channels, 256]
            # v2: [hidden_channels, 768]
            emb_shape = cpt["weight"]["enc_p.emb_phone.weight"].shape
            log.debug(f"enc_p.emb_phone.weight shape: {emb_shape}")
            if emb_shape[1] == 256:
                model_version = "v1"
                log.debug("Detected from weight shape: v1 (256-dim)")
            elif emb_shape[1] == 768:
                model_version = "v2"
                log.debug("Detected from weight shape: v2 (768-dim)")

        # Choose the synthesizer from the detected version
        if model_version == "v1":
            # v1 model: 256-dim
            from infer.lib.infer_pack.models import SynthesizerTrnMs256NSFsid
            synthesizer_class = SynthesizerTrnMs256NSFsid
            self.model_version = "v1"
            log.debug("Using the v1 synthesizer (256-dim)")
        else:
            # v2 model: 768-dim (default)
            from infer.lib.infer_pack.models import SynthesizerTrnMs768NSFsid
            synthesizer_class = SynthesizerTrnMs768NSFsid
            self.model_version = "v2"
            log.debug("Using the v2 synthesizer (768-dim)")

        # Build the synthesizer from the config
        self.voice_model = synthesizer_class(
            spec_channels=model_config.get("spec_channels", 1025),
            segment_size=model_config.get("segment_size", 32),
            inter_channels=model_config.get("inter_channels", 192),
            hidden_channels=model_config.get("hidden_channels", 192),
            filter_channels=model_config.get("filter_channels", 768),
            n_heads=model_config.get("n_heads", 2),
            n_layers=model_config.get("n_layers", 6),
            kernel_size=model_config.get("kernel_size", 3),
            p_dropout=model_config.get("p_dropout", 0),
            resblock=model_config.get("resblock", "1"),
            resblock_kernel_sizes=model_config.get("resblock_kernel_sizes", [3, 7, 11]),
            resblock_dilation_sizes=model_config.get("resblock_dilation_sizes", [[1, 3, 5], [1, 3, 5], [1, 3, 5]]),
            upsample_rates=model_config.get("upsample_rates", [10, 10, 2, 2]),
            upsample_initial_channel=model_config.get("upsample_initial_channel", 512),
            upsample_kernel_sizes=model_config.get("upsample_kernel_sizes", [16, 16, 4, 4]),
            spk_embed_dim=model_config.get("spk_embed_dim", 109),
            gin_channels=model_config.get("gin_channels", 256),
            sr=self.output_sr,
            is_half=supports_fp16(self.device)  # use half precision when the device supports it
        )
        self.spk_count = int(model_config.get("spk_embed_dim", 1) or 1)

        # Load the weights
        self.voice_model.load_state_dict(cpt["weight"], strict=False)
        self.voice_model = self.voice_model.to(self.device).eval()

        model_info = {
            "name": Path(model_path).stem,
            "sample_rate": self.output_sr,
            "version": cpt.get("version", "v2")
        }

        log.info(f"语音模型已加载: {model_info['name']} ({self.output_sr}Hz)")
        return model_info

    def load_index(self, index_path: str):
        """

        加载 FAISS 索引



        Args:

            index_path: 索引文件路径 (.index)

        """
        self.index = faiss.read_index(index_path)
        # 启用 direct_map 以支持 reconstruct()
        try:
            self.index.make_direct_map()
        except Exception:
            pass  # 某些索引类型不支持,忽略
        log.info(f"索引已加载: {index_path}")

    def load_f0_extractor(self, method: F0Method = "rmvpe",
                          rmvpe_path: str = None):
        """
        Load the F0 extractor.

        Args:
            method: F0 extraction method
            rmvpe_path: path to the RMVPE model
        """
        self.f0_extractor = get_f0_extractor(
            method,
            device=str(self.device),
            rmvpe_path=rmvpe_path
        )
        log.info(f"F0 extractor loaded: {method}")

    @torch.no_grad()
    def extract_features(self, audio: np.ndarray, use_final_proj: bool = False) -> torch.Tensor:
        """

        使用 HuBERT 提取特征



        Args:

            audio: 16kHz 音频数据

            use_final_proj: 是否使用 final_proj 将 768 维降到 256 维(v1 模型需要)



        Returns:

            torch.Tensor: HuBERT 特征

        """
        if self.hubert_model is None:
            raise RuntimeError("请先加载 HuBERT 模型")

        # 转换为张量
        audio_tensor = torch.from_numpy(audio).float().to(self.device)
        if audio_tensor.dim() == 1:
            audio_tensor = audio_tensor.unsqueeze(0)

        if self.hubert_model_type == "fairseq":
            # v1 models use layer 9, v2 models use layer 12
            output_layer = 9 if use_final_proj else 12
            feats = self.hubert_model.extract_features(
                audio_tensor,
                padding_mask=None,
                output_layer=output_layer
            )[0]
            # v1 models need 256-dim features: project with final_proj
            # v2 models need 768-dim features: skip final_proj
            if use_final_proj and hasattr(self.hubert_model, 'final_proj'):
                feats = self.hubert_model.final_proj(feats)
            return feats

        if self.hubert_model_type == "torchaudio":
            feats_list, _ = self.hubert_model.extract_features(audio_tensor)
            layer_idx = min(self.hubert_layer - 1, len(feats_list) - 1)
            return feats_list[layer_idx]

        # transformers fallback
        outputs = self.hubert_model(audio_tensor, output_hidden_states=True)
        layer_idx = min(self.hubert_layer, len(outputs.hidden_states) - 1)
        return outputs.hidden_states[layer_idx]

    def search_index(self, features: np.ndarray, k: int = 8) -> np.ndarray:
        """

        在索引中搜索相似特征



        Args:

            features: 输入特征

            k: 返回的近邻数量



        Returns:

            np.ndarray: 检索到的特征

        """
        if self.index is None:
            return features

        # 检查特征维度是否与索引匹配
        if features.shape[-1] != self.index.d:
            log.warning(f"特征维度 ({features.shape[-1]}) 与索引维度 ({self.index.d}) 不匹配,跳过索引搜索")
            return features

        # 搜索(使用距离倒数平方加权,与官方管道一致)
        scores, indices = self.index.search(features, k)

        # Try to reconstruct the stored vectors; skip if the index does not support it
        try:
            big_npy = self.index.reconstruct_n(0, self.index.ntotal)
        except RuntimeError as e:
            if "direct map" in str(e):
                log.warning("索引不支持向量重建,跳过索引混合")
                return features
            raise

        # Inverse-squared-distance weighting
        weight = np.square(1.0 / (scores + 1e-6))
        weight /= weight.sum(axis=1, keepdims=True)
        retrieved = np.sum(
            big_npy[indices] * np.expand_dims(weight, axis=2), axis=1
        )
        return retrieved

    @staticmethod
    def _f0_to_coarse(
        f0: np.ndarray,
        f0_min: float = 50.0,
        f0_max: float = 1100.0
    ) -> np.ndarray:
        """Convert F0 (Hz) to official RVC coarse bins (1-255)."""
        f0 = np.asarray(f0, dtype=np.float32)
        f0_max = max(float(f0_max), float(f0_min) + 1.0)
        f0_mel_min = 1127 * np.log(1 + float(f0_min) / 700.0)
        f0_mel_max = 1127 * np.log(1 + f0_max / 700.0)
        f0_mel = 1127 * np.log1p(np.maximum(f0, 0.0) / 700.0)
        voiced = f0_mel > 0
        f0_mel[voiced] = (f0_mel[voiced] - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1
        f0_mel[f0_mel <= 1] = 1
        f0_mel[f0_mel > 255] = 255
        return np.rint(f0_mel).astype(np.int64)
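
    # A quick sanity check on the mapping above (my arithmetic, not from the
    # original source): with the default 50-1100 Hz range, A4 (440 Hz) gives
    # f0_mel = 1127 * ln(1 + 440/700) ~= 549.6, which quantizes to roughly
    # coarse bin 122 of the 1-255 range; unvoiced frames (f0 = 0) stay at bin 1.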

    def _apply_rms_mix(
        self,
        audio_out: np.ndarray,
        audio_in: np.ndarray,
        sr_out: int,
        sr_in: int,
        hop_length: int,
        rms_mix_rate: float
    ) -> np.ndarray:
        """Match output RMS envelope to input RMS (0=off, 1=full match)."""
        if rms_mix_rate <= 0:
            return audio_out

        import librosa

        frame_length_in = 1024
        rms_in = librosa.feature.rms(
            y=audio_in,
            frame_length=frame_length_in,
            hop_length=hop_length,
            center=True
        )[0]

        hop_out = int(round(hop_length * sr_out / sr_in))
        frame_length_out = int(round(frame_length_in * sr_out / sr_in))
        rms_out = librosa.feature.rms(
            y=audio_out,
            frame_length=frame_length_out,
            hop_length=hop_out,
            center=True
        )[0]

        min_len = min(len(rms_in), len(rms_out))
        if min_len == 0:
            return audio_out

        rms_in = rms_in[:min_len]
        rms_out = rms_out[:min_len]

        gain = rms_in / (rms_out + 1e-6)
        gain = np.clip(gain, 0.2, 4.0)
        gain = gain ** rms_mix_rate

        gain_samples = np.repeat(gain, hop_out)
        if len(gain_samples) < len(audio_out):
            gain_samples = np.pad(
                gain_samples,
                (0, len(audio_out) - len(gain_samples)),
                mode="edge"
            )
        else:
            gain_samples = gain_samples[:len(audio_out)]

        return audio_out * gain_samples
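
    # Worked example for the envelope math above (my numbers, not from the
    # original source): if an input frame has twice the RMS of the matching
    # output frame, gain = clip(2, 0.2, 4.0) ** rms_mix_rate = 2 ** 0.25
    # ~= 1.19 at the default rate of 0.25, so that frame is nudged about 19%
    # louder rather than fully matched to the input envelope.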

    def _apply_silence_gate(
        self,
        audio_out: np.ndarray,
        audio_in: np.ndarray,
        f0: np.ndarray,
        sr_out: int,
        sr_in: int,
        hop_length: int,
        threshold_db: float,
        smoothing_ms: float,
        min_silence_ms: float,
        protect: float
    ) -> np.ndarray:
        """Silence gate based on input RMS and F0."""
        import librosa

        frame_length = 1024
        rms = librosa.feature.rms(
            y=audio_in,
            frame_length=frame_length,
            hop_length=hop_length,
            center=True
        )[0]

        if len(rms) == 0 or len(f0) == 0:
            return audio_out

        # Align RMS length to F0 length
        if len(rms) < len(f0):
            rms = np.pad(rms, (0, len(f0) - len(rms)), mode="edge")
        else:
            rms = rms[:len(f0)]

        rms_db = 20 * np.log10(rms + 1e-6)
        ref_db = np.percentile(rms_db, 95)
        gate_db = ref_db + threshold_db  # threshold_db should be negative

        silent = (rms_db < gate_db) & (f0 <= 0)

        if min_silence_ms > 0:
            min_frames = int(
                round((min_silence_ms / 1000) * (sr_in / hop_length))
            )
            if min_frames > 1:
                silent_int = silent.astype(int)
                changes = np.diff(
                    np.concatenate(([0], silent_int, [0]))
                )
                starts = np.where(changes == 1)[0]
                ends = np.where(changes == -1)[0]
                keep_silent = np.zeros_like(silent, dtype=bool)
                for s, e in zip(starts, ends):
                    if e - s >= min_frames:
                        keep_silent[s:e] = True
                silent = keep_silent

        mask = 1.0 - silent.astype(float)

        if smoothing_ms > 0:
            smooth_frames = int(
                round((smoothing_ms / 1000) * (sr_in / hop_length))
            )
            if smooth_frames > 1:
                kernel = np.ones(smooth_frames) / smooth_frames
                mask = np.convolve(
                    mask,
                    kernel,
                    mode="same"
                )
                mask = np.clip(mask, 0.0, 1.0)
        protect = float(np.clip(protect, 0.0, 1.0))
        if protect > 0:
            mask = mask * (1.0 - protect) + protect

        samples_per_frame = int(round(sr_out * hop_length / sr_in))
        mask_samples = np.repeat(mask, samples_per_frame)

        if len(mask_samples) < len(audio_out):
            mask_samples = np.pad(
                mask_samples,
                (0, len(audio_out) - len(mask_samples)),
                mode="edge"
            )
        else:
            mask_samples = mask_samples[:len(audio_out)]

        return audio_out * mask_samples
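
    # Example of the gate threshold above (my numbers, not from the original
    # source): if the 95th-percentile frame level is -10 dB and threshold_db
    # is the default -45 dB, only frames below -55 dB that are also unvoiced
    # (F0 <= 0) count as silent, before the min-duration and smoothing passes.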

    def _process_chunk(
        self,
        features: np.ndarray,
        f0: np.ndarray,
        use_fp16: bool = False,
        speaker_id: int = 0,
    ) -> np.ndarray:
        """
        Process a single audio chunk.

        Args:
            features: HuBERT features [T, C]
            f0: F0 array
            use_fp16: run inference in FP16
            speaker_id: speaker ID (for multi-speaker models)

        Returns:
            np.ndarray: synthesized audio
        """
        import torch.nn.functional as F

        log.debug(f"[_process_chunk] input features: shape={features.shape}, dtype={features.dtype}")
        log.debug(f"[_process_chunk] input feature stats: max={np.max(np.abs(features)):.4f}, mean={np.mean(np.abs(features)):.4f}, std={np.std(features):.4f}")
        log.debug(f"[_process_chunk] input F0: len={len(f0)}, max={np.max(f0):.1f}, min={np.min(f0):.1f}, non-zero={np.sum(f0 > 0)}")

        # Convert to tensors
        features_tensor = torch.from_numpy(features).float().to(self.device).unsqueeze(0)
        # HuBERT outputs 50 fps (hop=320 @ 16kHz) but the RVC model expects 100 fps,
        # so upsample the features 2x.
        # Note: interpolate wants [B, C, T], while the model wants [B, T, C]
        features_tensor = F.interpolate(features_tensor.transpose(1, 2), scale_factor=2, mode='nearest').transpose(1, 2)
        log.debug(f"[_process_chunk] features after 2x upsampling: shape={features_tensor.shape}")

        # Align F0 to the upsampled feature length
        # features_tensor is [B, T, C], so the time dimension is shape[1]
        target_len = features_tensor.shape[1]
        original_f0_len = len(f0)
        if len(f0) > target_len:
            f0 = f0[:target_len]
        elif len(f0) < target_len:
            f0 = np.pad(f0, (0, target_len - len(f0)), mode='edge')
        log.debug(f"[_process_chunk] F0 aligned: {original_f0_len} -> {len(f0)} (target: {target_len})")

        f0_tensor = torch.from_numpy(f0.copy()).float().to(self.device).unsqueeze(0)
        # Convert F0 (Hz) to coarse pitch indices (1-255)
        # via RVC's mel quantization into coarse pitch bins
        f0_coarse = torch.from_numpy(self._f0_to_coarse(f0)).to(self.device).unsqueeze(0)
        log.debug(f"[_process_chunk] F0 tensor: shape={f0_tensor.shape}, max={f0_tensor.max().item():.1f}, min={f0_tensor.min().item():.1f}")
        log.debug(f"[_process_chunk] F0 coarse (pitch indices): shape={f0_coarse.shape}, max={f0_coarse.max().item()}, min={f0_coarse.min().item()}")

        safe_speaker_id = int(max(0, min(max(1, int(self.spk_count)) - 1, int(speaker_id))))
        sid = torch.tensor([safe_speaker_id], device=self.device)
        log.debug(f"[_process_chunk] speaker ID: {sid.item()}")

        # FP16 inference
        log.debug(f"[_process_chunk] starting inference, use_fp16={use_fp16}, device={self.device.type}")
        if use_fp16 and supports_fp16(self.device):
            with torch.amp.autocast(str(self.device.type)):
                audio_out, x_mask, _ = self.voice_model.infer(
                    features_tensor,
                    torch.tensor([features_tensor.shape[1]], device=self.device),
                    f0_coarse,
                    f0_tensor,
                    sid
                )
        else:
            audio_out, x_mask, _ = self.voice_model.infer(
                features_tensor,
                torch.tensor([features_tensor.shape[1]], device=self.device),
                f0_coarse,
                f0_tensor,
                sid
            )

        log.debug(f"[_process_chunk] 推理完成, audio_out: shape={audio_out.shape}, dtype={audio_out.dtype}")
        log.debug(f"[_process_chunk] x_mask: shape={x_mask.shape}, sum={x_mask.sum().item()}")

        # Clean up
        del features_tensor, f0_tensor, f0_coarse
        empty_device_cache(self.device)

        audio_out = audio_out.squeeze().cpu().detach().float().numpy()
        log.debug(f"Chunk audio: len={len(audio_out)}, max={np.max(np.abs(audio_out)):.4f}, min={np.min(audio_out):.4f}")

        # Note: hard silence masking of F0=0 regions is no longer applied here.
        # Consonants (k, t, s, p, ...) usually have no fundamental (F0=0), and hard-muting them leaves only vowels.
        # If denoising is needed, use a smarter method in post-processing.

        return audio_out

    def convert(
        self,
        audio_path: str,
        output_path: str,
        pitch_shift: float = 0,
        index_ratio: float = 0.2,
        filter_radius: int = 3,
        resample_sr: int = 0,
        rms_mix_rate: float = 0.25,
        protect: float = 0.33,
        speaker_id: int = 0,
        silence_gate: bool = True,
        silence_threshold_db: float = -45.0,
        silence_smoothing_ms: float = 50.0,
        silence_min_duration_ms: float = 200.0
    ) -> str:
        """
        Run RVC inference.

        Args:
            audio_path: input audio path
            output_path: output audio path
            pitch_shift: pitch shift (semitones)
            index_ratio: index blend ratio (0-1)
            filter_radius: median filter radius
            resample_sr: resample rate (0 = no resampling)
            rms_mix_rate: RMS blend ratio
            protect: protect voiceless consonants
            speaker_id: speaker ID (adjustable for multi-speaker models)
            silence_gate: enable the silence gate (on by default to suppress noise in silent parts)
            silence_threshold_db: silence threshold (dB, relative to peak)
            silence_smoothing_ms: gate smoothing length (ms)
            silence_min_duration_ms: minimum silence length (ms)

        Returns:
            str: output file path
        """
        # Check that everything is loaded
        if self.voice_model is None:
            raise RuntimeError("Load the voice model first")
        if self.hubert_model is None:
            raise RuntimeError("Load the HuBERT model first")
        if self.f0_extractor is None:
            raise RuntimeError("Load the F0 extractor first")

        # Load the audio
        audio = load_audio(audio_path, sr=self.sample_rate)
        audio = normalize_audio(audio)
        rms_mix_rate = float(np.clip(rms_mix_rate, 0.0, 1.0))
        speaker_id = int(max(0, min(max(1, int(self.spk_count)) - 1, int(speaker_id))))

        # High-pass filter to remove low-frequency rumble (matches the official pipeline)
        audio = sp_signal.filtfilt(_bh, _ah, audio).astype(np.float32)

        # Step 1: extract F0 (RMVPE or hybrid)
        f0 = self.f0_extractor.extract(audio)

        # Pitch shift
        if pitch_shift != 0:
            f0 = shift_f0(f0, pitch_shift)

        # Selective median filtering - only where F0 jumps sharply, preserving natural vibrato
        if filter_radius > 0:
            from scipy.ndimage import median_filter

            # F0 jump size in semitones
            f0_semitone_diff = np.abs(12 * np.log2((f0 + 1e-6) / (np.roll(f0, 1) + 1e-6)))
            f0_semitone_diff[0] = 0

            # Only filter regions where the jump exceeds 2 semitones
            need_filter = f0_semitone_diff > 2.0

            # Dilate the filtered regions by 1 frame on each side
            kernel = np.ones(3, dtype=bool)
            need_filter = np.convolve(need_filter, kernel, mode='same')

            # Apply the filter
            f0_filtered = median_filter(f0, size=filter_radius)

            # High-pitch regions (>500 Hz) get gentler filtering so high notes are not over-smoothed
            # Reference: the RMVPE paper suggests adaptive smoothing for high-frequency regions
            high_pitch_mask = f0 > 500

            # Use a smaller filter radius for high-pitch regions
            if np.any(high_pitch_mask):
                f0_filtered_high = median_filter(f0, size=max(1, filter_radius // 2))
                f0_filtered = np.where(high_pitch_mask, f0_filtered_high, f0_filtered)

            # Blend: filter only where needed, keep the original elsewhere
            f0 = np.where(need_filter, f0_filtered, f0)

        # Free the F0 extractor's VRAM
        self.unload_f0_extractor()

        # Step 2: extract HuBERT features
        # v1 models need 256-dim features (via final_proj), v2 models need 768-dim
        use_final_proj = (self.model_version == "v1")
        features = self.extract_features(audio, use_final_proj=use_final_proj)
        features = features.squeeze(0).cpu().numpy()

        # Free HuBERT's VRAM
        self.unload_hubert()

        # Index retrieval (runs on CPU)
        if self.index is not None and index_ratio > 0:
            features_before_index = features.copy()
            retrieved = self.search_index(features)

            # Simple adaptive index blending (no whitening or residual removal)
            # High-pitch regions get a slightly higher index ratio
            adaptive_index_ratio = np.ones(len(features)) * index_ratio

            f0_per_feat = 2
            for fi in range(len(features)):
                f0_start = fi * f0_per_feat
                f0_end = min(f0_start + f0_per_feat, len(f0))
                if f0_end > f0_start:
                    f0_segment = f0[f0_start:f0_end]
                    avg_f0 = np.mean(f0_segment[f0_segment > 0]) if np.any(f0_segment > 0) else 0
                    # Raise the index ratio for high-pitch regions
                    if avg_f0 > 450:
                        adaptive_index_ratio[fi] = min(0.75, index_ratio * 1.3)

            adaptive_index_ratio = adaptive_index_ratio[:, np.newaxis]
            features = features * (1 - adaptive_index_ratio) + retrieved * adaptive_index_ratio

            # Dynamic consonant protection: adjust protect strength from F0 confidence and energy,
            # so index retrieval does not smear consonants; mirrors the official pipeline's behaviour
            if protect < 0.5:
                # Build a per-frame protection mask: frames with F0>0 get 1.0 (fully use the
                # index-blended features), frames with F0=0 get the protect value (mostly
                # keep the original features).
                # The F0 frame rate is 2x the feature frame rate (hop 160 vs 320), so downsample to align
                f0_per_feat = 2  # each feature frame spans 2 F0 frames
                n_feat = features.shape[0]
                protect_mask = np.ones(n_feat, dtype=np.float32)

                # Score each feature frame by F0 stability and energy
                for fi in range(n_feat):
                    f0_start = fi * f0_per_feat
                    f0_end = min(f0_start + f0_per_feat, len(f0))
                    if f0_end > f0_start:
                        f0_segment = f0[f0_start:f0_end]
                        # Unvoiced segments (F0=0): strong protection, keep more of the original features
                        # Reference: "Voice Conversion for Articulation Disorders" suggests protecting consonants
                        if np.all(f0_segment <= 0):
                            # Raise the protection for unvoiced segments from protect to protect * 1.5
                            protect_mask[fi] = min(0.8, protect * 1.5)
                        # Unstable F0 (high variance): medium protection
                        elif len(f0_segment) > 1 and np.std(f0_segment) > 50:
                            protect_mask[fi] = protect + (1.0 - protect) * 0.3
                        # Low-energy segments (likely breath): stronger protection,
                        # using the feature L2 norm as an energy proxy
                        feat_energy = np.linalg.norm(features_before_index[fi])
                        if feat_energy < 0.5:  # low-energy threshold
                            protect_mask[fi] = min(0.8, protect * 1.3)

                # Smooth the protection mask to avoid abrupt changes
                smooth_kernel = np.array([1, 2, 3, 2, 1], dtype=np.float32)
                smooth_kernel /= np.sum(smooth_kernel)
                protect_mask = np.convolve(protect_mask, smooth_kernel, mode="same")
                protect_mask = np.convolve(protect_mask, smooth_kernel, mode="same")
                protect_mask = np.clip(protect_mask, protect, 1.0)
                protect_mask = protect_mask[:, np.newaxis]  # [T, 1] broadcasts to [T, C]
                features = features * protect_mask + features_before_index * (1 - protect_mask)

        # --- Energy-aware soft gating (after index+protect, before chunked inference) ---
        # Note: soft gating instead of hard zeroing, to avoid volume loss
        import librosa as _librosa_local
        _hop_feat = 320  # HuBERT hop
        _n_feat = features.shape[0]
        _frame_rms = _librosa_local.feature.rms(
            y=audio, frame_length=_hop_feat * 2, hop_length=_hop_feat, center=True
        )[0]
        if _frame_rms.ndim > 1:
            _frame_rms = _frame_rms[0]
        if len(_frame_rms) > _n_feat:
            _frame_rms = _frame_rms[:_n_feat]
        elif len(_frame_rms) < _n_feat:
            _frame_rms = np.pad(_frame_rms, (0, _n_feat - len(_frame_rms)), mode='edge')
        _energy_db = 20.0 * np.log10(_frame_rms + 1e-8)
        _ref_db = float(np.percentile(_energy_db, 95)) if _frame_rms.size > 0 else -20.0

        # Softer gating: gradual attenuation instead of hard zeroing, keeping low-energy content
        _silence_threshold = _ref_db - 65.0  # relaxed to -65 dB (only treats extreme silence)
        _is_very_quiet = (_energy_db < _silence_threshold).astype(np.float32)

        # Check F0: frames with F0=0 are more likely silence (but may also be consonants)
        _f0_50fps = f0[::2] if len(f0) >= _n_feat * 2 else np.pad(f0[::2], (0, _n_feat - len(f0[::2])), mode='edge')
        _f0_50fps = _f0_50fps[:_n_feat]
        _is_unvoiced = (_f0_50fps <= 0).astype(np.float32)

        # Combined test: very low energy + unvoiced = probably silence
        _is_silence = _is_very_quiet * _is_unvoiced

        # Smooth the gate curve
        _sm = np.array([1, 2, 3, 2, 1], dtype=np.float32)
        _sm /= _sm.sum()
        _is_silence = np.convolve(_is_silence, _sm, mode='same')[:_n_feat]

        # Minimum-silence-length filter (avoids misclassifying brief low-energy consonants)
        _min_silence_frames = 10  # about 200 ms @ 50 fps (conservative)
        _silence_binary = (_is_silence > 0.7).astype(int)  # raised threshold of 0.7
        _changes = np.diff(np.concatenate(([0], _silence_binary, [0])))
        _starts = np.where(_changes == 1)[0]
        _ends = np.where(_changes == -1)[0]
        _keep_silence = np.zeros_like(_silence_binary, dtype=bool)
        for _s, _e in zip(_starts, _ends):
            if _e - _s >= _min_silence_frames:
                _keep_silence[_s:_e] = True

        # Soft gate: gradual attenuation instead of hard zeroing (0.3-1.0 rather than 0-1)
        _energy_gate = np.where(_keep_silence, 0.3, 1.0).astype(np.float32)

        # Soft-gate the features (50 fps) - keep 30% instead of zeroing
        features = features * _energy_gate[:, np.newaxis]

        # Soft-gate F0 (100 fps = feature rate x 2) - keep 30% instead of zeroing
        _f0_gate = np.repeat(_energy_gate, 2)
        if len(_f0_gate) > len(f0):
            _f0_gate = _f0_gate[:len(f0)]
        elif len(_f0_gate) < len(f0):
            _f0_gate = np.pad(_f0_gate, (0, len(f0) - len(_f0_gate)), mode='constant', constant_values=1.0)
        f0 = f0 * _f0_gate

        # Step 3: voice synthesis (voice_model inference), in chunks
        # Chunking parameters - larger overlap to reduce boundary artifacts
        CHUNK_SECONDS = 30  # 30 seconds per chunk
        OVERLAP_SECONDS = 2.0  # 2.0 s overlap (raised from 1.0 to reduce glitches)
        HOP_LENGTH = 320  # HuBERT hop length

        # Chunk size in feature frames
        chunk_frames = int(CHUNK_SECONDS * self.sample_rate / HOP_LENGTH)
        overlap_frames = int(OVERLAP_SECONDS * self.sample_rate / HOP_LENGTH)

        total_frames = features.shape[0]

        # Short audio: process in one go
        if total_frames <= chunk_frames:
            audio_out = self._process_chunk(features, f0, speaker_id=speaker_id)
        else:
            # Chunked processing
            log.info(f"Audio is long ({total_frames} frames), processing in chunks...")
            audio_chunks = []
            chunk_idx = 0

            for start in range(0, total_frames, chunk_frames - overlap_frames):
                end = min(start + chunk_frames, total_frames)
                chunk_features = features[start:end]

                # The matching F0 range:
                # the F0 frame rate is 2x the feature frame rate (hop 160 vs 320)
                f0_start = start * 2
                f0_end = min(end * 2, len(f0))
                chunk_f0 = f0[f0_start:f0_end]

                log.debug(f"Processing chunk {chunk_idx}: frames {start}-{end}")

                # Process the current chunk
                chunk_audio = self._process_chunk(chunk_features, chunk_f0, speaker_id=speaker_id)
                audio_chunks.append(chunk_audio)
                chunk_idx += 1

                # Free VRAM
                gc.collect()
                empty_device_cache(self.device)

            # Stitch with crossfades
            audio_out = self._crossfade_chunks(audio_chunks, overlap_frames)
            log.info(f"Chunked processing done, {chunk_idx} chunks")

        # Post-processing
        if isinstance(audio_out, tuple):
            audio_out = audio_out[0]
        audio_out = np.asarray(audio_out).flatten()

        # Resample if requested
        if resample_sr > 0 and resample_sr != self.output_sr:
            import librosa
            audio_out = librosa.resample(
                audio_out,
                orig_sr=self.output_sr,
                target_sr=resample_sr
            )
            save_sr = resample_sr
        else:
            save_sr = self.output_sr

        # Optional RMS envelope blending
        if rms_mix_rate > 0:
            audio_out = self._apply_rms_mix(
                audio_out=audio_out,
                audio_in=audio,
                sr_out=save_sr,
                sr_in=self.sample_rate,
                hop_length=160,
                rms_mix_rate=rms_mix_rate
            )

        # Optional silence gate (reduces breathiness/noise in silent parts)
        if silence_gate:
            audio_out = self._apply_silence_gate(
                audio_out=audio_out,
                audio_in=audio,
                f0=f0,
                sr_out=save_sr,
                sr_in=self.sample_rate,
                hop_length=160,
                threshold_db=silence_threshold_db,
                smoothing_ms=silence_smoothing_ms,
                min_silence_ms=silence_min_duration_ms,
                protect=protect
            )

        # Apply vocal cleanup post-processing (reduces sibilance and breath noise)
        # Note: disabled by default to avoid quality loss from over-processing
        try:
            from lib.vocal_cleanup import apply_vocal_cleanup
            audio_out = apply_vocal_cleanup(
                audio_out,
                sr=save_sr,
                reduce_sibilance_enabled=False,  # sibilance processing disabled to avoid quality loss
                reduce_breath_enabled=False,
                sibilance_reduction_db=2.0,
                breath_reduction_db=0.0
            )
            log.detail("Vocal cleanup applied")
        except Exception as e:
            log.warning(f"Vocal cleanup failed: {e}")

        # Apply vocoder artifact fixes (buzzy breath noise and tearing on long notes)
        # Note: only phase repair is kept; the rest is disabled to avoid volume loss
        try:
            from lib.vocoder_fix import apply_vocoder_artifact_fix

            # Resample F0 to the audio frame rate
            if len(f0) > 0:
                import librosa
                # F0 is 100 fps and must be aligned to the audio frame rate
                f0_resampled = librosa.resample(
                    f0.astype(np.float32),
                    orig_sr=100,  # F0 frame rate
                    target_sr=save_sr / (save_sr / 16000 * 160)  # audio frame rate
                )
            else:
                f0_resampled = None

            audio_out = apply_vocoder_artifact_fix(
                audio_out,
                sr=save_sr,
                f0=f0_resampled,
                chunk_boundaries=None,
                fix_phase=True,        # keep phase repair (fixes tearing on long notes)
                fix_breath=True,       # enable noise-floor cleanup (uses the tuned, targeted detection)
                fix_sustained=False    # disable long-note stabilization to avoid quality loss
            )
            log.detail("Vocoder artifact fixes applied (phase + noise-floor cleanup)")
        except Exception as e:
            log.warning(f"Vocoder artifact fix failed: {e}")

        # Peak limiting (does not change overall loudness; cover_pipeline controls volume later)
        audio_out = soft_clip(audio_out, threshold=0.9, ceiling=0.99)

        # Save
        save_audio(output_path, audio_out, sr=save_sr)

        return output_path

    def _crossfade_chunks(self, chunks: list, overlap_frames: int) -> np.ndarray:
        """

        使用 SOLA (Synchronized Overlap-Add) 拼接音频块



        SOLA 通过在重叠区域搜索最佳相位对齐点来避免分块边界的撕裂伪影。

        参考: w-okada/voice-changer Issue #163, DDSP-SVC 实现



        Args:

            chunks: 音频块列表

            overlap_frames: 重叠帧数(特征帧)



        Returns:

            np.ndarray: 拼接后的音频

        """
        if len(chunks) == 1:
            return chunks[0]

        # Overlap length in output samples:
        # 1 feature frame = HOP_LENGTH input samples @ 16 kHz,
        # so output samples = HOP_LENGTH * (output_sr / input_sr)
        HOP_LENGTH = 320
        INPUT_SR = 16000
        output_sr = getattr(self, 'output_sr', 40000)

        # Output samples per feature frame
        samples_per_frame = int(HOP_LENGTH * output_sr / INPUT_SR)
        overlap_samples = overlap_frames * samples_per_frame

        log.debug(f"SOLA Crossfade: overlap_frames={overlap_frames}, samples_per_frame={samples_per_frame}, overlap_samples={overlap_samples}")

        result = chunks[0]

        for i in range(1, len(chunks)):
            chunk = chunks[i]

            # Keep the overlap within the length of both chunks
            actual_overlap = min(overlap_samples, len(result), len(chunk))

            if actual_overlap > 0:
                # SOLA: search the overlap region for the best phase-alignment point.
                # Search range: at most one pitch period (roughly 100-200 samples @ 48 kHz)
                search_range = min(int(output_sr * 0.005), actual_overlap // 4)  # 5 ms or 1/4 of the overlap

                # The tail of the previous chunk is the reference
                reference = result[-actual_overlap:]

                # Search the head of the new chunk for the best alignment
                best_offset = 0
                max_correlation = -1.0

                for offset in range(max(0, -search_range), min(search_range + 1, len(chunk) - actual_overlap + 1)):
                    # Candidate region
                    candidate_start = max(0, offset)
                    candidate_end = candidate_start + actual_overlap

                    if candidate_end > len(chunk):
                        continue

                    candidate = chunk[candidate_start:candidate_end]

                    # Normalized cross-correlation
                    ref_norm = np.linalg.norm(reference)
                    cand_norm = np.linalg.norm(candidate)

                    if ref_norm > 1e-6 and cand_norm > 1e-6:
                        correlation = np.dot(reference, candidate) / (ref_norm * cand_norm)

                        if correlation > max_correlation:
                            max_correlation = correlation
                            best_offset = offset

                log.debug(f"SOLA chunk {i}: best_offset={best_offset}, correlation={max_correlation:.4f}")

                # If correlation is too low (<0.3) the signals are discontinuous; use a simple crossfade to avoid artifacts
                if max_correlation < 0.3:
                    log.debug(f"SOLA chunk {i}: low correlation, using simple crossfade")
                    fade_out = np.linspace(1, 0, actual_overlap)
                    fade_in = np.linspace(0, 1, actual_overlap)
                    result_end = result[-actual_overlap:] * fade_out
                    chunk_start = chunk[:actual_overlap] * fade_in
                    result = np.concatenate([
                        result[:-actual_overlap],
                        result_end + chunk_start,
                        chunk[actual_overlap:]
                    ])
                    continue

                # Crossfade at the best alignment point
                aligned_start = max(0, best_offset)
                aligned_end = aligned_start + actual_overlap

                if aligned_end <= len(chunk):
                    # Fade curves (squared sine/cosine for a smoother, power-complementary transition)
                    fade_out = np.cos(np.linspace(0, np.pi / 2, actual_overlap)) ** 2
                    fade_in = np.sin(np.linspace(0, np.pi / 2, actual_overlap)) ** 2

                    # Apply the crossfade
                    result_end = result[-actual_overlap:] * fade_out
                    chunk_aligned = chunk[aligned_start:aligned_end] * fade_in

                    # Stitch
                    result = np.concatenate([
                        result[:-actual_overlap],
                        result_end + chunk_aligned,
                        chunk[aligned_end:]
                    ])
                else:
                    # Alignment failed; fall back to a simple crossfade
                    log.warning(f"SOLA alignment failed for chunk {i}, using simple crossfade")
                    fade_out = np.linspace(1, 0, actual_overlap)
                    fade_in = np.linspace(0, 1, actual_overlap)
                    result_end = result[-actual_overlap:] * fade_out
                    chunk_start = chunk[:actual_overlap] * fade_in
                    result = np.concatenate([
                        result[:-actual_overlap],
                        result_end + chunk_start,
                        chunk[actual_overlap:]
                    ])
            else:
                # No overlap: concatenate directly
                result = np.concatenate([result, chunk])

        return result


def list_voice_models(weights_dir: str = "assets/weights") -> list:
    """

    列出可用的语音模型



    Args:

        weights_dir: 模型目录



    Returns:

        list: 模型信息列表

    """
    models = []
    weights_path = Path(weights_dir)

    if not weights_path.exists():
        return models

    # Search all subdirectories recursively
    for pth_file in weights_path.glob("**/*.pth"):
        # Look for the matching index file (same directory)
        index_file = pth_file.with_suffix(".index")
        if not index_file.exists():
            # Try an alternative naming scheme
            index_file = pth_file.parent / f"{pth_file.stem}_v2.index"
        if not index_file.exists():
            # Try a case-insensitive match
            for f in pth_file.parent.glob("*.index"):
                if f.stem.lower() == pth_file.stem.lower():
                    index_file = f
                    break

        models.append({
            "name": pth_file.stem,
            "model_path": str(pth_file),
            "index_path": str(index_file) if index_file.exists() else None
        })

    return models
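

# A minimal usage sketch of the pipeline above (added for illustration; the
# model paths are placeholders, not files shipped with this module). Note that
# convert() unloads HuBERT and the F0 extractor internally, so reload them
# before converting a second file.
if __name__ == "__main__":
    pipeline = VoiceConversionPipeline(device="cuda")
    pipeline.load_hubert("assets/hubert/hubert_base.pt")        # placeholder path
    pipeline.load_voice_model("assets/weights/demo/demo.pth")   # placeholder path
    pipeline.load_index("assets/weights/demo/demo.index")       # placeholder path
    pipeline.load_f0_extractor("rmvpe", rmvpe_path="assets/rmvpe/rmvpe.pt")
    pipeline.convert(
        "vocals.wav",       # input vocal track
        "converted.wav",
        pitch_shift=0,      # semitones
        index_ratio=0.2,
    )
    pipeline.unload_all()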