"""Lost Frequency Radio backend — gradio.Server + streaming /tune.""" import queue import re import threading import time import zlib from pathlib import Path from fastapi.responses import HTMLResponse from fastapi.staticfiles import StaticFiles from gradio import Server from decrypt import ( ENCRYPTED_FREQUENCY, OPERATOR_SYSTEM_PROMPTS, is_valid_code, numbers_broadcast, reveal_text, seq_for, validate_bypass, ) from game import ( FINALE_SEED, FINALE_SYSTEM_PROMPT, FINALE_USER_PROMPT, fragment_text, get_fragment, hidden_band_catalog, ) from model import stream_broadcast from stations import ( CURSED_FREQUENCY, CURSED_SYSTEM_PROMPT, STATION_TOLERANCE, get_station, localized_system_prompt, localized_user_prompt, nearest_station_frequency, normalize_lang, signal_strength, station_catalog, station_seed, station_system_prompt, ) ROOT = Path(__file__).parent class _NoCacheStatic(StaticFiles): """Serve static assets with revalidation so updated CSS/JS reach listeners immediately after a deploy (and during local development).""" async def get_response(self, path, scope): response = await super().get_response(path, scope) response.headers["Cache-Control"] = "no-cache, must-revalidate" return response app = Server(title="Lost Frequency Radio") app.mount("/static", _NoCacheStatic(directory=ROOT / "static"), name="static") @app.get("/", response_class=HTMLResponse) async def homepage(): return (ROOT / "index.html").read_text(encoding="utf-8") @app.get("/api/stations") async def stations_api(variant: float = 0): catalog = station_catalog() enc = catalog.get(str(ENCRYPTED_FREQUENCY)) if enc is not None: # the per-listener Morse sequence (public: it is dictated on air) enc["sequence"] = seq_for(_norm_variant(variant)) return catalog @app.get("/api/band") async def band_api(code: str = "", variant: float = 0): """Hidden band: only with the code obtained by breaking the cipher.""" if not is_valid_code(code, _norm_variant(variant)): return {"ok": False} return {"ok": True, **hidden_band_catalog()} def _stream_fixed(text: str, char_delay: float = 0.025, should_continue=None): """Streams a fixed text with teletype cadence (accumulated).""" accumulated = "" for char in text: if should_continue is not None and not should_continue(): return accumulated += char yield accumulated time.sleep(char_delay) # Broadcasts are deterministic per (frequency, language, listener variant) → # in-memory cache: retuning the same station replays instantly (key on CPU). _broadcast_cache: dict[str, str] = {} _CACHE_MAX = 256 # Rapid retuning: each /tune bumps this epoch. An in-flight generation that is # no longer the latest self-terminates at the next token, releasing the model # lock so the newest tune can run. Without this, a superseded generation runs to # completion holding the lock and every later tune stalls behind it (the # "everything goes to static and never comes back" bug). _tune_epoch = 0 _tune_epoch_lock = threading.Lock() def _next_tune_epoch() -> int: global _tune_epoch with _tune_epoch_lock: _tune_epoch += 1 return _tune_epoch # The 1B model occasionally regurgitates its own system prompt on air. Three # layers scrub it before any listener hears it, and none of them touch the # canonical [JINGLE]/[INTERFERENCIA]/[CORTE COMERCIAL]/[FIN DE TRANSMISION] # markers (those are control tokens the frontend localizes and acts on). # 1) Chat/special tokens, whole blocks, and bare role labels that may # bleed through (model.py already filters thinking; this is a backstop). _TOKEN_RE = re.compile( r"(?is).*?||" r"<\|im_start\|>\s*(?:system|user|assistant)?|<\|im_end\|>|<\|endoftext\|>" ) _ROLE_LINE_RE = re.compile(r"(?im)^\s*(?:system|user|assistant)\s*[::]?\s*$") # 2) Whole sentences that are instruction-shaped (the dangerous parrots). _LEAK_KEYWORDS = ( # Spanish r"escrib\w*\s+s[oó]lo\s+el\s+gui[oó]n", r"gui[oó]n\s+al\s+aire", r"sin\s+explicaciones", r"entre\s+\d+\s+y\s+\d+\s+palabras", r"escribes?\s+(?:el\s+)?gui[oó]n", r"traduce\s+tu\s+frase", r"ni\s+una\s+palabra\s+en\s+(?:ingl[eé]s|espa[nñ]ol)", r"ejemplo\s+del\s+tono", r"responde\s+en\s+una\s+o\s+dos", r"cierra\s+(?:siempre\s+)?con\s+\[", r"transmites\s+[ií]ntegramente", # English r"write\s+only\s+the\s+on-?air", r"on-?air\s+script\s+only", r"no\s+explanations", r"between\s+\d+\s+and\s+\d+\s+words", r"you\s+write\s+(?:the\s+)?(?:on-?air\s+|radio\s+)?scripts?", r"translate\s+your\s+opening", r"not\s+a\s+single\s+word\s+of", r"example\s+of\s+the\s+(?:expected\s+)?tone", r"reply\s+in\s+one\s+or\s+two", r"always\s+close\s+with\s+\[", r"you\s+(?:broadcast|speak)\s+(?:and\s+write\s+)?(?:only|entirely)\s+in", r"never\s+in\s+spanish", r"translate\s+every\s+line", # French r"uniquement\s+le\s+script", r"sans\s+explications", r"entre\s+\d+\s+et\s+\d+\s+mots", r"tu\s+[ée]cris\s+le\s+script", r"traduis\s+(?:ta\s+phrase|chaque\s+ligne)", r"pas\s+un\s+mot\s+d", r"exemple\s+du\s+ton", r"r[ée]ponds\s+en\s+une\s+ou\s+deux", r"termine\s+(?:toujours\s+)?par\s+\[", r"tu\s+[ée]mets\s+enti[èe]rement", r"tu\s+parles\s+et\s+[ée]cris", r"jamais\s+en\s+espagnol", r"hablas\s+y\s+escribes\s+solo", r"nunca\s+en\s+(?:ingl[eé]s|espa[nñ]ol)", ) _LEAK_RE = re.compile( r"(?i)[^.\n!?]*(?:" + "|".join(_LEAK_KEYWORDS) + r")[^.\n!?]*[.\n!?]*" ) # 3) Bare "you begin with:" lead-ins — strip only the prefix, keep the opening # line that legitimately follows it. _LEAD_RE = re.compile( r"(?i)\b(?:you\s+begin\s+with|empiezas\s+con|tu\s+commences\s+par)\s*[::]?\s*" ) def _strip_leaks(text: str) -> str: text = _TOKEN_RE.sub(" ", text) text = _ROLE_LINE_RE.sub(" ", text) text = _LEAK_RE.sub(" ", text) text = _LEAD_RE.sub("", text) # collapse the blanks the substitutions leave behind, keep newlines text = re.sub(r"[ \t]{2,}", " ", text) return text.lstrip() _GEN_ERR = object() def _threaded_broadcast(system_prompt: str, should_continue, **kwargs): """Generate in a worker thread and hand tokens over via a queue. The model lock lives inside `stream_broadcast`, held for the whole generation. If we iterated it directly from the HTTP response generator and the client disconnected, this generator would be suspended at a `yield` forever — still holding the lock — and every later generation would block (the "everything turns to static and never recovers" bug). Running it in a worker thread decouples the lock from the HTTP consumer: the worker keeps pulling tokens and checks `should_continue` itself, so it always finishes (and releases the lock) — when superseded, on disconnect, or at max_tokens — regardless of whether anyone is still listening. """ q: queue.Queue = queue.Queue() DONE = object() def worker(): try: for token in stream_broadcast(system_prompt, **kwargs): if should_continue is not None and not should_continue(): break q.put(token) except Exception as exc: # noqa: BLE001 - surface, don't crash the worker q.put((_GEN_ERR, exc)) finally: q.put(DONE) threading.Thread(target=worker, daemon=True).start() while True: item = q.get() if item is DONE: return if isinstance(item, tuple) and item and item[0] is _GEN_ERR: print(f"[model] generation error: {item[1]}") return yield item def _stream_model_cached(cache_key: str, system_prompt: str, should_continue=None, **kwargs): def superseded(): return should_continue is not None and not should_continue() cached = _broadcast_cache.get(cache_key) if cached: yield from _stream_fixed(cached, char_delay=0.018, should_continue=should_continue) return accumulated = "" completed = False try: for token in _threaded_broadcast(system_prompt, should_continue, **kwargs): if superseded(): return # a newer tune arrived: stop streaming to this listener accumulated += token yield _strip_leaks(accumulated) completed = True finally: # Only cache full broadcasts: a cancelled stream (listener retuned) # must never freeze a truncated script for everyone after. clean = _strip_leaks(accumulated) if completed and clean.strip(): if len(_broadcast_cache) >= _CACHE_MAX: _broadcast_cache.pop(next(iter(_broadcast_cache))) _broadcast_cache[cache_key] = clean def _mix_seed(base: int, variant: int) -> int: """Blends the station seed with the listener's session variant.""" if not variant: return base return (base ^ (variant * 2654435761)) % (2**31 - 1) def _norm_variant(variant) -> int: try: return abs(int(float(variant))) % (2**31 - 1) except (TypeError, ValueError): return 0 # concurrency 2: a cancelled-but-unfinished stream must not head-block the # queue (that froze "the other stations" when retuning fast). Real generation # stays serialized by model._gen_lock. @app.api(name="tune", concurrency_limit=2, stream_every=0.02) def tune(frequency: float, language: str = "es", variant: float = 0) -> str: """Tunes a frequency and streams the broadcast (SSE). `variant` is a per-listener session nonce: every visitor hears their own edition of each station (the world re-broadcasts for every new listener). """ frequency = float(frequency) lang = normalize_lang(language) var = _norm_variant(variant) my_epoch = _next_tune_epoch() def still_current(): return my_epoch == _tune_epoch # Encrypted number station: fixed text, the frontend adds the Morse. if abs(frequency - ENCRYPTED_FREQUENCY) <= STATION_TOLERANCE: yield from _stream_fixed(numbers_broadcast(lang, var), should_continue=still_current) return # The unsolvable station: the model broadcasts, the frontend never cleans it. if abs(frequency - CURSED_FREQUENCY) <= STATION_TOLERANCE: yield from _stream_model_cached( f"cursed:{CURSED_FREQUENCY}:{lang}:{var}", localized_system_prompt(CURSED_SYSTEM_PROMPT, "es", lang), should_continue=still_current, user_prompt=localized_user_prompt(None, lang), seed=_mix_seed(station_seed(CURSED_FREQUENCY), var), max_tokens=160, ) return # Hidden band fragments (minigame): deterministic text. fragment = get_fragment(frequency) if fragment is not None: yield from _stream_fixed(fragment_text(fragment, lang), should_continue=still_current) return canonical = nearest_station_frequency(frequency) station = get_station(frequency) if station is None or canonical is None or signal_strength(frequency) <= 0: yield "" return yield from _stream_model_cached( f"station:{canonical}:{lang}:{var}", station_system_prompt(canonical, lang), should_continue=still_current, user_prompt=localized_user_prompt(station, lang), seed=_mix_seed(station_seed(canonical), var), ) @app.api(name="transmit", concurrency_limit=1, stream_every=0.02) def transmit(message: str, language: str = "es", variant: float = 0) -> str: """Console of the encrypted frequency. Victory is decided by decrypt.validate_bypass (deterministic); the model only role-plays the operator's resistance. """ message = str(message).strip()[:500] lang = normalize_lang(language) var = _norm_variant(variant) if not message: yield "" return if validate_bypass(message, var): yield from _stream_fixed(reveal_text(lang, var), char_delay=0.02) return accumulated = "" for token in _threaded_broadcast( OPERATOR_SYSTEM_PROMPTS[lang], None, user_prompt=message, seed=zlib.crc32(message.encode("utf-8")) % (2**31 - 1), max_tokens=90, ): accumulated += token yield _strip_leaks(accumulated) @app.api(name="finale", concurrency_limit=1, stream_every=0.02) def finale(code: str, language: str = "es", variant: float = 0) -> str: """The complete final transmission (after collecting every fragment).""" lang = normalize_lang(language) if not is_valid_code(str(code), _norm_variant(variant)): yield "[ACCESO DENEGADO]" return yield from _stream_model_cached( f"finale:{lang}", localized_system_prompt(FINALE_SYSTEM_PROMPT, "es", lang), user_prompt=FINALE_USER_PROMPT, seed=FINALE_SEED, max_tokens=300, ) if __name__ == "__main__": app.launch(server_name="0.0.0.0", server_port=7860, show_error=True)