# shell_s2s.py — local speech-to-speech (S2S) shell, part of s2s-complete-setup.
# Initial local S2S shell starter by raichemathew1 (commit de2ad9c).
from __future__ import annotations
import json
import time
import traceback
from pathlib import Path
import sounddevice as sd
import soundfile as sf
from faster_whisper import WhisperModel
from llama_cpp import Llama
import win32com.client
# All paths are resolved relative to this script's own directory.
ROOT = Path(__file__).resolve().parent
# utf-8-sig tolerates a UTF-8 BOM, which Windows editors often prepend.
CFG = json.loads((ROOT / "config.json").read_text(encoding="utf-8-sig"))
OUTPUT = ROOT / "output"  # recorded microphone audio is written here
LOGS = ROOT / "logs"      # append-only run log lives here
OUTPUT.mkdir(exist_ok=True)
LOGS.mkdir(exist_ok=True)
# Lazily-initialized singletons, populated by load_stt / load_llm / load_sapi.
STT = None      # faster-whisper WhisperModel
LLM = None      # llama.cpp Llama instance
SPEAKER = None  # Windows SAPI SpVoice COM object
def log(msg: str) -> None:
    """Print *msg* and append it, timestamped, to logs/shell_s2s.log."""
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{timestamp}] {msg}"
    print(entry)
    log_path = LOGS / "shell_s2s.log"
    with log_path.open("a", encoding="utf-8", errors="replace") as handle:
        handle.write(entry + "\n")
def resolve_torch_cuda() -> bool:
    """Return True when torch is importable and reports a usable CUDA device.

    Any failure (torch missing, broken install, driver error) yields False.
    """
    try:
        import torch
        available = torch.cuda.is_available()
    except Exception:
        return False
    return bool(available)
def resolve_stt_device() -> str:
    """Pick the faster-whisper device: honor an explicit "cpu"/"cuda" config
    value, otherwise auto-detect via torch."""
    choice = str(CFG.get("stt_device", "auto")).lower().strip()
    if choice not in ("cpu", "cuda"):
        choice = "cuda" if resolve_torch_cuda() else "cpu"
    return choice
def resolve_stt_compute(device: str) -> str:
    """Choose the faster-whisper compute type for *device*.

    An explicit config value wins; "auto" maps to float16 on CUDA, int8 on CPU.
    """
    configured = str(CFG.get("stt_compute_type", "auto")).lower().strip()
    if configured == "auto":
        return "float16" if device == "cuda" else "int8"
    return configured
def resolve_llm_gpu_layers() -> int:
    """Translate CFG["llm_gpu_layers"] into an llama.cpp n_gpu_layers value.

    Accepts an int (used verbatim), a JSON bool (true => all layers, false =>
    CPU only), "cpu" (0), "gpu" (-1 = offload all layers), "auto" (-1 when
    CUDA is available, else 0), or a numeric string. Anything unparseable
    falls back to 0 (CPU only).
    """
    requested = CFG.get("llm_gpu_layers", "auto")
    # Check bool BEFORE int: bool is an int subclass, so a JSON true/false in
    # the config would otherwise be silently treated as a layer count of 1/0.
    if isinstance(requested, bool):
        return -1 if requested else 0
    if isinstance(requested, int):
        return requested
    requested = str(requested).lower().strip()
    if requested == "cpu":
        return 0
    if requested == "gpu":
        return -1
    if requested == "auto":
        return -1 if resolve_torch_cuda() else 0
    try:
        # Numeric strings like "20" are accepted as explicit layer counts.
        return int(requested)
    except ValueError:
        return 0
def load_stt() -> None:
    """Lazily construct the global faster-whisper model; no-op if already loaded."""
    global STT
    if STT is None:
        model_name = CFG.get("stt_model", "medium")
        device = resolve_stt_device()
        compute = resolve_stt_compute(device)
        log(f"Loading STT: faster-whisper {model_name} device={device} compute={compute}")
        STT = WhisperModel(model_name, device=device, compute_type=compute)
def load_llm() -> None:
    """Lazily construct the global llama.cpp model.

    Raises FileNotFoundError when the configured GGUF file is absent.
    """
    global LLM
    if LLM is not None:
        return
    model_path = ROOT / CFG["llm_model_path"]
    if not model_path.exists():
        raise FileNotFoundError(f"Missing LLM model: {model_path}. Run download_models.py first.")
    gpu_layers = resolve_llm_gpu_layers()
    log(f"Loading LLM: {model_path.name} gpu_layers={gpu_layers}")
    options = {
        "model_path": str(model_path),
        "n_ctx": int(CFG.get("llm_context_size", 2048)),
        "n_gpu_layers": gpu_layers,
        "verbose": False,
    }
    LLM = Llama(**options)
def load_sapi() -> None:
    """Lazily create the global Windows SAPI voice; no-op if already loaded."""
    global SPEAKER
    if SPEAKER is not None:
        return
    log("Loading Windows SAPI voice")
    voice = win32com.client.Dispatch("SAPI.SpVoice")
    try:
        # Slightly faster than default; best-effort — some voices reject these.
        voice.Rate = 1
        voice.Volume = 100
    except Exception:
        pass
    SPEAKER = voice
def load_all() -> None:
    """Load every model (STT, LLM, TTS) and log the total startup time."""
    t0 = time.perf_counter()
    for loader in (load_stt, load_llm, load_sapi):
        loader()
    log(f"GREEN: all models loaded in {time.perf_counter() - t0:.2f}s")
def record_audio() -> Path:
    """Record a fixed-length mono clip from the default microphone.

    Duration and sample rate come from the config; the clip is saved to
    output/input.wav, whose path is returned.
    """
    seconds = float(CFG.get("record_seconds", 4))
    sample_rate = int(CFG.get("sample_rate", 16000))
    wav_path = OUTPUT / "input.wav"
    print("")
    print(f"Recording {seconds:.1f}s. Speak now.")
    frame_count = int(seconds * sample_rate)
    buffer = sd.rec(frame_count, samplerate=sample_rate, channels=1, dtype="float32")
    sd.wait()  # block until the recording completes
    sf.write(str(wav_path), buffer, sample_rate)
    return wav_path
def transcribe(audio_path: Path) -> str:
    """Run faster-whisper on *audio_path* and return the joined transcript."""
    start = time.perf_counter()
    segments, _info = STT.transcribe(
        str(audio_path),
        beam_size=1,
        vad_filter=True,
        condition_on_previous_text=False,
    )
    pieces = [segment.text.strip() for segment in segments]
    text = " ".join(pieces).strip()
    log(f"STT {time.perf_counter() - start:.2f}s: {text}")
    return text
def generate_reply(user_text: str) -> str:
    """Wrap *user_text* in a ChatML prompt, run the local LLM, return the reply."""
    start = time.perf_counter()
    system = CFG.get("system_prompt", "You are concise.")
    # ChatML framing; the stop tokens below keep the model from role-playing on.
    chatml = "".join([
        "<|im_start|>system\n",
        f"{system}\n",
        "<|im_end|>\n",
        "<|im_start|>user\n",
        f"{user_text}\n",
        "<|im_end|>\n",
        "<|im_start|>assistant\n",
    ])
    completion = LLM(
        chatml,
        max_tokens=int(CFG.get("llm_max_tokens", 140)),
        temperature=float(CFG.get("llm_temperature", 0.35)),
        stop=["<|im_end|>", "<|im_start|>"],
    )
    reply = completion["choices"][0]["text"].strip()
    log(f"LLM {time.perf_counter() - start:.2f}s: {reply}")
    return reply
def speak_text(reply: str) -> None:
    """Speak *reply* through the SAPI voice and log how long it took."""
    started = time.perf_counter()
    SPEAKER.Speak(reply)
    elapsed = time.perf_counter() - started
    log(f"SAPI SPEAK {elapsed:.2f}s")
def show_devices() -> None:
    """Print sounddevice's table of available audio devices."""
    devices = sd.query_devices()
    print(devices)
def one_turn_from_text(text: str) -> None:
    """Generate and speak a reply to *text*; blank/whitespace input is ignored."""
    cleaned = text.strip()
    if cleaned:
        speak_text(generate_reply(cleaned))
def one_turn_from_mic() -> None:
    """Record from the mic, transcribe, and speak a reply.

    Logs and returns quietly when the transcript comes back empty.
    """
    transcript = transcribe(record_audio())
    if not transcript:
        log("No speech detected.")
        return
    one_turn_from_text(transcript)
def main() -> int:
    """Interactive loop: record → transcribe → generate → speak.

    Commands: Enter (mic turn), "t" (typed turn), "d" (list devices),
    "q"/"quit"/"exit" (leave). Returns the process exit code (always 0).
    """
    print("LOCAL S2S SHELL - SAPI LOW LATENCY")
    print("")
    print("Commands:")
    print(" Enter = record mic and run speech-to-speech")
    print(" t = type text and hear reply")
    print(" d = list audio devices")
    print(" q = quit")
    print("")
    load_all()
    while True:
        # The prompt read was previously outside any handler, so Ctrl+C or EOF
        # at the prompt escaped with a raw traceback; treat both as a clean
        # exit, matching the in-turn KeyboardInterrupt handling below.
        try:
            cmd = input("\nS2S> ").strip().lower()
        except (KeyboardInterrupt, EOFError):
            print("")
            return 0
        if cmd in ("q", "quit", "exit"):
            print("bye")
            return 0
        try:
            if cmd == "d":
                show_devices()
            elif cmd == "t":
                text = input("TEXT> ")
                one_turn_from_text(text)
            else:
                one_turn_from_mic()
        except KeyboardInterrupt:
            print("")
            return 0
        except Exception as e:
            # Keep the shell alive on per-turn failures; log and continue.
            log("ERROR: " + repr(e))
            traceback.print_exc()
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())