File size: 14,402 Bytes
79529ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
"""Vocence engine for the merged Qwen3-TTS VoiceDesign checkpoint.

The Vocence Chutes wrapper instantiates ``Miner`` with the on-disk path of the HF
snapshot and then drives it through the contract:

    Miner(path_hf_repo: Path)
    warmup() -> None
    generate_wav(instruction: str, text: str) -> tuple[np.ndarray, int]

All weights, the audio codec, and the tokenizer ship together in the snapshot —
nothing is fetched at runtime.
"""
from __future__ import annotations

import dataclasses
import re
import threading
from pathlib import Path
from typing import Any

import numpy as np


# File that must exist at the snapshot root; ``Miner.__init__`` raises
# ``FileNotFoundError`` when it is missing.
_REPO_REQUIRED_FILE = "config.json"
# Optional YAML file at the snapshot root read by ``_RuntimeOpts.from_repo``;
# when the file is absent the dataclass defaults are used instead.
_RUNTIME_CONFIG_FILE = "vocence_config.yaml"


# --------------------------------------------------------------------------- #
# Instruction rewrite (tag -> natural-language preamble)                      #
# --------------------------------------------------------------------------- #
#
# Validators may send instructions in the legacy pipe-tag form, e.g.
# ``| gender: male | pitch: mid | accent: uk |``. The base voice_design
# checkpoint was conditioned on natural-language descriptions, so we paraphrase
# the tags into a short imperative preamble and *prepend* it to whatever the
# caller sent. Free-form prompts (no ``| key: value |`` pairs) pass through
# unchanged because ``_parse_instruction`` returns ``{}`` for them.

# One ``| key: value |`` pair. Value runs until the next ``|`` or end-of-string;
# the lookahead keeps the trailing ``|`` available for the next iteration.
_INSTRUCTION_TAG_RE = re.compile(
    r"\|\s*([A-Za-z_]+)\s*:\s*([^|]+?)\s*(?=\||$)"
)

# Phrase tables: tag value -> wording used in the natural-language preamble.
# A value that is missing from its table contributes nothing to the preamble.
_GENDER_PHRASE = {
    "male": "male", "female": "female", "neutral": "gender-neutral",
}
_PITCH_PHRASE = {
    "low": "deep low-pitched voice", "mid": "medium natural pitch", "high": "high-pitched voice",
}
_SPEED_PHRASE = {
    "slow": "slow deliberate pace", "normal": "natural conversational pace", "fast": "brisk fast pace",
}
_AGE_PHRASE = {
    "child": "child", "young_adult": "young adult", "adult": "adult", "senior": "elderly senior",
}
_EMOTION_PHRASE = {
    "neutral": "neutral composed delivery",
    "happy": "cheerful happy upbeat warm",
    "sad": "sorrowful sad subdued downcast",
    "angry": "firm angry forceful assertive tense",
    "calm": "calm relaxed measured peaceful unhurried",
    "excited": "excited enthusiastic energetic lively",
    "serious": "serious grave deliberate weighty",
    "fearful": "nervous fearful hesitant trembling",
}
_TONE_PHRASE = {
    "warm": "warm", "cold": "cold detached", "friendly": "friendly",
    "formal": "formal", "casual": "casual", "authoritative": "authoritative commanding",
}
_ACCENT_PHRASE = {
    "us": "standard American English accent with rhotic r sounds",
    "uk": "standard British English accent with non-rhotic received pronunciation",
    "au": "Australian English accent",
    "in": "Indian English accent",
    "neutral": "neutral international English accent",
    "other": "non-native English accent",
}

# Letters that take the article "an" in ``_with_article``.
_VOWEL_LETTERS = frozenset("aeiou")


def _parse_instruction(instruction: str) -> dict[str, str]:
    """Parse a pipe-tag instruction (``| key: value | ...``) into a flat dict.

    Keys and values are stripped and lowercased. When a key occurs more than
    once the last occurrence wins.

    Returns ``{}`` for an empty instruction, for one that contains no ``|``
    at all, and for one in which no ``| key: value`` pair is found. Values
    are not validated here; the phrase tables decide later whether a value
    contributes to the preamble.
    """
    if not instruction or "|" not in instruction:
        return {}
    out: dict[str, str] = {}
    for m in _INSTRUCTION_TAG_RE.finditer(instruction):
        key = m.group(1).strip().lower()
        val = m.group(2).strip().lower()
        if key and val:
            out[key] = val
    return out


def _with_article(phrase: str) -> str:
    """Prefix *phrase* with ``a`` or ``an``, chosen from its first letter.

    The choice is purely letter-based (vowel letter -> ``an``), which is
    correct for every entry in the phrase tables above.
    """
    article = "an" if phrase[:1].lower() in _VOWEL_LETTERS else "a"
    return f"{article} {phrase}"


def _build_natural_preamble(parsed: dict[str, str]) -> str:
    """Turn parsed tags into a ``Speak as ...`` sentence plus a prosody hint.

    Recognised keys are ``gender``, ``age_group``, ``emotion``, ``accent``,
    ``pitch``, ``speed`` and ``tone``; other keys, and values absent from
    the phrase tables, are ignored. Returns ``""`` when nothing maps to a
    known phrase.
    """
    gender = _GENDER_PHRASE.get(parsed.get("gender", ""), "")
    age = _AGE_PHRASE.get(parsed.get("age_group", ""), "")
    pitch = _PITCH_PHRASE.get(parsed.get("pitch", ""), "")
    speed = _SPEED_PHRASE.get(parsed.get("speed", ""), "")
    emotion = _EMOTION_PHRASE.get(parsed.get("emotion", ""), "")
    tone = _TONE_PHRASE.get(parsed.get("tone", ""), "")
    accent = _ACCENT_PHRASE.get(parsed.get("accent", ""), "")

    parts: list[str] = []

    # Gender-first to avoid timbre drift on emotion-heavy prompts
    identity = " ".join(p for p in [gender, age] if p)
    if identity:
        # Article depends on the first word: "a male ...", "an adult ...".
        parts.append(f"{_with_article(identity)} voice")
    if emotion:
        parts.append(emotion)
    if accent:
        # "an Australian ..." / "an Indian ..." versus "a standard ...".
        parts.append(f"speaking with {_with_article(accent)}")
    if pitch:
        parts.append(pitch)
    if speed:
        parts.append(speed)
    if tone:
        parts.append(f"{tone} tone")

    if not parts:
        return ""
    preamble = "Speak as " + ", ".join(parts) + "."
    return preamble + " Use natural human prosody with realistic breath placement and varied intonation."


def _enhance_instruction(instruction: str) -> str:
    """Prepend a natural-language preamble derived from any pipe tags.

    The instruction is returned unchanged when it has no parseable tags or
    when none of them map to a known phrase (the preamble would be empty).
    Otherwise the result is ``"<preamble> <instruction>"``, so the original
    text is always kept after the preamble.
    """
    parsed = _parse_instruction(instruction)
    if not parsed:
        return instruction
    preamble = _build_natural_preamble(parsed)
    if not preamble:
        return instruction
    return f"{preamble} {instruction}"


# --------------------------------------------------------------------------- #
# Text normalization                                                          #
# --------------------------------------------------------------------------- #

_NUM_WORDS = {
    "0": "zero", "1": "one", "2": "two", "3": "three", "4": "four",
    "5": "five", "6": "six", "7": "seven", "8": "eight", "9": "nine",
    "10": "ten", "11": "eleven", "12": "twelve", "13": "thirteen",
    "14": "fourteen", "15": "fifteen", "16": "sixteen", "17": "seventeen",
    "18": "eighteen", "19": "nineteen", "20": "twenty", "30": "thirty",
    "40": "forty", "50": "fifty", "60": "sixty", "70": "seventy",
    "80": "eighty", "90": "ninety", "100": "one hundred",
}
_ABBREV = {
    "Mr.": "Mister", "Mrs.": "Missus", "Dr.": "Doctor", "St.": "Saint",
    "etc.": "et cetera", "vs.": "versus", "approx.": "approximately",
    "dept.": "department", "govt.": "government", "mgr.": "manager",
}

# Pre-compiled at module load so we don't recompile on every call.

# An abbreviation only counts when it starts a token, so the "vs." inside
# "revs." is left alone. Longest alternatives come first.
_ABBREV_RE = re.compile(
    r"(?<![A-Za-z0-9])(?:"
    + "|".join(re.escape(abbr) for abbr in sorted(_ABBREV, key=len, reverse=True))
    + r")"
)

# A currency amount is captured whole, including decimal/grouping parts
# ("5.50", "1,000"), so that a later pass never rewrites half of a number.
_AMOUNT = r"(\d+(?:[.,]\d+)*)"
_DOLLAR_RE = re.compile(r"\$" + _AMOUNT)
_POUND_RE = re.compile(r"£" + _AMOUNT)
_EURO_RE = re.compile(r"€" + _AMOUNT)
# (pattern, singular unit, plural unit), applied in this order.
_CURRENCY_RULES = (
    (_DOLLAR_RE, "dollar", "dollars"),
    (_POUND_RE, "pound", "pounds"),
    (_EURO_RE, "euro", "euros"),
)

# A standalone 1-2 digit integer. The lookbehind/lookahead reject digits that
# are one half of a decimal or grouped number such as "3.14" or "1,000".
_SMALL_INT_RE = re.compile(r"(?<!\d[.,])\b(\d{1,2})\b(?![.,]\d)")

# Whitespace + conjunction + whitespace, but only directly after a character
# that is neither whitespace nor punctuation. That keeps "tried. But" and
# "yes, but" untouched and never matches at the very start of the text.
_CONJ_RE = re.compile(
    r"(?<=[^\s,;:.!?\-\u2013\u2014(\[])\s+(but|however|although|though|yet)\s+",
    flags=re.IGNORECASE,
)


def _spell_number(token: str) -> str:
    """Return the English words for *token*, or *token* itself if unsupported.

    Supported: every key of ``_NUM_WORDS`` plus the two-digit numbers 21-99,
    which are composed as ``"twenty-five"``. Anything else (decimals, larger
    numbers, leading zeros such as ``"05"``) is returned unchanged.
    """
    word = _NUM_WORDS.get(token)
    if word is not None:
        return word
    if len(token) == 2 and token[0] not in ("0", "1"):
        tens = _NUM_WORDS.get(f"{token[0]}0")
        ones = _NUM_WORDS.get(token[1])
        if tens and ones:
            return f"{tens}-{ones}"
    return token


def _normalize_text_for_tts(text: str) -> str:
    """Rewrite a transcript so the talker emits cleaner, more prosodic speech.

    Steps, in order:

    1. Expand the abbreviations listed in ``_ABBREV`` (token-initial only).
    2. Move currency symbols behind the amount as a spoken unit:
       ``$5`` -> ``five dollars``, ``$1`` -> ``one dollar``,
       ``$5.50`` -> ``5.50 dollars``.
    3. Spell out standalone integers from 0 to 99. Numbers that are part of
       a decimal or grouped figure, larger numbers, and numbers with a
       leading zero pass through unchanged.
    4. Insert a comma before ``but``/``however``/``although``/``though``/
       ``yet`` when the conjunction follows a word without punctuation.
       This applies regardless of sentence length.

    Returns the rewritten text with leading/trailing whitespace stripped.
    """
    text = _ABBREV_RE.sub(lambda m: _ABBREV[m.group(0)], text)

    for pattern, singular, plural in _CURRENCY_RULES:
        text = pattern.sub(
            # Default-argument binding avoids the late-binding closure trap.
            lambda m, one=singular, many=plural: (
                f"{_spell_number(m.group(1))} {one if m.group(1) == '1' else many}"
            ),
            text,
        )

    text = _SMALL_INT_RE.sub(lambda m: _spell_number(m.group(1)), text)

    text = _CONJ_RE.sub(r", \1 ", text)

    return text.strip()


@dataclasses.dataclass
class _RuntimeOpts:
    """Subset of vocence_config.yaml that the engine actually consumes.

    Every field has a default, so a snapshot without the YAML file (or with
    a partial one) still yields a usable configuration.
    """

    language: str = "English"
    sample_rate: int = 24000
    max_instruction_chars: int = 600
    max_text_chars: int = 2000
    device_pref: str = "cuda"
    dtype_pref: str = "bfloat16"
    flash_attention_2: bool = False

    @classmethod
    def from_repo(cls, repo: Path) -> "_RuntimeOpts":
        """Load options from ``<repo>/vocence_config.yaml``.

        Returns the defaults when the file does not exist. Errors raised by
        the YAML parser or by numeric conversion propagate to the caller.
        """
        cfg_path = repo / _RUNTIME_CONFIG_FILE
        if not cfg_path.is_file():
            return cls()
        # Imported lazily so the dependency is only needed when a config exists.
        from yaml import safe_load

        with cfg_path.open("r", encoding="utf-8") as fh:
            data = safe_load(fh)
        return cls.from_mapping(data)

    @classmethod
    def from_mapping(cls, data: Any) -> "_RuntimeOpts":
        """Build options from an already-parsed config document.

        *data* is expected to be a mapping with optional ``runtime``,
        ``generation`` and ``limits`` sections. A missing section, a section
        that is not a mapping, a missing key, and a key explicitly set to
        ``None`` (YAML ``null``) all fall back to the field default.

        Raises ``ValueError``/``TypeError`` when a numeric field holds a
        value that ``int()`` cannot convert.
        """
        runtime = cls._section(data, "runtime")
        generation = cls._section(data, "generation")
        limits = cls._section(data, "limits")
        defaults = cls()
        return cls(
            # ``limits`` takes precedence over ``runtime`` for the language.
            language=str(
                limits.get("default_language")
                or runtime.get("default_language")
                or defaults.language
            ),
            sample_rate=int(cls._pick(generation, "sample_rate", defaults.sample_rate)),
            max_instruction_chars=int(
                cls._pick(limits, "max_instruction_chars", defaults.max_instruction_chars)
            ),
            max_text_chars=int(cls._pick(limits, "max_text_chars", defaults.max_text_chars)),
            device_pref=str(
                cls._pick(runtime, "device_preference", defaults.device_pref)
            ).lower(),
            dtype_pref=str(cls._pick(runtime, "dtype", defaults.dtype_pref)).lower(),
            flash_attention_2=cls._as_bool(
                cls._pick(runtime, "use_flash_attention_2", defaults.flash_attention_2)
            ),
        )

    @staticmethod
    def _section(data: Any, name: str) -> dict:
        """Return ``data[name]`` when both are mappings, else an empty dict."""
        if not isinstance(data, dict):
            return {}
        section = data.get(name)
        return section if isinstance(section, dict) else {}

    @staticmethod
    def _pick(section: dict, key: str, default: Any) -> Any:
        """Return ``section[key]``, or *default* when absent or ``None``."""
        value = section.get(key)
        return default if value is None else value

    @staticmethod
    def _as_bool(value: Any) -> bool:
        """Interpret *value* as a flag; quoted "false"/"0"/"no"/"off" are False."""
        if isinstance(value, str):
            return value.strip().lower() not in {"", "0", "false", "no", "off"}
        return bool(value)


class Miner:
    """Loads merged Qwen3-TTS weights from the snapshot and serves the Vocence API.

    Construction validates the snapshot directory, reads the runtime options
    and loads the model immediately. ``warmup`` and ``generate_wav`` are the
    two entry points used after that.
    """

    # Seconds ``warmup`` waits for the first synthesis before giving up.
    WARMUP_BUDGET_S = 180.0

    def __init__(self, path_hf_repo: Path) -> None:
        """Resolve the snapshot path, load options and build the model.

        Raises:
            FileNotFoundError: the snapshot has no ``config.json``.
            RuntimeError: the model could not be loaded (see ``_build_model``).
        """
        self.repo = Path(path_hf_repo).resolve()
        if not (self.repo / _REPO_REQUIRED_FILE).is_file():
            raise FileNotFoundError(
                f"Snapshot incomplete: {self.repo / _REPO_REQUIRED_FILE} not found"
            )
        self.opts = _RuntimeOpts.from_repo(self.repo)
        self.model = self._build_model()

    def __repr__(self) -> str:
        return f"<Miner repo={self.repo.name} language={self.opts.language!r}>"

    # ------------------------------------------------------------------ #
    # Vocence contract                                                    #
    # ------------------------------------------------------------------ #

    def warmup(self) -> None:
        """Run one short synthesis and fail if it errors or overruns the budget.

        The synthesis runs on a daemon thread that is joined for at most
        ``WARMUP_BUDGET_S`` seconds. On timeout the thread is not cancelled;
        it keeps running in the background while this method raises.

        Raises:
            RuntimeError: the synthesis raised (chained as ``__cause__``) or
                did not finish in time.
        """
        outcome: dict[str, Any] = {"ok": False, "exc": None}

        def _heat() -> None:
            try:
                self.generate_wav(instruction="Calm neutral delivery.", text="Warmup.")
                outcome["ok"] = True
            except Exception as exc:  # noqa: BLE001 — surface to host
                outcome["exc"] = exc

        worker = threading.Thread(target=_heat, daemon=True)
        worker.start()
        worker.join(timeout=self.WARMUP_BUDGET_S)
        if not outcome["ok"]:
            failure = outcome["exc"]
            reason = repr(failure) if failure is not None else "timeout"
            # Chain the original exception so the host sees its traceback.
            raise RuntimeError(f"Miner warmup did not complete: {reason}") from failure

    def generate_wav(self, instruction: str, text: str) -> tuple[np.ndarray, int]:
        """Synthesize *text* in the voice described by *instruction*.

        Both inputs are clipped to the configured limits, rewritten
        (tag preamble for the instruction, normalization for the text) and
        clipped again before they reach the model.

        Returns:
            ``(waveform, sample_rate)`` where the waveform is a float32
            array reduced to mono and the sample rate is the one reported
            by the model.

        Raises:
            ValueError: the model returned no audio.
        """
        # Cap raw inputs first so an oversized payload never reaches the
        # rewriter (which would just throw away the surplus anyway).
        prompt = self._truncate(instruction, self.opts.max_instruction_chars)
        body = self._truncate(text, self.opts.max_text_chars)

        # Tag-form instructions get a natural-language preamble prepended;
        # already-natural instructions pass through untouched.
        prompt = _enhance_instruction(prompt)
        # Rewrite the transcript into a more speakable form.
        body = _normalize_text_for_tts(body)

        # The rewrites can lengthen the strings; re-clip to the same limits
        # so we honour the contract advertised in vocence_config.yaml's
        # ``limits`` block.
        prompt = self._truncate(prompt, self.opts.max_instruction_chars)
        body = self._truncate(body, self.opts.max_text_chars)

        wavs, sample_rate = self.model.generate_voice_design(
            text=body,
            instruct=prompt,
            language=self.opts.language,
        )
        if not wavs or wavs[0] is None:
            raise ValueError("Qwen3-TTS returned no audio")

        wave = self._coerce_mono_float32(wavs[0])
        return wave, int(sample_rate)

    # ------------------------------------------------------------------ #
    # Internal                                                            #
    # ------------------------------------------------------------------ #

    @staticmethod
    def _truncate(value: str, limit: int) -> str:
        """Return *value* clipped to *limit* characters.

        A falsy or non-positive *limit* disables clipping.
        """
        return value[:limit] if limit and limit > 0 else value

    @staticmethod
    def _coerce_mono_float32(arr: Any) -> np.ndarray:
        """Convert *arr* to a float32 array and average it down to one channel.

        0-D and 1-D input is returned as converted. For multi-dimensional
        input the shortest axis is averaged away repeatedly until one
        dimension is left, so both ``(samples, channels)`` and
        ``(channels, samples)`` layouts, as well as extra singleton batch
        axes, end up as ``(samples,)``.

        NOTE(review): this assumes the sample axis is always the longest
        one. When two axes tie, the last of them is averaged, which matches
        the previous ``mean(axis=1)`` behaviour for 2-D input.
        """
        wave = np.asarray(arr, dtype=np.float32)
        while wave.ndim > 1:
            # Index of the LAST shortest axis (argmin on the reversed shape).
            shortest = wave.ndim - 1 - int(np.argmin(wave.shape[::-1]))
            wave = wave.mean(axis=shortest)
        return wave

    def _build_model(self):
        """Load ``Qwen3TTSModel`` from the snapshot directory.

        Device and dtype follow the runtime options but degrade safely:
        CUDA is used only when requested and available, and bfloat16 only
        when requested and CUDA is available; otherwise CPU / float32.
        When flash-attention is enabled it is tried first, with ``sdpa`` as
        the fallback.

        Raises:
            RuntimeError: every attention variant failed to load; the last
                failure is chained as ``__cause__``.
        """
        # Heavy imports are deferred until a model is actually built.
        import torch
        from qwen_tts import Qwen3TTSModel

        cuda_available = bool(torch.cuda.is_available())
        device_map = "cuda:0" if (self.opts.device_pref == "cuda" and cuda_available) else "cpu"
        torch_dtype = (
            torch.bfloat16
            if (self.opts.dtype_pref == "bfloat16" and cuda_available)
            else torch.float32
        )

        attempt_order = ("flash_attention_2", "sdpa") if self.opts.flash_attention_2 else ("sdpa",)
        last_error: BaseException | None = None
        for attn in attempt_order:
            try:
                model = Qwen3TTSModel.from_pretrained(
                    pretrained_model_name_or_path=str(self.repo),
                    device_map=device_map,
                    dtype=torch_dtype,
                    attn_implementation=attn,
                )
            except Exception as exc:  # noqa: BLE001 — try next attn variant
                last_error = exc
                # Make the fallback visible instead of swallowing it.
                print(f"[Miner] Qwen3-TTS load with attn={attn} failed: {exc!r}")
                continue
            # Report the dtype actually used, which may differ from the
            # requested ``dtype_pref`` after the CUDA fallback above.
            print(
                f"[Miner] Qwen3-TTS ready on {device_map} "
                f"(dtype={torch_dtype}, attn={attn})"
            )
            return model
        raise RuntimeError(f"Qwen3-TTS failed to load: {last_error!r}") from last_error