| """Loads the MiniCPM5 GGUF and streams generations with thinking mode disabled. |
| |
| MiniCPM5 starts every reply with a long <think> block when the default chat |
| template is used. The GGUF template supports the "<think>\n\n</think>\n\n" |
| prefill (equivalent to enable_thinking=False), so here we render the ChatML |
| prompt by hand, tokenize it with special=True and generate via |
| create_completion. Result: immediate response, no thinking. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import threading |
| import time |
| from pathlib import Path |
|
|
# Lazily initialised module state; written by _load_model().
_llm = None  # the loaded llama_cpp.Llama instance, or None when nothing is loaded
_using_mock = False  # True when no GGUF could be found or loaded
_model_checked = False  # True once a load attempt has been made
_gen_lock = threading.Lock()  # held by stream_broadcast() for a whole generation


# ChatML turn delimiters used when the prompt is rendered by hand.
IM_START = "<|im_start|>"
IM_END = "<|im_end|>"
# Empty think block prefilled into the assistant turn so the reply starts
# directly with the answer.
NO_THINK_PREFILL = "<think>\n\n</think>\n\n"


# Default user turn for stream_broadcast().
DEFAULT_USER_PROMPT = "Escribe la transmisión de esta noche. Solo el guion al aire."


# Canned broadcast streamed by stream_broadcast() when no model is available.
MOCK_SCRIPT = (
    "[JINGLE] Buenas noches, oyentes del vacío. "
    "Señal de prueba en esta frecuencia no autorizada. "
    "[INTERFERENCIA] El tiempo aquí corre hacia atrás los martes. "
    "[FIN DE TRANSMISION]"
)


# Hugging Face repositories tried by _find_gguf(), fine-tune first, when no
# local GGUF file exists.
FINETUNE_REPO = "MarianaCodebase/MiniCPM5-1B-lost-frequency-radio-GGUF"
BASE_REPO = "openbmb/MiniCPM5-1B-GGUF"
|
|
|
|
# Glob patterns in order of preference: fine-tune, then Q4_K_M quant, then any GGUF.
_GGUF_PRIORITY = ("*lost-frequency*.gguf", "*Q4_K_M*.gguf", "*.gguf")


def _pick_gguf(models_dir: Path) -> Path | None:
    """Return the preferred GGUF file in *models_dir*, or None if there is none."""
    if not models_dir.exists():
        return None
    for pattern in _GGUF_PRIORITY:
        matches = sorted(models_dir.glob(pattern))
        if matches:
            return matches[0]
    return None


def _find_gguf(models_dir: Path | None = None) -> Path | None:
    """Locate a GGUF model file, downloading one if none is present locally.

    The directory is searched with the patterns in ``_GGUF_PRIORITY``; the
    alphabetically first match of the first matching pattern wins. When
    nothing is found, ``huggingface_hub.snapshot_download`` is tried for
    ``FINETUNE_REPO`` and then ``BASE_REPO`` (Q4_K_M files only), and the
    directory is searched again with the same priority so that the file
    chosen right after a download is the one later runs will choose too.

    Args:
        models_dir: Directory to search and download into. Defaults to
            ``models/minicpm`` next to this file.

    Returns:
        Path to the selected GGUF file, or None when no file is available
        (missing ``huggingface_hub``, failed downloads, or no matching file).
    """
    if models_dir is None:
        models_dir = Path(__file__).parent / "models" / "minicpm"

    local = _pick_gguf(models_dir)
    if local is not None:
        return local

    # Best effort: any failure to import the hub client means "no download".
    try:
        from huggingface_hub import snapshot_download
    except Exception as exc:
        print(f"[model] huggingface_hub unavailable: {exc}")
        return None

    for repo in (FINETUNE_REPO, BASE_REPO):
        try:
            snapshot_download(
                repo_id=repo,
                local_dir=str(models_dir),
                allow_patterns="*Q4_K_M*.gguf",
            )
            downloaded = _pick_gguf(models_dir)
            if downloaded is not None:
                print(f"[model] GGUF downloaded from {repo}")
                return downloaded
        except Exception as exc:
            # A failed repo is not fatal; fall through to the next one.
            print(f"[model] Could not download {repo}: {exc}")
    return None
|
|
|
|
# Serialises the one-time model load across threads.
_load_lock = threading.Lock()


def _load_model():
    """Load the GGUF model once and record whether the mock must be used.

    On the first call the model file is located with ``_find_gguf()`` and
    loaded with ``llama_cpp.Llama`` (``n_ctx=2048``). If no file is found, or
    loading raises, ``_using_mock`` is set to True and ``_llm`` stays None.
    Later calls return immediately.

    The attempt runs under ``_load_lock`` and ``_model_checked`` is only set
    once the attempt has finished, so a second thread arriving during a slow
    load waits for the result instead of observing a half-initialised state
    (``_llm is None`` with ``_using_mock`` still False).
    """
    global _llm, _using_mock, _model_checked
    if _model_checked:
        return

    with _load_lock:
        # Another thread may have completed the load while we were waiting.
        if _model_checked:
            return

        try:
            gguf = _find_gguf()
            if gguf is None:
                _using_mock = True
                return

            try:
                from llama_cpp import Llama

                _llm = Llama(model_path=str(gguf), n_ctx=2048, verbose=False)
                _using_mock = False
            except Exception as exc:
                print(f"[model] Could not load GGUF ({gguf}): {exc}")
                _llm = None
                _using_mock = True
        finally:
            # Publish the flag last, and only once per process, whatever the
            # outcome of the attempt.
            _model_checked = True
|
|
|
|
def _build_prompt(system_prompt: str, user_prompt: str) -> str:
    """Render the ChatML prompt by hand, ending in the no-think prefill.

    The result is ``<s>`` followed by a system turn, a user turn and an
    opened assistant turn that already contains ``NO_THINK_PREFILL``.
    """
    turns = [
        f"{IM_START}system\n{system_prompt}{IM_END}\n",
        f"{IM_START}user\n{user_prompt}{IM_END}\n",
        f"{IM_START}assistant\n{NO_THINK_PREFILL}",
    ]
    return "<s>" + "".join(turns)
|
|
|
|
class _ThinkingFilter:
    """Safety net: if a <think> block still shows up, it is never emitted.

    Text is fed in arbitrary chunks; tags may be split across chunks. Output
    is the input with every ``<think>...</think>`` span removed. Text that
    could be the start of an opening tag is held back until the next chunk
    decides it. An unclosed think block suppresses everything after it.
    """

    _OPEN = "<think>"
    _CLOSE = "</think>"

    def __init__(self):
        self._buf = ""  # text received but not yet emitted or discarded
        self._in_thinking = False  # True between an opening and a closing tag

    @staticmethod
    def _partial_tag_len(text: str, tag: str) -> int:
        """Return the length of the longest proper prefix of *tag* ending *text*."""
        for size in range(min(len(tag) - 1, len(text)), 0, -1):
            if text.endswith(tag[:size]):
                return size
        return 0

    def feed(self, token: str) -> str:
        """Consume one chunk and return the text that is now safe to emit."""
        self._buf += token
        out: list[str] = []

        while self._buf:
            if self._in_thinking:
                close_at = self._buf.find(self._CLOSE)
                if close_at == -1:
                    # Hidden text is never emitted, so keep only what could
                    # be the start of a closing tag. This bounds the buffer
                    # and avoids rescanning the whole block on every chunk.
                    keep = self._partial_tag_len(self._buf, self._CLOSE)
                    self._buf = self._buf[len(self._buf) - keep :]
                    break
                self._buf = self._buf[close_at + len(self._CLOSE) :]
                self._in_thinking = False
                continue

            open_at = self._buf.find(self._OPEN)
            if open_at == -1:
                # Emit everything except a trailing fragment that might turn
                # into an opening tag once the next chunk arrives.
                held = self._partial_tag_len(self._buf, self._OPEN)
                cut = len(self._buf) - held
                if cut:
                    out.append(self._buf[:cut])
                    self._buf = self._buf[cut:]
                break

            if open_at > 0:
                out.append(self._buf[:open_at])
            self._buf = self._buf[open_at + len(self._OPEN) :]
            self._in_thinking = True

        return "".join(out)

    def flush(self) -> str:
        """Return any held-back text at end of stream.

        Returns an empty string while inside an unclosed think block.
        """
        if self._in_thinking:
            return ""
        rest = self._buf
        self._buf = ""
        return rest
|
|
|
|
def stream_broadcast(
    system_prompt: str,
    user_prompt: str = DEFAULT_USER_PROMPT,
    seed: int | None = None,
    max_tokens: int = 220,
    temperature: float = 0.7,
):
    """Stream the broadcast text as it is generated.

    Loads the model on first use. When no model is available the canned
    ``MOCK_SCRIPT`` is streamed instead. Otherwise the hand-rendered prompt
    is tokenized with special tokens enabled and passed to
    ``create_completion``; each streamed chunk goes through a
    ``_ThinkingFilter`` before being yielded.

    Args:
        system_prompt: Text of the system turn.
        user_prompt: Text of the user turn.
        seed: Forwarded to ``create_completion``.
        max_tokens: Forwarded to ``create_completion``.
        temperature: Forwarded to ``create_completion``.

    Yields:
        Non-empty pieces of the generated text.
    """
    _load_model()

    model_ready = not _using_mock and _llm is not None
    if not model_ready:
        yield from _mock_stream(MOCK_SCRIPT)
        return

    prompt_bytes = _build_prompt(system_prompt, user_prompt).encode("utf-8")
    prompt_tokens = _llm.tokenize(prompt_bytes, add_bos=False, special=True)

    sampling = {
        "max_tokens": max_tokens,
        "stream": True,
        "temperature": temperature,
        "top_p": 0.9,
        "repeat_penalty": 1.1,
        "seed": seed,
        "stop": [IM_END, "<|endoftext|>"],
    }

    think_filter = _ThinkingFilter()
    # The lock is held for the whole stream so only one generation uses the
    # shared model at a time; the model is reset before each completion.
    with _gen_lock:
        _llm.reset()
        for event in _llm.create_completion(prompt=prompt_tokens, **sampling):
            piece = event["choices"][0].get("text")
            if not piece:
                continue
            visible = think_filter.feed(piece)
            if visible:
                yield visible
        leftover = think_filter.flush()
        if leftover:
            yield leftover
|
|
|
|
def _mock_stream(text: str):
    """Yield *text* one character at a time with a short pause after each.

    Used in place of real generation when no GGUF is present.
    """
    pause_seconds = 0.015
    for glyph in text:
        yield glyph
        time.sleep(pause_seconds)
|
|