"""Local speech-to-speech shell.

Pipeline: microphone recording -> faster-whisper transcription ->
llama.cpp text generation -> Windows SAPI speech output.

Settings are read from ``config.json`` located next to this file. Recorded
audio is written under ``output/`` and log lines under ``logs/``.
"""

from __future__ import annotations

import json
import time
import traceback
from pathlib import Path

import sounddevice as sd
import soundfile as sf
from faster_whisper import WhisperModel
from llama_cpp import Llama
import win32com.client

ROOT = Path(__file__).resolve().parent
# "utf-8-sig" also accepts a config file that starts with a byte-order mark.
CFG = json.loads((ROOT / "config.json").read_text(encoding="utf-8-sig"))
OUTPUT = ROOT / "output"
LOGS = ROOT / "logs"
OUTPUT.mkdir(exist_ok=True)
LOGS.mkdir(exist_ok=True)

# Lazily created singletons; populated by load_stt / load_llm / load_sapi.
STT = None
LLM = None
SPEAKER = None


def log(msg: str) -> None:
    """Print a timestamped message and append it to ``logs/shell_s2s.log``."""
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{stamp}] {msg}"
    print(line)
    with open(LOGS / "shell_s2s.log", "a", encoding="utf-8", errors="replace") as f:
        f.write(line + "\n")


def resolve_torch_cuda() -> bool:
    """Return True when PyTorch can be imported and reports CUDA as available."""
    try:
        import torch

        return bool(torch.cuda.is_available())
    except Exception:
        # Deliberately broad: any failure to import or query torch simply
        # means "no CUDA" for the purposes of device selection.
        return False


def resolve_stt_device() -> str:
    """Return the STT device: the configured ``stt_device`` or an auto-detected one.

    ``"cpu"`` and ``"cuda"`` are honored as-is; any other value (including the
    default ``"auto"``) selects ``"cuda"`` when available, else ``"cpu"``.
    """
    requested = str(CFG.get("stt_device", "auto")).lower().strip()
    if requested in ("cpu", "cuda"):
        return requested
    return "cuda" if resolve_torch_cuda() else "cpu"


def resolve_stt_compute(device: str) -> str:
    """Return the STT compute type for *device*.

    An explicit ``stt_compute_type`` in the config wins; ``"auto"`` maps to
    ``"float16"`` on CUDA and ``"int8"`` otherwise.
    """
    requested = str(CFG.get("stt_compute_type", "auto")).lower().strip()
    if requested != "auto":
        return requested
    return "float16" if device == "cuda" else "int8"


def resolve_llm_gpu_layers() -> int:
    """Return the ``n_gpu_layers`` value to pass to ``Llama``.

    Accepted ``llm_gpu_layers`` config values:

    * an integer, used as-is;
    * ``true`` / ``false``, treated like ``"gpu"`` / ``"cpu"``;
    * ``"cpu"`` -> 0, ``"gpu"`` -> -1, ``"auto"`` -> -1 when CUDA is
      available, else 0;
    * a numeric string, converted with ``int``.

    Anything else falls back to 0 and the fallback is logged.
    """
    requested = CFG.get("llm_gpu_layers", "auto")
    # bool is a subclass of int, so it must be checked first; otherwise a JSON
    # ``true`` would be passed through as 1 GPU layer.
    if isinstance(requested, bool):
        return -1 if requested else 0
    if isinstance(requested, int):
        return requested
    requested = str(requested).lower().strip()
    if requested == "cpu":
        return 0
    if requested == "gpu":
        return -1
    if requested == "auto":
        return -1 if resolve_torch_cuda() else 0
    try:
        return int(requested)
    except ValueError:
        log(f"WARN: unrecognized llm_gpu_layers={requested!r}; using 0 GPU layers")
        return 0


def load_stt() -> None:
    """Create the global faster-whisper model once; later calls are no-ops."""
    global STT
    if STT is not None:
        return
    model = CFG.get("stt_model", "medium")
    device = resolve_stt_device()
    compute = resolve_stt_compute(device)
    log(f"Loading STT: faster-whisper {model} device={device} compute={compute}")
    STT = WhisperModel(model, device=device, compute_type=compute)


def load_llm() -> None:
    """Create the global llama.cpp model once; later calls are no-ops.

    Raises:
        KeyError: if ``llm_model_path`` is missing from the config.
        FileNotFoundError: if the configured model file does not exist.
    """
    global LLM
    if LLM is not None:
        return
    model_path = ROOT / CFG["llm_model_path"]
    if not model_path.exists():
        raise FileNotFoundError(
            f"Missing LLM model: {model_path}. Run download_models.py first."
        )
    gpu_layers = resolve_llm_gpu_layers()
    log(f"Loading LLM: {model_path.name} gpu_layers={gpu_layers}")
    LLM = Llama(
        model_path=str(model_path),
        n_ctx=int(CFG.get("llm_context_size", 2048)),
        n_gpu_layers=gpu_layers,
        verbose=False,
    )


def load_sapi() -> None:
    """Create the global Windows SAPI voice once; later calls are no-ops."""
    global SPEAKER
    if SPEAKER is not None:
        return
    log("Loading Windows SAPI voice")
    SPEAKER = win32com.client.Dispatch("SAPI.SpVoice")
    try:
        # Slightly faster than default.
        SPEAKER.Rate = 1
        SPEAKER.Volume = 100
    except Exception as e:
        # Tuning the voice is best-effort; keep going with whatever settings
        # the voice already has, but leave a trace of the failure.
        log(f"WARN: could not set SAPI rate/volume: {e!r}")


def load_all() -> None:
    """Load STT, LLM and the SAPI voice, logging the total load time."""
    t0 = time.perf_counter()
    load_stt()
    load_llm()
    load_sapi()
    log(f"GREEN: all models loaded in {time.perf_counter() - t0:.2f}s")


def record_audio() -> Path:
    """Record mono audio from the default input and save it as ``output/input.wav``.

    Duration and sample rate come from ``record_seconds`` and ``sample_rate``
    in the config.

    Returns:
        Path of the written WAV file.

    Raises:
        ValueError: if the configured duration or sample rate is not positive.
    """
    seconds = float(CFG.get("record_seconds", 4))
    sample_rate = int(CFG.get("sample_rate", 16000))
    if seconds <= 0 or sample_rate <= 0:
        raise ValueError(
            f"record_seconds and sample_rate must be positive "
            f"(got {seconds} and {sample_rate})"
        )
    out = OUTPUT / "input.wav"
    print("")
    print(f"Recording {seconds:.1f}s. Speak now.")
    audio = sd.rec(
        int(seconds * sample_rate),
        samplerate=sample_rate,
        channels=1,
        dtype="float32",
    )
    sd.wait()  # block until the recording buffer is full
    sf.write(str(out), audio, sample_rate)
    return out


def transcribe(audio_path: Path) -> str:
    """Transcribe *audio_path* and return the joined, stripped segment text.

    Returns an empty string when no segment text is produced.
    """
    load_stt()  # no-op when already loaded; keeps direct callers from hitting None
    t0 = time.perf_counter()
    segments, _info = STT.transcribe(
        str(audio_path),
        beam_size=1,
        vad_filter=True,
        condition_on_previous_text=False,
    )
    # The elapsed time is taken after the join because consuming ``segments``
    # is part of the work being measured.
    text = " ".join(seg.text.strip() for seg in segments).strip()
    log(f"STT {time.perf_counter() - t0:.2f}s: {text}")
    return text


def generate_reply(user_text: str) -> str:
    """Generate the assistant reply for *user_text*.

    The prompt is assembled with ``<|im_start|>`` / ``<|im_end|>`` markers
    around the configured ``system_prompt`` and the user text; generation
    stops at either marker.
    """
    load_llm()  # no-op when already loaded; keeps direct callers from hitting None
    t0 = time.perf_counter()
    system = CFG.get("system_prompt", "You are concise.")
    prompt = (
        "<|im_start|>system\n"
        f"{system}\n"
        "<|im_end|>\n"
        "<|im_start|>user\n"
        f"{user_text}\n"
        "<|im_end|>\n"
        "<|im_start|>assistant\n"
    )
    result = LLM(
        prompt,
        max_tokens=int(CFG.get("llm_max_tokens", 140)),
        temperature=float(CFG.get("llm_temperature", 0.35)),
        stop=["<|im_end|>", "<|im_start|>"],
    )
    reply = result["choices"][0]["text"].strip()
    log(f"LLM {time.perf_counter() - t0:.2f}s: {reply}")
    return reply


def speak_text(reply: str) -> None:
    """Speak *reply* through the SAPI voice and log how long it took."""
    load_sapi()  # no-op when already loaded; keeps direct callers from hitting None
    t0 = time.perf_counter()
    SPEAKER.Speak(reply)
    log(f"SAPI SPEAK {time.perf_counter() - t0:.2f}s")


def show_devices() -> None:
    """Print the audio devices reported by sounddevice."""
    print(sd.query_devices())


def one_turn_from_text(text: str) -> None:
    """Generate and speak a reply for *text*; blank input is ignored."""
    cleaned = text.strip()
    if not cleaned:
        return
    reply = generate_reply(cleaned)
    speak_text(reply)


def one_turn_from_mic() -> None:
    """Record from the microphone, transcribe, then reply out loud."""
    audio = record_audio()
    text = transcribe(audio)
    if not text:
        log("No speech detected.")
        return
    one_turn_from_text(text)


def main() -> int:
    """Run the interactive shell.

    Returns:
        0 on a normal exit (quit command, Ctrl+C, or end of input), 1 when the
        models could not be loaded at startup.
    """
    print("LOCAL S2S SHELL - SAPI LOW LATENCY")
    print("")
    print("Commands:")
    print("  Enter = record mic and run speech-to-speech")
    print("  t     = type text and hear reply")
    print("  d     = list audio devices")
    print("  q     = quit")
    print("")

    try:
        load_all()
    except Exception as e:
        # Record the startup failure in the log file as well as on screen.
        log("ERROR: " + repr(e))
        traceback.print_exc()
        return 1

    while True:
        try:
            cmd = input("\nS2S> ").strip().lower()
        except (KeyboardInterrupt, EOFError):
            # Ctrl+C or a closed stdin at the prompt is a request to leave.
            print("")
            return 0

        if cmd in ("q", "quit", "exit"):
            print("bye")
            return 0

        try:
            if cmd == "d":
                show_devices()
            elif cmd == "t":
                text = input("TEXT> ")
                one_turn_from_text(text)
            else:
                one_turn_from_mic()
        except KeyboardInterrupt:
            print("")
            return 0
        except Exception as e:
            # A failed turn is reported and the shell keeps running.
            log("ERROR: " + repr(e))
            traceback.print_exc()


if __name__ == "__main__":
    raise SystemExit(main())