# VoiceDirector / core / adapters.py
# Last commit c0c4a30 (verified) by dsa2dsads: "demo: package VoiceDirector stage-1 integration app"
from __future__ import annotations
import asyncio
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Protocol, Sequence
# Defaults consumed by the adapter constructors in this module.
DEFAULT_FUNASR_MODEL_ID = "FunAudioLLM/SenseVoiceSmall"
# Passed to FunASR's AutoModel as ``model_revision`` to pin the model version.
DEFAULT_FUNASR_MODEL_REVISION = "3eb3b4eeffc2f2dde6051b853983753db33e35c3"
DEFAULT_FASTER_WHISPER_MODEL_SIZE = "small"
DEFAULT_EDGE_TTS_VOICE = "zh-CN-XiaoxiaoNeural"

# Voices offered for Edge TTS; the default voice is always the first entry.
AVAILABLE_EDGE_TTS_VOICES = (
    DEFAULT_EDGE_TTS_VOICE,
    "zh-CN-YunxiNeural",
    "zh-CN-YunjianNeural",
)
@dataclass(frozen=True)
class AsrResult:
    """Immutable record describing the outcome of one speech-recognition call."""

    # Final recognized text.
    transcript: str
    # Interim text; the mock adapter fills it with a leading slice of the
    # transcript, the real backends in this module copy the full transcript.
    partial_transcript: str
    # Time spent in the backend call, in milliseconds.
    latency_ms: int
    # Name of the adapter that produced the result.
    backend: str = "mock-asr"
    # Optional human-readable remark about how the result was produced
    # (for example a device fallback).
    runtime_note: str | None = None
    # Backend-specific extras (device, model id, ...); a fresh dict per instance.
    metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class TtsPreview:
    """Immutable record describing the outcome of one text-to-speech call."""

    # Name of the adapter that produced the preview.
    backend: str
    # Outcome tag; adapters in this module use "mock_preview_ready" and "audio_ready".
    status: str
    # Shortened form of the synthesized text, suitable for display.
    preview_text: str
    # Time spent synthesizing, in milliseconds.
    latency_ms: int
    # Location of the generated audio file, or None when no file was written.
    audio_path: str | None = None
    # Voice identifier that was used, if any.
    voice: str | None = None
    # Optional human-readable remark about how the preview was produced.
    runtime_note: str | None = None
    # Backend-specific extras; a fresh dict per instance.
    metadata: dict[str, Any] = field(default_factory=dict)
class AudioAsrAdapter(Protocol):
    """Structural interface for ASR backends that transcribe an audio file."""

    # Short identifier of the backend, reported in results.
    backend_name: str

    def transcribe(
        self,
        audio_path: str | Path,
        *,
        hotwords: Sequence[str] | None = None,
    ) -> AsrResult:
        """Transcribe the audio at ``audio_path``.

        Args:
            audio_path: Location of the audio file to recognize.
            hotwords: Optional keyword-only list of terms for the backend to
                favor; implementations may ignore it.

        Returns:
            An ``AsrResult`` describing the recognition outcome.
        """
        ...
class TtsAdapter(Protocol):
    """Structural interface for text-to-speech backends."""

    # Short identifier of the backend, reported in previews.
    backend_name: str

    def synthesize(
        self,
        text: str,
        *,
        voice: str | None = None,
        audio_path: str | Path | None = None,
    ) -> TtsPreview:
        """Synthesize ``text`` and describe the outcome.

        Args:
            text: The text to speak.
            voice: Optional keyword-only voice identifier.
            audio_path: Optional keyword-only destination for the audio file.

        Returns:
            A ``TtsPreview`` describing the synthesis outcome.
        """
        ...
class MockASRAdapter:
    """Stand-in ASR adapter that passes manually typed text straight through."""

    backend_name = "mock-asr"

    def transcribe_text(self, raw_input: str) -> AsrResult:
        """Wrap typed text in an ``AsrResult`` without running any recognizer.

        Args:
            raw_input: Text typed by the user; runs of whitespace are collapsed
                to single spaces and the ends are trimmed.

        Returns:
            An ``AsrResult`` with a fixed 60 ms latency. The partial transcript
            is the whole text when it has at most 12 characters, otherwise its
            first half.
        """
        normalized = " ".join(raw_input.split())
        length = len(normalized)
        if length <= 12:
            cut = length
        else:
            cut = max(length // 2, 1)
        return AsrResult(
            transcript=normalized,
            partial_transcript=normalized[:cut],
            latency_ms=60,
            backend=self.backend_name,
            runtime_note="Manual text passthrough only. No real ASR backend was used.",
            metadata={"input_mode": "manual_text"},
        )
class MockTTSAdapter:
    """Stand-in TTS adapter that reports a preview without producing audio."""

    backend_name = "mock-tts"

    def synthesize(
        self,
        text: str,
        *,
        voice: str | None = None,
        audio_path: str | Path | None = None,
    ) -> TtsPreview:
        """Return a canned preview for ``text``; no audio file is written.

        Args:
            text: The text that would be spoken.
            voice: Optional voice label; "mock-preview" is reported when it is
                missing or empty.
            audio_path: Accepted for interface compatibility and not used.

        Returns:
            A ``TtsPreview`` with status "mock_preview_ready", a fixed 120 ms
            latency and no audio path.
        """
        reported_voice = voice if voice else "mock-preview"
        shortened = _preview_text(text)
        return TtsPreview(
            backend=self.backend_name,
            status="mock_preview_ready",
            preview_text=shortened,
            latency_ms=120,
            voice=reported_voice,
            runtime_note="Mock fallback only. No audio file was generated.",
            metadata={"audio_generated": False},
        )
class FunASRSenseVoiceAdapter:
    """ASR adapter that runs a FunASR ``AutoModel`` with per-device fallback.

    Models are created on first use for each device and cached. The device
    that last succeeded is tried first on later calls.
    """

    backend_name = "funasr-sensevoice"

    def __init__(
        self,
        model_id: str = DEFAULT_FUNASR_MODEL_ID,
        model_revision: str = DEFAULT_FUNASR_MODEL_REVISION,
        hub: str = "hf",
        device_order: Sequence[str] = ("cuda:0", "cpu"),
        language: str = "zh",
    ) -> None:
        """Store the configuration; no model is loaded here.

        Args:
            model_id: Model identifier handed to ``AutoModel``.
            model_revision: Revision handed to ``AutoModel``.
            hub: Hub name handed to ``AutoModel``.
            device_order: Devices to try, in order of preference.
            language: Language code passed to ``model.generate``.
        """
        self.model_id = model_id
        self.model_revision = model_revision
        self.hub = hub
        self.device_order = tuple(device_order)
        self.language = language
        self._models: dict[str, Any] = {}
        self._active_device: str | None = None
        self._runtime_note: str | None = None

    def transcribe(self, audio_path: str | Path, *, hotwords: Sequence[str] | None = None) -> AsrResult:
        """Transcribe ``audio_path``, falling through the candidate devices.

        A failure on a device whose name starts with "cuda" moves on to the
        next candidate; a failure on any other device stops the search.

        Args:
            audio_path: Location of the audio file to recognize.
            hotwords: Accepted for interface compatibility and ignored.

        Returns:
            An ``AsrResult`` whose ``runtime_note`` explains a GPU fallback
            when one is in effect.

        Raises:
            RuntimeError: If no candidate device produced a result; the last
                underlying exception is chained as the cause.
        """
        del hotwords
        last_error: Exception | None = None
        runtime_notes: list[str] = []
        # Fallback explanation for this call only. It is committed to
        # ``self._runtime_note`` once a later device actually succeeds, so a
        # call that fails on every device cannot leave a stale note behind.
        pending_note: str | None = None
        for device in self._candidate_devices():
            try:
                model = self._get_model(device)
                started = time.perf_counter()
                result = model.generate(input=str(audio_path), language=self.language, use_itn=True)
                latency_ms = int((time.perf_counter() - started) * 1000)
                transcript, raw_text = _extract_funasr_text(result)
                self._active_device = device
                if pending_note is not None:
                    # An earlier GPU candidate failed and this device worked.
                    self._runtime_note = pending_note
                elif device == self.device_order[0]:
                    # Clean run on the preferred device: drop any old note.
                    self._runtime_note = None
                note = self._runtime_note or _merge_notes(runtime_notes)
                return AsrResult(
                    transcript=transcript,
                    partial_transcript=transcript,
                    latency_ms=latency_ms,
                    backend=self.backend_name,
                    runtime_note=note,
                    metadata={
                        "device": device,
                        "model_id": self.model_id,
                        "model_revision": self.model_revision,
                        "partial_available": False,
                        "raw_text": raw_text,
                    },
                )
            except Exception as exc:  # pragma: no cover - exercised in integration smoke only.
                runtime_notes.append(f"{device} failed: {exc}")
                last_error = exc
                if device.startswith("cuda"):
                    pending_note = f"Primary GPU path failed ({exc}); retrying on CPU."
                    continue
                break
        raise RuntimeError(f"FunASR transcription failed: {last_error}") from last_error

    def _candidate_devices(self) -> tuple[str, ...]:
        """Return the devices to try, with the last successful one first."""
        if self._active_device is None:
            return self.device_order
        ordered = [self._active_device]
        ordered.extend(device for device in self.device_order if device != self._active_device)
        return tuple(ordered)

    def _get_model(self, device: str):
        """Return the cached model for ``device``, creating it on first use."""
        if device in self._models:
            return self._models[device]
        # Deferred import: funasr is only needed once a model is requested.
        from funasr import AutoModel

        model = AutoModel(
            model=self.model_id,
            hub=self.hub,
            model_revision=self.model_revision,
            device=device,
            disable_update=True,
            disable_pbar=True,
        )
        self._models[device] = model
        return model
class FasterWhisperAdapter:
    """ASR adapter that runs a faster-whisper ``WhisperModel`` with fallback.

    Models are created on first use for each (device, compute type) pair and
    cached. The pair that last succeeded is tried first on later calls.
    """

    backend_name = "faster-whisper"

    def __init__(
        self,
        model_size: str = DEFAULT_FASTER_WHISPER_MODEL_SIZE,
        language: str = "zh",
        device_order: Sequence[tuple[str, str]] = (("cuda", "float16"), ("cpu", "int8")),
        beam_size: int = 1,
        vad_filter: bool = True,
    ) -> None:
        """Store the configuration; no model is loaded here.

        Args:
            model_size: Model size or name handed to ``WhisperModel``.
            language: Language code passed to ``model.transcribe``.
            device_order: ``(device, compute_type)`` pairs to try, in order.
            beam_size: Beam size passed to ``model.transcribe``.
            vad_filter: VAD flag passed to ``model.transcribe``.
        """
        self.model_size = model_size
        self.language = language
        self.device_order = tuple(device_order)
        self.beam_size = beam_size
        self.vad_filter = vad_filter
        self._models: dict[tuple[str, str], Any] = {}
        self._active_candidate: tuple[str, str] | None = None
        self._runtime_note: str | None = None

    def transcribe(self, audio_path: str | Path, *, hotwords: Sequence[str] | None = None) -> AsrResult:
        """Transcribe ``audio_path``, falling through the candidate pairs.

        A failure on the "cuda" device moves on to the next candidate; a
        failure on any other device stops the search.

        Args:
            audio_path: Location of the audio file to recognize.
            hotwords: Optional terms, joined with spaces and forwarded to the
                model as a single hotword string.

        Returns:
            An ``AsrResult`` whose latency covers the full decoding and whose
            ``runtime_note`` explains a CUDA fallback when one is in effect.

        Raises:
            RuntimeError: If no candidate produced a result; the last
                underlying exception is chained as the cause.
        """
        hotword_text = " ".join(hotwords or ()) or None
        last_error: Exception | None = None
        runtime_notes: list[str] = []
        # Fallback explanation for this call only. It is committed to
        # ``self._runtime_note`` once a later candidate actually succeeds, so
        # a call that fails everywhere cannot leave a stale note behind.
        pending_note: str | None = None
        for candidate in self._candidate_order():
            device, compute_type = candidate
            try:
                model = self._get_model(candidate)
                started = time.perf_counter()
                segments, info = model.transcribe(
                    str(audio_path),
                    language=self.language,
                    beam_size=self.beam_size,
                    vad_filter=self.vad_filter,
                    hotwords=hotword_text,
                )
                # ``segments`` is a lazy generator: decoding happens while it
                # is iterated, so it must be consumed before the clock stops.
                transcript = "".join(segment.text for segment in segments).strip()
                latency_ms = int((time.perf_counter() - started) * 1000)
                self._active_candidate = candidate
                if pending_note is not None:
                    # An earlier CUDA candidate failed and this one worked.
                    self._runtime_note = pending_note
                elif candidate == self.device_order[0]:
                    # Clean run on the preferred candidate: drop any old note.
                    self._runtime_note = None
                note = self._runtime_note or _merge_notes(runtime_notes)
                return AsrResult(
                    transcript=transcript,
                    partial_transcript=transcript,
                    latency_ms=latency_ms,
                    backend=self.backend_name,
                    runtime_note=note,
                    metadata={
                        "device": device,
                        "compute_type": compute_type,
                        "model_size": self.model_size,
                        "partial_available": False,
                        "language": getattr(info, "language", None),
                        "language_probability": getattr(info, "language_probability", None),
                        "hotwords_used": bool(hotword_text),
                    },
                )
            except Exception as exc:  # pragma: no cover - exercised in integration smoke only.
                runtime_notes.append(f"{device}/{compute_type} failed: {exc}")
                last_error = exc
                if device == "cuda":
                    pending_note = f"CUDA path unavailable ({exc}); using cpu/int8 fallback."
                    continue
                break
        raise RuntimeError(f"faster-whisper transcription failed: {last_error}") from last_error

    def _candidate_order(self) -> tuple[tuple[str, str], ...]:
        """Return the pairs to try, with the last successful one first."""
        if self._active_candidate is None:
            return self.device_order
        ordered = [self._active_candidate]
        ordered.extend(candidate for candidate in self.device_order if candidate != self._active_candidate)
        return tuple(ordered)

    def _get_model(self, candidate: tuple[str, str]):
        """Return the cached model for ``candidate``, creating it on first use."""
        if candidate in self._models:
            return self._models[candidate]
        # Deferred import: faster_whisper is only needed once a model is requested.
        from faster_whisper import WhisperModel

        device, compute_type = candidate
        model = WhisperModel(self.model_size, device=device, compute_type=compute_type)
        self._models[candidate] = model
        return model
class EdgeTTSAdapter:
    """TTS adapter that writes an audio file through the ``edge_tts`` package."""

    backend_name = "edge-tts"

    def __init__(self, default_voice: str = DEFAULT_EDGE_TTS_VOICE) -> None:
        """Remember the voice to use when a call does not name one."""
        self.default_voice = default_voice

    def synthesize(
        self,
        text: str,
        *,
        voice: str | None = None,
        audio_path: str | Path | None = None,
    ) -> TtsPreview:
        """Synthesize ``text`` to an audio file and describe the outcome.

        Args:
            text: The text to speak.
            voice: Voice identifier; the adapter's default voice is used when
                it is missing or empty.
            audio_path: Destination file. When omitted, a temporary ".mp3"
                file is created and its path is reported in the result.

        Returns:
            A ``TtsPreview`` with status "audio_ready" and the output path.

        Raises:
            Exception: Whatever ``edge_tts`` raises is propagated unchanged.
                A temporary file created by this call is removed first.
        """
        # Deferred import: edge_tts is only needed once synthesis is requested.
        import edge_tts

        selected_voice = voice or self.default_voice
        output_path = _resolve_output_path(audio_path)
        started = time.perf_counter()
        try:
            self._run_blocking(edge_tts.Communicate(text, voice=selected_voice).save(str(output_path)))
        except BaseException:
            # Only clean up files this call created; a caller-supplied path is
            # left alone.
            if audio_path is None:
                try:
                    output_path.unlink()
                except FileNotFoundError:
                    pass
            raise
        latency_ms = int((time.perf_counter() - started) * 1000)
        return TtsPreview(
            backend=self.backend_name,
            status="audio_ready",
            preview_text=_preview_text(text),
            latency_ms=latency_ms,
            audio_path=str(output_path),
            voice=selected_voice,
            metadata={"audio_generated": True},
        )

    @staticmethod
    def _run_blocking(coroutine: Any) -> None:
        """Run ``coroutine`` to completion from synchronous code.

        ``asyncio.run`` refuses to start while an event loop is already running
        in the current thread, so in that case the coroutine is executed on a
        fresh loop in a short-lived worker thread instead.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop_running = False
        else:
            loop_running = True
        if not loop_running:
            asyncio.run(coroutine)
            return
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=1) as pool:
            pool.submit(asyncio.run, coroutine).result()
def _resolve_output_path(audio_path: str | Path | None) -> Path:
    """Pick the file a TTS backend should write to.

    Args:
        audio_path: Requested destination, or None to get a temporary file.

    Returns:
        The requested path, with its parent directories created, or the path
        of a newly created temporary ".mp3" file that is kept on disk.
    """
    if audio_path is None:
        # delete=False keeps the (empty) file after the handle is closed so
        # its name stays reserved for the caller.
        with tempfile.NamedTemporaryFile(
            prefix="voicedirector-tts-",
            suffix=".mp3",
            delete=False,
        ) as placeholder:
            temp_name = placeholder.name
        return Path(temp_name)

    target = Path(audio_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    return target
def _extract_funasr_text(result: Any) -> tuple[str, str]:
    """Pull the recognized text out of a FunASR ``generate`` result.

    Args:
        result: A list whose first item is a dict with a "text" entry, a dict
            with a "text" entry, or any other object, which is stringified.
            A missing result (None, an empty list, a non-dict first item, or
            a missing/None "text" value) is treated as empty text.

    Returns:
        ``(transcript, raw_text)``: the post-processed, stripped transcript
        and the unprocessed text it came from.
    """
    if isinstance(result, list):
        first = result[0] if result else None
        text_value = first.get("text") if isinstance(first, dict) else None
    elif isinstance(result, dict):
        text_value = result.get("text")
    else:
        text_value = result
    # Stringifying None or an empty list would invent the transcripts "None"
    # and "[]", so absent text is normalized to the empty string instead.
    raw_text = "" if text_value is None else str(text_value)
    if not raw_text:
        # Nothing to post-process; also avoids importing funasr needlessly.
        return "", raw_text
    from funasr.utils.postprocess_utils import rich_transcription_postprocess

    transcript = rich_transcription_postprocess(raw_text).strip()
    return transcript, raw_text
def _merge_notes(notes: Sequence[str]) -> str | None:
    """Join the non-blank notes with " | ".

    Args:
        notes: Candidate notes; empty and whitespace-only entries are skipped
            and the rest are trimmed.

    Returns:
        The joined text, or None when no note survives the filtering.
    """
    kept: list[str] = []
    for entry in notes:
        if not entry:
            continue
        trimmed = entry.strip()
        if trimmed:
            kept.append(trimmed)
    return " | ".join(kept) if kept else None
def _preview_text(text: str) -> str:
    """Return a display-sized version of ``text``.

    Surrounding whitespace is removed before the length is checked, so
    padding never causes real content to be cut.

    Args:
        text: The full text.

    Returns:
        The trimmed text when it has at most 64 characters, otherwise its
        first 61 characters followed by "...".
    """
    cleaned = text.strip()
    if len(cleaned) <= 64:
        return cleaned
    return cleaned[:61] + "..."