ColabWan / postprocessing /seedvc /wgp_bridge.py
1ripon1's picture
Upload folder using huggingface_hub
7344bef verified
Raw
History Blame Contribute Delete
19.7 kB
from __future__ import annotations
import gc
import os
import tempfile
from typing import Any, Callable
_persistent_converter = None
_persistent_offloadobj = None
_persistent_profile = None
KEEP_ORIGINAL_AUDIO_OUTSIDE_TWO_SPEAKERS = True
SEEDVC_RESTORE_BACKGROUND_STEM = True
def _release_runtime_objects(converter=None, offloadobj=None) -> None:
import torch
if offloadobj is not None:
offloadobj.unload_all()
offloadobj.release()
del converter
if torch.cuda.is_available():
torch.cuda.empty_cache()
gc.collect()
def release_models() -> None:
global _persistent_converter, _persistent_offloadobj, _persistent_profile
_release_runtime_objects(_persistent_converter, _persistent_offloadobj)
_persistent_converter = None
_persistent_offloadobj = None
_persistent_profile = None
def _get_runtime(persistent_models: bool, profile_no=4, verbose_level: int = 1, init_pipe: Callable[..., int] | None = None, mode: int = 1):
import torch
from mmgp import offload
from postprocessing import seedvc
global _persistent_converter, _persistent_offloadobj, _persistent_profile
mode = seedvc.normalize_mode(mode)
profile_key = (profile_no, mode)
if _persistent_offloadobj is not None and _persistent_profile != profile_key:
release_models()
keep_alive = persistent_models
if _persistent_offloadobj is None:
converter = seedvc.get_model(dtype=torch.float16, mode=mode)
pipe = seedvc.get_pipe(profile_no=profile_no, model=converter, mode=mode)
offload_kwargs = {"coTenantsMap": seedvc.get_cotenants_map(pipe)}
if init_pipe is not None:
profile_no = init_pipe(pipe, offload_kwargs, profile_no)
offload_kwargs["pinnedMemory"] = False
offloadobj = offload.profile(pipe, profile_no=profile_no, quantizeTransformer=False, convertWeightsFloatTo=torch.float16, verboseLevel=verbose_level, **offload_kwargs)
if persistent_models:
_persistent_converter = converter
_persistent_offloadobj = offloadobj
_persistent_profile = profile_key
else:
converter = _persistent_converter
offloadobj = _persistent_offloadobj
keep_alive = True
return converter, offloadobj, keep_alive
def convert_audio_file(source_audio_path: str, voice_sample_path: str, output_path: str, *, persistent_models: bool = False, profile_no=4, verbose_level: int = 1, init_pipe: Callable[..., int] | None = None, diffusion_steps: int | None = None, cfg_rate: float | None = None, mode: int = 1, amplitude_match_audio_path: str | None = None) -> str:
import torch
from postprocessing import seedvc
from shared.utils.audio_video import write_wav_file
mode = seedvc.normalize_mode(mode)
converter, offloadobj, keep_alive = _get_runtime(persistent_models, profile_no=profile_no, verbose_level=verbose_level, init_pipe=init_pipe, mode=mode)
try:
source_audio, source_rate = _load_audio_tensor(source_audio_path)
reference_audio, reference_rate = _load_audio_tensor(voice_sample_path)
amplitude_audio = source_audio if amplitude_match_audio_path is None else _load_audio_tensor(amplitude_match_audio_path)[0]
reference_audio = _match_reference_amplitude(amplitude_audio, reference_audio)
with torch.inference_mode():
converted = converter.convert_tensor(
source_audio,
source_rate,
reference_audio,
reference_rate,
output_rate=source_rate,
diffusion_steps=seedvc.get_default_steps(mode) if diffusion_steps is None else diffusion_steps,
cfg_rate=seedvc.get_default_cfg_rate(mode) if cfg_rate is None else cfg_rate,
)
write_wav_file(output_path, converted, source_rate)
finally:
if offloadobj is not None:
offloadobj.unload_all()
if not keep_alive:
_release_runtime_objects(converter, offloadobj)
return output_path
def _make_temp_wav(output_dir: str, prefix: str) -> str:
os.makedirs(output_dir, exist_ok=True)
fd, path = tempfile.mkstemp(prefix=prefix, suffix=".wav", dir=output_dir)
os.close(fd)
return path
def _load_audio_tensor(audio_path: str):
import librosa
import numpy as np
import torch
audio_data, sample_rate = librosa.load(os.fspath(audio_path), sr=None, mono=False)
audio_data = np.asarray(audio_data, dtype=np.float32)
if audio_data.ndim == 1:
audio_data = audio_data[None, :]
return torch.from_numpy(audio_data), int(sample_rate)
def _audio_to_frames(audio, *, channels_first: bool):
import numpy as np
if hasattr(audio, "detach"):
audio = audio.detach().cpu().float().numpy()
audio = np.asarray(audio, dtype=np.float32)
if audio.ndim == 1:
return audio[:, None]
return audio.T if channels_first else audio
def _reference_active_mask(reference_audio, base_mask=None):
import numpy as np
frame_abs = np.mean(np.abs(reference_audio), axis=1)
active_mask = np.ones(reference_audio.shape[0], dtype=bool) if base_mask is None else np.asarray(base_mask, dtype=np.float32).reshape(-1)[:reference_audio.shape[0]] > 0.5
active_abs = frame_abs[active_mask]
if active_abs.size == 0:
return active_mask
threshold = 0.1 * float(active_abs.mean())
return active_mask & (frame_abs > threshold)
def _active_rms_amplitude(audio, active_mask=None, *, channels_first: bool = False) -> float:
import numpy as np
audio = _audio_to_frames(audio, channels_first=channels_first)
if active_mask is None:
active_mask = _reference_active_mask(audio)
else:
active_mask = np.asarray(active_mask, dtype=bool).reshape(-1)[:audio.shape[0]]
active_audio = audio[:active_mask.shape[0]][active_mask]
return float(np.sqrt(np.mean(np.square(active_audio)))) if active_audio.size else 0.0
def _match_reference_amplitude(source_audio, reference_audio):
source_active = _active_rms_amplitude(source_audio, channels_first=True)
reference_active = _active_rms_amplitude(reference_audio, channels_first=True)
if source_active <= 1e-8 or reference_active <= 1e-8:
return reference_audio
gain = source_active / reference_active
peak = float(reference_audio.detach().abs().max().item())
if peak > 0.0:
gain = min(gain, 1.0 / peak)
return reference_audio * float(gain)
def _match_audio_file_amplitude(reference_path, audio_path, active_mask=None, mask_sample_rate=None) -> str:
import numpy as np
import soundfile as sf
from shared.utils.audio_video import write_wav_file
reference_audio, reference_rate = sf.read(os.fspath(reference_path), dtype="float32", always_2d=True)
audio, audio_rate = sf.read(os.fspath(audio_path), dtype="float32", always_2d=True)
reference_mask = _fit_audio_mask_to_audio(active_mask, mask_sample_rate, reference_rate, reference_audio.shape[0]) if active_mask is not None else None
reference_activity = _reference_active_mask(reference_audio, reference_mask)
audio_activity = _fit_audio_mask_to_audio(reference_activity.astype(np.float32), reference_rate, audio_rate, audio.shape[0]) > 0.5
reference_active = _active_rms_amplitude(reference_audio, reference_activity)
audio_active = _active_rms_amplitude(audio, audio_activity)
if reference_active <= 1e-8 or audio_active <= 1e-8:
return audio_path
gain = reference_active / audio_active
peak = float(np.max(np.abs(audio))) if audio.size else 0.0
if peak > 0.0:
gain = min(gain, 1.0 / peak)
return write_wav_file(audio_path, audio * float(gain), audio_rate)
def _fit_audio_mask_to_audio(mask, mask_sample_rate, target_sample_rate, target_length):
import numpy as np
from shared.utils.audio_video import resample_audio_array
mask = np.asarray(mask, dtype=np.float32).reshape(-1)
if int(mask_sample_rate) != int(target_sample_rate):
mask = resample_audio_array(mask, int(mask_sample_rate), int(target_sample_rate))
mask = np.clip(mask, 0.0, 1.0)
if mask.shape[0] < target_length:
mask = np.pad(mask, (0, target_length - mask.shape[0]))
return mask[:target_length]
def _merge_audio_files_to_wav(audio_paths, output_path, masks=None, mask_sample_rate=None):
import numpy as np
import soundfile as sf
from shared.utils.audio_video import resample_audio_array, write_wav_file
mixed_audio = None
target_rate = 0
target_channels = 1
for track_no, audio_path in enumerate(audio_paths):
audio_data, sample_rate = sf.read(os.fspath(audio_path), dtype="float32", always_2d=True)
if mixed_audio is None:
target_rate = int(sample_rate)
target_channels = audio_data.shape[1]
mixed_audio = np.zeros((audio_data.shape[0], target_channels), dtype=np.float32)
elif int(sample_rate) != target_rate:
audio_data = resample_audio_array(audio_data, int(sample_rate), target_rate)
if audio_data.ndim == 1:
audio_data = audio_data[:, None]
if audio_data.shape[1] != target_channels:
audio_data = np.repeat(audio_data[:, :1], target_channels, axis=1) if audio_data.shape[1] == 1 else audio_data[:, :target_channels]
if masks is not None:
audio_data = audio_data * _fit_audio_mask_to_audio(masks[track_no], mask_sample_rate, target_rate, audio_data.shape[0])[:, None]
if audio_data.shape[0] > mixed_audio.shape[0]:
mixed_audio = np.pad(mixed_audio, ((0, audio_data.shape[0] - mixed_audio.shape[0]), (0, 0)))
mixed_audio[:audio_data.shape[0]] += audio_data
return write_wav_file(output_path, np.clip(mixed_audio, -1.0, 1.0), target_rate)
class SeedVCBridge:
MODE_OFF = 0
MODE_V1 = 1
MODE_SINGING = 2
MODE_ACCENT = 3
PERSIST_UNLOAD = 1
PERSIST_RAM = 2
CURRENT_VERSION_LABEL = "SeedVC"
_VERSIONS = {
MODE_V1: "v1.0 Speech",
MODE_SINGING: "v1.0 Singing / F0 44k",
MODE_ACCENT: "v2 Speech",
}
def __init__(self, server_config: dict[str, Any], files_locator):
self.server_config = server_config
self.files_locator = files_locator
@classmethod
def mode_choices(cls) -> list[tuple[str, int]]:
return [("Off", cls.MODE_OFF), *[(label, mode) for mode, label in cls._VERSIONS.items()]]
@classmethod
def persistence_choices(cls) -> list[tuple[str, int]]:
return [("Unload after use", cls.PERSIST_UNLOAD), ("Persistent in RAM", cls.PERSIST_RAM)]
def normalize_config(self, config: dict[str, Any] | None = None) -> tuple[int, int]:
config = self.server_config if config is None else config
mode = config.get("seedvc_mode", self.MODE_OFF)
persistence = config.get("seedvc_persistence", self.PERSIST_UNLOAD)
try:
mode = int(mode)
except (TypeError, ValueError):
mode = self.MODE_OFF
try:
persistence = int(persistence)
except (TypeError, ValueError):
persistence = self.PERSIST_UNLOAD
if mode not in self._VERSIONS and mode != self.MODE_OFF:
mode = self.MODE_OFF
if persistence not in (self.PERSIST_UNLOAD, self.PERSIST_RAM):
persistence = self.PERSIST_UNLOAD
config["seedvc_mode"] = mode
config["seedvc_persistence"] = persistence
return mode, persistence
def settings(self, config: dict[str, Any] | None = None) -> tuple[bool, str | None, int]:
mode, persistence = self.normalize_config(config)
return mode != self.MODE_OFF, self._VERSIONS.get(mode), persistence
def enabled(self) -> bool:
return self.settings()[0]
def query_download_def(self, enabled_only: bool = True) -> list[dict[str, Any]]:
mode, _ = self.normalize_config()
if enabled_only and mode == self.MODE_OFF:
return []
from postprocessing import seedvc
return seedvc.query_download_def(mode=mode)
def _assets_available(self) -> bool:
from postprocessing import seedvc
mode, _ = self.normalize_config()
required_files = seedvc.query_required_files(mode)
return all(self.files_locator.locate_file(path, error_if_none=False) is not None for path in required_files)
def download(self, process_files: Callable[..., Any], send_cmd=None, status_text: str | None = None) -> bool:
download_defs = self.query_download_def()
if not download_defs:
return False
downloaded = False
from shared.utils.download import download_def_missing_files, query_audio_background_replacement_download_def, send_download_status
if download_defs and not self._assets_available():
send_download_status(send_cmd, status_text)
for download_def in download_defs:
process_files(**download_def)
downloaded = True
if SEEDVC_RESTORE_BACKGROUND_STEM:
stem_download_def = query_audio_background_replacement_download_def()
if download_def_missing_files(stem_download_def):
send_download_status(send_cmd, "Downloading audio background replacement model files...")
process_files(**stem_download_def)
downloaded = True
return downloaded
def _replace_two_speaker_audio_file(self, source_audio_path: str, voice_sample_path: str, output_path: str, *, voice_sample2_path: str, process_files: Callable[..., Any], profile_no=4, verbose_level: int = 1, init_pipe: Callable[..., int] | None = None, prefix: str = "seedvc") -> str:
import numpy as np
from preprocessing.speakers_separator import extract_dual_audio
from shared.utils.audio_video import cleanup_temp_audio_files
output_dir = os.path.dirname(os.path.abspath(output_path)) or "."
split_track1 = _make_temp_wav(output_dir, f"{prefix}_speaker1_")
split_track2 = _make_temp_wav(output_dir, f"{prefix}_speaker2_")
converted_track1 = _make_temp_wav(output_dir, f"{prefix}_speaker1_seedvc_")
converted_track2 = _make_temp_wav(output_dir, f"{prefix}_speaker2_seedvc_")
temp_tracks = [split_track1, split_track2, converted_track1, converted_track2]
try:
_, speaker_masks, mask_sample_rate = extract_dual_audio(source_audio_path, split_track1, split_track2, verbose=verbose_level >= 2, return_masks=True, speech_masks_only=True)
mask_values = list(speaker_masks.values())
mode, persistence = self.normalize_config()
convert_audio_file(split_track1, voice_sample_path, converted_track1, persistent_models=persistence == self.PERSIST_RAM, profile_no=profile_no, verbose_level=verbose_level, init_pipe=init_pipe, mode=mode)
convert_audio_file(split_track2, voice_sample2_path, converted_track2, persistent_models=persistence == self.PERSIST_RAM, profile_no=profile_no, verbose_level=verbose_level, init_pipe=init_pipe, mode=mode)
_match_audio_file_amplitude(split_track1, converted_track1, active_mask=mask_values[0], mask_sample_rate=mask_sample_rate)
_match_audio_file_amplitude(split_track2, converted_track2, active_mask=mask_values[1], mask_sample_rate=mask_sample_rate)
merge_tracks, merge_masks = [converted_track1, converted_track2], mask_values
if KEEP_ORIGINAL_AUDIO_OUTSIDE_TWO_SPEAKERS:
merge_tracks.append(source_audio_path)
merge_masks.append(np.clip(1.0 - np.maximum(mask_values[0], mask_values[1]), 0.0, 1.0))
return _merge_audio_files_to_wav(merge_tracks, output_path, masks=merge_masks, mask_sample_rate=mask_sample_rate)
finally:
cleanup_temp_audio_files(temp_tracks)
def replace_audio_file(self, source_audio_path: str, voice_sample_path: str, output_path: str, *, process_files: Callable[..., Any], profile_no=4, verbose_level: int = 1, init_pipe: Callable[..., int] | None = None, voice_sample2_path: str | None = None, speaker_count: int = 1, prefix: str = "seedvc") -> str:
mode, persistence = self.normalize_config()
if mode == self.MODE_OFF:
raise RuntimeError("SeedVC voice replacement is disabled in Configuration > Extensions.")
self.download(process_files)
output_dir = os.path.dirname(os.path.abspath(output_path)) or "."
temp_tracks = []
conversion_source_path = source_audio_path
background_path = None
conversion_output_path = output_path
try:
if SEEDVC_RESTORE_BACKGROUND_STEM:
from preprocessing.extract_vocals import extract_vocal_and_background_stems
requested_vocals_path = _make_temp_wav(output_dir, "seedvc_vocals_")
requested_background_path = _make_temp_wav(output_dir, "seedvc_background_")
temp_tracks += [requested_vocals_path, requested_background_path]
conversion_source_path, background_path = extract_vocal_and_background_stems(source_audio_path, requested_vocals_path, requested_background_path)
temp_tracks += [conversion_source_path, background_path]
conversion_output_path = _make_temp_wav(output_dir, "seedvc_voice_")
temp_tracks.append(conversion_output_path)
if int(speaker_count) == 2:
if voice_sample2_path is None:
raise RuntimeError("Two-speaker SeedVC voice replacement requires a second voice sample.")
converted_path = self._replace_two_speaker_audio_file(conversion_source_path, voice_sample_path, conversion_output_path, voice_sample2_path=voice_sample2_path, process_files=process_files, profile_no=profile_no, verbose_level=verbose_level, init_pipe=init_pipe, prefix=prefix)
else:
converted_path = convert_audio_file(conversion_source_path, voice_sample_path, conversion_output_path, persistent_models=persistence == self.PERSIST_RAM, profile_no=profile_no, verbose_level=verbose_level, init_pipe=init_pipe, mode=mode)
_match_audio_file_amplitude(conversion_source_path, converted_path)
if background_path is not None:
return _merge_audio_files_to_wav([converted_path, background_path], output_path)
return converted_path
finally:
if temp_tracks:
from shared.utils.audio_video import cleanup_temp_audio_files
cleanup_temp_audio_files(temp_tracks)
def replace_audio_tracks(self, audio_tracks: list[str], voice_sample_path: str | None, output_dir: str, prefix: str, *, process_files: Callable[..., Any], profile_no=4, verbose_level: int = 1, init_pipe: Callable[..., int] | None = None, voice_sample2_path: str | None = None, speaker_count: int = 1) -> tuple[list[str], list[str]]:
if voice_sample_path is None or len(audio_tracks) == 0:
return audio_tracks, []
converted_tracks = []
for track_no, audio_track in enumerate(audio_tracks):
output_path = _make_temp_wav(output_dir, f"{prefix}_seedvc_track{track_no}_")
converted_tracks.append(self.replace_audio_file(audio_track, voice_sample_path, output_path, process_files=process_files, profile_no=profile_no, verbose_level=verbose_level, init_pipe=init_pipe, voice_sample2_path=voice_sample2_path, speaker_count=speaker_count, prefix=f"{prefix}_track{track_no}"))
return converted_tracks, converted_tracks
def release_vram(self) -> None:
release_models()