Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| import gc | |
| import json | |
| import importlib | |
| import os | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
| from collections.abc import AsyncIterator | |
| from dataclasses import dataclass | |
| from functools import cached_property | |
| from pathlib import Path | |
| import httpx | |
| import numpy as np | |
| from app.audio import wav_bytes_from_float32 | |
| from app.config import settings | |
# --- Text clean-up patterns -------------------------------------------------

# Bracketed paralinguistic cues such as "[laugh]", matched case-insensitively.
PARALINGUISTIC_TAG_PATTERN = re.compile(
    r"\[(laugh|chuckle|sigh|cough)\]",
    flags=re.IGNORECASE,
)
# Terminal escape sequences (two-character escapes and CSI sequences).
ANSI_ESCAPE_PATTERN = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
# A complete <think>...</think> block, including embedded newlines.
THINK_BLOCK_PATTERN = re.compile(
    r"<think>.*?</think>",
    flags=re.IGNORECASE | re.DOTALL,
)
# A lone opening or closing <think> tag.
THINK_TAG_PATTERN = re.compile(r"</?think>", flags=re.IGNORECASE)
# Chat-role prefixes such as "User:" or "assistant :".
ROLE_CONTINUATION_PATTERN = re.compile(r"(?i)\b(?:human|user|assistant|system)\s*:\s*")
# Meta commentary running to the end of the text ("Note: ...", "this response ...").
META_TAIL_PATTERN = re.compile(
    r"(?is)\b(?:thought|analysis|reasoning|explanation|note)\s*:\s*.*$|"
    r"\bthis response\b.*$|"
    r"\banswering in a conversational tone\b.*$|"
    r"\byou can add more\b.*$|"
    r"\bwithout any unnecessary elaboration\b.*$"
)

CLI_PROMPT_MARKER = "›"

# Memory-injection notices, listed from most to least specific.
INTERNAL_REPLY_PATTERNS = [
    re.compile(expression, flags=re.IGNORECASE)
    for expression in (
        r"💭\s*Injected relevant context from memory\.?",
        r"Injected relevant context from memory\.?",
        r"Relevant context from memory\.?",
        r"Context from memory\.?",
    )
]

# --- Chatterbox ONNX model constants ---------------------------------------

CHATTERBOX_ONNX_SAMPLE_RATE = 24000
CHATTERBOX_ONNX_START_SPEECH_TOKEN = 6561
CHATTERBOX_ONNX_STOP_SPEECH_TOKEN = 6562
CHATTERBOX_ONNX_SILENCE_TOKEN = 4299
# Dimensions used to build the empty key/value cache for the language model.
CHATTERBOX_ONNX_NUM_KV_HEADS = 16
CHATTERBOX_ONNX_HEAD_DIM = 64

# --- Kokoro ------------------------------------------------------------------

KOKORO_SAMPLE_RATE = 24000
@dataclass
class TranscriptionResult:
    """Outcome of one speech-to-text pass.

    The ``@dataclass`` decorator is required: callers in this file construct
    instances with keyword arguments (``TranscriptionResult(text=...,
    backend=...)``), which a plain class with bare annotations would reject.
    """

    # Recognized text; callers also use it to carry "[... unavailable]" notices.
    text: str
    # Language reported by the backend, when it reports one.
    language: str | None = None
    # Backend confidence for ``language``, when available.
    language_probability: float | None = None
    # Name of the backend that produced the result ("whisper", "parakeet", "none").
    backend: str = ""
class RepetitionPenaltyLogitsProcessor:
    """Rescale the logits of already-generated tokens by a fixed penalty.

    Negative scores are multiplied by the penalty and non-negative scores are
    divided by it.
    """

    def __init__(self, penalty: float):
        """Store the penalty; raise ``ValueError`` unless it is positive."""
        if penalty <= 0:
            raise ValueError("penalty must be > 0")
        self.penalty = float(penalty)

    def __call__(self, input_ids: np.ndarray, scores: np.ndarray) -> np.ndarray:
        """Return a copy of ``scores`` with the ``input_ids`` columns penalized.

        ``input_ids`` indexes ``scores`` along axis 1; ``scores`` itself is
        left unmodified.
        """
        adjusted = scores.copy()
        seen = np.take_along_axis(scores, input_ids, axis=1)
        penalized = np.where(seen < 0, seen * self.penalty, seen / self.penalty)
        np.put_along_axis(adjusted, input_ids, penalized, axis=1)
        return adjusted
class ChatterboxOnnxTTS:
    """Chatterbox text-to-speech running on ONNX Runtime sessions.

    Loads four ONNX graphs (speech encoder, token embedder, language model and
    conditional decoder) from the Hugging Face hub and greedily decodes speech
    tokens for a text prompt, conditioned on a reference voice recording.
    """

    def __init__(self) -> None:
        """Import the optional dependencies and open all ONNX sessions."""
        self._model_id = settings.chatterbox_onnx_model_id
        self._dtype = settings.chatterbox_onnx_dtype
        self._provider = settings.chatterbox_onnx_provider
        # _load_onnxruntime reads self._provider, so it must run after it is set.
        self._ort = self._load_onnxruntime()
        self._AutoTokenizer = importlib.import_module("transformers").AutoTokenizer
        self._hf_hub_download = importlib.import_module("huggingface_hub").hf_hub_download
        self._librosa = importlib.import_module("librosa")
        self._soundfile = importlib.import_module("soundfile")
        self._tokenizer = self._AutoTokenizer.from_pretrained(self._model_id)
        self._repetition_penalty = RepetitionPenaltyLogitsProcessor(settings.chatterbox_onnx_repetition_penalty)
        self._sessions = self._load_sessions()

    def _load_onnxruntime(self):
        """Import and return the ``onnxruntime`` module.

        If the plain import fails and an extra site-packages path is
        configured, that path is prepended to ``sys.path`` and the import is
        retried; otherwise the original import error propagates.

        Raises:
            RuntimeError: the CUDA provider was requested but onnxruntime does
                not list ``CUDAExecutionProvider``.
        """
        extra_path = settings.chatterbox_onnx_site_packages_path.strip()
        # Import torch first so the working project CUDA stack is initialized before ONNX Runtime.
        try:
            import torch  # noqa: F401
        except Exception:
            pass
        try:
            ort = importlib.import_module("onnxruntime")
        except Exception:
            if not extra_path:
                raise
            if extra_path not in sys.path:
                sys.path.insert(0, extra_path)
            ort = importlib.import_module("onnxruntime")
        if self._provider == "cuda":
            providers = ort.get_available_providers()
            if "CUDAExecutionProvider" not in providers:
                raise RuntimeError(
                    "onnxruntime does not expose CUDAExecutionProvider in the project environment; "
                    f"available providers: {providers}"
                )
            # Guarded so an onnxruntime build without this helper does not
            # abort loading with an AttributeError.
            preload_dlls = getattr(ort, "preload_dlls", None)
            if callable(preload_dlls):
                preload_dlls()
        return ort

    def _filename_for(self, name: str) -> str:
        """Return the ONNX file name for graph ``name`` under the configured dtype."""
        if self._dtype == "fp32":
            return f"{name}.onnx"
        if self._dtype == "q8":
            return f"{name}_quantized.onnx"
        return f"{name}_{self._dtype}.onnx"

    def _download_graph(self, name: str) -> str:
        """Download graph ``name`` plus its ``*_data`` companion; return the graph path."""
        filename = self._filename_for(name)
        graph = self._hf_hub_download(self._model_id, subfolder="onnx", filename=filename)
        # The companion file is fetched only for its side effect of being cached
        # (presumably next to the graph so ONNX Runtime can resolve it).
        self._hf_hub_download(self._model_id, subfolder="onnx", filename=f"{filename}_data")
        return graph

    def _make_session(self, path: str):
        """Open an inference session on ``path`` with the configured provider."""
        providers = ["CUDAExecutionProvider"] if self._provider == "cuda" else ["CPUExecutionProvider"]
        return self._ort.InferenceSession(path, providers=providers)

    def _load_sessions(self) -> dict[str, object]:
        """Download every graph and return the sessions keyed by graph name."""
        return {
            name: self._make_session(self._download_graph(name))
            for name in ("speech_encoder", "embed_tokens", "language_model", "conditional_decoder")
        }

    def _resolve_voice_path(self, audio_prompt_path: str | None) -> str:
        """Return the voice reference file to use.

        Raises:
            ValueError: neither the argument nor the settings supply a path.
            FileNotFoundError: the chosen path is not an existing file.
        """
        candidate = audio_prompt_path or settings.chatterbox_onnx_voice_path
        if not candidate:
            raise ValueError("No voice reference path configured for Chatterbox ONNX")
        if not Path(candidate).is_file():
            raise FileNotFoundError(f"Voice reference not found: {candidate}")
        return candidate

    def generate(self, text: str, audio_prompt_path: str | None = None) -> np.ndarray:
        """Synthesize ``text`` in the reference voice and return float32 samples.

        Args:
            text: Text to speak.
            audio_prompt_path: Optional voice reference file; falls back to the
                configured default.

        Returns:
            The decoder's first output with its leading axis squeezed, as float32.
        """
        voice_path = self._resolve_voice_path(audio_prompt_path)
        audio_values, _ = self._librosa.load(voice_path, sr=CHATTERBOX_ONNX_SAMPLE_RATE)
        audio_values = audio_values[np.newaxis, :].astype(np.float32)
        input_ids = self._tokenizer(text, return_tensors="np")["input_ids"].astype(np.int64)
        generate_tokens = np.array([[CHATTERBOX_ONNX_START_SPEECH_TOKEN]], dtype=np.int64)

        speech_encoder_session = self._sessions["speech_encoder"]
        embed_tokens_session = self._sessions["embed_tokens"]
        language_model_session = self._sessions["language_model"]
        cond_decoder_session = self._sessions["conditional_decoder"]

        # The voice conditioning is loop-invariant. Computing it up front also
        # keeps these names bound when the token budget is zero.
        cond_emb, prompt_token, speaker_embeddings, speaker_features = speech_encoder_session.run(
            None, {"audio_values": audio_values}
        )

        for i in range(settings.chatterbox_onnx_max_new_tokens):
            inputs_embeds = embed_tokens_session.run(None, {"input_ids": input_ids})[0]
            if i == 0:
                # First step: prepend the voice conditioning and start with an
                # empty key/value cache.
                inputs_embeds = np.concatenate((cond_emb, inputs_embeds), axis=1)
                batch_size, seq_len, _ = inputs_embeds.shape
                past_key_values = {
                    item.name: np.zeros(
                        [batch_size, CHATTERBOX_ONNX_NUM_KV_HEADS, 0, CHATTERBOX_ONNX_HEAD_DIM],
                        dtype=np.float16 if item.type == "tensor(float16)" else np.float32,
                    )
                    for item in language_model_session.get_inputs()
                    if "past_key_values" in item.name
                }
                attention_mask = np.ones((batch_size, seq_len), dtype=np.int64)
                position_ids = np.arange(seq_len, dtype=np.int64).reshape(1, -1).repeat(batch_size, axis=0)
            logits, *present_key_values = language_model_session.run(
                None,
                {
                    "inputs_embeds": inputs_embeds,
                    "attention_mask": attention_mask,
                    "position_ids": position_ids,
                    **past_key_values,
                },
            )
            logits = logits[:, -1, :]
            next_token_logits = self._repetition_penalty(generate_tokens, logits)
            # Greedy decoding: the chosen token is the next step's input.
            input_ids = np.argmax(next_token_logits, axis=-1, keepdims=True).astype(np.int64)
            generate_tokens = np.concatenate((generate_tokens, input_ids), axis=-1)
            if (input_ids.flatten() == CHATTERBOX_ONNX_STOP_SPEECH_TOKEN).all():
                break
            attention_mask = np.concatenate([attention_mask, np.ones((batch_size, 1), dtype=np.int64)], axis=1)
            position_ids = position_ids[:, -1:] + 1
            # Cache outputs are matched to cache inputs by position.
            for j, key in enumerate(past_key_values):
                past_key_values[key] = present_key_values[j]

        # Drop the START token. Drop the final token only if it really is the
        # STOP token, so a run cut off by the token budget keeps its last token.
        speech_tokens = generate_tokens[:, 1:]
        if speech_tokens.shape[1] and (speech_tokens[:, -1] == CHATTERBOX_ONNX_STOP_SPEECH_TOKEN).all():
            speech_tokens = speech_tokens[:, :-1]
        silence_tokens = np.full((speech_tokens.shape[0], 3), CHATTERBOX_ONNX_SILENCE_TOKEN, dtype=np.int64)
        speech_tokens = np.concatenate([prompt_token, speech_tokens, silence_tokens], axis=1)
        wav = cond_decoder_session.run(
            None,
            {
                "speech_tokens": speech_tokens,
                "speaker_embeddings": speaker_embeddings,
                "speaker_features": speaker_features,
            },
        )[0].squeeze(axis=0)
        return np.asarray(wav, dtype=np.float32)
class KokoroTTS:
    """Text-to-speech backend built on a ``kokoro.KPipeline``."""

    def __init__(self) -> None:
        """Build the Kokoro pipeline from the configured language, repo and device."""
        kokoro_module = importlib.import_module("kokoro")
        # Resolve the device once: every call prints any fallback notice to
        # stderr, so resolving twice would log the same fallback twice.
        device = self._resolve_device()
        self._pipeline = kokoro_module.KPipeline(
            lang_code=settings.kokoro_lang_code,
            repo_id=settings.kokoro_repo_id,
            device=device,
        )
        self._voice = settings.kokoro_voice
        self._speed = settings.kokoro_speed
        self._provider = device
        self._model_id = settings.kokoro_repo_id
        self._dtype = "fp32"

    def _resolve_device(self) -> str:
        """Return the torch device to use, logging any fallback to stderr."""
        device, fallback_reason = SpeechPipeline._resolve_torch_device_static(
            settings.kokoro_device,
            component="Kokoro",
        )
        if fallback_reason:
            print(fallback_reason, file=sys.stderr)
        if device != settings.kokoro_device:
            print(
                f"Kokoro fallback: using device={device} instead of {settings.kokoro_device}",
                file=sys.stderr,
            )
        return device

    def generate(self, text: str, audio_prompt_path: str | None = None) -> np.ndarray:
        """Synthesize ``text`` and return one flat float32 sample array.

        ``audio_prompt_path`` is accepted so the signature matches the other
        TTS backends; it is ignored. Results without audio are skipped, and an
        empty array is returned when nothing was produced.
        """
        del audio_prompt_path
        chunks: list[np.ndarray] = []
        for result in self._pipeline(text, voice=self._voice, speed=self._speed):
            audio = result.audio
            if audio is None:
                continue
            # Tensor-like audio is moved to host memory before conversion.
            if hasattr(audio, "detach"):
                audio = audio.detach().cpu().numpy()
            chunks.append(np.asarray(audio, dtype=np.float32).flatten())
        if not chunks:
            return np.zeros(0, dtype=np.float32)
        return np.concatenate(chunks).astype(np.float32, copy=False)
class PersistentAgentCliSession:
    """Runs the ``my-agent`` CLI once per prompt and streams normalized events.

    The last session id reported by the CLI is remembered and passed back with
    ``--session`` on later prompts. Requests are serialized by an asyncio lock.
    """

    # Plain-text stdout lines (compared lower-cased) reported as status updates.
    _STATUS_LINES = frozenset(
        {
            "working on it",
            "working on it.",
            "thinking",
            "building a plan",
            "starting multi-step work",
            "complex task detected; switching to orchestrate mode",
            "simple task detected; using tools",
        }
    )
    # Lower-cased stderr prefixes treated as informational rather than errors.
    _STDERR_STATUS_PREFIXES = ("warning:", "warn ", "vision:", "info:")

    def __init__(self, model: str | None) -> None:
        """Remember the optional model name passed to the CLI via ``--model``."""
        self._model = model
        self._request_lock = asyncio.Lock()
        self._last_session_id: str | None = None

    async def stream_events(self, transcript: str, session_id: str | None = None) -> AsyncIterator[dict]:
        """Run one CLI prompt and yield its normalized events as they arrive.

        Yields nothing for a blank transcript. If the process exits non-zero
        and no error event was seen, a final synthetic error event is yielded.

        Args:
            transcript: User text; the configured voice preamble is prepended.
            session_id: Session to continue; defaults to the last one seen.
        """
        cleaned = transcript.strip()
        if not cleaned:
            return
        prompt_text = self._build_voice_prompt(cleaned)
        async with self._request_lock:
            command = [self._resolve_my_agent_command()]
            if settings.my_agent_force:
                command.append("--force")
            if settings.my_agent_cwd:
                command.extend(["--cwd", settings.my_agent_cwd])
            if self._model:
                command.extend(["--model", self._model])
            command.extend(["--stream-json", "--prompt", prompt_text])
            active_session_id = session_id or self._last_session_id
            if active_session_id:
                command.extend(["--session", active_session_id])
            process = await asyncio.create_subprocess_exec(
                *command,
                cwd=settings.my_agent_cwd or None,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            # Both pipes feed one queue; each reader posts (source, None) when done.
            queue: asyncio.Queue[tuple[str, str | None]] = asyncio.Queue()
            stdout_task = asyncio.create_task(self._read_stream(process.stdout, "stdout", queue))
            stderr_task = asyncio.create_task(self._read_stream(process.stderr, "stderr", queue))
            saw_error = False
            try:
                open_streams = 2
                while open_streams > 0:
                    source, line = await queue.get()
                    if line is None:
                        open_streams -= 1
                        continue
                    event = self._parse_stream_line(source, line, prompt_text)
                    if event is None:
                        continue
                    kind = event.get("kind")
                    if kind == "session_created":
                        self._last_session_id = (
                            event.get("newSessionId")
                            or event.get("session_id")
                            or self._last_session_id
                        )
                    if kind == "error":
                        saw_error = True
                    yield event
                return_code = await process.wait()
                if return_code != 0 and not saw_error:
                    yield {"kind": "error", "content": f"my-agent prompt run exited with code {return_code}"}
            finally:
                # Also runs when the consumer abandons the generator early.
                stdout_task.cancel()
                stderr_task.cancel()
                if process.returncode is None:
                    process.terminate()
                    await process.wait()

    def _resolve_my_agent_command(self) -> str:
        """Return the path of the ``my-agent`` executable to run.

        Candidates are tried in order: the configured command, the legacy
        hard-coded cargo path, whatever ``PATH`` resolves, and finally the
        current user's ``~/.cargo/bin``. The first existing executable file
        wins. If none qualifies, the configured command (or the legacy path)
        is returned unchanged so the caller's spawn attempt reports the failure.
        """
        configured = (settings.my_agent_command or "").strip()
        cargo_install = "/home/rapheal/.cargo/bin/my-agent"
        try:
            # Lets installs under other user accounts be found. Tried last so
            # every previously successful resolution stays unchanged.
            home_install = str(Path.home() / ".cargo" / "bin" / "my-agent")
        except RuntimeError:
            home_install = ""
        candidates: list[str] = []
        for option in (configured, cargo_install, shutil.which("my-agent"), home_install):
            if option and option not in candidates:
                candidates.append(option)
        for candidate in candidates:
            expanded = Path(candidate).expanduser()
            if expanded.is_file() and os.access(expanded, os.X_OK):
                return str(expanded)
        return configured or cargo_install

    def reset_session(self) -> None:
        """Forget the remembered session id so the next prompt starts fresh."""
        self._last_session_id = None

    async def _read_stream(
        self,
        stream: asyncio.StreamReader | None,
        source: str,
        queue: asyncio.Queue[tuple[str, str | None]],
    ) -> None:
        """Forward decoded lines from ``stream`` to ``queue``, then an end marker.

        Each item is ``(source, line)``; ``(source, None)`` marks end of stream
        and is always posted, even if reading fails or the task is cancelled.
        """
        if stream is None:
            await queue.put((source, None))
            return
        try:
            while True:
                raw_line = await stream.readline()
                if not raw_line:
                    break
                await queue.put((source, raw_line.decode("utf-8", errors="replace").rstrip()))
        finally:
            await queue.put((source, None))

    def _parse_stream_line(self, source: str, line: str, transcript: str) -> dict | None:
        """Turn one raw output line into an event dict, or ``None`` to drop it."""
        clean = self._strip_cli_formatting(line)
        if not clean:
            return None
        if source == "stderr":
            return self._normalize_stderr_line(clean)
        return self._normalize_json_line(clean, transcript)

    def _normalize_json_line(self, line: str, transcript: str) -> dict | None:
        """Map one stdout line to an event; non-JSON lines are handled as text."""
        try:
            payload = json.loads(line)
        except json.JSONDecodeError:
            return self._normalize_text_line(line, transcript)
        if not isinstance(payload, dict):
            return None
        kind = str(payload.get("kind") or "").strip()
        role = str(payload.get("role") or "").strip().lower()
        if kind == "thinking":
            return {"kind": "status", "text": payload.get("content") or "thinking"}
        if kind in {"session_created", "status", "tool_use", "tool_result", "error"}:
            return payload
        if kind == "text":
            payload.setdefault("role", "assistant")
            return payload
        # Remaining payloads count as assistant text when either the role or
        # the kind identifies them and the content is a string.
        if isinstance(payload.get("content"), str) and (
            role in {"assistant", "model"} or kind in {"assistant_message", "message"}
        ):
            return {"kind": "text", "role": "assistant", "content": payload["content"]}
        return None

    def _normalize_text_line(self, line: str, transcript: str) -> dict | None:
        """Classify a non-JSON stdout line as status, error or assistant text.

        Returns ``None`` for an empty line or one that merely echoes the prompt.
        """
        if not line or line == transcript:
            return None
        lower = line.lower()
        if lower in self._STATUS_LINES:
            return {"kind": "status", "text": line}
        if "error:" in lower or line.startswith("✗"):
            return {"kind": "error", "content": line}
        return {"kind": "text", "role": "assistant", "content": line}

    def _normalize_stderr_line(self, line: str) -> dict | None:
        """Report stderr lines with a known informational prefix as status, else as error."""
        if line.lower().startswith(self._STDERR_STATUS_PREFIXES):
            return {"kind": "status", "text": line}
        return {"kind": "error", "content": line}

    def _build_voice_prompt(self, transcript: str) -> str:
        """Prefix ``transcript`` with the configured preamble, if one is set."""
        preamble = settings.my_agent_voice_preamble.strip()
        if not preamble:
            return transcript
        return f"{preamble}\n\n{transcript}"

    def _strip_cli_formatting(self, text: str) -> str:
        """Remove ANSI escapes and bell/backspace characters; collapse whitespace."""
        stripped = ANSI_ESCAPE_PATTERN.sub("", text)
        stripped = stripped.replace("\x07", "").replace("\x08", "")
        stripped = re.sub(r"\s+", " ", stripped)
        return stripped.strip()
| class SpeechPipeline: | |
def __init__(self) -> None:
    """Set up error slots, the agent CLI session, the model lock and GPU-swap flags."""
    # Last load failure per component, surfaced to callers as text.
    self._whisper_error: str | None = None
    self._tts_error: str | None = None
    # NOTE: my_agent_chat_model is read here, before self._agent_cli is assigned.
    chat_model = self.my_agent_chat_model
    self._agent_cli = PersistentAgentCliSession(chat_model)
    self._model_lock = asyncio.Lock()
    # Swap models in and out of GPU memory only when both Whisper and the TTS
    # backend are configured for CUDA.
    swap_on_gpu = settings.whisper_device == "cuda" and self._tts_uses_cuda()
    self._prefer_low_vram_gpu_swap = swap_on_gpu
    self._keep_tts_gpu_resident = swap_on_gpu and settings.tts_gpu_resident_preferred
    self._tts_runtime_logged = False
def preload_models(self) -> None:
    """Load the configured STT, TTS and (optionally) local LLM models eagerly.

    Ends with a short warm-up synthesis when a TTS model was loaded; warm-up
    failures are ignored.
    """
    wants_parakeet = settings.stt_backend == "parakeet-tdt-v3"
    if wants_parakeet and not self._prefer_whisper_transcription():
        _ = self.parakeet
    else:
        _ = self.whisper
    _ = self.tts
    if settings.assistant_backend == "hf-local":
        _ = self.hf_local_generator
    # Read the cached value straight from the instance dict.
    loaded_tts = self.__dict__.get("tts")
    if loaded_tts is None:
        return
    try:
        self._generate_tts(loaded_tts, "Okay.", None)
    except Exception:
        # Best-effort warm-up only.
        pass
async def preload_assistant(self) -> None:
    """Warm up the llama-server backend with a models query and a tiny completion.

    Does nothing for other assistant backends. The warm-up is best effort:
    failures are logged to stderr and never raised.
    """
    if settings.assistant_backend != "llama-server":
        return
    try:
        base_url = settings.llama_base_url.rstrip("/")
        # The models endpoint is queried without the "/v1" prefix. Strip that
        # suffix explicitly rather than chopping three characters, which would
        # corrupt a base URL that does not end in "/v1".
        models_url = f"{base_url.removesuffix('/v1')}/models"
        stop_tokens = [token.strip() for token in settings.llama_stop_tokens.split(",") if token.strip()]
        async with httpx.AsyncClient(timeout=10.0) as client:
            await client.get(models_url)
            payload = {
                "model": settings.llama_model,
                "messages": self._build_chat_messages(
                    settings.llama_system_prompt,
                    settings.llama_model,
                    "Say hello in two words.",
                ),
                "max_tokens": 8,
                "temperature": settings.llama_temperature,
                "top_p": settings.llama_top_p,
                "top_k": settings.llama_top_k,
                "repeat_penalty": settings.llama_repetition_penalty,
                "stop": stop_tokens,
                "stream": False,
            }
            await client.post(
                f"{base_url}/chat/completions",
                headers={"Content-Type": "application/json"},
                json=payload,
            )
    except Exception as exc:
        # Still non-fatal, but no longer silent.
        print(f"llama-server warm-up failed: {exc}", file=sys.stderr)
| def _clear_cached_model(self, *names: str) -> None: | |
| cleared = False | |
| for name in names: | |
| if name in self.__dict__: | |
| del self.__dict__[name] | |
| cleared = True | |
| if cleared: | |
| gc.collect() | |
| try: | |
| import torch | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| except Exception: | |
| pass | |
| def _drop_whisper(self) -> None: | |
| self._clear_cached_model("whisper") | |
| def _drop_tts(self) -> None: | |
| self._clear_cached_model("tts", "backchannel_clips", "tts_prefill_clips") | |
| def _ensure_whisper_ready(self): | |
| if self._prefer_low_vram_gpu_swap and not self._keep_tts_gpu_resident: | |
| self._drop_tts() | |
| return self.whisper | |
| def _ensure_tts_ready(self): | |
| if self._prefer_low_vram_gpu_swap: | |
| self._drop_whisper() | |
| return self.tts | |
def _tts_uses_cuda(self) -> bool:
    """Report whether the configured TTS backend is set to run on CUDA."""
    backend = settings.tts_backend
    if backend == "chatterbox-onnx":
        configured_device = settings.chatterbox_onnx_provider
    elif backend == "kokoro":
        configured_device = settings.kokoro_device
    else:
        # Any other backend value is treated as the PyTorch Chatterbox backend.
        configured_device = settings.chatterbox_device
    return configured_device == "cuda"
| def _resolve_torch_device_static(requested_device: str, component: str) -> tuple[str, str | None]: | |
| if requested_device != "cuda": | |
| return requested_device, None | |
| try: | |
| import torch | |
| except Exception as exc: # pragma: no cover | |
| return "cpu", f"{component} fallback: torch CUDA probe failed ({exc}); using cpu" | |
| if torch.cuda.is_available(): | |
| return requested_device, None | |
| return "cpu", f"{component} fallback: CUDA requested but unavailable; using cpu" | |
@cached_property
def whisper(self):
    """Load the faster-whisper model once and cache it on the instance.

    Declared as a ``cached_property`` because the rest of the class reads
    ``self.whisper`` as an attribute and evicts it by deleting the ``whisper``
    key from the instance ``__dict__``.

    The configured compute type is tried first, then device-appropriate
    fallbacks. Returns ``None`` when every attempt fails; the last error
    message is kept in ``self._whisper_error``.
    """
    from faster_whisper import WhisperModel

    device, fallback_reason = self._resolve_torch_device(settings.whisper_device, component="Whisper")
    if fallback_reason:
        print(fallback_reason, file=sys.stderr)

    # Candidate compute types: configured value first, then fallbacks, no duplicates.
    fallbacks = ("float16", "int8", "float32") if device == "cuda" else ("int8", "float32")
    candidates: list[str] = [settings.whisper_compute_type]
    for fallback in fallbacks:
        if fallback not in candidates:
            candidates.append(fallback)

    last_error: str | None = None
    for compute_type in candidates:
        try:
            model = WhisperModel(
                model_size_or_path=settings.whisper_model,
                device=device,
                compute_type=compute_type,
            )
            if device != settings.whisper_device:
                print(
                    f"Whisper fallback: using device={device} instead of {settings.whisper_device}",
                    file=sys.stderr,
                )
            if compute_type != settings.whisper_compute_type:
                print(
                    f"Whisper fallback: using compute_type={compute_type} instead of {settings.whisper_compute_type}",
                    file=sys.stderr,
                )
            self._whisper_error = None
            return model
        except Exception as exc:  # pragma: no cover
            last_error = str(exc)
    self._whisper_error = last_error
    return None
@cached_property
def parakeet(self):
    """Load the NeMo Parakeet ASR model once and cache it on the instance.

    Declared as a ``cached_property`` because callers read ``self.parakeet``
    as an attribute and compare the value with ``None``.

    If moving the model to a non-CPU device fails, it is moved to the CPU
    instead. Returns ``None`` on any load failure; the error message is kept
    in ``self._whisper_error``, which the STT backends share.
    """
    print(
        "STT load backend=parakeet-tdt-v3 "
        f"device={settings.parakeet_device} "
        f"model={settings.parakeet_model_id}",
        file=sys.stderr,
    )
    try:
        nemo_asr = importlib.import_module("nemo.collections.asr")
        model = nemo_asr.models.ASRModel.from_pretrained(model_name=settings.parakeet_model_id)
        device, fallback_reason = self._resolve_torch_device(settings.parakeet_device, component="Parakeet")
        if fallback_reason:
            print(fallback_reason, file=sys.stderr)
        try:
            model = model.to(device)
        except Exception as exc:
            if device == "cpu":
                # Nothing left to fall back to.
                raise
            print(
                f"Parakeet fallback: failed to move to {device} ({exc}); using cpu",
                file=sys.stderr,
            )
            model = model.to("cpu")
            device = "cpu"
        if device != settings.parakeet_device:
            print(
                f"Parakeet fallback: using device={device} instead of {settings.parakeet_device}",
                file=sys.stderr,
            )
        self._whisper_error = None
        return model
    except Exception as exc:  # pragma: no cover
        self._whisper_error = str(exc)
        print(f"Parakeet load failure: {exc}", file=sys.stderr)
        return None
@cached_property
def tts(self):
    """Load the configured TTS backend once and cache it on the instance.

    Declared as a ``cached_property`` because the class reads ``self.tts`` as
    an attribute, looks it up via ``self.__dict__.get("tts")`` and evicts it
    by deleting that key.

    When the ONNX Chatterbox backend fails to load, the PyTorch Chatterbox
    backend is tried as a fallback. Returns ``None`` when nothing could be
    loaded; the error text is kept in ``self._tts_error``.
    """

    def load_chatterbox_pytorch(fallback_from_onnx: bool):
        # Shared loader for the primary path and the ONNX fallback path.
        from chatterbox.tts_turbo import ChatterboxTurboTTS

        device, fallback_reason = self._resolve_torch_device(settings.chatterbox_device, component="Chatterbox")
        if fallback_reason:
            print(fallback_reason, file=sys.stderr)
        if device != settings.chatterbox_device:
            print(
                f"Chatterbox fallback: using device={device} instead of {settings.chatterbox_device}",
                file=sys.stderr,
            )
        origin = " fallback_from=onnx" if fallback_from_onnx else ""
        print(
            f"TTS load backend=chatterbox-pytorch{origin} device={device}",
            file=sys.stderr,
        )
        return ChatterboxTurboTTS.from_pretrained(device=device)

    try:
        if settings.tts_backend == "kokoro":
            print(
                "TTS load backend=kokoro "
                f"device={settings.kokoro_device} "
                f"voice={settings.kokoro_voice}",
                file=sys.stderr,
            )
            return KokoroTTS()
        if settings.tts_backend == "chatterbox-onnx":
            tts = ChatterboxOnnxTTS()
            print(
                "TTS load backend=chatterbox-onnx "
                f"provider={settings.chatterbox_onnx_provider} "
                f"dtype={settings.chatterbox_onnx_dtype}",
                file=sys.stderr,
            )
            return tts
        return load_chatterbox_pytorch(fallback_from_onnx=False)
    except Exception as exc:  # pragma: no cover
        self._tts_error = str(exc)
        if settings.tts_backend == "chatterbox-onnx":
            try:
                fallback = load_chatterbox_pytorch(fallback_from_onnx=True)
            except Exception as fallback_exc:
                # Keep the original ONNX error in _tts_error, but leave a trace
                # of the failed fallback instead of swallowing it silently.
                print(f"Chatterbox PyTorch fallback failed: {fallback_exc}", file=sys.stderr)
            else:
                self._tts_error = f"ONNX backend failed, fell back to PyTorch: {exc}"
                return fallback
        return None
@cached_property
def backchannel_clips(self) -> dict[str, np.ndarray]:
    """Pre-synthesize the short backchannel phrases, keyed by their text.

    Declared as a ``cached_property`` because ``_drop_tts`` evicts the value
    through the ``backchannel_clips`` key of the instance ``__dict__``.

    Returns an empty dict when no TTS model is available. Phrases whose
    synthesis fails are left out.
    """
    tts = self.tts
    if tts is None:
        return {}
    clips: dict[str, np.ndarray] = {}
    for text in ("mm-hmm", "yeah", "right"):
        try:
            clips[text] = self._generate_tts(tts, text, None)
        except Exception:
            # Best effort: a missing clip is tolerated.
            continue
    return clips
@cached_property
def tts_prefill_clips(self) -> dict[str, np.ndarray]:
    """Pre-synthesize the configured prefill phrases, keyed by lower-cased text.

    Declared as a ``cached_property`` because ``_drop_tts`` evicts the value
    through the ``tts_prefill_clips`` key of the instance ``__dict__``.

    Returns an empty dict when no TTS model is available or prefill is
    disabled. Blank entries in the comma-separated setting are ignored, and
    phrases whose synthesis fails are left out.
    """
    tts = self.tts
    if tts is None or not settings.tts_prefill_enabled:
        return {}
    clips: dict[str, np.ndarray] = {}
    for raw_choice in settings.tts_prefill_choices.split(","):
        text = raw_choice.strip()
        if not text:
            continue
        try:
            clips[text.lower()] = self._generate_tts(tts, text, None)
        except Exception:
            # Best effort: a missing clip is tolerated.
            continue
    return clips
@cached_property
def hf_local_generator(self):
    """Load the local Hugging Face causal LM once and cache the bundle.

    Declared as a ``cached_property`` because ``preload_models`` reads
    ``self.hf_local_generator`` as an attribute purely to trigger the load.

    Returns:
        A dict with the keys ``torch``, ``tokenizer``, ``model``, ``device``
        and ``model_id``.
    """
    transformers = importlib.import_module("transformers")
    torch = importlib.import_module("torch")
    tokenizer = transformers.AutoTokenizer.from_pretrained(settings.hf_local_model_id)
    # Reuse the end-of-sequence token for padding when none is defined.
    if tokenizer.pad_token_id is None and tokenizer.eos_token_id is not None:
        tokenizer.pad_token = tokenizer.eos_token
    dtype_name = settings.hf_local_dtype.lower().strip()
    dtype_map = {
        "float32": torch.float32,
        "fp32": torch.float32,
        "float16": torch.float16,
        "fp16": torch.float16,
        "bfloat16": torch.bfloat16,
        "bf16": torch.bfloat16,
    }
    # Unknown dtype names fall back to float32.
    torch_dtype = dtype_map.get(dtype_name, torch.float32)
    device, fallback_reason = self._resolve_torch_device(settings.hf_local_device, component="HF local LLM")
    if fallback_reason:
        print(fallback_reason, file=sys.stderr)
    model_kwargs: dict[str, object] = {"torch_dtype": torch_dtype}
    if device == "cuda":
        model_kwargs["device_map"] = "auto"
    model = transformers.AutoModelForCausalLM.from_pretrained(
        settings.hf_local_model_id,
        low_cpu_mem_usage=True,
        **model_kwargs,
    )
    if device == "cpu":
        model.to("cpu")
    model.eval()
    if not settings.hf_local_do_sample:
        # Greedy decoding: clear the sampling parameters on the generation
        # config. Failures are ignored.
        try:
            model.generation_config.do_sample = False
            model.generation_config.temperature = None
            model.generation_config.top_p = None
            model.generation_config.top_k = None
        except Exception:
            pass
    return {
        "torch": torch,
        "tokenizer": tokenizer,
        "model": model,
        "device": device,
        "model_id": settings.hf_local_model_id,
    }
@cached_property
def my_agent_chat_model(self) -> str | None:
    """Return the chat model name used by the my-agent CLI, if it can be determined.

    Declared as a ``cached_property`` because ``__init__`` and
    ``assistant_backend_metadata`` read it as an attribute.

    The configured model wins. Otherwise the CLI is asked with
    ``config --get-model chat`` and its output is parsed. Returns ``None``
    when the CLI cannot be run or its output has no model line.
    """
    if settings.my_agent_model:
        return settings.my_agent_model
    try:
        # __init__ reads this property before self._agent_cli is assigned.
        # Looking the session up on self would raise AttributeError, which the
        # handler below would swallow, caching None forever. Fall back to a
        # throwaway session, which resolves the command from settings alone.
        agent_cli = self.__dict__.get("_agent_cli") or PersistentAgentCliSession(None)
        result = subprocess.run(
            [agent_cli._resolve_my_agent_command(), "config", "--get-model", "chat"],
            capture_output=True,
            check=True,
            text=True,
        )
    except Exception:
        return None
    match = re.search(r"Model for 'chat':\s*(.+)", result.stdout)
    if not match:
        return None
    model = match.group(1).strip()
    return model or None
def assistant_backend_metadata(self) -> tuple[str, str]:
    """Return ``(backend name, model label)`` for the configured assistant backend.

    Any backend other than the three named ones reports the OpenRouter model.
    """
    backend = settings.assistant_backend
    if backend == "my-agent-cli":
        model_label = self.my_agent_chat_model or "cli default"
    elif backend == "hf-local":
        model_label = settings.hf_local_model_id
    elif backend == "llama-server":
        model_label = settings.llama_model
    else:
        model_label = settings.openrouter_model
    return backend, model_label
def reset_assistant_session(self) -> None:
    """Make the agent CLI session forget its remembered session id."""
    cli_session = self._agent_cli
    cli_session.reset_session()
async def transcribe(self, audio: np.ndarray) -> TranscriptionResult:
    """Transcribe ``audio`` with the configured STT backend and primary settings.

    Empty audio yields an empty result with backend ``"none"``. When the
    selected model is unavailable, the result text carries an
    "[... unavailable]" notice with the recorded load error. The model lock
    is held for the whole operation, and inference runs in a worker thread.
    """
    if audio.size == 0:
        return TranscriptionResult(text="", backend="none")
    async with self._model_lock:
        use_parakeet = settings.stt_backend == "parakeet-tdt-v3" and not self._prefer_whisper_transcription()
        if use_parakeet:
            parakeet_model = self.parakeet
            if parakeet_model is not None:
                return await asyncio.to_thread(self._run_parakeet_transcription, parakeet_model, audio)
            return TranscriptionResult(
                text=f"[parakeet unavailable] {self._whisper_error or 'model failed to load'}",
                backend="parakeet",
            )
        whisper_model = self._ensure_whisper_ready()
        if whisper_model is None:
            return TranscriptionResult(
                text=f"[whisper unavailable] {self._whisper_error or 'model failed to load'}",
                backend="whisper",
            )
        # Primary decoding parameters, in the positional order the runner expects.
        decode_options = (
            settings.whisper_beam_size,
            settings.whisper_best_of,
            settings.whisper_log_prob_threshold,
            settings.whisper_no_speech_threshold,
        )
        return await asyncio.to_thread(self._run_transcription, whisper_model, audio, *decode_options)
async def transcribe_fallback(self, audio: np.ndarray) -> TranscriptionResult:
    """Transcribe ``audio`` using the fallback Whisper decoding settings.

    Same flow as the primary transcription, except that an unavailable model
    yields an empty-text result instead of an "[... unavailable]" notice.
    """
    if audio.size == 0:
        return TranscriptionResult(text="", backend="none")
    async with self._model_lock:
        use_parakeet = settings.stt_backend == "parakeet-tdt-v3" and not self._prefer_whisper_transcription()
        if use_parakeet:
            parakeet_model = self.parakeet
            if parakeet_model is not None:
                return await asyncio.to_thread(self._run_parakeet_transcription, parakeet_model, audio)
            return TranscriptionResult(text="", backend="parakeet")
        whisper_model = self._ensure_whisper_ready()
        if whisper_model is None:
            return TranscriptionResult(text="", backend="whisper")
        # Fallback decoding parameters, in the positional order the runner expects.
        decode_options = (
            settings.whisper_fallback_beam_size,
            settings.whisper_fallback_best_of,
            settings.whisper_fallback_log_prob_threshold,
            settings.whisper_fallback_no_speech_threshold,
        )
        return await asyncio.to_thread(self._run_transcription, whisper_model, audio, *decode_options)
async def transcribe_partial(self, audio: np.ndarray) -> str:
    """Return a quick interim transcript of ``audio`` using Whisper only.

    Returns an empty string for empty audio, when the Parakeet backend is
    active, or when Whisper is unavailable. Decoding uses beam size 1 with
    temperature 0 and runs in a worker thread while the model lock is held.
    """
    if audio.size == 0:
        return ""
    async with self._model_lock:
        parakeet_active = settings.stt_backend == "parakeet-tdt-v3" and not self._prefer_whisper_transcription()
        if parakeet_active:
            return ""
        model = self._ensure_whisper_ready()
        if model is None:
            return ""

        def _decode() -> str:
            samples = self._prepare_audio_for_whisper(audio)
            segments, _ = model.transcribe(
                samples,
                language=self._resolved_stt_language(),
                vad_filter=False,
                beam_size=1,
                best_of=1,
                temperature=0.0,
                condition_on_previous_text=False,
                without_timestamps=True,
                log_prob_threshold=settings.whisper_log_prob_threshold,
                no_speech_threshold=settings.whisper_no_speech_threshold,
            )
            pieces = [segment.text.strip() for segment in segments]
            return self._normalize_transcript(" ".join(pieces).strip())

        return await asyncio.to_thread(_decode)
async def generate_reply(self, transcript: str) -> str:
    """Collect the streamed reply sentences into one space-joined string.

    When the stream yields nothing, a canned line is returned instead:
    "I didn't catch that." for a blank transcript, otherwise
    "Sorry, give me a second.".
    """
    collected = [sentence async for sentence in self.stream_reply_sentences(transcript)]
    if collected:
        return " ".join(collected).strip()
    if transcript.strip():
        return "Sorry, give me a second."
    return "I didn't catch that."
async def stream_reply_sentences(
    self,
    transcript: str,
    *,
    conversation_history: list[dict[str, str]] | None = None,
    response_language: str | None = None,
) -> AsyncIterator[str]:
    """Yield assistant reply sentences from the configured backend.

    Dispatches on ``settings.assistant_backend``: "hf-local", "llama-server",
    or (any other value) the OpenRouter endpoint. A blank transcript yields
    "I didn't catch that."; a missing OpenRouter key or any backend exception
    yields "Sorry, give me a second.".
    """
    question = transcript.strip()
    if not question:
        yield "I didn't catch that."
        return

    try:
        backend = settings.assistant_backend
        if backend == "hf-local":
            async for sentence in self._stream_hf_local_sentences(
                question,
                conversation_history=conversation_history,
                response_language=response_language,
            ):
                yield sentence
        elif backend == "llama-server":
            async for sentence in self._stream_openai_compatible_sentences(
                transcript=question,
                conversation_history=conversation_history,
                response_language=response_language,
                base_url=settings.llama_base_url,
                api_key=settings.llama_api_key,
                model=settings.llama_model,
                system_prompt=settings.llama_system_prompt,
                max_tokens=settings.llama_max_tokens,
                temperature=settings.llama_temperature,
                top_p=settings.llama_top_p,
                top_k=settings.llama_top_k,
                repetition_penalty=settings.llama_repetition_penalty,
                stop_tokens=[token.strip() for token in settings.llama_stop_tokens.split(",") if token.strip()],
            ):
                yield sentence
        else:
            # The environment variable wins over the configured key; an
            # unexpanded "$OPENROUTER_API_KEY" placeholder counts as missing.
            api_key = os.getenv("OPENROUTER_API_KEY") or settings.openrouter_api_key
            if api_key and api_key.startswith("$OPENROUTER_API_KEY"):
                api_key = None
            if not api_key:
                yield "Sorry, give me a second."
                return
            openrouter_headers = {
                "Authorization": f"Bearer {api_key}",
                "HTTP-Referer": settings.openrouter_site_url,
                "X-Title": settings.openrouter_app_name,
            }
            async for sentence in self._stream_openai_compatible_sentences(
                transcript=question,
                conversation_history=conversation_history,
                response_language=response_language,
                base_url=settings.openrouter_base_url,
                api_key=api_key,
                model=settings.openrouter_model,
                system_prompt=settings.openrouter_system_prompt,
                max_tokens=settings.openrouter_max_tokens,
                temperature=settings.openrouter_temperature,
                top_p=1.0,
                top_k=0,
                repetition_penalty=1.0,
                stop_tokens=[],
                extra_headers=openrouter_headers,
            ):
                yield sentence
    except Exception as exc:
        # Boundary handler: report on stderr and speak an apology instead of raising.
        print(f"Assistant backend failure backend={settings.assistant_backend}: {exc}", file=sys.stderr)
        yield "Sorry, give me a second."
async def _stream_hf_local_sentences(
    self,
    transcript: str,
    *,
    conversation_history: list[dict[str, str]] | None = None,
    response_language: str | None = None,
) -> AsyncIterator[str]:
    """Generate a local HF reply in a worker thread and yield it piecewise.

    The whole reply is produced first, then split with
    ``_split_complete_sentences``; any trailing remainder is yielded last.
    Nothing is yielded for an empty reply.
    """
    reply = await asyncio.to_thread(
        self._generate_hf_local_reply,
        transcript,
        conversation_history,
        response_language,
    )
    if reply:
        complete, tail = self._split_complete_sentences(reply.strip())
        for piece in complete:
            yield piece
        tail = tail.strip()
        if tail:
            yield tail
def _generate_hf_local_reply(
    self,
    transcript: str,
    conversation_history: list[dict[str, str]] | None = None,
    response_language: str | None = None,
) -> str:
    """Run one blocking generation with the local HF model.

    Reads the runtime bundle from ``self.hf_local_generator``, encodes the
    prompt, generates, decodes only the newly produced tokens and returns the
    normalized reply. Both raw and normalized replies are printed to stderr.
    """
    runtime = self.hf_local_generator
    torch = runtime["torch"]
    tokenizer = runtime["tokenizer"]
    model = runtime["model"]
    on_cuda = runtime["device"] == "cuda"

    encoded = self._encode_hf_local_prompt(
        tokenizer,
        transcript,
        conversation_history=conversation_history,
        response_language=response_language,
    )
    input_ids = encoded["input_ids"]
    attention_mask = encoded["attention_mask"]
    if on_cuda:
        input_ids = input_ids.to("cuda")
        attention_mask = attention_mask.to("cuda")

    generate_kwargs = {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "max_new_tokens": settings.hf_local_max_new_tokens,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }
    if settings.hf_local_do_sample:
        generate_kwargs["do_sample"] = True
        generate_kwargs["temperature"] = settings.hf_local_temperature
        generate_kwargs["top_p"] = settings.hf_local_top_p
    else:
        generate_kwargs["do_sample"] = False

    with torch.inference_mode():
        generated = model.generate(**generate_kwargs)

    # Drop the prompt tokens so that only the continuation is decoded.
    prompt_length = input_ids.shape[-1]
    continuation = generated[:, prompt_length:]
    raw_reply = tokenizer.decode(continuation[0], skip_special_tokens=True)
    normalized = self._normalize_hf_local_reply(raw_reply)
    print(f"HF local raw reply={raw_reply!r}", file=sys.stderr)
    print(f"HF local normalized reply={normalized!r}", file=sys.stderr)
    return normalized
def _encode_hf_local_prompt(
    self,
    tokenizer,
    transcript: str,
    *,
    conversation_history: list[dict[str, str]] | None = None,
    response_language: str | None = None,
):
    """Tokenize the conversation for the local HF model.

    Prefers the tokenizer's chat template when it has one. If the template is
    missing or raises, the failure is reported on stderr (it used to be
    swallowed silently) and the plain "User:/Assistant:" prompt from
    ``_build_hf_local_prompt`` is tokenized instead.

    Returns:
        Whatever the tokenizer returns with ``return_tensors="pt"``; callers
        read ``"input_ids"`` and ``"attention_mask"`` from it.
    """
    prefix = self._augment_system_prompt(settings.hf_local_prompt_prefix, response_language)
    messages = []
    if prefix:
        messages.append({"role": "system", "content": prefix})
    for message in conversation_history or []:
        role = (message.get("role") or "").strip().lower()
        content = (message.get("content") or "").strip()
        # Only well-formed user/assistant turns are forwarded.
        if role in {"user", "assistant"} and content:
            messages.append({"role": role, "content": content})
    messages.append({"role": "user", "content": transcript})

    if hasattr(tokenizer, "apply_chat_template"):
        try:
            return tokenizer.apply_chat_template(
                messages,
                tokenize=True,
                add_generation_prompt=True,
                return_tensors="pt",
                return_dict=True,
            )
        except Exception as exc:
            # Best-effort: keep the fallback below, but leave a trace of why
            # the chat template was not used.
            print(
                f"HF local chat template failed, using plain prompt fallback: {exc}",
                file=sys.stderr,
            )

    prompt = self._build_hf_local_prompt(
        transcript,
        conversation_history=conversation_history,
        response_language=response_language,
    )
    return tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=1024,
        padding=False,
    )
async def _stream_openai_compatible_sentences(
    self,
    *,
    transcript: str,
    conversation_history: list[dict[str, str]] | None,
    response_language: str | None,
    base_url: str,
    api_key: str | None,
    model: str,
    system_prompt: str,
    max_tokens: int,
    temperature: float,
    top_p: float,
    top_k: int,
    repetition_penalty: float,
    stop_tokens: list[str],
    extra_headers: dict[str, str] | None = None,
) -> AsyncIterator[str]:
    """Stream a chat completion and yield it sentence by sentence.

    POSTs to ``{base_url}/chat/completions`` with ``stream=True``, reads the
    ``data:`` lines of the response, accumulates the text deltas and yields
    each chunk that ``_split_complete_sentences`` reports as complete. Any
    leftover text is yielded once the stream ends.

    Raises:
        httpx.HTTPStatusError: via ``raise_for_status`` on an error response.
    """
    request_headers = {"Content-Type": "application/json"}
    if api_key:
        request_headers["Authorization"] = f"Bearer {api_key}"
    request_headers.update(extra_headers or {})

    messages = self._build_chat_messages(
        system_prompt,
        model,
        transcript,
        conversation_history=conversation_history,
        response_language=response_language,
    )
    body = {
        "model": model,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
        "repeat_penalty": repetition_penalty,
        "stream": True,
    }
    if stop_tokens:
        body["stop"] = stop_tokens

    endpoint = f"{base_url.rstrip('/')}/chat/completions"
    data_prefix = "data:"
    async with httpx.AsyncClient(timeout=20.0) as client:
        async with client.stream("POST", endpoint, headers=request_headers, json=body) as response:
            response.raise_for_status()
            pending = ""
            async for raw_line in response.aiter_lines():
                line = raw_line.strip()
                if not line.startswith(data_prefix):
                    continue
                data = line[len(data_prefix):].strip()
                if data == "[DONE]":
                    break
                try:
                    event = json.loads(data)
                except json.JSONDecodeError:
                    # Undecodable lines are skipped rather than aborting the stream.
                    continue
                delta = self._extract_stream_delta(event)
                if not delta:
                    continue
                pending += delta
                ready, pending = self._split_complete_sentences(pending)
                for sentence in ready:
                    yield sentence
            tail = pending.strip()
            if tail:
                yield tail
async def stream_agent_cli_events(self, transcript: str, session_id: str | None = None) -> AsyncIterator[dict]:
    """Re-yield every event produced by ``self._agent_cli.stream_events``."""
    event_stream = self._agent_cli.stream_events(transcript, session_id)
    async for item in event_stream:
        yield item
def _build_chat_messages(
    self,
    system_prompt: str,
    model: str,
    transcript: str,
    *,
    conversation_history: list[dict[str, str]] | None = None,
    response_language: str | None = None,
) -> list[dict[str, str]]:
    """Build the ``messages`` list for a chat-completions request.

    When the model name contains "gemma", the system prompt, history and
    transcript are flattened into a single user message using "User:" /
    "Assistant:" lines. Otherwise a system message, the history turns and
    the transcript are sent as separate messages. History entries with a role
    other than user/assistant, or with empty content, are dropped.
    """
    normalized_prompt = self._augment_system_prompt(system_prompt, response_language)
    flatten_for_gemma = "gemma" in model.lower()

    turns: list[tuple[str, str]] = []
    for entry in conversation_history or []:
        role = (entry.get("role") or "").strip().lower()
        content = (entry.get("content") or "").strip()
        if content and role in ("user", "assistant"):
            turns.append((role, content))

    if flatten_for_gemma:
        speaker = {"user": "User", "assistant": "Assistant"}
        sections = [normalized_prompt]
        sections.extend(f"{speaker[role]}: {content}" for role, content in turns)
        sections.append(f"User: {transcript}")
        sections.append("Assistant:")
        return [{"role": "user", "content": "\n\n".join(sections)}]

    return [
        {"role": "system", "content": normalized_prompt},
        *({"role": role, "content": content} for role, content in turns),
        {"role": "user", "content": transcript},
    ]
def _build_hf_local_prompt(
    self,
    transcript: str,
    *,
    conversation_history: list[dict[str, str]] | None = None,
    response_language: str | None = None,
) -> str:
    """Render the plain-text prompt used when no chat template applies.

    Without a prefix the bare transcript is returned and the history is not
    included. Otherwise the prefix, the user/assistant history lines, the new
    user line and a trailing "Assistant:" cue are joined by blank lines.
    """
    prefix = self._augment_system_prompt(settings.hf_local_prompt_prefix, response_language)
    if not prefix:
        return transcript

    speaker = {"user": "User", "assistant": "Assistant"}
    sections = [prefix]
    for entry in conversation_history or []:
        role = (entry.get("role") or "").strip().lower()
        content = (entry.get("content") or "").strip()
        label = speaker.get(role)
        if label and content:
            sections.append(f"{label}: {content}")
    sections += [f"User: {transcript}", "Assistant:"]
    return "\n\n".join(sections)
def _augment_system_prompt(self, system_prompt: str, response_language: str | None) -> str:
    """Append the language instruction to the stripped system prompt.

    Whichever of the two parts is empty is omitted; when both are present
    they are separated by a single space.
    """
    base = system_prompt.strip()
    hint = self._language_instruction(response_language)
    return " ".join(part for part in (base, hint) if part)
| def _language_instruction(self, language_code: str | None) -> str: | |
| if not language_code or language_code == "auto": | |
| return ( | |
| "By default, reply in the same language as the user's latest message. " | |
| "If the user explicitly asks for translation, comparison, or output in two or more languages, include all requested languages." | |
| ) | |
| language_names = { | |
| "en": "English", | |
| "es": "Spanish", | |
| "fr": "French", | |
| "de": "German", | |
| "it": "Italian", | |
| "pt": "Portuguese", | |
| "nl": "Dutch", | |
| "ru": "Russian", | |
| "uk": "Ukrainian", | |
| "pl": "Polish", | |
| "tr": "Turkish", | |
| "ar": "Arabic", | |
| "hi": "Hindi", | |
| "ja": "Japanese", | |
| "ko": "Korean", | |
| "zh": "Chinese", | |
| } | |
| label = language_names.get(language_code.lower(), language_code) | |
| return ( | |
| f"By default, reply in {label}. " | |
| "If the user explicitly asks for translation, comparison, or output in two or more languages, include all requested languages. " | |
| "Keep the answer natural and concise." | |
| ) | |
# Yield one synthesized audio array per text chunk returned by
# self._split_tts_chunks (the raw text is used when that returns nothing).
# When self._ensure_tts_ready() returns None, a single beep from self._beep()
# is yielded instead and the generator ends.
# NOTE(review): indentation was lost in this copy; presumably synthesis runs
# while holding self._model_lock -- confirm against the real file.
| async def stream_synthesized_chunks(self, text: str, voice_prompt_path: str | None = None) -> AsyncIterator[np.ndarray]: | |
| chunks = self._split_tts_chunks(text) | |
| if not chunks: | |
| chunks = [text] | |
| async with self._model_lock: | |
| tts = self._ensure_tts_ready() | |
| if tts is None: | |
| yield self._beep() | |
| return | |
| for chunk in chunks: | |
# The blocking TTS call runs in a worker thread via asyncio.to_thread.
| audio = await asyncio.to_thread(self._generate_tts, tts, chunk, voice_prompt_path) | |
| yield audio | |
async def synthesize_sentences(self, text: str, voice_prompt_path: str | None = None) -> list[np.ndarray]:
    """Drain ``stream_synthesized_chunks`` into a list of audio arrays."""
    clips: list[np.ndarray] = []
    async for clip in self.stream_synthesized_chunks(text, voice_prompt_path=voice_prompt_path):
        clips.append(clip)
    return clips
async def stream_reply_audio(self, text: str, transcript: str, voice_prompt_path: str | None = None) -> AsyncIterator[np.ndarray]:
    """Yield the audio for a reply, optionally led by a cached prefill clip.

    A copy of a clip from ``self.tts_prefill_clips`` is emitted first only
    when low-VRAM swapping is off, a TTS engine is loaded, no voice prompt is
    given, prefill is enabled and the text is long enough. The synthesized
    chunks for ``text`` always follow.
    """
    # The order of these checks is kept as-is because of short-circuiting.
    prefill_allowed = (
        not self._prefer_low_vram_gpu_swap
        and self.tts is not None
        and voice_prompt_path is None
        and settings.tts_prefill_enabled
        and len(text.strip()) >= settings.tts_prefill_min_chars
    )
    if prefill_allowed:
        prefill = self._choose_tts_prefill(text, transcript)
        clip = self.tts_prefill_clips.get(prefill.lower()) if prefill else None
        if clip is not None:
            yield clip.copy()

    async for audio in self.stream_synthesized_chunks(text, voice_prompt_path=voice_prompt_path):
        yield audio
async def synthesize_reply(self, text: str, transcript: str, voice_prompt_path: str | None = None) -> list[np.ndarray]:
    """Drain ``stream_reply_audio`` into a list of audio arrays."""
    clips: list[np.ndarray] = []
    async for clip in self.stream_reply_audio(text, transcript, voice_prompt_path=voice_prompt_path):
        clips.append(clip)
    return clips
async def synthesize_backchannel(self, text: str, voice_prompt_path: str | None = None) -> list[np.ndarray]:
    """Return audio for a short backchannel phrase.

    A copy of the clip cached in ``self.backchannel_clips`` is returned when
    low-VRAM swapping is off, no voice prompt is given and the lowercased,
    stripped text has an entry; otherwise the text is synthesized.
    """
    cache_usable = not self._prefer_low_vram_gpu_swap and not voice_prompt_path
    if cache_usable:
        clip = self.backchannel_clips.get(text.strip().lower())
        if clip is not None:
            return [clip.copy()]
    return await self.synthesize_sentences(text, voice_prompt_path=voice_prompt_path)
def _generate_tts(self, tts, text: str, voice_prompt_path: str | None) -> np.ndarray:
    """Synthesize *text* with *tts* and return a flat float32 array.

    On the first call (tracked by ``self._tts_runtime_logged``) a one-line
    description of the engine is printed to stderr. ``audio_prompt_path`` is
    passed to ``tts.generate`` only when a voice prompt path is given.
    """
    if not self._tts_runtime_logged:
        details = [f"class={type(tts).__name__}"]
        for label, attribute in (("provider", "_provider"), ("dtype", "_dtype"), ("model", "_model_id")):
            value = getattr(tts, attribute, None)
            if value:
                details.append(f"{label}={value}")
        print(f"TTS generate {' '.join(details)}", file=sys.stderr)
        self._tts_runtime_logged = True

    if voice_prompt_path:
        wav = tts.generate(text, audio_prompt_path=voice_prompt_path)
    else:
        wav = tts.generate(text)
    # Objects exposing .detach() are detached and moved to CPU before conversion.
    if hasattr(wav, "detach"):
        wav = wav.detach().cpu().numpy()
    return np.asarray(wav, dtype=np.float32).flatten()
| def _beep(self) -> np.ndarray: | |
| duration_s = 0.25 | |
| sample_rate = 24000 | |
| time = np.linspace(0, duration_s, int(sample_rate * duration_s), endpoint=False) | |
| return (0.15 * np.sin(2 * np.pi * 660 * time)).astype(np.float32) | |
| def _extract_stream_delta(self, event: dict) -> str: | |
| choices = event.get("choices") or [] | |
| if not choices: | |
| return "" | |
| delta = choices[0].get("delta") or {} | |
| content = delta.get("content") or "" | |
| if isinstance(content, str): | |
| return content | |
| if isinstance(content, list): | |
| parts: list[str] = [] | |
| for item in content: | |
| if isinstance(item, dict) and item.get("type") == "text": | |
| parts.append(item.get("text") or "") | |
| return "".join(parts) | |
| return "" | |
def render_assistant_text(self, text: str, transcript: str) -> tuple[str, str]:
    """Return ``(display_text, tts_text)`` for an assistant reply.

    The TTS text comes from ``_prepare_tts_text``; the display text is that
    same text with paralinguistic tags stripped.
    """
    spoken = self._prepare_tts_text(text, transcript)
    return self._strip_paralinguistic_tags(spoken), spoken
def _split_complete_sentences(self, buffer: str) -> tuple[list[str], str]:
    """Peel complete chunks off the front of *buffer*.

    Returns:
        ``(chunks, remainder)``: the stripped chunks found so far and the
        left-stripped text after the last boundary.
    """
    pieces: list[str] = []
    rest = buffer.lstrip()
    while rest:
        cut = self._next_stream_boundary_end(rest)
        if cut is None:
            break
        head, rest = rest[:cut].strip(), rest[cut:].lstrip()
        if head:
            pieces.append(head)
    return pieces, rest
def _next_stream_boundary_end(self, text: str) -> int | None:
    """Return the end offset of the next chunk boundary in *text*, or None."""
    boundary = self._find_chunk_boundary(text)
    return None if boundary is None else boundary.end()
| def _find_chunk_boundary(self, text: str) -> re.Match[str] | None: | |
| clause_match = re.search(r".{18,}?[,:;](?:\s+|$)", text, flags=re.S) | |
| soft_clause_match = re.search( | |
| r".{22,}?\b(?:and|but|so|because|then|while|which)\b(?:\s+|$)", | |
| text, | |
| flags=re.S | re.I, | |
| ) | |
| sentence_match = re.search(r".*?[.!?](?:\s+|$)", text, flags=re.S) | |
| clause_match = clause_match or soft_clause_match | |
| if clause_match is None: | |
| return sentence_match | |
| if sentence_match is None: | |
| return clause_match | |
| return clause_match if clause_match.end() <= sentence_match.end() else sentence_match | |
def _find_early_stream_boundary(self, text: str) -> int | None:
    """Pick a whitespace cut point for emitting a chunk before punctuation.

    The cut is the end of the last whitespace run inside the first
    ``max_chars`` characters. None is returned when the text or the candidate
    is shorter than the configured minimum, has too few words, or would end
    on a dangling function word.
    """
    min_chars = settings.assistant_stream_chunk_min_chars
    if len(text) < min_chars:
        return None
    max_chars = max(min_chars, settings.assistant_stream_chunk_max_chars)
    window = text[: min(len(text), max_chars)]

    # Words that should not be the last word of an emitted chunk.
    dangling_words = {
        "a", "an", "the", "and", "or", "but", "so", "because", "then", "while",
        "which", "who", "what", "when", "where", "why", "how", "if", "that",
        "this", "these", "those", "to", "of", "for", "with", "at", "from",
        "in", "on", "is", "are", "was", "were", "be", "been", "being", "do",
        "does", "did", "can", "could", "should", "would", "will", "have",
        "has", "had",
    }

    gap_ends = [gap.end() for gap in re.finditer(r"\s+", window)]
    if not gap_ends:
        return None
    boundary = gap_ends[-1]

    candidate = text[:boundary].strip(" \t\r\n,;:")
    if len(candidate) < min_chars:
        return None
    words = re.findall(r"\b[\w'-]+\b", candidate)
    if len(words) < settings.assistant_stream_chunk_min_words:
        return None
    if words[-1].lower() in dangling_words:
        return None
    return boundary
def _prepare_tts_text(self, text: str, transcript: str) -> str:
    """Clean a reply for speech and optionally prefix an acknowledgement.

    "Mm-hmm, " is prepended (and the reply's first character lowercased)
    when the auto-ack setting is on, the transcript has at least 10 words and
    the reply does not already open with an acknowledgement word.
    """
    reply = self._normalize_reply_text(text)
    if not reply:
        return ""
    reply = self._normalize_existing_tags(reply)

    transcript_word_count = len(re.findall(r"\w+", transcript))
    wants_ack = (
        settings.tts_auto_ack_prefix_enabled
        and transcript_word_count >= 10
        and not re.match(r"(?i)^(yeah|right|mm-hmm|got it|okay|ok|sure)\b", reply)
    )
    if not wants_ack:
        return reply

    lowered = reply[0].lower() + reply[1:] if len(reply) > 1 else reply.lower()
    return f"Mm-hmm, {lowered}"
| def _split_tts_chunks(self, text: str) -> list[str]: | |
| cleaned = text.strip() | |
| if not cleaned: | |
| return [] | |
| return [cleaned] | |
def _extract_short_lead_chunk(self, text: str) -> str:
    """Return a short leading piece of *text* for a first TTS chunk.

    Text already within the word and character limits is returned whole.
    Otherwise a leading clause ending in ``,``/``;``/``:`` is used when it is
    at most two words over the word limit; failing that, the first
    ``tts_first_chunk_max_words`` words with trailing ``,:;`` removed.
    """
    max_words = settings.tts_first_chunk_max_words
    words = text.split()
    if len(words) <= max_words and len(text) <= settings.tts_first_chunk_max_chars:
        return text

    clause_match = re.search(rf"^(.{{1,{settings.tts_first_chunk_max_chars}}}[,;:])(?=\s|$)", text)
    if clause_match:
        clause = clause_match.group(1).strip()
        if len(clause.split()) <= max_words + 2:
            return clause

    head = " ".join(words[:max_words]).strip()
    return re.sub(r"[,:;]+$", "", head).strip() or text
def _choose_tts_prefill(self, text: str, transcript: str) -> str:
    """Pick a prefill phrase for a reply, or "" when none should be used.

    No prefill is chosen when the reply already opens with an acknowledgement
    word or when ``settings.tts_prefill_choices`` has no entries. Otherwise
    the choice is derived from the transcript and reply text with CRC32, so
    the same pair maps to the same phrase in every process. The built-in
    ``hash()`` used previously is salted per interpreter run for strings.
    """
    import zlib  # local import: only this method needs it

    if re.match(r"(?i)^(okay|yeah|right|got it|mm-hmm|uh-huh)\b", text.strip()):
        return ""
    choices = [item.strip() for item in settings.tts_prefill_choices.split(",") if item.strip()]
    if not choices:
        return ""
    seed = f"{transcript.strip().lower()}::{text.strip().lower()}"
    return choices[zlib.crc32(seed.encode("utf-8")) % len(choices)]
def _normalize_reply_text(self, text: str) -> str:
    """Strip internal markers and tidy whitespace/punctuation in a reply.

    Removes the INTERNAL_REPLY_PATTERNS phrases, removes ``<think>`` blocks
    and stray think tags when ``settings.hf_local_hide_thinking`` is set,
    collapses whitespace, drops leading punctuation and removes spaces
    before punctuation marks.
    """
    reply = text
    for marker in INTERNAL_REPLY_PATTERNS:
        reply = marker.sub(" ", reply)
    if settings.hf_local_hide_thinking:
        for think_pattern in (THINK_BLOCK_PATTERN, THINK_TAG_PATTERN):
            reply = think_pattern.sub(" ", reply)
    reply = re.sub(r"\s+", " ", reply).strip()
    for pattern, replacement in (
        (r"^\s*[,.:;!?-]+\s*", ""),
        (r"\s+([,.:;!?])", r"\1"),
    ):
        reply = re.sub(pattern, replacement, reply)
    return reply
def _normalize_hf_local_reply(self, text: str) -> str:
    """Reduce raw local-model output to a short spoken reply.

    After the generic cleanup this removes a leading "assistant:" label,
    cuts at a "user:" continuation or any other role label, drops
    meta-commentary tails and the listed emoji, de-duplicates repeated
    content and keeps at most two sentences.
    """
    reply = self._normalize_reply_text(text)
    reply = re.sub(r"(?i)^assistant:\s*", "", reply)
    reply = re.sub(r"(?i)\buser:\s*.*$", "", reply).strip()

    role_label = ROLE_CONTINUATION_PATTERN.search(reply)
    if role_label is not None:
        reply = reply[: role_label.start()].strip()

    reply = META_TAIL_PATTERN.sub("", reply).strip()
    reply = re.sub(r"[🙂-🙏🤖😊😂🤣😍😘😉😎😄😁😃]+", "", reply)
    reply = self._dedupe_repeated_reply(reply)
    return self._limit_spoken_sentences(reply, max_sentences=2)
| def _dedupe_repeated_reply(self, text: str) -> str: | |
| cleaned = text.strip() | |
| if not cleaned: | |
| return cleaned | |
| if len(cleaned) % 2 == 0: | |
| half = len(cleaned) // 2 | |
| left = cleaned[:half].strip(" \t\r\n,.;:!?") | |
| right = cleaned[half:].strip(" \t\r\n,.;:!?") | |
| if left and left.lower() == right.lower(): | |
| return left | |
| sentence_parts = re.split(r"(?<=[.!?])\s+", cleaned) | |
| deduped: list[str] = [] | |
| for part in sentence_parts: | |
| normalized = part.strip() | |
| if not normalized: | |
| continue | |
| if deduped and deduped[-1].strip().lower() == normalized.lower(): | |
| continue | |
| if normalized.endswith("?"): | |
| if any(existing.strip().lower() == normalized.lower() for existing in deduped): | |
| continue | |
| deduped.append(normalized) | |
| return " ".join(deduped).strip() | |
| def _limit_spoken_sentences(self, text: str, max_sentences: int) -> str: | |
| cleaned = text.strip() | |
| if not cleaned or max_sentences <= 0: | |
| return cleaned | |
| parts = re.split(r"(?<=[.!?])\s+", cleaned) | |
| kept: list[str] = [] | |
| for part in parts: | |
| normalized = part.strip() | |
| if not normalized: | |
| continue | |
| kept.append(normalized) | |
| if len(kept) >= max_sentences: | |
| break | |
| if kept: | |
| return " ".join(kept).strip() | |
| return cleaned | |
def _normalize_existing_tags(self, text: str) -> str:
    """Lowercase every paralinguistic tag (e.g. ``[Laugh]``) found in *text*."""

    def lowercase_tag(match):
        return match.group(0).lower()

    return PARALINGUISTIC_TAG_PATTERN.sub(lowercase_tag, text)
def _strip_paralinguistic_tags(self, text: str) -> str:
    """Remove paralinguistic tags and tidy the spacing they leave behind."""
    without_tags = PARALINGUISTIC_TAG_PATTERN.sub("", text)
    collapsed = re.sub(r"\s+", " ", without_tags)
    return re.sub(r"\s+([,.:;!?])", r"\1", collapsed).strip()
| def _normalize_transcript(self, transcript: str) -> str: | |
| cleaned = transcript.strip() | |
| normalized = re.sub(r"[^\w\s']", "", cleaned.lower()) | |
| normalized = re.sub(r"\s+", " ", normalized).strip() | |
| if normalized in { | |
| "", | |
| "thank you for watching", | |
| "thanks for watching", | |
| }: | |
| return "" | |
| return cleaned | |
def is_likely_hallucination(self, transcript: str, audio_rms: float) -> bool:
    """Tell whether a quiet, short transcript matches a blocked phrase.

    The transcript and every phrase in ``settings.hallucination_phrases`` are
    compared in the same canonical form (lowercased, punctuation other than
    apostrophes removed, whitespace collapsed), so a configured phrase such
    as "Thank you." still matches.

    Returns:
        True only when the canonical transcript is non-empty, ``audio_rms``
        does not exceed ``settings.hallucination_max_rms``, the word count
        does not exceed ``settings.hallucination_max_words`` and the
        transcript equals one of the blocked phrases.
    """

    def canonical(value: str) -> str:
        without_punctuation = re.sub(r"[^\w\s']", "", value.lower())
        return re.sub(r"\s+", " ", without_punctuation).strip()

    normalized = canonical(transcript)
    if not normalized:
        return False
    if audio_rms > settings.hallucination_max_rms:
        return False
    if len(normalized.split(" ")) > settings.hallucination_max_words:
        return False
    blocked = {canonical(phrase) for phrase in settings.hallucination_phrases.split(",")}
    blocked.discard("")
    return normalized in blocked
def _resolve_torch_device(self, requested_device: str, component: str) -> tuple[str, str | None]:
    """Delegate to ``_resolve_torch_device_static`` with the same arguments."""
    resolution = self._resolve_torch_device_static(requested_device, component)
    return resolution
def _resolved_stt_language(self) -> str | None:
    """Return the configured STT language code, or None for empty/"auto"."""
    code = settings.stt_language.strip().lower()
    return code if code and code != "auto" else None
def _whisper_model_supports_multilingual(self) -> bool:
    """Return False when the configured Whisper model name ends with ".en"."""
    model_name = settings.whisper_model.strip().lower()
    english_only = model_name.endswith(".en")
    return not english_only
def _prefer_whisper_transcription(self) -> bool:
    """Decide whether Whisper should handle transcription.

    Always True unless the backend is "parakeet-tdt-v3"; with that backend
    Whisper is preferred only when multilingual STT is enabled and the
    configured Whisper model is multilingual.
    """
    if settings.stt_backend == "parakeet-tdt-v3":
        if settings.stt_multilingual_enabled:
            return self._whisper_model_supports_multilingual()
        return False
    return True
def _prepare_audio_for_whisper(self, audio: np.ndarray) -> np.ndarray:
    """Zero-pad short utterances before transcription.

    Audio longer than ``settings.short_utterance_ms`` is returned untouched.
    Shorter audio gets ``short_utterance_pad_ms`` of zeros on each side and,
    if still below ``short_utterance_min_transcription_ms``, is padded further
    (split across both ends) up to that length. Padded output is float32.
    """
    rate = settings.sample_rate
    duration_ms = (audio.size / rate) * 1000.0
    if duration_ms > settings.short_utterance_ms:
        return audio

    edge = int(rate * (settings.short_utterance_pad_ms / 1000.0))
    padded = np.pad(audio, (edge, edge), mode="constant")

    required = int(rate * (settings.short_utterance_min_transcription_ms / 1000.0))
    shortfall = required - padded.size
    if shortfall > 0:
        lead = shortfall // 2
        padded = np.pad(padded, (lead, shortfall - lead), mode="constant")
    return padded.astype(np.float32, copy=False)
def _run_transcription(
    self,
    whisper,
    audio: np.ndarray,
    beam_size: int,
    best_of: int,
    log_prob_threshold: float | None,
    no_speech_threshold: float | None,
) -> TranscriptionResult:
    """Run a blocking Whisper transcription and wrap the result.

    ``language``, ``log_prob_threshold`` and ``no_speech_threshold`` are
    passed to ``whisper.transcribe`` only when they are not None. The
    detected language and its probability are read from the returned info
    object when present.
    """
    prepared_audio = self._prepare_audio_for_whisper(audio)
    optional = {
        "language": self._resolved_stt_language(),
        "log_prob_threshold": log_prob_threshold,
        "no_speech_threshold": no_speech_threshold,
    }
    options = {
        "vad_filter": False,
        "beam_size": beam_size,
        "best_of": best_of,
        "temperature": settings.whisper_temperature,
        "condition_on_previous_text": settings.whisper_condition_on_previous_text,
        "without_timestamps": True,
    }
    options.update({name: value for name, value in optional.items() if value is not None})

    segments, info = whisper.transcribe(prepared_audio, **options)
    joined = " ".join(segment.text.strip() for segment in segments).strip()
    return TranscriptionResult(
        text=self._normalize_transcript(joined),
        language=getattr(info, "language", None),
        language_probability=getattr(info, "language_probability", None),
        backend="whisper",
    )
def _run_parakeet_transcription(self, parakeet, audio: np.ndarray) -> TranscriptionResult:
    """Transcribe *audio* with the parakeet model through a temporary WAV file.

    The audio is padded like Whisper input, written to a ``.wav`` temp file
    and handed to ``parakeet.transcribe`` as a one-item path list. The result
    language is always reported as "en".

    The temp path is recorded before writing so the file is removed even
    when the write fails. Cleanup is best-effort: a failing ``unlink`` is
    reported on stderr instead of replacing the transcription result.
    """
    prepared_audio = self._prepare_audio_for_whisper(audio)
    wav_bytes = wav_bytes_from_float32(prepared_audio, settings.sample_rate)
    temp_path: str | None = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle:
            temp_path = handle.name
            handle.write(wav_bytes)
        # The file is closed (and flushed) here, before the model reads it.
        results = parakeet.transcribe([temp_path])
        if not results:
            return TranscriptionResult(text="", language="en", backend="parakeet")
        first = results[0]
        transcript = getattr(first, "text", None)
        if transcript is None:
            # Results without a .text attribute are used via their str() form.
            transcript = str(first)
        return TranscriptionResult(
            text=self._normalize_transcript(str(transcript).strip()),
            language="en",
            backend="parakeet",
        )
    finally:
        if temp_path:
            try:
                os.unlink(temp_path)
            except FileNotFoundError:
                pass
            except OSError as exc:
                print(f"Could not remove temporary file {temp_path}: {exc}", file=sys.stderr)
# Shared SpeechPipeline instance, constructed when this statement executes.
| pipeline = SpeechPipeline() | |