from __future__ import annotations

from dataclasses import dataclass

import numpy as np

# Spellings of the configured hybrid mode that are passed through unchanged
# when a "hybrid" request is resolved; anything else collapses to "fallback".
FALLBACK_HYBRID_MODES = {
    "fallback",
    "smart",
    "rmvpe+fallback",
    "rmvpe_fallback",
    "rmvpe-fallback",
    "hybrid_fallback",
    "hybrid-fallback",
}


@dataclass(frozen=True)
class F0RoutingPolicy:
    """Immutable result of resolving a requested F0 method into routing."""

    # Normalised (stripped, lower-cased) method name the caller asked for.
    requested_method: str
    # Method name the request is routed to ("rmvpe" for hybrid requests).
    vc_method: str
    # Hybrid mode that applies after resolution.
    hybrid_mode: str
    # Method name reported for the gate stage ("rmvpe" for hybrid requests).
    gate_method: str
    # Human-readable summary of the routing decision.
    description: str


def _fit_mask_length(mask: np.ndarray, length: int, empty_fill: bool) -> np.ndarray:
    """Flatten *mask* to a boolean vector of exactly *length* entries.

    Longer masks are truncated and shorter ones are extended by repeating
    their last value. An empty mask has no last value to repeat (and
    ``np.pad(..., mode="edge")`` raises on an empty axis), so it is replaced
    by a constant vector of *empty_fill*.
    """
    flat = np.asarray(mask, dtype=bool).reshape(-1)
    if flat.size == 0:
        return np.full(length, empty_fill, dtype=bool)
    if flat.size < length:
        return np.pad(flat, (0, length - flat.size), mode="edge")
    return flat[:length]


def resolve_cover_f0_policy(
    requested_method: str,
    configured_hybrid_mode: str = "off",
    repair_profile: bool = False,
) -> F0RoutingPolicy:
    """Resolve a requested F0 method into a concrete routing policy.

    Args:
        requested_method: Method name; falsy or blank values mean "rmvpe".
        configured_hybrid_mode: Configured hybrid mode; falsy or blank
            values mean "off".
        repair_profile: When true, a hybrid request always uses the plain
            "fallback" hybrid mode regardless of the configured one.

    Returns:
        The policy. Non-hybrid requests are passed through unchanged; a
        "hybrid" request is routed to "rmvpe" for both conversion and gate.
    """
    # The trailing `or` also catches whitespace-only strings, which survive
    # the first `or` but strip down to "".
    requested = str(requested_method or "rmvpe").strip().lower() or "rmvpe"
    configured = str(configured_hybrid_mode or "off").strip().lower() or "off"

    if requested != "hybrid":
        return F0RoutingPolicy(
            requested_method=requested,
            vc_method=requested,
            hybrid_mode=configured,
            gate_method=requested,
            description=f"{requested} uses the configured routing directly.",
        )

    hybrid_mode = configured if configured in FALLBACK_HYBRID_MODES else "fallback"
    if repair_profile:
        hybrid_mode = "fallback"

    return F0RoutingPolicy(
        requested_method="hybrid",
        vc_method="rmvpe",
        hybrid_mode=hybrid_mode,
        gate_method="rmvpe",
        description="hybrid request routed to RMVPE with conservative fallback; post-gate uses RMVPE only.",
    )


def build_conservative_crepe_fill_mask(
    f0_rmvpe: np.ndarray,
    f0_crepe: np.ndarray,
    confidence: np.ndarray,
    confidence_threshold: float,
    max_ratio: float = 0.02,
    max_frames: int = 320,
    context_radius: int = 6,
    energy_mask: np.ndarray | None = None,
) -> np.ndarray:
    """Select frames where the CREPE track may fill gaps in the RMVPE track.

    A frame qualifies when the RMVPE value is <= 0, the CREPE value is > 0,
    the confidence reaches the threshold, the energy mask is set, and a
    voiced RMVPE frame lies within ``context_radius`` frames on BOTH sides.
    If the qualifying frames exceed ``max_frames`` or ``max_ratio`` of the
    total, nothing is filled at all.

    Args:
        f0_rmvpe: Primary F0 track (flattened; values <= 0 count as unvoiced).
        f0_crepe: Candidate F0 track used for filling.
        confidence: Per-frame confidence for the candidate track.
        confidence_threshold: Minimum confidence (clamped to >= 0).
        max_ratio: Largest allowed fraction of filled frames.
        max_frames: Largest allowed number of filled frames.
        context_radius: Search radius for voiced neighbours (at least 1).
        energy_mask: Optional per-frame boolean gate. ``None`` or an empty
            array means "no energy constraint"; a short mask is extended
            with its last value and a long one is truncated.

    Returns:
        Boolean mask whose length is the shortest of the three input tracks.
    """
    f0_rmvpe = np.asarray(f0_rmvpe, dtype=np.float32).reshape(-1)
    f0_crepe = np.asarray(f0_crepe, dtype=np.float32).reshape(-1)
    confidence = np.asarray(confidence, dtype=np.float32).reshape(-1)

    n = min(len(f0_rmvpe), len(f0_crepe), len(confidence))
    if n == 0:
        return np.zeros(0, dtype=bool)

    f0_rmvpe = f0_rmvpe[:n]
    f0_crepe = f0_crepe[:n]
    confidence = confidence[:n]

    threshold = max(0.0, float(confidence_threshold))
    context_radius = max(1, int(context_radius))
    max_ratio = max(0.0, float(max_ratio))
    max_frames = max(0, int(max_frames))

    if energy_mask is None:
        energy_mask = np.ones(n, dtype=bool)
    else:
        # An empty mask carries no information, so it is treated like None.
        energy_mask = _fit_mask_length(energy_mask, n, empty_fill=True)

    voiced_seed = f0_rmvpe > 0
    if not np.any(voiced_seed):
        return np.zeros(n, dtype=bool)

    # Index of the nearest voiced frame at or before / at or after each
    # position; large sentinels stand in for "none seen yet".
    idx = np.arange(n)
    left_seen = np.maximum.accumulate(np.where(voiced_seed, idx, -10**9))
    right_seen = np.where(voiced_seed, idx, 10**9)
    right_seen = np.minimum.accumulate(right_seen[::-1])[::-1]
    voiced_context = ((idx - left_seen) <= context_radius) & (
        (right_seen - idx) <= context_radius
    )

    fill_mask = (
        (f0_rmvpe <= 0)
        & (f0_crepe > 0)
        & (confidence >= threshold)
        & energy_mask
        & voiced_context
    )
    fill_count = int(np.sum(fill_mask))
    if fill_count == 0:
        return fill_mask

    # Budget check: too many fills rejects the whole proposal.
    fill_ratio = float(fill_count) / float(n)
    if fill_count > max_frames or fill_ratio > max_ratio:
        return np.zeros(n, dtype=bool)
    return fill_mask


def build_conservative_harvest_fill_mask(
    reference_f0: np.ndarray,
    fallback_f0: np.ndarray,
    dropout_mask: np.ndarray,
    max_run: int = 10,
    local_radius: int = 4,
    max_semitones: float = 4.0,
    min_neighbors: int = 2,
) -> np.ndarray:
    """Accept fallback F0 frames inside short dropout runs.

    Each contiguous run of true values in ``dropout_mask`` is examined
    independently. Runs longer than ``max_run`` are skipped. For the
    remaining runs, the median of the voiced reference frames within
    ``local_radius`` frames on either side is taken, and a fallback frame is
    accepted when it is voiced (> 0) and within ``max_semitones`` of that
    median.

    Args:
        reference_f0: Primary F0 track providing the local pitch context.
        fallback_f0: Fallback F0 track proposed for the dropout frames.
        dropout_mask: Boolean mask of frames considered dropped out.
        max_run: Longest run length eligible for filling (at least 1).
        local_radius: Neighbourhood size on each side of a run (at least 1).
        max_semitones: Maximum allowed distance from the local median.
        min_neighbors: Minimum number of voiced neighbours (at least 1).

    Returns:
        Boolean mask of accepted frames; its length is the shortest input.
    """
    reference_f0 = np.asarray(reference_f0, dtype=np.float32).reshape(-1)
    fallback_f0 = np.asarray(fallback_f0, dtype=np.float32).reshape(-1)
    dropout_mask = np.asarray(dropout_mask, dtype=bool).reshape(-1)

    n = min(reference_f0.size, fallback_f0.size, dropout_mask.size)
    if n <= 0:
        return np.zeros(0, dtype=bool)

    reference_f0 = reference_f0[:n]
    fallback_f0 = fallback_f0[:n]
    dropout_mask = dropout_mask[:n]
    accepted = np.zeros(n, dtype=bool)

    max_run = max(1, int(max_run))
    local_radius = max(1, int(local_radius))
    max_semitones = max(0.0, float(max_semitones))
    min_neighbors = max(1, int(min_neighbors))

    # Zero-pad so runs touching either end still yield a start/end edge.
    padded = np.pad(dropout_mask.astype(np.int8), (1, 1), mode="constant")
    edges = np.diff(padded)
    starts = np.where(edges == 1)[0]
    ends = np.where(edges == -1)[0]
    eps = 1e-6

    for start, end in zip(starts, ends):
        run_len = end - start
        if run_len <= 0 or run_len > max_run:
            continue

        run_slice = slice(start, end)
        run_fallback = fallback_f0[run_slice]
        voiced_run = run_fallback > 0
        if not np.any(voiced_run):
            continue

        left = reference_f0[max(0, start - local_radius) : start]
        right = reference_f0[end : min(n, end + local_radius)]
        neighbors = np.concatenate([left[left > 0], right[right > 0]])
        if neighbors.size < min_neighbors:
            continue

        local_median = float(np.median(neighbors))
        if local_median <= 0:
            continue

        # Unvoiced or negative fallback frames produce -inf/NaN here; they
        # are discarded by `voiced_run`, so the numpy warnings are noise.
        with np.errstate(divide="ignore", invalid="ignore"):
            semitone_diff = np.abs(
                12.0 * np.log2((run_fallback + eps) / (local_median + eps))
            )
            accepted[run_slice] = voiced_run & (semitone_diff <= max_semitones)

    return accepted


def compute_chunk_crossfade_samples(
    tgt_sr: int,
    t_pad_tgt: int,
    segment_count: int,
) -> int:
    """Return the crossfade length in samples between adjacent chunks.

    The length starts at 18 ms, grows by 2 ms for every segment beyond the
    second, and is clamped between 12 ms and a third of ``t_pad_tgt`` (the
    12 ms floor wins if the padding is shorter than that).

    Args:
        tgt_sr: Target sample rate.
        t_pad_tgt: Padding length in target-rate samples.
        segment_count: Number of chunks being joined.

    Returns:
        Crossfade length in samples, or 0 when there is nothing to
        crossfade (non-positive rate or padding, or fewer than two chunks).
    """
    tgt_sr = int(max(tgt_sr, 0))
    t_pad_tgt = int(max(t_pad_tgt, 0))
    segment_count = int(max(segment_count, 0))
    if tgt_sr <= 0 or t_pad_tgt <= 0 or segment_count <= 1:
        return 0

    base = int(round(tgt_sr * 0.018))
    extra = int(round(tgt_sr * 0.002 * max(0, segment_count - 2)))
    min_crossfade = int(round(tgt_sr * 0.012))
    max_crossfade = max(min_crossfade, t_pad_tgt // 3)
    return int(np.clip(base + extra, min_crossfade, max_crossfade))


def compute_active_source_replace(
    activity: np.ndarray,
    soft_mask: np.ndarray,
    echo_ratio: np.ndarray,
    direct_ratio: np.ndarray,
    max_replace: float = 0.82,
) -> np.ndarray:
    """Compute a per-cell source replacement weight.

    Two terms are summed and clipped to ``[0, max_replace]``:

    * ``0.85 * (1 - activity) * (1 - soft_mask)``
    * ``0.65 * activity * echo_presence * (1 - soft_mask)`` where
      ``echo_presence = clip(echo_ratio * (0.35 + 0.65 * (1 - direct_ratio)), 0, 1)``

    Args:
        activity: Per-frame activity (flattened to 1-D).
        soft_mask: 1-D or N-D mask whose last axis is the frame axis; a 1-D
            input gains a leading axis.
        echo_ratio: Same layout rules as ``soft_mask``.
        direct_ratio: Per-frame ratio (flattened to 1-D).
        max_replace: Upper clip bound for the result.

    Returns:
        float32 array with the last axis truncated to the shortest frame
        count among the inputs. When that count is zero, zeros shaped like
        ``soft_mask`` (after the optional leading axis) are returned.
    """
    activity = np.asarray(activity, dtype=np.float32).reshape(-1)
    direct_ratio = np.asarray(direct_ratio, dtype=np.float32).reshape(-1)
    soft_mask = np.asarray(soft_mask, dtype=np.float32)
    echo_ratio = np.asarray(echo_ratio, dtype=np.float32)

    if soft_mask.ndim == 1:
        soft_mask = soft_mask[np.newaxis, :]
    if echo_ratio.ndim == 1:
        echo_ratio = echo_ratio[np.newaxis, :]

    frame_count = min(
        soft_mask.shape[-1],
        echo_ratio.shape[-1],
        len(activity),
        len(direct_ratio),
    )
    if frame_count <= 0:
        return np.zeros_like(soft_mask, dtype=np.float32)

    soft_mask = soft_mask[..., :frame_count]
    echo_ratio = echo_ratio[..., :frame_count]
    # Leading axis lets the per-frame vectors broadcast against the masks.
    activity = activity[:frame_count][np.newaxis, :]
    direct_ratio = direct_ratio[:frame_count][np.newaxis, :]

    base_replace = 0.85 * (1.0 - activity) * (1.0 - soft_mask)
    active_echo_presence = np.clip(
        echo_ratio * (0.35 + 0.65 * (1.0 - direct_ratio)),
        0.0,
        1.0,
    )
    active_replace = 0.65 * activity * active_echo_presence * (1.0 - soft_mask)
    source_replace = np.clip(base_replace + active_replace, 0.0, float(max_replace))
    return source_replace.astype(np.float32)


def compute_source_cleanup_budget(
    energy_guard: np.ndarray,
    phrase_activity: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
    """Map two [0, 1] control signals to a boost ceiling and a cleanup floor.

    Both inputs are clipped to ``[0, 1]`` first.

    Args:
        energy_guard: Control signal for the boost ceiling.
        phrase_activity: Control signal for the cleanup floor.

    Returns:
        ``(allowed_boost, cleanup_floor)`` as float32 arrays, where
        ``allowed_boost = 0.35 + energy_guard`` (range 0.35..1.35) and
        ``cleanup_floor = 0.62 + 0.16 * phrase_activity`` (range 0.62..0.78).
    """
    energy_guard = np.clip(np.asarray(energy_guard, dtype=np.float32), 0.0, 1.0)
    phrase_activity = np.clip(np.asarray(phrase_activity, dtype=np.float32), 0.0, 1.0)
    allowed_boost = 0.35 + 1.00 * energy_guard
    cleanup_floor = 0.62 + 0.16 * phrase_activity
    return allowed_boost.astype(np.float32), cleanup_floor.astype(np.float32)


def compute_breath_preserving_energy_gates(
    energy_db: np.ndarray,
    ref_db: float,
    unvoiced_mask: np.ndarray | None,
    quiet_floor: float = 0.05,
    breath_floor: float = 0.28,
    breath_active_margin_db: float = 52.0,
    transition_width_db: float = 6.0,
) -> tuple[np.ndarray, np.ndarray]:
    """Build two energy gates, one of which keeps a raised floor on unvoiced frames.

    The base gate is a logistic curve over ``energy_db`` centred 45 dB below
    ``ref_db``, clipped to ``[quiet_floor, 1]``. The feature gate equals the
    base gate except on unvoiced frames, where it is raised to at least a
    floor that ramps from ``quiet_floor`` to ``breath_floor`` over the 10 dB
    above ``ref_db - breath_active_margin_db``.

    Args:
        energy_db: Per-frame energy in dB (flattened to 1-D).
        ref_db: Reference level in dB.
        unvoiced_mask: Optional boolean mask of unvoiced frames. ``None`` or
            an empty array means no frame receives the raised floor; a short
            mask is extended with its last value and a long one is truncated.
        quiet_floor: Lower bound of both gates (clipped to [0, 1]).
        breath_floor: Upper end of the unvoiced floor (at least
            ``quiet_floor``, at most 1).
        breath_active_margin_db: Distance below ``ref_db`` at which the
            unvoiced floor starts rising (at least 1).
        transition_width_db: Controls the logistic slope (at least 0.5).

    Returns:
        ``(feature_gate, base_gate)`` as independent float32 arrays.
    """
    energy_db = np.asarray(energy_db, dtype=np.float32).reshape(-1)
    if energy_db.size == 0:
        # Two distinct arrays so callers can mutate one without the other.
        return np.zeros(0, dtype=np.float32), np.zeros(0, dtype=np.float32)

    quiet_floor = float(np.clip(quiet_floor, 0.0, 1.0))
    breath_floor = float(np.clip(max(breath_floor, quiet_floor), quiet_floor, 1.0))
    breath_active_margin_db = float(max(1.0, breath_active_margin_db))
    transition_width_db = float(max(0.5, transition_width_db))
    ref_db = float(ref_db)

    silence_center = ref_db - 45.0
    slope = transition_width_db / 4.0
    # Very quiet frames overflow float32 exp() to +inf, which still yields
    # the correct gate value of 0 before clipping; only the warning is noise.
    with np.errstate(over="ignore"):
        base_gate = 1.0 / (1.0 + np.exp(-((energy_db - silence_center) / slope)))
    base_gate = np.clip(base_gate, quiet_floor, 1.0).astype(np.float32)

    if unvoiced_mask is None:
        return base_gate, base_gate.copy()

    # An empty mask marks no frame as unvoiced, matching the None case.
    unvoiced_mask = _fit_mask_length(unvoiced_mask, len(base_gate), empty_fill=False)

    breath_activity = np.clip(
        (energy_db - (ref_db - breath_active_margin_db)) / 10.0,
        0.0,
        1.0,
    ).astype(np.float32)
    feature_floor = quiet_floor + (breath_floor - quiet_floor) * breath_activity

    feature_gate = base_gate.copy()
    feature_gate[unvoiced_mask] = np.maximum(
        feature_gate[unvoiced_mask], feature_floor[unvoiced_mask]
    )
    feature_gate = np.clip(feature_gate, quiet_floor, 1.0).astype(np.float32)
    return feature_gate, base_gate.copy()