"""Load the fine-tuned Gemma 4 GGUF and run inference via llama.cpp.

Llama Champion: all generation goes through llama-cpp-python — no cloud AI API.
The GGUF is downloaded from the Hugging Face Hub at runtime (on the first
get_llm() call) rather than baked in, so the Space image stays small.

Two inference locations, selected by env:
- in-process llama.cpp, GPU-offloaded inside an @spaces.GPU lease (ZeroGPU), or
- a remote OpenAI-compatible / llama.cpp server via INFERENCE_BASE_URL
  (e.g. a llama-server on the phone itself, or a backend).
"""
| from __future__ import annotations |
|
|
| import os |
| import threading |
| import time |
|
|
| from huggingface_hub import hf_hub_download |
|
|
| from . import events |
|
|
| |
| |
| |
| |
# --- Model artifacts ---------------------------------------------------------
# Hub repo and file name of the GGUF weights that get_llm() downloads.
MODEL_REPO = os.getenv("MODEL_REPO", "ParetoOptimal/gemma-4-cal-gguf")
MODEL_FILE = os.getenv("MODEL_FILE", "gemma-cal-e4b-Q4_K_M.gguf")

# Optional multimodal projector. An unset or empty MMPROJ_REPO falls back to the
# model repo; an empty MMPROJ_FILE makes _build_chat_handler() return None.
MMPROJ_REPO = os.getenv("MMPROJ_REPO", "") or MODEL_REPO
MMPROJ_FILE = os.getenv("MMPROJ_FILE", "")

# Name of the handler class looked up in llama_cpp.llama_chat_format.
CHAT_HANDLER = os.getenv("CHAT_HANDLER", "Llava15ChatHandler")

# --- In-process llama.cpp settings -------------------------------------------
N_CTX = int(os.getenv("N_CTX", "8192"))  # forwarded as Llama(n_ctx=...)
N_GPU_LAYERS = int(os.getenv("N_GPU_LAYERS", "-1"))  # forwarded as Llama(n_gpu_layers=...)
GPU_DURATION = int(os.getenv("GPU_DURATION", "120"))  # forwarded as spaces.GPU(duration=...)

# --- Remote seam -------------------------------------------------------------
# A non-empty INFERENCE_BASE_URL routes complete_json / stream_complete_json to
# "<base>/chat/completions" instead of the in-process model.
INFERENCE_BASE_URL = os.getenv("INFERENCE_BASE_URL", "")
INFERENCE_API_KEY = os.getenv("INFERENCE_API_KEY", "")  # sent as a Bearer token when set
INFERENCE_MODEL = os.getenv("INFERENCE_MODEL", "local")  # "model" field of the request body

# Only the exact value "1" enables the tool-calling loop on the remote path.
HERMES_TOOLS = os.getenv("HERMES_TOOLS") == "1"

# Process-wide Llama instance, created on first get_llm() call under _lock.
_llm = None
_lock = threading.Lock()
|
|
|
|
| |
| |
| |
# `gpu` is the decorator applied to every in-process inference function. With
# the `spaces` package importable it wraps the function in spaces.GPU; in any
# other environment it degrades to the identity decorator.
try:
    from spaces import GPU as _spaces_gpu
except Exception:

    def gpu(fn):
        """Return *fn* unchanged (the `spaces` package could not be imported)."""
        return fn

else:

    def gpu(fn):
        """Wrap *fn* with spaces.GPU, passing GPU_DURATION as the duration."""
        return _spaces_gpu(duration=GPU_DURATION)(fn)
|
|
|
|
def _preload_cuda_libs():
    """Load the pip-installed CUDA shared libraries into the process up front.

    The prebuilt CUDA llama-cpp-python wheel needs libcudart and friends, which
    the ZeroGPU/Gradio-SDK environment does not put on the default loader path;
    the nvidia-*-cu12 pip packages ship them. Every ``*.so*`` under the
    cuda_runtime, cuda_nvrtc and cublas ``lib`` directories of the ``nvidia``
    package is opened with RTLD_GLOBAL so the llama shared object can resolve
    its dependencies without any LD_LIBRARY_PATH guesswork.

    Returns None. Does nothing when ``nvidia`` cannot be imported, and skips
    any individual library that fails to load.
    """
    import ctypes
    import glob
    import os

    try:
        import nvidia
    except Exception:
        return

    roots = list(getattr(nvidia, "__path__", []) or [])
    components = ("cuda_runtime", "cuda_nvrtc", "cublas")
    patterns = [
        os.path.join(root, component, "lib", "*.so*")
        for root in roots
        for component in components
    ]
    for pattern in patterns:
        # Sorted so the load order is deterministic within each directory.
        for lib_path in sorted(glob.glob(pattern)):
            try:
                ctypes.CDLL(lib_path, mode=ctypes.RTLD_GLOBAL)
            except OSError:
                # Best effort: an unloadable library must not block the others.
                pass
|
|
|
|
def _build_chat_handler():
    """Build the vision chat handler, or return None for a text-only model.

    With an empty MMPROJ_FILE nothing is downloaded and None is returned.
    Otherwise the projector file is fetched from MMPROJ_REPO and handed, as
    ``clip_model_path``, to the class named by CHAT_HANDLER, which is looked up
    in ``llama_cpp.llama_chat_format``.

    Raises:
        AttributeError: if CHAT_HANDLER names no attribute of that module.
    """
    if MMPROJ_FILE:
        # Imported lazily so text-only setups never touch the chat-format module here.
        import llama_cpp.llama_chat_format as chat_formats

        projector_path = hf_hub_download(repo_id=MMPROJ_REPO, filename=MMPROJ_FILE)
        handler_type = getattr(chat_formats, CHAT_HANDLER)
        return handler_type(clip_model_path=projector_path, verbose=False)
    return None
|
|
|
|
def get_llm():
    """Return the shared Llama instance, downloading and loading it on first use.

    The first caller preloads the CUDA libraries, downloads MODEL_FILE from
    MODEL_REPO and constructs the model; later callers get the cached object.
    The load runs under ``_lock`` and re-checks ``_llm`` once the lock is held,
    so threads that raced past the first check do not load a second copy.
    """
    global _llm
    if _llm is not None:
        return _llm

    with _lock:
        if _llm is None:
            _preload_cuda_libs()
            # Imported only after the CUDA libs are preloaded, so the wheel's
            # shared object can resolve them.
            from llama_cpp import Llama

            weights_path = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILE)
            vision_handler = _build_chat_handler()
            _llm = Llama(
                model_path=weights_path,
                n_ctx=N_CTX,
                n_gpu_layers=N_GPU_LAYERS,
                chat_handler=vision_handler,
                verbose=False,
            )
    return _llm
|
|
|
|
| |
| |
| |
| |
@gpu
def _infer_text(messages: list[dict], temperature: float, max_tokens: int) -> str:
    """Run one unconstrained chat completion on the in-process model.

    Returns the ``content`` of the first choice's message.
    """
    llm = get_llm()
    response = llm.create_chat_completion(
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    first_choice = response["choices"][0]
    return first_choice["message"]["content"]
|
|
|
|
@gpu
def _infer_json(messages: list[dict], json_schema: dict, temperature: float, max_tokens: int):
    """Run one schema-constrained chat completion on the in-process model.

    *json_schema* is passed to llama.cpp through ``response_format``.

    Returns:
        A ``(content, completion_tokens)`` tuple. ``completion_tokens`` is None
        when the response carries no usage information.
    """
    constraint = {"type": "json_object", "schema": json_schema}
    response = get_llm().create_chat_completion(
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
        response_format=constraint,
    )
    token_usage = response.get("usage") or {}
    content = response["choices"][0]["message"]["content"]
    return content, token_usage.get("completion_tokens")
|
|
|
|
@gpu
def _infer_stream(messages: list[dict], json_schema: dict, temperature: float, max_tokens: int):
    """Stream a schema-constrained chat completion from the in-process model.

    Yields each non-empty ``content`` delta of the first choice, in order.
    Chunks without a delta or without content are skipped.
    """
    constraint = {"type": "json_object", "schema": json_schema}
    chunks = get_llm().create_chat_completion(
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
        response_format=constraint,
        stream=True,
    )
    for chunk in chunks:
        piece = chunk["choices"][0].get("delta", {}).get("content")
        if not piece:
            continue
        yield piece
|
|
|
|
| |
def _remote_payload(messages, json_schema, temperature, max_tokens, stream):
    """Assemble the JSON body for a POST to the remote ``/chat/completions``.

    The schema is sent in the ``json_schema`` response-format shape, under the
    fixed name "ActionPlan" with ``strict`` enabled. *stream* is forwarded
    verbatim as the ``stream`` field.
    """
    structured_output = {
        "type": "json_schema",
        "json_schema": {"name": "ActionPlan", "schema": json_schema, "strict": True},
    }
    # Keyword order fixes the key order of the resulting dict.
    return dict(
        model=INFERENCE_MODEL,
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
        response_format=structured_output,
        stream=stream,
    )
|
|
|
|
def _remote_headers() -> dict:
    """Return the HTTP headers for remote inference requests.

    Always declares a JSON content type; adds a Bearer ``Authorization`` header
    only when INFERENCE_API_KEY is non-empty.
    """
    auth = {"Authorization": f"Bearer {INFERENCE_API_KEY}"} if INFERENCE_API_KEY else {}
    return {"Content-Type": "application/json", **auth}
|
|
|
|
def _remote_complete_json(messages, json_schema, temperature, max_tokens) -> str:
    """Run one non-streaming, schema-constrained completion on the remote server.

    With HERMES_TOOLS enabled the request also carries TOOL_SPECS and the
    exchange is driven by ``run_with_tools``, which is given a copy of
    *messages* and a callable that performs one POST. Otherwise a single POST
    is made. Either way a "model" event with latency and completion-token
    count is emitted before returning.

    Returns:
        The assistant message content.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    import requests

    started = time.perf_counter()

    def _chat(msgs, extra=None):
        # One POST to the chat-completions endpoint; returns the decoded JSON.
        body = _remote_payload(msgs, json_schema, temperature, max_tokens, False)
        if extra:
            body.update(extra)
        reply = requests.post(
            f"{INFERENCE_BASE_URL.rstrip('/')}/chat/completions",
            json=body,
            headers=_remote_headers(),
            timeout=120,
        )
        reply.raise_for_status()
        return reply.json()

    def _report(label, result):
        # Emit the completion event with wall-clock latency since `started`.
        token_usage = result.get("usage") or {}
        events.emit(
            "model",
            label,
            latency_ms=round((time.perf_counter() - started) * 1000),
            tokens=token_usage.get("completion_tokens"),
        )

    if HERMES_TOOLS:
        # Imported only on this path, so .tools is not needed otherwise.
        from .tools import TOOL_SPECS, run_with_tools

        def _chat_with_tools(msgs):
            return _chat(msgs, {"tools": TOOL_SPECS})

        content, result = run_with_tools(list(messages), _chat_with_tools)
        _report("remote inference complete (tools)", result)
        return content

    result = _chat(messages)
    _report("remote inference complete", result)
    return result["choices"][0]["message"]["content"]
|
|
|
|
def _sse_delta(raw):
    """Extract the content delta from one raw line of the streamed response.

    Accepts ``data: {...}`` and ``data:{...}`` (the space after the colon is
    optional in the server-sent-events format) as well as a bare JSON line.

    Args:
        raw: One non-empty line of the response body, as bytes.

    Returns:
        The non-empty ``choices[0].delta.content`` value, or None when the line
        is the ``[DONE]`` sentinel, is not JSON (comments, other SSE fields), or
        carries no content (empty ``choices``, null or missing ``delta``).
    """
    import json

    line = raw.decode("utf-8").strip()
    if line.startswith("data:"):
        line = line[len("data:"):].strip()
    if not line or line == "[DONE]":
        return None
    try:
        # `or {}` covers an explicit `"delta": null`.
        delta = json.loads(line)["choices"][0].get("delta") or {}
        content = delta.get("content")
    except (ValueError, KeyError, IndexError, TypeError, AttributeError):
        # Best effort: a chunk of unexpected shape is skipped, not fatal.
        return None
    return content or None


def _remote_stream_json(messages, json_schema, temperature, max_tokens):
    """Stream a schema-constrained completion from the remote server.

    POSTs to ``<INFERENCE_BASE_URL>/chat/completions`` with ``stream`` enabled
    and yields every non-empty content delta as it arrives. Emits a "model"
    event before the request and another, with latency, once the stream has
    been read to the end.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    import requests

    t0 = time.perf_counter()
    events.emit("model", "remote inference started")
    with requests.post(
        f"{INFERENCE_BASE_URL.rstrip('/')}/chat/completions",
        json=_remote_payload(messages, json_schema, temperature, max_tokens, True),
        headers=_remote_headers(),
        timeout=120,
        stream=True,
    ) as resp:
        resp.raise_for_status()
        for raw in resp.iter_lines():
            if not raw:
                continue  # blank separator between events
            delta = _sse_delta(raw)
            if delta:
                yield delta
    events.emit(
        "model", "remote stream complete", latency_ms=round((time.perf_counter() - t0) * 1000)
    )
|
|
|
|
| |
def complete(messages: list[dict], temperature: float = 0.2, max_tokens: int = 1024) -> str:
    """Return the assistant's text reply to *messages*.

    Thin wrapper over the in-process, unconstrained completion path.

    NOTE(review): unlike complete_json and stream_complete_json, this helper
    never consults INFERENCE_BASE_URL and emits no model events — confirm that
    always running in-process is intended.
    """
    reply = _infer_text(messages, temperature, max_tokens)
    return reply
|
|
|
|
def complete_json(
    messages: list[dict],
    json_schema: dict,
    temperature: float = 0.2,
    max_tokens: int = 2048,
) -> str:
    """Return a completion constrained to *json_schema*.

    A non-empty INFERENCE_BASE_URL sends the request to the remote server.
    Otherwise the in-process llama.cpp path runs, and a "model" event with
    latency and completion-token count is emitted before returning.
    """
    if not INFERENCE_BASE_URL:
        started = time.perf_counter()
        text, tokens = _infer_json(messages, json_schema, temperature, max_tokens)
        elapsed_ms = round((time.perf_counter() - started) * 1000)
        events.emit("model", "inference complete", latency_ms=elapsed_ms, tokens=tokens)
        return text

    return _remote_complete_json(messages, json_schema, temperature, max_tokens)
|
|
|
|
def stream_complete_json(
    messages: list[dict],
    json_schema: dict,
    temperature: float = 0.2,
    max_tokens: int = 2048,
):
    """Yield text deltas of a completion constrained to *json_schema*.

    Lets the UI show the model's output while it is still being generated. A
    non-empty INFERENCE_BASE_URL streams from the remote server, which emits
    its own events. Otherwise the in-process llama.cpp path streams, with
    "model" events emitted before the first delta and after the last one.
    """
    if INFERENCE_BASE_URL:
        yield from _remote_stream_json(messages, json_schema, temperature, max_tokens)
    else:
        started = time.perf_counter()
        events.emit("model", "inference started")
        yield from _infer_stream(messages, json_schema, temperature, max_tokens)
        elapsed_ms = round((time.perf_counter() - started) * 1000)
        events.emit("model", "stream complete", latency_ms=elapsed_ms)
|
|