"""GODSEED wish queue: one god, one wish at a time.

One global asyncio worker takes wishes strictly in order. For each wish the
rite is: moderate -> planner.grant(wish, world_summary, act, emit) -> persist
the trace (injected callback) -> emit ``wish_granted``. In browser-invoked
(ZeroGPU) mode the worker announces the turn and waits while that same rite
runs inside the wisher's own request.

Admission control lives in ``submit``: at most one pending wish per client and
a queue cap of 50 ("the god is overwhelmed; return at dusk"). The hourly rate
limit is enforced by the server's ratelimit.py, not here.

Failure handling, by construction:
* an exception while granting one wish emits a poetic, generic
  ``wish_rejected`` and the worker moves on;
* SSE emit failures are swallowed -- a broken pipe must never reject a wish;
* persist failures are logged but never undo a grant (the world has already
  changed, durably enough until the next snapshot).

Standard library only; this is the only async module in the engine.
"""

from __future__ import annotations

import asyncio
import contextlib
import inspect
import logging
import re
import time
import uuid
from dataclasses import dataclass
from typing import Any, Optional

log = logging.getLogger("godseed.engine.queue")

# Admission and per-wish bounds.
QUEUE_MAX = 50
MAX_CALLS_PER_WISH = 12  # engine backstop; the planner itself stops at 7 turns
TEXT_PREVIEW_LEN = 60
# Bound the stored raw text. Anything over 140 chars is still denied by
# moderation, so this cap only limits memory, not policy.
_SUBMIT_TEXT_CAP = 500

# Browser-invoked granting (ZeroGPU): the GPU work has to run inside the
# WISHER'S gradio request, because their HF auth headers carry the quota; a
# server-side background spaces.GPU call fails by design. So the worker emits
# your_turn, then waits for the browser to fetch its token (authenticated) and
# call invoke(). The windows stay tight so an un-invoked wish cannot freeze the
# shared world for long: a real browser fetches and invokes within ~2s, and 25s
# leaves room for slow connections.
INVOCATION_WINDOW_S = 25.0
GRANT_DEADLINE_S = 150.0  # a 9B wish runs ~50-90s; this bounds a hung grant

# Poetic, user-safe texts; these reach wishers verbatim (SSE / QueueError.reason).
QUEUE_FULL_REASON = "the god is overwhelmed; return at dusk"
ALREADY_PENDING_REASON = "one wish per soul at a time; yours already stands before the god"
WANDERED_REASON = "the wisher wandered from the temple; the god moved on"
INTERRUPTED_REASON = "the rite was interrupted; the god moved on"
NOT_YOUR_TURN_REASON = "this wish is not before the god"
RITE_BEGUN_REASON = "the rite has already begun"
CRASH_REASON = "the god reached for this wish and it slipped beyond the veil; wish again"
BUSY_HEAVENS_REASON = "the heavens are busy and the god could not reach your wish; offer it again in a moment"
HANDS_FULL_OBSERVATION = "rejected: the god's hands are full; say done"
FALLBACK_EPITAPH = "it is done"
# Stands in for a model-composed reading that moderation denies.
SILENT_READING = "the god read this wish in silence."

_WISH_ID_RE = re.compile(r"^w_(\d+)$")


class QueueError(Exception):
    """Admission failure; `.reason` is poetic and safe to show users."""

    def __init__(self, reason: str) -> None:
        super().__init__(reason)
        self.reason = reason


class QueueFull(QueueError):
    """Raised by `submit` when the queue already holds `max_queue` wishes."""


class AlreadyPending(QueueError):
    """Raised by `submit` when the client already has a wish waiting or in progress."""


@dataclass
class _Item:
    """One submitted wish as it travels through the queue."""

    wish_id: str
    text: str
    client_id: str
    submitted_at: float  # time.time() at submission


def _trace_get(trace: Any, key: str) -> Any:
    """Read `key` from a dict-shaped or attribute-shaped object; None if absent."""
    if isinstance(trace, dict):
        return trace.get(key)
    return getattr(trace, key, None)


def _trace_set(trace: Any, key: str, value: Any) -> None:
    """Best-effort write of `key` onto a dict-shaped or attribute-shaped trace."""
    try:
        if isinstance(trace, dict):
            trace[key] = value
        else:
            setattr(trace, key, value)
    except Exception:  # frozen dataclass, None, etc. -- best-effort, but leave a breadcrumb
        log.debug("trace field %r could not be set", key, exc_info=True)


def _tokens_match(expected: str, offered: Any) -> bool:
    """Constant-time equality check for the secret turn token.

    Compares UTF-8 bytes rather than str: `hmac.compare_digest` rejects str
    arguments containing non-ASCII characters, and `offered` is caller-controlled.
    """
    import hmac  # local import keeps this module's import header unchanged

    return hmac.compare_digest(
        str(expected).encode("utf-8", "surrogatepass"),
        str(offered).encode("utf-8", "surrogatepass"),
    )


def _epitaph_from_reading(reading: Any) -> Optional[str]:
    """Salvage an epitaph from the reading's last sentence.

    When the model never reaches a clean done-turn, the log fills with a
    generic epitaph. The reading's last sentence is the god's own closing line
    about THIS wish, so the Genesis Log stays varied. Returns None for an
    empty reading; otherwise at most 120 characters.
    """
    text = str(reading or "").strip()
    if not text:
        return None
    parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", text) if p.strip()]
    return parts[-1][:120] if parts else None


class QueueWorker:
    """Single serialized worker; construct once, `await start()` at boot.

    Injected collaborators (duck-typed, no engine->mind/server imports):
      world      engine.world.World (apply/summary/record_epitaph/epoch)
      moderator  engine.moderation.Moderator (async check, sync check_content)
      planner    object with `async grant(wish, world_summary, act, emit)`
      emit       async callable broadcasting one SSE event dict
      persist    callable (sync or async) receiving the WishTrace; optional

    With `browser_invoked=True`, the worker announces each turn and waits
    for the wisher's request to call `invoke()`, which runs the grant.
    """

    def __init__(
        self,
        world: Any,
        moderator: Any,
        planner: Any,
        emit: Any,
        persist: Any = None,
        *,
        max_queue: int = QUEUE_MAX,
        max_calls_per_wish: int = MAX_CALLS_PER_WISH,
        next_wish_index: Optional[int] = None,
        browser_invoked: bool = False,
        invocation_window: float = INVOCATION_WINDOW_S,
        grant_deadline: float = GRANT_DEADLINE_S,
    ) -> None:
        self._world = world
        self._moderator = moderator
        self._planner = planner
        self._emit = emit
        self._persist = persist
        self._max_queue = max_queue
        self._max_calls = max_calls_per_wish
        self._queue: asyncio.Queue[_Item] = asyncio.Queue()
        self._pending: dict[str, str] = {}  # client_id -> wish_id
        self._current: Optional[_Item] = None
        self._task: Optional[asyncio.Task] = None
        self._browser_invoked = browser_invoked
        self._invocation_window = invocation_window
        self._grant_deadline = grant_deadline
        # The open turn in browser-invoked mode (token, futures, item); None otherwise.
        self._awaiting: Optional[dict[str, Any]] = None
        self._wish_counter = (
            next_wish_index if next_wish_index is not None else self._derive_wish_counter(world)
        )

    @staticmethod
    def _derive_wish_counter(world: Any) -> int:
        """Continue numbering after the highest persisted w_NNNNNN.

        Returns 1 when no feature carries a recognisable wish id, so the
        first-ever wish is w_000001.
        """
        best = 0
        for feature in getattr(world, "features", ()) or ():
            # str(): a non-string wish_id must not crash boot.
            match = _WISH_ID_RE.match(str(getattr(feature, "wish_id", "") or ""))
            if match:
                best = max(best, int(match.group(1)) + 1)
        return best or 1

    # --------------------------------------------------------------- admission

    async def submit(self, text: Any, client_id: Any) -> tuple[str, int]:
        """Enqueue a wish. Returns (wish_id, position).

        Position counts the wish itself plus everyone ahead of it (including
        the one being granted), so an idle empty queue yields position 1.
        Raises AlreadyPending / QueueFull with a poetic `.reason`.
        """
        text = ("" if text is None else str(text)).strip()[:_SUBMIT_TEXT_CAP]
        client_id = str(client_id)
        if client_id in self._pending:
            raise AlreadyPending(ALREADY_PENDING_REASON)
        if self._queue.qsize() >= self._max_queue:
            raise QueueFull(QUEUE_FULL_REASON)

        wish_id = f"w_{self._wish_counter:06d}"
        self._wish_counter += 1
        item = _Item(wish_id=wish_id, text=text, client_id=client_id, submitted_at=time.time())
        self._pending[client_id] = wish_id
        self._queue.put_nowait(item)
        position = self._queue.qsize() + (1 if self._current else 0)
        await self._emit_queue()
        return wish_id, position

    # --------------------------------------------------------------- lifecycle

    async def start(self) -> None:
        """Spawn the worker task unless one is already running."""
        if self._task is None or self._task.done():
            self._task = asyncio.get_running_loop().create_task(
                self._run(), name="godseed-queue-worker"
            )

    async def stop(self) -> None:
        """Cancel the worker task and wait for it to unwind."""
        if self._task is not None:
            self._task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
            self._task = None

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()

    @property
    def queue_length(self) -> int:
        return self._queue.qsize()

    @property
    def current(self) -> Optional[dict]:
        """The wish being granted (id + short preview), or None when idle."""
        if self._current is None:
            return None
        return {
            "wish_id": self._current.wish_id,
            "text_preview": self._current.text[:TEXT_PREVIEW_LEN],
        }

    def snapshot(self) -> dict:
        """The server's QueueLike view: waiting count + the wish being granted."""
        return {"length": self._queue.qsize(), "current": self.current}

    # --------------------------------------------------------------- worker

    async def _run(self) -> None:
        """Worker loop: take one wish at a time; one crash never kills the loop."""
        while True:
            item = await self._queue.get()
            self._current = item
            try:
                await self._process(item)
            except asyncio.CancelledError:
                raise
            except Exception:  # one wish must NEVER kill the worker
                log.exception("wish %s crashed; the god moves on", item.wish_id)
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": CRASH_REASON}
                )
            finally:
                self._current = None
                self._pending.pop(item.client_id, None)
                self._queue.task_done()
                await self._emit_queue()

    async def _process(self, item: _Item) -> None:
        """Grant one wish, directly (server mode) or via the wisher's invoke().

        Browser-invoked mode: announce the turn, then wait for the wisher's
        browser to call invoke() (a gradio API request carrying their ZeroGPU
        quota). No invocation within the window -> the god moves on. Every way
        the turn can end without an outcome reaching the wisher emits a
        wish_rejected.
        """
        if not self._browser_invoked:
            await self.grant_item(item)
            return

        token = uuid.uuid4().hex
        loop = asyncio.get_running_loop()
        started: asyncio.Future[bool] = loop.create_future()
        # invoke() resolves this with True once an outcome (granted / rejected /
        # crashed) was reported, or False if it was cancelled mid-rite.
        done: asyncio.Future[bool] = loop.create_future()
        self._awaiting = {
            "wish_id": item.wish_id,
            "token": token,
            "client_id": item.client_id,
            "item": item,
            "started": started,
            "done": done,
            "begun": False,
        }
        try:
            # SECURITY: the token is NOT broadcast. your_turn carries only the
            # wish_id over the public SSE; the owner's browser fetches the secret
            # token from GET /api/turn (cookie-authenticated, identity-bound).
            # A stranger tailing SSE sees the wish_id but cannot get the token.
            turn_event = {
                "type": "your_turn",
                "wish_id": item.wish_id,
                "window_s": self._invocation_window,
            }
            # Announce in thirds: re-emits cover SSE reconnects and slow tabs
            # (the browser also buffers early arrivals, but belt and braces).
            third = max(self._invocation_window / 3.0, 0.01)
            for _ in range(3):
                await self._safe_emit(turn_event)
                try:
                    await asyncio.wait_for(asyncio.shield(started), third)
                    break
                except asyncio.TimeoutError:
                    continue

            # Check `started` itself: wait_for can report a timeout in the same
            # tick invoke() resolved it. If nobody came, close the turn BEFORE
            # any await, so a late invoke() gets NOT_YOUR_TURN instead of
            # granting a wish we are rejecting (concurrently with the next one).
            if not started.done():
                self._awaiting = None
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": WANDERED_REASON}
                )
                return

            try:
                reported = await asyncio.wait_for(asyncio.shield(done), self._grant_deadline)
            except asyncio.TimeoutError:
                # The invoked grant is hung past any sane wish length; the queue
                # must not stall behind it. (The stray request may still finish
                # server-side -- logged, accepted as a rare edge.)
                log.error("grant for %s exceeded deadline; moving on", item.wish_id)
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": INTERRUPTED_REASON}
                )
                return
            if not reported:
                # The invoking request was cancelled (e.g. the tab closed)
                # before any outcome reached the wisher; don't leave them hanging.
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": INTERRUPTED_REASON}
                )
        finally:
            self._awaiting = None

    def turn_token(self, wish_id: Any, client_id: Any) -> Optional[str]:
        """Return the secret invocation token for the wish currently before the god.

        Only its owner gets it (identity-bound). The server exposes this over a
        cookie-authenticated route, so the token never touches public SSE.
        Returns None when it isn't this caller's turn.
        """
        awaiting = self._awaiting
        if (
            awaiting is None
            or awaiting["wish_id"] != str(wish_id)
            or awaiting["client_id"] != str(client_id)
        ):
            return None
        return awaiting["token"]

    async def invoke(self, wish_id: Any, token: Any) -> dict:
        """Run the grant for the wish currently before the god.

        Called from the wisher's gradio API request so the GPU work runs in
        THEIR quota context. The token is a secret capability delivered only
        to the authenticated owner (see turn_token) and is compared in
        constant time. Raises QueueError when it isn't this wish's turn or the
        rite has already begun. A crash inside the grant is reported as
        wish_rejected and returned as {"ok": False, ...}.
        """
        awaiting = self._awaiting
        if (
            awaiting is None
            or awaiting["wish_id"] != str(wish_id)
            or not _tokens_match(awaiting["token"], token)
        ):
            raise QueueError(NOT_YOUR_TURN_REASON)
        if awaiting["begun"]:
            raise QueueError(RITE_BEGUN_REASON)
        awaiting["begun"] = True
        if not awaiting["started"].done():
            awaiting["started"].set_result(True)
        reported = False  # did an outcome event reach the wisher?
        try:
            await self.grant_item(awaiting["item"])
            reported = True
            return {"ok": True, "wish_id": str(wish_id)}
        except Exception:
            log.exception("invoked grant for %s crashed", wish_id)
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": str(wish_id), "reason": CRASH_REASON}
            )
            reported = True
            return {"ok": False, "wish_id": str(wish_id), "reason": CRASH_REASON}
        finally:
            # On cancellation `reported` stays False and the worker emits
            # INTERRUPTED_REASON on our behalf.
            if not awaiting["done"].done():
                awaiting["done"].set_result(reported)

    async def grant_item(self, item: _Item) -> None:
        """Moderate -> plan/act -> persist -> emit.

        The whole rite for one wish. It runs in the worker task (server mode)
        or inside the wisher's gradio request (browser-invoked mode).
        """
        await self._safe_emit({"type": "wish_started", "wish_id": item.wish_id, "text": item.text})
        await self._emit_queue()

        verdict = await self._moderator.check(item.text)
        if not verdict.allowed:
            await self._safe_emit(
                {
                    "type": "wish_rejected",
                    "wish_id": item.wish_id,
                    "reason": verdict.poetic_reason or "the god declines this wish",
                }
            )
            return

        world_summary = self._world.summary()
        calls = 0
        landed = 0  # features actually applied to the world this wish

        async def act(call: dict) -> str:
            """Execute one tool call; emits tool_call + world_delta on success."""
            nonlocal calls, landed
            index = calls
            calls += 1
            if index >= self._max_calls:
                return HANDS_FULL_OBSERVATION
            # SECURITY: the MODEL composes inscribe_wish text -- a clean wish can
            # still make it write a disallowed phrase that gets drawn PERMANENTLY
            # onto the shared world. Re-moderate that text before it can land;
            # default-deny so the world never carries an unvetted inscription.
            if isinstance(call, dict) and call.get("tool") == "inscribe_wish":
                ins = (call.get("args") or {}).get("text", "")
                if not self._moderator.check_content(ins).allowed:
                    return "rejected: those words may not be written here; choose gentler ones"
            feature, observation = self._world.apply(item.wish_id, index, call)
            if feature is not None:
                landed += 1
                await self._safe_emit(
                    {
                        "type": "tool_call",
                        "wish_id": item.wish_id,
                        "call_index": index,
                        "tool": feature.tool,
                        "args": dict(feature.args),
                    }
                )
                await self._safe_emit({"type": "world_delta", "feature": feature.to_dict()})
            return observation

        trace = await self._planner.grant(item.text, world_summary, act, self._safe_emit)

        # If the wish built NOTHING and the god never actually planned -- every
        # turn an error (GPU/model failure), not a clean "done" -- reject
        # honestly and DON'T persist an empty trace that pollutes the shared
        # Genesis Log. A genuine done-with-no-build leaves a non-error turn, so
        # this only fires on real failure. Turns may be dicts or attribute
        # objects, hence _trace_get.
        turns = _trace_get(trace, "turns") or []
        all_errored = bool(turns) and all(
            str(_trace_get(turn, "observation") or "").startswith("error:") for turn in turns
        )
        if landed == 0 and (not turns or all_errored):
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": item.wish_id, "reason": BUSY_HEAVENS_REASON}
            )
            return

        # SECURITY: the reading + epitaph are model-composed and persist to the
        # public dataset + the Genesis Log. Re-moderate; redact on denial so no
        # unvetted prose survives in the archive. The vetted reading is kept in
        # a local, so an epitaph is never salvaged from a denied reading even
        # if the trace refuses the write (e.g. a frozen dataclass).
        reading = _trace_get(trace, "reading")
        if reading and not self._moderator.check_content(reading).allowed:
            reading = SILENT_READING
            _trace_set(trace, "reading", reading)
        epitaph = _trace_get(trace, "epitaph")
        if not epitaph or not self._moderator.check_content(epitaph).allowed:
            epitaph = _epitaph_from_reading(reading) or FALLBACK_EPITAPH
            _trace_set(trace, "epitaph", epitaph)

        self._enrich_trace(trace, item)
        if self._persist is not None:
            try:
                result = self._persist(trace)
                if inspect.isawaitable(result):
                    await result
            except asyncio.CancelledError:
                raise
            except Exception:
                log.exception("persist failed for %s (grant stands)", item.wish_id)

        self._world.record_epitaph(epitaph)
        await self._safe_emit(
            {
                "type": "wish_granted",
                "wish_id": item.wish_id,
                "epitaph": str(epitaph),
                "epoch": self._world.epoch,
            }
        )

    # --------------------------------------------------------------- helpers

    def _enrich_trace(self, trace: Any, item: _Item) -> None:
        """Fill trace fields the planner cannot know (wish_id, moderation, ...).

        Missing or falsy fields are filled for both dict and attribute traces.
        """
        fields = {
            "wish_id": item.wish_id,
            "text": item.text,
            "submitted_at": item.submitted_at,
        }
        try:
            if isinstance(trace, dict):
                for key, value in fields.items():
                    if not trace.get(key):
                        trace[key] = value
                if trace.get("moderation") is None:
                    trace["moderation"] = {"allowed": True, "category": None}
            elif trace is not None:
                for key, value in fields.items():
                    if not getattr(trace, key, None):
                        setattr(trace, key, value)
                if getattr(trace, "moderation", None) is None:
                    setattr(trace, "moderation", {"allowed": True, "category": None})
        except Exception:  # frozen dataclass etc. -- enrichment is best-effort
            log.debug("trace enrichment skipped for %s", item.wish_id, exc_info=True)

    async def _emit_queue(self) -> None:
        """Broadcast the current queue length and the wish being granted."""
        await self._safe_emit(
            {"type": "queue", "length": self._queue.qsize(), "current": self.current}
        )

    async def _safe_emit(self, event: dict) -> None:
        """Broadcast one SSE event; emit failures must never break a grant."""
        try:
            await self._emit(event)
        except asyncio.CancelledError:
            raise
        except Exception:
            log.warning("emit failed for event type %s", event.get("type"), exc_info=True)