| from __future__ import annotations |
|
|
| import json |
| import time |
| import traceback |
| from pathlib import Path |
|
|
| import sounddevice as sd |
| import soundfile as sf |
| from faster_whisper import WhisperModel |
| from llama_cpp import Llama |
| import win32com.client |
|
|
|
|
# Paths are resolved relative to this script so the tool works from any CWD.
ROOT = Path(__file__).resolve().parent
# utf-8-sig tolerates the BOM that Windows editors often prepend to JSON.
CFG = json.loads((ROOT / "config.json").read_text(encoding="utf-8-sig"))
OUTPUT = ROOT / "output"  # recorded audio lands here
LOGS = ROOT / "logs"  # rolling text log lands here
OUTPUT.mkdir(exist_ok=True)
LOGS.mkdir(exist_ok=True)


# Lazily-initialized singletons, populated by load_stt / load_llm / load_sapi.
STT = None  # faster_whisper.WhisperModel (set in load_stt)
LLM = None  # llama_cpp.Llama (set in load_llm)
SPEAKER = None  # SAPI.SpVoice COM object (set in load_sapi)
|
|
|
|
def log(msg: str) -> None:
    """Print *msg* with a timestamp and append the same line to logs/shell_s2s.log."""
    stamped = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {msg}"
    print(stamped)
    log_path = LOGS / "shell_s2s.log"
    # errors="replace" keeps logging alive even for odd characters in msg.
    with log_path.open("a", encoding="utf-8", errors="replace") as handle:
        handle.write(stamped + "\n")
|
|
|
|
def resolve_torch_cuda() -> bool:
    """Report whether torch sees a usable CUDA device; False when torch is absent or errors."""
    try:
        import torch
    except Exception:
        return False
    try:
        return bool(torch.cuda.is_available())
    except Exception:
        return False
|
|
|
|
def resolve_stt_device() -> str:
    """Pick the STT device: honor an explicit 'cpu'/'cuda' in config, else autodetect."""
    choice = str(CFG.get("stt_device", "auto")).strip().lower()
    if choice in {"cpu", "cuda"}:
        return choice
    # "auto" (or anything unrecognized) falls back to hardware detection.
    return "cuda" if resolve_torch_cuda() else "cpu"
|
|
|
|
def resolve_stt_compute(device: str) -> str:
    """Pick the faster-whisper compute type for *device*, honoring a config override."""
    override = str(CFG.get("stt_compute_type", "auto")).strip().lower()
    if override == "auto":
        # Sensible defaults: half precision on GPU, int8 quantization on CPU.
        return "float16" if device == "cuda" else "int8"
    return override
|
|
|
|
def resolve_llm_gpu_layers() -> int:
    """Resolve llama.cpp's n_gpu_layers setting from config.

    Accepted values for config key "llm_gpu_layers":
      * bool        -> True = full GPU offload (-1), False = CPU only (0)
      * int         -> used as-is (layer count; -1 means all layers)
      * "cpu"       -> 0
      * "gpu"       -> -1
      * "auto"      -> -1 if CUDA is available, else 0
      * numeric str -> parsed as int; anything unparseable falls back to 0
    """
    requested = CFG.get("llm_gpu_layers", "auto")

    # JSON true/false arrives as Python bool, which IS an int subclass;
    # without this check `true` would silently mean "offload 1 layer".
    if isinstance(requested, bool):
        return -1 if requested else 0

    if isinstance(requested, int):
        return requested

    requested = str(requested).lower().strip()

    if requested == "cpu":
        return 0

    if requested == "gpu":
        return -1

    if requested == "auto":
        return -1 if resolve_torch_cuda() else 0

    # Last resort: allow a layer count written as a string, e.g. "20".
    try:
        return int(requested)
    except ValueError:
        return 0
|
|
|
|
def load_stt() -> None:
    """Lazily construct the global faster-whisper model; no-op when already loaded."""
    global STT
    if STT is None:
        model_name = CFG.get("stt_model", "medium")
        device = resolve_stt_device()
        compute = resolve_stt_compute(device)
        log(f"Loading STT: faster-whisper {model_name} device={device} compute={compute}")
        STT = WhisperModel(model_name, device=device, compute_type=compute)
|
|
|
|
def load_llm() -> None:
    """Lazily construct the global llama.cpp model; no-op when already loaded.

    Raises:
        FileNotFoundError: when the GGUF file configured in "llm_model_path"
            does not exist under ROOT.
    """
    global LLM
    if LLM is not None:
        return

    model_path = ROOT / CFG["llm_model_path"]
    if not model_path.exists():
        raise FileNotFoundError(f"Missing LLM model: {model_path}. Run download_models.py first.")

    gpu_layers = resolve_llm_gpu_layers()
    log(f"Loading LLM: {model_path.name} gpu_layers={gpu_layers}")

    LLM = Llama(
        model_path=str(model_path),
        n_gpu_layers=gpu_layers,
        n_ctx=int(CFG.get("llm_context_size", 2048)),
        verbose=False,
    )
|
|
|
|
def load_sapi() -> None:
    """Lazily create the global Windows SAPI voice; no-op when already created."""
    global SPEAKER
    if SPEAKER is not None:
        return

    log("Loading Windows SAPI voice")
    SPEAKER = win32com.client.Dispatch("SAPI.SpVoice")

    # Best effort tuning: slightly faster speech at full volume. Some voices
    # reject property writes, so failures are deliberately ignored.
    try:
        SPEAKER.Rate = 1
        SPEAKER.Volume = 100
    except Exception:
        pass
|
|
|
|
def load_all() -> None:
    """Load STT, LLM, and TTS back to back, logging the total wall time."""
    started = time.perf_counter()
    for loader in (load_stt, load_llm, load_sapi):
        loader()
    log(f"GREEN: all models loaded in {time.perf_counter() - started:.2f}s")
|
|
|
|
def record_audio() -> Path:
    """Record a fixed-length mono clip from the default mic and save it as output/input.wav."""
    duration = float(CFG.get("record_seconds", 4))
    rate = int(CFG.get("sample_rate", 16000))
    wav_path = OUTPUT / "input.wav"

    print("")
    print(f"Recording {duration:.1f}s. Speak now.")
    frame_count = int(duration * rate)
    clip = sd.rec(frame_count, samplerate=rate, channels=1, dtype="float32")
    sd.wait()  # block until the capture buffer is full

    sf.write(str(wav_path), clip, rate)
    return wav_path
|
|
|
|
def transcribe(audio_path: Path) -> str:
    """Run faster-whisper on *audio_path* and return the joined transcript text."""
    started = time.perf_counter()

    segments, _info = STT.transcribe(
        str(audio_path),
        beam_size=1,  # greedy decode for lowest latency
        vad_filter=True,  # drop non-speech chunks
        condition_on_previous_text=False,  # each clip is independent
    )

    transcript = " ".join(part.text.strip() for part in segments).strip()
    log(f"STT {time.perf_counter() - started:.2f}s: {transcript}")
    return transcript
|
|
|
|
def generate_reply(user_text: str) -> str:
    """Run one ChatML-formatted completion against the local LLM and return the reply text."""
    started = time.perf_counter()

    system = CFG.get("system_prompt", "You are concise.")
    # ChatML framing, as expected by Qwen/Hermes-style GGUF chat models.
    prompt = (
        "<|im_start|>system\n"
        f"{system}\n"
        "<|im_end|>\n"
        "<|im_start|>user\n"
        f"{user_text}\n"
        "<|im_end|>\n"
        "<|im_start|>assistant\n"
    )

    completion = LLM(
        prompt,
        max_tokens=int(CFG.get("llm_max_tokens", 140)),
        temperature=float(CFG.get("llm_temperature", 0.35)),
        stop=["<|im_end|>", "<|im_start|>"],  # halt at the next ChatML marker
    )

    reply = completion["choices"][0]["text"].strip()
    log(f"LLM {time.perf_counter() - started:.2f}s: {reply}")
    return reply
|
|
|
|
def speak_text(reply: str) -> None:
    """Speak *reply* synchronously through the SAPI voice, logging how long it took."""
    started = time.perf_counter()
    SPEAKER.Speak(reply)
    log(f"SAPI SPEAK {time.perf_counter() - started:.2f}s")
|
|
|
|
def show_devices() -> None:
    """Print sounddevice's table of available audio devices to stdout."""
    devices = sd.query_devices()
    print(devices)
|
|
|
|
def one_turn_from_text(text: str) -> None:
    """Generate and speak a reply for *text*; blank input is silently skipped."""
    cleaned = text.strip()
    if cleaned:
        speak_text(generate_reply(cleaned))
|
|
|
|
def one_turn_from_mic() -> None:
    """Record one clip from the mic, transcribe it, and reply out loud."""
    transcript = transcribe(record_audio())
    if transcript:
        one_turn_from_text(transcript)
    else:
        log("No speech detected.")
|
|
|
|
def main() -> int:
    """Interactive REPL: record or type -> transcribe -> LLM -> SAPI speech.

    Returns:
        Process exit code (always 0; per-turn errors are logged and the
        loop keeps running).
    """
    print("LOCAL S2S SHELL - SAPI LOW LATENCY")
    print("")
    print("Commands:")
    print(" Enter = record mic and run speech-to-speech")
    print(" t = type text and hear reply")
    print(" d = list audio devices")
    print(" q = quit")
    print("")

    load_all()

    while True:
        # The prompt read gets its own handler: Ctrl+C (or EOF from a closed
        # stdin) here previously escaped the turn-level try and crashed with
        # a traceback instead of exiting cleanly.
        try:
            cmd = input("\nS2S> ").strip().lower()
        except (KeyboardInterrupt, EOFError):
            print("")
            return 0

        if cmd in ("q", "quit", "exit"):
            print("bye")
            return 0

        try:
            if cmd == "d":
                show_devices()
            elif cmd == "t":
                text = input("TEXT> ")
                one_turn_from_text(text)
            else:
                # Any other input (including bare Enter) starts a mic turn.
                one_turn_from_mic()
        except KeyboardInterrupt:
            print("")
            return 0
        except Exception as e:
            # Keep the shell alive on per-turn failures; log and continue.
            log("ERROR: " + repr(e))
            traceback.print_exc()
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |