MarianaCodebase's picture
Upload app.py with huggingface_hub
97239fa verified
"""Lost Frequency Radio backend — gradio.Server + streaming /tune."""
import queue
import re
import threading
import time
import zlib
from pathlib import Path
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from gradio import Server
from decrypt import (
ENCRYPTED_FREQUENCY,
OPERATOR_SYSTEM_PROMPTS,
is_valid_code,
numbers_broadcast,
reveal_text,
seq_for,
validate_bypass,
)
from game import (
FINALE_SEED,
FINALE_SYSTEM_PROMPT,
FINALE_USER_PROMPT,
fragment_text,
get_fragment,
hidden_band_catalog,
)
from model import stream_broadcast
from stations import (
CURSED_FREQUENCY,
CURSED_SYSTEM_PROMPT,
STATION_TOLERANCE,
get_station,
localized_system_prompt,
localized_user_prompt,
nearest_station_frequency,
normalize_lang,
signal_strength,
station_catalog,
station_seed,
station_system_prompt,
)
# Directory containing this module; index.html and the static/ folder are
# resolved relative to it.
ROOT = Path(__file__).parent
class _NoCacheStatic(StaticFiles):
    """Static file handler that forces browsers to revalidate every asset.

    Each response produced by the parent class gets a
    ``Cache-Control: no-cache, must-revalidate`` header, so updated CSS/JS
    reach listeners right after a deploy (and during local development).
    """

    async def get_response(self, path, scope):
        """Delegate to ``StaticFiles.get_response`` and stamp the cache header."""
        served = await super().get_response(path, scope)
        served.headers["Cache-Control"] = "no-cache, must-revalidate"
        return served
# Application object; every route and streaming API below is registered on it.
app = Server(title="Lost Frequency Radio")
# Expose ROOT/static under /static through _NoCacheStatic (always revalidated).
app.mount("/static", _NoCacheStatic(directory=ROOT / "static"), name="static")
@app.get("/", response_class=HTMLResponse)
async def homepage():
    """Return the contents of ``index.html`` (next to this module) as HTML."""
    index_page = ROOT / "index.html"
    return index_page.read_text(encoding="utf-8")
@app.get("/api/stations")
async def stations_api(variant: float = 0):
    """Return the station catalog, personalised for the calling listener.

    When the catalog has an entry keyed by ``str(ENCRYPTED_FREQUENCY)``, that
    entry receives a ``"sequence"`` field computed from the normalised
    ``variant``. The entry is updated in place on the object returned by
    ``station_catalog()``.
    """
    catalog = station_catalog()
    encrypted_entry = catalog.get(str(ENCRYPTED_FREQUENCY))
    if encrypted_entry is None:
        return catalog
    # the per-listener Morse sequence (public: it is dictated on air)
    encrypted_entry["sequence"] = seq_for(_norm_variant(variant))
    return catalog
@app.get("/api/band")
async def band_api(code: str = "", variant: float = 0):
    """Hidden band: only served with the code obtained by breaking the cipher.

    Returns ``{"ok": False}`` for a wrong code; otherwise ``{"ok": True}``
    merged with whatever ``hidden_band_catalog()`` provides.
    """
    unlocked = is_valid_code(code, _norm_variant(variant))
    if unlocked:
        return {"ok": True, **hidden_band_catalog()}
    return {"ok": False}
def _stream_fixed(text: str, char_delay: float = 0.025, should_continue=None):
    """Stream a fixed text with teletype cadence.

    Yields the growing prefix of ``text`` (one more character each time) and
    sleeps ``char_delay`` seconds after each one. ``should_continue``, when
    given, is polled before every character; the stream ends silently the
    first time it returns a falsy value.
    """
    for shown in range(1, len(text) + 1):
        if should_continue is not None and not should_continue():
            return
        yield text[:shown]
        time.sleep(char_delay)
# Broadcasts are deterministic per (frequency, language, listener variant), so
# finished scripts are kept in an in-memory cache keyed by that combination:
# retuning the same station replays instantly instead of regenerating
# (crucial when running on CPU).
_broadcast_cache: dict[str, str] = {}
# Maximum number of cached scripts; once reached, the oldest-inserted entry is
# dropped before a new one is stored.
_CACHE_MAX = 256
# Rapid retuning: every /tune call bumps this epoch. An in-flight generation
# that is no longer the latest one stops at its next token, which releases the
# model lock so the newest tune can run. Without this, a superseded generation
# would run to completion while holding the lock and every later tune would
# stall behind it (the "everything goes to static and never comes back" bug).
_tune_epoch = 0
_tune_epoch_lock = threading.Lock()


def _next_tune_epoch() -> int:
    """Atomically advance the global tune epoch and return the new value."""
    global _tune_epoch
    with _tune_epoch_lock:
        bumped = _tune_epoch + 1
        _tune_epoch = bumped
    return bumped
# The 1B model occasionally regurgitates its own system prompt on air. Three
# layers scrub it before any listener hears it, and none of them touch the
# canonical [JINGLE]/[INTERFERENCIA]/[CORTE COMERCIAL]/[FIN DE TRANSMISION]
# markers (those are control tokens the frontend localizes and acts on).
#
# Layer 1: chat/special tokens, whole <think>...</think> blocks (or a stray
# opening/closing tag), and lines that consist of nothing but a role label.
# model.py already filters thinking; this is a backstop.
# Flags: (?i) case-insensitive, (?s) so a think block may span several lines.
_TOKEN_RE = re.compile(
r"(?is)<think>.*?</think>|</?think>|"
r"<\|im_start\|>\s*(?:system|user|assistant)?|<\|im_end\|>|<\|endoftext\|>"
)
# A line holding only "system", "user" or "assistant", optionally followed by
# a colon ((?m) makes ^ and $ match per line).
_ROLE_LINE_RE = re.compile(r"(?im)^\s*(?:system|user|assistant)\s*[::]?\s*$")
# Layer 2: whole sentences that are instruction-shaped (the dangerous parrots).
# Each entry is a regex fragment; they are OR-ed together inside _LEAK_RE.
_LEAK_KEYWORDS = (
# Spanish
r"escrib\w*\s+s[oó]lo\s+el\s+gui[oó]n", r"gui[oó]n\s+al\s+aire",
r"sin\s+explicaciones", r"entre\s+\d+\s+y\s+\d+\s+palabras",
r"escribes?\s+(?:el\s+)?gui[oó]n", r"traduce\s+tu\s+frase",
r"ni\s+una\s+palabra\s+en\s+(?:ingl[eé]s|espa[nñ]ol)",
r"ejemplo\s+del\s+tono", r"responde\s+en\s+una\s+o\s+dos",
r"cierra\s+(?:siempre\s+)?con\s+\[", r"transmites\s+[ií]ntegramente",
# English
r"write\s+only\s+the\s+on-?air", r"on-?air\s+script\s+only",
r"no\s+explanations", r"between\s+\d+\s+and\s+\d+\s+words",
r"you\s+write\s+(?:the\s+)?(?:on-?air\s+|radio\s+)?scripts?",
r"translate\s+your\s+opening", r"not\s+a\s+single\s+word\s+of",
r"example\s+of\s+the\s+(?:expected\s+)?tone",
r"reply\s+in\s+one\s+or\s+two", r"always\s+close\s+with\s+\[",
r"you\s+(?:broadcast|speak)\s+(?:and\s+write\s+)?(?:only|entirely)\s+in",
r"never\s+in\s+spanish", r"translate\s+every\s+line",
# French
r"uniquement\s+le\s+script", r"sans\s+explications",
r"entre\s+\d+\s+et\s+\d+\s+mots", r"tu\s+[ée]cris\s+le\s+script",
r"traduis\s+(?:ta\s+phrase|chaque\s+ligne)", r"pas\s+un\s+mot\s+d",
r"exemple\s+du\s+ton", r"r[ée]ponds\s+en\s+une\s+ou\s+deux",
r"termine\s+(?:toujours\s+)?par\s+\[", r"tu\s+[ée]mets\s+enti[èe]rement",
r"tu\s+parles\s+et\s+[ée]cris", r"jamais\s+en\s+espagnol",
# Spanish (additional phrasings)
r"hablas\s+y\s+escribes\s+solo", r"nunca\s+en\s+(?:ingl[eé]s|espa[nñ]ol)",
)
# Matches the whole sentence around a keyword: everything back to the previous
# '.', '!', '?' or newline, the keyword itself, the rest of the sentence, and
# any run of terminators/newlines that follows it.
_LEAK_RE = re.compile(
r"(?i)[^.\n!?]*(?:" + "|".join(_LEAK_KEYWORDS) + r")[^.\n!?]*[.\n!?]*"
)
# Layer 3: bare "you begin with:" lead-ins (English, Spanish, French). Only the
# prefix is stripped; the opening line that legitimately follows it is kept.
_LEAD_RE = re.compile(
r"(?i)\b(?:you\s+begin\s+with|empiezas\s+con|tu\s+commences\s+par)\s*[::]?\s*"
)
def _strip_leaks(text: str) -> str:
    """Remove leaked prompt material from model output.

    Applies, in order: special-token removal, role-label-line removal,
    instruction-shaped sentence removal (each replaced by a single space) and
    lead-in prefix removal (replaced by nothing). Runs of spaces/tabs left
    behind are collapsed to one space and leading whitespace is dropped;
    newlines inside the text are kept.
    """
    scrub_passes = (
        (_TOKEN_RE, " "),
        (_ROLE_LINE_RE, " "),
        (_LEAK_RE, " "),
        (_LEAD_RE, ""),
    )
    for pattern, filler in scrub_passes:
        text = pattern.sub(filler, text)
    # collapse the blanks the substitutions leave behind, keep newlines
    collapsed = re.sub(r"[ \t]{2,}", " ", text)
    return collapsed.lstrip()
# Tag placed first in the (tag, exception) tuple a failed worker sends over.
_GEN_ERR = object()


def _threaded_broadcast(system_prompt: str, should_continue, **kwargs):
    """Run ``stream_broadcast`` in a worker thread and yield its tokens.

    The model lock lives inside ``stream_broadcast`` and is held for the whole
    generation. Iterating it directly from the HTTP response generator would
    leave it suspended at a ``yield`` (still holding the lock) if the client
    disconnected, blocking every later generation.

    Here a daemon thread pulls the tokens and passes them through a queue. It
    polls ``should_continue`` itself (when not ``None``) before forwarding
    each token, so it finishes and releases the lock whether or not anyone is
    still consuming this generator.

    An exception in the worker is printed and ends the stream without being
    re-raised. Extra keyword arguments go to ``stream_broadcast`` unchanged.
    """
    handoff: queue.Queue = queue.Queue()
    finished = object()

    def produce():
        try:
            for piece in stream_broadcast(system_prompt, **kwargs):
                if should_continue is not None and not should_continue():
                    break
                handoff.put(piece)
        except Exception as exc:  # noqa: BLE001 - surface, don't crash the worker
            handoff.put((_GEN_ERR, exc))
        finally:
            # Always signal the end so the consumer never blocks forever.
            handoff.put(finished)

    threading.Thread(target=produce, daemon=True).start()

    while (item := handoff.get()) is not finished:
        failed = isinstance(item, tuple) and item and item[0] is _GEN_ERR
        if failed:
            print(f"[model] generation error: {item[1]}")
            return
        yield item
def _stream_model_cached(cache_key: str, system_prompt: str, should_continue=None, **kwargs):
    """Stream a model broadcast, replaying it from the cache when available.

    Args:
        cache_key: Key into ``_broadcast_cache`` for this broadcast.
        system_prompt: System prompt passed to ``_threaded_broadcast``.
        should_continue: Optional zero-argument callable; a falsy result means
            a newer request has superseded this one and streaming must stop.
        **kwargs: Forwarded unchanged to ``_threaded_broadcast``.

    Yields:
        The scrubbed text accumulated so far (the full prefix each time).

    Only broadcasts that ran to the end without being superseded are cached.
    """
    def superseded():
        return should_continue is not None and not should_continue()

    cached = _broadcast_cache.get(cache_key)
    if cached:
        yield from _stream_fixed(cached, char_delay=0.018, should_continue=should_continue)
        return

    accumulated = ""
    completed = False
    try:
        for token in _threaded_broadcast(system_prompt, should_continue, **kwargs):
            if superseded():
                return  # a newer tune arrived: stop streaming to this listener
            accumulated += token
            yield _strip_leaks(accumulated)
        # The worker behind _threaded_broadcast polls should_continue as well
        # and ends the stream quietly when superseded, so reaching this line is
        # not proof of a full broadcast. Re-check before declaring completion,
        # otherwise a truncated script would be cached.
        # NOTE(review): a generation error also ends the stream quietly (it is
        # printed and swallowed upstream) and cannot be told apart from here.
        completed = not superseded()
    finally:
        # Only cache full broadcasts: a cancelled stream (listener retuned)
        # must never freeze a truncated script for everyone after.
        if completed:
            clean = _strip_leaks(accumulated)
            if clean.strip():
                if len(_broadcast_cache) >= _CACHE_MAX:
                    # FIFO eviction; the default keeps a concurrent stream that
                    # evicted the same key from raising KeyError here.
                    _broadcast_cache.pop(next(iter(_broadcast_cache)), None)
                _broadcast_cache[cache_key] = clean
def _mix_seed(base: int, variant: int) -> int:
    """Blend the station seed with the listener's session variant.

    A falsy ``variant`` returns ``base`` untouched. Otherwise ``base`` is
    XOR-ed with ``variant * 2654435761`` and reduced modulo ``2**31 - 1``.
    """
    if variant:
        scrambled = variant * 2654435761
        return (base ^ scrambled) % (2**31 - 1)
    return base
def _norm_variant(variant) -> int:
    """Coerce a listener variant to an int in ``[0, 2**31 - 1)``.

    The value is converted through ``float`` then ``int`` (truncating toward
    zero), made non-negative and reduced modulo ``2**31 - 1``. Anything that
    cannot be converted yields ``0`` instead of raising, because the value
    comes straight from request parameters.
    """
    try:
        return abs(int(float(variant))) % (2**31 - 1)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None or another non-numeric object.
        # ValueError: non-numeric text, or NaN at the int() step.
        # OverflowError: +/-infinity at the int() step, or an int too large
        # to convert to float.
        return 0
# concurrency 2: a cancelled-but-unfinished stream must not head-block the
# queue (that froze "the other stations" when retuning fast). Real generation
# stays serialized by model._gen_lock.
@app.api(name="tune", concurrency_limit=2, stream_every=0.02)
def tune(frequency: float, language: str = "es", variant: float = 0) -> str:
    """Tune a frequency and stream whatever is broadcasting there (SSE).

    `variant` is a per-listener session nonce: every visitor hears their own
    edition of each station (the world re-broadcasts for every new listener).

    Dispatch order: the encrypted number station, the cursed station, a
    hidden-band fragment, then the nearest regular station. When nothing is
    receivable a single empty string is yielded. Every stream stops as soon
    as a later call to this endpoint bumps the tune epoch.
    """
    frequency = float(frequency)
    lang = normalize_lang(language)
    var = _norm_variant(variant)
    my_epoch = _next_tune_epoch()

    def still_current():
        # False once any later /tune call has advanced the shared epoch.
        return _tune_epoch == my_epoch

    def within_tolerance(target):
        return abs(frequency - target) <= STATION_TOLERANCE

    # Encrypted number station: fixed text, the frontend adds the Morse.
    if within_tolerance(ENCRYPTED_FREQUENCY):
        numbers_script = numbers_broadcast(lang, var)
        yield from _stream_fixed(numbers_script, should_continue=still_current)
        return

    # The unsolvable station: the model broadcasts, the frontend never cleans it.
    if within_tolerance(CURSED_FREQUENCY):
        cursed_key = f"cursed:{CURSED_FREQUENCY}:{lang}:{var}"
        cursed_prompt = localized_system_prompt(CURSED_SYSTEM_PROMPT, "es", lang)
        cursed_user = localized_user_prompt(None, lang)
        cursed_seed = _mix_seed(station_seed(CURSED_FREQUENCY), var)
        yield from _stream_model_cached(
            cursed_key,
            cursed_prompt,
            should_continue=still_current,
            user_prompt=cursed_user,
            seed=cursed_seed,
            max_tokens=160,
        )
        return

    # Hidden band fragments (minigame): deterministic text.
    fragment = get_fragment(frequency)
    if fragment is not None:
        fragment_script = fragment_text(fragment, lang)
        yield from _stream_fixed(fragment_script, should_continue=still_current)
        return

    canonical = nearest_station_frequency(frequency)
    station = get_station(frequency)
    silent = station is None or canonical is None or signal_strength(frequency) <= 0
    if silent:
        yield ""
        return

    station_key = f"station:{canonical}:{lang}:{var}"
    station_prompt = station_system_prompt(canonical, lang)
    station_user = localized_user_prompt(station, lang)
    mixed_seed = _mix_seed(station_seed(canonical), var)
    yield from _stream_model_cached(
        station_key,
        station_prompt,
        should_continue=still_current,
        user_prompt=station_user,
        seed=mixed_seed,
    )
@app.api(name="transmit", concurrency_limit=1, stream_every=0.02)
def transmit(message: str, language: str = "es", variant: float = 0) -> str:
    """Console of the encrypted frequency.

    Victory is decided by decrypt.validate_bypass (deterministic);
    the model only role-plays the operator's resistance.

    The message is stringified, stripped and capped at 500 characters. An
    empty message yields one empty string. A winning message streams the
    reveal text. Anything else streams the operator's generated reply, seeded
    from the CRC32 of the message so the same message gets the same seed.
    """
    message = str(message).strip()[:500]
    lang = normalize_lang(language)
    var = _norm_variant(variant)

    if not message:
        yield ""
    elif validate_bypass(message, var):
        yield from _stream_fixed(reveal_text(lang, var), char_delay=0.02)
    else:
        operator_reply = _threaded_broadcast(
            OPERATOR_SYSTEM_PROMPTS[lang],
            None,
            user_prompt=message,
            seed=zlib.crc32(message.encode("utf-8")) % (2**31 - 1),
            max_tokens=90,
        )
        heard = ""
        for piece in operator_reply:
            heard += piece
            yield _strip_leaks(heard)
@app.api(name="finale", concurrency_limit=1, stream_every=0.02)
def finale(code: str, language: str = "es", variant: float = 0) -> str:
    """The complete final transmission (after collecting every fragment).

    Streams the finale broadcast when ``code`` is valid for this listener's
    variant; otherwise yields the single marker ``[ACCESO DENEGADO]``. The
    cache key depends only on the language.
    """
    lang = normalize_lang(language)
    authorized = is_valid_code(str(code), _norm_variant(variant))
    if authorized:
        yield from _stream_model_cached(
            f"finale:{lang}",
            localized_system_prompt(FINALE_SYSTEM_PROMPT, "es", lang),
            user_prompt=FINALE_USER_PROMPT,
            seed=FINALE_SEED,
            max_tokens=300,
        )
    else:
        yield "[ACCESO DENEGADO]"
# Script entry point: start the server on all interfaces, port 7860, with
# error display enabled.
if __name__ == "__main__":
    app.launch(server_name="0.0.0.0", server_port=7860, show_error=True)