"""Loads the MiniCPM5 GGUF and streams generations with thinking mode disabled. MiniCPM5 starts every reply with a long block when the default chat template is used. The GGUF template supports the "\n\n\n\n" prefill (equivalent to enable_thinking=False), so here we render the ChatML prompt by hand, tokenize it with special=True and generate via create_completion. Result: immediate response, no thinking. """ from __future__ import annotations import threading import time from pathlib import Path _llm = None _using_mock = False _model_checked = False _gen_lock = threading.Lock() IM_START = "<|im_start|>" IM_END = "<|im_end|>" NO_THINK_PREFILL = "\n\n\n\n" DEFAULT_USER_PROMPT = "Escribe la transmisión de esta noche. Solo el guion al aire." MOCK_SCRIPT = ( "[JINGLE] Buenas noches, oyentes del vacío. " "Señal de prueba en esta frecuencia no autorizada. " "[INTERFERENCIA] El tiempo aquí corre hacia atrás los martes. " "[FIN DE TRANSMISION]" ) # Published fine-tune repo; falls back to OpenBMB's base GGUF if missing. FINETUNE_REPO = "MarianaCodebase/MiniCPM5-1B-lost-frequency-radio-GGUF" BASE_REPO = "openbmb/MiniCPM5-1B-GGUF" def _find_gguf() -> Path | None: models_dir = Path(__file__).parent / "models" / "minicpm" for pattern in ("*lost-frequency*.gguf", "*Q4_K_M*.gguf", "*.gguf"): matches = sorted(models_dir.glob(pattern)) if models_dir.exists() else [] if matches: return matches[0] # No local GGUF (e.g. Space cold start): download from the Hub. try: from huggingface_hub import snapshot_download for repo in (FINETUNE_REPO, BASE_REPO): try: snapshot_download( repo_id=repo, local_dir=str(models_dir), allow_patterns="*Q4_K_M*.gguf", ) matches = sorted(models_dir.glob("*.gguf")) if matches: print(f"[model] GGUF downloaded from {repo}") return matches[0] except Exception as exc: print(f"[model] Could not download {repo}: {exc}") except Exception as exc: print(f"[model] huggingface_hub unavailable: {exc}") return None def _load_model(): global _llm, _using_mock, _model_checked if _model_checked: return _model_checked = True gguf = _find_gguf() if gguf is None: _using_mock = True return try: from llama_cpp import Llama _llm = Llama(model_path=str(gguf), n_ctx=2048, verbose=False) _using_mock = False except Exception as exc: print(f"[model] Could not load GGUF ({gguf}): {exc}") _llm = None _using_mock = True def _build_prompt(system_prompt: str, user_prompt: str) -> str: return ( "" f"{IM_START}system\n{system_prompt}{IM_END}\n" f"{IM_START}user\n{user_prompt}{IM_END}\n" f"{IM_START}assistant\n{NO_THINK_PREFILL}" ) class _ThinkingFilter: """Safety net: if a block still shows up, it is never emitted.""" _OPEN = "" _CLOSE = "" def __init__(self): self._buf = "" self._in_thinking = False def feed(self, token: str) -> str: self._buf += token out: list[str] = [] while self._buf: if self._in_thinking: idx = self._buf.find(self._CLOSE) if idx == -1: break self._buf = self._buf[idx + len(self._CLOSE) :] self._in_thinking = False continue idx = self._buf.find(self._OPEN) if idx == -1: # Hold back a suffix that could be a partial opening safe_until = len(self._buf) for k in range(1, len(self._OPEN)): if self._buf.endswith(self._OPEN[:k]): safe_until = len(self._buf) - k break if safe_until: out.append(self._buf[:safe_until]) self._buf = self._buf[safe_until:] break if idx > 0: out.append(self._buf[:idx]) self._buf = self._buf[idx + len(self._OPEN) :] self._in_thinking = True return "".join(out) def flush(self) -> str: if self._in_thinking: return "" rest = self._buf self._buf = "" return rest def stream_broadcast( system_prompt: str, user_prompt: str = DEFAULT_USER_PROMPT, seed: int | None = None, max_tokens: int = 220, temperature: float = 0.7, ): """Generator that yields the broadcast text token by token.""" _load_model() if _using_mock or _llm is None: yield from _mock_stream(MOCK_SCRIPT) return prompt = _build_prompt(system_prompt, user_prompt) tokens = _llm.tokenize(prompt.encode("utf-8"), add_bos=False, special=True) safety = _ThinkingFilter() with _gen_lock: # reset(): forces full prompt re-evaluation. Without it, the first # generation (batched eval) differs from later ones (cached prefix) # and the frequency → broadcast determinism is lost. _llm.reset() stream = _llm.create_completion( prompt=tokens, max_tokens=max_tokens, stream=True, temperature=temperature, top_p=0.9, repeat_penalty=1.1, seed=seed, stop=[IM_END, "<|endoftext|>"], ) for event in stream: token = event["choices"][0].get("text") or "" if not token: continue cleaned = safety.feed(token) if cleaned: yield cleaned tail = safety.flush() if tail: yield tail def _mock_stream(text: str): """Simulates character-by-character streaming (when no GGUF is present).""" for char in text: yield char time.sleep(0.015)