File size: 6,132 Bytes
82050bb
192bcd3
82050bb
 
 
 
 
192bcd3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82050bb
192bcd3
 
 
 
 
 
 
 
 
 
 
82050bb
192bcd3
 
 
 
 
 
 
 
 
 
 
 
82050bb
192bcd3
 
82050bb
192bcd3
82050bb
192bcd3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82050bb
192bcd3
 
 
 
 
 
 
 
 
 
 
 
 
 
82050bb
192bcd3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82050bb
192bcd3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82050bb
192bcd3
 
 
 
 
 
 
 
 
 
 
82050bb
 
 
192bcd3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82050bb
192bcd3
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
"""Loads the MiniCPM5 GGUF and streams generations with thinking mode disabled.

MiniCPM5 starts every reply with a long <think> block when the default chat
template is used. The GGUF template supports the "<think>\n\n</think>\n\n"
prefill (equivalent to enable_thinking=False), so here we render the ChatML
prompt by hand, tokenize it with special=True and generate via
create_completion. Result: immediate response, no thinking.
"""

from __future__ import annotations

import threading
import time
from pathlib import Path

# Lazily populated model state; all three flags are written by _load_model().
# _llm holds the llama_cpp.Llama instance once loading succeeds, else None.
_llm = None
# True when no GGUF could be found or loading it raised; stream_broadcast()
# then serves MOCK_SCRIPT instead of real generations.
_using_mock = False
# Set on the first _load_model() call so the lookup/load is attempted only once.
_model_checked = False
# Held by stream_broadcast() around reset() + generation on the shared model.
_gen_lock = threading.Lock()

# ChatML turn delimiters used when rendering the prompt by hand; IM_END is
# also passed as a stop sequence during generation.
IM_START = "<|im_start|>"
IM_END = "<|im_end|>"
# Empty think block appended right after the assistant header so the model
# answers immediately (see the module docstring).
NO_THINK_PREFILL = "<think>\n\n</think>\n\n"

# Default user turn for stream_broadcast() when the caller supplies none.
DEFAULT_USER_PROMPT = "Escribe la transmisión de esta noche. Solo el guion al aire."

# Canned broadcast streamed character by character when running in mock mode.
MOCK_SCRIPT = (
    "[JINGLE] Buenas noches, oyentes del vacío. "
    "Señal de prueba en esta frecuencia no autorizada. "
    "[INTERFERENCIA] El tiempo aquí corre hacia atrás los martes. "
    "[FIN DE TRANSMISION]"
)


# Published fine-tune repo; falls back to OpenBMB's base GGUF if missing.
# _find_gguf() tries these in order when no local GGUF exists.
FINETUNE_REPO = "MarianaCodebase/MiniCPM5-1B-lost-frequency-radio-GGUF"
BASE_REPO = "openbmb/MiniCPM5-1B-GGUF"


def _find_gguf() -> Path | None:
    models_dir = Path(__file__).parent / "models" / "minicpm"
    for pattern in ("*lost-frequency*.gguf", "*Q4_K_M*.gguf", "*.gguf"):
        matches = sorted(models_dir.glob(pattern)) if models_dir.exists() else []
        if matches:
            return matches[0]

    # No local GGUF (e.g. Space cold start): download from the Hub.
    try:
        from huggingface_hub import snapshot_download

        for repo in (FINETUNE_REPO, BASE_REPO):
            try:
                snapshot_download(
                    repo_id=repo,
                    local_dir=str(models_dir),
                    allow_patterns="*Q4_K_M*.gguf",
                )
                matches = sorted(models_dir.glob("*.gguf"))
                if matches:
                    print(f"[model] GGUF downloaded from {repo}")
                    return matches[0]
            except Exception as exc:
                print(f"[model] Could not download {repo}: {exc}")
    except Exception as exc:
        print(f"[model] huggingface_hub unavailable: {exc}")
    return None


def _load_model():
    """Load the GGUF into the module-level ``_llm`` the first time it is called.

    Later calls return immediately because ``_model_checked`` is set before the
    lookup starts. If no GGUF can be located, or constructing the Llama
    instance raises (including llama_cpp not being importable), the module
    switches to mock mode (``_using_mock = True``) instead of raising.
    """
    global _llm, _using_mock, _model_checked

    if _model_checked:
        return
    _model_checked = True

    model_path = _find_gguf()
    if model_path is None:
        _using_mock = True
        return

    try:
        # Imported lazily so the module still imports without llama_cpp installed.
        from llama_cpp import Llama

        _llm = Llama(model_path=str(model_path), n_ctx=2048, verbose=False)
    except Exception as exc:
        print(f"[model] Could not load GGUF ({model_path}): {exc}")
        _llm, _using_mock = None, True
    else:
        _using_mock = False


def _build_prompt(system_prompt: str, user_prompt: str) -> str:
    """Render the ChatML prompt by hand, ending in the no-think prefill.

    The result is ``<s>`` followed by a system turn, a user turn, and an open
    assistant turn that already contains NO_THINK_PREFILL, so generation
    continues right after the empty think block.
    """
    system_turn = f"{IM_START}system\n{system_prompt}{IM_END}\n"
    user_turn = f"{IM_START}user\n{user_prompt}{IM_END}\n"
    assistant_open = f"{IM_START}assistant\n{NO_THINK_PREFILL}"
    return "".join(("<s>", system_turn, user_turn, assistant_open))


class _ThinkingFilter:
    """Safety net: if a <think> block still shows up, it is never emitted."""

    _OPEN = "<think>"
    _CLOSE = "</think>"

    def __init__(self):
        self._buf = ""
        self._in_thinking = False

    def feed(self, token: str) -> str:
        self._buf += token
        out: list[str] = []

        while self._buf:
            if self._in_thinking:
                idx = self._buf.find(self._CLOSE)
                if idx == -1:
                    break
                self._buf = self._buf[idx + len(self._CLOSE) :]
                self._in_thinking = False
                continue

            idx = self._buf.find(self._OPEN)
            if idx == -1:
                # Hold back a suffix that could be a partial <think> opening
                safe_until = len(self._buf)
                for k in range(1, len(self._OPEN)):
                    if self._buf.endswith(self._OPEN[:k]):
                        safe_until = len(self._buf) - k
                        break
                if safe_until:
                    out.append(self._buf[:safe_until])
                    self._buf = self._buf[safe_until:]
                break
            if idx > 0:
                out.append(self._buf[:idx])
            self._buf = self._buf[idx + len(self._OPEN) :]
            self._in_thinking = True

        return "".join(out)

    def flush(self) -> str:
        if self._in_thinking:
            return ""
        rest = self._buf
        self._buf = ""
        return rest


def stream_broadcast(
    system_prompt: str,
    user_prompt: str = DEFAULT_USER_PROMPT,
    seed: int | None = None,
    max_tokens: int = 220,
    temperature: float = 0.7,
):
    """Generator that yields the broadcast text piece by piece.

    Loads the model on first use. When the model is unavailable, the canned
    MOCK_SCRIPT is streamed instead. Otherwise the hand-rendered ChatML prompt
    is tokenized with special tokens enabled and completed under ``_gen_lock``;
    every chunk passes through a _ThinkingFilter so no think block is yielded.

    Args:
        system_prompt: Content of the system turn.
        user_prompt: Content of the user turn.
        seed: Sampling seed forwarded to create_completion.
        max_tokens: Upper bound on generated tokens.
        temperature: Sampling temperature.

    Yields:
        Non-empty strings of visible broadcast text.
    """
    _load_model()

    model_ready = _llm is not None and not _using_mock
    if not model_ready:
        yield from _mock_stream(MOCK_SCRIPT)
        return

    rendered = _build_prompt(system_prompt, user_prompt)
    prompt_ids = _llm.tokenize(rendered.encode("utf-8"), add_bos=False, special=True)
    think_guard = _ThinkingFilter()
    sampling = {
        "max_tokens": max_tokens,
        "stream": True,
        "temperature": temperature,
        "top_p": 0.9,
        "repeat_penalty": 1.1,
        "seed": seed,
        "stop": [IM_END, "<|endoftext|>"],
    }

    with _gen_lock:
        # reset(): forces full prompt re-evaluation. Without it, the first
        # generation (batched eval) differs from later ones (cached prefix)
        # and the frequency → broadcast determinism is lost.
        _llm.reset()
        for chunk in _llm.create_completion(prompt=prompt_ids, **sampling):
            piece = chunk["choices"][0].get("text") or ""
            if not piece:
                continue
            if visible := think_guard.feed(piece):
                yield visible

        # Emit whatever the filter was still holding back at end of stream.
        if leftover := think_guard.flush():
            yield leftover


def _mock_stream(text: str):
    """Yield ``text`` one character at a time, pausing briefly after each.

    Stands in for real token streaming when no GGUF is present.
    """
    per_char_pause = 0.015
    for glyph in text:
        yield glyph
        time.sleep(per_char_pause)