OffGridSchedula / server /model.py
ParetoOptimal's picture
Initial Commit
0366d65
Raw
History Blame Contribute Delete
12.3 kB
"""Load the fine-tuned Gemma 4 GGUF and run inference via llama.cpp.
Llama Champion: all generation goes through llama-cpp-python — no cloud AI API.
The GGUF is downloaded from HF at startup so the Space image stays small.
Two inference locations, selected by env:
- in-process llama.cpp, GPU-offloaded inside an @spaces.GPU lease (ZeroGPU), or
- a remote OpenAI-compatible / llama.cpp server via INFERENCE_BASE_URL
(e.g. a llama-server on the phone itself, or a backend).
"""
from __future__ import annotations
import os
import threading
import time
from huggingface_hub import hf_hub_download
from . import events
# --- model selection ---------------------------------------------------------
# The platform runs the gemma-cal EDGE fine-tune (Gemma-4 E4B, ~5GB Q4): our own
# calendar-native model, eval-gated before every publish (docs/eval-roadmap.md).
# MODEL SIZE (hackathon hard constraint, <= 32B): E4B = ~4B effective params.
# All inference is local via llama.cpp (no cloud AI).
MODEL_REPO = os.getenv("MODEL_REPO", "ParetoOptimal/gemma-4-cal-gguf")
MODEL_FILE = os.getenv("MODEL_FILE", "gemma-cal-e4b-Q4_K_M.gguf")

# --- optional vision projector (mmproj) --------------------------------------
# An empty MMPROJ_FILE keeps the model text-only; setting it enables image input.
# MMPROJ_REPO may point at a different repo than the LLM (the E4B edge model pairs
# with the base E4B's projector, not a projector in our repo); when unset or empty
# it falls back to the LLM repo.
MMPROJ_REPO = os.getenv("MMPROJ_REPO", "") or MODEL_REPO
MMPROJ_FILE = os.getenv("MMPROJ_FILE", "")
# Name of the llama-cpp-python vision handler class, looked up in
# llama_cpp.llama_chat_format. Gemma 4 vision may ship a dedicated handler; the
# generic clip/Llava handler is the default.
CHAT_HANDLER = os.getenv("CHAT_HANDLER", "Llava15ChatHandler")

# --- llama.cpp runtime knobs -------------------------------------------------
N_CTX = int(os.getenv("N_CTX", "8192"))
N_GPU_LAYERS = int(os.getenv("N_GPU_LAYERS", "-1"))  # -1 = offload all (GPU)
GPU_DURATION = int(os.getenv("GPU_DURATION", "120"))  # ZeroGPU lease seconds

# --- inference location ------------------------------------------------------
# A non-empty INFERENCE_BASE_URL delegates generation to a remote
# OpenAI-compatible / llama.cpp server (e.g. a llama-server running on the phone
# itself, or a backend) instead of loading the GGUF in-process. This is how the
# same agent runs on-device OR thin-client, selected purely by env.
INFERENCE_BASE_URL = os.getenv("INFERENCE_BASE_URL", "")
INFERENCE_API_KEY = os.getenv("INFERENCE_API_KEY", "")
INFERENCE_MODEL = os.getenv("INFERENCE_MODEL", "local")

# Let a tool-calling model (Hermes) write its own long-term memory mid-run.
# Only applies to the remote path (server/tools.py); enabled only by the exact
# value "1", so it is off by default.
HERMES_TOOLS = os.getenv("HERMES_TOOLS") == "1"

# Process-wide llama.cpp instance, created lazily by get_llm() under _lock.
_llm = None
_lock = threading.Lock()
# ZeroGPU attaches the GPU only for the duration of an @spaces.GPU call, so all
# GPU-bound work has to live inside such a function. The `spaces` package is
# absent locally / in CI; in that case `gpu` is a pass-through decorator and stub
# mode never touches this path.
try:
    from spaces import GPU as _spaces_gpu
except Exception:  # noqa: BLE001 - spaces not installed (local/CI)
    _spaces_gpu = None


def gpu(fn):
    """Wrap ``fn`` in a ZeroGPU lease of GPU_DURATION seconds when available.

    Returns ``fn`` unchanged when the ``spaces`` package could not be imported.
    """
    if _spaces_gpu is None:
        return fn
    return _spaces_gpu(duration=GPU_DURATION)(fn)
def _preload_cuda_libs():
    """Best-effort preload of the CUDA userspace libraries shipped as pip wheels.

    The prebuilt CUDA llama-cpp-python wheel needs libcudart.so.12 and friends,
    which the ZeroGPU/Gradio-SDK env lacks on the default loader path; the
    nvidia-*-cu12 pip packages provide them. Each shared object found under the
    ``nvidia`` namespace package is opened with RTLD_GLOBAL so the llama .so's
    NEEDED deps resolve without guessing LD_LIBRARY_PATH.

    Does nothing when the ``nvidia`` package cannot be imported (off-Linux, or
    the wheels are not installed). Libraries that fail to load are skipped.
    """
    import ctypes
    import glob
    import os

    try:
        import nvidia  # namespace package from nvidia-*-cu12 wheels
    except Exception:  # noqa: BLE001
        return

    # PEP 420 namespace package: __file__ is None, so walk __path__ instead.
    search_roots = list(getattr(nvidia, "__path__", []) or [])
    # Load order between components does not matter: $ORIGIN RPATH resolves
    # sibling libraries, so cublas need not precede its dependents.
    components = ("cuda_runtime", "cuda_nvrtc", "cublas")

    for root in search_roots:
        for component in components:
            pattern = os.path.join(root, component, "lib", "*.so*")
            for lib_path in sorted(glob.glob(pattern)):
                try:
                    ctypes.CDLL(lib_path, mode=ctypes.RTLD_GLOBAL)
                except OSError:
                    # Deliberately best-effort: an unloadable library is skipped.
                    continue
def _build_chat_handler():
    """Build the optional vision chat handler.

    Returns None when MMPROJ_FILE is empty (text-only). Otherwise the projector
    file MMPROJ_FILE is fetched from MMPROJ_REPO via ``hf_hub_download`` and the
    class named by CHAT_HANDLER is looked up in ``llama_cpp.llama_chat_format``
    and instantiated with that projector path.
    """
    if MMPROJ_FILE:
        # Imported lazily so text-only runs never touch the vision module.
        import llama_cpp.llama_chat_format as chat_formats

        projector_path = hf_hub_download(repo_id=MMPROJ_REPO, filename=MMPROJ_FILE)
        handler_factory = getattr(chat_formats, CHAT_HANDLER)
        return handler_factory(clip_model_path=projector_path, verbose=False)
    return None
def get_llm():
    """Return the shared ``Llama`` instance, creating it on first use.

    The first caller preloads the CUDA libraries, downloads MODEL_FILE from
    MODEL_REPO and constructs the model; later callers get the cached object.
    Construction happens while holding ``_lock`` and re-checks ``_llm`` after
    acquiring it, so the model is built at most once per process.
    """
    global _llm

    # Fast path: already loaded, no need to take the lock.
    if _llm is not None:
        return _llm

    with _lock:
        # Re-check: another thread may have finished loading while we waited.
        if _llm is None:
            _preload_cuda_libs()  # satisfy libcudart.so.12 etc. before loading
            from llama_cpp import Llama  # imported lazily so tests can stub

            model_path = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILE)
            vision_handler = _build_chat_handler()  # enables image_url inputs
            _llm = Llama(
                model_path=model_path,
                n_ctx=N_CTX,
                n_gpu_layers=N_GPU_LAYERS,
                chat_handler=vision_handler,
                verbose=False,
            )
    return _llm
# --- GPU-scoped inner functions (run inside the ZeroGPU lease) ---
# These do the actual in-process llama.cpp work; emits stay in the main-process
# wrappers below because in-memory state (the events bus) isn't shared back from
# the ZeroGPU subprocess.
@gpu
def _infer_text(messages: list[dict], temperature: float, max_tokens: int) -> str:
    """Run one unconstrained chat completion in-process and return its text.

    Returns the ``content`` of the first choice's message from
    ``create_chat_completion``.
    """
    llm = get_llm()
    result = llm.create_chat_completion(
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    first_choice = result["choices"][0]
    return first_choice["message"]["content"]
@gpu
def _infer_json(messages: list[dict], json_schema: dict, temperature: float, max_tokens: int):
    """Run one schema-constrained chat completion in-process.

    Returns a ``(content, completion_tokens)`` tuple: the first choice's message
    content, and the ``completion_tokens`` entry of the response's ``usage``
    (None when the response carries no usage data).
    """
    llm = get_llm()
    result = llm.create_chat_completion(
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
        response_format={"type": "json_object", "schema": json_schema},
    )
    # `usage` may be missing or None; normalise to a dict before reading it.
    usage = result.get("usage") or {}
    content = result["choices"][0]["message"]["content"]
    return content, usage.get("completion_tokens")
@gpu
def _infer_stream(messages: list[dict], json_schema: dict, temperature: float, max_tokens: int):
    """Stream one schema-constrained chat completion in-process.

    Yields each non-empty ``content`` delta from the first choice of every
    streamed chunk; chunks without content are skipped.
    """
    chunks = get_llm().create_chat_completion(
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
        response_format={"type": "json_object", "schema": json_schema},
        stream=True,
    )
    for chunk in chunks:
        first_choice = chunk["choices"][0]
        piece = first_choice.get("delta", {}).get("content")
        if not piece:
            continue
        yield piece
# --- remote inference seam (on-device / thin-client via INFERENCE_BASE_URL) ---
def _remote_payload(messages, json_schema, temperature, max_tokens, stream):
    """Build the JSON body for a remote ``/chat/completions`` request.

    The request targets INFERENCE_MODEL and constrains the output to
    ``json_schema`` via an OpenAI-style ``json_schema`` response format named
    "ActionPlan". ``stream`` is passed through unchanged.
    """
    # llama-server accepts json_schema (OpenAI-style); the in-process path uses
    # the json_object+schema form. Both grammar-constrain the output.
    schema_spec = {"name": "ActionPlan", "schema": json_schema, "strict": True}
    response_format = {"type": "json_schema", "json_schema": schema_spec}

    body = {
        "model": INFERENCE_MODEL,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "response_format": response_format,
        "stream": stream,
    }
    return body
def _remote_headers() -> dict:
    """Return the HTTP headers for remote inference requests.

    Always sets a JSON content type; adds a Bearer ``Authorization`` header only
    when INFERENCE_API_KEY is non-empty.
    """
    auth = {"Authorization": f"Bearer {INFERENCE_API_KEY}"} if INFERENCE_API_KEY else {}
    return {"Content-Type": "application/json", **auth}
def _remote_complete_json(messages, json_schema, temperature, max_tokens) -> str:
    """Run a non-streaming schema-constrained completion on the remote server.

    POSTs to ``{INFERENCE_BASE_URL}/chat/completions`` with a 120 s timeout and
    returns the assistant content. When HERMES_TOOLS is on, the request is
    driven through ``run_with_tools`` (server/tools.py) with TOOL_SPECS attached
    to every request. A "model" event with latency and completion-token count is
    emitted once the call finishes.

    Raises whatever ``requests`` raises, including the HTTP error raised by
    ``raise_for_status`` on a non-2xx response.
    """
    import requests  # already a dependency; imported here to keep import light

    started = time.perf_counter()

    def _chat(msgs, **extra):
        """POST one chat-completion request and return the decoded JSON body."""
        body = _remote_payload(msgs, json_schema, temperature, max_tokens, False)
        body.update(extra)
        reply = requests.post(
            f"{INFERENCE_BASE_URL.rstrip('/')}/chat/completions",
            json=body,
            headers=_remote_headers(),
            timeout=120,
        )
        reply.raise_for_status()
        return reply.json()

    def _report(label, result):
        """Emit the completion event for ``result`` under the given label."""
        usage = result.get("usage") or {}
        events.emit(
            "model",
            label,
            latency_ms=round((time.perf_counter() - started) * 1000),
            tokens=usage.get("completion_tokens"),
        )

    if HERMES_TOOLS:
        # Tool-calling loop: the model may call `remember` to update memory before
        # returning the final ActionPlan JSON. See server/tools.py.
        from .tools import TOOL_SPECS, run_with_tools

        def _chat_with_tools(msgs):
            return _chat(msgs, tools=TOOL_SPECS)

        content, result = run_with_tools(list(messages), _chat_with_tools)
        _report("remote inference complete (tools)", result)
        return content

    result = _chat(messages)
    _report("remote inference complete", result)
    return result["choices"][0]["message"]["content"]
def _sse_delta(raw: bytes):
    """Extract the content delta from one raw line of a streamed chat completion.

    Returns the delta text, or None when the line carries nothing to yield:
    blank lines, the ``[DONE]`` sentinel, lines that are not JSON, and chunks
    without a usable first choice / delta.
    """
    import json as _json

    if not raw:
        return None
    # The SSE format makes the space after "data:" optional, so strip the bare
    # field name and let strip() drop the optional space. Lines without the
    # prefix are still tried as JSON, exactly as before.
    text = raw.decode("utf-8").removeprefix("data:").strip()
    if not text or text == "[DONE]":
        return None
    try:
        first_choice = _json.loads(text)["choices"][0]
        # `delta` may be absent or null on some chunks; treat both as "no content".
        return (first_choice.get("delta") or {}).get("content")
    except (ValueError, KeyError, IndexError, TypeError, AttributeError):
        # Malformed or unexpected chunk shape: skip it rather than abort the stream.
        return None


def _remote_stream_json(messages, json_schema, temperature, max_tokens):
    """Stream a schema-constrained completion from the remote server.

    POSTs to ``{INFERENCE_BASE_URL}/chat/completions`` with ``stream=True`` and a
    120 s timeout, then yields each non-empty content delta parsed from the
    response lines. Emits a "remote inference started" event before the request
    and a "remote stream complete" event (with latency) after the response has
    been fully consumed.

    Raises whatever ``requests`` raises, including the HTTP error raised by
    ``raise_for_status`` on a non-2xx response.
    """
    import requests

    t0 = time.perf_counter()
    events.emit("model", "remote inference started")
    with requests.post(
        f"{INFERENCE_BASE_URL.rstrip('/')}/chat/completions",
        json=_remote_payload(messages, json_schema, temperature, max_tokens, True),
        headers=_remote_headers(),
        timeout=120,
        stream=True,
    ) as resp:
        resp.raise_for_status()
        for raw in resp.iter_lines():
            delta = _sse_delta(raw)
            if delta:
                yield delta
    events.emit(
        "model", "remote stream complete", latency_ms=round((time.perf_counter() - t0) * 1000)
    )
# --- main-process wrappers (own the activity-bus emits; pick local vs remote) ---
def complete(messages: list[dict], temperature: float = 0.2, max_tokens: int = 1024) -> str:
    """Return the assistant text for an unconstrained chat completion.

    Always runs the in-process llama.cpp path via ``_infer_text``.
    """
    # NOTE(review): unlike complete_json / stream_complete_json, this helper does
    # not consult INFERENCE_BASE_URL — confirm that is intended for thin clients.
    reply = _infer_text(messages, temperature, max_tokens)
    return reply
def complete_json(
    messages: list[dict],
    json_schema: dict,
    temperature: float = 0.2,
    max_tokens: int = 2048,
) -> str:
    """Return a completion constrained to ``json_schema``.

    With INFERENCE_BASE_URL set the call is delegated to the remote server;
    otherwise it runs the GPU-offloaded in-process llama.cpp path and emits an
    "inference complete" model event carrying latency and token count.
    """
    if not INFERENCE_BASE_URL:
        started = time.perf_counter()
        text, tokens = _infer_json(messages, json_schema, temperature, max_tokens)
        elapsed_ms = round((time.perf_counter() - started) * 1000)
        events.emit("model", "inference complete", latency_ms=elapsed_ms, tokens=tokens)
        return text

    # Remote seam: that path emits its own model events.
    return _remote_complete_json(messages, json_schema, temperature, max_tokens)
def stream_complete_json(
    messages: list[dict],
    json_schema: dict,
    temperature: float = 0.2,
    max_tokens: int = 2048,
):
    """Yield text deltas of a completion constrained to ``json_schema``.

    Streaming lets the UI show the model 'thinking'. With INFERENCE_BASE_URL set
    the deltas come from the remote server; otherwise from the GPU-offloaded
    in-process llama.cpp path, bracketed by "inference started" and
    "stream complete" model events.
    """
    if not INFERENCE_BASE_URL:
        started = time.perf_counter()
        events.emit("model", "inference started")
        for piece in _infer_stream(messages, json_schema, temperature, max_tokens):
            yield piece
        elapsed_ms = round((time.perf_counter() - started) * 1000)
        events.emit("model", "stream complete", latency_ms=elapsed_ms)
        return

    # Remote seam: that generator emits its own start/complete events.
    yield from _remote_stream_json(messages, json_schema, temperature, max_tokens)