| |
|
| | """
|
| | RVC 推理管道 - 端到端 AI 翻唱
|
| | """
|
| | import os
|
| | import gc
|
| | import torch
|
| | import numpy as np
|
| | import faiss
|
| | from pathlib import Path
|
| | from typing import Optional, Tuple, Union
|
| | from scipy import signal as sp_signal
|
| |
|
| | from lib.audio import load_audio, save_audio, normalize_audio, soft_clip
|
| | from lib.device import get_device, empty_device_cache, supports_fp16
|
| | from lib.logger import log
|
| | from infer.f0_extractor import get_f0_extractor, shift_f0, F0Method
|
| |
|
| |
|
# 5th-order Butterworth high-pass at 48 Hz for 16 kHz audio: strips DC offset
# and sub-bass rumble before F0/feature extraction (applied in convert()).
_bh, _ah = sp_signal.butter(N=5, Wn=48, btype="high", fs=16000)
|
| |
|
| |
|
class VoiceConversionPipeline:
    """RVC inference pipeline: end-to-end AI voice conversion (cover singing)."""
|
| |
|
| | def __init__(self, device: str = "cuda"):
|
| | """
|
| | 初始化管道
|
| |
|
| | Args:
|
| | device: 计算设备 ("cuda" 或 "cpu")
|
| | """
|
| | self.device = get_device(device)
|
| | self.hubert_model = None
|
| | self.hubert_model_type = None
|
| | self.hubert_layer = 12
|
| | self.voice_model = None
|
| | self.index = None
|
| | self.f0_extractor = None
|
| | self.spk_count = 1
|
| | self.model_version = "v2"
|
| |
|
| |
|
| | self.sample_rate = 16000
|
| | self.output_sr = 48000
|
| |
|
| | def unload_hubert(self):
|
| | """卸载 HuBERT 模型释放显存"""
|
| | if self.hubert_model is not None:
|
| | self.hubert_model.cpu()
|
| | del self.hubert_model
|
| | self.hubert_model = None
|
| | self.hubert_model_type = None
|
| | gc.collect()
|
| | empty_device_cache(self.device)
|
| |
|
| | def unload_f0_extractor(self):
|
| | """卸载 F0 提取器释放显存"""
|
| | if self.f0_extractor is not None:
|
| |
|
| | if hasattr(self.f0_extractor, 'model') and self.f0_extractor.model is not None:
|
| | rmvpe = self.f0_extractor.model
|
| |
|
| | if hasattr(rmvpe, 'model') and rmvpe.model is not None:
|
| | rmvpe.model.cpu()
|
| | del rmvpe.model
|
| | rmvpe.model = None
|
| |
|
| | if hasattr(rmvpe, 'mel_extractor') and rmvpe.mel_extractor is not None:
|
| | rmvpe.mel_extractor.cpu()
|
| | del rmvpe.mel_extractor
|
| | rmvpe.mel_extractor = None
|
| | del self.f0_extractor.model
|
| | self.f0_extractor.model = None
|
| | del self.f0_extractor
|
| | self.f0_extractor = None
|
| | gc.collect()
|
| | empty_device_cache(self.device)
|
| |
|
| | def unload_voice_model(self):
|
| | """卸载语音模型释放显存"""
|
| | if self.voice_model is not None:
|
| | self.voice_model.cpu()
|
| | del self.voice_model
|
| | self.voice_model = None
|
| | gc.collect()
|
| | empty_device_cache(self.device)
|
| |
|
| | def unload_all(self):
|
| | """卸载所有模型"""
|
| | self.unload_hubert()
|
| | self.unload_f0_extractor()
|
| | self.unload_voice_model()
|
| | self.index = None
|
| |
|
| | def load_hubert(self, model_path: str):
|
| | """
|
| | 加载 HuBERT 模型
|
| |
|
| | Args:
|
| | model_path: HuBERT 模型路径(可以是本地 .pt 文件或 Hugging Face 模型名)
|
| | """
|
| |
|
| | if os.path.isfile(model_path):
|
| | try:
|
| | from fairseq import checkpoint_utils
|
| |
|
| | models, _, _ = checkpoint_utils.load_model_ensemble_and_task(
|
| | [model_path],
|
| | suffix=""
|
| | )
|
| | model = models[0]
|
| | model = model.to(self.device).eval()
|
| | self.hubert_model = model
|
| | self.hubert_model_type = "fairseq"
|
| | log.info(f"HuBERT 模型已加载: {model_path} ({self.device})")
|
| | return
|
| | except Exception as e:
|
| | log.warning(f"fairseq 加载失败,尝试 torchaudio: {e}")
|
| |
|
| | try:
|
| | import torchaudio
|
| |
|
| | bundle = torchaudio.pipelines.HUBERT_BASE
|
| | model = bundle.get_model()
|
| | model = model.to(self.device).eval()
|
| | self.hubert_model = model
|
| | self.hubert_model_type = "torchaudio"
|
| | log.info(
|
| | f"HuBERT 模型已加载: torchaudio HUBERT_BASE ({self.device})"
|
| | )
|
| | return
|
| | except Exception as e:
|
| | log.warning(f"torchaudio 加载失败,尝试 transformers: {e}")
|
| |
|
| | from transformers import HubertModel
|
| |
|
| | if os.path.isfile(model_path):
|
| | log.info("检测到本地模型文件,将使用 Hugging Face 预训练模型替代")
|
| | model_name = "facebook/hubert-base-ls960"
|
| | else:
|
| | model_name = model_path
|
| |
|
| | try:
|
| | self.hubert_model = HubertModel.from_pretrained(model_name)
|
| | except Exception as e:
|
| | log.warning(f"从网络加载失败,尝试使用本地缓存: {e}")
|
| | self.hubert_model = HubertModel.from_pretrained(
|
| | model_name,
|
| | local_files_only=True
|
| | )
|
| | self.hubert_model = self.hubert_model.to(self.device).eval()
|
| | self.hubert_model_type = "transformers"
|
| | log.info(f"HuBERT 模型已加载: {model_name} ({self.device})")
|
| |
|
| | def load_voice_model(self, model_path: str) -> dict:
|
| | """
|
| | 加载语音模型
|
| |
|
| | Args:
|
| | model_path: 模型文件路径 (.pth)
|
| |
|
| | Returns:
|
| | dict: 模型信息
|
| | """
|
| | log.debug(f"正在加载语音模型: {model_path}")
|
| | cpt = torch.load(model_path, map_location="cpu", weights_only=False)
|
| |
|
| | log.debug(f"模型文件 keys: {cpt.keys()}")
|
| |
|
| |
|
| | config = cpt.get("config", [])
|
| | self.output_sr = cpt.get("sr", 48000)
|
| |
|
| | log.debug(f"config 类型: {type(config)}, 内容: {config}")
|
| | log.debug(f"采样率: {self.output_sr}")
|
| |
|
| |
|
| | if isinstance(config, list) and len(config) >= 18:
|
| | model_config = {
|
| | "spec_channels": config[0],
|
| | "segment_size": config[1],
|
| | "inter_channels": config[2],
|
| | "hidden_channels": config[3],
|
| | "filter_channels": config[4],
|
| | "n_heads": config[5],
|
| | "n_layers": config[6],
|
| | "kernel_size": config[7],
|
| | "p_dropout": config[8],
|
| | "resblock": config[9],
|
| | "resblock_kernel_sizes": config[10],
|
| | "resblock_dilation_sizes": config[11],
|
| | "upsample_rates": config[12],
|
| | "upsample_initial_channel": config[13],
|
| | "upsample_kernel_sizes": config[14],
|
| | "spk_embed_dim": config[15],
|
| | "gin_channels": config[16],
|
| | }
|
| |
|
| | if len(config) > 17:
|
| | self.output_sr = config[17]
|
| | elif isinstance(config, dict):
|
| |
|
| | model_config = config
|
| | else:
|
| |
|
| | log.warning("无法解析 config,使用默认值")
|
| | model_config = {}
|
| |
|
| | log.debug(f"解析后的配置: {model_config}")
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | gin_channels = model_config.get("gin_channels", 256)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | model_version = None
|
| |
|
| | if "version" in cpt:
|
| | model_version = cpt["version"]
|
| | log.debug(f"从version字段检测到: {model_version}")
|
| | elif "weight" in cpt and "enc_p.emb_phone.weight" in cpt["weight"]:
|
| |
|
| |
|
| |
|
| | emb_shape = cpt["weight"]["enc_p.emb_phone.weight"].shape
|
| | log.debug(f"enc_p.emb_phone.weight 形状: {emb_shape}")
|
| | if emb_shape[1] == 256:
|
| | model_version = "v1"
|
| | log.debug("从权重形状检测到: v1 (256维)")
|
| | elif emb_shape[1] == 768:
|
| | model_version = "v2"
|
| | log.debug("从权重形状检测到: v2 (768维)")
|
| |
|
| |
|
| | if model_version == "v1":
|
| |
|
| | from infer.lib.infer_pack.models import SynthesizerTrnMs256NSFsid
|
| | synthesizer_class = SynthesizerTrnMs256NSFsid
|
| | self.model_version = "v1"
|
| | log.debug(f"使用v1合成器 (256维)")
|
| | else:
|
| |
|
| | from infer.lib.infer_pack.models import SynthesizerTrnMs768NSFsid
|
| | synthesizer_class = SynthesizerTrnMs768NSFsid
|
| | self.model_version = "v2"
|
| | log.debug(f"使用v2合成器 (768维)")
|
| |
|
| |
|
| | self.voice_model = synthesizer_class(
|
| | spec_channels=model_config.get("spec_channels", 1025),
|
| | segment_size=model_config.get("segment_size", 32),
|
| | inter_channels=model_config.get("inter_channels", 192),
|
| | hidden_channels=model_config.get("hidden_channels", 192),
|
| | filter_channels=model_config.get("filter_channels", 768),
|
| | n_heads=model_config.get("n_heads", 2),
|
| | n_layers=model_config.get("n_layers", 6),
|
| | kernel_size=model_config.get("kernel_size", 3),
|
| | p_dropout=model_config.get("p_dropout", 0),
|
| | resblock=model_config.get("resblock", "1"),
|
| | resblock_kernel_sizes=model_config.get("resblock_kernel_sizes", [3, 7, 11]),
|
| | resblock_dilation_sizes=model_config.get("resblock_dilation_sizes", [[1, 3, 5], [1, 3, 5], [1, 3, 5]]),
|
| | upsample_rates=model_config.get("upsample_rates", [10, 10, 2, 2]),
|
| | upsample_initial_channel=model_config.get("upsample_initial_channel", 512),
|
| | upsample_kernel_sizes=model_config.get("upsample_kernel_sizes", [16, 16, 4, 4]),
|
| | spk_embed_dim=model_config.get("spk_embed_dim", 109),
|
| | gin_channels=model_config.get("gin_channels", 256),
|
| | sr=self.output_sr,
|
| | is_half=supports_fp16(self.device)
|
| | )
|
| | self.spk_count = int(model_config.get("spk_embed_dim", 1) or 1)
|
| |
|
| |
|
| | self.voice_model.load_state_dict(cpt["weight"], strict=False)
|
| | self.voice_model = self.voice_model.to(self.device).eval()
|
| |
|
| | model_info = {
|
| | "name": Path(model_path).stem,
|
| | "sample_rate": self.output_sr,
|
| | "version": cpt.get("version", "v2")
|
| | }
|
| |
|
| | log.info(f"语音模型已加载: {model_info['name']} ({self.output_sr}Hz)")
|
| | return model_info
|
| |
|
    def load_index(self, index_path: str):
        """
        Load a FAISS feature index.

        Args:
            index_path: path to the .index file
        """
        self.index = faiss.read_index(index_path)
        # make_direct_map() enables reconstruct_n() on IVF-style indexes
        # (used by search_index); index types that don't support it are fine
        # without it, so failures are deliberately ignored.
        try:
            self.index.make_direct_map()
        except Exception:
            pass
        log.info(f"索引已加载: {index_path}")
|
| |
|
    def load_f0_extractor(self, method: F0Method = "rmvpe",
                          rmvpe_path: Optional[str] = None):
        """
        Load the F0 (pitch) extractor.

        Args:
            method: F0 extraction method (e.g. "rmvpe")
            rmvpe_path: path to RMVPE model weights, if the method needs one
        """
        self.f0_extractor = get_f0_extractor(
            method,
            device=str(self.device),
            rmvpe_path=rmvpe_path
        )
        log.info(f"F0 提取器已加载: {method}")
|
| |
|
| | @torch.no_grad()
|
| | def extract_features(self, audio: np.ndarray, use_final_proj: bool = False) -> torch.Tensor:
|
| | """
|
| | 使用 HuBERT 提取特征
|
| |
|
| | Args:
|
| | audio: 16kHz 音频数据
|
| | use_final_proj: 是否使用 final_proj 将 768 维降到 256 维(v1 模型需要)
|
| |
|
| | Returns:
|
| | torch.Tensor: HuBERT 特征
|
| | """
|
| | if self.hubert_model is None:
|
| | raise RuntimeError("请先加载 HuBERT 模型")
|
| |
|
| |
|
| | audio_tensor = torch.from_numpy(audio).float().to(self.device)
|
| | if audio_tensor.dim() == 1:
|
| | audio_tensor = audio_tensor.unsqueeze(0)
|
| |
|
| | if self.hubert_model_type == "fairseq":
|
| |
|
| | output_layer = 9 if use_final_proj else 12
|
| | feats = self.hubert_model.extract_features(
|
| | audio_tensor,
|
| | padding_mask=None,
|
| | output_layer=output_layer
|
| | )[0]
|
| |
|
| |
|
| | if use_final_proj and hasattr(self.hubert_model, 'final_proj'):
|
| | feats = self.hubert_model.final_proj(feats)
|
| | return feats
|
| |
|
| | if self.hubert_model_type == "torchaudio":
|
| | feats_list, _ = self.hubert_model.extract_features(audio_tensor)
|
| | layer_idx = min(self.hubert_layer - 1, len(feats_list) - 1)
|
| | return feats_list[layer_idx]
|
| |
|
| |
|
| | outputs = self.hubert_model(audio_tensor, output_hidden_states=True)
|
| | layer_idx = min(self.hubert_layer, len(outputs.hidden_states) - 1)
|
| | return outputs.hidden_states[layer_idx]
|
| |
|
| | def search_index(self, features: np.ndarray, k: int = 8) -> np.ndarray:
|
| | """
|
| | 在索引中搜索相似特征
|
| |
|
| | Args:
|
| | features: 输入特征
|
| | k: 返回的近邻数量
|
| |
|
| | Returns:
|
| | np.ndarray: 检索到的特征
|
| | """
|
| | if self.index is None:
|
| | return features
|
| |
|
| |
|
| | if features.shape[-1] != self.index.d:
|
| | log.warning(f"特征维度 ({features.shape[-1]}) 与索引维度 ({self.index.d}) 不匹配,跳过索引搜索")
|
| | return features
|
| |
|
| |
|
| | scores, indices = self.index.search(features, k)
|
| |
|
| |
|
| | try:
|
| | big_npy = self.index.reconstruct_n(0, self.index.ntotal)
|
| | except RuntimeError as e:
|
| | if "direct map" in str(e):
|
| | log.warning("索引不支持向量重建,跳过索引混合")
|
| | return features
|
| | raise
|
| |
|
| |
|
| | weight = np.square(1.0 / (scores + 1e-6))
|
| | weight /= weight.sum(axis=1, keepdims=True)
|
| | retrieved = np.sum(
|
| | big_npy[indices] * np.expand_dims(weight, axis=2), axis=1
|
| | )
|
| | return retrieved
|
| | @staticmethod
|
| | def _f0_to_coarse(
|
| | f0: np.ndarray,
|
| | f0_min: float = 50.0,
|
| | f0_max: float = 1100.0
|
| | ) -> np.ndarray:
|
| | """Convert F0 (Hz) to official RVC coarse bins (1-255)."""
|
| | f0 = np.asarray(f0, dtype=np.float32)
|
| | f0_max = max(float(f0_max), float(f0_min) + 1.0)
|
| | f0_mel_min = 1127 * np.log(1 + float(f0_min) / 700.0)
|
| | f0_mel_max = 1127 * np.log(1 + f0_max / 700.0)
|
| | f0_mel = 1127 * np.log1p(np.maximum(f0, 0.0) / 700.0)
|
| | voiced = f0_mel > 0
|
| | f0_mel[voiced] = (f0_mel[voiced] - f0_mel_min) * 254 / (f0_mel_max - f0_mel_min) + 1
|
| | f0_mel[f0_mel <= 1] = 1
|
| | f0_mel[f0_mel > 255] = 255
|
| | return np.rint(f0_mel).astype(np.int64)
|
| | def _apply_rms_mix(
|
| | self,
|
| | audio_out: np.ndarray,
|
| | audio_in: np.ndarray,
|
| | sr_out: int,
|
| | sr_in: int,
|
| | hop_length: int,
|
| | rms_mix_rate: float
|
| | ) -> np.ndarray:
|
| | """Match output RMS envelope to input RMS (0=off, 1=full match)."""
|
| | if rms_mix_rate <= 0:
|
| | return audio_out
|
| |
|
| | import librosa
|
| |
|
| | frame_length_in = 1024
|
| | rms_in = librosa.feature.rms(
|
| | y=audio_in,
|
| | frame_length=frame_length_in,
|
| | hop_length=hop_length,
|
| | center=True
|
| | )[0]
|
| |
|
| | hop_out = int(round(hop_length * sr_out / sr_in))
|
| | frame_length_out = int(round(frame_length_in * sr_out / sr_in))
|
| | rms_out = librosa.feature.rms(
|
| | y=audio_out,
|
| | frame_length=frame_length_out,
|
| | hop_length=hop_out,
|
| | center=True
|
| | )[0]
|
| |
|
| | min_len = min(len(rms_in), len(rms_out))
|
| | if min_len == 0:
|
| | return audio_out
|
| |
|
| | rms_in = rms_in[:min_len]
|
| | rms_out = rms_out[:min_len]
|
| |
|
| | gain = rms_in / (rms_out + 1e-6)
|
| | gain = np.clip(gain, 0.2, 4.0)
|
| | gain = gain ** rms_mix_rate
|
| |
|
| | gain_samples = np.repeat(gain, hop_out)
|
| | if len(gain_samples) < len(audio_out):
|
| | gain_samples = np.pad(
|
| | gain_samples,
|
| | (0, len(audio_out) - len(gain_samples)),
|
| | mode="edge"
|
| | )
|
| | else:
|
| | gain_samples = gain_samples[:len(audio_out)]
|
| |
|
| | return audio_out * gain_samples
|
| |
|
    def _apply_silence_gate(
        self,
        audio_out: np.ndarray,
        audio_in: np.ndarray,
        f0: np.ndarray,
        sr_out: int,
        sr_in: int,
        hop_length: int,
        threshold_db: float,
        smoothing_ms: float,
        min_silence_ms: float,
        protect: float
    ) -> np.ndarray:
        """Silence gate based on input RMS and F0.

        Frames that are both quiet (below ``threshold_db`` relative to the
        input's 95th-percentile level) and unvoiced (F0 <= 0) are attenuated
        in the converted output, suppressing vocoder noise in silent spans.

        Args:
            audio_out: converted audio at ``sr_out``.
            audio_in: original input audio at ``sr_in``.
            f0: per-frame F0 of the input (one value per ``hop_length``).
            sr_out: output sample rate.
            sr_in: input sample rate.
            hop_length: analysis hop in input samples.
            threshold_db: gate threshold in dB relative to the reference level.
            smoothing_ms: moving-average smoothing of the gate mask.
            min_silence_ms: only gate silent runs at least this long.
            protect: mask floor (0 = allow full gating, 1 = no gating).

        Returns:
            np.ndarray: gated copy of ``audio_out``.
        """
        import librosa

        frame_length = 1024
        rms = librosa.feature.rms(
            y=audio_in,
            frame_length=frame_length,
            hop_length=hop_length,
            center=True
        )[0]

        if len(rms) == 0 or len(f0) == 0:
            return audio_out

        # Align the RMS track to the F0 track length.
        if len(rms) < len(f0):
            rms = np.pad(rms, (0, len(f0) - len(rms)), mode="edge")
        else:
            rms = rms[:len(f0)]

        rms_db = 20 * np.log10(rms + 1e-6)
        # Reference level: near-peak loudness of the input.
        ref_db = np.percentile(rms_db, 95)
        gate_db = ref_db + threshold_db

        # A frame counts as silent only when it is both quiet and unvoiced.
        silent = (rms_db < gate_db) & (f0 <= 0)

        # Keep only silent runs longer than min_silence_ms, so short gaps
        # between words are left untouched.
        if min_silence_ms > 0:
            min_frames = int(
                round((min_silence_ms / 1000) * (sr_in / hop_length))
            )
            if min_frames > 1:
                silent_int = silent.astype(int)
                changes = np.diff(
                    np.concatenate(([0], silent_int, [0]))
                )
                starts = np.where(changes == 1)[0]
                ends = np.where(changes == -1)[0]
                keep_silent = np.zeros_like(silent, dtype=bool)
                for s, e in zip(starts, ends):
                    if e - s >= min_frames:
                        keep_silent[s:e] = True
                silent = keep_silent

        # Gate mask: 1 = keep, 0 = mute.
        mask = 1.0 - silent.astype(float)

        # Smooth mask edges to avoid clicks at gate transitions.
        if smoothing_ms > 0:
            smooth_frames = int(
                round((smoothing_ms / 1000) * (sr_in / hop_length))
            )
            if smooth_frames > 1:
                kernel = np.ones(smooth_frames) / smooth_frames
                mask = np.convolve(
                    mask,
                    kernel,
                    mode="same"
                )
        mask = np.clip(mask, 0.0, 1.0)
        # Never attenuate below the protect floor.
        protect = float(np.clip(protect, 0.0, 1.0))
        if protect > 0:
            mask = mask * (1.0 - protect) + protect

        # Expand the frame mask to output-sample resolution.
        samples_per_frame = int(round(sr_out * hop_length / sr_in))
        mask_samples = np.repeat(mask, samples_per_frame)

        if len(mask_samples) < len(audio_out):
            mask_samples = np.pad(
                mask_samples,
                (0, len(audio_out) - len(mask_samples)),
                mode="edge"
            )
        else:
            mask_samples = mask_samples[:len(audio_out)]

        return audio_out * mask_samples
|
| |
|
    def _process_chunk(
        self,
        features: np.ndarray,
        f0: np.ndarray,
        use_fp16: bool = False,
        speaker_id: int = 0,
    ) -> np.ndarray:
        """
        Synthesize one chunk of audio from features and F0.

        Args:
            features: HuBERT features [T, C]
            f0: F0 array (expected at ~2x the feature frame rate)
            use_fp16: run inference under autocast (only when the device
                supports fp16)
            speaker_id: speaker index, clamped into [0, spk_count - 1]

        Returns:
            np.ndarray: synthesized audio at the model's output rate
        """
        import torch.nn.functional as F

        log.debug(f"[_process_chunk] 输入特征: shape={features.shape}, dtype={features.dtype}")
        log.debug(f"[_process_chunk] 输入特征统计: max={np.max(np.abs(features)):.4f}, mean={np.mean(np.abs(features)):.4f}, std={np.std(features):.4f}")
        log.debug(f"[_process_chunk] 输入 F0: len={len(f0)}, max={np.max(f0):.1f}, min={np.min(f0):.1f}, non-zero={np.sum(f0 > 0)}")

        features_tensor = torch.from_numpy(features).float().to(self.device).unsqueeze(0)

        # 2x nearest-neighbour upsampling along time: matches the feature
        # frame rate to the F0 frame rate the synthesizer expects.
        features_tensor = F.interpolate(features_tensor.transpose(1, 2), scale_factor=2, mode='nearest').transpose(1, 2)
        log.debug(f"[_process_chunk] 2x上采样后特征: shape={features_tensor.shape}")

        # Trim or edge-pad F0 to exactly match the upsampled feature length.
        target_len = features_tensor.shape[1]
        original_f0_len = len(f0)
        if len(f0) > target_len:
            f0 = f0[:target_len]
        elif len(f0) < target_len:
            f0 = np.pad(f0, (0, target_len - len(f0)), mode='edge')
        log.debug(f"[_process_chunk] F0 对齐: {original_f0_len} -> {len(f0)} (目标: {target_len})")

        # .copy(): the slice/pad above may produce a non-contiguous view.
        f0_tensor = torch.from_numpy(f0.copy()).float().to(self.device).unsqueeze(0)

        # Coarse pitch bins (1-255) drive the synthesizer's pitch embedding;
        # the continuous F0 drives the NSF source.
        f0_coarse = torch.from_numpy(self._f0_to_coarse(f0)).to(self.device).unsqueeze(0)
        log.debug(f"[_process_chunk] F0 张量: shape={f0_tensor.shape}, max={f0_tensor.max().item():.1f}, min={f0_tensor.min().item():.1f}")
        log.debug(f"[_process_chunk] F0 coarse (pitch索引): shape={f0_coarse.shape}, max={f0_coarse.max().item()}, min={f0_coarse.min().item()}")

        # Clamp the speaker id into the model's embedding range.
        safe_speaker_id = int(max(0, min(max(1, int(self.spk_count)) - 1, int(speaker_id))))
        sid = torch.tensor([safe_speaker_id], device=self.device)
        log.debug(f"[_process_chunk] 说话人 ID: {sid.item()}")

        log.debug(f"[_process_chunk] 开始推理, use_fp16={use_fp16}, device={self.device.type}")
        if use_fp16 and supports_fp16(self.device):
            with torch.amp.autocast(str(self.device.type)):
                audio_out, x_mask, _ = self.voice_model.infer(
                    features_tensor,
                    torch.tensor([features_tensor.shape[1]], device=self.device),
                    f0_coarse,
                    f0_tensor,
                    sid
                )
        else:
            audio_out, x_mask, _ = self.voice_model.infer(
                features_tensor,
                torch.tensor([features_tensor.shape[1]], device=self.device),
                f0_coarse,
                f0_tensor,
                sid
            )

        log.debug(f"[_process_chunk] 推理完成, audio_out: shape={audio_out.shape}, dtype={audio_out.dtype}")
        log.debug(f"[_process_chunk] x_mask: shape={x_mask.shape}, sum={x_mask.sum().item()}")

        # Free inputs promptly — chunks are processed in a loop and device
        # memory is the bottleneck.
        del features_tensor, f0_tensor, f0_coarse
        empty_device_cache(self.device)

        audio_out = audio_out.squeeze().cpu().detach().float().numpy()
        log.debug(f"Chunk audio: len={len(audio_out)}, max={np.max(np.abs(audio_out)):.4f}, min={np.min(audio_out):.4f}")

        return audio_out
|
| |
|
    def convert(
        self,
        audio_path: str,
        output_path: str,
        pitch_shift: float = 0,
        index_ratio: float = 0.2,
        filter_radius: int = 3,
        resample_sr: int = 0,
        rms_mix_rate: float = 0.25,
        protect: float = 0.33,
        speaker_id: int = 0,
        silence_gate: bool = True,
        silence_threshold_db: float = -45.0,
        silence_smoothing_ms: float = 50.0,
        silence_min_duration_ms: float = 200.0
    ) -> str:
        """
        Run RVC inference end-to-end on an audio file.

        Args:
            audio_path: input audio path
            output_path: output audio path
            pitch_shift: pitch shift in semitones
            index_ratio: index blend ratio (0-1)
            filter_radius: median-filter radius for F0 smoothing
            resample_sr: output resample rate (0 = keep the model rate)
            rms_mix_rate: RMS envelope mix ratio
            protect: protect voiceless consonants (lower = more retrieval)
            speaker_id: speaker ID (for multi-speaker models)
            silence_gate: enable the silence gate (on by default to remove
                noise-floor artifacts in silent passages)
            silence_threshold_db: silence threshold (dB, relative to peak)
            silence_smoothing_ms: gate smoothing duration (ms)
            silence_min_duration_ms: minimum gated silence duration (ms)

        Returns:
            str: output file path

        Raises:
            RuntimeError: when the voice model, HuBERT, or F0 extractor has
                not been loaded.
        """
        if self.voice_model is None:
            raise RuntimeError("请先加载语音模型")
        if self.hubert_model is None:
            raise RuntimeError("请先加载 HuBERT 模型")
        if self.f0_extractor is None:
            raise RuntimeError("请先加载 F0 提取器")

        # Load at 16 kHz (the HuBERT/F0 analysis rate) and normalize.
        audio = load_audio(audio_path, sr=self.sample_rate)
        audio = normalize_audio(audio)
        rms_mix_rate = float(np.clip(rms_mix_rate, 0.0, 1.0))
        # Clamp the speaker id into [0, spk_count - 1].
        speaker_id = int(max(0, min(max(1, int(self.spk_count)) - 1, int(speaker_id))))

        # High-pass at 48 Hz (module-level Butterworth) to strip DC/rumble.
        audio = sp_signal.filtfilt(_bh, _ah, audio).astype(np.float32)

        f0 = self.f0_extractor.extract(audio)

        if pitch_shift != 0:
            f0 = shift_f0(f0, pitch_shift)

        # Adaptive median filtering: smooth only frames with abrupt pitch
        # jumps (> 2 semitones frame-to-frame); use a lighter filter above
        # 500 Hz so fast, legitimate high-pitch movement is preserved.
        if filter_radius > 0:
            from scipy.ndimage import median_filter

            f0_semitone_diff = np.abs(12 * np.log2((f0 + 1e-6) / (np.roll(f0, 1) + 1e-6)))
            f0_semitone_diff[0] = 0

            need_filter = f0_semitone_diff > 2.0

            # Dilate the "needs filtering" regions by one frame each side.
            kernel = np.ones(3, dtype=bool)
            need_filter = np.convolve(need_filter, kernel, mode='same')

            f0_filtered = median_filter(f0, size=filter_radius)

            high_pitch_mask = f0 > 500

            if np.any(high_pitch_mask):
                f0_filtered_high = median_filter(f0, size=max(1, filter_radius // 2))
                f0_filtered = np.where(high_pitch_mask, f0_filtered_high, f0_filtered)

            f0 = np.where(need_filter, f0_filtered, f0)

        # Free the F0 extractor before the memory-heavy HuBERT pass.
        self.unload_f0_extractor()

        # v1 models consume 256-dim features via final_proj; v2 use 768-dim.
        use_final_proj = (self.model_version == "v1")
        features = self.extract_features(audio, use_final_proj=use_final_proj)
        features = features.squeeze(0).cpu().numpy()

        self.unload_hubert()

        # Blend features toward their FAISS-index neighbours.
        if self.index is not None and index_ratio > 0:
            features_before_index = features.copy()
            retrieved = self.search_index(features)

            # Per-frame blend ratio; boosted for very high-pitched frames
            # (> 450 Hz) where conversion artifacts are more audible.
            adaptive_index_ratio = np.ones(len(features)) * index_ratio

            f0_per_feat = 2  # F0 runs at ~2x the feature frame rate
            for fi in range(len(features)):
                f0_start = fi * f0_per_feat
                f0_end = min(f0_start + f0_per_feat, len(f0))
                if f0_end > f0_start:
                    f0_segment = f0[f0_start:f0_end]
                    avg_f0 = np.mean(f0_segment[f0_segment > 0]) if np.any(f0_segment > 0) else 0

                    if avg_f0 > 450:
                        adaptive_index_ratio[fi] = min(0.75, index_ratio * 1.3)

            adaptive_index_ratio = adaptive_index_ratio[:, np.newaxis]
            features = features * (1 - adaptive_index_ratio) + retrieved * adaptive_index_ratio

            # Consonant protection: pull unvoiced / unstable / low-energy
            # frames back toward the pre-retrieval features.
            # NOTE(review): this reads features_before_index, so it only
            # runs when index retrieval ran.
            if protect < 0.5:
                f0_per_feat = 2
                n_feat = features.shape[0]
                protect_mask = np.ones(n_feat, dtype=np.float32)

                for fi in range(n_feat):
                    f0_start = fi * f0_per_feat
                    f0_end = min(f0_start + f0_per_feat, len(f0))
                    if f0_end > f0_start:
                        f0_segment = f0[f0_start:f0_end]

                        if np.all(f0_segment <= 0):
                            # Fully unvoiced frame — likely a consonant.
                            protect_mask[fi] = min(0.8, protect * 1.5)
                        elif len(f0_segment) > 1 and np.std(f0_segment) > 50:
                            # Unstable pitch — a voicing transition region.
                            protect_mask[fi] = protect + (1.0 - protect) * 0.3

                        # Low-energy frames also get extra protection.
                        feat_energy = np.linalg.norm(features_before_index[fi])
                        if feat_energy < 0.5:
                            protect_mask[fi] = min(0.8, protect * 1.3)

                # Smooth the mask twice to avoid abrupt blend changes.
                smooth_kernel = np.array([1, 2, 3, 2, 1], dtype=np.float32)
                smooth_kernel /= np.sum(smooth_kernel)
                protect_mask = np.convolve(protect_mask, smooth_kernel, mode="same")
                protect_mask = np.convolve(protect_mask, smooth_kernel, mode="same")
                protect_mask = np.clip(protect_mask, protect, 1.0)
                protect_mask = protect_mask[:, np.newaxis]
                features = features * protect_mask + features_before_index * (1 - protect_mask)

        # Feature-level energy gate: attenuate features and F0 over long,
        # very quiet, unvoiced stretches so the vocoder does not hallucinate
        # noise there.
        import librosa as _librosa_local
        _hop_feat = 320
        _n_feat = features.shape[0]
        _frame_rms = _librosa_local.feature.rms(
            y=audio, frame_length=_hop_feat * 2, hop_length=_hop_feat, center=True
        )[0]
        if _frame_rms.ndim > 1:
            _frame_rms = _frame_rms[0]
        if len(_frame_rms) > _n_feat:
            _frame_rms = _frame_rms[:_n_feat]
        elif len(_frame_rms) < _n_feat:
            _frame_rms = np.pad(_frame_rms, (0, _n_feat - len(_frame_rms)), mode='edge')
        _energy_db = 20.0 * np.log10(_frame_rms + 1e-8)
        _ref_db = float(np.percentile(_energy_db, 95)) if _frame_rms.size > 0 else -20.0

        # "Very quiet" = 65 dB below the near-peak reference level.
        _silence_threshold = _ref_db - 65.0
        _is_very_quiet = (_energy_db < _silence_threshold).astype(np.float32)

        # Downsample F0 to the feature frame rate (every 2nd value).
        _f0_50fps = f0[::2] if len(f0) >= _n_feat * 2 else np.pad(f0[::2], (0, _n_feat - len(f0[::2])), mode='edge')
        _f0_50fps = _f0_50fps[:_n_feat]
        _is_unvoiced = (_f0_50fps <= 0).astype(np.float32)

        _is_silence = _is_very_quiet * _is_unvoiced

        _sm = np.array([1, 2, 3, 2, 1], dtype=np.float32)
        _sm /= _sm.sum()
        _is_silence = np.convolve(_is_silence, _sm, mode='same')[:_n_feat]

        # Only gate runs of at least 10 feature frames.
        _min_silence_frames = 10
        _silence_binary = (_is_silence > 0.7).astype(int)
        _changes = np.diff(np.concatenate(([0], _silence_binary, [0])))
        _starts = np.where(_changes == 1)[0]
        _ends = np.where(_changes == -1)[0]
        _keep_silence = np.zeros_like(_silence_binary, dtype=bool)
        for _s, _e in zip(_starts, _ends):
            if _e - _s >= _min_silence_frames:
                _keep_silence[_s:_e] = True

        # Attenuate (not mute) gated frames to 0.3x.
        _energy_gate = np.where(_keep_silence, 0.3, 1.0).astype(np.float32)

        features = features * _energy_gate[:, np.newaxis]

        # Apply the same gate to F0, expanded back to the F0 frame rate.
        _f0_gate = np.repeat(_energy_gate, 2)
        if len(_f0_gate) > len(f0):
            _f0_gate = _f0_gate[:len(f0)]
        elif len(_f0_gate) < len(f0):
            _f0_gate = np.pad(_f0_gate, (0, len(f0) - len(_f0_gate)), mode='constant', constant_values=1.0)
        f0 = f0 * _f0_gate

        # Chunked synthesis for long inputs to bound peak device memory.
        CHUNK_SECONDS = 30
        OVERLAP_SECONDS = 2.0
        HOP_LENGTH = 320

        chunk_frames = int(CHUNK_SECONDS * self.sample_rate / HOP_LENGTH)
        overlap_frames = int(OVERLAP_SECONDS * self.sample_rate / HOP_LENGTH)

        total_frames = features.shape[0]

        if total_frames <= chunk_frames:
            audio_out = self._process_chunk(features, f0, speaker_id=speaker_id)
        else:
            log.info(f"音频较长 ({total_frames} 帧),启用分块处理...")
            audio_chunks = []
            chunk_idx = 0

            for start in range(0, total_frames, chunk_frames - overlap_frames):
                end = min(start + chunk_frames, total_frames)
                chunk_features = features[start:end]

                # F0 runs at twice the feature frame rate.
                f0_start = start * 2
                f0_end = min(end * 2, len(f0))
                chunk_f0 = f0[f0_start:f0_end]

                log.debug(f"处理块 {chunk_idx}: 帧 {start}-{end}")

                chunk_audio = self._process_chunk(chunk_features, chunk_f0, speaker_id=speaker_id)
                audio_chunks.append(chunk_audio)
                chunk_idx += 1

                gc.collect()
                empty_device_cache(self.device)

            # Stitch chunks with SOLA crossfading over the overlaps.
            audio_out = self._crossfade_chunks(audio_chunks, overlap_frames)
            log.info(f"分块处理完成,共 {chunk_idx} 块")

        # Defensive normalisation of the synthesizer output shape.
        if isinstance(audio_out, tuple):
            audio_out = audio_out[0]
        audio_out = np.asarray(audio_out).flatten()

        # Optional final resample.
        if resample_sr > 0 and resample_sr != self.output_sr:
            import librosa
            audio_out = librosa.resample(
                audio_out,
                orig_sr=self.output_sr,
                target_sr=resample_sr
            )
            save_sr = resample_sr
        else:
            save_sr = self.output_sr

        # Match the output loudness envelope to the input.
        if rms_mix_rate > 0:
            audio_out = self._apply_rms_mix(
                audio_out=audio_out,
                audio_in=audio,
                sr_out=save_sr,
                sr_in=self.sample_rate,
                hop_length=160,
                rms_mix_rate=rms_mix_rate
            )

        # Gate residual noise in silent passages of the output.
        if silence_gate:
            audio_out = self._apply_silence_gate(
                audio_out=audio_out,
                audio_in=audio,
                f0=f0,
                sr_out=save_sr,
                sr_in=self.sample_rate,
                hop_length=160,
                threshold_db=silence_threshold_db,
                smoothing_ms=silence_smoothing_ms,
                min_silence_ms=silence_min_duration_ms,
                protect=protect
            )

        # Best-effort vocal cleanup; failures are logged, never fatal.
        try:
            from lib.vocal_cleanup import apply_vocal_cleanup
            audio_out = apply_vocal_cleanup(
                audio_out,
                sr=save_sr,
                reduce_sibilance_enabled=False,
                reduce_breath_enabled=False,
                sibilance_reduction_db=2.0,
                breath_reduction_db=0.0
            )
            log.detail("已应用人声清理")
        except Exception as e:
            log.warning(f"人声清理失败: {e}")

        # Best-effort vocoder artifact fix (phase + noise-floor cleanup).
        try:
            from lib.vocoder_fix import apply_vocoder_artifact_fix

            if len(f0) > 0:
                import librosa

                # NOTE(review): target_sr simplifies to 16000/160 == 100,
                # identical to orig_sr for every save_sr, so this resample
                # is currently a no-op — confirm the intended frame rate.
                f0_resampled = librosa.resample(
                    f0.astype(np.float32),
                    orig_sr=100,
                    target_sr=save_sr / (save_sr / 16000 * 160)
                )
            else:
                f0_resampled = None

            audio_out = apply_vocoder_artifact_fix(
                audio_out,
                sr=save_sr,
                f0=f0_resampled,
                chunk_boundaries=None,
                fix_phase=True,
                fix_breath=True,
                fix_sustained=False
            )
            log.detail("已应用vocoder伪影修复(相位+底噪清理)")
        except Exception as e:
            log.warning(f"Vocoder伪影修复失败: {e}")

        # Soft-clip peaks instead of hard clipping.
        audio_out = soft_clip(audio_out, threshold=0.9, ceiling=0.99)

        save_audio(output_path, audio_out, sr=save_sr)

        return output_path
|
| |
|
    def _crossfade_chunks(self, chunks: list, overlap_frames: int) -> np.ndarray:
        """
        Stitch audio chunks with SOLA (Synchronized Overlap-Add).

        SOLA searches the overlap region for the best phase-alignment offset
        before crossfading, avoiding tearing artifacts at chunk boundaries.
        Reference: w-okada/voice-changer Issue #163, DDSP-SVC implementation.

        Args:
            chunks: audio chunk list (at the synthesizer output rate)
            overlap_frames: overlap length in feature frames

        Returns:
            np.ndarray: concatenated audio
        """
        if len(chunks) == 1:
            return chunks[0]

        # Convert the feature-frame overlap to output samples.
        HOP_LENGTH = 320
        INPUT_SR = 16000
        output_sr = getattr(self, 'output_sr', 40000)

        samples_per_frame = int(HOP_LENGTH * output_sr / INPUT_SR)
        overlap_samples = overlap_frames * samples_per_frame

        log.debug(f"SOLA Crossfade: overlap_frames={overlap_frames}, samples_per_frame={samples_per_frame}, overlap_samples={overlap_samples}")

        result = chunks[0]

        for i in range(1, len(chunks)):
            chunk = chunks[i]

            actual_overlap = min(overlap_samples, len(result), len(chunk))

            if actual_overlap > 0:
                # Search window: up to 5 ms, bounded by a quarter of the
                # overlap. NOTE(review): max(0, -search_range) below is
                # always 0 for non-negative search_range, so only forward
                # offsets are ever searched.
                search_range = min(int(output_sr * 0.005), actual_overlap // 4)

                reference = result[-actual_overlap:]

                # Find the offset with maximum normalized cross-correlation
                # against the tail of the assembled result.
                best_offset = 0
                max_correlation = -1.0

                for offset in range(max(0, -search_range), min(search_range + 1, len(chunk) - actual_overlap + 1)):
                    candidate_start = max(0, offset)
                    candidate_end = candidate_start + actual_overlap

                    if candidate_end > len(chunk):
                        continue

                    candidate = chunk[candidate_start:candidate_end]

                    ref_norm = np.linalg.norm(reference)
                    cand_norm = np.linalg.norm(candidate)

                    # Skip near-silent windows where correlation is undefined.
                    if ref_norm > 1e-6 and cand_norm > 1e-6:
                        correlation = np.dot(reference, candidate) / (ref_norm * cand_norm)

                        if correlation > max_correlation:
                            max_correlation = correlation
                            best_offset = offset

                log.debug(f"SOLA chunk {i}: best_offset={best_offset}, correlation={max_correlation:.4f}")

                # Poor alignment: fall back to a plain linear crossfade.
                if max_correlation < 0.3:
                    log.debug(f"SOLA chunk {i}: low correlation, using simple crossfade")
                    fade_out = np.linspace(1, 0, actual_overlap)
                    fade_in = np.linspace(0, 1, actual_overlap)
                    result_end = result[-actual_overlap:] * fade_out
                    chunk_start = chunk[:actual_overlap] * fade_in
                    result = np.concatenate([
                        result[:-actual_overlap],
                        result_end + chunk_start,
                        chunk[actual_overlap:]
                    ])
                    continue

                aligned_start = max(0, best_offset)
                aligned_end = aligned_start + actual_overlap

                if aligned_end <= len(chunk):
                    # Equal-power (cos^2 / sin^2) crossfade at the aligned
                    # offset; the two windows sum to 1 at every sample.
                    fade_out = np.cos(np.linspace(0, np.pi / 2, actual_overlap)) ** 2
                    fade_in = np.sin(np.linspace(0, np.pi / 2, actual_overlap)) ** 2

                    result_end = result[-actual_overlap:] * fade_out
                    chunk_aligned = chunk[aligned_start:aligned_end] * fade_in

                    result = np.concatenate([
                        result[:-actual_overlap],
                        result_end + chunk_aligned,
                        chunk[aligned_end:]
                    ])
                else:
                    # Alignment window ran past the chunk end: fall back.
                    log.warning(f"SOLA alignment failed for chunk {i}, using simple crossfade")
                    fade_out = np.linspace(1, 0, actual_overlap)
                    fade_in = np.linspace(0, 1, actual_overlap)
                    result_end = result[-actual_overlap:] * fade_out
                    chunk_start = chunk[:actual_overlap] * fade_in
                    result = np.concatenate([
                        result[:-actual_overlap],
                        result_end + chunk_start,
                        chunk[actual_overlap:]
                    ])
            else:
                # No overlap available: plain concatenation.
                result = np.concatenate([result, chunk])

        return result
|
| |
|
| |
|
def list_voice_models(weights_dir: str = "assets/weights") -> list:
    """
    List available voice models under ``weights_dir``.

    Each .pth checkpoint is paired with a matching .index file when one can
    be found: same stem, then "<stem>_v2" stem, then a case-insensitive stem
    match in the same directory.

    Args:
        weights_dir: model directory (scanned recursively)

    Returns:
        list: dicts with keys "name", "model_path" and "index_path"
        (``index_path`` is None when no index was found)
    """
    root = Path(weights_dir)
    if not root.exists():
        return []

    models = []
    for pth_file in root.glob("**/*.pth"):
        index_file = pth_file.with_suffix(".index")
        if not index_file.exists():
            index_file = pth_file.parent / f"{pth_file.stem}_v2.index"
        if not index_file.exists():
            # Last resort: case-insensitive stem match in the same folder.
            for candidate in pth_file.parent.glob("*.index"):
                if candidate.stem.lower() == pth_file.stem.lower():
                    index_file = candidate
                    break

        models.append({
            "name": pth_file.stem,
            "model_path": str(pth_file),
            "index_path": str(index_file) if index_file.exists() else None
        })

    return models
|
| |
|
| |
|
| |
|