""" MiniCPM Forge — Hybrid Multi-Model Showcase Two backends, one interface: • Vision (v46, v46t) → transformers + ZeroGPU (same as the working single-model space) • Text (cpm5, cpm41) → llama-cpp-python (CPU/GPU, works on free tier) • Omni (o45) → API placeholder build-small-hackathon 2026 · Chris4K """ from __future__ import annotations import asyncio import base64 import json import os import pathlib import re import threading import time import uuid from io import BytesIO from typing import Generator, Optional from PIL import Image from fastapi import Request from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse from gradio import Server # ───────────────────────────────────────────────────────────────────────────── # ZeroGPU (optional — works on HF Spaces GPU / ZeroGPU tiers) # ───────────────────────────────────────────────────────────────────────────── try: import spaces # type: ignore HAS_SPACES_GPU = True except ImportError: HAS_SPACES_GPU = False class _FakeSpaces: @staticmethod def GPU(duration: int = 120): def _wrap(fn): return fn return _wrap spaces = _FakeSpaces() # type: ignore # ───────────────────────────────────────────────────────────────────────────── # Model registry # ───────────────────────────────────────────────────────────────────────────── MODELS: dict[str, dict] = { "v46": { "id": "v46", "name": "MiniCPM-V 4.6", "tag": "Vision · OCR", "color": "#06b6d4", "backend": "transformers", "hf_id": "openbmb/MiniCPM-V-4.6", "thinking": False, "ctx": 8192, "vision": True, }, "v46t": { "id": "v46t", "name": "MiniCPM-V 4.6-T", "tag": "Vision · Thinking", "color": "#a855f7", "backend": "transformers", "hf_id": "openbmb/MiniCPM-V-4.6-Thinking", "thinking": True, "ctx": 8192, "vision": True, }, "cpm5": { "id": "cpm5", "name": "MiniCPM5-1B", "tag": "⚡ Ultra-light", "color": "#22c55e", "backend": "llama", "repo": "openbmb/MiniCPM5-1B-GGUF", "file": "MiniCPM5-1B-Q4_K_M.gguf", # ✓ confirmed working "ctx": 4096, "vision": False, 
"thinking": False, }, "cpm41": { "id": "cpm41", "name": "MiniCPM4.1-8B", "tag": "🧠 Reasoning", "color": "#f97316", "backend": "llama", "repo": "openbmb/MiniCPM4.1-8B-GGUF", "file": "MiniCPM4.1-8B-Q4_K_M.gguf", "ctx": 16384, "vision": False, "thinking": True, }, "o45": { "id": "o45", "name": "MiniCPM-o 4.5", "tag": "🌐 Omni", "color": "#ec4899", "backend": "api", "ctx": 8192, "vision": True, "thinking": False, }, } # ───────────────────────────────────────────────────────────────────────────── # Shared state # ───────────────────────────────────────────────────────────────────────────── CACHE_DIR = pathlib.Path(os.environ.get("HF_HOME", "/tmp/hf_cache")) / "forge" _load_status: dict[str, str] = {} _load_lock = threading.Lock() # llama-cpp loaded models _llama_models: dict[str, object] = {} # transformers loaded models (processor + model per id) _tr_processors: dict[str, object] = {} _tr_models: dict[str, object] = {} # ───────────────────────────────────────────────────────────────────────────── # Text normalisation (from official openbmb demo) # ───────────────────────────────────────────────────────────────────────────── _NORM_PATTERN = re.compile( r'(```[\s\S]*?```|`[^`]+`|\$\$[\s\S]*?\$\$|\$[^$]+\$' r'|\\\([\s\S]*?\\\)|\\\[[\s\S]*?\\\])' r'|(? str: if not isinstance(text, str) or "\\" not in text: return text return _NORM_PATTERN.sub(lambda m: m.group(1) or '\n', text) # ───────────────────────────────────────────────────────────────────────────── # Backend A: transformers (vision models, runs via ZeroGPU when available) # ───────────────────────────────────────────────────────────────────────────── def _load_transformers(model_id: str) -> None: """Load processor + model into _tr_processors/_tr_models. 
Thread-safe.""" if model_id in _tr_models: return with _load_lock: if model_id in _tr_models: return _load_status[model_id] = "loading" cfg = MODELS[model_id] hf_id = cfg["hf_id"] print(f"[forge] Loading transformers model: {hf_id}") try: import torch from transformers import AutoProcessor, AutoModelForImageTextToText # type: ignore processor = AutoProcessor.from_pretrained(hf_id, trust_remote_code=True) if torch.cuda.is_available(): model = AutoModelForImageTextToText.from_pretrained( hf_id, torch_dtype=torch.bfloat16, attn_implementation="sdpa", trust_remote_code=True, device_map="cuda", ).eval() else: # CPU fallback — slow but functional for demos model = AutoModelForImageTextToText.from_pretrained( hf_id, torch_dtype=torch.float32, trust_remote_code=True, device_map="cpu", ).eval() _tr_processors[model_id] = processor _tr_models[model_id] = model _load_status[model_id] = "ready" print(f"[forge] ✓ transformers {model_id} ready") except Exception as exc: _load_status[model_id] = f"error:{exc}" print(f"[forge] ✗ transformers {model_id} failed: {exc}") raise @spaces.GPU(duration=120) def _run_transformers( model_id: str, messages: list, params: dict, ) -> Generator[str, None, None]: """ Sync generator — yields *delta* text chunks. Decorated with @spaces.GPU so it runs on ZeroGPU when available; falls back to CPU silently when spaces is not installed. 
""" import torch from transformers import TextIteratorStreamer # type: ignore processor = _tr_processors[model_id] model = _tr_models[model_id] thinking = params.get("thinking_mode", False) is_video = any( it.get("type") == "video" for msg in messages for it in (msg.get("content") or []) ) with torch.no_grad(): inputs = processor.apply_chat_template( messages, add_generation_prompt=True, tokenize=True, return_dict=True, return_tensors="pt", enable_thinking=thinking, processor_kwargs={ "downsample_mode": "16x", "max_slice_nums": 1 if is_video else 9, "use_image_id": not is_video, }, ).to(model.device) if model.device.type == "cuda": import torch as _torch for k, v in inputs.items(): if isinstance(v, _torch.Tensor) and _torch.is_floating_point(v): inputs[k] = v.to(dtype=_torch.bfloat16) streamer = TextIteratorStreamer( processor.tokenizer, skip_prompt=True, skip_special_tokens=True, timeout=60.0, ) gen_kw = { **inputs, "max_new_tokens": params.get("max_tokens", 1024), "do_sample": True, "temperature": params.get("temperature", 0.7), "top_p": params.get("top_p", 0.8), "top_k": int(params.get("top_k", 100)), "streamer": streamer, "downsample_mode": "16x", } t = threading.Thread(target=model.generate, kwargs=gen_kw, daemon=True) t.start() for chunk in streamer: yield normalize_response_text(chunk) t.join(timeout=10) async def _stream_transformers( model_id: str, messages: list, params: dict, loop: asyncio.AbstractEventLoop, ): """Async generator: bridges sync _run_transformers → async SSE.""" queue: asyncio.Queue = asyncio.Queue(maxsize=256) def _worker(): try: for chunk in _run_transformers(model_id, messages, params): loop.call_soon_threadsafe(queue.put_nowait, chunk) except Exception as exc: loop.call_soon_threadsafe(queue.put_nowait, f"\n\n[⚠ {exc}]") finally: loop.call_soon_threadsafe(queue.put_nowait, None) loop.run_in_executor(None, _worker) while True: token = await queue.get() if token is None: break yield token # 
# ─────────────────────────────────────────────────────────────────────────────
# Backend B: llama-cpp (text models)
# ─────────────────────────────────────────────────────────────────────────────
def _hub_download_robust(repo_id: str, filename: str, local_dir: str) -> str:
    """hf_hub_download with glob fallback for filename drift.

    First tries to download exactly *filename*. If that fails for any
    reason, the repo's file list is searched for a ``.gguf`` whose name
    contains the same quantisation tag (e.g. ``Q4_K_M``), or — when
    *filename* has no recognisable tag — its stem. A candidate containing
    ``Q4_K_M`` is preferred, otherwise the first match is used.

    Args:
        repo_id: Hugging Face repo to download from.
        filename: expected file name inside the repo.
        local_dir: target directory; created if missing.

    Returns:
        Whatever ``hf_hub_download`` returns for the chosen file.

    Raises:
        FileNotFoundError: if no ``.gguf`` in the repo matches the fallback
            pattern. The message lists the GGUF files that do exist.
    """
    import fnmatch
    from huggingface_hub import hf_hub_download, list_repo_files  # type: ignore

    pathlib.Path(local_dir).mkdir(parents=True, exist_ok=True)
    try:
        return hf_hub_download(repo_id=repo_id, filename=filename, local_dir=local_dir)
    except Exception as exc:
        # Deliberately best-effort: fall through to the glob search, but do
        # not hide why the direct download failed.
        print(f"[forge] direct download of {filename!r} failed ({exc}); trying glob fallback")

    # Glob fallback
    quant = re.search(r'(Q\d_K_[MS]|Q\d_\d|F16|BF16)', filename)
    pat = f"*{quant.group(1)}*.gguf" if quant else f"*{pathlib.Path(filename).stem}*"
    # Listed once and reused for both the match and the error message.
    gguf_files = [f for f in list_repo_files(repo_id) if f.endswith(".gguf")]
    candidates = [f for f in gguf_files if fnmatch.fnmatch(f, pat)]
    if not candidates:
        raise FileNotFoundError(
            f"No file matching {pat!r} in {repo_id}. "
            f"GGUFs: {gguf_files}"
        )
    best = next((f for f in candidates if "Q4_K_M" in f), candidates[0])
    print(f"  → glob fallback: {best!r}")
    return hf_hub_download(repo_id=repo_id, filename=best, local_dir=local_dir)


def _load_llama(model_id: str) -> None:
    """Load a text-only GGUF model via llama-cpp-python.

    Does nothing if *model_id* is already in ``_llama_models``; the check is
    repeated under ``_load_lock`` so concurrent callers load it only once.
    The GGUF file is fetched into ``CACHE_DIR / model_id``. The number of
    GPU layers comes from the ``N_GPU_LAYERS`` environment variable
    (default ``-1``).

    Progress is published through ``_load_status[model_id]``: ``"loading"``,
    then ``"ready"`` or ``"error:<exc>"``.

    Raises:
        KeyError: if *model_id* is not in ``MODELS``.
        Exception: any download/load error is recorded in ``_load_status``
            and then re-raised.
    """
    if model_id in _llama_models:
        return
    with _load_lock:
        # Another thread may have finished loading while we waited.
        if model_id in _llama_models:
            return
        _load_status[model_id] = "loading"
        cfg = MODELS[model_id]
        local_dir = str(CACHE_DIR / model_id)
        print(f"[forge] Downloading LM: {cfg['repo']} / {cfg['file']}")
        try:
            # Imported lazily so the app can start without llama-cpp-python.
            from llama_cpp import Llama  # type: ignore

            model_path = _hub_download_robust(cfg["repo"], cfg["file"], local_dir)
            n_gpu = int(os.environ.get("N_GPU_LAYERS", "-1"))
            llm = Llama(
                model_path=model_path,
                n_ctx=cfg["ctx"],
                n_gpu_layers=n_gpu,
                verbose=False,
            )
            _llama_models[model_id] = llm
            _load_status[model_id] = "ready"
            print(f"[forge] ✓ llama {model_id} ready")
        except Exception as exc:
            _load_status[model_id] = f"error:{exc}"
            print(f"[forge] ✗ llama {model_id} failed: {exc}")
            raise


async def _stream_llama(
    llm,
    messages: list,
    params: dict,
    loop: asyncio.AbstractEventLoop,
):
    """Async generator: bridges sync llama-cpp stream → async SSE.

    ``llm.create_chat_completion(stream=True)`` is iterated on an executor
    thread; each non-empty ``delta.content`` is handed to *loop* through an
    ``asyncio.Queue`` and re-yielded here. If the worker raises, the error
    is delivered as a final ``"\\n\\n[⚠ …]"`` chunk rather than propagating.
    ``None`` is used internally as the end marker.

    Args:
        llm: loaded llama-cpp model object.
        messages: chat messages for ``create_chat_completion``.
        params: sampling options. Keys read: ``max_tokens`` (default 1024),
            ``temperature`` (0.7), ``top_p`` (0.8), ``top_k`` (100),
            ``repeat_penalty`` (1.05).
        loop: the event loop this generator is being consumed on.
    """
    # Unbounded on purpose: items are enqueued with put_nowait from a loop
    # callback, which on a full bounded queue raises QueueFull and silently
    # drops the item — including the end marker, which would hang the
    # consumer forever. The amount of data is capped by max_tokens.
    queue: asyncio.Queue = asyncio.Queue()
    # Set when the consumer stops (finished, cancelled, client gone) so the
    # worker stops pulling further chunks from the model.
    cancelled = threading.Event()

    def _worker():
        try:
            output = llm.create_chat_completion(
                messages=messages,
                stream=True,
                max_tokens=params.get("max_tokens", 1024),
                temperature=params.get("temperature", 0.7),
                top_p=params.get("top_p", 0.8),
                top_k=int(params.get("top_k", 100)),
                repeat_penalty=params.get("repeat_penalty", 1.05),
            )
            for chunk in output:
                if cancelled.is_set():
                    break
                delta = chunk["choices"][0]["delta"].get("content", "")
                if delta:
                    loop.call_soon_threadsafe(queue.put_nowait, delta)
        except Exception as exc:
            loop.call_soon_threadsafe(queue.put_nowait, f"\n\n[⚠ {exc}]")
        finally:
            loop.call_soon_threadsafe(queue.put_nowait, None)

    loop.run_in_executor(None, _worker)
    try:
        while True:
            token = await queue.get()
            if token is None:
                break
            yield token
    finally:
        cancelled.set()


# ─────────────────────────────────────────────────────────────────────────────
# Shared message builder
# ─────────────────────────────────────────────────────────────────────────────
def _build_messages(
    message: str,
    history: list,
    image_b64: Optional[str],
    backend: str,
) -> list[dict]:
    """Build the chat message list for either backend.

    Args:
        message: the new user text; when empty, "Describe this image." is
            sent instead.
        history: previous turns as dicts with optional ``"user"`` and
            ``"assistant"`` text; ``None`` or empty means no history.
        image_b64: optional base64-encoded image for the new user turn. A
            leading ``data:…;base64,`` prefix is accepted and stripped.
        backend: ``"transformers"`` attaches the image as a decoded RGB PIL
            image; any other value attaches it as a ``data:`` URI.

    Returns:
        A list of ``{"role", "content"}`` dicts where ``content`` is a list
        of typed parts, ending with the new user turn.
    """
    msgs: list[dict] = []
    for turn in history or []:
        if turn.get("user"):
            msgs.append({"role": "user",
                         "content": [{"type": "text", "text": turn["user"]}]})
        if turn.get("assistant"):
            msgs.append({"role": "assistant",
                         "content": [{"type": "text", "text": turn["assistant"]}]})

    content: list[dict] = []
    if image_b64:
        # Browsers commonly hand over a full data URI; keep only the payload
        # so both the decoder and the MIME sniffing below see raw base64.
        if image_b64.startswith("data:") and "," in image_b64:
            image_b64 = image_b64.split(",", 1)[1]

        if backend == "transformers":
            # transformers expects a PIL Image object
            img_bytes = base64.b64decode(image_b64)
            img = Image.open(BytesIO(img_bytes)).convert("RGB")
            content.append({"type": "image", "image": img})
        else:
            # llama-cpp expects a data: URI
            # Only the first 32 base64 chars (24 bytes) are decoded — enough
            # to check for the 8-byte PNG signature; anything else is
            # labelled JPEG.
            raw = base64.b64decode(image_b64[:32])
            mime = "image/png" if raw[:8] == b"\x89PNG\r\n\x1a\n" else "image/jpeg"
            content.append({"type": "image_url",
                            "image_url": {"url": f"data:{mime};base64,{image_b64}"}})

    content.append({"type": "text", "text": message or "Describe this image."})
    msgs.append({"role": "user", "content": content})
    return msgs


# ─────────────────────────────────────────────────────────────────────────────
# Gradio Server
# ─────────────────────────────────────────────────────────────────────────────
demo = Server()


@demo.get("/", response_class=HTMLResponse)
async def homepage():
    """Serve ``index.html`` from the directory containing this file."""
    html = pathlib.Path(__file__).parent / "index.html"
    return html.read_text(encoding="utf-8")


@demo.get("/api/models")
async def api_models():
    """Return the model registry with live load status.

    For each model: the display fields present in its config (``id``,
    ``name``, ``tag``, ``color``, ``ctx``, ``vision``, ``thinking``) plus
    ``status`` (``"idle"`` if never loaded), ``api`` (True for the API
    backend) and ``backend``.
    """
    out = {}
    for mid, cfg in MODELS.items():
        out[mid] = {k: cfg[k] for k in
                    ("id", "name", "tag", "color", "ctx", "vision", "thinking")
                    if k in cfg}
        out[mid]["status"] = _load_status.get(mid, "idle")
        out[mid]["api"] = cfg.get("backend") == "api"
        out[mid]["backend"] = cfg.get("backend", "llama")
    return JSONResponse(out)


@demo.post("/api/load")
async def api_load(request: Request):
    """Start loading a model in the background and report its status.

    Request JSON: ``{"model_id": "<id>"}``. Responds 400 for an unknown id,
    ``{"status": "api"}`` for the API backend, otherwise the current load
    status. A load is scheduled only when the model is neither loading nor
    ready, so a previously failed load is retried.
    """
    data = await request.json()
    mid = data.get("model_id", "")
    cfg = MODELS.get(mid)
    if not cfg:
        return JSONResponse({"error": "unknown model"}, status_code=400)
    if cfg["backend"] == "api":
        return JSONResponse({"status": "api"})

    # Kick off load in a background thread if not already running
    current = _load_status.get(mid, "idle")
    if current not in ("loading", "ready"):
        # Mark as loading right away: otherwise a retry would report the
        # stale "error:…" status, and a second request arriving before the
        # executor thread starts would schedule a duplicate load.
        _load_status[mid] = "loading"
        loop = asyncio.get_running_loop()
        loader = _load_transformers if cfg["backend"] == "transformers" else _load_llama
        fut = loop.run_in_executor(None, loader, mid)

        def _consume(done):
            # The loader already records and prints failures; retrieving the
            # exception here just keeps asyncio from logging it as
            # "never retrieved".
            if not done.cancelled():
                done.exception()

        fut.add_done_callback(_consume)

    return JSONResponse({"status": _load_status.get(mid, "loading")})


@demo.post("/stream/chat")
async def stream_chat(request: Request):
    """Stream a chat completion as server-sent events.

    Request JSON keys: ``model_id`` (default ``"cpm5"``), ``message``,
    ``history``, ``image_b64``, ``params``. Responds 400 for an unknown
    model. Every SSE ``data:`` line is a JSON object: token events carry
    ``token`` (plus ``speed`` in tokens/s and the running count ``n`` for
    real generations) and the stream ends with ``{"done": true}``. The API
    backend and the "model not loaded" case are reported as a single token
    event followed by ``done``.
    """
    data = await request.json()
    mid = data.get("model_id", "cpm5")
    message = data.get("message", "")
    history = data.get("history", [])
    image_b64 = data.get("image_b64")
    # ``or {}`` also covers an explicit JSON null.
    params = data.get("params") or {}

    cfg = MODELS.get(mid)
    if not cfg:
        return JSONResponse({"error": "unknown model"}, status_code=400)

    backend = cfg.get("backend", "llama")

    # ── API mode ──────────────────────────────────────────────────────────────
    if backend == "api":
        async def _api_sse():
            yield f"data: {json.dumps({'token': '🌐 MiniCPM-o 4.5 API mode — set OPENBMB_API_KEY in Space secrets.'})}\n\n"
            yield f"data: {json.dumps({'done': True})}\n\n"
        return StreamingResponse(_api_sse(), media_type="text/event-stream",
                                 headers={"Cache-Control": "no-cache"})

    # ── Check model is loaded ─────────────────────────────────────────────────
    store = _tr_models if backend == "transformers" else _llama_models
    if mid not in store:
        msg = f"Model '{mid}' not loaded (status: {_load_status.get(mid, 'idle')}). Click ⬇ LOAD first."

        async def _err_sse():
            yield f"data: {json.dumps({'token': f'⚠ {msg}'})}\n\n"
            yield f"data: {json.dumps({'done': True})}\n\n"
        return StreamingResponse(_err_sse(), media_type="text/event-stream",
                                 headers={"Cache-Control": "no-cache"})

    messages = _build_messages(message, history, image_b64, backend)
    loop = asyncio.get_running_loop()
    t0 = time.monotonic()

    async def sse_gen():
        if backend == "transformers":
            gen = _stream_transformers(mid, messages, params, loop)
        else:
            gen = _stream_llama(_llama_models[mid], messages, params, loop)

        # Counts streamed chunks; reported to the client as the token count.
        n_tok = 0
        async for token in gen:
            n_tok += 1
            elapsed = time.monotonic() - t0
            speed = round(n_tok / elapsed, 1) if elapsed > 0 else 0
            yield f"data: {json.dumps({'token': token, 'speed': speed, 'n': n_tok})}\n\n"

        yield f"data: {json.dumps({'done': True, 'total': n_tok})}\n\n"

    return StreamingResponse(
        sse_gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no",
                 "Connection": "keep-alive"},
    )


@demo.get("/health")
async def health():
    """Report liveness, the ids loaded per backend, and ZeroGPU availability."""
    return JSONResponse({
        "status": "ok",
        "backends": {
            "transformers": list(_tr_models.keys()),
            "llama": list(_llama_models.keys()),
        },
        "spaces_gpu": HAS_SPACES_GPU,
    })


# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=int(os.environ.get("PORT", 7860)),
        show_error=True,
    )