| from __future__ import annotations |
|
|
| import gc |
| import os |
| import tempfile |
| from typing import Any, Callable |
|
|
|
|
# Runtime cached across calls: filled in by _get_runtime() when persistent
# models are requested and reset to None by release_models().
_persistent_converter = None
_persistent_offloadobj = None
# (profile_no, mode) pair the cached runtime was built for.
_persistent_profile = None
# When True, the two-speaker merge also mixes in the source audio wherever
# neither speaker mask is active.
KEEP_ORIGINAL_AUDIO_OUTSIDE_TWO_SPEAKERS = True
# When True, replace_audio_file() splits the source into vocal and background
# stems, converts only the vocals and mixes the background back in; download()
# then also fetches the background-replacement model files.
SEEDVC_RESTORE_BACKGROUND_STEM = True
|
|
|
|
def _release_runtime_objects(converter=None, offloadobj=None) -> None:
    """Tear down a conversion runtime and hand its memory back.

    Args:
        converter: Converter model to drop. Only this function's own reference
            is deleted; callers must drop theirs for the model to be freed.
        offloadobj: Offload handle. When given, ``unload_all()`` and then
            ``release()`` are called on it.
    """
    import torch

    if offloadobj is not None:
        offloadobj.unload_all()
        offloadobj.release()
    del converter, offloadobj
    # Collect before emptying the CUDA cache: tensors that are only reachable
    # through reference cycles are freed by the collector, and empty_cache()
    # can only return blocks that are no longer referenced.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
|
|
|
|
def release_models() -> None:
    """Release the cached persistent runtime and clear the module-level cache."""
    global _persistent_converter, _persistent_offloadobj, _persistent_profile

    _release_runtime_objects(converter=_persistent_converter, offloadobj=_persistent_offloadobj)
    # Forget everything only after the release call has returned.
    _persistent_converter = _persistent_offloadobj = _persistent_profile = None
|
|
|
|
def _get_runtime(persistent_models: bool, profile_no=4, verbose_level: int = 1, init_pipe: Callable[..., int] | None = None, mode: int = 1):
    """Return a ready-to-use SeedVC runtime, reusing the cached one when possible.

    Args:
        persistent_models: When True, a newly built runtime is stored in the
            module-level cache for later calls.
        profile_no: Offload profile number passed to ``offload.profile``.
        verbose_level: Verbosity forwarded to ``offload.profile``.
        init_pipe: Optional callback invoked as
            ``init_pipe(pipe, offload_kwargs, profile_no)``; its return value
            replaces ``profile_no`` for the offload call.
        mode: SeedVC mode, normalized through ``seedvc.normalize_mode``.

    Returns:
        ``(converter, offloadobj, keep_alive)``. ``keep_alive`` is True when
        the runtime is (or was already) cached and must not be released by
        the caller.
    """
    import torch
    from mmgp import offload
    from postprocessing import seedvc

    global _persistent_converter, _persistent_offloadobj, _persistent_profile

    mode = seedvc.normalize_mode(mode)
    # The key uses the caller's profile_no, before init_pipe may replace it.
    cache_key = (profile_no, mode)
    cache_is_stale = _persistent_offloadobj is not None and _persistent_profile != cache_key
    if cache_is_stale:
        release_models()

    if _persistent_offloadobj is not None:
        # A cached runtime is reused and stays loaded whatever the caller asked for.
        return _persistent_converter, _persistent_offloadobj, True

    converter = seedvc.get_model(dtype=torch.float16, mode=mode)
    pipe = seedvc.get_pipe(profile_no=profile_no, model=converter, mode=mode)
    offload_kwargs = {"coTenantsMap": seedvc.get_cotenants_map(pipe)}
    if init_pipe is not None:
        profile_no = init_pipe(pipe, offload_kwargs, profile_no)
    # Applied after init_pipe so it overrides whatever the callback set.
    offload_kwargs["pinnedMemory"] = False
    offloadobj = offload.profile(
        pipe,
        profile_no=profile_no,
        quantizeTransformer=False,
        convertWeightsFloatTo=torch.float16,
        verboseLevel=verbose_level,
        **offload_kwargs,
    )
    if persistent_models:
        _persistent_converter, _persistent_offloadobj, _persistent_profile = converter, offloadobj, cache_key
    return converter, offloadobj, persistent_models
|
|
|
|
def convert_audio_file(source_audio_path: str, voice_sample_path: str, output_path: str, *, persistent_models: bool = False, profile_no=4, verbose_level: int = 1, init_pipe: Callable[..., int] | None = None, diffusion_steps: int | None = None, cfg_rate: float | None = None, mode: int = 1, amplitude_match_audio_path: str | None = None) -> str:
    """Convert the voice in one audio file and write the result as a WAV file.

    Args:
        source_audio_path: Audio whose voice is to be replaced.
        voice_sample_path: Reference voice sample.
        output_path: Destination of the converted audio.
        persistent_models: Keep the runtime cached after the call.
        profile_no, verbose_level, init_pipe: Forwarded to ``_get_runtime``.
        diffusion_steps: Step count; the mode's default is used when None.
        cfg_rate: CFG rate; the mode's default is used when None.
        mode: SeedVC mode, normalized through ``seedvc.normalize_mode``.
        amplitude_match_audio_path: Optional audio whose level the reference
            sample is scaled to; the source audio is used when None.

    Returns:
        ``output_path``.
    """
    import torch
    from postprocessing import seedvc
    from shared.utils.audio_video import write_wav_file

    mode = seedvc.normalize_mode(mode)
    converter, offloadobj, keep_alive = _get_runtime(
        persistent_models,
        profile_no=profile_no,
        verbose_level=verbose_level,
        init_pipe=init_pipe,
        mode=mode,
    )
    try:
        source_audio, source_rate = _load_audio_tensor(source_audio_path)
        reference_audio, reference_rate = _load_audio_tensor(voice_sample_path)
        if amplitude_match_audio_path is None:
            level_audio = source_audio
        else:
            level_audio = _load_audio_tensor(amplitude_match_audio_path)[0]
        reference_audio = _match_reference_amplitude(level_audio, reference_audio)

        steps = seedvc.get_default_steps(mode) if diffusion_steps is None else diffusion_steps
        guidance = seedvc.get_default_cfg_rate(mode) if cfg_rate is None else cfg_rate
        with torch.inference_mode():
            converted = converter.convert_tensor(
                source_audio,
                source_rate,
                reference_audio,
                reference_rate,
                output_rate=source_rate,
                diffusion_steps=steps,
                cfg_rate=guidance,
            )
            write_wav_file(output_path, converted, source_rate)
    finally:
        # Always unload after a conversion; fully release only a runtime
        # that is not being kept in the cache.
        if offloadobj is not None:
            offloadobj.unload_all()
        if not keep_alive:
            _release_runtime_objects(converter, offloadobj)
    return output_path
|
|
|
|
def _make_temp_wav(output_dir: str, prefix: str) -> str:
    """Reserve an empty ``.wav`` temp file in *output_dir* and return its path.

    The directory is created when missing. The file is left on disk for the
    caller to fill in and eventually delete.
    """
    os.makedirs(output_dir, exist_ok=True)
    handle, wav_path = tempfile.mkstemp(suffix=".wav", prefix=prefix, dir=output_dir)
    # Only the reserved name is needed; close the descriptor straight away.
    os.close(handle)
    return wav_path
|
|
|
|
def _load_audio_tensor(audio_path: str):
    """Load an audio file as a float32 tensor plus its sample rate.

    The file is read with ``librosa.load(..., sr=None, mono=False)``, i.e.
    without resampling or down-mixing.

    Returns:
        ``(tensor, sample_rate)``; a one-dimensional result gets a leading
        axis so the tensor is at least two-dimensional.
    """
    import librosa
    import numpy as np
    import torch

    samples, rate = librosa.load(os.fspath(audio_path), sr=None, mono=False)
    samples = np.asarray(samples, dtype=np.float32)
    is_flat = samples.ndim == 1
    if is_flat:
        # Add the leading axis so the result is two-dimensional.
        samples = samples[None, :]
    return torch.from_numpy(samples), int(rate)
|
|
|
|
def _audio_to_frames(audio, *, channels_first: bool):
    """Return *audio* as a float32 NumPy array laid out as ``(frames, channels)``.

    Args:
        audio: Array-like audio, or an object with ``detach()`` (e.g. a torch
            tensor), which is detached and moved to CPU first.
        channels_first: True when a 2-D input is ``(channels, frames)`` and
            must be transposed.
    """
    import numpy as np

    if hasattr(audio, "detach"):
        audio = audio.detach().cpu().float().numpy()
    frames = np.asarray(audio, dtype=np.float32)
    if frames.ndim == 1:
        # A flat signal becomes a single-channel column.
        return frames[:, None]
    if channels_first:
        return frames.T
    return frames
|
|
|
|
def _reference_active_mask(reference_audio, base_mask=None):
    """Flag the frames of *reference_audio* that carry signal.

    A frame is active when it is selected by *base_mask* and its mean absolute
    amplitude across channels exceeds 10% of the mean over the selected frames.

    Args:
        reference_audio: 2-D array indexed ``[frame, channel]``.
        base_mask: Optional per-frame mask; values above 0.5 select a frame.
            It is truncated to the audio length, and frames beyond a shorter
            mask count as unselected. All frames are selected when None.

    Returns:
        Boolean array with one entry per frame. When no frame is selected the
        (all-False) selection mask is returned as is.
    """
    import numpy as np

    frame_count = reference_audio.shape[0]
    frame_abs = np.mean(np.abs(reference_audio), axis=1)
    if base_mask is None:
        active_mask = np.ones(frame_count, dtype=bool)
    else:
        active_mask = np.asarray(base_mask, dtype=np.float32).reshape(-1)[:frame_count] > 0.5
        missing = frame_count - active_mask.shape[0]
        if missing > 0:
            # A mask shorter than the audio would make the boolean index below
            # raise IndexError; treat the uncovered tail as inactive instead.
            active_mask = np.pad(active_mask, (0, missing))
    active_abs = frame_abs[active_mask]
    if active_abs.size == 0:
        return active_mask
    threshold = 0.1 * float(active_abs.mean())
    return active_mask & (frame_abs > threshold)
|
|
|
|
def _active_rms_amplitude(audio, active_mask=None, *, channels_first: bool = False) -> float:
    """Return the RMS level of the active frames of *audio*.

    Args:
        audio: Audio accepted by ``_audio_to_frames``.
        active_mask: Optional per-frame boolean mask. When None, the mask is
            derived from the audio with ``_reference_active_mask``.
        channels_first: Layout flag forwarded to ``_audio_to_frames``.

    Returns:
        RMS over all samples of the selected frames, or 0.0 when no frame is
        selected.
    """
    import numpy as np

    frames = _audio_to_frames(audio, channels_first=channels_first)
    if active_mask is not None:
        selection = np.asarray(active_mask, dtype=bool).reshape(-1)[:frames.shape[0]]
    else:
        selection = _reference_active_mask(frames)
    # Trim the audio to the mask length so both index the same frames.
    selected = frames[:selection.shape[0]][selection]
    if not selected.size:
        return 0.0
    return float(np.sqrt(np.mean(np.square(selected))))
|
|
|
|
def _match_reference_amplitude(source_audio, reference_audio):
    """Scale *reference_audio* so its active RMS level matches *source_audio*.

    Both inputs are treated as channels-first. The gain is capped at
    ``1 / peak`` of the reference so the scaled signal does not exceed full
    scale. The reference is returned unchanged when either level is at or
    below 1e-8.
    """
    silence_floor = 1e-8
    source_level = _active_rms_amplitude(source_audio, channels_first=True)
    reference_level = _active_rms_amplitude(reference_audio, channels_first=True)
    if source_level <= silence_floor or reference_level <= silence_floor:
        return reference_audio

    gain = source_level / reference_level
    peak = float(reference_audio.detach().abs().max().item())
    if peak > 0.0:
        headroom_gain = 1.0 / peak
        gain = min(gain, headroom_gain)
    return reference_audio * float(gain)
|
|
|
|
def _match_audio_file_amplitude(reference_path, audio_path, active_mask=None, mask_sample_rate=None) -> str:
    """Rewrite *audio_path* so its active RMS level matches *reference_path*.

    Activity is detected on the reference (optionally restricted by
    *active_mask*, sampled at *mask_sample_rate*) and the same activity
    pattern, refitted to the audio's rate and length, is used to measure the
    audio. The gain is capped at ``1 / peak`` of the audio.

    Returns:
        *audio_path* unchanged when either level is at or below 1e-8,
        otherwise the return value of ``write_wav_file`` for the rewritten
        file.
    """
    import numpy as np
    import soundfile as sf
    from shared.utils.audio_video import write_wav_file

    reference_audio, reference_rate = sf.read(os.fspath(reference_path), dtype="float32", always_2d=True)
    audio, audio_rate = sf.read(os.fspath(audio_path), dtype="float32", always_2d=True)

    if active_mask is None:
        reference_mask = None
    else:
        reference_mask = _fit_audio_mask_to_audio(active_mask, mask_sample_rate, reference_rate, reference_audio.shape[0])
    reference_activity = _reference_active_mask(reference_audio, reference_mask)
    fitted_activity = _fit_audio_mask_to_audio(reference_activity.astype(np.float32), reference_rate, audio_rate, audio.shape[0])
    audio_activity = fitted_activity > 0.5

    reference_level = _active_rms_amplitude(reference_audio, reference_activity)
    audio_level = _active_rms_amplitude(audio, audio_activity)
    if reference_level <= 1e-8 or audio_level <= 1e-8:
        return audio_path

    gain = reference_level / audio_level
    peak = float(np.max(np.abs(audio))) if audio.size else 0.0
    if peak > 0.0:
        gain = min(gain, 1.0 / peak)
    scaled = audio * float(gain)
    return write_wav_file(audio_path, scaled, audio_rate)
|
|
|
|
def _fit_audio_mask_to_audio(mask, mask_sample_rate, target_sample_rate, target_length):
    """Fit a per-sample mask to a target sample rate and length.

    Args:
        mask: Array-like mask; it is flattened to one dimension.
        mask_sample_rate: Sample rate of *mask*. ``None`` means the mask is
            already at *target_sample_rate* and is not resampled.
        target_sample_rate: Sample rate the mask must match.
        target_length: Number of samples in the result.

    Returns:
        float32 array of exactly *target_length* values clamped to
        ``[0, 1]``; a shorter mask is zero-padded at the end, a longer one is
        truncated.
    """
    import numpy as np

    mask = np.asarray(mask, dtype=np.float32).reshape(-1)
    # Callers default mask_sample_rate to None; int(None) used to raise here.
    if mask_sample_rate is not None and int(mask_sample_rate) != int(target_sample_rate):
        # Imported lazily: the resampler is only needed when the rates differ.
        from shared.utils.audio_video import resample_audio_array

        mask = resample_audio_array(mask, int(mask_sample_rate), int(target_sample_rate))
    mask = np.clip(mask, 0.0, 1.0)
    missing = target_length - mask.shape[0]
    if missing > 0:
        mask = np.pad(mask, (0, missing))
    return mask[:target_length]
|
|
|
|
def _merge_audio_files_to_wav(audio_paths, output_path, masks=None, mask_sample_rate=None):
    """Sum several audio files into one WAV file.

    The first file fixes the output sample rate and channel count. Later
    files are resampled to that rate when theirs differs, and adapted to the
    channel count (a mono track is repeated, extra channels are dropped).
    When *masks* is given, track ``i`` is multiplied by ``masks[i]`` fitted to
    its length. The sum is clipped to ``[-1, 1]`` before writing.

    Returns:
        The return value of ``write_wav_file`` for *output_path*.
    """
    import numpy as np
    import soundfile as sf
    from shared.utils.audio_video import resample_audio_array, write_wav_file

    mix = None
    mix_rate = 0
    mix_channels = 1
    for index, path in enumerate(audio_paths):
        track, rate = sf.read(os.fspath(path), dtype="float32", always_2d=True)
        rate = int(rate)
        if mix is None:
            # The first track defines the format of the mix.
            mix_rate, mix_channels = rate, track.shape[1]
            mix = np.zeros((track.shape[0], mix_channels), dtype=np.float32)
        elif rate != mix_rate:
            track = resample_audio_array(track, rate, mix_rate)
        if track.ndim == 1:
            track = track[:, None]

        channel_count = track.shape[1]
        if channel_count != mix_channels:
            if channel_count == 1:
                track = np.repeat(track[:, :1], mix_channels, axis=1)
            else:
                track = track[:, :mix_channels]

        if masks is not None:
            track_mask = _fit_audio_mask_to_audio(masks[index], mask_sample_rate, mix_rate, track.shape[0])
            track = track * track_mask[:, None]

        # Grow the mix when this track is longer than anything seen so far.
        overhang = track.shape[0] - mix.shape[0]
        if overhang > 0:
            mix = np.pad(mix, ((0, overhang), (0, 0)))
        mix[:track.shape[0]] += track
    return write_wav_file(output_path, np.clip(mix, -1.0, 1.0), mix_rate)
|
|
|
|
class SeedVCBridge:
    """Expose SeedVC voice replacement to the application.

    Reads the ``seedvc_mode`` / ``seedvc_persistence`` settings from the
    server configuration, downloads model files on demand through a
    caller-supplied ``process_files`` callback, and drives the module-level
    conversion helpers for single- and two-speaker audio.
    """

    MODE_OFF = 0
    MODE_V1 = 1
    MODE_SINGING = 2
    MODE_ACCENT = 3
    PERSIST_UNLOAD = 1
    PERSIST_RAM = 2
    CURRENT_VERSION_LABEL = "SeedVC"

    # Display label of every selectable (non-off) mode.
    _VERSIONS = {
        MODE_V1: "v1.0 Speech",
        MODE_SINGING: "v1.0 Singing / F0 44k",
        MODE_ACCENT: "v2 Speech",
    }

    def __init__(self, server_config: dict[str, Any], files_locator):
        """Store the configuration mapping and the model-file locator."""
        self.server_config = server_config
        self.files_locator = files_locator

    @classmethod
    def mode_choices(cls) -> list[tuple[str, int]]:
        """Return ``(label, mode)`` pairs, with "Off" first."""
        return [("Off", cls.MODE_OFF), *[(label, mode) for mode, label in cls._VERSIONS.items()]]

    @classmethod
    def persistence_choices(cls) -> list[tuple[str, int]]:
        """Return ``(label, persistence)`` pairs."""
        return [("Unload after use", cls.PERSIST_UNLOAD), ("Persistent in RAM", cls.PERSIST_RAM)]

    @staticmethod
    def _to_int(value: Any, fallback: int) -> int:
        """Return ``int(value)``, or *fallback* when the value is not convertible."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return fallback

    def normalize_config(self, config: dict[str, Any] | None = None) -> tuple[int, int]:
        """Validate the SeedVC settings and write the cleaned values back.

        Args:
            config: Mapping to normalize; the server configuration when None.

        Returns:
            ``(mode, persistence)``. Unknown or unparsable values fall back to
            ``MODE_OFF`` and ``PERSIST_UNLOAD``. The mapping is updated in
            place with the returned values.
        """
        config = self.server_config if config is None else config
        mode = self._to_int(config.get("seedvc_mode", self.MODE_OFF), self.MODE_OFF)
        persistence = self._to_int(config.get("seedvc_persistence", self.PERSIST_UNLOAD), self.PERSIST_UNLOAD)
        if mode != self.MODE_OFF and mode not in self._VERSIONS:
            mode = self.MODE_OFF
        if persistence not in (self.PERSIST_UNLOAD, self.PERSIST_RAM):
            persistence = self.PERSIST_UNLOAD
        config["seedvc_mode"] = mode
        config["seedvc_persistence"] = persistence
        return mode, persistence

    def settings(self, config: dict[str, Any] | None = None) -> tuple[bool, str | None, int]:
        """Return ``(enabled, version_label, persistence)`` for *config*.

        ``version_label`` is None when the mode is off.
        """
        mode, persistence = self.normalize_config(config)
        return mode != self.MODE_OFF, self._VERSIONS.get(mode), persistence

    def enabled(self) -> bool:
        """Return True when a SeedVC mode other than off is configured."""
        return self.settings()[0]

    def query_download_def(self, enabled_only: bool = True) -> list[dict[str, Any]]:
        """Return the download definitions for the configured mode.

        An empty list is returned when *enabled_only* is True and the mode is
        off.
        """
        mode, _ = self.normalize_config()
        if enabled_only and mode == self.MODE_OFF:
            return []
        from postprocessing import seedvc

        return seedvc.query_download_def(mode=mode)

    def _assets_available(self) -> bool:
        """Return True when every file required by the configured mode is found."""
        from postprocessing import seedvc

        mode, _ = self.normalize_config()
        required_files = seedvc.query_required_files(mode)
        return all(self.files_locator.locate_file(path, error_if_none=False) is not None for path in required_files)

    def download(self, process_files: Callable[..., Any], send_cmd=None, status_text: str | None = None) -> bool:
        """Download whatever model files are missing.

        Args:
            process_files: Callback invoked as ``process_files(**download_def)``
                for each definition to fetch.
            send_cmd: Passed through to ``send_download_status``.
            status_text: Status message reported before the SeedVC download.

        Returns:
            True when at least one download was started, False otherwise
            (including when SeedVC is off).
        """
        download_defs = self.query_download_def()
        if not download_defs:
            return False
        from shared.utils.download import download_def_missing_files, query_audio_background_replacement_download_def, send_download_status

        downloaded = False
        if not self._assets_available():
            send_download_status(send_cmd, status_text)
            for download_def in download_defs:
                process_files(**download_def)
            downloaded = True
        if SEEDVC_RESTORE_BACKGROUND_STEM:
            # Stem separation needs its own model files.
            stem_download_def = query_audio_background_replacement_download_def()
            if download_def_missing_files(stem_download_def):
                send_download_status(send_cmd, "Downloading audio background replacement model files...")
                process_files(**stem_download_def)
                downloaded = True
        return downloaded

    def _replace_two_speaker_audio_file(self, source_audio_path: str, voice_sample_path: str, output_path: str, *, voice_sample2_path: str, process_files: Callable[..., Any], profile_no=4, verbose_level: int = 1, init_pipe: Callable[..., int] | None = None, prefix: str = "seedvc") -> str:
        """Convert two speakers separately and merge them into *output_path*.

        The source is split into one track per speaker with
        ``extract_dual_audio`` (which also yields per-speaker masks). Each
        track is converted with its own voice sample and level-matched to its
        split track, and the results are merged under the speaker masks. When
        ``KEEP_ORIGINAL_AUDIO_OUTSIDE_TWO_SPEAKERS`` is set, the source audio
        is mixed in wherever neither mask is active.

        ``process_files`` is accepted but not used by this method.

        Returns:
            The return value of ``_merge_audio_files_to_wav``.
        """
        import numpy as np
        from preprocessing.speakers_separator import extract_dual_audio
        from shared.utils.audio_video import cleanup_temp_audio_files

        output_dir = os.path.dirname(os.path.abspath(output_path)) or "."
        split_track1 = _make_temp_wav(output_dir, f"{prefix}_speaker1_")
        split_track2 = _make_temp_wav(output_dir, f"{prefix}_speaker2_")
        converted_track1 = _make_temp_wav(output_dir, f"{prefix}_speaker1_seedvc_")
        converted_track2 = _make_temp_wav(output_dir, f"{prefix}_speaker2_seedvc_")
        temp_tracks = [split_track1, split_track2, converted_track1, converted_track2]
        try:
            _, speaker_masks, mask_sample_rate = extract_dual_audio(source_audio_path, split_track1, split_track2, verbose=verbose_level >= 2, return_masks=True, speech_masks_only=True)
            mask_values = list(speaker_masks.values())
            mode, persistence = self.normalize_config()
            keep_loaded = persistence == self.PERSIST_RAM
            convert_audio_file(split_track1, voice_sample_path, converted_track1, persistent_models=keep_loaded, profile_no=profile_no, verbose_level=verbose_level, init_pipe=init_pipe, mode=mode)
            convert_audio_file(split_track2, voice_sample2_path, converted_track2, persistent_models=keep_loaded, profile_no=profile_no, verbose_level=verbose_level, init_pipe=init_pipe, mode=mode)
            _match_audio_file_amplitude(split_track1, converted_track1, active_mask=mask_values[0], mask_sample_rate=mask_sample_rate)
            _match_audio_file_amplitude(split_track2, converted_track2, active_mask=mask_values[1], mask_sample_rate=mask_sample_rate)
            merge_tracks = [converted_track1, converted_track2]
            # Work on a copy so the speaker-mask list itself is not extended.
            merge_masks = list(mask_values)
            if KEEP_ORIGINAL_AUDIO_OUTSIDE_TWO_SPEAKERS:
                merge_tracks.append(source_audio_path)
                merge_masks.append(np.clip(1.0 - np.maximum(mask_values[0], mask_values[1]), 0.0, 1.0))
            return _merge_audio_files_to_wav(merge_tracks, output_path, masks=merge_masks, mask_sample_rate=mask_sample_rate)
        finally:
            cleanup_temp_audio_files(temp_tracks)

    def replace_audio_file(self, source_audio_path: str, voice_sample_path: str, output_path: str, *, process_files: Callable[..., Any], profile_no=4, verbose_level: int = 1, init_pipe: Callable[..., int] | None = None, voice_sample2_path: str | None = None, speaker_count: int = 1, prefix: str = "seedvc") -> str:
        """Replace the voice(s) in one audio file.

        Missing model files are downloaded first. When
        ``SEEDVC_RESTORE_BACKGROUND_STEM`` is set, the source is split into
        vocal and background stems, only the vocals are converted, and the
        background is mixed back into the result.

        Args:
            source_audio_path: Audio to process.
            voice_sample_path: Voice sample for the (first) speaker.
            output_path: Destination file.
            process_files: Download callback, see ``download``.
            profile_no, verbose_level, init_pipe: Forwarded to the converter.
            voice_sample2_path: Voice sample for the second speaker; required
                when *speaker_count* is 2.
            speaker_count: 2 selects the two-speaker pipeline; any other value
                uses the single-speaker one.
            prefix: Prefix for temp files of the two-speaker pipeline.

        Returns:
            Path of the written result.

        Raises:
            RuntimeError: SeedVC is disabled, or two speakers were requested
                without a second voice sample.
        """
        mode, persistence = self.normalize_config()
        if mode == self.MODE_OFF:
            raise RuntimeError("SeedVC voice replacement is disabled in Configuration > Extensions.")
        self.download(process_files)
        output_dir = os.path.dirname(os.path.abspath(output_path)) or "."
        temp_tracks = []
        conversion_source_path = source_audio_path
        background_path = None
        conversion_output_path = output_path
        try:
            if SEEDVC_RESTORE_BACKGROUND_STEM:
                from preprocessing.extract_vocals import extract_vocal_and_background_stems

                requested_vocals_path = _make_temp_wav(output_dir, "seedvc_vocals_")
                requested_background_path = _make_temp_wav(output_dir, "seedvc_background_")
                temp_tracks += [requested_vocals_path, requested_background_path]
                conversion_source_path, background_path = extract_vocal_and_background_stems(source_audio_path, requested_vocals_path, requested_background_path)
                # NOTE(review): the helper's return contract is not visible
                # here. Never schedule a missing path or the caller's own
                # source file for deletion.
                temp_tracks += [path for path in (conversion_source_path, background_path) if path is not None and path != source_audio_path]
                if background_path is not None:
                    # Convert into a temp file only when a merge step follows.
                    # Otherwise the converted file is the final result, and a
                    # temp path would be deleted in ``finally`` right after
                    # being returned.
                    conversion_output_path = _make_temp_wav(output_dir, "seedvc_voice_")
                    temp_tracks.append(conversion_output_path)

            if int(speaker_count) == 2:
                if voice_sample2_path is None:
                    raise RuntimeError("Two-speaker SeedVC voice replacement requires a second voice sample.")
                converted_path = self._replace_two_speaker_audio_file(conversion_source_path, voice_sample_path, conversion_output_path, voice_sample2_path=voice_sample2_path, process_files=process_files, profile_no=profile_no, verbose_level=verbose_level, init_pipe=init_pipe, prefix=prefix)
            else:
                converted_path = convert_audio_file(conversion_source_path, voice_sample_path, conversion_output_path, persistent_models=persistence == self.PERSIST_RAM, profile_no=profile_no, verbose_level=verbose_level, init_pipe=init_pipe, mode=mode)
                _match_audio_file_amplitude(conversion_source_path, converted_path)
            if background_path is not None:
                return _merge_audio_files_to_wav([converted_path, background_path], output_path)
            return converted_path
        finally:
            if temp_tracks:
                from shared.utils.audio_video import cleanup_temp_audio_files

                cleanup_temp_audio_files(temp_tracks)

    def replace_audio_tracks(self, audio_tracks: list[str], voice_sample_path: str | None, output_dir: str, prefix: str, *, process_files: Callable[..., Any], profile_no=4, verbose_level: int = 1, init_pipe: Callable[..., int] | None = None, voice_sample2_path: str | None = None, speaker_count: int = 1) -> tuple[list[str], list[str]]:
        """Replace the voice(s) in every track of *audio_tracks*.

        Each track is converted into a fresh temp WAV file in *output_dir*.

        Returns:
            ``(tracks, created_files)``. Without a voice sample or without
            tracks, the input list and an empty list are returned. Otherwise
            both lists hold the converted paths (as two independent lists).

        Raises:
            Whatever ``replace_audio_file`` raises; temp outputs created so
            far are removed before the exception propagates.
        """
        if voice_sample_path is None or len(audio_tracks) == 0:
            return audio_tracks, []
        converted_tracks: list[str] = []
        created_paths: list[str] = []
        try:
            for track_no, audio_track in enumerate(audio_tracks):
                output_path = _make_temp_wav(output_dir, f"{prefix}_seedvc_track{track_no}_")
                created_paths.append(output_path)
                converted_tracks.append(self.replace_audio_file(audio_track, voice_sample_path, output_path, process_files=process_files, profile_no=profile_no, verbose_level=verbose_level, init_pipe=init_pipe, voice_sample2_path=voice_sample2_path, speaker_count=speaker_count, prefix=f"{prefix}_track{track_no}"))
        except Exception:
            # The caller never receives these paths, so nobody else can
            # delete them; removal is best-effort.
            for path in created_paths:
                try:
                    os.remove(path)
                except OSError:
                    pass
            raise
        # Return a copy so mutating one list does not change the other.
        return converted_tracks, list(converted_tracks)

    def release_vram(self) -> None:
        """Release the cached SeedVC runtime via ``release_models``."""
        release_models()
|
|