from __future__ import annotations import os import platform import re import threading import time import subprocess import tarfile import urllib.request import json from pathlib import Path from typing import Any import gradio as gr from huggingface_hub import HfApi, hf_hub_download try: import spaces except Exception: # pragma: no cover - the package exists on HF ZeroGPU runtimes spaces = None # type: ignore[assignment] MODEL_REPO = os.getenv("PHASE3_MODEL_REPO", "build-small-hackathon/phase-3-gguf") MODEL_FILE = os.getenv("PHASE3_MODEL_FILE", "model-Q8_0.gguf") MODEL_LABEL = "First-Principle AI" LOCAL_MODEL_PATH = Path("/Users/user/.lmstudio/models/owenisas/Phase-3-GGUF/model-Q8_0.gguf") LLAMA_RELEASE = os.getenv("PHASE3_LLAMA_RELEASE", "b9360") LLAMA_URL = os.getenv( "PHASE3_LLAMA_URL", f"https://github.com/ggml-org/llama.cpp/releases/download/{LLAMA_RELEASE}/llama-{LLAMA_RELEASE}-bin-ubuntu-x64.tar.gz", ) MAX_CONTEXT = int(os.getenv("PHASE3_MAX_CONTEXT", "2048")) MIN_RAM_GB = float(os.getenv("PHASE3_MIN_RAM_GB", "38")) DISABLE_MODEL = os.getenv("PHASE3_DISABLE_MODEL", "").lower() in {"1", "true", "yes"} USE_ZEROGPU_DECORATOR = os.getenv("PHASE3_USE_ZEROGPU", "").lower() in {"1", "true", "yes"} N_BATCH = int(os.getenv("PHASE3_N_BATCH", "256")) N_UBATCH = int(os.getenv("PHASE3_N_UBATCH", "64")) N_THREADS = int(os.getenv("PHASE3_THREADS", str(max(1, min(16, os.cpu_count() or 2))))) N_THREADS_BATCH = int(os.getenv("PHASE3_THREADS_BATCH", str(N_THREADS))) USE_MMAP = os.getenv("PHASE3_USE_MMAP", "1").lower() not in {"0", "false", "no"} USE_MLOCK = os.getenv("PHASE3_USE_MLOCK", "").lower() in {"1", "true", "yes"} FLASH_ATTN = os.getenv("PHASE3_FLASH_ATTN", "").lower() in {"1", "true", "yes"} OFFLOAD_KQV = os.getenv("PHASE3_OFFLOAD_KQV", "1").lower() not in {"0", "false", "no"} INFER_TIMEOUT = int(os.getenv("PHASE3_INFER_TIMEOUT", "900")) SERVER_HOST = "127.0.0.1" SERVER_PORT = int(os.getenv("PHASE3_SERVER_PORT", "8088")) NO_WARMUP = os.getenv("PHASE3_NO_WARMUP", "1").lower() not in {"0", "false", "no"} MODEL_LOCK = threading.Lock() MODEL_PATH: Path | None = None LLAMA_CLI_PATH: Path | None = None LLAMA_SERVER_PATH: Path | None = None LLAMA_SERVER_PROCESS: subprocess.Popen[str] | None = None MODEL_ERROR: str | None = None MODEL_SETTINGS: dict[str, Any] = {} def _gpu_decorator(fn): if not USE_ZEROGPU_DECORATOR: return fn if spaces is None: return fn try: return spaces.GPU(duration=120)(fn) except Exception: return fn if spaces is not None: try: @spaces.GPU(duration=1) def _zerogpu_startup_probe() -> str: return "ZeroGPU configured" except Exception: def _zerogpu_startup_probe() -> str: return "ZeroGPU helper importable" else: def _zerogpu_startup_probe() -> str: return "ZeroGPU helper unavailable" def _meminfo_gb() -> tuple[float | None, float | None]: meminfo = Path("/proc/meminfo") if not meminfo.exists(): return None, None data: dict[str, int] = {} for line in meminfo.read_text(encoding="utf-8", errors="ignore").splitlines(): match = re.match(r"^(\w+):\s+(\d+)\s+kB", line) if match: data[match.group(1)] = int(match.group(2)) total = data.get("MemTotal") available = data.get("MemAvailable") gb = 1024 * 1024 return (total / gb if total else None, available / gb if available else None) def _safe_env_summary() -> dict[str, str]: keys = [ "SPACE_ID", "SPACE_HOST", "SPACE_AUTHOR_NAME", "SPACE_REPO_NAME", "CUDA_VISIBLE_DEVICES", "PHASE3_MODEL_REPO", "PHASE3_MODEL_FILE", "PHASE3_LLAMA_RELEASE", "PHASE3_MAX_CONTEXT", "PHASE3_DISABLE_MODEL", "PHASE3_USE_ZEROGPU", "PHASE3_N_GPU_LAYERS", "PHASE3_THREADS", "PHASE3_N_BATCH", "PHASE3_N_UBATCH", ] return {key: os.environ[key] for key in keys if key in os.environ} def _repo_file_size() -> int | None: try: info = HfApi().model_info(MODEL_REPO, files_metadata=True) except Exception: return None for sibling in info.siblings or []: if sibling.rfilename == MODEL_FILE: return getattr(sibling, "size", None) return None def _find_model_path() -> Path: if DISABLE_MODEL: raise RuntimeError("Model loading is disabled with PHASE3_DISABLE_MODEL=1.") explicit = os.getenv("PHASE3_MODEL_PATH") if explicit: path = Path(explicit) if path.exists(): return path raise RuntimeError(f"PHASE3_MODEL_PATH does not exist: {explicit}") if LOCAL_MODEL_PATH.exists(): return LOCAL_MODEL_PATH data_dir = Path(os.getenv("PHASE3_MODEL_DIR", "/data/phase-3-gguf")) if data_dir.parent.exists() and os.access(data_dir.parent, os.W_OK): data_dir.mkdir(parents=True, exist_ok=True) downloaded = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILE, local_dir=data_dir) else: downloaded = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILE) return Path(downloaded) def _gpu_layers() -> int: if "PHASE3_N_GPU_LAYERS" in os.environ: return int(os.environ["PHASE3_N_GPU_LAYERS"]) if os.getenv("CUDA_VISIBLE_DEVICES") and os.getenv("PHASE3_AUTO_GPU", "1").lower() not in {"0", "false", "no"}: return -1 return 0 def _ensure_llama_binary(name: str) -> Path: global LLAMA_CLI_PATH, LLAMA_SERVER_PATH if name == "llama-cli" and LLAMA_CLI_PATH is not None and LLAMA_CLI_PATH.exists(): return LLAMA_CLI_PATH if name == "llama-server" and LLAMA_SERVER_PATH is not None and LLAMA_SERVER_PATH.exists(): return LLAMA_SERVER_PATH root = Path(os.getenv("PHASE3_LLAMA_DIR", "/tmp/phase3-llama.cpp")) release_dir = root / f"llama-{LLAMA_RELEASE}" binary = release_dir / name if binary.exists(): binary.chmod(0o755) if name == "llama-cli": LLAMA_CLI_PATH = binary if name == "llama-server": LLAMA_SERVER_PATH = binary return binary root.mkdir(parents=True, exist_ok=True) archive = root / f"llama-{LLAMA_RELEASE}-bin-ubuntu-x64.tar.gz" if not archive.exists(): urllib.request.urlretrieve(LLAMA_URL, archive) with tarfile.open(archive, "r:gz") as tar: tar.extractall(root) if not binary.exists(): raise RuntimeError(f"{name} was not found after extracting {LLAMA_URL}") binary.chmod(0o755) if name == "llama-cli": LLAMA_CLI_PATH = binary if name == "llama-server": LLAMA_SERVER_PATH = binary return binary def _prepare_runtime() -> tuple[Path, Path]: global MODEL_PATH, MODEL_ERROR, MODEL_SETTINGS if MODEL_ERROR is not None: raise RuntimeError(MODEL_ERROR) with MODEL_LOCK: if MODEL_ERROR is not None: raise RuntimeError(MODEL_ERROR) total_gb, available_gb = _meminfo_gb() if total_gb is not None and total_gb < MIN_RAM_GB: MODEL_ERROR = ( f"Runtime has {total_gb:.1f} GB RAM, below the configured load threshold " f"of {MIN_RAM_GB:.1f} GB for the 31 GB Q8 GGUF." ) raise RuntimeError(MODEL_ERROR) path = _find_model_path() server = _ensure_llama_binary("llama-server") MODEL_PATH = path n_gpu_layers = _gpu_layers() MODEL_SETTINGS = { "path": str(path), "llama_server": str(server), "n_ctx": MAX_CONTEXT, "n_batch": N_BATCH, "n_ubatch": N_UBATCH, "n_threads": N_THREADS, "n_threads_batch": N_THREADS_BATCH, "n_gpu_layers": n_gpu_layers, "use_mmap": USE_MMAP, "use_mlock": USE_MLOCK, "flash_attn": FLASH_ATTN, "offload_kqv": OFFLOAD_KQV, "no_warmup": NO_WARMUP, } return path, server def _server_log_path() -> Path: return Path(os.getenv("PHASE3_SERVER_LOG", "/tmp/phase3-llama-server.log")) def _tail_server_log(limit: int = 4000) -> str: path = _server_log_path() if not path.exists(): return "" data = path.read_text(encoding="utf-8", errors="ignore") return data[-limit:] def _server_url(path: str) -> str: return f"http://{SERVER_HOST}:{SERVER_PORT}{path}" def _server_is_ready() -> bool: try: with urllib.request.urlopen(_server_url("/health"), timeout=5) as resp: return 200 <= resp.status < 500 except Exception: return False def _start_server() -> None: global LLAMA_SERVER_PROCESS model_path, server = _prepare_runtime() if LLAMA_SERVER_PROCESS is not None and LLAMA_SERVER_PROCESS.poll() is None and _server_is_ready(): return cmd = [ str(server), "-m", str(model_path), "--host", SERVER_HOST, "--port", str(SERVER_PORT), "-c", str(MAX_CONTEXT), "-t", str(N_THREADS), "-b", str(N_BATCH), "-ub", str(N_UBATCH), ] if _gpu_layers() != 0: cmd.extend(["-ngl", str(_gpu_layers())]) if USE_MLOCK: cmd.append("--mlock") if not USE_MMAP: cmd.append("--no-mmap") if FLASH_ATTN: cmd.append("-fa") if NO_WARMUP: cmd.append("--no-warmup") env = os.environ.copy() binary_dir = str(server.parent) env["LD_LIBRARY_PATH"] = f"{binary_dir}:{env.get('LD_LIBRARY_PATH', '')}" log_path = _server_log_path() log_file = log_path.open("a", encoding="utf-8") log_file.write(f"\n--- starting llama-server: {' '.join(cmd)} ---\n") log_file.flush() LLAMA_SERVER_PROCESS = subprocess.Popen( cmd, cwd=binary_dir, env=env, stdout=log_file, stderr=subprocess.STDOUT, text=True, ) deadline = time.time() + INFER_TIMEOUT while time.time() < deadline: if LLAMA_SERVER_PROCESS.poll() is not None: raise RuntimeError(f"llama-server exited early.\n{_tail_server_log()}") if _server_is_ready(): return time.sleep(2) raise RuntimeError(f"llama-server did not become ready within {INFER_TIMEOUT}s.\n{_tail_server_log()}") def _format_prompt(system_prompt: str, history: list[dict[str, str]], message: str) -> str: system = system_prompt.strip() or "You are a precise, direct model in a technical lab console." turns = [f"<|im_start|>system\n{system}<|im_end|>"] for item in history[-10:]: role = item.get("role", "user") content = item.get("content", "") if role in {"user", "assistant"} and content: turns.append(f"<|im_start|>{role}\n{content}<|im_end|>") turns.append(f"<|im_start|>user\n{message}<|im_end|>") turns.append("<|im_start|>assistant\n") return "\n".join(turns) @_gpu_decorator def _complete( prompt: str, max_tokens: int, temperature: float, top_p: float, repeat_penalty: float, ) -> tuple[str, dict[str, Any]]: started = time.time() _start_server() payload = { "prompt": prompt, "n_predict": int(max_tokens), "temperature": float(temperature), "top_p": float(top_p), "repeat_penalty": float(repeat_penalty), "stop": ["<|im_end|>", "<|endoftext|>"], } req = urllib.request.Request( _server_url("/completion"), data=json.dumps(payload).encode("utf-8"), headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=INFER_TIMEOUT) as resp: output = json.loads(resp.read().decode("utf-8")) except Exception as exc: raise RuntimeError(f"llama-server completion failed: {exc}\n{_tail_server_log()}") from exc elapsed = max(time.time() - started, 0.001) text = (output.get("content") or "").strip() text = text.split("<|im_end|>", 1)[0].strip() completion_tokens = max(1, len(text.split())) return text, { "elapsed": elapsed, "completion_tokens": completion_tokens, "tokens_per_second": completion_tokens / elapsed, "usage": {}, } def _status_markdown() -> str: total_gb, available_gb = _meminfo_gb() size = _repo_file_size() size_text = f"{size / (1024 ** 3):.1f} GB" if size else "unknown" spaces_state = "importable" if spaces is not None else "not importable" model_state = "Ready" if MODEL_PATH is not None else ("Error" if MODEL_ERROR else "Ready to load on first prompt") available_text = f"{available_gb:.1f} GB" if available_gb is not None else "unknown" path_text = f"`{MODEL_PATH}`" if MODEL_PATH else "not resolved yet" server_text = f"`{LLAMA_SERVER_PATH}`" if LLAMA_SERVER_PATH else f"`{LLAMA_RELEASE}` not extracted yet" server_state = "running" if LLAMA_SERVER_PROCESS is not None and LLAMA_SERVER_PROCESS.poll() is None else "not started" settings = MODEL_SETTINGS or { "n_ctx": MAX_CONTEXT, "n_batch": N_BATCH, "n_ubatch": N_UBATCH, "n_threads": N_THREADS, "n_threads_batch": N_THREADS_BATCH, "n_gpu_layers": _gpu_layers(), "use_mmap": USE_MMAP, "use_mlock": USE_MLOCK, "flash_attn": FLASH_ATTN, "offload_kqv": OFFLOAD_KQV, } env = _safe_env_summary() cuda_text = env.get("CUDA_VISIBLE_DEVICES", "not visible") return f"""### Model Status **{model_state}** - llama.cpp inference is enabled. | Check | Value | | --- | --- | | Model | `{MODEL_REPO}` | | File | `{MODEL_FILE}` ({size_text}) | | Runtime | `llama.cpp` CLI `{LLAMA_RELEASE}`; ZeroGPU helper {spaces_state} | | Available RAM | {available_text} | | CUDA devices | `{cuda_text}` | | Model path | {path_text} | | llama-server | {server_text} ({server_state}) | | llama.cpp settings | `ctx={settings.get('n_ctx')}`, `batch={settings.get('n_batch')}`, `ubatch={settings.get('n_ubatch')}`, `threads={settings.get('n_threads')}`, `gpu_layers={settings.get('n_gpu_layers')}` | | Memory/options | `mmap={settings.get('use_mmap')}`, `mlock={settings.get('use_mlock')}`, `flash_attn={settings.get('flash_attn')}`, `no_warmup={settings.get('no_warmup')}` | The first prompt starts `llama-server` and loads the 31 GB Q8 GGUF if it is not already cached. Later prompts reuse the same llama.cpp server process. """ def _metrics_markdown(meta: dict[str, Any] | None = None) -> str: if not meta: return "Generation metrics will appear after a run." return ( f"Elapsed: `{meta['elapsed']:.2f}s` \n" f"Completion tokens: `{meta['completion_tokens']}` \n" f"Approx tokens/sec: `{meta['tokens_per_second']:.2f}`" ) def _clear() -> tuple[list[dict[str, str]], str, str, str]: return [], "", _status_markdown(), _metrics_markdown() def _chunk_text(text: str): if not text: yield "" return parts = re.split(r"(\s+)", text) acc = "" for part in parts: acc += part yield acc def respond( message: str, history: list[dict[str, str]] | None, system_prompt: str, max_tokens: int, temperature: float, top_p: float, repeat_penalty: float, ) -> Any: history = list(history or []) message = (message or "").strip() if not message: yield history, "", _status_markdown(), _metrics_markdown() return prior = [item for item in history if item.get("role") in {"user", "assistant"}] history.append({"role": "user", "content": message}) history.append({"role": "assistant", "content": "Loading runtime and preparing generation..."}) yield history, "", _status_markdown(), "Queued." prompt = _format_prompt(system_prompt, prior, message) try: text, meta = _complete(prompt, max_tokens, temperature, top_p, repeat_penalty) except Exception as exc: text = ( "Model load or inference failed.\n\n" f"{exc}\n\n" "The UI is live and the model artifact is published, but the runtime could not complete " "a llama.cpp server generation pass. Check the runtime status and Space logs before retrying." ) meta = {"elapsed": 0.0, "completion_tokens": len(text.split()), "tokens_per_second": 0.0} for partial in _chunk_text(text): history[-1]["content"] = partial yield history, "", _status_markdown(), _metrics_markdown(meta) CSS = """ :root { --phase-bg: #f6f8fb; --phase-panel: #ffffff; --phase-panel-soft: #f9fafb; --phase-border: #d8dee8; --phase-text: #111827; --phase-muted: #5f6b7a; --phase-accent: #2563eb; --phase-accent-dark: #1d4ed8; } .gradio-container { background: var(--phase-bg) !important; color: var(--phase-text) !important; max-width: none !important; font-family: Inter, ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif !important; } .phase-shell { max-width: 1180px; margin: 0 auto; padding: 24px 18px 40px; } .phase-title { border: 1px solid var(--phase-border); background: linear-gradient(180deg, #ffffff, #eef4ff); padding: 22px 24px; border-radius: 10px; margin-bottom: 18px; box-shadow: 0 12px 34px rgba(31, 41, 55, 0.08); } .phase-title h1 { color: var(--phase-text); font-size: 30px; line-height: 1.15; margin: 0 0 8px; letter-spacing: 0; } .phase-title p { color: var(--phase-muted); font-size: 15px; margin: 0; max-width: 760px; } .phase-badge-row { display: flex; flex-wrap: wrap; gap: 8px; margin-top: 12px; } .phase-badge { border: 1px solid var(--phase-border); background: #ffffff; color: var(--phase-muted); border-radius: 7px; padding: 7px 10px; font-size: 12px; } .phase-badge strong { color: var(--phase-text); font-weight: 650; } .gradio-container .block { border-color: var(--phase-border) !important; border-radius: 10px !important; box-shadow: none !important; } .gradio-container label, .gradio-container .wrap, .gradio-container .prose, .gradio-container .markdown-body, .gradio-container .svelte-1gfkn6j, .gradio-container .svelte-1hguek3 { color: var(--phase-text) !important; } textarea, input { background: #ffffff !important; color: var(--phase-text) !important; border-color: var(--phase-border) !important; } textarea::placeholder { color: #8a95a5 !important; } button.primary { background: var(--phase-accent) !important; color: #ffffff !important; border-color: var(--phase-accent) !important; } button.primary:hover { background: var(--phase-accent-dark) !important; } .message { border-radius: 8px !important; } .chatbot { background: #ffffff !important; border: 1px solid var(--phase-border) !important; min-height: 560px; } .chatbot .message, .chatbot .bubble-wrap { color: var(--phase-text) !important; } .phase-side-note { border: 1px solid #bfdbfe; background: #eff6ff; color: #1e3a8a; border-radius: 10px; padding: 12px 14px; margin-bottom: 12px; font-size: 13px; line-height: 1.45; } .phase-side-note strong { color: #1e40af; } .gradio-container table { background: #ffffff !important; color: var(--phase-text) !important; } .gradio-container code { background: #eef2f7 !important; color: #111827 !important; border-radius: 4px; padding: 1px 4px; } @media (max-width: 900px) { .phase-title h1 { font-size: 24px; } } """ with gr.Blocks(title="First-Principle AI", fill_width=True) as demo: with gr.Column(elem_classes=["phase-shell"]): gr.HTML( """
A clean model-console interface for probing the Phase-3 Q8 GGUF with transparent runtime status.