| """Lost Frequency Radio backend — gradio.Server + streaming /tune.""" |
|
|
| import queue |
| import re |
| import threading |
| import time |
| import zlib |
| from pathlib import Path |
|
|
| from fastapi.responses import HTMLResponse |
| from fastapi.staticfiles import StaticFiles |
| from gradio import Server |
|
|
| from decrypt import ( |
| ENCRYPTED_FREQUENCY, |
| OPERATOR_SYSTEM_PROMPTS, |
| is_valid_code, |
| numbers_broadcast, |
| reveal_text, |
| seq_for, |
| validate_bypass, |
| ) |
| from game import ( |
| FINALE_SEED, |
| FINALE_SYSTEM_PROMPT, |
| FINALE_USER_PROMPT, |
| fragment_text, |
| get_fragment, |
| hidden_band_catalog, |
| ) |
| from model import stream_broadcast |
| from stations import ( |
| CURSED_FREQUENCY, |
| CURSED_SYSTEM_PROMPT, |
| STATION_TOLERANCE, |
| get_station, |
| localized_system_prompt, |
| localized_user_prompt, |
| nearest_station_frequency, |
| normalize_lang, |
| signal_strength, |
| station_catalog, |
| station_seed, |
| station_system_prompt, |
| ) |
|
|
# Directory containing this module; index.html and the static/ tree are
# resolved against it.
ROOT = Path(__file__).parent
|
|
|
|
class _NoCacheStatic(StaticFiles):
    """StaticFiles variant that makes browsers revalidate every asset.

    Each response is stamped with ``Cache-Control: no-cache, must-revalidate``
    so updated CSS/JS reaches listeners right after a deploy (and during local
    development) instead of being served from a stale browser cache.
    """

    async def get_response(self, path, scope):
        """Let StaticFiles build the response, then add the no-cache header."""
        served = await super().get_response(path, scope)
        served.headers["Cache-Control"] = "no-cache, must-revalidate"
        return served
|
|
|
|
app = Server(title="Lost Frequency Radio")

# Serve ROOT/static under /static with revalidation headers (_NoCacheStatic).
app.mount(
    "/static",
    _NoCacheStatic(directory=ROOT / "static"),
    name="static",
)
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def homepage():
    """Serve the single-page UI: index.html next to this module.

    The file is read on every request, so edits show up without a restart.
    """
    index_page = ROOT / "index.html"
    return index_page.read_text(encoding="utf-8")
|
|
|
|
@app.get("/api/stations")
async def stations_api(variant: float = 0):
    """Return the public station catalog for one listener session.

    If the catalog has an entry keyed by ``str(ENCRYPTED_FREQUENCY)``, that
    entry gets a ``sequence`` field computed by ``seq_for`` from the caller's
    normalized ``variant``.
    """
    catalog = station_catalog()
    encrypted_entry = catalog.get(str(ENCRYPTED_FREQUENCY))
    if encrypted_entry is None:
        return catalog
    # NOTE(review): this writes into the dict returned by station_catalog();
    # if that helper ever returns a shared/cached object, copy before writing.
    encrypted_entry["sequence"] = seq_for(_norm_variant(variant))
    return catalog
|
|
|
|
@app.get("/api/band")
async def band_api(code: str = "", variant: float = 0):
    """Hidden band: only with the code obtained by breaking the cipher.

    Returns ``{"ok": False}`` unless ``code`` is valid for the caller's
    normalized variant; otherwise ``{"ok": True}`` merged with the hidden-band
    catalog.
    """
    unlocked = is_valid_code(code, _norm_variant(variant))
    if unlocked:
        return {"ok": True, **hidden_band_catalog()}
    return {"ok": False}
|
|
|
|
def _stream_fixed(text: str, char_delay: float = 0.025, should_continue=None):
    """Replay ``text`` with teletype cadence.

    Yields the growing prefix (``text[:1]``, ``text[:2]``, ...) and sleeps
    ``char_delay`` seconds after each yield. When ``should_continue`` is given
    it is polled before every character, and the stream ends as soon as it
    returns a falsy value.
    """
    for end in range(1, len(text) + 1):
        if should_continue is not None and not should_continue():
            return
        yield text[:end]
        time.sleep(char_delay)
|
|
|
|
| |
| |
# Finished, leak-scrubbed model broadcasts, keyed by a string such as
# "station:<freq>:<lang>:<variant>", replayed instead of regenerating.
# Holds at most _CACHE_MAX entries.
_broadcast_cache: dict[str, str] = {}
_CACHE_MAX: int = 256
|
|
| |
| |
| |
| |
| |
# Module-wide counter advanced by every tune request; a stream remembers the
# value it was issued so it can tell when a newer request has superseded it.
_tune_epoch = 0
_tune_epoch_lock = threading.Lock()


def _next_tune_epoch() -> int:
    """Advance the shared tune epoch under its lock and return the new value."""
    global _tune_epoch
    with _tune_epoch_lock:
        bumped = _tune_epoch + 1
        _tune_epoch = bumped
    return bumped
|
|
|
|
| |
| |
| |
| |
|
|
| |
| |
# Chat-template debris: whole <think>...</think> sections (DOTALL, so they may
# span lines), stray <think>/</think> tags, and ChatML special tokens (an
# <|im_start|> also swallows a following role name).
_TOKEN_RE = re.compile(
    "|".join(
        (
            r"(?is)<think>.*?</think>",
            r"</?think>",
            r"<\|im_start\|>\s*(?:system|user|assistant)?",
            r"<\|im_end\|>",
            r"<\|endoftext\|>",
        )
    )
)

# A line consisting only of a bare role label such as "assistant" or "user:".
_ROLE_LINE_RE = re.compile(r"(?im)^\s*(?:system|user|assistant)\s*[::]?\s*$")

# Phrases from our own prompt instructions that the model may echo on air.
# Order is preserved from the original list (it feeds one regex alternation).
_LEAK_KEYWORDS = (
    # Spanish prompt wording
    r"escrib\w*\s+s[oó]lo\s+el\s+gui[oó]n",
    r"gui[oó]n\s+al\s+aire",
    r"sin\s+explicaciones",
    r"entre\s+\d+\s+y\s+\d+\s+palabras",
    r"escribes?\s+(?:el\s+)?gui[oó]n",
    r"traduce\s+tu\s+frase",
    r"ni\s+una\s+palabra\s+en\s+(?:ingl[eé]s|espa[nñ]ol)",
    r"ejemplo\s+del\s+tono",
    r"responde\s+en\s+una\s+o\s+dos",
    r"cierra\s+(?:siempre\s+)?con\s+\[",
    r"transmites\s+[ií]ntegramente",
    # English prompt wording
    r"write\s+only\s+the\s+on-?air",
    r"on-?air\s+script\s+only",
    r"no\s+explanations",
    r"between\s+\d+\s+and\s+\d+\s+words",
    r"you\s+write\s+(?:the\s+)?(?:on-?air\s+|radio\s+)?scripts?",
    r"translate\s+your\s+opening",
    r"not\s+a\s+single\s+word\s+of",
    r"example\s+of\s+the\s+(?:expected\s+)?tone",
    r"reply\s+in\s+one\s+or\s+two",
    r"always\s+close\s+with\s+\[",
    r"you\s+(?:broadcast|speak)\s+(?:and\s+write\s+)?(?:only|entirely)\s+in",
    r"never\s+in\s+spanish",
    r"translate\s+every\s+line",
    # French prompt wording, then the Spanish language-lock rules
    r"uniquement\s+le\s+script",
    r"sans\s+explications",
    r"entre\s+\d+\s+et\s+\d+\s+mots",
    r"tu\s+[ée]cris\s+le\s+script",
    r"traduis\s+(?:ta\s+phrase|chaque\s+ligne)",
    r"pas\s+un\s+mot\s+d",
    r"exemple\s+du\s+ton",
    r"r[ée]ponds\s+en\s+une\s+ou\s+deux",
    r"termine\s+(?:toujours\s+)?par\s+\[",
    r"tu\s+[ée]mets\s+enti[èe]rement",
    r"tu\s+parles\s+et\s+[ée]cris",
    r"jamais\s+en\s+espagnol",
    r"hablas\s+y\s+escribes\s+solo",
    r"nunca\s+en\s+(?:ingl[eé]s|espa[nñ]ol)",
)

# Matches the whole sentence around any leak keyword: the text before it back
# to the previous ".", "!", "?" or newline, the text after it, and the run of
# terminators that ends it.
_LEAK_ALTERNATION = "|".join(_LEAK_KEYWORDS)
_LEAK_RE = re.compile(rf"(?i)[^.\n!?]*(?:{_LEAK_ALTERNATION})[^.\n!?]*[.\n!?]*")

# Prompt lead-ins such as "You begin with:" that precede the actual line.
_LEAD_PHRASES = (r"you\s+begin\s+with", r"empiezas\s+con", r"tu\s+commences\s+par")
_LEAD_RE = re.compile(r"(?i)\b(?:" + "|".join(_LEAD_PHRASES) + r")\s*[::]?\s*")

# Runs of two or more spaces/tabs left behind by the removals above.
_SPACE_RUN_RE = re.compile(r"[ \t]{2,}")

# Applied in order: (pattern, replacement).
_SCRUB_STEPS = (
    (_TOKEN_RE, " "),
    (_ROLE_LINE_RE, " "),
    (_LEAK_RE, " "),
    (_LEAD_RE, ""),
    (_SPACE_RUN_RE, " "),
)


def _strip_leaks(text: str) -> str:
    """Remove chat-template tokens, role lines and echoed prompt text.

    Each scrub step runs in turn, then space/tab runs are collapsed and leading
    whitespace is trimmed. Trailing whitespace is kept.
    """
    for pattern, replacement in _SCRUB_STEPS:
        text = pattern.sub(replacement, text)
    return text.lstrip()
|
|
|
|
# Tag marking an exception raised by the model worker on the hand-off queue.
_GEN_ERR = object()


def _threaded_broadcast(system_prompt: str, should_continue, **kwargs):
    """Generate in a worker thread and hand tokens over via a queue.

    The model lock lives inside `stream_broadcast`, held for the whole
    generation. If we iterated it directly from the HTTP response generator and
    the client disconnected, this generator would be suspended at a `yield`
    forever — still holding the lock — and every later generation would block
    (the "everything turns to static and never recovers" bug).

    Running it in a worker thread decouples the lock from the HTTP consumer: the
    worker keeps pulling tokens and checks `should_continue` itself, so it always
    finishes (and releases the lock) — when superseded, when this generator is
    closed, or at max_tokens — regardless of whether anyone is still listening.
    The model generator is closed explicitly when the worker stops, so the lock
    is released immediately rather than whenever the generator is collected.

    Args:
        system_prompt: forwarded to `stream_broadcast`.
        should_continue: optional zero-argument callable; generation stops once
            it returns falsy. ``None`` means it is never consulted.
        **kwargs: forwarded unchanged to `stream_broadcast`.

    Yields:
        Each model token, in order.

    Returns:
        Via ``StopIteration.value`` (plain ``for`` loops ignore it): ``True``
        only if the model ran to its natural end; ``False`` if it raised, was
        stopped by `should_continue`, or this generator was closed early.
    """
    hand_off: queue.Queue = queue.Queue()
    token_tag = object()
    done_tag = object()
    abandoned = threading.Event()

    def worker():
        tokens = None
        finished = False
        try:
            tokens = stream_broadcast(system_prompt, **kwargs)
            for token in tokens:
                if abandoned.is_set():
                    break
                if should_continue is not None and not should_continue():
                    break
                hand_off.put((token_tag, token))
            else:
                finished = True
        except Exception as exc:
            hand_off.put((_GEN_ERR, exc))
        finally:
            try:
                # Close now so any lock held inside stream_broadcast is released
                # deterministically, not at garbage collection.
                close = getattr(tokens, "close", None)
                if close is not None:
                    close()
            finally:
                # Always post the terminator, or the consumer would block forever.
                hand_off.put((done_tag, finished))

    threading.Thread(target=worker, daemon=True).start()

    try:
        while True:
            tag, payload = hand_off.get()
            if tag is token_tag:
                yield payload
            elif tag is _GEN_ERR:
                print(f"[model] generation error: {payload}")
                return False
            else:
                return payload
    finally:
        # Runs on normal exit and when our consumer closes us early: tell the
        # worker nobody is reading so it stops generating and frees the model.
        abandoned.set()


def _stream_model_cached(cache_key: str, system_prompt: str, should_continue=None, **kwargs):
    """Stream a model broadcast, replaying a cached copy when one exists.

    On a cache hit the stored text is replayed through `_stream_fixed`.
    Otherwise the model runs via `_threaded_broadcast` and every yield is the
    leak-scrubbed text accumulated so far. The scrubbed text is cached under
    `cache_key` only when the model ran to completion; a broadcast cut short
    (superseded, model error, or consumer gone) is never stored, so later
    listeners cannot replay a truncated transmission.

    Args:
        cache_key: key into `_broadcast_cache`.
        system_prompt: forwarded to the model.
        should_continue: optional zero-argument callable; once it returns falsy
            the stream stops and nothing is cached.
        **kwargs: forwarded to the model (e.g. user_prompt, seed, max_tokens).
    """
    def superseded():
        return should_continue is not None and not should_continue()

    cached = _broadcast_cache.get(cache_key)
    if cached:
        yield from _stream_fixed(cached, char_delay=0.018, should_continue=should_continue)
        return

    accumulated = ""
    finished = False
    tokens = _threaded_broadcast(system_prompt, should_continue, **kwargs)
    try:
        while True:
            try:
                token = next(tokens)
            except StopIteration as stop:
                # The generator's return value says whether the model reached its
                # natural end. A plain ``for`` loop would discard it and treat a
                # superseded or failed run as complete.
                finished = bool(stop.value)
                break
            if superseded():
                return
            accumulated += token
            yield _strip_leaks(accumulated)
    finally:
        # Prompt the worker to stop at once if we bailed out early.
        tokens.close()

    if not finished:
        return
    clean = _strip_leaks(accumulated)
    if not clean.strip():
        return
    if len(_broadcast_cache) >= _CACHE_MAX:
        # Evict the oldest insertion; tolerate a concurrent request having
        # already evicted the same key.
        oldest = next(iter(_broadcast_cache), None)
        if oldest is not None:
            _broadcast_cache.pop(oldest, None)
    _broadcast_cache[cache_key] = clean
|
|
|
|
def _mix_seed(base: int, variant: int) -> int:
    """Blend a station's base seed with a listener's session variant.

    A falsy variant (0) returns ``base`` unchanged. Otherwise the variant is
    multiplied by Knuth's multiplicative-hash constant 2654435761, XOR-ed into
    ``base``, and reduced modulo 2**31 - 1.
    """
    if variant:
        spread = variant * 2654435761
        return (base ^ spread) % (2**31 - 1)
    return base
|
|
|
|
def _norm_variant(variant) -> int:
    """Coerce a client-supplied session variant into a non-negative int.

    Accepts anything ``float()`` understands (numbers, numeric strings),
    truncates toward zero, drops the sign and reduces modulo 2**31 - 1.
    Unusable input — ``None``, non-numeric text, NaN, or ±infinity — maps to
    0 ("no variant").
    """
    try:
        as_int = int(float(variant))
    except (TypeError, ValueError, OverflowError):
        # int() of an infinite float raises OverflowError (NaN raises
        # ValueError); a client can send e.g. variant=inf or variant=1e400.
        return 0
    return abs(as_int) % (2**31 - 1)
|
|
|
|
| |
| |
| |
@app.api(name="tune", concurrency_limit=2, stream_every=0.02)
def tune(frequency: float, language: str = "es", variant: float = 0) -> str:
    """Tunes a frequency and streams the broadcast (SSE).

    `variant` is a per-listener session nonce: every visitor hears their own
    edition of each station (the world re-broadcasts for every new listener).

    Checked in order: the encrypted numbers station, the cursed station, story
    fragments, then regular stations. Every stream stops once a newer tune
    request advances the module-wide tune epoch. With no station, or no
    signal, a single empty string is yielded.
    """
    freq = float(frequency)
    lang = normalize_lang(language)
    var = _norm_variant(variant)
    issued_epoch = _next_tune_epoch()

    def still_current():
        # Any later tune request (from any listener) advances the shared epoch.
        return issued_epoch == _tune_epoch

    def locked_on(target):
        return abs(freq - target) <= STATION_TOLERANCE

    if locked_on(ENCRYPTED_FREQUENCY):
        yield from _stream_fixed(numbers_broadcast(lang, var), should_continue=still_current)
        return

    if locked_on(CURSED_FREQUENCY):
        cursed_key = f"cursed:{CURSED_FREQUENCY}:{lang}:{var}"
        yield from _stream_model_cached(
            cursed_key,
            localized_system_prompt(CURSED_SYSTEM_PROMPT, "es", lang),
            should_continue=still_current,
            user_prompt=localized_user_prompt(None, lang),
            seed=_mix_seed(station_seed(CURSED_FREQUENCY), var),
            max_tokens=160,
        )
        return

    fragment = get_fragment(freq)
    if fragment is not None:
        yield from _stream_fixed(fragment_text(fragment, lang), should_continue=still_current)
        return

    canonical = nearest_station_frequency(freq)
    station = get_station(freq)
    # Kept as "<= 0" (not "> 0" negated) so a NaN strength behaves as before.
    if station is None or canonical is None or signal_strength(freq) <= 0:
        yield ""
        return

    station_key = f"station:{canonical}:{lang}:{var}"
    yield from _stream_model_cached(
        station_key,
        station_system_prompt(canonical, lang),
        should_continue=still_current,
        user_prompt=localized_user_prompt(station, lang),
        seed=_mix_seed(station_seed(canonical), var),
    )
|
|
|
|
@app.api(name="transmit", concurrency_limit=1, stream_every=0.02)
def transmit(message: str, language: str = "es", variant: float = 0) -> str:
    """Console of the encrypted frequency.

    Victory is decided by decrypt.validate_bypass (deterministic);
    the model only role-plays the operator's resistance.

    The message is stringified, trimmed and capped at 500 characters.
    - An empty message yields a single empty string.
    - A successful bypass streams the reveal text.
    - Anything else streams the operator's leak-scrubbed reply, seeded
      deterministically from the message's CRC-32.
    """
    text = str(message).strip()[:500]
    lang = normalize_lang(language)
    var = _norm_variant(variant)

    if not text:
        yield ""
    elif validate_bypass(text, var):
        yield from _stream_fixed(reveal_text(lang, var), char_delay=0.02)
    else:
        reply = ""
        tokens = _threaded_broadcast(
            OPERATOR_SYSTEM_PROMPTS[lang],
            None,  # not tied to the tune epoch, so never superseded by tuning
            user_prompt=text,
            seed=zlib.crc32(text.encode("utf-8")) % (2**31 - 1),
            max_tokens=90,
        )
        for token in tokens:
            reply += token
            yield _strip_leaks(reply)
|
|
|
|
@app.api(name="finale", concurrency_limit=1, stream_every=0.02)
def finale(code: str, language: str = "es", variant: float = 0) -> str:
    """The complete final transmission (after collecting every fragment).

    Requires a ``code`` valid for the caller's normalized variant; otherwise
    the "[ACCESO DENEGADO]" notice is yielded. The broadcast is cached per
    language only (not per variant) and uses the fixed FINALE_SEED.
    """
    lang = normalize_lang(language)
    granted = is_valid_code(str(code), _norm_variant(variant))
    if not granted:
        yield "[ACCESO DENEGADO]"
        return

    yield from _stream_model_cached(
        f"finale:{lang}",
        localized_system_prompt(FINALE_SYSTEM_PROMPT, "es", lang),
        user_prompt=FINALE_USER_PROMPT,
        seed=FINALE_SEED,
        max_tokens=300,
    )
|
|
|
|
if __name__ == "__main__":
    # Bind every interface on port 7860, with show_error enabled.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860, "show_error": True}
    app.launch(**launch_options)
|
|