Spaces:
Running on Zero
Running on Zero
| """NEMOCITY petition queue — one city hall, one petition at a time. | |
| A single global asyncio worker serializes every job: | |
| wish: pop -> moderate -> planner.grant(text, summary, act, emit) | |
| [act(building_req) runs the engine: synonyms -> sanitize -> | |
| placement.place -> world.apply -> world_delta per event] | |
| -> infill rule -> persist trace -> wish_granted | |
| fix: pop -> traffic snapshot (stats + candidates) -> planner.fix( | |
| stats, candidates, act_fix, emit) -> ONE apply_fix event -> | |
| persist -> wish_granted | |
| Admission control in `submit`/`submit_fix`: 1 pending job per client, queue | |
| cap 50. Hourly rate limits + fix cooldowns live in the server, not here. | |
| `submit_fix` raises TrafficSmooth (-> 409 upstream) when the static | |
| assignment says there is nothing to fix. | |
| The emit passed to the planner is wrapped: any event missing a wish_id gets | |
| this job's id injected — so the planner can emit {type:"plan", plan:{...}} | |
| right after its JSON parses without knowing the wish_id. | |
| Crash-safe by construction: an exception while granting one job emits a | |
| readable generic wish_rejected and the worker moves on. SSE emit failures | |
| are swallowed; persist failures are logged but do not undo a grant. | |
| Wire names keep godseed's wish_* verbatim; user-facing copy says petition. | |
| Stdlib only; this is the only async module in the engine. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import contextlib | |
| import inspect | |
| import logging | |
| import re | |
| import time | |
| import uuid | |
| from dataclasses import dataclass | |
| from typing import Any, Optional | |
| from . import constants as C | |
| from . import placement, tools, traffic | |
| from .city import CityState | |
# Module logger for the queue worker.
log = logging.getLogger("nemocity.engine.queue")

# ---- limits -------------------------------------------------------------
# Default for QueueWorker(max_queue=...): admission refuses once this many
# jobs are waiting.
QUEUE_MAX = 50
MAX_BUILDINGS_PER_WISH = 3 # act() invocations the planner gets per petition
MAX_CALLS_PER_WISH = 12 # engine backstop on applied events
# Characters of petition text exposed as "text_preview" for the current job.
TEXT_PREVIEW_LEN = 60
_SUBMIT_TEXT_CAP = 500 # keep raw text bounded; moderation still sees >160 and denies

# ---- browser-invoked timing ----------------------------------------------
# Browser-invoked granting (ZeroGPU): GPU work must run inside the PETITIONER'S
# gradio request (their HF auth headers carry the quota — a server-side
# background spaces.GPU call fails by design). The worker emits your_turn and
# waits for the browser to fetch its token (authenticated) then invoke().
INVOCATION_WINDOW_S = 25.0
GRANT_DEADLINE_S = 150.0 # one-shot 9B runs well under this; bounds a hung grant

# ---- readable reasons (raised as QueueError.reason or sent in wish_rejected)
QUEUE_FULL_REASON = "city hall is swamped; come back in a few minutes"
ALREADY_PENDING_REASON = "one petition per visitor at a time; yours is still on the desk"
# Nobody called invoke() within the invocation window.
WANDERED_REASON = "the petitioner left the counter; city hall moved on"
# An invoked grant ran past the grant deadline.
INTERRUPTED_REASON = "the review was interrupted; city hall moved on"
# invoke() was called with a wish_id/token that does not match the job at the counter.
NOT_YOUR_TURN_REASON = "this petition is not at the counter"
# invoke() was called a second time for the same job.
RITE_BEGUN_REASON = "this petition is already being processed"
# Generic reason used when a grant raises an unexpected exception.
CRASH_REASON = "city hall misfiled this petition; please submit it again"
# The planner produced no turns (or only error turns) and nothing was built.
BUSY_HEAVENS_REASON = "the planning office is busy; offer your petition again in a moment"
# Observation handed back to the planner once the per-petition budget is used up.
HANDS_FULL_OBSERVATION = "rejected: this petition's permit limit is reached"
# Last-resort epitaph when none can be taken from the trace.
FALLBACK_EPITAPH = "the city granted a home"
TRAFFIC_SMOOTH_REASON = "Traffic is flowing smoothly."
# Placeholder text stored on "fix" jobs, which carry no user-written petition.
FIX_JOB_TEXT = "City Engineer: traffic review"
# Text of the note event appended to an infill placement.
INFILL_NOTE = "New families are moving in."

# Matches ids of the form "w_<digits>"; group 1 is the numeric part.
_WISH_ID_RE = re.compile(r"^w_(\d+)$")
class QueueError(Exception):
    """Base class for admission failures.

    The explanation is stored on ``.reason`` (and doubles as the exception
    message); it is readable and safe to show to users.
    """

    def __init__(self, reason: str) -> None:
        self.reason = reason
        super().__init__(reason)


class QueueFull(QueueError):
    """Admission refused because the waiting queue is at its cap."""


class AlreadyPending(QueueError):
    """Admission refused because this client already has a job pending."""


class TrafficSmooth(QueueError):
    """Nothing to fix — the server matches this by type name -> 409."""
| class _Item: | |
| wish_id: str | |
| text: str | |
| client_id: str | |
| submitted_at: float | |
| kind: str = "wish" # "wish" | "fix" | |
| def _trace_get(trace: Any, key: str) -> Any: | |
| if isinstance(trace, dict): | |
| return trace.get(key) | |
| return getattr(trace, key, None) | |
| def _trace_set(trace: Any, key: str, value: Any) -> None: | |
| try: | |
| if isinstance(trace, dict): | |
| trace[key] = value | |
| else: | |
| setattr(trace, key, value) | |
| except Exception: # frozen dataclass etc. — best-effort | |
| pass | |
| def _epitaph_from_reading(reading: Any) -> Optional[str]: | |
| """Salvage the blurb from the model's last sentence so the ledger stays | |
| varied when the model omits a clean one.""" | |
| text = str(reading or "").strip() | |
| if not text: | |
| return None | |
| parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", text) if p.strip()] | |
| return parts[-1][:120] if parts else None | |
| def _num(value: Any) -> Optional[float]: | |
| if isinstance(value, bool): | |
| return None | |
| if isinstance(value, (int, float)): | |
| return float(value) | |
| try: | |
| return float(str(value).strip()) | |
| except (ValueError, TypeError): | |
| return None | |
| class QueueWorker: | |
| """Single serialized worker; construct once, `await start()` at boot. | |
| Injected collaborators (duck-typed, no engine->mind/server imports): | |
| world engine.world.World (apply/summary/record_epitaph/epoch) | |
| moderator engine.moderation.Moderator (async check) | |
| planner object with `async grant(text, summary, act, emit)` and | |
| (optionally) `async fix(stats, candidates, act_fix, emit)` | |
| emit async callable broadcasting one SSE event dict | |
| persist callable (sync or async) receiving the WishTrace; optional | |
| """ | |
def __init__(
    self,
    world: Any,
    moderator: Any,
    planner: Any,
    emit: Any,
    persist: Any = None,
    *,
    max_queue: int = QUEUE_MAX,
    max_calls_per_wish: int = MAX_CALLS_PER_WISH,
    next_wish_index: Optional[int] = None,
    browser_invoked: bool = False,
    invocation_window: float = INVOCATION_WINDOW_S,
    grant_deadline: float = GRANT_DEADLINE_S,
) -> None:
    """Store the injected collaborators and set up empty queue state.

    ``next_wish_index`` overrides the starting wish number; when omitted the
    counter is derived from the ids already present on ``world``.
    """
    # injected collaborators
    self._world = world
    self._moderator = moderator
    self._planner = planner
    self._emit = emit
    self._persist = persist

    # limits and mode switches
    self._max_queue = max_queue
    self._max_calls = max_calls_per_wish
    self._browser_invoked = browser_invoked
    self._invocation_window = invocation_window
    self._grant_deadline = grant_deadline

    # live queue state
    self._queue: asyncio.Queue[_Item] = asyncio.Queue()
    self._pending: dict[str, str] = {}  # client_id -> wish_id
    self._current: Optional[_Item] = None
    self._task: Optional[asyncio.Task] = None
    # browser-invoked handshake record for the job at the counter, if any
    self._awaiting: Optional[dict[str, Any]] = None

    if next_wish_index is not None:
        self._wish_counter = next_wish_index
    else:
        self._wish_counter = self._derive_wish_counter(world)
| def _derive_wish_counter(world: Any) -> int: | |
| """Continue numbering after the highest persisted w_NNNNNN.""" | |
| best = 0 | |
| for feature in getattr(world, "features", ()) or (): | |
| match = _WISH_ID_RE.match(getattr(feature, "wish_id", "") or "") | |
| if match: | |
| best = max(best, int(match.group(1)) + 1) | |
| return best or 1 # first-ever petition is w_000001 | |
def _city(self) -> CityState:
    """Build a CityState from the world's current ``features``.

    A world without a ``features`` attribute (or with a falsy one) is
    treated as having no events.
    """
    events = getattr(self._world, "features", ()) or ()
    return CityState.from_events(events)
| # --------------------------------------------------------------- admission | |
| def _admit(self, client_id: str) -> None: | |
| if client_id in self._pending: | |
| raise AlreadyPending(ALREADY_PENDING_REASON) | |
| if self._queue.qsize() >= self._max_queue: | |
| raise QueueFull(QUEUE_FULL_REASON) | |
def _enqueue(self, text: str, client_id: str, kind: str) -> tuple[str, int]:
    """Mint the next wish id, queue the job and mark the client as pending.

    Returns ``(wish_id, position)`` where position counts the waiting jobs
    (including this one) plus one more if a job is currently being processed.
    """
    number = self._wish_counter
    self._wish_counter = number + 1
    wish_id = f"w_{number:06d}"
    job = _Item(
        wish_id=wish_id,
        text=text,
        client_id=client_id,
        submitted_at=time.time(),
        kind=kind,
    )
    self._pending[client_id] = wish_id
    self._queue.put_nowait(job)
    in_progress = 1 if self._current else 0
    return wish_id, self._queue.qsize() + in_progress
async def submit(self, text: Any, client_id: Any) -> tuple[str, int]:
    """Queue a petition and broadcast the new queue state.

    ``text`` is stringified (None becomes ""), stripped and truncated to the
    submit cap; ``client_id`` is stringified. Returns ``(wish_id, position)``;
    position counts the petition itself plus everyone ahead of it (including
    the one being granted), so an idle empty queue yields position 1.

    Raises AlreadyPending / QueueFull with readable ``.reason``.
    """
    raw = "" if text is None else str(text)
    text = raw.strip()[:_SUBMIT_TEXT_CAP]
    client_id = str(client_id)
    self._admit(client_id)
    ticket = self._enqueue(text, client_id, "wish")
    await self._emit_queue()
    return ticket
async def submit_fix(self, client_id: Any) -> tuple[str, int]:
    """Queue a City Engineer job, gated on the static traffic assignment.

    After the normal admission check, the current city is assigned traffic;
    if the worst demand ratio is below the fix gate there is nothing to fix
    and TrafficSmooth is raised (the server turns it into a 409). Rate limits
    and cooldowns live in the server. Returns ``(wish_id, position)``.
    """
    client_id = str(client_id)
    self._admit(client_id)

    worst_ratio = traffic.assign(self._city(), time.time()).max_ratio
    if worst_ratio < C.FIX_GATE_RATIO:
        raise TrafficSmooth(TRAFFIC_SMOOTH_REASON)

    ticket = self._enqueue(FIX_JOB_TEXT, client_id, "fix")
    await self._emit_queue()
    return ticket
| # --------------------------------------------------------------- lifecycle | |
| async def start(self) -> None: | |
| if self._task is None or self._task.done(): | |
| self._task = asyncio.get_running_loop().create_task( | |
| self._run(), name="nemocity-queue-worker" | |
| ) | |
| async def stop(self) -> None: | |
| if self._task is not None: | |
| self._task.cancel() | |
| with contextlib.suppress(asyncio.CancelledError): | |
| await self._task | |
| self._task = None | |
| def is_running(self) -> bool: | |
| return self._task is not None and not self._task.done() | |
| def queue_length(self) -> int: | |
| return self._queue.qsize() | |
| def current(self) -> Optional[dict]: | |
| if self._current is None: | |
| return None | |
| return { | |
| "wish_id": self._current.wish_id, | |
| "text_preview": self._current.text[:TEXT_PREVIEW_LEN], | |
| } | |
| def snapshot(self) -> dict: | |
| """The server's QueueLike view: waiting count + the job being granted.""" | |
| return {"length": self._queue.qsize(), "current": self.current} | |
| # --------------------------------------------------------------- worker | |
async def _run(self) -> None:
    """Worker loop: take one job at a time and process it, forever.

    An unexpected exception from a job is logged and answered with a generic
    ``wish_rejected``; cancellation is re-raised. In every case the job is
    cleared as current, its client is released from the pending map, the
    queue is told the task is done, and the queue state is re-broadcast.
    """
    while True:
        job = await self._queue.get()
        self._current = job
        try:
            await self._process(job)
        except asyncio.CancelledError:
            # shutdown: propagate once the cleanup below has run
            raise
        except Exception:
            # one petition must NEVER kill the worker
            log.exception("wish %s crashed; city hall moves on", job.wish_id)
            crash_notice = {
                "type": "wish_rejected",
                "wish_id": job.wish_id,
                "reason": CRASH_REASON,
            }
            await self._safe_emit(crash_notice)
        finally:
            self._pending.pop(job.client_id, None)
            self._current = None
            self._queue.task_done()
            await self._emit_queue()
async def _process(self, item: _Item) -> None:
    """Handle one job in whichever mode the worker was configured for.

    Server mode: grant the job right here. Browser-invoked mode: publish a
    handshake record in ``self._awaiting``, announce ``your_turn`` up to
    three times across the invocation window, then wait (bounded by the
    grant deadline) for ``invoke()`` to finish the grant. The handshake
    record is always cleared on the way out.
    """
    if not self._browser_invoked:
        await self.grant_item(item)
        return

    # Browser-invoked mode: announce the turn, then wait for the petitioner's
    # browser to call invoke() (a gradio API request carrying their ZeroGPU
    # quota). No invocation within the window -> city hall moves on.
    loop = asyncio.get_running_loop()
    started: asyncio.Future[bool] = loop.create_future()
    done: asyncio.Future[bool] = loop.create_future()
    self._awaiting = {
        "wish_id": item.wish_id,
        "token": uuid.uuid4().hex,
        "client_id": item.client_id,
        "item": item,
        "started": started,
        "done": done,
        "begun": False,
    }
    try:
        # SECURITY: the token is NOT broadcast. your_turn carries only the
        # wish_id over the public SSE; the owner's browser fetches the secret
        # token from GET /api/turn (cookie-authenticated, identity-bound).
        announcement = {
            "type": "your_turn",
            "wish_id": item.wish_id,
            "window_s": self._invocation_window,
        }
        # Announce in thirds: re-emits cover SSE reconnects and slow tabs.
        slice_s = max(self._invocation_window / 3.0, 0.01)
        for _attempt in range(3):
            await self._safe_emit(announcement)
            try:
                # shield: a timeout must not cancel the shared future
                await asyncio.wait_for(asyncio.shield(started), slice_s)
            except asyncio.TimeoutError:
                continue
            break
        else:
            # all three slices elapsed without an invoke()
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": item.wish_id, "reason": WANDERED_REASON}
            )
            return

        try:
            await asyncio.wait_for(asyncio.shield(done), self._grant_deadline)
        except asyncio.TimeoutError:
            # The invoked grant is hung past any sane petition length; the
            # queue must not stall behind it.
            log.error("grant for %s exceeded deadline; moving on", item.wish_id)
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": item.wish_id, "reason": INTERRUPTED_REASON}
            )
    finally:
        self._awaiting = None
| def turn_token(self, wish_id: Any, client_id: Any) -> Optional[str]: | |
| """Return the secret invocation token for the job at the counter — but | |
| ONLY to its owner (identity-bound). Returns None when it isn't this | |
| caller's turn.""" | |
| awaiting = self._awaiting | |
| if ( | |
| awaiting is None | |
| or awaiting["wish_id"] != str(wish_id) | |
| or awaiting["client_id"] != str(client_id) | |
| ): | |
| return None | |
| return awaiting["token"] | |
| async def invoke(self, wish_id: Any, token: Any) -> dict: | |
| """Run the grant for the job at the counter. Called from the petitioner's | |
| gradio API request so the GPU work runs in THEIR quota context.""" | |
| awaiting = self._awaiting | |
| if ( | |
| awaiting is None | |
| or awaiting["wish_id"] != str(wish_id) | |
| or awaiting["token"] != str(token) | |
| ): | |
| raise QueueError(NOT_YOUR_TURN_REASON) | |
| if awaiting["begun"]: | |
| raise QueueError(RITE_BEGUN_REASON) | |
| awaiting["begun"] = True | |
| if not awaiting["started"].done(): | |
| awaiting["started"].set_result(True) | |
| try: | |
| await self.grant_item(awaiting["item"]) | |
| return {"ok": True, "wish_id": str(wish_id)} | |
| except Exception: | |
| log.exception("invoked grant for %s crashed", wish_id) | |
| await self._safe_emit( | |
| {"type": "wish_rejected", "wish_id": str(wish_id), "reason": CRASH_REASON} | |
| ) | |
| return {"ok": False, "wish_id": str(wish_id), "reason": CRASH_REASON} | |
| finally: | |
| if not awaiting["done"].done(): | |
| awaiting["done"].set_result(True) | |
| # ----------------------------------------------------------------- grants | |
| async def grant_item(self, item: _Item) -> None: | |
| """The whole rite for one job; runs in the worker task (server mode) or | |
| inside the petitioner's gradio request (browser-invoked mode).""" | |
| if item.kind == "fix": | |
| await self._grant_fix(item) | |
| else: | |
| await self._grant_wish(item) | |
| def _wrap_emit(self, item: _Item): | |
| """Planner-facing emit: inject this job's wish_id into any event that | |
| lacks one (the plan event, thought tokens).""" | |
| async def emit(event: dict) -> None: | |
| if isinstance(event, dict) and "wish_id" not in event: | |
| event = {**event, "wish_id": item.wish_id} | |
| await self._safe_emit(event) | |
| return emit | |
| async def _apply_events( | |
| self, item: _Item, city: CityState, events: list[dict], index: int | |
| ) -> tuple[int, int, str]: | |
| """world.apply each event; mirror into the live CityState; emit | |
| tool_call + world_delta. Returns (next_index, landed, first_obs).""" | |
| landed = 0 | |
| first_obs = "" | |
| for call in events: | |
| feature, observation = self._world.apply(item.wish_id, index, call) | |
| index += 1 | |
| if not first_obs: | |
| first_obs = observation | |
| if feature is None: | |
| log.warning("engine-composed event rejected: %s", observation) | |
| continue | |
| landed += 1 | |
| city.apply(feature) | |
| await self._safe_emit( | |
| { | |
| "type": "tool_call", | |
| "wish_id": item.wish_id, | |
| "call_index": index - 1, | |
| "tool": feature.tool, | |
| "args": dict(feature.args), | |
| } | |
| ) | |
| await self._safe_emit({"type": "world_delta", "feature": feature.to_dict()}) | |
| return index, landed, first_obs | |
async def _grant_wish(self, item: _Item) -> None:
    """Grant (or reject) one petition end to end.

    Sequence: announce ``wish_started`` -> moderate the text -> let the
    planner place buildings through ``act`` -> reject if nothing was built
    and the planner never really planned -> infill rule -> re-moderate the
    model-composed reading/epitaph -> enrich + persist the trace -> record
    the epitaph -> ``wish_granted``.
    """
    await self._safe_emit({"type": "wish_started", "wish_id": item.wish_id, "text": item.text})
    await self._emit_queue()

    verdict = await self._moderator.check(item.text)
    if not verdict.allowed:
        await self._safe_emit(
            {
                "type": "wish_rejected",
                "wish_id": item.wish_id,
                "reason": verdict.poetic_reason or "city hall declines this petition",
            }
        )
        return

    world_summary = self._world.summary()
    city = self._city()
    # Mutable bookkeeping shared with the act() closure below.
    state = {"index": 0, "acts": 0, "landed": 0, "jobs_anchor": ""}

    async def act(building_req: Any) -> str:
        """ONE building entry {kind,name,near,floors,hue} -> engine places
        it (plus an auto-routed connector) and the events land."""
        if not isinstance(building_req, dict):
            return "rejected: malformed building request"
        if state["acts"] >= MAX_BUILDINGS_PER_WISH or state["index"] >= self._max_calls:
            return HANDS_FULL_OBSERVATION
        state["acts"] += 1
        kind = tools.resolve_kind(building_req.get("kind"))
        name = tools.sanitize_name(
            building_req.get("name"),
            default=tools.default_name(kind, item.text, item.wish_id),
        )
        events = placement.place(
            city,
            kind=kind,
            name=name,
            floors=_num(building_req.get("floors")),
            hue=_num(building_req.get("hue")),
            near=str(building_req.get("near") or ""),
            wish_id=item.wish_id,
            call_index=state["index"],
        )
        state["index"], landed, first_obs = await self._apply_events(
            item, city, events, state["index"]
        )
        state["landed"] += landed
        # Remember the latest job-providing building as the infill anchor.
        if landed and C.BUILDINGS[kind]["jobs"] > 0:
            state["jobs_anchor"] = name
        return first_obs or "rejected: nothing placed"

    trace = await self._planner.grant(item.text, world_summary, act, self._wrap_emit(item))

    # If the petition built NOTHING and the planner never actually planned —
    # every turn an error (GPU/model failure) — reject honestly and DON'T
    # persist an empty trace that pollutes the shared ledger.
    turns = _trace_get(trace, "turns") or []
    # Turns are read through _trace_get so dict and object turns both work,
    # matching how the trace itself is accessed.
    all_errored = bool(turns) and all(
        str(_trace_get(t, "observation") or "").startswith("error:") for t in turns
    )
    if state["landed"] == 0 and (not turns or all_errored):
        await self._safe_emit(
            {"type": "wish_rejected", "wish_id": item.wish_id, "reason": BUSY_HEAVENS_REASON}
        )
        return

    if state["landed"]:
        await self._apply_infill(item, city, state)

    # SECURITY: the reading + epitaph are model-composed and persist to the
    # public dataset + ledger. Re-moderate; redact on denial so no unvetted
    # prose survives in the archive.
    reading = _trace_get(trace, "reading")
    if reading and not self._moderator.check_content(reading).allowed:
        # Keep the redacted text in a local: _trace_set is best-effort, and
        # the epitaph fallback below must never see the denied reading even
        # when the write-back to the trace fails.
        reading = "city hall read this petition in silence."
        _trace_set(trace, "reading", reading)
    epitaph = _trace_get(trace, "epitaph")
    if not epitaph or not self._moderator.check_content(epitaph).allowed:
        epitaph = _epitaph_from_reading(reading) or FALLBACK_EPITAPH
        _trace_set(trace, "epitaph", epitaph)

    self._enrich_trace(trace, item)
    await self._persist_trace(trace, item)
    self._world.record_epitaph(epitaph)
    await self._safe_emit(
        {
            "type": "wish_granted",
            "wish_id": item.wish_id,
            "epitaph": str(epitaph),
            "epoch": self._world.epoch,
        }
    )
async def _apply_infill(self, item: _Item, city: CityState, state: dict) -> None:
    """Demand-driven infill: when jobs outpace homes, add ONE bonus home
    near the new jobs (at most once per petition).

    Skipped when jobs do not exceed housing capacity times the infill ratio,
    or when fewer than two call indices remain in the per-petition budget.
    Updates ``state["index"]`` and ``state["landed"]`` in place.
    """
    jobs = city.jobs
    housing = city.housing_capacity
    if jobs <= housing * C.INFILL_RATIO:
        return
    if state["index"] + 2 > self._max_calls:
        return

    # A large enough shortfall earns apartments rather than a single house.
    if jobs - housing >= C.INFILL_APARTMENTS_DEFICIT:
        kind = "apartments"
    else:
        kind = "house"

    events = placement.place(
        city,
        kind=kind,
        name=tools.default_name(kind, item.text, item.wish_id + ":infill"),
        floors=None,
        hue=None,
        near=state["jobs_anchor"],
        wish_id=item.wish_id,
        call_index=state["index"],
    )
    events.append({"tool": "note", "args": {"text": INFILL_NOTE, "kind": "infill"}})

    next_index, landed, _ = await self._apply_events(item, city, events, state["index"])
    state["index"] = next_index
    state["landed"] += landed
async def _grant_fix(self, item: _Item) -> None:
    """Run one City Engineer job end to end.

    Sequence: announce ``wish_started`` -> take a traffic snapshot ->
    reject with TRAFFIC_SMOOTH_REASON if there is nothing to fix -> let the
    planner pick a candidate through ``act_fix`` -> if nothing landed, the
    engine applies the best candidate itself -> persist the trace, record
    the epitaph and emit ``wish_granted``.

    If even the engine's own fallback is rejected by the world, the job is
    rejected with CRASH_REASON instead of announcing a fix that was never
    applied.
    """
    await self._safe_emit({"type": "wish_started", "wish_id": item.wish_id, "text": item.text})
    await self._emit_queue()

    city = self._city()
    now_s = time.time()
    stats, candidates = traffic.snapshot(city, now_s)
    if stats["max_ratio"] < C.FIX_GATE_RATIO or not candidates:
        await self._safe_emit(
            {
                "type": "wish_rejected",
                "wish_id": item.wish_id,
                "reason": TRAFFIC_SMOOTH_REASON,
            }
        )
        return

    by_id = {c["id"]: c for c in candidates}
    best = candidates[0]  # sorted best-predicted-first
    # Mutable bookkeeping shared with the act_fix() closure below.
    state = {"index": 0, "landed": 0, "applied": ""}

    def canned_diagnosis(cand: dict) -> str:
        return (
            f"{stats['worst']} carries {stats['max_ratio']}x capacity at rush — "
            f"{cand['desc']} cuts the index "
            f"{stats['traffic_index']}->{cand['predicted_index']}."
        )

    async def act_fix(choice: Any) -> str:
        """The planner's pick {choice, diagnosis, blurb} -> ONE apply_fix
        event. Invalid/missing choice falls back to the best candidate."""
        if state["landed"]:
            return "rejected: the fix is already applied"
        choice = choice if isinstance(choice, dict) else {}
        cand = by_id.get(str(choice.get("choice") or "").strip().upper()) or best
        # model-composed diagnosis goes on the shared world: wordlist-gate it
        raw = str(choice.get("diagnosis") or "").strip()
        diagnosis = raw[:200] if raw and self._moderator.check_content(raw).allowed else ""
        if not diagnosis:
            diagnosis = canned_diagnosis(cand)
        call = {
            "tool": "apply_fix",
            "args": {
                "action": cand["action"],
                "cells": cand["cells"],
                "klass": cand["klass"],
                "name": cand["name"],
                "diagnosis": diagnosis,
                "metrics_before": {
                    "traffic_index": stats["traffic_index"],
                    "worst": stats["worst"] or "",
                },
                "metrics_predicted": {"traffic_index": cand["predicted_index"]},
            },
        }
        state["index"], landed, obs = await self._apply_events(
            item, city, [call], state["index"]
        )
        state["landed"] += landed
        if landed:
            state["applied"] = cand["id"]
        return obs

    # The planner only sees the presentable fields of each candidate.
    public = [
        {k: c[k] for k in ("id", "action", "desc", "predicted_index")}
        for c in candidates
    ]
    trace: Any = None
    planner_fix = getattr(self._planner, "fix", None)
    if planner_fix is not None:
        try:
            trace = await planner_fix(stats, public, act_fix, self._wrap_emit(item))
        except Exception:
            log.exception("planner.fix crashed for %s; engine applies best", item.wish_id)

    if state["landed"] == 0:
        # parse failure / no planner: the engine applies the best predicted
        # candidate with the canned diagnosis.
        await act_fix({"choice": best["id"]})
    if state["landed"] == 0:
        # The world rejected the engine's own event too: nothing changed, so
        # do not persist, record an epitaph, or announce a grant.
        log.error("apply_fix rejected for %s; nothing was applied", item.wish_id)
        await self._safe_emit(
            {"type": "wish_rejected", "wish_id": item.wish_id, "reason": CRASH_REASON}
        )
        return

    epitaph = _trace_get(trace, "epitaph")
    if not epitaph or not self._moderator.check_content(epitaph).allowed:
        applied = by_id.get(state["applied"]) or best
        epitaph = f"The City Engineer opened {applied['name']}."

    # Fix traces are persisted as dicts; anything else is replaced by a
    # minimal one.
    if not isinstance(trace, dict):
        trace = {"reading": "", "turns": []}
    trace["epitaph"] = str(epitaph)
    trace.setdefault("kind", "fix")
    trace.setdefault("stats", stats)

    self._enrich_trace(trace, item)
    await self._persist_trace(trace, item)
    self._world.record_epitaph(epitaph)
    await self._safe_emit(
        {
            "type": "wish_granted",
            "wish_id": item.wish_id,
            "epitaph": str(epitaph),
            "epoch": self._world.epoch,
        }
    )
| # --------------------------------------------------------------- helpers | |
| async def _persist_trace(self, trace: Any, item: _Item) -> None: | |
| if self._persist is None or trace is None: | |
| return | |
| try: | |
| result = self._persist(trace) | |
| if inspect.isawaitable(result): | |
| await result | |
| except asyncio.CancelledError: | |
| raise | |
| except Exception: | |
| log.exception("persist failed for %s (grant stands)", item.wish_id) | |
| def _enrich_trace(self, trace: Any, item: _Item) -> None: | |
| """Fill trace fields the planner cannot know (wish_id, moderation, ...).""" | |
| fields = { | |
| "wish_id": item.wish_id, | |
| "text": item.text, | |
| "submitted_at": item.submitted_at, | |
| } | |
| try: | |
| if isinstance(trace, dict): | |
| for key, value in fields.items(): | |
| trace.setdefault(key, value) | |
| trace.setdefault("moderation", {"allowed": True, "category": None}) | |
| elif trace is not None: | |
| for key, value in fields.items(): | |
| if not getattr(trace, key, None): | |
| setattr(trace, key, value) | |
| if getattr(trace, "moderation", None) is None: | |
| setattr(trace, "moderation", {"allowed": True, "category": None}) | |
| except Exception: # frozen dataclass etc. — enrichment is best-effort | |
| log.debug("trace enrichment skipped for %s", item.wish_id, exc_info=True) | |
| async def _emit_queue(self) -> None: | |
| await self._safe_emit( | |
| {"type": "queue", "length": self._queue.qsize(), "current": self.current} | |
| ) | |
| async def _safe_emit(self, event: dict) -> None: | |
| """Broadcast one SSE event; emit failures must never break a grant.""" | |
| try: | |
| await self._emit(event) | |
| except asyncio.CancelledError: | |
| raise | |
| except Exception: | |
| log.warning("emit failed for event type %s", event.get("type"), exc_info=True) | |