"""
MiniCPM Forge — Hybrid Multi-Model Showcase

Two backends, one interface:
  • Vision (v46, v46t) — transformers + ZeroGPU (same as the working single-model space)
  • Text (cpm5, cpm41) — llama-cpp-python (CPU/GPU, works on free tier)
  • Omni (o45) — API placeholder

build-small-hackathon 2026 · Chris4K
"""
| from __future__ import annotations |
|
|
| import asyncio |
| import base64 |
| import json |
| import os |
| import pathlib |
| import re |
| import threading |
| import time |
| import uuid |
| from io import BytesIO |
| from typing import Generator, Optional |
|
|
| from PIL import Image |
| from fastapi import Request |
| from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse |
| from gradio import Server |
|
|
| |
| |
| |
# Optional ZeroGPU support: use the real `spaces` package when it is installed,
# otherwise install a no-op stand-in so `@spaces.GPU(...)` still works.
try:
    import spaces
except ImportError:
    HAS_SPACES_GPU = False

    class _FakeSpaces:
        """Stand-in for the `spaces` package when it cannot be imported."""

        @staticmethod
        def GPU(duration=120, **_options):
            """Return the decorated function unchanged.

            Accepts both spellings so the code does not depend on which form
            is used at the decoration site:

            * ``@spaces.GPU(duration=120)`` -- called form; ``duration`` and
              any extra keyword options are accepted and ignored.
            * ``@spaces.GPU`` -- bare form; the function itself arrives as the
              first positional argument and is handed straight back.
            """
            if callable(duration):
                # Bare decorator usage: `duration` is really the function.
                return duration

            def _identity(fn):
                return fn

            return _identity

    spaces = _FakeSpaces()
else:
    HAS_SPACES_GPU = True
|
|
| |
| |
| |
# Registry of selectable models, keyed by each spec's own "id".
#   backend "transformers" -> loaded from `hf_id` by _load_transformers
#   backend "llama"        -> GGUF `file` fetched from `repo` by _load_llama
#   backend "api"          -> nothing is loaded locally
MODELS: dict[str, dict] = {
    spec["id"]: spec
    for spec in (
        dict(
            id="v46",
            name="MiniCPM-V 4.6",
            tag="Vision Β· OCR",
            color="#06b6d4",
            backend="transformers",
            hf_id="openbmb/MiniCPM-V-4.6",
            thinking=False,
            ctx=8192,
            vision=True,
        ),
        dict(
            id="v46t",
            name="MiniCPM-V 4.6-T",
            tag="Vision Β· Thinking",
            color="#a855f7",
            backend="transformers",
            hf_id="openbmb/MiniCPM-V-4.6-Thinking",
            thinking=True,
            ctx=8192,
            vision=True,
        ),
        dict(
            id="cpm5",
            name="MiniCPM5-1B",
            tag="β‘ Ultra-light",
            color="#22c55e",
            backend="llama",
            repo="openbmb/MiniCPM5-1B-GGUF",
            file="MiniCPM5-1B-Q4_K_M.gguf",
            ctx=4096,
            vision=False,
            thinking=False,
        ),
        dict(
            id="cpm41",
            name="MiniCPM4.1-8B",
            tag="π§ Reasoning",
            color="#f97316",
            backend="llama",
            repo="openbmb/MiniCPM4.1-8B-GGUF",
            file="MiniCPM4.1-8B-Q4_K_M.gguf",
            ctx=16384,
            vision=False,
            thinking=True,
        ),
        dict(
            id="o45",
            name="MiniCPM-o 4.5",
            tag="π Omni",
            color="#ec4899",
            backend="api",
            ctx=8192,
            vision=True,
            thinking=False,
        ),
    )
}
|
|
| |
| |
| |
# Root directory for downloaded GGUF files: "<HF_HOME or /tmp/hf_cache>/forge".
CACHE_DIR = pathlib.Path(os.getenv("HF_HOME", "/tmp/hf_cache")).joinpath("forge")

# Serialises model loading; both loaders take this lock before touching the stores.
_load_lock = threading.Lock()

# model id -> "loading" | "ready" | "error:<message>" (absent means never requested).
_load_status: dict[str, str] = {}

# llama-cpp backend: model id -> loaded Llama instance.
_llama_models: dict[str, object] = {}

# transformers backend: model id -> loaded model / matching processor.
_tr_models: dict[str, object] = {}
_tr_processors: dict[str, object] = {}
|
|
| |
| |
| |
# Group 1 captures spans that must be left untouched (fenced code, inline code,
# $$..$$ / $..$ math, \(..\) and \[..\] math).  The unnamed alternative matches a
# literal backslash-escape for a line break (\r\n, \n or \r) that is not itself
# preceded by a backslash.
_NORM_PATTERN = re.compile(
    r'(```[\s\S]*?```|`[^`]+`|\$\$[\s\S]*?\$\$|\$[^$]+\$'
    r'|\\\([\s\S]*?\\\)|\\\[[\s\S]*?\\\])'
    r'|(?<!\\)(?:\\r\\n|\\[nr])'
)


def normalize_response_text(text: str) -> str:
    """Turn literal ``\\n`` / ``\\r`` / ``\\r\\n`` escapes into real newlines.

    Code spans and math spans are copied through unchanged.  Values that are
    not strings, and strings without any backslash, are returned as-is.
    """
    if isinstance(text, str) and "\\" in text:

        def _restore(match):
            protected = match.group(1)
            # A protected span is re-emitted verbatim; anything else was an
            # escaped line break.
            return protected if protected else '\n'

        return _NORM_PATTERN.sub(_restore, text)
    return text
|
|
| |
| |
| |
|
|
def _load_transformers(model_id: str) -> None:
    """Load the processor and model for ``model_id`` into the transformers stores.

    A call for an already-loaded model returns immediately.  Loading happens
    under ``_load_lock``, with a second membership check once the lock is held.
    ``_load_status[model_id]`` is set to "loading", then to "ready" or
    "error:<message>"; on failure the exception is re-raised.
    """
    if model_id in _tr_models:
        return
    with _load_lock:
        # Another thread may have finished the load while we waited for the lock.
        if model_id in _tr_models:
            return
        _load_status[model_id] = "loading"
        hf_id = MODELS[model_id]["hf_id"]
        print(f"[forge] Loading transformers model: {hf_id}")
        try:
            import torch
            from transformers import AutoProcessor, AutoModelForImageTextToText

            processor = AutoProcessor.from_pretrained(hf_id, trust_remote_code=True)

            # bfloat16 + SDPA attention on CUDA, plain float32 on CPU.
            if torch.cuda.is_available():
                load_kwargs = dict(
                    torch_dtype=torch.bfloat16,
                    attn_implementation="sdpa",
                    trust_remote_code=True,
                    device_map="cuda",
                )
            else:
                load_kwargs = dict(
                    torch_dtype=torch.float32,
                    trust_remote_code=True,
                    device_map="cpu",
                )
            model = AutoModelForImageTextToText.from_pretrained(hf_id, **load_kwargs).eval()

            # Publish the model last: its presence in _tr_models is what the
            # fast-path checks above rely on.
            _tr_processors[model_id] = processor
            _tr_models[model_id] = model
            _load_status[model_id] = "ready"
            print(f"[forge] β transformers {model_id} ready")
        except Exception as exc:
            _load_status[model_id] = f"error:{exc}"
            print(f"[forge] β transformers {model_id} failed: {exc}")
            raise
|
|
|
|
@spaces.GPU(duration=120)
def _run_transformers(
    model_id: str,
    messages: list,
    params: dict,
) -> Generator[str, None, None]:
    """Generate a reply with a loaded transformers model, yielding text deltas.

    Sync generator.  Decorated with ``@spaces.GPU`` so it runs on ZeroGPU when
    the ``spaces`` package is available; with the fallback shim the decorator
    is a no-op and the function runs as-is.

    Args:
        model_id: Key into ``_tr_models`` / ``_tr_processors`` (must be loaded).
        messages: Chat messages whose ``content`` is a list of typed parts.
        params: Optional generation settings: ``thinking_mode``, ``max_tokens``,
            ``temperature``, ``top_p``, ``top_k``.

    Yields:
        Text chunks from the streamer, passed through
        ``normalize_response_text``.

    Raises:
        KeyError: If ``model_id`` has not been loaded.
        Exception: Whatever ``model.generate`` raised, re-raised here once the
            stream has been drained.
    """
    import torch
    from transformers import TextIteratorStreamer

    processor = _tr_processors[model_id]
    model = _tr_models[model_id]
    thinking = params.get("thinking_mode", False)

    # Video input switches the processor to a single slice without image ids.
    is_video = any(
        part.get("type") == "video"
        for msg in messages
        for part in (msg.get("content") or [])
    )

    with torch.no_grad():
        inputs = processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="pt",
            enable_thinking=thinking,
            processor_kwargs={
                "downsample_mode": "16x",
                "max_slice_nums": 1 if is_video else 9,
                "use_image_id": not is_video,
            },
        ).to(model.device)

    # The CUDA model is loaded in bfloat16, so floating-point inputs
    # (e.g. pixel values) are cast to match.
    if model.device.type == "cuda":
        for key, value in inputs.items():
            if isinstance(value, torch.Tensor) and torch.is_floating_point(value):
                inputs[key] = value.to(dtype=torch.bfloat16)

    streamer = TextIteratorStreamer(
        processor.tokenizer,
        skip_prompt=True,
        skip_special_tokens=True,
        timeout=60.0,
    )

    gen_kw = {
        **inputs,
        "max_new_tokens": params.get("max_tokens", 1024),
        "streamer": streamer,
        "downsample_mode": "16x",
    }
    temperature = params.get("temperature", 0.7)
    if isinstance(temperature, (int, float)) and temperature <= 0:
        # Sampling with a non-positive temperature is rejected by transformers;
        # treat it as a request for greedy decoding instead.
        gen_kw["do_sample"] = False
    else:
        gen_kw.update(
            do_sample=True,
            temperature=temperature,
            top_p=params.get("top_p", 0.8),
            top_k=int(params.get("top_k", 100)),
        )

    failures: list = []

    def _generate() -> None:
        try:
            model.generate(**gen_kw)
        except Exception as exc:
            # Without this the consumer below would sit on the streamer until
            # its 60 s timeout and surface an unrelated queue.Empty.
            failures.append(exc)
            streamer.end()

    worker = threading.Thread(target=_generate, daemon=True)
    worker.start()

    for chunk in streamer:
        yield normalize_response_text(chunk)

    worker.join(timeout=10)
    if failures:
        raise failures[0]
|
|
|
|
async def _stream_transformers(
    model_id: str,
    messages: list,
    params: dict,
    loop: asyncio.AbstractEventLoop,
):
    """Async generator bridging the sync ``_run_transformers`` to async SSE.

    ``_run_transformers`` is driven in the loop's default executor; each chunk
    is handed to ``loop`` and yielded here.  If generation raises, one final
    chunk containing the error text is yielded before the stream ends.

    Args:
        model_id: Loaded transformers model to run.
        messages: Chat messages passed through to ``_run_transformers``.
        params: Generation settings passed through to ``_run_transformers``.
        loop: The event loop this coroutine is running on.
    """
    # Unbounded on purpose: items arrive via put_nowait from loop callbacks, so
    # a size limit would raise QueueFull inside the callback, silently dropping
    # tokens -- or the terminating None, which would hang this generator.
    queue: asyncio.Queue = asyncio.Queue()

    def _post(item) -> None:
        loop.call_soon_threadsafe(queue.put_nowait, item)

    def _worker() -> None:
        try:
            for chunk in _run_transformers(model_id, messages, params):
                _post(chunk)
        except Exception as exc:
            _post(f"\n\n[β {exc}]")
        finally:
            _post(None)  # end-of-stream marker

    loop.run_in_executor(None, _worker)

    while (token := await queue.get()) is not None:
        yield token
|
|
|
|
| |
| |
| |
|
|
def _hub_download_robust(repo_id: str, filename: str, local_dir: str) -> str:
    """Download ``filename`` from ``repo_id``, tolerating filename drift.

    The exact filename is tried first.  If that fails for any reason, the
    repo's ``.gguf`` files are listed and matched against a glob built from
    the quantisation tag in ``filename`` (or from its stem when no tag is
    recognised); a match containing ``Q4_K_M`` is preferred, otherwise the
    first match is used.

    Args:
        repo_id: Hugging Face repository id.
        filename: Expected file name inside the repository.
        local_dir: Directory to download into (created if missing).

    Returns:
        Path of the downloaded file as returned by ``hf_hub_download``.

    Raises:
        FileNotFoundError: If no ``.gguf`` file in the repo matches the glob.
    """
    import fnmatch
    from huggingface_hub import hf_hub_download, list_repo_files

    pathlib.Path(local_dir).mkdir(parents=True, exist_ok=True)
    try:
        return hf_hub_download(repo_id=repo_id, filename=filename, local_dir=local_dir)
    except Exception as exc:
        # Deliberately best-effort, but say why the fallback is being used.
        print(f"  exact download of {filename!r} failed ({exc}); trying glob fallback")

    quant = re.search(r'(Q\d_K_[MS]|Q\d_\d|F16|BF16)', filename)
    pat = f"*{quant.group(1)}*.gguf" if quant else f"*{pathlib.Path(filename).stem}*"

    # One listing serves both the candidate search and the error message.
    gguf_files = [f for f in list_repo_files(repo_id) if f.endswith(".gguf")]
    candidates = [f for f in gguf_files if fnmatch.fnmatch(f, pat)]
    if not candidates:
        raise FileNotFoundError(
            f"No file matching {pat!r} in {repo_id}. "
            f"GGUFs: {gguf_files}"
        )
    best = next((f for f in candidates if "Q4_K_M" in f), candidates[0])
    print(f" β glob fallback: {best!r}")
    return hf_hub_download(repo_id=repo_id, filename=best, local_dir=local_dir)
|
|
|
|
def _load_llama(model_id: str) -> None:
    """Download and load the GGUF model for ``model_id`` via llama-cpp-python.

    A call for an already-loaded model returns immediately.  Loading happens
    under ``_load_lock`` with a second membership check once the lock is held.
    ``_load_status[model_id]`` is set to "loading", then to "ready" or
    "error:<message>"; on failure the exception is re-raised.  The number of
    GPU layers comes from the ``N_GPU_LAYERS`` environment variable
    (default ``-1``).
    """
    if model_id in _llama_models:
        return
    with _load_lock:
        # Another thread may have finished the load while we waited for the lock.
        if model_id in _llama_models:
            return
        _load_status[model_id] = "loading"
        cfg = MODELS[model_id]
        local_dir = str(CACHE_DIR / model_id)
        repo, gguf_name = cfg["repo"], cfg["file"]
        print(f"[forge] Downloading LM: {repo} / {gguf_name}")
        try:
            from llama_cpp import Llama

            model_path = _hub_download_robust(repo, gguf_name, local_dir)
            _llama_models[model_id] = Llama(
                model_path=model_path,
                n_ctx=cfg["ctx"],
                n_gpu_layers=int(os.getenv("N_GPU_LAYERS", "-1")),
                verbose=False,
            )
            _load_status[model_id] = "ready"
            print(f"[forge] β llama {model_id} ready")
        except Exception as exc:
            _load_status[model_id] = f"error:{exc}"
            print(f"[forge] β llama {model_id} failed: {exc}")
            raise
|
|
|
|
async def _stream_llama(
    llm,
    messages: list,
    params: dict,
    loop: asyncio.AbstractEventLoop,
):
    """Async generator bridging a sync llama-cpp chat stream to async SSE.

    ``llm.create_chat_completion(stream=True, ...)`` is driven in the loop's
    default executor; every non-empty ``delta["content"]`` is handed to
    ``loop`` and yielded here.  If the completion raises, one final chunk
    containing the error text is yielded before the stream ends.

    Args:
        llm: Object exposing ``create_chat_completion`` (a loaded Llama model).
        messages: Chat messages passed to the completion call.
        params: Optional settings: ``max_tokens``, ``temperature``, ``top_p``,
            ``top_k``, ``repeat_penalty``.
        loop: The event loop this coroutine is running on.
    """
    # Unbounded on purpose: items arrive via put_nowait from loop callbacks, so
    # a size limit would raise QueueFull inside the callback, silently dropping
    # tokens -- or the terminating None, which would hang this generator.
    queue: asyncio.Queue = asyncio.Queue()

    def _post(item) -> None:
        loop.call_soon_threadsafe(queue.put_nowait, item)

    def _worker() -> None:
        try:
            output = llm.create_chat_completion(
                messages=messages,
                stream=True,
                max_tokens=params.get("max_tokens", 1024),
                temperature=params.get("temperature", 0.7),
                top_p=params.get("top_p", 0.8),
                top_k=int(params.get("top_k", 100)),
                repeat_penalty=params.get("repeat_penalty", 1.05),
            )
            for chunk in output:
                # Some chunks carry no text (no "content" key, or an empty one).
                delta = chunk["choices"][0]["delta"].get("content", "")
                if delta:
                    _post(delta)
        except Exception as exc:
            _post(f"\n\n[β {exc}]")
        finally:
            _post(None)  # end-of-stream marker

    loop.run_in_executor(None, _worker)

    while (token := await queue.get()) is not None:
        yield token
|
|
|
|
| |
| |
| |
|
|
def _sniff_image_mime(image_b64: str) -> str:
    """Guess an image MIME type from the first bytes of a base64 payload."""
    # 32 base64 characters decode to 24 bytes -- enough for every signature below.
    head = base64.b64decode(image_b64[:32])
    if head[:8] == b"\x89PNG\r\n\x1a\n":
        return "image/png"
    if head[:6] in (b"GIF87a", b"GIF89a"):
        return "image/gif"
    if head[:4] == b"RIFF" and head[8:12] == b"WEBP":
        return "image/webp"
    return "image/jpeg"


def _build_messages(
    message: str,
    history: list,
    image_b64: Optional[str],
    backend: str,
) -> list[dict]:
    """Assemble the chat message list for one request.

    Args:
        message: The new user text; "Describe this image." is used when empty.
        history: Prior turns as dicts with optional "user" / "assistant" text.
            ``None`` is treated as no history.
        image_b64: Optional base64 image.  A leading ``data:<mime>;base64,``
            prefix is accepted and stripped.
        backend: "transformers" attaches the decoded image as a PIL image part;
            any other value attaches it as an ``image_url`` data-URL part.

    Returns:
        Messages in order: history turns, then the new user message.  Every
        ``content`` is a list of typed parts.

    NOTE(review): text content is emitted as a list of parts for the llama
    backend as well -- confirm the chat template of each GGUF model accepts
    that shape.
    """
    msgs: list[dict] = []

    for turn in history or []:
        for role in ("user", "assistant"):
            text = turn.get(role)
            if text:
                msgs.append({"role": role,
                             "content": [{"type": "text", "text": text}]})

    content: list[dict] = []

    payload = image_b64 or ""
    if payload.startswith("data:"):
        # Keep only the base64 part of a data URL; ':' is not a base64
        # character, so raw payloads are never affected.
        payload = payload.partition(",")[2]

    if payload:
        if backend == "transformers":
            img = Image.open(BytesIO(base64.b64decode(payload))).convert("RGB")
            content.append({"type": "image", "image": img})
        else:
            mime = _sniff_image_mime(payload)
            content.append({"type": "image_url",
                            "image_url": {"url": f"data:{mime};base64,{payload}"}})

    content.append({"type": "text", "text": message or "Describe this image."})
    msgs.append({"role": "user", "content": content})
    return msgs
|
|
|
|
| |
| |
| |
# The application object; every route below is registered on it.
demo = Server()


@demo.get("/", response_class=HTMLResponse)
async def homepage():
    """Serve the ``index.html`` file that sits next to this module."""
    page = pathlib.Path(__file__).with_name("index.html")
    return page.read_text(encoding="utf-8")
|
|
|
|
@demo.get("/api/models")
async def api_models():
    """Return the public description and current load status of every model.

    Each entry carries the display fields present in the model's spec, plus
    ``status`` ("idle" when no load was ever requested), ``api`` (True for the
    "api" backend) and ``backend`` ("llama" when the spec names none).
    """
    public_keys = ("id", "name", "tag", "color", "ctx", "vision", "thinking")
    payload = {}
    for model_key, spec in MODELS.items():
        entry = {key: spec[key] for key in public_keys if key in spec}
        entry["status"] = _load_status.get(model_key, "idle")
        entry["api"] = spec.get("backend") == "api"
        entry["backend"] = spec.get("backend", "llama")
        payload[model_key] = entry
    return JSONResponse(payload)
|
|
|
|
@demo.post("/api/load")
async def api_load(request: Request):
    """Start loading a model in the background and report its status.

    Request body: JSON object with ``model_id``.

    Responses:
        400 ``{"error": ...}`` for a malformed body or an unknown model id.
        ``{"status": "api"}`` for API-backed models (nothing to load).
        ``{"status": <"loading" | "ready" | "error:...">}`` otherwise.
    """
    try:
        data = await request.json()
    except ValueError:  # JSONDecodeError and UnicodeDecodeError both derive from it
        return JSONResponse({"error": "invalid JSON body"}, status_code=400)

    mid = data.get("model_id", "") if isinstance(data, dict) else ""
    cfg = MODELS.get(mid) if isinstance(mid, str) else None
    if not cfg:
        return JSONResponse({"error": "unknown model"}, status_code=400)
    if cfg["backend"] == "api":
        return JSONResponse({"status": "api"})

    if _load_status.get(mid, "idle") not in ("loading", "ready"):
        loader = _load_transformers if cfg["backend"] == "transformers" else _load_llama
        # Mark the load as started *before* handing off to the worker thread:
        # otherwise a retry after a failure would report the stale "error:..."
        # status, and a second request arriving before the worker runs would
        # schedule a duplicate load.
        _load_status[mid] = "loading"
        future = asyncio.get_running_loop().run_in_executor(None, loader, mid)
        # The loader records failures in _load_status and re-raises; consume the
        # exception so asyncio does not log it as never retrieved.
        future.add_done_callback(lambda done: done.cancelled() or done.exception())

    return JSONResponse({"status": _load_status.get(mid, "loading")})
|
|
|
|
@demo.post("/stream/chat")
async def stream_chat(request: Request):
    """Stream a chat completion as server-sent events.

    Request body (JSON object): ``model_id`` (default "cpm5"), ``message``,
    ``history``, ``image_b64``, ``params``.  ``null`` for ``history`` or
    ``params`` is treated like an omitted field.

    Each event is ``data: <json>\\n\\n``.  Token events carry ``token``,
    ``speed`` (chunks per second since the request was received) and ``n``
    (chunk count); the final event is ``{"done": true, "total": n}``.
    Problems that can be explained to the user (API-only model, model not
    loaded, undecodable image) are delivered as a single token event followed
    by ``{"done": true}``.  A malformed body or unknown model returns 400.
    """
    try:
        data = await request.json()
    except ValueError:  # JSONDecodeError and UnicodeDecodeError both derive from it
        return JSONResponse({"error": "invalid JSON body"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "invalid JSON body"}, status_code=400)

    mid = data.get("model_id", "cpm5")
    message = data.get("message", "")
    history = data.get("history") or []
    image_b64 = data.get("image_b64")
    params = data.get("params")
    if not isinstance(params, dict):
        # Covers both an explicit null and a wrongly typed value.
        params = {}

    cfg = MODELS.get(mid) if isinstance(mid, str) else None
    if not cfg:
        return JSONResponse({"error": "unknown model"}, status_code=400)

    backend = cfg.get("backend", "llama")

    def _event(payload: dict) -> str:
        return f"data: {json.dumps(payload)}\n\n"

    def _notice(text: str) -> StreamingResponse:
        # A one-message stream, so the client can reuse its normal SSE handling.
        async def _gen():
            yield _event({"token": text})
            yield _event({"done": True})

        return StreamingResponse(_gen(), media_type="text/event-stream",
                                 headers={"Cache-Control": "no-cache"})

    if backend == "api":
        return _notice('π MiniCPM-o 4.5 API mode β set OPENBMB_API_KEY in Space secrets.')

    store = _tr_models if backend == "transformers" else _llama_models
    if mid not in store:
        msg = f"Model '{mid}' not loaded (status: {_load_status.get(mid, 'idle')}). Click β¬ LOAD first."
        return _notice(f'β {msg}')

    try:
        messages = _build_messages(message, history, image_b64, backend)
    except (ValueError, OSError) as exc:
        # Invalid base64 raises a ValueError subclass; an unreadable image
        # raises an OSError subclass.
        return _notice(f'β {exc}')

    loop = asyncio.get_running_loop()
    started = time.monotonic()

    async def sse_gen():
        if backend == "transformers":
            gen = _stream_transformers(mid, messages, params, loop)
        else:
            gen = _stream_llama(_llama_models[mid], messages, params, loop)

        count = 0
        async for token in gen:
            count += 1
            elapsed = time.monotonic() - started
            speed = round(count / elapsed, 1) if elapsed > 0 else 0
            yield _event({"token": token, "speed": speed, "n": count})

        yield _event({"done": True, "total": count})

    return StreamingResponse(
        sse_gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no",
                 "Connection": "keep-alive"},
    )
|
|
|
|
@demo.get("/health")
async def health():
    """Report liveness, the model ids loaded per backend, and ZeroGPU availability."""
    loaded = {
        "transformers": list(_tr_models),
        "llama": list(_llama_models),
    }
    report = {"status": "ok", "backends": loaded, "spaces_gpu": HAS_SPACES_GPU}
    return JSONResponse(report)
|
|
|
|
| |
# Script entry point: listen on all interfaces, on $PORT when set (default 7860).
if __name__ == "__main__":
    launch_options = dict(
        server_name="0.0.0.0",
        server_port=int(os.getenv("PORT", 7860)),
        show_error=True,
    )
    demo.launch(**launch_options)