# MarianaCodebase — "Upload model.py with huggingface_hub" (commit 82050bb, verified)
"""Loads the MiniCPM5 GGUF and streams generations with thinking mode disabled.
MiniCPM5 starts every reply with a long <think> block when the default chat
template is used. The GGUF template supports the "<think>\n\n</think>\n\n"
prefill (equivalent to enable_thinking=False), so here we render the ChatML
prompt by hand, tokenize it with special=True and generate via
create_completion. Result: immediate response, no thinking.
"""
from __future__ import annotations
import threading
import time
from pathlib import Path
# --- ChatML / template pieces used to render the prompt by hand -------------
IM_START: str = "<|im_start|>"
IM_END: str = "<|im_end|>"
# Empty think block placed at the start of the assistant turn so the reply
# begins immediately (see the module docstring).
NO_THINK_PREFILL: str = "<think>\n\n</think>\n\n"

# --- Prompts and canned output ----------------------------------------------
DEFAULT_USER_PROMPT: str = "Escribe la transmisión de esta noche. Solo el guion al aire."
# Script streamed instead of a real generation when no model is usable.
MOCK_SCRIPT: str = (
    "[JINGLE] Buenas noches, oyentes del vacío. "
    "Señal de prueba en esta frecuencia no autorizada. "
    "[INTERFERENCIA] El tiempo aquí corre hacia atrás los martes. "
    "[FIN DE TRANSMISION]"
)

# --- Hub repositories ---------------------------------------------------------
# Published fine-tune repo; falls back to OpenBMB's base GGUF if missing.
FINETUNE_REPO: str = "MarianaCodebase/MiniCPM5-1B-lost-frequency-radio-GGUF"
BASE_REPO: str = "openbmb/MiniCPM5-1B-GGUF"

# --- Lazily initialised runtime state ----------------------------------------
# The loaded llama_cpp model; stays None until a load succeeds.
_llm = None
# True when generation falls back to MOCK_SCRIPT.
_using_mock: bool = False
# True once a load attempt has been made (successful or not).
_model_checked: bool = False
# Serialises generations on the single shared model instance.
_gen_lock = threading.Lock()
def _find_gguf() -> Path | None:
    """Locate a GGUF file to load, downloading one when none is local.

    The ``models/minicpm`` directory next to this module is searched first,
    preferring the fine-tune, then any Q4_K_M file, then any GGUF at all.
    If nothing is found, the Q4_K_M file is requested from the fine-tune
    repo and, failing that, from the base repo.

    Returns:
        The first matching path in sorted order, or ``None`` when no GGUF
        could be found or downloaded. Download and import errors are
        printed, never raised.
    """
    target_dir = Path(__file__).parent / "models" / "minicpm"

    # Most specific pattern first so the fine-tune wins over a generic GGUF.
    if target_dir.exists():
        for glob_pattern in ("*lost-frequency*.gguf", "*Q4_K_M*.gguf", "*.gguf"):
            local_hits = sorted(target_dir.glob(glob_pattern))
            if local_hits:
                return local_hits[0]

    # No local GGUF (e.g. Space cold start): download from the Hub.
    try:
        from huggingface_hub import snapshot_download

        for repo_id in (FINETUNE_REPO, BASE_REPO):
            try:
                snapshot_download(
                    repo_id=repo_id,
                    local_dir=str(target_dir),
                    allow_patterns="*Q4_K_M*.gguf",
                )
                fetched = sorted(target_dir.glob("*.gguf"))
                if fetched:
                    print(f"[model] GGUF downloaded from {repo_id}")
                    return fetched[0]
            except Exception as err:
                # Best effort: report and try the next repo.
                print(f"[model] Could not download {repo_id}: {err}")
    except Exception as err:
        print(f"[model] huggingface_hub unavailable: {err}")

    return None
def _load_model():
    """Load the GGUF model once, switching to mock mode on any failure.

    Only the first call does any work. The "checked" flag is set before the
    load is attempted, so a missing file or a failed load is not retried on
    later calls. Results are published through the module globals ``_llm``
    and ``_using_mock``.
    """
    global _llm, _using_mock, _model_checked

    if _model_checked:
        return
    _model_checked = True

    model_path = _find_gguf()
    if model_path is None:
        _using_mock = True
        return

    try:
        # Imported lazily so the module still imports without llama_cpp.
        from llama_cpp import Llama

        _llm = Llama(model_path=str(model_path), n_ctx=2048, verbose=False)
    except Exception as err:
        print(f"[model] Could not load GGUF ({model_path}): {err}")
        _llm, _using_mock = None, True
    else:
        _using_mock = False
def _build_prompt(system_prompt: str, user_prompt: str) -> str:
    """Render the ChatML prompt by hand with the no-think prefill appended.

    Args:
        system_prompt: Text of the system turn.
        user_prompt: Text of the user turn.

    Returns:
        ``<s>`` followed by the system and user turns and an assistant turn
        that is left open after the empty think block.
    """
    turns = (
        f"{IM_START}system\n{system_prompt}{IM_END}\n",
        f"{IM_START}user\n{user_prompt}{IM_END}\n",
        # Assistant turn stays open: the model continues after the prefill.
        f"{IM_START}assistant\n{NO_THINK_PREFILL}",
    )
    return "<s>" + "".join(turns)
class _ThinkingFilter:
    """Safety net: if a <think> block still shows up, it is never emitted.

    Text is fed in arbitrary chunks; tags may be split across chunks. Visible
    text is returned as soon as it cannot be part of an opening tag, and
    everything between ``<think>`` and ``</think>`` is discarded.
    """

    _OPEN = "<think>"
    _CLOSE = "</think>"

    def __init__(self):
        # Text received but not yet emitted or discarded.
        self._buf = ""
        # True while between an opening tag and its closing tag.
        self._in_thinking = False

    @staticmethod
    def _partial_tag_len(text: str, tag: str) -> int:
        """Return the length of the longest proper prefix of *tag* that ends *text*."""
        for size in range(min(len(tag) - 1, len(text)), 0, -1):
            if text.endswith(tag[:size]):
                return size
        return 0

    def feed(self, token: str) -> str:
        """Consume one chunk and return the text that is now safe to show.

        Args:
            token: The next piece of streamed text (may be empty).

        Returns:
            Visible text released by this chunk; ``""`` when everything is
            hidden or still held back as a possible partial tag.
        """
        self._buf += token
        emitted: list[str] = []
        while self._buf:
            if self._in_thinking:
                close_at = self._buf.find(self._CLOSE)
                if close_at == -1:
                    # Hidden text is never emitted, so keep only a suffix that
                    # may be the start of the closing tag. This bounds the
                    # buffer and avoids re-scanning the whole block per chunk.
                    keep = self._partial_tag_len(self._buf, self._CLOSE)
                    self._buf = self._buf[len(self._buf) - keep:]
                    break
                self._buf = self._buf[close_at + len(self._CLOSE):]
                self._in_thinking = False
                continue

            open_at = self._buf.find(self._OPEN)
            if open_at == -1:
                # Hold back a suffix that could be a partial <think> opening.
                held = self._partial_tag_len(self._buf, self._OPEN)
                safe_until = len(self._buf) - held
                if safe_until:
                    emitted.append(self._buf[:safe_until])
                    self._buf = self._buf[safe_until:]
                break

            if open_at > 0:
                emitted.append(self._buf[:open_at])
            self._buf = self._buf[open_at + len(self._OPEN):]
            self._in_thinking = True
        return "".join(emitted)

    def flush(self) -> str:
        """Return text still held back at end of stream.

        Returns:
            The held-back remainder (e.g. an incomplete ``<thi``), or ``""``
            when the stream ended inside an unclosed think block.
        """
        if self._in_thinking:
            return ""
        rest = self._buf
        self._buf = ""
        return rest
def stream_broadcast(
    system_prompt: str,
    user_prompt: str = DEFAULT_USER_PROMPT,
    seed: int | None = None,
    max_tokens: int = 220,
    temperature: float = 0.7,
):
    """Generator that yields the broadcast text piece by piece.

    Falls back to streaming ``MOCK_SCRIPT`` when no model could be loaded.
    Otherwise the prompt is rendered and tokenized by hand and the completion
    is streamed through a ``_ThinkingFilter`` so no think block is emitted.

    Args:
        system_prompt: Text of the system turn.
        user_prompt: Text of the user turn.
        seed: Sampling seed forwarded to the model; ``None`` passes no seed.
        max_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature.

    Yields:
        Non-empty pieces of visible text, in generation order.
    """
    _load_model()
    if _using_mock or _llm is None:
        yield from _mock_stream(MOCK_SCRIPT)
        return

    rendered = _build_prompt(system_prompt, user_prompt)
    # The prompt already starts with "<s>", hence add_bos=False; special=True
    # makes the template markers tokenize as special tokens.
    prompt_ids = _llm.tokenize(rendered.encode("utf-8"), add_bos=False, special=True)
    think_filter = _ThinkingFilter()
    sampling = dict(
        max_tokens=max_tokens,
        stream=True,
        temperature=temperature,
        top_p=0.9,
        repeat_penalty=1.1,
        seed=seed,
        stop=[IM_END, "<|endoftext|>"],
    )

    with _gen_lock:
        # reset(): forces full prompt re-evaluation. Without it, the first
        # generation (batched eval) differs from later ones (cached prefix)
        # and the frequency → broadcast determinism is lost.
        _llm.reset()
        for chunk in _llm.create_completion(prompt=prompt_ids, **sampling):
            piece = chunk["choices"][0].get("text")
            visible = think_filter.feed(piece) if piece else ""
            if visible:
                yield visible

    # Release anything the filter was still holding back as a partial tag.
    leftover = think_filter.flush()
    if leftover:
        yield leftover
def _mock_stream(text: str, *, delay: float = 0.015):
    """Simulate character-by-character streaming (when no GGUF is present).

    Args:
        text: The script to emit.
        delay: Seconds to pause after each character. Values of zero or
            below disable the pause entirely.

    Yields:
        Each character of *text*, in order.
    """
    for char in text:
        yield char
        if delay > 0:
            time.sleep(delay)