MiniCPM / app.py
Chris4K's picture
Update app.py
16ebcba verified
Raw
History Blame Contribute Delete
21.3 kB
"""
MiniCPM Forge β€” Hybrid Multi-Model Showcase
Two backends, one interface:
β€’ Vision (v46, v46t) β†’ transformers + ZeroGPU (same as the working single-model space)
β€’ Text (cpm5, cpm41) β†’ llama-cpp-python (CPU/GPU, works on free tier)
β€’ Omni (o45) β†’ API placeholder
build-small-hackathon 2026 Β· Chris4K
"""
from __future__ import annotations
import asyncio
import base64
import json
import os
import pathlib
import re
import threading
import time
import uuid
from io import BytesIO
from typing import Generator, Optional
from PIL import Image
from fastapi import Request
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from gradio import Server
# ─────────────────────────────────────────────────────────────────────────────
# ZeroGPU (optional β€” works on HF Spaces GPU / ZeroGPU tiers)
# ─────────────────────────────────────────────────────────────────────────────
try:
import spaces # type: ignore
HAS_SPACES_GPU = True
except ImportError:
HAS_SPACES_GPU = False
class _FakeSpaces:
@staticmethod
def GPU(duration: int = 120):
def _wrap(fn): return fn
return _wrap
spaces = _FakeSpaces() # type: ignore
# ─────────────────────────────────────────────────────────────────────────────
# Model registry
# ─────────────────────────────────────────────────────────────────────────────
MODELS: dict[str, dict] = {
"v46": {
"id": "v46",
"name": "MiniCPM-V 4.6",
"tag": "Vision Β· OCR",
"color": "#06b6d4",
"backend": "transformers",
"hf_id": "openbmb/MiniCPM-V-4.6",
"thinking": False,
"ctx": 8192,
"vision": True,
},
"v46t": {
"id": "v46t",
"name": "MiniCPM-V 4.6-T",
"tag": "Vision Β· Thinking",
"color": "#a855f7",
"backend": "transformers",
"hf_id": "openbmb/MiniCPM-V-4.6-Thinking",
"thinking": True,
"ctx": 8192,
"vision": True,
},
"cpm5": {
"id": "cpm5",
"name": "MiniCPM5-1B",
"tag": "⚑ Ultra-light",
"color": "#22c55e",
"backend": "llama",
"repo": "openbmb/MiniCPM5-1B-GGUF",
"file": "MiniCPM5-1B-Q4_K_M.gguf", # βœ“ confirmed working
"ctx": 4096,
"vision": False,
"thinking": False,
},
"cpm41": {
"id": "cpm41",
"name": "MiniCPM4.1-8B",
"tag": "🧠 Reasoning",
"color": "#f97316",
"backend": "llama",
"repo": "openbmb/MiniCPM4.1-8B-GGUF",
"file": "MiniCPM4.1-8B-Q4_K_M.gguf",
"ctx": 16384,
"vision": False,
"thinking": True,
},
"o45": {
"id": "o45",
"name": "MiniCPM-o 4.5",
"tag": "🌐 Omni",
"color": "#ec4899",
"backend": "api",
"ctx": 8192,
"vision": True,
"thinking": False,
},
}
# ─────────────────────────────────────────────────────────────────────────────
# Shared state
# ─────────────────────────────────────────────────────────────────────────────
CACHE_DIR = pathlib.Path(os.environ.get("HF_HOME", "/tmp/hf_cache")) / "forge"
_load_status: dict[str, str] = {}
_load_lock = threading.Lock()
# llama-cpp loaded models
_llama_models: dict[str, object] = {}
# transformers loaded models (processor + model per id)
_tr_processors: dict[str, object] = {}
_tr_models: dict[str, object] = {}
# ─────────────────────────────────────────────────────────────────────────────
# Text normalisation (from official openbmb demo)
# ─────────────────────────────────────────────────────────────────────────────
_NORM_PATTERN = re.compile(
r'(```[\s\S]*?```|`[^`]+`|\$\$[\s\S]*?\$\$|\$[^$]+\$'
r'|\\\([\s\S]*?\\\)|\\\[[\s\S]*?\\\])'
r'|(?<!\\)(?:\\r\\n|\\[nr])'
)
def normalize_response_text(text: str) -> str:
if not isinstance(text, str) or "\\" not in text:
return text
return _NORM_PATTERN.sub(lambda m: m.group(1) or '\n', text)
# ─────────────────────────────────────────────────────────────────────────────
# Backend A: transformers (vision models, runs via ZeroGPU when available)
# ─────────────────────────────────────────────────────────────────────────────
def _load_transformers(model_id: str) -> None:
"""Load processor + model into _tr_processors/_tr_models. Thread-safe."""
if model_id in _tr_models:
return
with _load_lock:
if model_id in _tr_models:
return
_load_status[model_id] = "loading"
cfg = MODELS[model_id]
hf_id = cfg["hf_id"]
print(f"[forge] Loading transformers model: {hf_id}")
try:
import torch
from transformers import AutoProcessor, AutoModelForImageTextToText # type: ignore
processor = AutoProcessor.from_pretrained(hf_id, trust_remote_code=True)
if torch.cuda.is_available():
model = AutoModelForImageTextToText.from_pretrained(
hf_id,
torch_dtype=torch.bfloat16,
attn_implementation="sdpa",
trust_remote_code=True,
device_map="cuda",
).eval()
else:
# CPU fallback β€” slow but functional for demos
model = AutoModelForImageTextToText.from_pretrained(
hf_id,
torch_dtype=torch.float32,
trust_remote_code=True,
device_map="cpu",
).eval()
_tr_processors[model_id] = processor
_tr_models[model_id] = model
_load_status[model_id] = "ready"
print(f"[forge] βœ“ transformers {model_id} ready")
except Exception as exc:
_load_status[model_id] = f"error:{exc}"
print(f"[forge] βœ— transformers {model_id} failed: {exc}")
raise
@spaces.GPU(duration=120)
def _run_transformers(
model_id: str,
messages: list,
params: dict,
) -> Generator[str, None, None]:
"""
Sync generator β€” yields *delta* text chunks.
Decorated with @spaces.GPU so it runs on ZeroGPU when available;
falls back to CPU silently when spaces is not installed.
"""
import torch
from transformers import TextIteratorStreamer # type: ignore
processor = _tr_processors[model_id]
model = _tr_models[model_id]
thinking = params.get("thinking_mode", False)
is_video = any(
it.get("type") == "video"
for msg in messages
for it in (msg.get("content") or [])
)
with torch.no_grad():
inputs = processor.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_dict=True,
return_tensors="pt",
enable_thinking=thinking,
processor_kwargs={
"downsample_mode": "16x",
"max_slice_nums": 1 if is_video else 9,
"use_image_id": not is_video,
},
).to(model.device)
if model.device.type == "cuda":
import torch as _torch
for k, v in inputs.items():
if isinstance(v, _torch.Tensor) and _torch.is_floating_point(v):
inputs[k] = v.to(dtype=_torch.bfloat16)
streamer = TextIteratorStreamer(
processor.tokenizer,
skip_prompt=True,
skip_special_tokens=True,
timeout=60.0,
)
gen_kw = {
**inputs,
"max_new_tokens": params.get("max_tokens", 1024),
"do_sample": True,
"temperature": params.get("temperature", 0.7),
"top_p": params.get("top_p", 0.8),
"top_k": int(params.get("top_k", 100)),
"streamer": streamer,
"downsample_mode": "16x",
}
t = threading.Thread(target=model.generate, kwargs=gen_kw, daemon=True)
t.start()
for chunk in streamer:
yield normalize_response_text(chunk)
t.join(timeout=10)
async def _stream_transformers(
model_id: str,
messages: list,
params: dict,
loop: asyncio.AbstractEventLoop,
):
"""Async generator: bridges sync _run_transformers β†’ async SSE."""
queue: asyncio.Queue = asyncio.Queue(maxsize=256)
def _worker():
try:
for chunk in _run_transformers(model_id, messages, params):
loop.call_soon_threadsafe(queue.put_nowait, chunk)
except Exception as exc:
loop.call_soon_threadsafe(queue.put_nowait, f"\n\n[⚠ {exc}]")
finally:
loop.call_soon_threadsafe(queue.put_nowait, None)
loop.run_in_executor(None, _worker)
while True:
token = await queue.get()
if token is None:
break
yield token
# ─────────────────────────────────────────────────────────────────────────────
# Backend B: llama-cpp (text models)
# ─────────────────────────────────────────────────────────────────────────────
def _hub_download_robust(repo_id: str, filename: str, local_dir: str) -> str:
"""hf_hub_download with glob fallback for filename drift."""
import fnmatch
from huggingface_hub import hf_hub_download, list_repo_files # type: ignore
pathlib.Path(local_dir).mkdir(parents=True, exist_ok=True)
try:
return hf_hub_download(repo_id=repo_id, filename=filename, local_dir=local_dir)
except Exception:
pass
# Glob fallback
quant = re.search(r'(Q\d_K_[MS]|Q\d_\d|F16|BF16)', filename)
pat = f"*{quant.group(1)}*.gguf" if quant else f"*{pathlib.Path(filename).stem}*"
candidates = [f for f in list_repo_files(repo_id)
if fnmatch.fnmatch(f, pat) and f.endswith(".gguf")]
if not candidates:
raise FileNotFoundError(
f"No file matching {pat!r} in {repo_id}. "
f"GGUFs: {[f for f in list_repo_files(repo_id) if f.endswith('.gguf')]}"
)
best = next((f for f in candidates if "Q4_K_M" in f), candidates[0])
print(f" β†’ glob fallback: {best!r}")
return hf_hub_download(repo_id=repo_id, filename=best, local_dir=local_dir)
def _load_llama(model_id: str) -> None:
"""Load a text-only GGUF model via llama-cpp-python."""
if model_id in _llama_models:
return
with _load_lock:
if model_id in _llama_models:
return
_load_status[model_id] = "loading"
cfg = MODELS[model_id]
local_dir = str(CACHE_DIR / model_id)
print(f"[forge] Downloading LM: {cfg['repo']} / {cfg['file']}")
try:
from llama_cpp import Llama # type: ignore
model_path = _hub_download_robust(cfg["repo"], cfg["file"], local_dir)
n_gpu = int(os.environ.get("N_GPU_LAYERS", "-1"))
llm = Llama(
model_path=model_path,
n_ctx=cfg["ctx"],
n_gpu_layers=n_gpu,
verbose=False,
)
_llama_models[model_id] = llm
_load_status[model_id] = "ready"
print(f"[forge] βœ“ llama {model_id} ready")
except Exception as exc:
_load_status[model_id] = f"error:{exc}"
print(f"[forge] βœ— llama {model_id} failed: {exc}")
raise
async def _stream_llama(
llm,
messages: list,
params: dict,
loop: asyncio.AbstractEventLoop,
):
"""Async generator: bridges sync llama-cpp stream β†’ async SSE."""
queue: asyncio.Queue = asyncio.Queue(maxsize=256)
def _worker():
try:
output = llm.create_chat_completion(
messages=messages,
stream=True,
max_tokens=params.get("max_tokens", 1024),
temperature=params.get("temperature", 0.7),
top_p=params.get("top_p", 0.8),
top_k=int(params.get("top_k", 100)),
repeat_penalty=params.get("repeat_penalty", 1.05),
)
for chunk in output:
delta = chunk["choices"][0]["delta"].get("content", "")
if delta:
loop.call_soon_threadsafe(queue.put_nowait, delta)
except Exception as exc:
loop.call_soon_threadsafe(queue.put_nowait, f"\n\n[⚠ {exc}]")
finally:
loop.call_soon_threadsafe(queue.put_nowait, None)
loop.run_in_executor(None, _worker)
while True:
token = await queue.get()
if token is None:
break
yield token
# ─────────────────────────────────────────────────────────────────────────────
# Shared message builder
# ─────────────────────────────────────────────────────────────────────────────
def _build_messages(
message: str,
history: list,
image_b64: Optional[str],
backend: str,
) -> list[dict]:
msgs: list[dict] = []
for turn in history or []:
if turn.get("user"):
msgs.append({"role": "user",
"content": [{"type": "text", "text": turn["user"]}]})
if turn.get("assistant"):
msgs.append({"role": "assistant",
"content": [{"type": "text", "text": turn["assistant"]}]})
content: list[dict] = []
if image_b64:
if backend == "transformers":
# transformers expects a PIL Image object
img_bytes = base64.b64decode(image_b64)
img = Image.open(BytesIO(img_bytes)).convert("RGB")
content.append({"type": "image", "image": img})
else:
# llama-cpp expects a data: URI
raw = base64.b64decode(image_b64[:32])
mime = "image/png" if raw[:8] == b"\x89PNG\r\n\x1a\n" else "image/jpeg"
content.append({"type": "image_url",
"image_url": {"url": f"data:{mime};base64,{image_b64}"}})
content.append({"type": "text", "text": message or "Describe this image."})
msgs.append({"role": "user", "content": content})
return msgs
# ─────────────────────────────────────────────────────────────────────────────
# Gradio Server
# ─────────────────────────────────────────────────────────────────────────────
demo = Server()
@demo.get("/", response_class=HTMLResponse)
async def homepage():
html = pathlib.Path(__file__).parent / "index.html"
return html.read_text(encoding="utf-8")
@demo.get("/api/models")
async def api_models():
out = {}
for mid, cfg in MODELS.items():
out[mid] = {k: cfg[k] for k in ("id", "name", "tag", "color", "ctx", "vision", "thinking")
if k in cfg}
out[mid]["status"] = _load_status.get(mid, "idle")
out[mid]["api"] = cfg.get("backend") == "api"
out[mid]["backend"] = cfg.get("backend", "llama")
return JSONResponse(out)
@demo.post("/api/load")
async def api_load(request: Request):
data = await request.json()
mid = data.get("model_id", "")
cfg = MODELS.get(mid)
if not cfg:
return JSONResponse({"error": "unknown model"}, status_code=400)
if cfg["backend"] == "api":
return JSONResponse({"status": "api"})
# Kick off load in a background thread if not already running
current = _load_status.get(mid, "idle")
if current not in ("loading", "ready"):
loop = asyncio.get_event_loop()
loader = _load_transformers if cfg["backend"] == "transformers" else _load_llama
loop.run_in_executor(None, loader, mid)
return JSONResponse({"status": _load_status.get(mid, "loading")})
@demo.post("/stream/chat")
async def stream_chat(request: Request):
data = await request.json()
mid = data.get("model_id", "cpm5")
message = data.get("message", "")
history = data.get("history", [])
image_b64 = data.get("image_b64")
params = data.get("params", {})
cfg = MODELS.get(mid)
if not cfg:
return JSONResponse({"error": "unknown model"}, status_code=400)
backend = cfg.get("backend", "llama")
# ── API mode ──────────────────────────────────────────────────────────────
if backend == "api":
async def _api_sse():
yield f"data: {json.dumps({'token': '🌐 MiniCPM-o 4.5 API mode β€” set OPENBMB_API_KEY in Space secrets.'})}\n\n"
yield f"data: {json.dumps({'done': True})}\n\n"
return StreamingResponse(_api_sse(), media_type="text/event-stream",
headers={"Cache-Control": "no-cache"})
# ── Check model is loaded ─────────────────────────────────────────────────
store = _tr_models if backend == "transformers" else _llama_models
if mid not in store:
msg = f"Model '{mid}' not loaded (status: {_load_status.get(mid, 'idle')}). Click ⬇ LOAD first."
async def _err_sse():
yield f"data: {json.dumps({'token': f'⚠ {msg}'})}\n\n"
yield f"data: {json.dumps({'done': True})}\n\n"
return StreamingResponse(_err_sse(), media_type="text/event-stream",
headers={"Cache-Control": "no-cache"})
messages = _build_messages(message, history, image_b64, backend)
loop = asyncio.get_event_loop()
t0 = time.monotonic()
n_tok = [0]
async def sse_gen():
if backend == "transformers":
gen = _stream_transformers(mid, messages, params, loop)
else:
gen = _stream_llama(_llama_models[mid], messages, params, loop)
async for token in gen:
n_tok[0] += 1
elapsed = time.monotonic() - t0
speed = round(n_tok[0] / elapsed, 1) if elapsed > 0 else 0
yield f"data: {json.dumps({'token': token, 'speed': speed, 'n': n_tok[0]})}\n\n"
yield f"data: {json.dumps({'done': True, 'total': n_tok[0]})}\n\n"
return StreamingResponse(
sse_gen(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no",
"Connection": "keep-alive"},
)
@demo.get("/health")
async def health():
return JSONResponse({
"status": "ok",
"backends": {
"transformers": list(_tr_models.keys()),
"llama": list(_llama_models.keys()),
},
"spaces_gpu": HAS_SPACES_GPU,
})
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
demo.launch(
server_name="0.0.0.0",
server_port=int(os.environ.get("PORT", 7860)),
show_error=True,
)