nemocity / engine /queue_worker.py
AndresCarreon's picture
NEMOCITY v0 — mock backend, gradio 6.16.0 (pre-SSR)
d72231c verified
Raw
History Blame Contribute Delete
29.1 kB
"""NEMOCITY petition queue — one city hall, one petition at a time.
A single global asyncio worker serializes every job:
wish: pop -> moderate -> planner.grant(text, summary, act, emit)
[act(building_req) runs the engine: synonyms -> sanitize ->
placement.place -> world.apply -> world_delta per event]
-> infill rule -> persist trace -> wish_granted
fix: pop -> traffic snapshot (stats + candidates) -> planner.fix(
stats, candidates, act_fix, emit) -> ONE apply_fix event ->
persist -> wish_granted
Admission control in `submit`/`submit_fix`: 1 pending job per client, queue
cap 50. Hourly rate limits + fix cooldowns live in the server, not here.
`submit_fix` raises TrafficSmooth (-> 409 upstream) when the static
assignment says there is nothing to fix.
The emit passed to the planner is wrapped: any event missing a wish_id gets
this job's id injected — so the planner can emit {type:"plan", plan:{...}}
right after its JSON parses without knowing the wish_id.
Crash-safe by construction: an exception while granting one job emits a
readable generic wish_rejected and the worker moves on. SSE emit failures
are swallowed; persist failures are logged but do not undo a grant.
Wire names keep godseed's wish_* verbatim; user-facing copy says petition.
Stdlib only; this is the only async module in the engine.
"""
from __future__ import annotations

import asyncio
import contextlib
import inspect
import logging
import math
import re
import secrets
import time
import uuid
from dataclasses import dataclass
from typing import Any, Optional

from . import constants as C
from . import placement, tools, traffic
from .city import CityState
log = logging.getLogger("nemocity.engine.queue")

# --- admission and per-petition limits ---------------------------------------
QUEUE_MAX: int = 50
MAX_BUILDINGS_PER_WISH: int = 3  # act() invocations the planner gets per petition
MAX_CALLS_PER_WISH: int = 12  # engine backstop on applied events
TEXT_PREVIEW_LEN: int = 60
_SUBMIT_TEXT_CAP: int = 500  # keep raw text bounded; moderation still sees >160 and denies

# --- browser-invoked granting (ZeroGPU) ---------------------------------------
# GPU work must run inside the PETITIONER'S gradio request: their HF auth
# headers carry the quota, so a server-side background spaces.GPU call fails by
# design. The worker emits your_turn, then waits for the browser to fetch its
# token (authenticated) and call invoke().
INVOCATION_WINDOW_S: float = 25.0
GRANT_DEADLINE_S: float = 150.0  # one-shot 9B runs well under this; bounds a hung grant

# --- user-facing copy ---------------------------------------------------------
QUEUE_FULL_REASON: str = "city hall is swamped; come back in a few minutes"
ALREADY_PENDING_REASON: str = "one petition per visitor at a time; yours is still on the desk"
WANDERED_REASON: str = "the petitioner left the counter; city hall moved on"
INTERRUPTED_REASON: str = "the review was interrupted; city hall moved on"
NOT_YOUR_TURN_REASON: str = "this petition is not at the counter"
RITE_BEGUN_REASON: str = "this petition is already being processed"
CRASH_REASON: str = "city hall misfiled this petition; please submit it again"
BUSY_HEAVENS_REASON: str = "the planning office is busy; offer your petition again in a moment"
HANDS_FULL_OBSERVATION: str = "rejected: this petition's permit limit is reached"
FALLBACK_EPITAPH: str = "the city granted a home"
TRAFFIC_SMOOTH_REASON: str = "Traffic is flowing smoothly."
FIX_JOB_TEXT: str = "City Engineer: traffic review"
INFILL_NOTE: str = "New families are moving in."

# Persisted petition ids look like w_000123; group(1) is the numeric part.
_WISH_ID_RE = re.compile(r"^w_(\d+)$")
class QueueError(Exception):
    """Admission failure carrying a user-safe explanation.

    ``reason`` holds the readable message; it is also the exception's
    single argument, so ``str(err) == err.reason``.
    """

    def __init__(self, reason: str) -> None:
        self.reason = reason
        super().__init__(reason)
class QueueFull(QueueError):
    """The queue already holds its maximum number of waiting jobs."""
class AlreadyPending(QueueError):
    """The client already has a job waiting in the queue or being granted."""
class TrafficSmooth(QueueError):
    """submit_fix found nothing worth fixing (worst ratio below the gate).

    The server matches this by type name and answers 409.
    """
@dataclass
class _Item:
    """One queued job: a citizen petition ("wish") or a City Engineer "fix"."""

    wish_id: str  # "w_NNNNNN" id assigned when the job is enqueued
    text: str  # petition text (the fixed FIX_JOB_TEXT for fix jobs)
    client_id: str  # submitting client; keys the one-pending-job rule
    submitted_at: float  # time.time() at enqueue
    kind: str = "wish"  # "wish" | "fix"
def _trace_get(trace: Any, key: str) -> Any:
    """Read ``key`` from a trace that may be a dict or an attribute object.

    A missing key or attribute yields None in both cases (including a None
    trace).
    """
    if not isinstance(trace, dict):
        return getattr(trace, key, None)
    return trace.get(key)
def _trace_set(trace: Any, key: str, value: Any) -> bool:
    """Best-effort write of ``key`` on a dict- or attribute-style trace.

    Returns True when the value was stored and False when the trace refused
    it (e.g. a frozen dataclass). Callers use this for security redaction,
    so a refusal is logged instead of silently ignored; callers that do not
    care may ignore the return value.
    """
    try:
        if isinstance(trace, dict):
            trace[key] = value
        else:
            setattr(trace, key, value)
    except Exception:  # frozen dataclass, __slots__, read-only property ...
        log.warning(
            "could not set trace field %r on %s",
            key,
            type(trace).__name__,
            exc_info=True,
        )
        return False
    return True
def _epitaph_from_reading(reading: Any) -> Optional[str]:
    """Return the last sentence of ``reading`` (at most 120 chars), or None.

    Used to salvage a ledger blurb when the model omitted a usable epitaph,
    which keeps the ledger varied instead of repeating one fallback line.
    """
    cleaned = str(reading or "").strip()
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", cleaned)]
    sentences = [s for s in sentences if s]
    if not sentences:
        return None
    return sentences[-1][:120]
def _num(value: Any) -> Optional[float]:
    """Coerce a model-supplied number (floors, hue) to a finite float.

    Returns None for booleans, for anything that does not parse as a number,
    for ints too large to represent as a float (OverflowError), and for
    non-finite results ("inf", "nan"). Callers treat None as "use the
    default".
    """
    if isinstance(value, bool):  # bool is an int subclass; never a size/hue
        return None
    try:
        if isinstance(value, (int, float)):
            number = float(value)
        else:
            number = float(str(value).strip())
    except (ValueError, TypeError, OverflowError):
        # OverflowError: float(10**400) — model JSON can carry huge ints.
        return None
    return number if math.isfinite(number) else None
class QueueWorker:
    """Single serialized worker; construct once, ``await start()`` at boot.

    Injected collaborators (duck-typed, no engine->mind/server imports):
        world      engine.world.World (apply/summary/record_epitaph/epoch)
        moderator  engine.moderation.Moderator (async ``check`` for petition
                   text, sync ``check_content`` for model-composed prose)
        planner    object with ``async grant(text, summary, act, emit)`` and
                   (optionally) ``async fix(stats, candidates, act_fix, emit)``
        emit       async callable broadcasting one SSE event dict
        persist    callable (sync or async) receiving the WishTrace; optional

    Jobs are processed one at a time by ``_run``. In browser-invoked mode the
    job at the counter is granted from ``invoke()`` (the petitioner's own
    request) while ``_run`` waits for it to finish.
    """

    # Replacement for a model-composed reading that fails re-moderation.
    _REDACTED_READING = "city hall read this petition in silence."

    def __init__(
        self,
        world: Any,
        moderator: Any,
        planner: Any,
        emit: Any,
        persist: Any = None,
        *,
        max_queue: int = QUEUE_MAX,
        max_calls_per_wish: int = MAX_CALLS_PER_WISH,
        next_wish_index: Optional[int] = None,
        browser_invoked: bool = False,
        invocation_window: float = INVOCATION_WINDOW_S,
        grant_deadline: float = GRANT_DEADLINE_S,
    ) -> None:
        """Store collaborators and limits; the worker runs after ``start()``.

        ``next_wish_index`` overrides the id counter; when None, numbering
        resumes after the highest ``w_NNNNNN`` in ``world.features``.
        ``browser_invoked`` enables the your_turn/invoke() handshake, bounded
        by ``invocation_window`` (seconds for the browser to start the grant)
        and ``grant_deadline`` (seconds the worker waits for it to finish).
        """
        self._world = world
        self._moderator = moderator
        self._planner = planner
        self._emit = emit
        self._persist = persist
        self._max_queue = max_queue
        self._max_calls = max_calls_per_wish
        self._queue: asyncio.Queue[_Item] = asyncio.Queue()
        self._pending: dict[str, str] = {}  # client_id -> wish_id
        self._current: Optional[_Item] = None  # job being processed, if any
        self._task: Optional[asyncio.Task] = None
        self._browser_invoked = browser_invoked
        self._invocation_window = invocation_window
        self._grant_deadline = grant_deadline
        # Handshake state for the job at the counter (browser-invoked mode).
        self._awaiting: Optional[dict[str, Any]] = None
        self._wish_counter = (
            next_wish_index if next_wish_index is not None else self._derive_wish_counter(world)
        )

    @staticmethod
    def _derive_wish_counter(world: Any) -> int:
        """Continue numbering after the highest persisted w_NNNNNN.

        Feature ids that do not match the pattern are ignored; returns 1
        when nothing matches (the first-ever petition is w_000001).
        """
        best = 0
        for feature in getattr(world, "features", ()) or ():
            # str(): a non-string id is skipped instead of raising TypeError
            match = _WISH_ID_RE.match(str(getattr(feature, "wish_id", "") or ""))
            if match:
                best = max(best, int(match.group(1)) + 1)
        return best or 1  # first-ever petition is w_000001

    def _city(self) -> CityState:
        """Build a CityState from the world's current features."""
        return CityState.from_events(getattr(self._world, "features", ()) or ())

    # --------------------------------------------------------------- admission
    def _admit(self, client_id: str) -> None:
        """Raise AlreadyPending / QueueFull when this client may not enqueue."""
        if client_id in self._pending:
            raise AlreadyPending(ALREADY_PENDING_REASON)
        if self._queue.qsize() >= self._max_queue:
            raise QueueFull(QUEUE_FULL_REASON)

    def _enqueue(self, text: str, client_id: str, kind: str) -> tuple[str, int]:
        """Assign the next wish id, mark the client pending and queue the job.

        Returns (wish_id, position); position includes the job currently
        being processed.
        """
        wish_id = f"w_{self._wish_counter:06d}"
        self._wish_counter += 1
        item = _Item(
            wish_id=wish_id, text=text, client_id=client_id,
            submitted_at=time.time(), kind=kind,
        )
        self._pending[client_id] = wish_id
        self._queue.put_nowait(item)
        return wish_id, self._queue.qsize() + (1 if self._current else 0)

    async def submit(self, text: Any, client_id: Any) -> tuple[str, int]:
        """Enqueue a petition. Returns (wish_id, position). Position counts the
        petition itself plus everyone ahead of it (including the one being
        granted), so an idle empty queue yields position 1.

        Raises AlreadyPending / QueueFull with readable `.reason`.
        """
        text = ("" if text is None else str(text)).strip()[:_SUBMIT_TEXT_CAP]
        client_id = str(client_id)
        self._admit(client_id)
        wish_id, position = self._enqueue(text, client_id, "wish")
        await self._emit_queue()
        return wish_id, position

    async def submit_fix(self, client_id: Any) -> tuple[str, int]:
        """Enqueue a City Engineer job. The static-assignment gate runs HERE:
        when the worst demand ratio is below the gate there is nothing to fix
        and TrafficSmooth is raised (the server turns it into a 409). Rate
        limits and cooldowns live in the server."""
        client_id = str(client_id)
        self._admit(client_id)
        assignment = traffic.assign(self._city(), time.time())
        if assignment.max_ratio < C.FIX_GATE_RATIO:
            raise TrafficSmooth(TRAFFIC_SMOOTH_REASON)
        wish_id, position = self._enqueue(FIX_JOB_TEXT, client_id, "fix")
        await self._emit_queue()
        return wish_id, position

    # --------------------------------------------------------------- lifecycle
    async def start(self) -> None:
        """Start the worker task on the running loop (no-op if already running)."""
        if self._task is None or self._task.done():
            self._task = asyncio.get_running_loop().create_task(
                self._run(), name="nemocity-queue-worker"
            )

    async def stop(self) -> None:
        """Cancel the worker task and wait for it to unwind."""
        if self._task is not None:
            self._task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
            self._task = None

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()

    @property
    def queue_length(self) -> int:
        return self._queue.qsize()

    @property
    def current(self) -> Optional[dict]:
        """Public view of the job being processed: wish_id + text preview."""
        if self._current is None:
            return None
        return {
            "wish_id": self._current.wish_id,
            "text_preview": self._current.text[:TEXT_PREVIEW_LEN],
        }

    def snapshot(self) -> dict:
        """The server's QueueLike view: waiting count + the job being granted."""
        return {"length": self._queue.qsize(), "current": self.current}

    # --------------------------------------------------------------- worker
    async def _run(self) -> None:
        """Worker loop: take one job at a time and process it to completion."""
        while True:
            item = await self._queue.get()
            self._current = item
            try:
                await self._process(item)
            except asyncio.CancelledError:
                raise
            except Exception:
                # one petition must NEVER kill the worker
                log.exception("wish %s crashed; city hall moves on", item.wish_id)
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": CRASH_REASON}
                )
            finally:
                # Free the client's single pending slot whatever happened.
                self._current = None
                self._pending.pop(item.client_id, None)
                self._queue.task_done()
                await self._emit_queue()

    async def _process(self, item: _Item) -> None:
        """Grant one job directly, or via the petitioner's browser handshake."""
        if not self._browser_invoked:
            await self.grant_item(item)
            return
        # Browser-invoked mode: announce the turn, then wait for the petitioner's
        # browser to call invoke() (a gradio API request carrying their ZeroGPU
        # quota). No invocation within the window -> city hall moves on.
        token = uuid.uuid4().hex
        loop = asyncio.get_running_loop()
        started: asyncio.Future[bool] = loop.create_future()
        done: asyncio.Future[bool] = loop.create_future()
        self._awaiting = {
            "wish_id": item.wish_id,
            "token": token,
            "client_id": item.client_id,
            "item": item,
            "started": started,
            "done": done,
            "begun": False,
        }
        try:
            # SECURITY: the token is NOT broadcast. your_turn carries only the
            # wish_id over the public SSE; the owner's browser fetches the secret
            # token from GET /api/turn (cookie-authenticated, identity-bound).
            turn_event = {
                "type": "your_turn",
                "wish_id": item.wish_id,
                "window_s": self._invocation_window,
            }
            # Announce in thirds: re-emits cover SSE reconnects and slow tabs.
            third = max(self._invocation_window / 3.0, 0.01)
            for _ in range(3):
                await self._safe_emit(turn_event)
                try:
                    await asyncio.wait_for(asyncio.shield(started), third)
                    break
                except asyncio.TimeoutError:
                    continue
            # `started` is the source of truth: it may have completed together
            # with the final timeout.
            if not started.done():
                # Close the counter BEFORE awaiting anything else; otherwise an
                # invoke() arriving during the rejection emit would start a
                # grant that runs concurrently with the next job.
                self._awaiting = None
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": WANDERED_REASON}
                )
                return
            try:
                await asyncio.wait_for(asyncio.shield(done), self._grant_deadline)
            except asyncio.TimeoutError:
                # The invoked grant is hung past any sane petition length; the
                # queue must not stall behind it.
                # NOTE(review): the invoked grant is not cancelled here; it keeps
                # running inside the petitioner's request and may still mutate
                # the world after the next job starts.
                log.error("grant for %s exceeded deadline; moving on", item.wish_id)
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": INTERRUPTED_REASON}
                )
        finally:
            self._awaiting = None

    def turn_token(self, wish_id: Any, client_id: Any) -> Optional[str]:
        """Return the secret invocation token for the job at the counter — but
        ONLY to its owner (identity-bound). Returns None when it isn't this
        caller's turn."""
        awaiting = self._awaiting
        if (
            awaiting is None
            or awaiting["wish_id"] != str(wish_id)
            or awaiting["client_id"] != str(client_id)
        ):
            return None
        return awaiting["token"]

    @staticmethod
    def _token_matches(expected: Any, presented: Any) -> bool:
        """Constant-time comparison of the secret turn token.

        Compares encoded bytes: compare_digest raises TypeError for str
        arguments containing non-ASCII characters, and the presented token is
        caller-supplied. 'surrogatepass' makes encoding total and injective.
        """
        return secrets.compare_digest(
            str(expected).encode("utf-8", "surrogatepass"),
            str(presented).encode("utf-8", "surrogatepass"),
        )

    async def invoke(self, wish_id: Any, token: Any) -> dict:
        """Run the grant for the job at the counter. Called from the petitioner's
        gradio API request so the GPU work runs in THEIR quota context.

        Raises QueueError when the wish/token does not match the counter or
        the grant has already begun. Returns {"ok", "wish_id"[, "reason"]}.
        """
        awaiting = self._awaiting
        if (
            awaiting is None
            or awaiting["wish_id"] != str(wish_id)
            or not self._token_matches(awaiting["token"], token)
        ):
            raise QueueError(NOT_YOUR_TURN_REASON)
        if awaiting["begun"]:
            raise QueueError(RITE_BEGUN_REASON)
        awaiting["begun"] = True
        if not awaiting["started"].done():
            awaiting["started"].set_result(True)
        try:
            await self.grant_item(awaiting["item"])
            return {"ok": True, "wish_id": str(wish_id)}
        except Exception:
            log.exception("invoked grant for %s crashed", wish_id)
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": str(wish_id), "reason": CRASH_REASON}
            )
            return {"ok": False, "wish_id": str(wish_id), "reason": CRASH_REASON}
        finally:
            # Release _process, which waits on `done` up to the grant deadline.
            if not awaiting["done"].done():
                awaiting["done"].set_result(True)

    # ----------------------------------------------------------------- grants
    async def grant_item(self, item: _Item) -> None:
        """The whole rite for one job; runs in the worker task (server mode) or
        inside the petitioner's gradio request (browser-invoked mode)."""
        if item.kind == "fix":
            await self._grant_fix(item)
        else:
            await self._grant_wish(item)

    def _wrap_emit(self, item: _Item):
        """Planner-facing emit: inject this job's wish_id into any event that
        lacks one (the plan event, thought tokens)."""
        async def emit(event: dict) -> None:
            if isinstance(event, dict) and "wish_id" not in event:
                event = {**event, "wish_id": item.wish_id}
            await self._safe_emit(event)
        return emit

    async def _apply_events(
        self, item: _Item, city: CityState, events: list[dict], index: int
    ) -> tuple[int, int, str]:
        """world.apply each event; mirror into the live CityState; emit
        tool_call + world_delta. Returns (next_index, landed, first_obs)."""
        landed = 0
        first_obs = ""
        for call in events:
            feature, observation = self._world.apply(item.wish_id, index, call)
            index += 1
            if not first_obs:
                first_obs = observation
            if feature is None:
                log.warning("engine-composed event rejected: %s", observation)
                continue
            landed += 1
            city.apply(feature)
            await self._safe_emit(
                {
                    "type": "tool_call",
                    "wish_id": item.wish_id,
                    "call_index": index - 1,
                    "tool": feature.tool,
                    "args": dict(feature.args),
                }
            )
            await self._safe_emit({"type": "world_delta", "feature": feature.to_dict()})
        return index, landed, first_obs

    async def _grant_wish(self, item: _Item) -> None:
        """Moderate, let the planner place buildings via act(), infill, then
        re-moderate model prose, persist the trace and announce the grant."""
        await self._safe_emit({"type": "wish_started", "wish_id": item.wish_id, "text": item.text})
        await self._emit_queue()
        verdict = await self._moderator.check(item.text)
        if not verdict.allowed:
            await self._safe_emit(
                {
                    "type": "wish_rejected",
                    "wish_id": item.wish_id,
                    "reason": verdict.poetic_reason or "city hall declines this petition",
                }
            )
            return
        world_summary = self._world.summary()
        city = self._city()
        state = {"index": 0, "acts": 0, "landed": 0, "jobs_anchor": ""}

        async def act(building_req: Any) -> str:
            """ONE building entry {kind,name,near,floors,hue} -> engine places
            it (plus an auto-routed connector) and the events land."""
            if not isinstance(building_req, dict):
                return "rejected: malformed building request"
            if state["acts"] >= MAX_BUILDINGS_PER_WISH or state["index"] >= self._max_calls:
                return HANDS_FULL_OBSERVATION
            state["acts"] += 1
            kind = tools.resolve_kind(building_req.get("kind"))
            name = tools.sanitize_name(
                building_req.get("name"),
                default=tools.default_name(kind, item.text, item.wish_id),
            )
            events = placement.place(
                city,
                kind=kind,
                name=name,
                floors=_num(building_req.get("floors")),
                hue=_num(building_req.get("hue")),
                near=str(building_req.get("near") or ""),
                wish_id=item.wish_id,
                call_index=state["index"],
            )
            state["index"], landed, first_obs = await self._apply_events(
                item, city, events, state["index"]
            )
            state["landed"] += landed
            if landed and C.BUILDINGS[kind]["jobs"] > 0:
                # remember a job site so infill can place homes near it
                state["jobs_anchor"] = name
            return first_obs or "rejected: nothing placed"

        trace = await self._planner.grant(item.text, world_summary, act, self._wrap_emit(item))
        # If the petition built NOTHING and the planner never actually planned —
        # every turn an error (GPU/model failure) — reject honestly and DON'T
        # persist an empty trace that pollutes the shared ledger.
        turns = _trace_get(trace, "turns") or []
        # Turns may be dicts or attribute objects, just like the trace itself.
        all_errored = bool(turns) and all(
            str(_trace_get(t, "observation") or "").startswith("error:") for t in turns
        )
        if state["landed"] == 0 and (not turns or all_errored):
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": item.wish_id, "reason": BUSY_HEAVENS_REASON}
            )
            return
        if state["landed"]:
            await self._apply_infill(item, city, state)
        # SECURITY: the reading + epitaph are model-composed and persist to the
        # public dataset + ledger. Re-moderate; redact on denial so no unvetted
        # prose survives in the archive. The vetted reading is kept locally so
        # the epitaph fallback never salvages from unvetted text, even when the
        # trace refuses the redaction write.
        reading = _trace_get(trace, "reading")
        if reading and not self._moderator.check_content(reading).allowed:
            reading = self._REDACTED_READING
            _trace_set(trace, "reading", reading)
        epitaph = _trace_get(trace, "epitaph")
        if not epitaph or not self._moderator.check_content(epitaph).allowed:
            epitaph = _epitaph_from_reading(reading) or FALLBACK_EPITAPH
            _trace_set(trace, "epitaph", epitaph)
        self._enrich_trace(trace, item)
        await self._persist_trace(trace, item)
        self._world.record_epitaph(epitaph)
        await self._safe_emit(
            {
                "type": "wish_granted",
                "wish_id": item.wish_id,
                "epitaph": str(epitaph),
                "epoch": self._world.epoch,
            }
        )

    async def _apply_infill(self, item: _Item, city: CityState, state: dict) -> None:
        """Demand-driven infill: jobs outpacing homes pulls ONE bonus home in
        near the new jobs (max 1 per petition)."""
        jobs, housing = city.jobs, city.housing_capacity
        if jobs <= housing * C.INFILL_RATIO or state["index"] + 2 > self._max_calls:
            return
        kind = "apartments" if jobs - housing >= C.INFILL_APARTMENTS_DEFICIT else "house"
        events = placement.place(
            city,
            kind=kind,
            name=tools.default_name(kind, item.text, item.wish_id + ":infill"),
            floors=None,
            hue=None,
            near=state["jobs_anchor"],
            wish_id=item.wish_id,
            call_index=state["index"],
        )
        events.append({"tool": "note", "args": {"text": INFILL_NOTE, "kind": "infill"}})
        state["index"], landed, _ = await self._apply_events(
            item, city, events, state["index"]
        )
        state["landed"] += landed

    async def _grant_fix(self, item: _Item) -> None:
        """City Engineer job: snapshot traffic, let the planner pick ONE
        candidate (falling back to the best predicted), apply it, then
        persist and announce the grant."""
        await self._safe_emit({"type": "wish_started", "wish_id": item.wish_id, "text": item.text})
        await self._emit_queue()
        city = self._city()
        now_s = time.time()
        stats, candidates = traffic.snapshot(city, now_s)
        if stats["max_ratio"] < C.FIX_GATE_RATIO or not candidates:
            await self._safe_emit(
                {
                    "type": "wish_rejected",
                    "wish_id": item.wish_id,
                    "reason": TRAFFIC_SMOOTH_REASON,
                }
            )
            return
        by_id = {c["id"]: c for c in candidates}
        best = candidates[0]  # sorted best-predicted-first
        state = {"index": 0, "landed": 0, "applied": ""}

        def canned_diagnosis(cand: dict) -> str:
            return (
                f"{stats['worst']} carries {stats['max_ratio']}x capacity at rush — "
                f"{cand['desc']} cuts the index "
                f"{stats['traffic_index']}->{cand['predicted_index']}."
            )

        async def act_fix(choice: Any) -> str:
            """The planner's pick {choice, diagnosis, blurb} -> ONE apply_fix
            event. Invalid/missing choice falls back to the best candidate."""
            if state["landed"]:
                return "rejected: the fix is already applied"
            choice = choice if isinstance(choice, dict) else {}
            cand = by_id.get(str(choice.get("choice") or "").strip().upper()) or best
            # model-composed diagnosis goes on the shared world: wordlist-gate it
            raw = str(choice.get("diagnosis") or "").strip()
            diagnosis = raw[:200] if raw and self._moderator.check_content(raw).allowed else ""
            if not diagnosis:
                diagnosis = canned_diagnosis(cand)
            call = {
                "tool": "apply_fix",
                "args": {
                    "action": cand["action"],
                    "cells": cand["cells"],
                    "klass": cand["klass"],
                    "name": cand["name"],
                    "diagnosis": diagnosis,
                    "metrics_before": {
                        "traffic_index": stats["traffic_index"],
                        "worst": stats["worst"] or "",
                    },
                    "metrics_predicted": {"traffic_index": cand["predicted_index"]},
                },
            }
            state["index"], landed, obs = await self._apply_events(
                item, city, [call], state["index"]
            )
            state["landed"] += landed
            if landed:
                state["applied"] = cand["id"]
            return obs

        # The planner only sees the public candidate fields.
        public = [
            {k: c[k] for k in ("id", "action", "desc", "predicted_index")}
            for c in candidates
        ]
        trace: Any = None
        planner_fix = getattr(self._planner, "fix", None)
        if planner_fix is not None:
            try:
                trace = await planner_fix(stats, public, act_fix, self._wrap_emit(item))
            except Exception:
                log.exception("planner.fix crashed for %s; engine applies best", item.wish_id)
        if state["landed"] == 0:
            # parse failure / no planner: the engine applies the best predicted
            # candidate with the canned diagnosis.
            await act_fix({"choice": best["id"]})
        if state["landed"] == 0:
            # The world refused even the engine's own pick: nothing changed, so
            # reject honestly instead of announcing a grant (mirrors the
            # empty-petition path in _grant_wish; nothing is persisted).
            log.warning("no fix landed for %s; rejecting", item.wish_id)
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": item.wish_id, "reason": INTERRUPTED_REASON}
            )
            return
        epitaph = _trace_get(trace, "epitaph")
        if not epitaph or not self._moderator.check_content(epitaph).allowed:
            applied = by_id.get(state["applied"]) or best
            epitaph = f"The City Engineer opened {applied['name']}."
        if not isinstance(trace, dict):
            trace = {"reading": "", "turns": []}
        # SECURITY: as for petitions, a planner reading is model-composed and
        # persists publicly — re-moderate and redact on denial.
        reading = trace.get("reading")
        if reading and not self._moderator.check_content(reading).allowed:
            trace["reading"] = self._REDACTED_READING
        trace["epitaph"] = str(epitaph)
        trace.setdefault("kind", "fix")
        trace.setdefault("stats", stats)
        self._enrich_trace(trace, item)
        await self._persist_trace(trace, item)
        self._world.record_epitaph(epitaph)
        await self._safe_emit(
            {
                "type": "wish_granted",
                "wish_id": item.wish_id,
                "epitaph": str(epitaph),
                "epoch": self._world.epoch,
            }
        )

    # --------------------------------------------------------------- helpers
    async def _persist_trace(self, trace: Any, item: _Item) -> None:
        """Hand the trace to the persist callable; failures never undo a grant."""
        if self._persist is None or trace is None:
            return
        try:
            result = self._persist(trace)
            if inspect.isawaitable(result):
                await result
        except asyncio.CancelledError:
            raise
        except Exception:
            log.exception("persist failed for %s (grant stands)", item.wish_id)

    def _enrich_trace(self, trace: Any, item: _Item) -> None:
        """Fill trace fields the planner cannot know (wish_id, moderation, ...)."""
        fields = {
            "wish_id": item.wish_id,
            "text": item.text,
            "submitted_at": item.submitted_at,
        }
        try:
            if isinstance(trace, dict):
                for key, value in fields.items():
                    trace.setdefault(key, value)
                trace.setdefault("moderation", {"allowed": True, "category": None})
            elif trace is not None:
                for key, value in fields.items():
                    if not getattr(trace, key, None):
                        setattr(trace, key, value)
                if getattr(trace, "moderation", None) is None:
                    setattr(trace, "moderation", {"allowed": True, "category": None})
        except Exception:  # frozen dataclass etc. — enrichment is best-effort
            log.debug("trace enrichment skipped for %s", item.wish_id, exc_info=True)

    async def _emit_queue(self) -> None:
        """Broadcast the current queue snapshot."""
        await self._safe_emit(
            {"type": "queue", "length": self._queue.qsize(), "current": self.current}
        )

    async def _safe_emit(self, event: dict) -> None:
        """Broadcast one SSE event; emit failures must never break a grant."""
        try:
            await self._emit(event)
        except asyncio.CancelledError:
            raise
        except Exception:
            log.warning("emit failed for event type %s", event.get("type"), exc_info=True)