tiny-army / llm.py
polats's picture
Persona: cap n_threads=2 (cpu_count over-reports host cores in a container → llama.cpp thrashing/50x slowdown); smaller ctx; /persona/status diag; lower token cap
2f7e532
"""Configurable llama.cpp runtime for Tiny Army's persona + war-diary.
Two modes, env-selected, behind ONE uniform `stream_chat()` generator so callers are
runtime-agnostic:
• External (TINY_LLM_BASE_URL set): stream from any OpenAI-compatible llama.cpp
server — your local `llama-server`, or an HF-hosted GGUF endpoint. This mirrors
woid's LOCAL_LLM_BASE_URL switch (reused, not reinvented).
• In-Space (default): llama-cpp-python loads a GGUF — a local file
(TINY_LLM_MODEL_PATH) or one pulled from Hugging Face (TINY_LLM_HF_REPO +
TINY_LLM_HF_FILE).
Generation is synchronous and CPU-bound, so it's serialized by a lock (one CPU model
can't decode in parallel) — async callers (the SSE endpoint) run `stream_chat` in a
threadpool. If no backend can load, or the model is still busy with another request,
`stream_chat` raises LlmUnavailable and callers fall back to a stub so the Space still
works.
"""
import json
import os
import threading
import urllib.request
BASE_URL = os.environ.get("TINY_LLM_BASE_URL", "").rstrip("/")
API_KEY = os.environ.get("TINY_LLM_API_KEY", "")
MODEL_PATH = os.environ.get("TINY_LLM_MODEL_PATH", "")
HF_REPO = os.environ.get("TINY_LLM_HF_REPO", "Qwen/Qwen2.5-0.5B-Instruct-GGUF")
HF_FILE = os.environ.get("TINY_LLM_HF_FILE", "*q4_k_m.gguf")
N_CTX = int(os.environ.get("TINY_LLM_N_CTX", "2048"))
# DEFAULT 2, not os.cpu_count(): in a container cpu_count() reports the HOST's cores
# (often 16-32), but an HF free Space only has ~2 vCPU. Over-subscribing threads makes
# llama.cpp thrash and run ~50x too slow. Override with TINY_LLM_N_THREADS on bigger HW.
N_THREADS = int(os.environ.get("TINY_LLM_N_THREADS") or 2)
# A label for the `model` SSE event / UI — not used to route requests.
MODEL_ID = (
os.environ.get("TINY_LLM_MODEL")
or ("external" if BASE_URL else "")
or (os.path.basename(MODEL_PATH) if MODEL_PATH else "")
or HF_REPO.split("/")[-1]
)
_lock = threading.Lock() # serializes GENERATION (one CPU model at a time)
_load_lock = threading.Lock() # serializes the one-time model LOAD (the slow download)
_llm = None
_load_error = None
class LlmUnavailable(RuntimeError):
"""No backend could be reached/loaded — callers should fall back to a stub."""
def model_id():
return MODEL_ID or "tiny-llm"
def status():
"""Diagnostics for the persona backend (model load state + thread/CPU info)."""
return {
"mode": "external" if BASE_URL else "in-space",
"model": model_id(),
"loaded": _llm is not None,
"load_error": _load_error,
"n_threads": N_THREADS,
"n_ctx": N_CTX,
"cpu_count": os.cpu_count(),
"base_url": BASE_URL or None,
}
def _get_local():
"""Load the GGUF once. Uses its OWN lock (not the generation lock) so the slow
first-time download doesn't make concurrent requests time out — they just wait
here until the model is ready."""
global _llm, _load_error
if _llm is not None:
return _llm
if _load_error is not None:
raise LlmUnavailable(_load_error)
with _load_lock:
if _llm is not None:
return _llm
if _load_error is not None:
raise LlmUnavailable(_load_error)
try:
from llama_cpp import Llama
common = dict(n_ctx=N_CTX, n_threads=N_THREADS, verbose=False)
if MODEL_PATH:
_llm = Llama(model_path=MODEL_PATH, **common)
else: # pulls + caches the GGUF from Hugging Face on first use
_llm = Llama.from_pretrained(repo_id=HF_REPO, filename=HF_FILE, **common)
return _llm
except Exception as e: # import / download / OOM / bad file
_load_error = f"{type(e).__name__}: {e}"
raise LlmUnavailable(_load_error)
def prewarm():
"""Kick off the model load in the background so the app starts immediately and the
download happens before the first user request (best-effort)."""
if BASE_URL:
return # external endpoint — nothing to load locally
def _bg():
try:
_get_local()
except Exception:
pass # _load_error is recorded; callers fall back to the stub
threading.Thread(target=_bg, daemon=True).start()
def _stream_external(system, user, max_tokens, temperature):
body = json.dumps({
"model": os.environ.get("TINY_LLM_MODEL", "local"),
"messages": [{"role": "system", "content": system}, {"role": "user", "content": user}],
"temperature": temperature, "max_tokens": max_tokens, "stream": True,
}).encode()
headers = {"Content-Type": "application/json"}
if API_KEY:
headers["Authorization"] = f"Bearer {API_KEY}"
req = urllib.request.Request(f"{BASE_URL}/chat/completions", data=body, headers=headers)
try:
with urllib.request.urlopen(req, timeout=120) as resp:
for raw in resp:
line = raw.decode("utf-8").strip()
if not line.startswith("data:"):
continue
data = line[5:].strip()
if data == "[DONE]":
break
try:
delta = json.loads(data)["choices"][0]["delta"].get("content")
except Exception:
continue
if delta:
yield delta
except Exception as e:
raise LlmUnavailable(f"external endpoint: {type(e).__name__}: {e}")
def _stream_local(system, user, max_tokens, temperature):
llm = _get_local()
for chunk in llm.create_chat_completion(
messages=[{"role": "system", "content": system}, {"role": "user", "content": user}],
max_tokens=max_tokens, temperature=temperature, stream=True,
):
delta = chunk["choices"][0]["delta"].get("content")
if delta:
yield delta
def stream_chat(system, user, max_tokens=400, temperature=0.8, should_stop=None):
"""Yield text chunks from the configured backend. Serialized by a module lock so
one CPU model never decodes two requests at once. `should_stop()` is polled each
chunk so an abandoned request (client gone) stops promptly and frees the lock.
Raises LlmUnavailable if no backend is available or the model is busy."""
# Ensure the model is loaded FIRST (its own lock; the slow download must not count
# against the short generation-lock timeout below).
if not BASE_URL:
_get_local()
if not _lock.acquire(timeout=2):
raise LlmUnavailable("the model is busy with another request — try again in a moment")
try:
gen = _stream_external if BASE_URL else _stream_local
for chunk in gen(system, user, max_tokens, temperature):
if should_stop and should_stop():
break
yield chunk
finally:
_lock.release()