AI-RVC / infer /quality_policy.py
mason369's picture
Release v1.2.1
a9536c4 verified
Raw
History Blame Contribute Delete
10.1 kB
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
# Accepted spellings of the hybrid mode. When a "hybrid" request is resolved,
# a configured mode found in this set is kept as-is; any other value is
# replaced by "fallback".
FALLBACK_HYBRID_MODES = {
    # Short aliases.
    "fallback",
    "smart",
    # "hybrid" + separator variants.
    "hybrid_fallback",
    "hybrid-fallback",
    # "rmvpe" + separator variants.
    "rmvpe+fallback",
    "rmvpe_fallback",
    "rmvpe-fallback",
}
@dataclass(frozen=True)
class F0RoutingPolicy:
    """Immutable record describing how an F0 (pitch) method request is routed.

    Attributes:
        requested_method: The method name that was asked for.
        vc_method: The method name selected for the conversion step
            (presumably "voice conversion" -- confirm against the caller).
        hybrid_mode: The hybrid/fallback mode string that accompanies the
            route.
        gate_method: The method name selected for the gating step.
        description: Human-readable summary of the routing decision.
    """

    requested_method: str
    vc_method: str
    hybrid_mode: str
    gate_method: str
    description: str
def resolve_cover_f0_policy(
    requested_method: str,
    configured_hybrid_mode: str = "off",
    repair_profile: bool = False,
) -> F0RoutingPolicy:
    """Resolve the F0 routing policy for a requested pitch method.

    Both string inputs are stripped and lower-cased before use.

    Args:
        requested_method: Name of the requested F0 method. ``None``, an empty
            string, or a whitespace-only string all resolve to ``"rmvpe"``.
        configured_hybrid_mode: Configured hybrid mode. ``None``, empty, or
            whitespace-only values resolve to ``"off"``.
        repair_profile: When true, a ``"hybrid"`` request always gets the
            ``"fallback"`` hybrid mode regardless of the configured one.

    Returns:
        An ``F0RoutingPolicy``. For any method other than ``"hybrid"`` the
        requested name is used for both ``vc_method`` and ``gate_method`` and
        the configured hybrid mode is passed through. For ``"hybrid"`` both
        methods are ``"rmvpe"`` and the hybrid mode is the configured one if
        it is listed in ``FALLBACK_HYBRID_MODES``, otherwise ``"fallback"``.
    """
    # Normalise first and default afterwards, so that a whitespace-only value
    # falls back to the default exactly like None / "" instead of producing
    # an empty method name.
    requested = str(requested_method or "").strip().lower() or "rmvpe"
    configured = str(configured_hybrid_mode or "").strip().lower() or "off"

    if requested != "hybrid":
        return F0RoutingPolicy(
            requested_method=requested,
            vc_method=requested,
            hybrid_mode=configured,
            gate_method=requested,
            description=f"{requested} uses the configured routing directly.",
        )

    # Unknown modes and the repair profile both collapse to plain "fallback".
    if repair_profile or configured not in FALLBACK_HYBRID_MODES:
        hybrid_mode = "fallback"
    else:
        hybrid_mode = configured

    return F0RoutingPolicy(
        requested_method="hybrid",
        vc_method="rmvpe",
        hybrid_mode=hybrid_mode,
        gate_method="rmvpe",
        description="hybrid request routed to RMVPE with conservative fallback; post-gate uses RMVPE only.",
    )
def build_conservative_crepe_fill_mask(
    f0_rmvpe: np.ndarray,
    f0_crepe: np.ndarray,
    confidence: np.ndarray,
    confidence_threshold: float,
    max_ratio: float = 0.02,
    max_frames: int = 320,
    context_radius: int = 6,
    energy_mask: np.ndarray | None = None,
) -> np.ndarray:
    """Select frames where a CREPE pitch may fill a gap in the RMVPE track.

    All array inputs are flattened and truncated to the shortest of the three
    per-frame inputs. A frame is a fill candidate when RMVPE reports no pitch
    (``<= 0``), CREPE reports one (``> 0``), the confidence reaches the
    threshold, the energy mask is true, and an RMVPE-voiced frame exists
    within ``context_radius`` frames on BOTH sides.

    The result is all-or-nothing: if the number of candidates exceeds
    ``max_frames`` or their share of all frames exceeds ``max_ratio``, no
    frame is filled at all.

    Args:
        f0_rmvpe: Per-frame RMVPE F0 values; ``> 0`` is treated as voiced.
        f0_crepe: Per-frame CREPE F0 values.
        confidence: Per-frame confidence compared against the threshold.
        confidence_threshold: Minimum confidence (negative values act as 0).
        max_ratio: Maximum allowed fraction of filled frames (floored at 0).
        max_frames: Maximum allowed number of filled frames (floored at 0).
        context_radius: Search distance for voiced neighbours (at least 1).
        energy_mask: Optional per-frame boolean mask. ``None`` or an empty
            mask means "no energy restriction". A shorter mask is extended
            with its last value; a longer one is truncated.

    Returns:
        Boolean array with one entry per frame (empty if any of the three
        per-frame inputs is empty).
    """
    f0_rmvpe = np.asarray(f0_rmvpe, dtype=np.float32).reshape(-1)
    f0_crepe = np.asarray(f0_crepe, dtype=np.float32).reshape(-1)
    confidence = np.asarray(confidence, dtype=np.float32).reshape(-1)

    n = min(len(f0_rmvpe), len(f0_crepe), len(confidence))
    if n == 0:
        return np.zeros(0, dtype=bool)

    f0_rmvpe = f0_rmvpe[:n]
    f0_crepe = f0_crepe[:n]
    confidence = confidence[:n]

    threshold = max(0.0, float(confidence_threshold))
    context_radius = max(1, int(context_radius))
    max_ratio = max(0.0, float(max_ratio))
    max_frames = max(0, int(max_frames))

    if energy_mask is not None:
        energy_mask = np.asarray(energy_mask, dtype=bool).reshape(-1)
    if energy_mask is None or energy_mask.size == 0:
        # np.pad(mode="edge") cannot extend an empty array (it raises
        # ValueError), so an empty mask is handled like a missing one.
        energy_mask = np.ones(n, dtype=bool)
    elif len(energy_mask) < n:
        energy_mask = np.pad(energy_mask, (0, n - len(energy_mask)), mode="edge")
    else:
        energy_mask = energy_mask[:n]

    voiced_seed = f0_rmvpe > 0
    if not np.any(voiced_seed):
        return np.zeros(n, dtype=bool)

    # For every frame, find the index of the nearest voiced frame at or before
    # it (running maximum) and at or after it (reversed running minimum). The
    # +/-10**9 sentinels make "no voiced frame on that side" an enormous
    # distance that can never pass the radius test.
    idx = np.arange(n)
    left_seen = np.where(voiced_seed, idx, -10**9)
    left_seen = np.maximum.accumulate(left_seen)
    right_seen = np.where(voiced_seed, idx, 10**9)
    right_seen = np.minimum.accumulate(right_seen[::-1])[::-1]
    voiced_context = ((idx - left_seen) <= context_radius) & ((right_seen - idx) <= context_radius)

    fill_mask = (
        (f0_rmvpe <= 0)
        & (f0_crepe > 0)
        & (confidence >= threshold)
        & energy_mask
        & voiced_context
    )

    fill_count = int(np.sum(fill_mask))
    if fill_count == 0:
        return fill_mask

    # Too many candidates: reject the whole fill rather than a subset.
    fill_ratio = float(fill_count) / float(n)
    if fill_count > max_frames or fill_ratio > max_ratio:
        return np.zeros(n, dtype=bool)
    return fill_mask
def build_conservative_harvest_fill_mask(
reference_f0: np.ndarray,
fallback_f0: np.ndarray,
dropout_mask: np.ndarray,
max_run: int = 10,
local_radius: int = 4,
max_semitones: float = 4.0,
min_neighbors: int = 2,
) -> np.ndarray:
reference_f0 = np.asarray(reference_f0, dtype=np.float32).reshape(-1)
fallback_f0 = np.asarray(fallback_f0, dtype=np.float32).reshape(-1)
dropout_mask = np.asarray(dropout_mask, dtype=bool).reshape(-1)
n = min(reference_f0.size, fallback_f0.size, dropout_mask.size)
if n <= 0:
return np.zeros(0, dtype=bool)
reference_f0 = reference_f0[:n]
fallback_f0 = fallback_f0[:n]
dropout_mask = dropout_mask[:n]
accepted = np.zeros(n, dtype=bool)
max_run = max(1, int(max_run))
local_radius = max(1, int(local_radius))
max_semitones = max(0.0, float(max_semitones))
min_neighbors = max(1, int(min_neighbors))
padded = np.pad(dropout_mask.astype(np.int8), (1, 1), mode="constant")
edges = np.diff(padded)
starts = np.where(edges == 1)[0]
ends = np.where(edges == -1)[0]
eps = 1e-6
for start, end in zip(starts, ends):
run_slice = slice(start, end)
run_len = end - start
if run_len <= 0 or run_len > max_run:
continue
run_fallback = fallback_f0[run_slice]
voiced_run = run_fallback > 0
if not np.any(voiced_run):
continue
left = reference_f0[max(0, start - local_radius) : start]
right = reference_f0[end : min(n, end + local_radius)]
neighbors = np.concatenate([left[left > 0], right[right > 0]])
if neighbors.size < min_neighbors:
continue
local_median = float(np.median(neighbors))
if local_median <= 0:
continue
semitone_diff = np.abs(
12.0 * np.log2((run_fallback + eps) / (local_median + eps))
)
accepted[run_slice] = voiced_run & (semitone_diff <= max_semitones)
return accepted
def compute_chunk_crossfade_samples(
    tgt_sr: int,
    t_pad_tgt: int,
    segment_count: int,
) -> int:
    """Return the crossfade length, in samples, used between adjacent chunks.

    The desired length is 18 ms of audio at ``tgt_sr`` plus 2 ms for every
    segment beyond the second. It is then bounded below by 12 ms and above by
    one third of ``t_pad_tgt`` (the upper bound is never smaller than the
    lower bound, so the 12 ms minimum wins when the padding is very short).

    Args:
        tgt_sr: Target sample rate; negative values are treated as 0.
        t_pad_tgt: Padding length in samples; negative values are treated as 0.
        segment_count: Number of segments; negative values are treated as 0.

    Returns:
        The crossfade length in samples, or 0 when the sample rate or padding
        is zero or there are fewer than two segments.
    """
    rate = int(max(tgt_sr, 0))
    pad = int(max(t_pad_tgt, 0))
    chunks = int(max(segment_count, 0))

    # Nothing to blend without audio, padding, or a second segment.
    if rate == 0 or pad == 0 or chunks < 2:
        return 0

    shortest = int(round(rate * 0.012))
    longest = max(shortest, pad // 3)
    wanted = int(round(rate * 0.018)) + int(round(rate * 0.002 * max(0, chunks - 2)))

    return int(min(max(wanted, shortest), longest))
def compute_active_source_replace(
    activity: np.ndarray,
    soft_mask: np.ndarray,
    echo_ratio: np.ndarray,
    direct_ratio: np.ndarray,
    max_replace: float = 0.82,
) -> np.ndarray:
    """Compute a per-frame source replacement amount, capped at ``max_replace``.

    ``activity`` and ``direct_ratio`` are flattened to 1-D. ``soft_mask`` and
    ``echo_ratio`` keep their shape, except that 1-D inputs gain a leading
    axis; their last axis is the frame axis. All inputs are truncated to the
    shortest frame count before combining.

    The amount is the sum of two terms, both scaled by ``1 - soft_mask``:

    * an inactive term, ``0.85 * (1 - activity)``;
    * an active term, ``0.65 * activity * echo_presence``, where
      ``echo_presence`` is ``echo_ratio * (0.35 + 0.65 * (1 - direct_ratio))``
      clipped to ``[0, 1]``.

    Args:
        activity: Per-frame activity values.
        soft_mask: Per-frame mask values, 1-D or with frames on the last axis.
        echo_ratio: Per-frame echo ratio, 1-D or with frames on the last axis.
        direct_ratio: Per-frame direct ratio.
        max_replace: Upper clip bound for the result.

    Returns:
        ``float32`` array clipped to ``[0, max_replace]``. If the shared
        frame count is zero, zeros with the (untruncated) shape of
        ``soft_mask`` are returned instead.
    """
    act = np.asarray(activity, dtype=np.float32).reshape(-1)
    direct = np.asarray(direct_ratio, dtype=np.float32).reshape(-1)
    mask = np.asarray(soft_mask, dtype=np.float32)
    echo = np.asarray(echo_ratio, dtype=np.float32)

    # Promote 1-D inputs to a single row so everything broadcasts as 2-D.
    mask = mask[np.newaxis, :] if mask.ndim == 1 else mask
    echo = echo[np.newaxis, :] if echo.ndim == 1 else echo

    frames = min(mask.shape[-1], echo.shape[-1], len(act), len(direct))
    if frames <= 0:
        return np.zeros_like(mask, dtype=np.float32)

    mask = mask[..., :frames]
    echo = echo[..., :frames]
    act_row = act[:frames][np.newaxis, :]
    direct_row = direct[:frames][np.newaxis, :]

    unmasked = 1.0 - mask
    idle_term = 0.85 * (1.0 - act_row) * unmasked
    echo_presence = np.clip(
        echo * (0.35 + 0.65 * (1.0 - direct_row)),
        0.0,
        1.0,
    )
    busy_term = 0.65 * act_row * echo_presence * unmasked

    combined = np.clip(idle_term + busy_term, 0.0, float(max_replace))
    return combined.astype(np.float32)
def compute_source_cleanup_budget(
    energy_guard: np.ndarray,
    phrase_activity: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
    """Derive the allowed boost and the cleanup floor from two guide signals.

    Both inputs are converted to ``float32`` and clipped to ``[0, 1]`` first.

    Args:
        energy_guard: Guide values mapped linearly to a boost in
            ``[0.35, 1.35]``.
        phrase_activity: Guide values mapped linearly to a floor in
            ``[0.62, 0.78]``.

    Returns:
        ``(allowed_boost, cleanup_floor)`` as ``float32`` arrays with the
        shapes of the respective inputs.
    """
    guard = np.asarray(energy_guard, dtype=np.float32).clip(0.0, 1.0)
    phrase = np.asarray(phrase_activity, dtype=np.float32).clip(0.0, 1.0)

    boost = (0.35 + 1.00 * guard).astype(np.float32)
    floor = (0.62 + 0.16 * phrase).astype(np.float32)
    return boost, floor
def compute_breath_preserving_energy_gates(
    energy_db: np.ndarray,
    ref_db: float,
    unvoiced_mask: np.ndarray | None,
    quiet_floor: float = 0.05,
    breath_floor: float = 0.28,
    breath_active_margin_db: float = 52.0,
    transition_width_db: float = 6.0,
) -> tuple[np.ndarray, np.ndarray]:
    """Build two per-frame energy gates, one of which is lifted on unvoiced frames.

    The base gate is a logistic curve over ``energy_db`` centred 45 dB below
    ``ref_db`` with slope ``transition_width_db / 4``, clipped to
    ``[quiet_floor, 1]``. The feature gate equals the base gate, except that
    on frames flagged in ``unvoiced_mask`` it is raised to at least a floor
    that ramps linearly from ``quiet_floor`` to ``breath_floor`` over the
    10 dB starting at ``ref_db - breath_active_margin_db``.

    Args:
        energy_db: Per-frame energy in dB (flattened to 1-D).
        ref_db: Reference level in dB.
        unvoiced_mask: Optional per-frame boolean mask. ``None`` or an empty
            mask means no frame is lifted. A shorter mask is extended with
            its last value; a longer one is truncated.
        quiet_floor: Minimum gate value, clipped to ``[0, 1]``.
        breath_floor: Maximum lifted floor, clipped to ``[quiet_floor, 1]``.
        breath_active_margin_db: Distance below ``ref_db`` where the lifted
            floor starts to rise (at least 1).
        transition_width_db: Width of the logistic transition (at least 0.5).

    Returns:
        ``(feature_gate, base_gate)`` as independent ``float32`` arrays with
        one value per frame.
    """
    energy_db = np.asarray(energy_db, dtype=np.float32).reshape(-1)
    if energy_db.size == 0:
        # Two distinct arrays, so mutating one result never affects the other
        # (matching the non-empty paths, which also return independent arrays).
        return np.zeros(0, dtype=np.float32), np.zeros(0, dtype=np.float32)

    quiet_floor = float(np.clip(quiet_floor, 0.0, 1.0))
    breath_floor = float(np.clip(breath_floor, quiet_floor, 1.0))
    breath_active_margin_db = float(max(1.0, breath_active_margin_db))
    transition_width_db = float(max(0.5, transition_width_db))
    ref_db = float(ref_db)

    silence_center = ref_db - 45.0
    slope = transition_width_db / 4.0
    # For very quiet frames exp() overflows to +inf in float32; the gate then
    # correctly evaluates to 0 (before the floor), so only the warning is
    # suppressed here.
    with np.errstate(over="ignore"):
        base_gate = 1.0 / (1.0 + np.exp(-((energy_db - silence_center) / slope)))
    base_gate = np.clip(base_gate, quiet_floor, 1.0).astype(np.float32)

    if unvoiced_mask is not None:
        unvoiced_mask = np.asarray(unvoiced_mask, dtype=bool).reshape(-1)
    if unvoiced_mask is None or unvoiced_mask.size == 0:
        # An empty mask flags nothing, and np.pad(mode="edge") would raise
        # ValueError on it, so it is handled like a missing mask.
        return base_gate, base_gate.copy()

    if len(unvoiced_mask) < len(base_gate):
        unvoiced_mask = np.pad(unvoiced_mask, (0, len(base_gate) - len(unvoiced_mask)), mode="edge")
    else:
        unvoiced_mask = unvoiced_mask[: len(base_gate)]

    # 0 at (ref_db - margin) and below, 1 at 10 dB above that point.
    breath_activity = np.clip(
        (energy_db - (ref_db - breath_active_margin_db)) / 10.0,
        0.0,
        1.0,
    ).astype(np.float32)
    feature_floor = quiet_floor + (breath_floor - quiet_floor) * breath_activity

    feature_gate = base_gate.copy()
    feature_gate[unvoiced_mask] = np.maximum(feature_gate[unvoiced_mask], feature_floor[unvoiced_mask])
    feature_gate = np.clip(feature_gate, quiet_floor, 1.0).astype(np.float32)
    return feature_gate, base_gate.copy()