Spaces:
Sleeping
Sleeping
FORGIVING TOWNS: map any building kind (never reject), district reroute, restore god's voice (content-only moderation), town few-shot pushes build_district+roads, honest fail on empty
"""GODSEED wish queue — one god, one wish at a time.

A single global asyncio worker serializes every wish:

    pop -> moderate -> planner.grant(wish, world_summary, act, emit)
        -> persist trace (injected callback) -> emit wish_granted

Admission control in `submit`: 1 pending wish per client, queue cap 50
("the god is overwhelmed; return at dusk"). The hourly rate limit lives in
the server's ratelimit.py, not here.

Optional browser-invoked mode (ZeroGPU): instead of granting inside the
worker task, the worker emits `your_turn` and waits for the wisher's own
request to call `invoke()` with a secret token obtained from `turn_token()`.
A turn that is never invoked, or a grant that hangs, is rejected after a
bounded window so the shared queue cannot stall behind it.

Crash-safe by construction: an exception while granting one wish emits a
poetic generic wish_rejected and the worker moves on. SSE emit failures are
swallowed (a broken pipe must never reject a wish), persist failures are
logged but do not undo a grant (the world already changed, durably enough
until the next snapshot).

Stdlib only; this is the only async module in the engine.
"""
| from __future__ import annotations | |
| import asyncio | |
| import contextlib | |
| import inspect | |
| import logging | |
| import re | |
| import time | |
| import uuid | |
| from dataclasses import dataclass | |
| from typing import Any, Optional | |
log = logging.getLogger("godseed.engine.queue")

# ------------------------------------------------------------------ budgets
# Queue capacity (waiting wishes, not counting the one being granted).
QUEUE_MAX = 50
# The planner caps itself at 7 turns; this is the engine-side backstop.
MAX_CALLS_PER_WISH = 12
# Characters of wish text exposed in queue snapshots.
TEXT_PREVIEW_LEN = 60
# Raw wish text is bounded at submit time; moderation still sees anything
# longer than 140 characters and denies it.
_SUBMIT_TEXT_CAP = 500

# ------------------------------------------------------------------ timing
# Browser-invoked granting (ZeroGPU): GPU work must run inside the WISHER'S
# gradio request (their HF auth headers carry the quota — a server-side
# background spaces.GPU call fails by design). The worker emits your_turn and
# waits for the browser to fetch its token (authenticated) then invoke().
# Windows are kept tight so an un-invoked wish cannot freeze the shared world
# for long: a real browser fetches + invokes within ~2s; 25s covers slow
# connects.
INVOCATION_WINDOW_S = 25.0
# A 9B wish runs ~50-90s; this bounds a hung grant.
GRANT_DEADLINE_S = 150.0

# ------------------------------------------------------------------ user-facing reasons
QUEUE_FULL_REASON = "the god is overwhelmed; return at dusk"
ALREADY_PENDING_REASON = "one wish per soul at a time; yours already stands before the god"
WANDERED_REASON = "the wisher wandered from the temple; the god moved on"
INTERRUPTED_REASON = "the rite was interrupted; the god moved on"
NOT_YOUR_TURN_REASON = "this wish is not before the god"
RITE_BEGUN_REASON = "the rite has already begun"
CRASH_REASON = "the god reached for this wish and it slipped beyond the veil; wish again"
BUSY_HEAVENS_REASON = "the heavens are busy and the god could not reach your wish; offer it again in a moment"

# ------------------------------------------------------------------ planner-facing text
# Observation returned to the planner once a wish exhausts its call budget.
HANDS_FULL_OBSERVATION = "rejected: the god's hands are full; say done"
# Last-resort epitaph when neither the trace nor its reading yields one.
FALLBACK_EPITAPH = "it is done"

# Wish ids look like w_000123; group 1 captures the numeric part.
_WISH_ID_RE = re.compile(r"^w_(\d+)$")
class QueueError(Exception):
    """Admission failure; `.reason` is poetic and safe to show users."""

    def __init__(self, reason: str) -> None:
        self.reason = reason
        super().__init__(reason)


class QueueFull(QueueError):
    """Raised when the waiting queue is at capacity."""


class AlreadyPending(QueueError):
    """Raised when the client already has a wish pending."""
| class _Item: | |
| wish_id: str | |
| text: str | |
| client_id: str | |
| submitted_at: float | |
| def _trace_get(trace: Any, key: str) -> Any: | |
| if isinstance(trace, dict): | |
| return trace.get(key) | |
| return getattr(trace, key, None) | |
| def _trace_set(trace: Any, key: str, value: Any) -> None: | |
| try: | |
| if isinstance(trace, dict): | |
| trace[key] = value | |
| else: | |
| setattr(trace, key, value) | |
| except Exception: # frozen dataclass etc. — best-effort | |
| pass | |
| def _epitaph_from_reading(reading: Any) -> Optional[str]: | |
| """When the model never reaches a clean done-turn, the log fills with a | |
| generic epitaph. Salvage the reading's last sentence instead — it's the | |
| god's own closing line about THIS wish, so the Genesis Log stays varied.""" | |
| text = str(reading or "").strip() | |
| if not text: | |
| return None | |
| parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", text) if p.strip()] | |
| return parts[-1][:120] if parts else None | |
class QueueWorker:
    """Single serialized worker; construct once, `await start()` at boot.

    Injected collaborators (duck-typed, no engine->mind/server imports):
        world      engine.world.World (apply/summary/record_epitaph/epoch)
        moderator  engine.moderation.Moderator (async check, sync check_content)
        planner    object with `async grant(wish, world_summary, act, emit)`
        emit       async callable broadcasting one SSE event dict
        persist    callable (sync or async) receiving the WishTrace; optional

    Granting modes:
        browser_invoked=False  the worker task runs `grant_item` itself.
        browser_invoked=True   the worker emits `your_turn`, waits up to
                               `invocation_window` seconds for the owner's
                               request to call `invoke` (with the token from
                               `turn_token`), then up to `grant_deadline`
                               seconds for that grant to finish.
    """

    def __init__(
        self,
        world: Any,
        moderator: Any,
        planner: Any,
        emit: Any,
        persist: Any = None,
        *,
        max_queue: int = QUEUE_MAX,
        max_calls_per_wish: int = MAX_CALLS_PER_WISH,
        next_wish_index: Optional[int] = None,
        browser_invoked: bool = False,
        invocation_window: float = INVOCATION_WINDOW_S,
        grant_deadline: float = GRANT_DEADLINE_S,
    ) -> None:
        self._world = world
        self._moderator = moderator
        self._planner = planner
        self._emit = emit
        self._persist = persist
        self._max_queue = max_queue
        self._max_calls = max_calls_per_wish
        self._queue: asyncio.Queue[_Item] = asyncio.Queue()
        self._pending: dict[str, str] = {}  # client_id -> wish_id
        self._current: Optional[_Item] = None  # wish being granted, if any
        self._task: Optional[asyncio.Task] = None
        self._browser_invoked = browser_invoked
        self._invocation_window = invocation_window
        self._grant_deadline = grant_deadline
        # Browser-invoked mode only: the turn currently offered to its owner
        # (wish_id, secret token, client_id, item, started/done futures, begun).
        self._awaiting: Optional[dict[str, Any]] = None
        self._wish_counter = (
            next_wish_index
            if next_wish_index is not None
            else self._derive_wish_counter(world)
        )

    @staticmethod
    def _derive_wish_counter(world: Any) -> int:
        """Continue numbering after the highest persisted w_NNNNNN.

        Scans ``world.features`` (missing or None is treated as empty) for
        ``wish_id`` values matching ``w_<digits>`` and returns one past the
        highest. Returns 1 when nothing matches, so the first-ever wish is
        w_000001. Must be a staticmethod: it is called as
        ``self._derive_wish_counter(world)`` and takes no ``self``.
        """
        best = 0
        for feature in getattr(world, "features", ()) or ():
            # str(): a non-string wish_id on some feature must not crash boot.
            match = _WISH_ID_RE.match(str(getattr(feature, "wish_id", "") or ""))
            if match:
                best = max(best, int(match.group(1)) + 1)
        return best or 1

    # --------------------------------------------------------------- admission
    async def submit(self, text: Any, client_id: Any) -> tuple[str, int]:
        """Enqueue a wish. Returns (wish_id, position).

        Position counts the wish itself plus everyone ahead of it (including
        the one being granted), so an idle empty queue yields position 1.
        Text is stripped and capped at ``_SUBMIT_TEXT_CAP`` characters.

        Raises AlreadyPending / QueueFull with poetic `.reason`.
        """
        text = ("" if text is None else str(text)).strip()[:_SUBMIT_TEXT_CAP]
        client_id = str(client_id)
        if client_id in self._pending:
            raise AlreadyPending(ALREADY_PENDING_REASON)
        if self._queue.qsize() >= self._max_queue:
            raise QueueFull(QUEUE_FULL_REASON)
        wish_id = f"w_{self._wish_counter:06d}"
        self._wish_counter += 1
        item = _Item(wish_id=wish_id, text=text, client_id=client_id, submitted_at=time.time())
        self._pending[client_id] = wish_id
        self._queue.put_nowait(item)
        # Compute the position before awaiting: the emit below may yield to
        # the worker, which could dequeue and shift the count.
        position = self._queue.qsize() + (1 if self._current is not None else 0)
        await self._emit_queue()
        return wish_id, position

    # --------------------------------------------------------------- lifecycle
    async def start(self) -> None:
        """Spawn the worker task unless one is already running."""
        if self._task is None or self._task.done():
            self._task = asyncio.get_running_loop().create_task(
                self._run(), name="godseed-queue-worker"
            )

    async def stop(self) -> None:
        """Cancel the worker task and wait for it to unwind."""
        if self._task is not None:
            self._task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
            self._task = None

    def is_running(self) -> bool:
        """True while a worker task exists and has not finished."""
        return self._task is not None and not self._task.done()

    def queue_length(self) -> int:
        """Number of wishes waiting (the one being granted is not counted)."""
        return self._queue.qsize()

    @property
    def current(self) -> Optional[dict]:
        """The wish being granted as {wish_id, text_preview}, or None when idle.

        A property: `snapshot` and `_emit_queue` read it as ``self.current``
        and ship the value in an SSE event, so it must be a plain dict.
        """
        item = self._current
        if item is None:
            return None
        return {"wish_id": item.wish_id, "text_preview": item.text[:TEXT_PREVIEW_LEN]}

    def snapshot(self) -> dict:
        """The server's QueueLike view: waiting count + the wish being granted."""
        return {"length": self._queue.qsize(), "current": self.current}

    # --------------------------------------------------------------- worker
    async def _run(self) -> None:
        """Forever: pop one wish, process it, and always release its client slot."""
        while True:
            item = await self._queue.get()
            self._current = item
            try:
                await self._process(item)
            except asyncio.CancelledError:
                raise
            except Exception:
                # one wish must NEVER kill the worker
                log.exception("wish %s crashed; the god moves on", item.wish_id)
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": CRASH_REASON}
                )
            finally:
                self._current = None
                self._pending.pop(item.client_id, None)
                self._queue.task_done()
                await self._emit_queue()

    async def _process(self, item: _Item) -> None:
        """Grant ``item`` directly, or (browser-invoked) offer its turn and wait."""
        if not self._browser_invoked:
            await self.grant_item(item)
            return
        # Browser-invoked mode: announce the turn, then wait for the wisher's
        # browser to call invoke() (a gradio API request carrying their ZeroGPU
        # quota). No invocation within the window -> the god moves on.
        token = uuid.uuid4().hex
        loop = asyncio.get_running_loop()
        started: asyncio.Future[bool] = loop.create_future()
        done: asyncio.Future[bool] = loop.create_future()
        self._awaiting = {
            "wish_id": item.wish_id,
            "token": token,
            "client_id": item.client_id,
            "item": item,
            "started": started,
            "done": done,
            "begun": False,
        }
        try:
            # SECURITY: the token is NOT broadcast. your_turn carries only the
            # wish_id over the public SSE; the owner's browser fetches the secret
            # token from GET /api/turn (cookie-authenticated, identity-bound).
            # A stranger tailing SSE sees the wish_id but cannot get the token.
            turn_event = {
                "type": "your_turn",
                "wish_id": item.wish_id,
                "window_s": self._invocation_window,
            }
            # Announce in thirds: re-emits cover SSE reconnects and slow tabs
            # (the browser also buffers early arrivals, but belt and braces).
            third = max(self._invocation_window / 3.0, 0.01)
            invoked = False
            for _ in range(3):
                await self._safe_emit(turn_event)
                try:
                    await asyncio.wait_for(asyncio.shield(started), third)
                    invoked = True
                    break
                except asyncio.TimeoutError:
                    continue
            # wait_for can lose a photo-finish with invoke(); trust the future.
            invoked = invoked or started.done()
            if not invoked:
                # Close the turn BEFORE awaiting the rejection emit: a late
                # invoke() landing during that await must find nothing to
                # grant, otherwise a rejected wish would be granted while the
                # worker already serves the next one.
                self._awaiting = None
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": WANDERED_REASON}
                )
                return
            try:
                await asyncio.wait_for(asyncio.shield(done), self._grant_deadline)
            except asyncio.TimeoutError:
                # The invoked grant is hung past any sane wish length; the queue
                # must not stall behind it. (The stray request may still finish
                # server-side — logged, accepted as a rare edge.)
                log.error("grant for %s exceeded deadline; moving on", item.wish_id)
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": INTERRUPTED_REASON}
                )
        finally:
            self._awaiting = None

    def turn_token(self, wish_id: Any, client_id: Any) -> Optional[str]:
        """Return the secret invocation token for the wish currently before the
        god — but ONLY to its owner (identity-bound). The server exposes this
        over a cookie-authenticated route so the token never touches public SSE.
        Returns None when it isn't this caller's turn."""
        awaiting = self._awaiting
        if (
            awaiting is None
            or awaiting["wish_id"] != str(wish_id)
            or awaiting["client_id"] != str(client_id)
        ):
            return None
        return awaiting["token"]

    async def invoke(self, wish_id: Any, token: Any) -> dict:
        """Run the grant for the wish currently before the god.

        Called from the wisher's gradio API request so the GPU work runs in
        THEIR quota context. The token is a secret capability delivered only
        to the authenticated owner (see turn_token).

        Returns {"ok": True, "wish_id": ...} on success, or
        {"ok": False, "wish_id": ..., "reason": CRASH_REASON} if the grant
        raised (a wish_rejected is emitted in that case).
        Raises QueueError when it isn't this wish's turn or the rite has begun.
        """
        import hmac  # constant-time comparison for the secret token

        awaiting = self._awaiting
        if (
            awaiting is None
            or awaiting["wish_id"] != str(wish_id)
            or not hmac.compare_digest(
                str(awaiting["token"]).encode("utf-8"), str(token).encode("utf-8")
            )
        ):
            raise QueueError(NOT_YOUR_TURN_REASON)
        if awaiting["begun"]:
            raise QueueError(RITE_BEGUN_REASON)
        # No await between the check above and this flag: a second concurrent
        # invoke() cannot also get past it.
        awaiting["begun"] = True
        if not awaiting["started"].done():
            awaiting["started"].set_result(True)
        try:
            await self.grant_item(awaiting["item"])
            return {"ok": True, "wish_id": str(wish_id)}
        except Exception:
            log.exception("invoked grant for %s crashed", wish_id)
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": str(wish_id), "reason": CRASH_REASON}
            )
            return {"ok": False, "wish_id": str(wish_id), "reason": CRASH_REASON}
        finally:
            # Release the worker even on cancellation (client disconnect).
            if not awaiting["done"].done():
                awaiting["done"].set_result(True)

    async def grant_item(self, item: _Item) -> None:
        """Moderate -> plan/act -> persist -> emit: the whole rite for one wish.

        Runs in the worker task (server mode) or inside the wisher's gradio
        request (browser-invoked mode). Ends with exactly one of: wish_rejected
        (moderation denial, or nothing built because the planner only
        errored) or wish_granted carrying the epitaph and world epoch.
        """
        await self._safe_emit({"type": "wish_started", "wish_id": item.wish_id, "text": item.text})
        await self._emit_queue()
        verdict = await self._moderator.check(item.text)
        if not verdict.allowed:
            await self._safe_emit(
                {
                    "type": "wish_rejected",
                    "wish_id": item.wish_id,
                    "reason": verdict.poetic_reason or "the god declines this wish",
                }
            )
            return
        world_summary = self._world.summary()
        calls = 0
        landed = 0  # features actually applied to the world this wish

        async def act(call: dict) -> str:
            """Execute one tool call; emits tool_call + world_delta on success."""
            nonlocal calls, landed
            index = calls
            calls += 1
            if index >= self._max_calls:
                return HANDS_FULL_OBSERVATION
            # SECURITY: the MODEL composes inscribe_wish text — a clean wish can
            # still make it write a disallowed phrase that gets drawn PERMANENTLY
            # onto the shared world. Re-moderate that text before it can land;
            # default-deny so the world never carries an unvetted inscription.
            if isinstance(call, dict) and call.get("tool") == "inscribe_wish":
                args = call.get("args")
                # Model-composed: tolerate non-dict args / non-str text rather
                # than crashing the wish, and moderate whatever would be written.
                raw = args.get("text") if isinstance(args, dict) else args
                inscription = "" if raw is None else str(raw)
                if not self._moderator.check_content(inscription).allowed:
                    return "rejected: those words may not be written here; choose gentler ones"
            feature, observation = self._world.apply(item.wish_id, index, call)
            if feature is not None:
                landed += 1
                await self._safe_emit(
                    {
                        "type": "tool_call",
                        "wish_id": item.wish_id,
                        "call_index": index,
                        "tool": feature.tool,
                        "args": dict(feature.args),
                    }
                )
                await self._safe_emit({"type": "world_delta", "feature": feature.to_dict()})
            return observation

        trace = await self._planner.grant(item.text, world_summary, act, self._safe_emit)
        # If the wish built NOTHING and the god never actually planned — every
        # turn an error (GPU/model failure), not a clean "done" — reject honestly
        # and DON'T persist an empty trace that pollutes the shared Genesis Log.
        # A silent empty grant looks broken to the wisher. A genuine
        # done-with-no-build leaves a non-error turn, so this only fires on real
        # failure. Turns may be dicts or attribute objects, like the trace.
        turns = _trace_get(trace, "turns") or []
        all_errored = bool(turns) and all(
            str(_trace_get(turn, "observation") or "").startswith("error:") for turn in turns
        )
        if landed == 0 and (not turns or all_errored):
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": item.wish_id, "reason": BUSY_HEAVENS_REASON}
            )
            return
        # SECURITY: the reading + epitaph are model-composed and persist to the
        # public dataset + the Genesis Log. Re-moderate; redact on denial so no
        # unvetted prose survives in the archive. (The reading already streamed
        # live, but the durable record is clean.) The epitaph shows on the
        # grant card, so a denied one is replaced.
        reading = _trace_get(trace, "reading")
        if reading and not self._moderator.check_content(reading).allowed:
            _trace_set(trace, "reading", "the god read this wish in silence.")
        epitaph = _trace_get(trace, "epitaph")
        if not epitaph or not self._moderator.check_content(epitaph).allowed:
            epitaph = _epitaph_from_reading(_trace_get(trace, "reading")) or FALLBACK_EPITAPH
            _trace_set(trace, "epitaph", epitaph)
        self._enrich_trace(trace, item)
        if self._persist is not None:
            try:
                result = self._persist(trace)
                if inspect.isawaitable(result):
                    await result
            except asyncio.CancelledError:
                raise
            except Exception:
                log.exception("persist failed for %s (grant stands)", item.wish_id)
        self._world.record_epitaph(epitaph)
        await self._safe_emit(
            {
                "type": "wish_granted",
                "wish_id": item.wish_id,
                "epitaph": str(epitaph),
                "epoch": self._world.epoch,
            }
        )

    # --------------------------------------------------------------- helpers
    def _enrich_trace(self, trace: Any, item: _Item) -> None:
        """Fill trace fields the planner cannot know (wish_id, moderation, ...).

        Never overwrites values the planner already set; best-effort on
        objects that refuse attribute writes.
        """
        fields = {
            "wish_id": item.wish_id,
            "text": item.text,
            "submitted_at": item.submitted_at,
        }
        try:
            if isinstance(trace, dict):
                for key, value in fields.items():
                    trace.setdefault(key, value)
                trace.setdefault("moderation", {"allowed": True, "category": None})
            elif trace is not None:
                for key, value in fields.items():
                    if not getattr(trace, key, None):
                        setattr(trace, key, value)
                if getattr(trace, "moderation", None) is None:
                    setattr(trace, "moderation", {"allowed": True, "category": None})
        except Exception:  # frozen dataclass etc. — enrichment is best-effort
            log.debug("trace enrichment skipped for %s", item.wish_id, exc_info=True)

    async def _emit_queue(self) -> None:
        """Broadcast the current queue snapshot as a `queue` event."""
        await self._safe_emit(
            {"type": "queue", "length": self._queue.qsize(), "current": self.current}
        )

    async def _safe_emit(self, event: dict) -> None:
        """Broadcast one SSE event; emit failures must never break a grant."""
        try:
            await self._emit(event)
        except asyncio.CancelledError:
            raise
        except Exception:
            log.warning("emit failed for event type %s", event.get("type"), exc_info=True)