"""NEMOCITY petition queue — one city hall, one petition at a time. A single global asyncio worker serializes every job: wish: pop -> moderate -> planner.grant(text, summary, act, emit) [act(building_req) runs the engine: synonyms -> sanitize -> placement.place -> world.apply -> world_delta per event] -> infill rule -> persist trace -> wish_granted fix: pop -> traffic snapshot (stats + candidates) -> planner.fix( stats, candidates, act_fix, emit) -> ONE apply_fix event -> persist -> wish_granted Admission control in `submit`/`submit_fix`: 1 pending job per client, queue cap 50. Hourly rate limits + fix cooldowns live in the server, not here. `submit_fix` raises TrafficSmooth (-> 409 upstream) when the static assignment says there is nothing to fix. The emit passed to the planner is wrapped: any event missing a wish_id gets this job's id injected — so the planner can emit {type:"plan", plan:{...}} right after its JSON parses without knowing the wish_id. Crash-safe by construction: an exception while granting one job emits a readable generic wish_rejected and the worker moves on. SSE emit failures are swallowed; persist failures are logged but do not undo a grant. Wire names keep godseed's wish_* verbatim; user-facing copy says petition. Stdlib only; this is the only async module in the engine. """ from __future__ import annotations import asyncio import contextlib import inspect import logging import re import time import uuid from dataclasses import dataclass from typing import Any, Optional from . import constants as C from . 
import placement, tools, traffic from .city import CityState log = logging.getLogger("nemocity.engine.queue") QUEUE_MAX = 50 MAX_BUILDINGS_PER_WISH = 3 # act() invocations the planner gets per petition MAX_CALLS_PER_WISH = 12 # engine backstop on applied events TEXT_PREVIEW_LEN = 60 _SUBMIT_TEXT_CAP = 500 # keep raw text bounded; moderation still sees >160 and denies # Browser-invoked granting (ZeroGPU): GPU work must run inside the PETITIONER'S # gradio request (their HF auth headers carry the quota — a server-side # background spaces.GPU call fails by design). The worker emits your_turn and # waits for the browser to fetch its token (authenticated) then invoke(). INVOCATION_WINDOW_S = 25.0 GRANT_DEADLINE_S = 150.0 # one-shot 9B runs well under this; bounds a hung grant QUEUE_FULL_REASON = "city hall is swamped; come back in a few minutes" ALREADY_PENDING_REASON = "one petition per visitor at a time; yours is still on the desk" WANDERED_REASON = "the petitioner left the counter; city hall moved on" INTERRUPTED_REASON = "the review was interrupted; city hall moved on" NOT_YOUR_TURN_REASON = "this petition is not at the counter" RITE_BEGUN_REASON = "this petition is already being processed" CRASH_REASON = "city hall misfiled this petition; please submit it again" BUSY_HEAVENS_REASON = "the planning office is busy; offer your petition again in a moment" HANDS_FULL_OBSERVATION = "rejected: this petition's permit limit is reached" FALLBACK_EPITAPH = "the city granted a home" TRAFFIC_SMOOTH_REASON = "Traffic is flowing smoothly." FIX_JOB_TEXT = "City Engineer: traffic review" INFILL_NOTE = "New families are moving in." 
# Persisted wish ids look like w_000123; the capture group is the counter.
_WISH_ID_RE = re.compile(r"^w_(\d+)$")


class QueueError(Exception):
    """Admission failure; `.reason` is readable and safe to show users."""

    def __init__(self, reason: str) -> None:
        super().__init__(reason)
        self.reason = reason


class QueueFull(QueueError):
    pass


class AlreadyPending(QueueError):
    pass


class TrafficSmooth(QueueError):
    """Nothing to fix — the server matches this by type name -> 409."""


@dataclass
class _Item:
    """One queued job: a visitor petition ("wish") or a City Engineer "fix"."""

    wish_id: str
    text: str
    client_id: str
    submitted_at: float
    kind: str = "wish"  # "wish" | "fix"


def _trace_get(trace: Any, key: str) -> Any:
    """Read `key` from a trace that may be a dict or an attribute object.

    Returns None when the key/attribute is missing (including `trace is None`).
    """
    if isinstance(trace, dict):
        return trace.get(key)
    return getattr(trace, key, None)


def _trace_set(trace: Any, key: str, value: Any) -> None:
    """Best-effort write of `key` on a dict or attribute-style trace.

    Failures (frozen dataclass, None, ...) never propagate; they are logged at
    debug level so a skipped write stays diagnosable.
    """
    try:
        if isinstance(trace, dict):
            trace[key] = value
        else:
            setattr(trace, key, value)
    except Exception:  # frozen dataclass etc. — best-effort
        log.debug("trace field %r could not be set", key, exc_info=True)


def _epitaph_from_reading(reading: Any) -> Optional[str]:
    """Salvage the blurb from the model's last sentence so the ledger stays
    varied when the model omits a clean one.

    Returns the last sentence (split after `.`, `!` or `?`) capped at 120
    characters, or None when `reading` is empty.
    """
    text = str(reading or "").strip()
    if not text:
        return None
    parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", text) if p.strip()]
    return parts[-1][:120] if parts else None


def _num(value: Any) -> Optional[float]:
    """Coerce a loosely-typed value to a finite float, or None.

    Booleans are rejected (they are ints in Python but never a meaningful
    number here). Non-numeric input, NaN, +/-inf and ints too large for a
    float all yield None instead of leaking into the caller or raising.
    """
    import math  # function-scope so the module's import block stays untouched

    if isinstance(value, bool):
        return None
    try:
        if isinstance(value, (int, float)):
            number = float(value)
        else:
            number = float(str(value).strip())
    except (ValueError, TypeError, OverflowError):
        # OverflowError: float(10**400) — a huge int cannot be represented.
        return None
    return number if math.isfinite(number) else None


class QueueWorker:
    """Single serialized worker; construct once, `await start()` at boot.

    Injected collaborators (duck-typed, no engine->mind/server imports):
      world      engine.world.World (apply/summary/record_epitaph/epoch)
      moderator  engine.moderation.Moderator (async check; check_content is
                 called synchronously)
      planner    object with `async grant(text, summary, act, emit)` and
                 (optionally) `async fix(stats, candidates, act_fix, emit)`
      emit       async callable broadcasting one SSE event dict
      persist    callable (sync or async) receiving the WishTrace; optional
    """

    def __init__(
        self,
        world: Any,
        moderator: Any,
        planner: Any,
        emit: Any,
        persist: Any = None,
        *,
        max_queue: int = QUEUE_MAX,
        max_calls_per_wish: int = MAX_CALLS_PER_WISH,
        next_wish_index: Optional[int] = None,
        browser_invoked: bool = False,
        invocation_window: float = INVOCATION_WINDOW_S,
        grant_deadline: float = GRANT_DEADLINE_S,
    ) -> None:
        """Wire up collaborators and limits; does not start the worker task.

        `next_wish_index` overrides the wish counter; when omitted, numbering
        continues after the highest `w_NNNNNN` found on `world.features`.
        `browser_invoked` switches `_process` to the your_turn/invoke flow,
        bounded by `invocation_window` and `grant_deadline` (seconds).
        """
        self._world = world
        self._moderator = moderator
        self._planner = planner
        self._emit = emit
        self._persist = persist
        self._max_queue = max_queue
        self._max_calls = max_calls_per_wish
        self._queue: asyncio.Queue[_Item] = asyncio.Queue()
        self._pending: dict[str, str] = {}  # client_id -> wish_id
        self._current: Optional[_Item] = None
        self._task: Optional[asyncio.Task] = None
        self._browser_invoked = browser_invoked
        self._invocation_window = invocation_window
        self._grant_deadline = grant_deadline
        # Browser-invoked mode only: state of the job waiting at the counter.
        self._awaiting: Optional[dict[str, Any]] = None
        self._wish_counter = (
            next_wish_index
            if next_wish_index is not None
            else self._derive_wish_counter(world)
        )

    @staticmethod
    def _derive_wish_counter(world: Any) -> int:
        """Continue numbering after the highest persisted w_NNNNNN."""
        best = 0
        for feature in getattr(world, "features", ()) or ():
            match = _WISH_ID_RE.match(getattr(feature, "wish_id", "") or "")
            if match:
                best = max(best, int(match.group(1)) + 1)
        return best or 1  # first-ever petition is w_000001

    def _city(self) -> CityState:
        """Rebuild a CityState from the world's current features."""
        return CityState.from_events(getattr(self._world, "features", ()) or ())

    # --------------------------------------------------------------- admission
    def _admit(self, client_id: str) -> None:
        """Raise AlreadyPending / QueueFull when `client_id` may not enqueue."""
        if client_id in self._pending:
            raise AlreadyPending(ALREADY_PENDING_REASON)
        if self._queue.qsize() >= self._max_queue:
            raise QueueFull(QUEUE_FULL_REASON)

    def _enqueue(self, text: str, client_id: str, kind: str) -> tuple[str, int]:
        """Allocate the next wish_id, queue the job and return (wish_id, position)."""
        wish_id = f"w_{self._wish_counter:06d}"
        self._wish_counter += 1
        item = _Item(
            wish_id=wish_id,
            text=text,
            client_id=client_id,
            submitted_at=time.time(),
            kind=kind,
        )
        self._pending[client_id] = wish_id
        self._queue.put_nowait(item)
        # Position counts this job plus the one currently being granted, if any.
        return wish_id, self._queue.qsize() + (1 if self._current else 0)

    async def submit(self, text: Any, client_id: Any) -> tuple[str, int]:
        """Enqueue a petition. Returns (wish_id, position).

        Position counts the petition itself plus everyone ahead of it
        (including the one being granted), so an idle empty queue yields
        position 1. Raises AlreadyPending / QueueFull with readable `.reason`.
        """
        text = ("" if text is None else str(text)).strip()[:_SUBMIT_TEXT_CAP]
        client_id = str(client_id)
        self._admit(client_id)
        wish_id, position = self._enqueue(text, client_id, "wish")
        await self._emit_queue()
        return wish_id, position

    async def submit_fix(self, client_id: Any) -> tuple[str, int]:
        """Enqueue a City Engineer job.

        The static-assignment gate runs HERE: when the worst demand ratio is
        below the gate there is nothing to fix and TrafficSmooth is raised (the
        server turns it into a 409). Rate limits and cooldowns live in the
        server. Also raises AlreadyPending / QueueFull like `submit`.
        """
        client_id = str(client_id)
        self._admit(client_id)
        assignment = traffic.assign(self._city(), time.time())
        if assignment.max_ratio < C.FIX_GATE_RATIO:
            raise TrafficSmooth(TRAFFIC_SMOOTH_REASON)
        wish_id, position = self._enqueue(FIX_JOB_TEXT, client_id, "fix")
        await self._emit_queue()
        return wish_id, position

    # --------------------------------------------------------------- lifecycle
    async def start(self) -> None:
        """Start the worker task on the running loop unless one is already alive."""
        if self._task is None or self._task.done():
            self._task = asyncio.get_running_loop().create_task(
                self._run(), name="nemocity-queue-worker"
            )

    async def stop(self) -> None:
        """Cancel the worker task and wait for it to finish."""
        if self._task is not None:
            self._task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
            self._task = None

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()

    @property
    def queue_length(self) -> int:
        return self._queue.qsize()

    @property
    def current(self) -> Optional[dict]:
        """The job being granted as {wish_id, text_preview}, or None when idle."""
        if self._current is None:
            return None
        return {
            "wish_id": self._current.wish_id,
            "text_preview": self._current.text[:TEXT_PREVIEW_LEN],
        }

    def snapshot(self) -> dict:
        """The server's QueueLike view: waiting count + the job being granted."""
        return {"length": self._queue.qsize(), "current": self.current}

    # --------------------------------------------------------------- worker
    async def _run(self) -> None:
        """Worker loop: pop one job, process it, always release its slot."""
        while True:
            item = await self._queue.get()
            self._current = item
            try:
                await self._process(item)
            except asyncio.CancelledError:
                raise
            except Exception:  # one petition must NEVER kill the worker
                log.exception("wish %s crashed; city hall moves on", item.wish_id)
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": CRASH_REASON}
                )
            finally:
                # Free the client's single pending slot whatever the outcome.
                self._current = None
                self._pending.pop(item.client_id, None)
                self._queue.task_done()
                await self._emit_queue()

    async def _process(self, item: _Item) -> None:
        """Grant `item` directly, or (browser-invoked mode) hand it to its owner.

        In browser-invoked mode the worker only announces the turn and waits;
        the grant itself runs inside `invoke()`.
        """
        if not self._browser_invoked:
            await self.grant_item(item)
            return

        # Browser-invoked mode: announce the turn, then wait for the petitioner's
        # browser to call invoke() (a gradio API request carrying their ZeroGPU
        # quota). No invocation within the window -> city hall moves on.
        token = uuid.uuid4().hex
        loop = asyncio.get_running_loop()
        started: asyncio.Future[bool] = loop.create_future()
        done: asyncio.Future[bool] = loop.create_future()
        self._awaiting = {
            "wish_id": item.wish_id,
            "token": token,
            "client_id": item.client_id,
            "item": item,
            "started": started,
            "done": done,
            "begun": False,
        }
        try:
            # SECURITY: the token is NOT broadcast. your_turn carries only the
            # wish_id over the public SSE; the owner's browser fetches the secret
            # token from GET /api/turn (cookie-authenticated, identity-bound).
            turn_event = {
                "type": "your_turn",
                "wish_id": item.wish_id,
                "window_s": self._invocation_window,
            }
            # Announce in thirds: re-emits cover SSE reconnects and slow tabs.
            third = max(self._invocation_window / 3.0, 0.01)
            for _ in range(3):
                await self._safe_emit(turn_event)
                try:
                    # shield: a timeout must not cancel the shared future.
                    await asyncio.wait_for(asyncio.shield(started), third)
                except asyncio.TimeoutError:
                    continue
                break
            # Ask the future itself rather than a flag: invoke() may have landed
            # while the final timeout was being delivered.
            if not started.done():
                # Close the counter BEFORE awaiting the rejection emit; otherwise
                # a late invoke() could slip in during the emit and run its grant
                # concurrently with the next job.
                self._awaiting = None
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": WANDERED_REASON}
                )
                return
            try:
                await asyncio.wait_for(asyncio.shield(done), self._grant_deadline)
            except asyncio.TimeoutError:
                # The invoked grant is hung past any sane petition length; the
                # queue must not stall behind it.
                log.error("grant for %s exceeded deadline; moving on", item.wish_id)
                await self._safe_emit(
                    {
                        "type": "wish_rejected",
                        "wish_id": item.wish_id,
                        "reason": INTERRUPTED_REASON,
                    }
                )
        finally:
            self._awaiting = None

    def turn_token(self, wish_id: Any, client_id: Any) -> Optional[str]:
        """Return the secret invocation token for the job at the counter — but
        ONLY to its owner (identity-bound). Returns None when it isn't this
        caller's turn."""
        awaiting = self._awaiting
        if (
            awaiting is None
            or awaiting["wish_id"] != str(wish_id)
            or awaiting["client_id"] != str(client_id)
        ):
            return None
        return awaiting["token"]

    async def invoke(self, wish_id: Any, token: Any) -> dict:
        """Run the grant for the job at the counter. Called from the petitioner's
        gradio API request so the GPU work runs in THEIR quota context.

        Returns {"ok": True, "wish_id": ...} on success, or {"ok": False, ...,
        "reason": CRASH_REASON} when the grant raised. Raises QueueError when
        the wish_id/token do not match the job at the counter or the grant has
        already begun.
        """
        awaiting = self._awaiting
        if (
            awaiting is None
            or awaiting["wish_id"] != str(wish_id)
            or awaiting["token"] != str(token)
        ):
            raise QueueError(NOT_YOUR_TURN_REASON)
        if awaiting["begun"]:
            raise QueueError(RITE_BEGUN_REASON)
        awaiting["begun"] = True
        if not awaiting["started"].done():
            awaiting["started"].set_result(True)
        try:
            await self.grant_item(awaiting["item"])
            return {"ok": True, "wish_id": str(wish_id)}
        except Exception:
            log.exception("invoked grant for %s crashed", wish_id)
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": str(wish_id), "reason": CRASH_REASON}
            )
            return {"ok": False, "wish_id": str(wish_id), "reason": CRASH_REASON}
        finally:
            # Always release the worker waiting in _process, even on cancellation.
            if not awaiting["done"].done():
                awaiting["done"].set_result(True)

    # ----------------------------------------------------------------- grants
    async def grant_item(self, item: _Item) -> None:
        """The whole rite for one job; runs in the worker task (server mode) or
        inside the petitioner's gradio request (browser-invoked mode)."""
        if item.kind == "fix":
            await self._grant_fix(item)
        else:
            await self._grant_wish(item)

    def _wrap_emit(self, item: _Item):
        """Planner-facing emit: inject this job's wish_id into any event that
        lacks one (the plan event, thought tokens)."""

        async def emit(event: dict) -> None:
            if isinstance(event, dict) and "wish_id" not in event:
                event = {**event, "wish_id": item.wish_id}
            await self._safe_emit(event)

        return emit

    async def _apply_events(
        self, item: _Item, city: CityState, events: list[dict], index: int
    ) -> tuple[int, int, str]:
        """world.apply each event; mirror into the live CityState; emit
        tool_call + world_delta. Returns (next_index, landed, first_obs).

        `index` advances for every event, landed or not. `first_obs` is the
        observation of the first event, even when that event was rejected.
        """
        landed = 0
        first_obs = ""
        for call in events:
            feature, observation = self._world.apply(item.wish_id, index, call)
            index += 1
            if not first_obs:
                first_obs = observation
            if feature is None:
                log.warning("engine-composed event rejected: %s", observation)
                continue
            landed += 1
            city.apply(feature)
            await self._safe_emit(
                {
                    "type": "tool_call",
                    "wish_id": item.wish_id,
                    "call_index": index - 1,
                    "tool": feature.tool,
                    "args": dict(feature.args),
                }
            )
            await self._safe_emit({"type": "world_delta", "feature": feature.to_dict()})
        return index, landed, first_obs

    async def _grant_wish(self, item: _Item) -> None:
        """Grant one visitor petition: moderate, plan, build, persist, announce.

        Emits wish_started, then either wish_rejected (moderation denial, or
        nothing built and the planner never produced a usable turn) or
        wish_granted with the epitaph and the world's epoch.
        """
        await self._safe_emit({"type": "wish_started", "wish_id": item.wish_id, "text": item.text})
        await self._emit_queue()

        verdict = await self._moderator.check(item.text)
        if not verdict.allowed:
            await self._safe_emit(
                {
                    "type": "wish_rejected",
                    "wish_id": item.wish_id,
                    "reason": verdict.poetic_reason or "city hall declines this petition",
                }
            )
            return

        world_summary = self._world.summary()
        city = self._city()
        state = {"index": 0, "acts": 0, "landed": 0, "jobs_anchor": ""}

        async def act(building_req: Any) -> str:
            """ONE building entry {kind,name,near,floors,hue} -> engine places
            it (plus an auto-routed connector) and the events land."""
            if not isinstance(building_req, dict):
                return "rejected: malformed building request"
            if state["acts"] >= MAX_BUILDINGS_PER_WISH or state["index"] >= self._max_calls:
                return HANDS_FULL_OBSERVATION
            state["acts"] += 1
            kind = tools.resolve_kind(building_req.get("kind"))
            name = tools.sanitize_name(
                building_req.get("name"),
                default=tools.default_name(kind, item.text, item.wish_id),
            )
            events = placement.place(
                city,
                kind=kind,
                name=name,
                floors=_num(building_req.get("floors")),
                hue=_num(building_req.get("hue")),
                near=str(building_req.get("near") or ""),
                wish_id=item.wish_id,
                call_index=state["index"],
            )
            state["index"], landed, first_obs = await self._apply_events(
                item, city, events, state["index"]
            )
            state["landed"] += landed
            if landed and C.BUILDINGS[kind]["jobs"] > 0:
                # Remember the latest job-bearing building as the infill anchor.
                state["jobs_anchor"] = name
            return first_obs or "rejected: nothing placed"

        trace = await self._planner.grant(item.text, world_summary, act, self._wrap_emit(item))

        # If the petition built NOTHING and the planner never actually planned —
        # every turn an error (GPU/model failure) — reject honestly and DON'T
        # persist an empty trace that pollutes the shared ledger.
        turns = _trace_get(trace, "turns") or []
        # Turns are read through _trace_get so dict and attribute-style turns
        # both work, matching how the trace itself is accessed.
        all_errored = bool(turns) and all(
            str(_trace_get(t, "observation") or "").startswith("error:") for t in turns
        )
        if state["landed"] == 0 and (not turns or all_errored):
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": item.wish_id, "reason": BUSY_HEAVENS_REASON}
            )
            return

        if state["landed"]:
            await self._apply_infill(item, city, state)

        # SECURITY: the reading + epitaph are model-composed and persist to the
        # public dataset + ledger. Re-moderate; redact on denial so no unvetted
        # prose survives in the archive.
        reading = _trace_get(trace, "reading")
        if reading and not self._moderator.check_content(reading).allowed:
            # Keep the redacted text in a local: _trace_set is best-effort and
            # may silently fail (frozen trace), and the epitaph salvage below
            # must never see the denied prose.
            reading = "city hall read this petition in silence."
            _trace_set(trace, "reading", reading)
        epitaph = _trace_get(trace, "epitaph")
        if not epitaph or not self._moderator.check_content(epitaph).allowed:
            epitaph = _epitaph_from_reading(reading) or FALLBACK_EPITAPH
            _trace_set(trace, "epitaph", epitaph)

        self._enrich_trace(trace, item)
        await self._persist_trace(trace, item)
        self._world.record_epitaph(epitaph)
        await self._safe_emit(
            {
                "type": "wish_granted",
                "wish_id": item.wish_id,
                "epitaph": str(epitaph),
                "epoch": self._world.epoch,
            }
        )

    async def _apply_infill(self, item: _Item, city: CityState, state: dict) -> None:
        """Demand-driven infill: jobs outpacing homes pulls ONE bonus home in
        near the new jobs (max 1 per petition).

        Skipped when housing keeps up, when fewer than two event slots remain
        in the petition's call budget, or when placement finds nothing to build.
        """
        jobs, housing = city.jobs, city.housing_capacity
        if jobs <= housing * C.INFILL_RATIO or state["index"] + 2 > self._max_calls:
            return
        kind = "apartments" if jobs - housing >= C.INFILL_APARTMENTS_DEFICIT else "house"
        events = placement.place(
            city,
            kind=kind,
            name=tools.default_name(kind, item.text, item.wish_id + ":infill"),
            floors=None,
            hue=None,
            near=state["jobs_anchor"],
            wish_id=item.wish_id,
            call_index=state["index"],
        )
        if not events:
            # Nothing could be placed: do not announce families moving in.
            return
        events.append({"tool": "note", "args": {"text": INFILL_NOTE, "kind": "infill"}})
        state["index"], landed, _ = await self._apply_events(
            item, city, events, state["index"]
        )
        state["landed"] += landed

    async def _grant_fix(self, item: _Item) -> None:
        """Grant one City Engineer job: snapshot traffic, let the planner pick
        a candidate, apply exactly one apply_fix event, persist, announce.

        Emits wish_rejected with TRAFFIC_SMOOTH_REASON when the snapshot is
        below the gate or offers no candidates. Otherwise a fix always lands:
        if the planner is missing, crashes or never calls act_fix, the engine
        applies the first (best-predicted) candidate itself.
        """
        await self._safe_emit({"type": "wish_started", "wish_id": item.wish_id, "text": item.text})
        await self._emit_queue()

        city = self._city()
        now_s = time.time()
        stats, candidates = traffic.snapshot(city, now_s)
        if stats["max_ratio"] < C.FIX_GATE_RATIO or not candidates:
            await self._safe_emit(
                {
                    "type": "wish_rejected",
                    "wish_id": item.wish_id,
                    "reason": TRAFFIC_SMOOTH_REASON,
                }
            )
            return

        by_id = {c["id"]: c for c in candidates}
        best = candidates[0]  # sorted best-predicted-first
        state = {"index": 0, "landed": 0, "applied": ""}

        def canned_diagnosis(cand: dict) -> str:
            return (
                f"{stats['worst']} carries {stats['max_ratio']}x capacity at rush — "
                f"{cand['desc']} cuts the index "
                f"{stats['traffic_index']}->{cand['predicted_index']}."
            )

        async def act_fix(choice: Any) -> str:
            """The planner's pick {choice, diagnosis, blurb} -> ONE apply_fix
            event. Invalid/missing choice falls back to the best candidate."""
            if state["landed"]:
                return "rejected: the fix is already applied"
            choice = choice if isinstance(choice, dict) else {}
            cand = by_id.get(str(choice.get("choice") or "").strip().upper()) or best
            # model-composed diagnosis goes on the shared world: wordlist-gate it
            raw = str(choice.get("diagnosis") or "").strip()
            diagnosis = raw[:200] if raw and self._moderator.check_content(raw).allowed else ""
            if not diagnosis:
                diagnosis = canned_diagnosis(cand)
            call = {
                "tool": "apply_fix",
                "args": {
                    "action": cand["action"],
                    "cells": cand["cells"],
                    "klass": cand["klass"],
                    "name": cand["name"],
                    "diagnosis": diagnosis,
                    "metrics_before": {
                        "traffic_index": stats["traffic_index"],
                        "worst": stats["worst"] or "",
                    },
                    "metrics_predicted": {"traffic_index": cand["predicted_index"]},
                },
            }
            state["index"], landed, obs = await self._apply_events(
                item, city, [call], state["index"]
            )
            state["landed"] += landed
            if landed:
                state["applied"] = cand["id"]
            return obs

        # The planner only sees the public fields of each candidate.
        public = [
            {k: c[k] for k in ("id", "action", "desc", "predicted_index")}
            for c in candidates
        ]
        trace: Any = None
        planner_fix = getattr(self._planner, "fix", None)
        if planner_fix is not None:
            try:
                trace = await planner_fix(stats, public, act_fix, self._wrap_emit(item))
            except Exception:
                log.exception("planner.fix crashed for %s; engine applies best", item.wish_id)
        if state["landed"] == 0:
            # parse failure / no planner: the engine applies the best predicted
            # candidate with the canned diagnosis — a fix ALWAYS lands.
            await act_fix({"choice": best["id"]})

        epitaph = _trace_get(trace, "epitaph")
        if not epitaph or not self._moderator.check_content(epitaph).allowed:
            applied = by_id.get(state["applied"]) or best
            epitaph = f"The City Engineer opened {applied['name']}."
        if not isinstance(trace, dict):
            trace = {"reading": "", "turns": []}
        # SECURITY: the reading is model-composed and is persisted with the
        # trace, so gate it like the petition path does; a denied reading is
        # blanked (the same value the engine-built fallback trace carries).
        reading = trace.get("reading")
        if reading and not self._moderator.check_content(reading).allowed:
            trace["reading"] = ""
        trace["epitaph"] = str(epitaph)
        trace.setdefault("kind", "fix")
        trace.setdefault("stats", stats)
        self._enrich_trace(trace, item)
        await self._persist_trace(trace, item)
        self._world.record_epitaph(epitaph)
        await self._safe_emit(
            {
                "type": "wish_granted",
                "wish_id": item.wish_id,
                "epitaph": str(epitaph),
                "epoch": self._world.epoch,
            }
        )

    # --------------------------------------------------------------- helpers
    async def _persist_trace(self, trace: Any, item: _Item) -> None:
        """Hand the trace to `persist` (sync or async); failures are logged and
        never undo the grant. No-op without a persist callable or a trace."""
        if self._persist is None or trace is None:
            return
        try:
            result = self._persist(trace)
            if inspect.isawaitable(result):
                await result
        except asyncio.CancelledError:
            raise
        except Exception:
            log.exception("persist failed for %s (grant stands)", item.wish_id)

    def _enrich_trace(self, trace: Any, item: _Item) -> None:
        """Fill trace fields the planner cannot know (wish_id, moderation, ...).

        Existing values win: dict keys are only set when absent, attributes
        only when falsy (moderation only when None).
        """
        fields = {
            "wish_id": item.wish_id,
            "text": item.text,
            "submitted_at": item.submitted_at,
        }
        try:
            if isinstance(trace, dict):
                for key, value in fields.items():
                    trace.setdefault(key, value)
                trace.setdefault("moderation", {"allowed": True, "category": None})
            elif trace is not None:
                for key, value in fields.items():
                    if not getattr(trace, key, None):
                        setattr(trace, key, value)
                if getattr(trace, "moderation", None) is None:
                    setattr(trace, "moderation", {"allowed": True, "category": None})
        except Exception:  # frozen dataclass etc. — enrichment is best-effort
            log.debug("trace enrichment skipped for %s", item.wish_id, exc_info=True)

    async def _emit_queue(self) -> None:
        """Broadcast the queue state: waiting count + the job being granted."""
        await self._safe_emit(
            {"type": "queue", "length": self._queue.qsize(), "current": self.current}
        )

    async def _safe_emit(self, event: dict) -> None:
        """Broadcast one SSE event; emit failures must never break a grant."""
        try:
            await self._emit(event)
        except asyncio.CancelledError:
            raise
        except Exception:
            # The planner-facing wrapper forwards non-dict events untouched, so
            # the handler itself must not assume `.get` exists.
            kind = event.get("type") if isinstance(event, dict) else type(event).__name__
            log.warning("emit failed for event type %s", kind, exc_info=True)