| """Vocence engine for the merged Qwen3-TTS VoiceDesign checkpoint. |
| |
| The Vocence Chutes wrapper instantiates ``Miner`` with the on-disk path of the HF |
| snapshot and then drives it through the contract: |
| |
| Miner(path_hf_repo: Path) |
| warmup() -> None |
| generate_wav(instruction: str, text: str) -> tuple[np.ndarray, int] |
| |
| All weights, the audio codec, and the tokenizer ship together in the snapshot — |
| nothing is fetched at runtime. |
| """ |
| from __future__ import annotations |
|
|
| import dataclasses |
| import threading |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
|
|
|
|
# File that must exist in the snapshot root; ``Miner.__init__`` refuses to start without it.
_REPO_REQUIRED_FILE = "config.json"
# Optional runtime settings file in the snapshot root; built-in defaults apply when it is absent.
_RUNTIME_CONFIG_FILE = "vocence_config.yaml"
|
|
|
|
@dataclasses.dataclass
class _RuntimeOpts:
    """Subset of vocence_config.yaml that the engine actually consumes.

    Every field has a default, so a snapshot without the config file, or with
    missing / null keys in it, still produces a usable set of options.
    """

    # Language name passed to the model on every generation call.
    language: str = "English"
    # Value of ``generation.sample_rate`` from the config file.
    sample_rate: int = 24000
    # Character caps applied to the instruction and the text before generation.
    max_instruction_chars: int = 600
    max_text_chars: int = 2000
    # Lower-cased device preference ("cuda" selects the GPU when one is available).
    device_pref: str = "cuda"
    # Lower-cased dtype preference ("bfloat16" is honoured only when CUDA is available).
    dtype_pref: str = "bfloat16"
    # Whether the loader should try the flash_attention_2 backend before sdpa.
    flash_attention_2: bool = False

    @classmethod
    def from_repo(cls, repo: Path) -> "_RuntimeOpts":
        """Build the options from ``<repo>/vocence_config.yaml``.

        Args:
            repo: Root directory of the model snapshot.

        Returns:
            Options parsed from the config file, or all defaults when the file
            does not exist.

        Raises:
            ValueError: If a numeric setting holds a value ``int()`` cannot parse.
        """
        cfg_path = Path(repo) / _RUNTIME_CONFIG_FILE
        if not cfg_path.is_file():
            return cls()

        # Imported lazily so PyYAML is only needed when a config file is shipped.
        from yaml import safe_load

        with cfg_path.open("r", encoding="utf-8") as fh:
            data = safe_load(fh)
        return cls._from_mapping(data)

    @classmethod
    def _from_mapping(cls, data: Any) -> "_RuntimeOpts":
        """Translate the parsed YAML document into options, tolerating gaps."""
        runtime = cls._section(data, "runtime")
        generation = cls._section(data, "generation")
        limits = cls._section(data, "limits")
        defaults = cls()

        # ``limits`` takes precedence over ``runtime`` for the default language.
        language = (
            limits.get("default_language")
            or runtime.get("default_language")
            or defaults.language
        )
        return cls(
            language=str(language),
            sample_rate=int(cls._pick(generation, "sample_rate", defaults.sample_rate)),
            max_instruction_chars=int(
                cls._pick(limits, "max_instruction_chars", defaults.max_instruction_chars)
            ),
            max_text_chars=int(cls._pick(limits, "max_text_chars", defaults.max_text_chars)),
            device_pref=str(
                cls._pick(runtime, "device_preference", defaults.device_pref)
            ).lower(),
            dtype_pref=str(cls._pick(runtime, "dtype", defaults.dtype_pref)).lower(),
            flash_attention_2=cls._as_bool(
                cls._pick(runtime, "use_flash_attention_2", defaults.flash_attention_2)
            ),
        )

    @staticmethod
    def _section(data: Any, name: str) -> dict:
        """Return the mapping stored under *name*, or ``{}`` if absent or not a mapping."""
        section = data.get(name) if isinstance(data, dict) else None
        return section if isinstance(section, dict) else {}

    @staticmethod
    def _pick(section: dict, key: str, default: Any) -> Any:
        """Return ``section[key]``, using *default* when the key is missing or null."""
        value = section.get(key)
        return default if value is None else value

    @staticmethod
    def _as_bool(value: Any) -> bool:
        """Interpret *value* as a flag; quoted "false"/"no"/"off"/"0" count as off."""
        if isinstance(value, str):
            return value.strip().lower() not in {"", "0", "false", "no", "off"}
        return bool(value)
|
|
|
|
class Miner:
    """Loads merged Qwen3-TTS weights from the snapshot and serves the Vocence API.

    Construction validates the snapshot directory, reads the runtime options and
    loads the model; ``warmup`` then runs one trial generation, and
    ``generate_wav`` serves individual requests.
    """

    # Upper bound, in seconds, that ``warmup`` waits for its trial generation.
    WARMUP_BUDGET_S = 180.0

    def __init__(self, path_hf_repo: Path) -> None:
        """Validate the snapshot at *path_hf_repo* and load the model from it.

        Raises:
            FileNotFoundError: If the snapshot lacks its required config file.
            RuntimeError: If the model cannot be loaded with any attention backend.
        """
        self.repo = Path(path_hf_repo).resolve()
        required = self.repo / _REPO_REQUIRED_FILE
        if not required.is_file():
            raise FileNotFoundError(f"Snapshot incomplete: {required} not found")
        self.opts = _RuntimeOpts.from_repo(self.repo)
        self.model = self._build_model()

    def __repr__(self) -> str:
        return f"<Miner repo={self.repo.name} language={self.opts.language!r}>"

    def warmup(self) -> None:
        """Run one short generation, failing if it errors or exceeds the budget.

        The trial runs on a daemon thread so that a hung generation cannot block
        the caller for longer than ``WARMUP_BUDGET_S`` seconds.

        Raises:
            RuntimeError: If the trial generation raised (the original exception
                is attached as ``__cause__``) or did not finish in time.
        """
        outcome: dict[str, Any] = {"ok": False, "err": None}

        def _heat() -> None:
            try:
                self.generate_wav(instruction="Calm neutral delivery.", text="Warmup.")
            except Exception as exc:
                # Keep the exception object so the caller can chain it.
                outcome["err"] = exc
            else:
                outcome["ok"] = True

        worker = threading.Thread(target=_heat, daemon=True)
        worker.start()
        worker.join(timeout=self.WARMUP_BUDGET_S)
        if outcome["ok"]:
            return

        failure = outcome["err"]
        detail = repr(failure) if failure is not None else "timeout"
        raise RuntimeError(f"Miner warmup did not complete: {detail}") from failure

    def generate_wav(self, instruction: str, text: str) -> tuple[np.ndarray, int]:
        """Synthesize *text* in the voice described by *instruction*.

        Both strings are cut to the configured character limits before they are
        handed to the model.

        Returns:
            ``(waveform, sample_rate)`` where the waveform is a 1-D float32 array
            and the sample rate is the one reported by the model.

        Raises:
            ValueError: If the model returns no audio or an empty waveform.
        """
        prompt = self._truncate(instruction, self.opts.max_instruction_chars)
        body = self._truncate(text, self.opts.max_text_chars)

        wavs, sample_rate = self.model.generate_voice_design(
            text=body,
            instruct=prompt,
            language=self.opts.language,
        )
        # ``len`` rather than truthiness: an ndarray batch has no unambiguous truth value.
        if wavs is None or len(wavs) == 0 or wavs[0] is None:
            raise ValueError("Qwen3-TTS returned no audio")

        wave = self._coerce_mono_float32(wavs[0])
        if wave.size == 0:
            raise ValueError("Qwen3-TTS returned no audio")
        return wave, int(sample_rate)

    @staticmethod
    def _truncate(value: str, limit: int) -> str:
        """Cut *value* to *limit* characters; a zero, negative or null limit disables the cap."""
        return value[:limit] if limit and limit > 0 else value

    @staticmethod
    def _coerce_mono_float32(arr: Any) -> np.ndarray:
        """Convert *arr* to a 1-D float32 waveform, averaging channels if needed.

        For multi-dimensional input the longest axis is taken to be time and all
        other axes are averaged away, so both ``(samples, channels)`` and
        ``(channels, samples)`` layouts reduce to mono. When two axes tie for
        longest, the first one is treated as time.
        """
        wave = np.asarray(arr, dtype=np.float32)
        if wave.ndim <= 1:
            return wave
        if wave.size == 0:
            return np.zeros(0, dtype=np.float32)

        # argmax returns the first maximum, so ties keep axis 0 as the time axis.
        time_axis = int(np.argmax(wave.shape))
        wave = np.moveaxis(wave, time_axis, 0)
        return wave.reshape(wave.shape[0], -1).mean(axis=1)

    def _build_model(self):
        """Load the Qwen3-TTS model from the snapshot directory.

        Tries ``flash_attention_2`` first when the options ask for it, then
        ``sdpa``. The first backend that loads wins.

        Raises:
            RuntimeError: If every backend fails; the last failure is chained.
        """
        # Heavy dependencies are imported lazily, only when a model is built.
        import torch
        from qwen_tts import Qwen3TTSModel

        cuda_available = bool(torch.cuda.is_available())
        use_cuda = self.opts.device_pref == "cuda" and cuda_available
        device_map = "cuda:0" if use_cuda else "cpu"
        # bfloat16 is used only when requested and CUDA is present; otherwise float32.
        use_bf16 = self.opts.dtype_pref == "bfloat16" and cuda_available
        torch_dtype = torch.bfloat16 if use_bf16 else torch.float32
        dtype_label = "bfloat16" if use_bf16 else "float32"

        if self.opts.flash_attention_2:
            attempt_order = ("flash_attention_2", "sdpa")
        else:
            attempt_order = ("sdpa",)

        last_error: BaseException | None = None
        for attn in attempt_order:
            try:
                model = Qwen3TTSModel.from_pretrained(
                    pretrained_model_name_or_path=str(self.repo),
                    device_map=device_map,
                    dtype=torch_dtype,
                    attn_implementation=attn,
                )
            except Exception as exc:
                # Remember the failure and say so, instead of falling back silently.
                last_error = exc
                print(f"[Miner] Qwen3-TTS load with attn={attn} failed: {exc!r}")
                continue
            # Report the dtype actually in use, which may differ from the requested one.
            print(
                f"[Miner] Qwen3-TTS ready on {device_map} "
                f"(dtype={dtype_label}, attn={attn})"
            )
            return model
        raise RuntimeError(f"Qwen3-TTS failed to load: {last_error!r}") from last_error
|
|