| |
|
| | """
|
| | F0 (基频) 提取模块 - 支持多种提取方法
|
| | """
|
| | import numpy as np
|
| | import torch
|
| | from typing import Optional, Literal
|
| |
|
| |
|
| | F0Method = Literal["rmvpe", "pm", "harvest", "crepe", "hybrid"]
|
| |
|
| |
|
| | class F0Extractor:
|
| | """F0 提取器基类"""
|
| |
|
| | def __init__(self, sample_rate: int = 16000, hop_length: int = 160):
|
| | self.sample_rate = sample_rate
|
| | self.hop_length = hop_length
|
| | self.f0_min = 50
|
| | self.f0_max = 1100
|
| |
|
| | def extract(self, audio: np.ndarray) -> np.ndarray:
|
| | """提取 F0,子类需实现此方法"""
|
| | raise NotImplementedError
|
| |
|
| |
|
| | class PMExtractor(F0Extractor):
|
| | """Parselmouth (Praat) F0 提取器 - 速度快"""
|
| |
|
| | def extract(self, audio: np.ndarray) -> np.ndarray:
|
| | import parselmouth
|
| |
|
| | time_step = self.hop_length / self.sample_rate
|
| | sound = parselmouth.Sound(audio, self.sample_rate)
|
| |
|
| | pitch = sound.to_pitch_ac(
|
| | time_step=time_step,
|
| | voicing_threshold=0.6,
|
| | pitch_floor=self.f0_min,
|
| | pitch_ceiling=self.f0_max
|
| | )
|
| |
|
| | f0 = pitch.selected_array["frequency"]
|
| | f0[f0 == 0] = np.nan
|
| |
|
| | return f0
|
| |
|
| |
|
| | class HarvestExtractor(F0Extractor):
|
| | """PyWorld Harvest F0 提取器 - 质量较好"""
|
| |
|
| | def extract(self, audio: np.ndarray) -> np.ndarray:
|
| | import pyworld
|
| |
|
| | audio = audio.astype(np.float64)
|
| | f0, _ = pyworld.harvest(
|
| | audio,
|
| | self.sample_rate,
|
| | f0_floor=self.f0_min,
|
| | f0_ceil=self.f0_max,
|
| | frame_period=self.hop_length / self.sample_rate * 1000
|
| | )
|
| |
|
| | return f0
|
| |
|
| |
|
| | class CrepeExtractor(F0Extractor):
|
| | """TorchCrepe F0 提取器 - 深度学习方法"""
|
| |
|
| | def __init__(self, sample_rate: int = 16000, hop_length: int = 160,
|
| | device: str = "cuda"):
|
| | super().__init__(sample_rate, hop_length)
|
| | self.device = device
|
| |
|
| | def extract(self, audio: np.ndarray) -> np.ndarray:
|
| | import torchcrepe
|
| |
|
| | audio_tensor = torch.from_numpy(audio).float().unsqueeze(0)
|
| | audio_tensor = audio_tensor.to(self.device)
|
| |
|
| | f0, _ = torchcrepe.predict(
|
| | audio_tensor,
|
| | self.sample_rate,
|
| | self.hop_length,
|
| | self.f0_min,
|
| | self.f0_max,
|
| | model="full",
|
| | batch_size=512,
|
| | device=self.device,
|
| | return_periodicity=True
|
| | )
|
| |
|
| | f0 = f0.squeeze(0).cpu().numpy()
|
| | return f0
|
| |
|
| |
|
| | class RMVPEExtractor(F0Extractor):
|
| | """RMVPE F0 提取器 - 质量最高 (推荐)"""
|
| |
|
| | def __init__(self, model_path: str, sample_rate: int = 16000,
|
| | hop_length: int = 160, device: str = "cuda"):
|
| | super().__init__(sample_rate, hop_length)
|
| | self.device = device
|
| | self.model = None
|
| | self.model_path = model_path
|
| |
|
| | def load_model(self):
|
| | """加载 RMVPE 模型"""
|
| | if self.model is not None:
|
| | return
|
| |
|
| | from models.rmvpe import RMVPE
|
| |
|
| | self.model = RMVPE(self.model_path, device=self.device)
|
| | print(f"RMVPE 模型已加载: {self.device}")
|
| |
|
| | def extract(self, audio: np.ndarray) -> np.ndarray:
|
| | self.load_model()
|
| |
|
| |
|
| | f0 = self.model.infer_from_audio(audio, thred=0.01)
|
| |
|
| | return f0
|
| |
|
| |
|
| | def get_f0_extractor(method: F0Method, device: str = "cuda",
|
| | rmvpe_path: str = None, crepe_threshold: float = 0.05) -> F0Extractor:
|
| | """
|
| | 获取 F0 提取器实例
|
| |
|
| | Args:
|
| | method: 提取方法 ("rmvpe", "pm", "harvest", "crepe", "hybrid")
|
| | device: 计算设备
|
| | rmvpe_path: RMVPE 模型路径 (rmvpe/hybrid 方法需要)
|
| | crepe_threshold: CREPE置信度阈值 (仅hybrid方法使用)
|
| |
|
| | Returns:
|
| | F0Extractor: 提取器实例
|
| | """
|
| | if method == "rmvpe":
|
| | if rmvpe_path is None:
|
| | raise ValueError("RMVPE 方法需要指定模型路径")
|
| | return RMVPEExtractor(rmvpe_path, device=device)
|
| | elif method == "hybrid":
|
| | if rmvpe_path is None:
|
| | raise ValueError("Hybrid 方法需要指定RMVPE模型路径")
|
| | return HybridF0Extractor(rmvpe_path, device=device, crepe_threshold=crepe_threshold)
|
| | elif method == "pm":
|
| | return PMExtractor()
|
| | elif method == "harvest":
|
| | return HarvestExtractor()
|
| | elif method == "crepe":
|
| | return CrepeExtractor(device=device)
|
| | else:
|
| | raise ValueError(f"未知的 F0 提取方法: {method}")
|
| |
|
| |
|
| | class HybridF0Extractor(F0Extractor):
|
| | """混合F0提取器 - RMVPE主导 + CREPE高精度补充"""
|
| |
|
| | def __init__(self, rmvpe_path: str, sample_rate: int = 16000,
|
| | hop_length: int = 160, device: str = "cuda",
|
| | crepe_threshold: float = 0.05):
|
| | super().__init__(sample_rate, hop_length)
|
| | self.device = device
|
| | self.rmvpe = RMVPEExtractor(rmvpe_path, sample_rate, hop_length, device)
|
| | self.crepe = None
|
| | self.crepe_threshold = crepe_threshold
|
| |
|
| | def _load_crepe(self):
|
| | """延迟加载CREPE模型"""
|
| | if self.crepe is None:
|
| | try:
|
| | self.crepe = CrepeExtractor(self.sample_rate, self.hop_length, self.device)
|
| | except ImportError:
|
| | print("警告: torchcrepe未安装,混合F0将仅使用RMVPE")
|
| | self.crepe = False
|
| |
|
| | def extract(self, audio: np.ndarray) -> np.ndarray:
|
| | """
|
| | 混合提取F0:
|
| | 1. 使用RMVPE作为主要方法(快速、稳定)
|
| | 2. 在RMVPE不稳定的区域使用CREPE补充(高精度)
|
| | """
|
| |
|
| | f0_rmvpe = self.rmvpe.extract(audio)
|
| |
|
| |
|
| | self._load_crepe()
|
| | if self.crepe is False:
|
| | return f0_rmvpe
|
| |
|
| |
|
| | import torchcrepe
|
| | audio_tensor = torch.from_numpy(audio).float().unsqueeze(0).to(self.device)
|
| | f0_crepe, confidence = torchcrepe.predict(
|
| | audio_tensor,
|
| | self.sample_rate,
|
| | self.hop_length,
|
| | self.f0_min,
|
| | self.f0_max,
|
| | model="full",
|
| | batch_size=512,
|
| | device=self.device,
|
| | return_periodicity=True
|
| | )
|
| | f0_crepe = f0_crepe.squeeze(0).cpu().numpy()
|
| | confidence = confidence.squeeze(0).cpu().numpy()
|
| |
|
| |
|
| | min_len = min(len(f0_rmvpe), len(f0_crepe), len(confidence))
|
| | f0_rmvpe = f0_rmvpe[:min_len]
|
| | f0_crepe = f0_crepe[:min_len]
|
| | confidence = confidence[:min_len]
|
| |
|
| |
|
| |
|
| | f0_diff = np.abs(np.diff(f0_rmvpe, prepend=f0_rmvpe[0]))
|
| | semitone_diff = np.abs(12 * np.log2((f0_rmvpe + 1e-6) / (np.roll(f0_rmvpe, 1) + 1e-6)))
|
| | semitone_diff[0] = 0
|
| | unstable_jump = semitone_diff > 3.0
|
| |
|
| |
|
| | unstable_unvoiced = (f0_rmvpe < 1e-3) & (confidence > self.crepe_threshold)
|
| |
|
| |
|
| | f0_ratio = (f0_crepe + 1e-6) / (f0_rmvpe + 1e-6)
|
| | semitone_gap = np.abs(12 * np.log2(f0_ratio))
|
| | unstable_diverge = (semitone_gap > 2.0) & (confidence > self.crepe_threshold * 1.5)
|
| |
|
| |
|
| | unstable_mask = unstable_jump | unstable_unvoiced | unstable_diverge
|
| |
|
| |
|
| | kernel = np.ones(5, dtype=bool)
|
| | unstable_mask = np.convolve(unstable_mask, kernel, mode='same')
|
| |
|
| |
|
| | f0_hybrid = f0_rmvpe.copy()
|
| | f0_hybrid[unstable_mask] = f0_crepe[unstable_mask]
|
| |
|
| |
|
| | for i in range(1, len(f0_hybrid) - 1):
|
| | if unstable_mask[i] != unstable_mask[i-1]:
|
| |
|
| | w = 0.5
|
| | f0_hybrid[i] = w * f0_rmvpe[i] + (1-w) * f0_crepe[i]
|
| |
|
| | return f0_hybrid
|
| |
|
| |
|
| | def shift_f0(f0: np.ndarray, semitones: float) -> np.ndarray:
|
| | """
|
| | 音调偏移
|
| |
|
| | Args:
|
| | f0: 原始 F0
|
| | semitones: 偏移半音数 (正数升调,负数降调)
|
| |
|
| | Returns:
|
| | np.ndarray: 偏移后的 F0
|
| | """
|
| | factor = 2 ** (semitones / 12)
|
| | f0_shifted = f0 * factor
|
| | return f0_shifted
|
| |
|