# Speech adapters: mock, FunASR SenseVoice and faster-whisper ASR backends
# plus mock and edge-tts TTS backends.
#
# Heavy third-party packages (funasr, faster_whisper, edge_tts) are imported
# lazily inside the methods that need them, so importing this module only
# requires the standard library.

from __future__ import annotations

import asyncio
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Protocol, Sequence

DEFAULT_FUNASR_MODEL_ID = "FunAudioLLM/SenseVoiceSmall"
DEFAULT_FUNASR_MODEL_REVISION = "3eb3b4eeffc2f2dde6051b853983753db33e35c3"
DEFAULT_FASTER_WHISPER_MODEL_SIZE = "small"
DEFAULT_EDGE_TTS_VOICE = "zh-CN-XiaoxiaoNeural"
AVAILABLE_EDGE_TTS_VOICES = (
    "zh-CN-XiaoxiaoNeural",
    "zh-CN-YunxiNeural",
    "zh-CN-YunjianNeural",
)


@dataclass(frozen=True)
class AsrResult:
    """Immutable outcome of one speech-recognition request."""

    transcript: str
    partial_transcript: str
    latency_ms: int
    backend: str = "mock-asr"
    runtime_note: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class TtsPreview:
    """Immutable outcome of one text-to-speech request."""

    backend: str
    status: str
    preview_text: str
    latency_ms: int
    audio_path: str | None = None
    voice: str | None = None
    runtime_note: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)


class AudioAsrAdapter(Protocol):
    """Structural interface for ASR backends that transcribe an audio file."""

    backend_name: str

    def transcribe(self, audio_path: str | Path, *, hotwords: Sequence[str] | None = None) -> AsrResult:
        ...


class TtsAdapter(Protocol):
    """Structural interface for TTS backends."""

    backend_name: str

    def synthesize(
        self,
        text: str,
        *,
        voice: str | None = None,
        audio_path: str | Path | None = None,
    ) -> TtsPreview:
        ...


class MockASRAdapter:
    """Text passthrough stand-in for an ASR backend; no audio is processed."""

    backend_name = "mock-asr"

    def transcribe_text(self, raw_input: str) -> AsrResult:
        """Return *raw_input* with whitespace collapsed as the transcript.

        The partial transcript is the whole text when it has at most 12
        characters, otherwise its first half. The latency is a fixed 60 ms.
        """
        cleaned = " ".join(raw_input.split())
        midpoint = len(cleaned) if len(cleaned) <= 12 else max(len(cleaned) // 2, 1)
        partial = cleaned[:midpoint]
        return AsrResult(
            transcript=cleaned,
            partial_transcript=partial,
            latency_ms=60,
            backend=self.backend_name,
            runtime_note="Manual text passthrough only. No real ASR backend was used.",
            metadata={"input_mode": "manual_text"},
        )


class MockTTSAdapter:
    """TTS stand-in that returns a text preview and never writes audio."""

    backend_name = "mock-tts"

    def synthesize(
        self,
        text: str,
        *,
        voice: str | None = None,
        audio_path: str | Path | None = None,
    ) -> TtsPreview:
        """Return a preview of *text*; ``audio_path`` is accepted but unused."""
        preview = _preview_text(text)
        return TtsPreview(
            backend=self.backend_name,
            status="mock_preview_ready",
            preview_text=preview,
            latency_ms=120,
            voice=voice or "mock-preview",
            runtime_note="Mock fallback only. No audio file was generated.",
            metadata={"audio_generated": False},
        )


class FunASRSenseVoiceAdapter:
    """ASR adapter around ``funasr.AutoModel`` with per-device fallback.

    Models are created lazily per device and cached. The device that last
    succeeded is tried first on subsequent calls.
    """

    backend_name = "funasr-sensevoice"

    def __init__(
        self,
        model_id: str = DEFAULT_FUNASR_MODEL_ID,
        model_revision: str = DEFAULT_FUNASR_MODEL_REVISION,
        hub: str = "hf",
        device_order: Sequence[str] = ("cuda:0", "cpu"),
        language: str = "zh",
    ) -> None:
        self.model_id = model_id
        self.model_revision = model_revision
        self.hub = hub
        self.device_order = tuple(device_order)
        self.language = language
        self._models: dict[str, Any] = {}
        self._active_device: str | None = None
        self._runtime_note: str | None = None

    def transcribe(self, audio_path: str | Path, *, hotwords: Sequence[str] | None = None) -> AsrResult:
        """Transcribe *audio_path*, trying each candidate device in order.

        ``hotwords`` is accepted for interface compatibility and ignored.

        Raises:
            RuntimeError: if a non-CUDA device fails or every candidate fails;
                the last underlying exception is chained as the cause.
        """
        del hotwords
        last_error: Exception | None = None
        runtime_notes: list[str] = []
        for device in self._candidate_devices():
            try:
                model = self._get_model(device)
                started = time.perf_counter()
                result = model.generate(input=str(audio_path), language=self.language, use_itn=True)
                latency_ms = int((time.perf_counter() - started) * 1000)
                transcript, raw_text = _extract_funasr_text(result)
                self._active_device = device
                # A persisted GPU-failure note takes precedence over the
                # per-call failure notes.
                note = self._runtime_note or _merge_notes(runtime_notes)
                return AsrResult(
                    transcript=transcript,
                    partial_transcript=transcript,
                    latency_ms=latency_ms,
                    backend=self.backend_name,
                    runtime_note=note,
                    metadata={
                        "device": device,
                        "model_id": self.model_id,
                        "model_revision": self.model_revision,
                        "partial_available": False,
                        "raw_text": raw_text,
                    },
                )
            except Exception as exc:  # pragma: no cover - exercised in integration smoke only.
                runtime_notes.append(f"{device} failed: {exc}")
                last_error = exc
                if device.startswith("cuda"):
                    self._runtime_note = f"Primary GPU path failed ({exc}); retrying on CPU."
                    continue
                # Only CUDA failures fall through to the next candidate.
                break
        raise RuntimeError(f"FunASR transcription failed: {last_error}") from last_error

    def _candidate_devices(self) -> tuple[str, ...]:
        """Return the devices to try, last successful device first."""
        if self._active_device is None:
            return self.device_order
        ordered = [self._active_device]
        ordered.extend(device for device in self.device_order if device != self._active_device)
        return tuple(ordered)

    def _get_model(self, device: str) -> Any:
        """Return the cached model for *device*, creating it on first use."""
        if device in self._models:
            return self._models[device]
        from funasr import AutoModel

        model = AutoModel(
            model=self.model_id,
            hub=self.hub,
            model_revision=self.model_revision,
            device=device,
            disable_update=True,
            disable_pbar=True,
        )
        self._models[device] = model
        return model


class FasterWhisperAdapter:
    """ASR adapter around ``faster_whisper.WhisperModel`` with fallback.

    Candidates are ``(device, compute_type)`` pairs. Models are created
    lazily per candidate and cached; the candidate that last succeeded is
    tried first on subsequent calls.
    """

    backend_name = "faster-whisper"

    def __init__(
        self,
        model_size: str = DEFAULT_FASTER_WHISPER_MODEL_SIZE,
        language: str = "zh",
        device_order: Sequence[tuple[str, str]] = (("cuda", "float16"), ("cpu", "int8")),
        beam_size: int = 1,
        vad_filter: bool = True,
    ) -> None:
        self.model_size = model_size
        self.language = language
        self.device_order = tuple(device_order)
        self.beam_size = beam_size
        self.vad_filter = vad_filter
        self._models: dict[tuple[str, str], Any] = {}
        self._active_candidate: tuple[str, str] | None = None
        self._runtime_note: str | None = None

    def transcribe(self, audio_path: str | Path, *, hotwords: Sequence[str] | None = None) -> AsrResult:
        """Transcribe *audio_path*, trying each candidate in order.

        ``hotwords`` are joined with single spaces and passed to the model;
        an empty or missing sequence is passed as ``None``.

        Raises:
            RuntimeError: if a non-"cuda" candidate fails or every candidate
                fails; the last underlying exception is chained as the cause.
        """
        hotword_text = " ".join(hotwords or ()) or None
        last_error: Exception | None = None
        runtime_notes: list[str] = []
        for candidate in self._candidate_order():
            device, compute_type = candidate
            try:
                model = self._get_model(candidate)
                started = time.perf_counter()
                segments, info = model.transcribe(
                    str(audio_path),
                    language=self.language,
                    beam_size=self.beam_size,
                    vad_filter=self.vad_filter,
                    hotwords=hotword_text,
                )
                # faster-whisper returns a lazy generator: decoding happens
                # while iterating. Consume it before stopping the timer so the
                # reported latency covers the actual transcription work.
                transcript = "".join(segment.text for segment in segments).strip()
                latency_ms = int((time.perf_counter() - started) * 1000)
                self._active_candidate = candidate
                # A persisted CUDA-failure note takes precedence over the
                # per-call failure notes.
                note = self._runtime_note or _merge_notes(runtime_notes)
                return AsrResult(
                    transcript=transcript,
                    partial_transcript=transcript,
                    latency_ms=latency_ms,
                    backend=self.backend_name,
                    runtime_note=note,
                    metadata={
                        "device": device,
                        "compute_type": compute_type,
                        "model_size": self.model_size,
                        "partial_available": False,
                        "language": getattr(info, "language", None),
                        "language_probability": getattr(info, "language_probability", None),
                        "hotwords_used": bool(hotword_text),
                    },
                )
            except Exception as exc:  # pragma: no cover - exercised in integration smoke only.
                runtime_notes.append(f"{device}/{compute_type} failed: {exc}")
                last_error = exc
                if device == "cuda":
                    self._runtime_note = f"CUDA path unavailable ({exc}); using cpu/int8 fallback."
                    continue
                # Only CUDA failures fall through to the next candidate.
                break
        raise RuntimeError(f"faster-whisper transcription failed: {last_error}") from last_error

    def _candidate_order(self) -> tuple[tuple[str, str], ...]:
        """Return the candidates to try, last successful candidate first."""
        if self._active_candidate is None:
            return self.device_order
        ordered = [self._active_candidate]
        ordered.extend(candidate for candidate in self.device_order if candidate != self._active_candidate)
        return tuple(ordered)

    def _get_model(self, candidate: tuple[str, str]) -> Any:
        """Return the cached model for *candidate*, creating it on first use."""
        if candidate in self._models:
            return self._models[candidate]
        from faster_whisper import WhisperModel

        device, compute_type = candidate
        model = WhisperModel(self.model_size, device=device, compute_type=compute_type)
        self._models[candidate] = model
        return model


class EdgeTTSAdapter:
    """TTS adapter that writes audio through the ``edge_tts`` package."""

    backend_name = "edge-tts"

    def __init__(self, default_voice: str = DEFAULT_EDGE_TTS_VOICE) -> None:
        self.default_voice = default_voice

    def synthesize(
        self,
        text: str,
        *,
        voice: str | None = None,
        audio_path: str | Path | None = None,
    ) -> TtsPreview:
        """Synthesize *text* to an audio file and describe the result.

        When ``audio_path`` is None a temporary ``.mp3`` file is created and
        the caller owns it afterwards. If synthesis fails, that temporary
        file is removed again; a caller-supplied path is never deleted.

        NOTE: this drives the coroutine with ``asyncio.run``, which raises
        ``RuntimeError`` when called from a thread that already has a running
        event loop.
        """
        import edge_tts

        selected_voice = voice or self.default_voice
        owns_temp_file = audio_path is None
        output_path = _resolve_output_path(audio_path)
        started = time.perf_counter()
        try:
            asyncio.run(edge_tts.Communicate(text, voice=selected_voice).save(str(output_path)))
        except Exception:
            if owns_temp_file:
                try:
                    output_path.unlink()
                except OSError:
                    # Best-effort cleanup; the synthesis error matters more.
                    pass
            raise
        latency_ms = int((time.perf_counter() - started) * 1000)
        return TtsPreview(
            backend=self.backend_name,
            status="audio_ready",
            preview_text=_preview_text(text),
            latency_ms=latency_ms,
            audio_path=str(output_path),
            voice=selected_voice,
            metadata={"audio_generated": True},
        )


def _resolve_output_path(audio_path: str | Path | None) -> Path:
    """Return the path audio should be written to.

    A given path has its parent directories created. Without one, a new
    empty temporary ``.mp3`` file is created (not auto-deleted) and returned.
    """
    if audio_path is not None:
        resolved = Path(audio_path)
        resolved.parent.mkdir(parents=True, exist_ok=True)
        return resolved
    with tempfile.NamedTemporaryFile(prefix="voicedirector-tts-", suffix=".mp3", delete=False) as handle:
        return Path(handle.name)


def _funasr_raw_text(result: Any) -> str:
    """Pull the raw ``"text"`` value out of a FunASR ``generate`` result.

    Lists contribute the ``"text"`` of their first element when it is a dict;
    dicts contribute their own ``"text"``. Empty lists and ``None`` yield an
    empty string instead of the literal ``"[]"`` / ``"None"``. Any other
    value is stringified.
    """
    if isinstance(result, list):
        first = result[0] if result else None
        if isinstance(first, dict):
            return str(first.get("text", ""))
        return ""
    if isinstance(result, dict):
        return str(result.get("text", ""))
    if result is None:
        return ""
    return str(result)


def _extract_funasr_text(result: Any) -> tuple[str, str]:
    """Return ``(transcript, raw_text)`` for a FunASR ``generate`` result.

    ``transcript`` is ``raw_text`` passed through FunASR's
    ``rich_transcription_postprocess`` and stripped.
    """
    raw_text = _funasr_raw_text(result)
    from funasr.utils.postprocess_utils import rich_transcription_postprocess

    transcript = rich_transcription_postprocess(raw_text).strip()
    return transcript, raw_text


def _merge_notes(notes: Sequence[str]) -> str | None:
    """Join the non-blank notes with ``" | "``; return None when none remain."""
    merged = [note.strip() for note in notes if note and note.strip()]
    if not merged:
        return None
    return " | ".join(merged)


def _preview_text(text: str) -> str:
    """Return *text* cut to 61 characters plus ``"..."`` when over 64, stripped."""
    preview = text if len(text) <= 64 else text[:61] + "..."
    return preview.strip()