Spaces:
Paused
Paused
| from __future__ import annotations | |
| import asyncio | |
| import tempfile | |
| import time | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any, Protocol, Sequence | |
# Default model identifier used by FunASRSenseVoiceAdapter.
DEFAULT_FUNASR_MODEL_ID = "FunAudioLLM/SenseVoiceSmall"
# Revision passed alongside the model id (looks like a pinned commit hash -- confirm).
DEFAULT_FUNASR_MODEL_REVISION = "3eb3b4eeffc2f2dde6051b853983753db33e35c3"
# Default model size used by FasterWhisperAdapter.
DEFAULT_FASTER_WHISPER_MODEL_SIZE = "small"

# Voice used by EdgeTTSAdapter when the caller does not choose one.
DEFAULT_EDGE_TTS_VOICE = "zh-CN-XiaoxiaoNeural"
# Edge TTS voice names exposed by this module; the default voice comes first.
AVAILABLE_EDGE_TTS_VOICES = (
    DEFAULT_EDGE_TTS_VOICE,
    "zh-CN-YunxiNeural",
    "zh-CN-YunjianNeural",
)
@dataclass
class AsrResult:
    """Outcome of one speech-recognition request.

    The class must be a dataclass: adapters in this module build it with
    keyword arguments and ``metadata`` relies on ``field(default_factory=...)``
    so that every instance gets its own dict.
    """

    # Final recognised text.
    transcript: str
    # Partial/intermediate text; the real backends in this module simply
    # repeat the final transcript here.
    partial_transcript: str
    # Processing time in milliseconds as reported by the adapter.
    latency_ms: int
    # Name of the backend that produced the result.
    backend: str = "mock-asr"
    # Optional human-readable note about how the request was served.
    runtime_note: str | None = None
    # Free-form, backend-specific details.
    metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class TtsPreview:
    """Outcome of one text-to-speech request.

    The class must be a dataclass: adapters in this module build it with
    keyword arguments and ``metadata`` relies on ``field(default_factory=...)``
    so that every instance gets its own dict.
    """

    # Name of the backend that produced the preview.
    backend: str
    # Backend-defined status string (e.g. "audio_ready", "mock_preview_ready").
    status: str
    # Shortened copy of the synthesized text.
    preview_text: str
    # Processing time in milliseconds as reported by the adapter.
    latency_ms: int
    # Location of the generated audio, when a file was written.
    audio_path: str | None = None
    # Voice that was used for synthesis, if any.
    voice: str | None = None
    # Optional human-readable note about how the request was served.
    runtime_note: str | None = None
    # Free-form, backend-specific details.
    metadata: dict[str, Any] = field(default_factory=dict)
class AudioAsrAdapter(Protocol):
    """Structural interface for backends that transcribe an audio file."""

    # Short identifier of the backend.
    backend_name: str

    def transcribe(
        self,
        audio_path: str | Path,
        *,
        hotwords: Sequence[str] | None = None,
    ) -> AsrResult:
        """Transcribe the audio at ``audio_path``; ``hotwords`` is an optional hint list."""
        ...
class TtsAdapter(Protocol):
    """Structural interface for backends that turn text into speech."""

    # Short identifier of the backend.
    backend_name: str

    def synthesize(
        self,
        text: str,
        *,
        voice: str | None = None,
        audio_path: str | Path | None = None,
    ) -> TtsPreview:
        """Synthesize ``text``, optionally with a specific voice and output path."""
        ...
class MockASRAdapter:
    """Placeholder ASR backend that passes manually typed text straight through."""

    backend_name = "mock-asr"

    def transcribe_text(self, raw_input: str) -> AsrResult:
        """Wrap manually entered text in an ``AsrResult``.

        Whitespace runs are collapsed to single spaces. The partial
        transcript is the whole text when it is at most 12 characters long,
        otherwise its first half. The latency is a fixed placeholder value.
        """
        normalized = " ".join(raw_input.split())
        length = len(normalized)
        cutoff = max(length // 2, 1) if length > 12 else length
        return AsrResult(
            transcript=normalized,
            partial_transcript=normalized[:cutoff],
            latency_ms=60,
            backend=self.backend_name,
            runtime_note="Manual text passthrough only. No real ASR backend was used.",
            metadata={"input_mode": "manual_text"},
        )
class MockTTSAdapter:
    """Placeholder TTS backend that reports a preview without producing audio."""

    backend_name = "mock-tts"

    def synthesize(
        self,
        text: str,
        *,
        voice: str | None = None,
        audio_path: str | Path | None = None,
    ) -> TtsPreview:
        """Return a canned preview for ``text``.

        ``audio_path`` is accepted for interface compatibility and is not
        used; no file is written. The latency is a fixed placeholder value.
        """
        chosen_voice = voice if voice else "mock-preview"
        return TtsPreview(
            backend=self.backend_name,
            status="mock_preview_ready",
            preview_text=_preview_text(text),
            latency_ms=120,
            voice=chosen_voice,
            runtime_note="Mock fallback only. No audio file was generated.",
            metadata={"audio_generated": False},
        )
class FunASRSenseVoiceAdapter:
    """ASR adapter backed by a FunASR ``AutoModel`` with per-device fallback.

    Devices are tried in order. A failure on a ``cuda*`` device moves on to
    the next candidate, while a failure on any other device ends the attempt.
    Models are cached per device, and the device that last succeeded is tried
    first on subsequent calls.
    """

    backend_name = "funasr-sensevoice"

    def __init__(
        self,
        model_id: str = DEFAULT_FUNASR_MODEL_ID,
        model_revision: str = DEFAULT_FUNASR_MODEL_REVISION,
        hub: str = "hf",
        device_order: Sequence[str] = ("cuda:0", "cpu"),
        language: str = "zh",
    ) -> None:
        """Store the configuration; no model is loaded until the first transcription."""
        self.model_id = model_id
        self.model_revision = model_revision
        self.hub = hub
        self.language = language
        self.device_order = tuple(device_order)
        # Lazily populated state.
        self._models: dict[str, Any] = {}
        self._active_device: str | None = None
        self._runtime_note: str | None = None

    def transcribe(self, audio_path: str | Path, *, hotwords: Sequence[str] | None = None) -> AsrResult:
        """Transcribe ``audio_path`` on the first device that works.

        ``hotwords`` is accepted for interface compatibility and ignored.

        Raises:
            RuntimeError: when no candidate device produced a result; the
                last underlying exception is chained as the cause.
        """
        del hotwords
        failure: Exception | None = None
        attempt_log: list[str] = []
        for device in self._candidate_devices():
            try:
                transcript, raw_text, elapsed_ms = self._run_on_device(device, audio_path)
                self._active_device = device
                # A stored GPU-failure note takes precedence over this call's notes.
                return AsrResult(
                    transcript=transcript,
                    partial_transcript=transcript,
                    latency_ms=elapsed_ms,
                    backend=self.backend_name,
                    runtime_note=self._runtime_note or _merge_notes(attempt_log),
                    metadata={
                        "device": device,
                        "model_id": self.model_id,
                        "model_revision": self.model_revision,
                        "partial_available": False,
                        "raw_text": raw_text,
                    },
                )
            except Exception as exc:  # pragma: no cover - exercised in integration smoke only.
                attempt_log.append(f"{device} failed: {exc}")
                failure = exc
                if not device.startswith("cuda"):
                    # Only GPU failures are eligible for a fallback attempt.
                    break
                self._runtime_note = f"Primary GPU path failed ({exc}); retrying on CPU."
        raise RuntimeError(f"FunASR transcription failed: {failure}") from failure

    def _run_on_device(self, device: str, audio_path: str | Path) -> tuple[str, str, int]:
        """Run one inference on ``device``; return (transcript, raw_text, latency_ms)."""
        model = self._get_model(device)
        started = time.perf_counter()
        output = model.generate(input=str(audio_path), language=self.language, use_itn=True)
        elapsed_ms = int((time.perf_counter() - started) * 1000)
        transcript, raw_text = _extract_funasr_text(output)
        return transcript, raw_text, elapsed_ms

    def _candidate_devices(self) -> tuple[str, ...]:
        """Return devices to try, with the last successful device first."""
        preferred = self._active_device
        if preferred is None:
            return self.device_order
        return (preferred, *(name for name in self.device_order if name != preferred))

    def _get_model(self, device: str):
        """Return the cached model for ``device``, loading it on first use."""
        try:
            return self._models[device]
        except KeyError:
            pass
        # Imported lazily so the module can be imported without funasr installed.
        from funasr import AutoModel

        loaded = AutoModel(
            model=self.model_id,
            hub=self.hub,
            model_revision=self.model_revision,
            device=device,
            disable_update=True,
            disable_pbar=True,
        )
        self._models[device] = loaded
        return loaded
class FasterWhisperAdapter:
    """ASR adapter backed by faster-whisper with (device, compute_type) fallback.

    Candidates are tried in order. A failure on the ``cuda`` device moves on
    to the next candidate, while a failure on any other device ends the
    attempt. Models are cached per candidate, and the candidate that last
    succeeded is tried first on subsequent calls.
    """

    backend_name = "faster-whisper"

    def __init__(
        self,
        model_size: str = DEFAULT_FASTER_WHISPER_MODEL_SIZE,
        language: str = "zh",
        device_order: Sequence[tuple[str, str]] = (("cuda", "float16"), ("cpu", "int8")),
        beam_size: int = 1,
        vad_filter: bool = True,
    ) -> None:
        """Store the configuration; no model is loaded until the first transcription."""
        self.model_size = model_size
        self.language = language
        self.device_order = tuple(device_order)
        self.beam_size = beam_size
        self.vad_filter = vad_filter
        # Lazily populated state.
        self._models: dict[tuple[str, str], Any] = {}
        self._active_candidate: tuple[str, str] | None = None
        self._runtime_note: str | None = None

    def transcribe(self, audio_path: str | Path, *, hotwords: Sequence[str] | None = None) -> AsrResult:
        """Transcribe ``audio_path`` on the first candidate that works.

        ``hotwords`` are joined with spaces and forwarded to the model; an
        empty or missing sequence is forwarded as ``None``.

        Raises:
            RuntimeError: when no candidate produced a result; the last
                underlying exception is chained as the cause.
        """
        hotword_text = " ".join(hotwords or ()) or None
        last_error: Exception | None = None
        runtime_notes: list[str] = []
        for candidate in self._candidate_order():
            device, compute_type = candidate
            try:
                model = self._get_model(candidate)
                started = time.perf_counter()
                segments, info = model.transcribe(
                    str(audio_path),
                    language=self.language,
                    beam_size=self.beam_size,
                    vad_filter=self.vad_filter,
                    hotwords=hotword_text,
                )
                # faster-whisper yields segments lazily: decoding only runs
                # while the generator is consumed. Consume it before stopping
                # the timer so latency_ms covers the actual transcription.
                transcript = "".join(segment.text for segment in segments).strip()
                latency_ms = int((time.perf_counter() - started) * 1000)
                self._active_candidate = candidate
                # A stored CUDA-failure note takes precedence over this call's notes.
                note = self._runtime_note or _merge_notes(runtime_notes)
                return AsrResult(
                    transcript=transcript,
                    partial_transcript=transcript,
                    latency_ms=latency_ms,
                    backend=self.backend_name,
                    runtime_note=note,
                    metadata={
                        "device": device,
                        "compute_type": compute_type,
                        "model_size": self.model_size,
                        "partial_available": False,
                        "language": getattr(info, "language", None),
                        "language_probability": getattr(info, "language_probability", None),
                        "hotwords_used": bool(hotword_text),
                    },
                )
            except Exception as exc:  # pragma: no cover - exercised in integration smoke only.
                runtime_notes.append(f"{device}/{compute_type} failed: {exc}")
                last_error = exc
                if device == "cuda":
                    self._runtime_note = f"CUDA path unavailable ({exc}); using cpu/int8 fallback."
                    continue
                # Only CUDA failures are eligible for a fallback attempt.
                break
        raise RuntimeError(f"faster-whisper transcription failed: {last_error}") from last_error

    def _candidate_order(self) -> tuple[tuple[str, str], ...]:
        """Return candidates to try, with the last successful candidate first."""
        if self._active_candidate is None:
            return self.device_order
        ordered = [self._active_candidate]
        ordered.extend(candidate for candidate in self.device_order if candidate != self._active_candidate)
        return tuple(ordered)

    def _get_model(self, candidate: tuple[str, str]):
        """Return the cached model for ``candidate``, loading it on first use."""
        if candidate in self._models:
            return self._models[candidate]
        # Imported lazily so the module can be imported without faster-whisper installed.
        from faster_whisper import WhisperModel

        device, compute_type = candidate
        model = WhisperModel(self.model_size, device=device, compute_type=compute_type)
        self._models[candidate] = model
        return model
class EdgeTTSAdapter:
    """TTS adapter that writes speech audio to a file using ``edge_tts``."""

    backend_name = "edge-tts"

    def __init__(self, default_voice: str = DEFAULT_EDGE_TTS_VOICE) -> None:
        """Remember the voice to use when a call does not specify one."""
        self.default_voice = default_voice

    def synthesize(
        self,
        text: str,
        *,
        voice: str | None = None,
        audio_path: str | Path | None = None,
    ) -> TtsPreview:
        """Synthesize ``text`` to an audio file and describe the result.

        Args:
            text: Text to speak.
            voice: Voice name; falls back to ``default_voice`` when falsy.
            audio_path: Destination file. When ``None`` a temporary ``.mp3``
                file is created and its path is returned in the preview.

        Returns:
            A ``TtsPreview`` with status ``"audio_ready"`` and the output path.

        Raises:
            Whatever ``edge_tts`` or ``asyncio.run`` raises. ``asyncio.run``
            cannot be used from a thread with a running event loop, so call
            this from synchronous code.
        """
        # Imported lazily so the module can be imported without edge_tts installed.
        import edge_tts

        selected_voice = voice or self.default_voice
        output_path = _resolve_output_path(audio_path)
        started = time.perf_counter()
        try:
            asyncio.run(edge_tts.Communicate(text, voice=selected_voice).save(str(output_path)))
        except BaseException:
            # The temp file was created by this adapter (delete=False); do not
            # leave it behind when synthesis fails. A caller-supplied path is
            # the caller's responsibility and is left untouched.
            if audio_path is None:
                output_path.unlink(missing_ok=True)
            raise
        latency_ms = int((time.perf_counter() - started) * 1000)
        return TtsPreview(
            backend=self.backend_name,
            status="audio_ready",
            preview_text=_preview_text(text),
            latency_ms=latency_ms,
            audio_path=str(output_path),
            voice=selected_voice,
            metadata={"audio_generated": True},
        )
def _resolve_output_path(audio_path: str | Path | None) -> Path:
    """Return the file path that synthesized audio should be written to.

    With no ``audio_path`` a persistent temporary ``.mp3`` file is created
    (it is not deleted on close) and its path is returned. Otherwise the
    given path is returned after making sure its parent directory exists.
    """
    if audio_path is None:
        with tempfile.NamedTemporaryFile(
            prefix="voicedirector-tts-",
            suffix=".mp3",
            delete=False,
        ) as scratch:
            scratch_name = scratch.name
        return Path(scratch_name)

    target = Path(audio_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    return target
def _extract_funasr_text(result: Any) -> tuple[str, str]:
    """Pull the text out of a FunASR ``generate`` result.

    Args:
        result: Model output. Handled shapes: a list whose first item is a
            dict with a ``"text"`` key, a single such dict, or any other
            object (stringified). ``None`` and an empty list mean "no text".

    Returns:
        ``(transcript, raw_text)`` where ``raw_text`` is the text as found in
        the result and ``transcript`` is that text after FunASR's
        ``rich_transcription_postprocess`` and stripping.
    """

    def _text_of(mapping: dict) -> str:
        # A missing or None "text" entry means no text, not the string "None".
        value = mapping.get("text")
        return "" if value is None else str(value)

    if result is None or (isinstance(result, list) and not result):
        # Previously these fell through to str(result), producing the bogus
        # transcripts "None" / "[]".
        raw_text = ""
    elif isinstance(result, list):
        first = result[0]
        raw_text = _text_of(first) if isinstance(first, dict) else ""
    elif isinstance(result, dict):
        raw_text = _text_of(result)
    else:
        raw_text = str(result)

    if not raw_text:
        # Nothing to post-process; also avoids importing funasr needlessly.
        return "", ""

    from funasr.utils.postprocess_utils import rich_transcription_postprocess

    transcript = rich_transcription_postprocess(raw_text).strip()
    return transcript, raw_text
def _merge_notes(notes: Sequence[str]) -> str | None:
    """Join the non-blank notes with ``" | "``; return ``None`` if none remain.

    Each note is stripped of surrounding whitespace; empty and
    whitespace-only entries are dropped.
    """
    kept: list[str] = []
    for entry in notes:
        if not entry:
            continue
        trimmed = entry.strip()
        if trimmed:
            kept.append(trimmed)
    return " | ".join(kept) if kept else None
def _preview_text(text: str) -> str:
    """Return ``text`` shortened for display.

    Text longer than 64 characters is cut to its first 61 characters plus
    ``"..."``; the result is then stripped of surrounding whitespace.
    """
    if len(text) > 64:
        text = f"{text[:61]}..."
    return text.strip()