godseed / engine /queue_worker.py
AndresCarreon's picture
FORGIVING TOWNS: map any building kind (never reject), district reroute, restore god's voice (content-only moderation), town few-shot pushes build_district+roads, honest fail on empty
baf9d60 verified
Raw
History Blame Contribute Delete
20.6 kB
"""GODSEED wish queue — one god, one wish at a time.
A single global asyncio worker serializes every wish:
pop -> moderate -> planner.grant(wish, world_summary, act, emit)
-> persist trace (injected callback) -> emit wish_granted
Admission control in `submit`: 1 pending wish per client, queue cap 50
("the god is overwhelmed; return at dusk"). The hourly rate limit lives in
the server's ratelimit.py, not here.
Crash-safe by construction: an exception while granting one wish emits a
poetic generic wish_rejected and the worker moves on. SSE emit failures are
swallowed (a broken pipe must never reject a wish), persist failures are
logged but do not undo a grant (the world already changed, durably enough
until the next snapshot).
Stdlib only; this is the only async module in the engine.
"""
from __future__ import annotations
import asyncio
import contextlib
import inspect
import logging
import re
import time
import uuid
from dataclasses import dataclass
from typing import Any, Optional
# Module-level logger shared by every helper and by QueueWorker below.
log = logging.getLogger("godseed.engine.queue")

# Default for QueueWorker(max_queue=...): submit() raises QueueFull once this
# many wishes are waiting.
QUEUE_MAX = 50
MAX_CALLS_PER_WISH = 12  # planner caps at 7 turns; this is the engine backstop
# Number of leading characters of the wish text exposed by QueueWorker.current.
TEXT_PREVIEW_LEN = 60
_SUBMIT_TEXT_CAP = 500  # keep raw text bounded; moderation still sees >140 and denies

# Browser-invoked granting (ZeroGPU): GPU work must run inside the WISHER'S
# gradio request (their HF auth headers carry the quota — a server-side
# background spaces.GPU call fails by design). The worker emits your_turn and
# waits for the browser to fetch its token (authenticated) then invoke().
# Windows kept tight so an un-invoked wish can't freeze the shared world for
# long: a real browser fetches + invokes within ~2s; 25s covers slow connects.
INVOCATION_WINDOW_S = 25.0
GRANT_DEADLINE_S = 150.0  # a 9B wish runs ~50-90s; this bounds a hung grant

# User-facing reasons. They travel in QueueError.reason or in the "reason"
# field of wish_rejected events, so they are worded for display.
# Raised by submit() when the waiting queue is at capacity.
QUEUE_FULL_REASON = "the god is overwhelmed; return at dusk"
# Raised by submit() when the client already has a wish pending.
ALREADY_PENDING_REASON = "one wish per soul at a time; yours already stands before the god"
# Emitted when no invoke() arrived within the invocation window.
WANDERED_REASON = "the wisher wandered from the temple; the god moved on"
# Emitted when an invoked grant outlives the grant deadline.
INTERRUPTED_REASON = "the rite was interrupted; the god moved on"
# Raised by invoke() when the wish id / token do not match the awaited wish.
NOT_YOUR_TURN_REASON = "this wish is not before the god"
# Raised by invoke() when the awaited wish was already invoked once.
RITE_BEGUN_REASON = "the rite has already begun"
# Emitted when granting a wish raised an unexpected exception.
CRASH_REASON = "the god reached for this wish and it slipped beyond the veil; wish again"
# Emitted when a wish landed nothing and had no turns or only errored turns.
BUSY_HEAVENS_REASON = "the heavens are busy and the god could not reach your wish; offer it again in a moment"
# Observation handed back to the planner once the per-wish call cap is hit.
HANDS_FULL_OBSERVATION = "rejected: the god's hands are full; say done"
# Last-resort epitaph when none can be taken from the trace or its reading.
FALLBACK_EPITAPH = "it is done"

# Matches wish ids of the form w_<digits>; group 1 is the numeric part.
_WISH_ID_RE = re.compile(r"^w_(\d+)$")
class QueueError(Exception):
    """Base error raised by the queue when a request cannot be honoured.

    The message is kept on ``reason`` as well as in ``args``; it is written
    in the god's poetic voice and is safe to show to users verbatim.
    """

    def __init__(self, reason: str) -> None:
        # Store the display text first, then let Exception record it in args
        # so str(err) yields the same message.
        self.reason = reason
        super().__init__(reason)
class QueueFull(QueueError):
    """Admission refused because the waiting queue is at capacity."""
class AlreadyPending(QueueError):
    """Admission refused because this client already has a wish pending."""
@dataclass
class _Item:
    """One queued wish, as recorded at submission time."""

    # Identifier of the form w_NNNNNN assigned by QueueWorker.submit.
    wish_id: str
    # Wish text after stripping and capping in submit.
    text: str
    # Stringified id of the submitting client; keys the pending map.
    client_id: str
    # time.time() stamp taken when the wish was enqueued.
    submitted_at: float
def _trace_get(trace: Any, key: str) -> Any:
    """Read ``key`` from a trace that may be a dict or a plain object.

    Dicts are read with ``.get``; anything else is read as an attribute.
    A missing key or attribute yields ``None`` rather than raising.
    """
    if not isinstance(trace, dict):
        return getattr(trace, key, None)
    return trace.get(key)
def _trace_set(trace: Any, key: str, value: Any) -> None:
    """Write ``key`` on a dict-or-object trace, best-effort.

    Dicts get an item assignment, other objects an attribute assignment.
    Any failure (frozen dataclass etc.) is deliberately ignored, so callers
    must not assume the write took effect.
    """
    with contextlib.suppress(Exception):  # frozen dataclass etc. — best-effort
        if isinstance(trace, dict):
            trace[key] = value
        else:
            setattr(trace, key, value)
def _epitaph_from_reading(reading: Any) -> Optional[str]:
    """Salvage an epitaph from the last sentence of a reading.

    When the model never reaches a clean done-turn the log would otherwise
    fill with a generic epitaph; the reading's closing sentence is the god's
    own line about THIS wish, so the Genesis Log stays varied.

    The reading is stringified and stripped, split after ``.``, ``!`` or
    ``?`` followed by whitespace, and the final non-empty piece is returned
    cut to 120 characters. Returns ``None`` when there is no text.
    """
    cleaned = str(reading or "").strip()
    if not cleaned:
        return None
    sentences = []
    for chunk in re.split(r"(?<=[.!?])\s+", cleaned):
        chunk = chunk.strip()
        if chunk:
            sentences.append(chunk)
    if not sentences:
        return None
    return sentences[-1][:120]
class QueueWorker:
    """Single serialized worker; construct once, `await start()` at boot.

    Injected collaborators (duck-typed, no engine->mind/server imports):
        world      engine.world.World (apply/summary/record_epitaph/epoch)
        moderator  engine.moderation.Moderator (async `check`; `check_content`
                   is called synchronously here)
        planner    object with `async grant(wish, world_summary, act, emit)`
        emit       async callable broadcasting one SSE event dict
        persist    callable (sync or async) receiving the WishTrace; optional
    """

    def __init__(
        self,
        world: Any,
        moderator: Any,
        planner: Any,
        emit: Any,
        persist: Any = None,
        *,
        max_queue: int = QUEUE_MAX,
        max_calls_per_wish: int = MAX_CALLS_PER_WISH,
        next_wish_index: Optional[int] = None,
        browser_invoked: bool = False,
        invocation_window: float = INVOCATION_WINDOW_S,
        grant_deadline: float = GRANT_DEADLINE_S,
    ) -> None:
        """Wire up collaborators and limits; no task is started here.

        Args:
            world, moderator, planner, emit, persist: see the class docstring.
            max_queue: waiting wishes allowed before submit raises QueueFull.
            max_calls_per_wish: tool calls applied per wish before the planner
                is told the god's hands are full.
            next_wish_index: number for the next wish id; when None it is
                derived from the wish ids already present on ``world``.
            browser_invoked: when True the worker announces each turn and
                waits for ``invoke`` instead of granting on its own.
            invocation_window: seconds to wait for ``invoke`` after a turn is
                announced (browser-invoked mode only).
            grant_deadline: seconds an invoked grant may run before the worker
                moves on (browser-invoked mode only).
        """
        self._world = world
        self._moderator = moderator
        self._planner = planner
        self._emit = emit
        self._persist = persist
        self._max_queue = max_queue
        self._max_calls = max_calls_per_wish
        self._queue: asyncio.Queue[_Item] = asyncio.Queue()
        self._pending: dict[str, str] = {}  # client_id -> wish_id
        self._current: Optional[_Item] = None
        self._task: Optional[asyncio.Task] = None
        self._browser_invoked = browser_invoked
        self._invocation_window = invocation_window
        self._grant_deadline = grant_deadline
        # State of the wish waiting for a browser invoke(); None otherwise.
        self._awaiting: Optional[dict[str, Any]] = None
        self._wish_counter = (
            next_wish_index if next_wish_index is not None else self._derive_wish_counter(world)
        )

    @staticmethod
    def _derive_wish_counter(world: Any) -> int:
        """Continue numbering after the highest persisted w_NNNNNN.

        Scans ``world.features`` (if any) for ``wish_id`` values matching
        ``w_<digits>`` and returns one past the largest number found.
        """
        best = 0
        for feature in getattr(world, "features", ()) or ():
            match = _WISH_ID_RE.match(getattr(feature, "wish_id", "") or "")
            if match:
                best = max(best, int(match.group(1)) + 1)
        return best or 1  # first-ever wish is w_000001

    # --------------------------------------------------------------- admission
    async def submit(self, text: Any, client_id: Any) -> tuple[str, int]:
        """Enqueue a wish. Returns (wish_id, position).

        Position counts the wish itself plus everyone ahead of it (including
        the one being granted), so an idle empty queue yields position 1.
        The text is stringified, stripped and capped before it is stored.

        Raises AlreadyPending / QueueFull with poetic `.reason`.
        """
        text = ("" if text is None else str(text)).strip()[:_SUBMIT_TEXT_CAP]
        client_id = str(client_id)
        if client_id in self._pending:
            raise AlreadyPending(ALREADY_PENDING_REASON)
        if self._queue.qsize() >= self._max_queue:
            raise QueueFull(QUEUE_FULL_REASON)
        wish_id = f"w_{self._wish_counter:06d}"
        self._wish_counter += 1
        item = _Item(wish_id=wish_id, text=text, client_id=client_id, submitted_at=time.time())
        self._pending[client_id] = wish_id
        self._queue.put_nowait(item)
        # Computed before the awaited emit so the answer reflects the moment
        # of admission, not whatever the queue looks like afterwards.
        position = self._queue.qsize() + (1 if self._current else 0)
        await self._emit_queue()
        return wish_id, position

    # --------------------------------------------------------------- lifecycle
    async def start(self) -> None:
        """Start the worker task unless one is already running."""
        if self._task is None or self._task.done():
            self._task = asyncio.get_running_loop().create_task(
                self._run(), name="godseed-queue-worker"
            )

    async def stop(self) -> None:
        """Cancel the worker task, wait for it to unwind, and forget it."""
        if self._task is not None:
            self._task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
            self._task = None

    @property
    def is_running(self) -> bool:
        """True while a worker task exists and has not finished."""
        return self._task is not None and not self._task.done()

    @property
    def queue_length(self) -> int:
        """Number of wishes waiting (the one being granted is not counted)."""
        return self._queue.qsize()

    @property
    def current(self) -> Optional[dict]:
        """Wish id and text preview of the wish being granted, or None."""
        if self._current is None:
            return None
        return {
            "wish_id": self._current.wish_id,
            "text_preview": self._current.text[:TEXT_PREVIEW_LEN],
        }

    def snapshot(self) -> dict:
        """The server's QueueLike view: waiting count + the wish being granted."""
        return {"length": self._queue.qsize(), "current": self.current}

    # --------------------------------------------------------------- worker
    async def _run(self) -> None:
        """Worker loop: take one wish at a time and process it, forever.

        A failure in one wish is logged and announced as a rejection; only
        cancellation ends the loop. Bookkeeping for the wish is always
        released, whatever the outcome.
        """
        while True:
            item = await self._queue.get()
            self._current = item
            try:
                await self._process(item)
            except asyncio.CancelledError:
                raise
            except Exception:
                # one wish must NEVER kill the worker
                log.exception("wish %s crashed; the god moves on", item.wish_id)
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": CRASH_REASON}
                )
            finally:
                self._current = None
                self._pending.pop(item.client_id, None)
                self._queue.task_done()
                await self._emit_queue()

    async def _process(self, item: _Item) -> None:
        """Grant one wish directly, or announce its turn and await invoke().

        In server mode this simply runs ``grant_item``. In browser-invoked
        mode it publishes ``your_turn`` and waits up to the invocation window
        for ``invoke`` to begin, then up to the grant deadline for it to end.
        """
        if not self._browser_invoked:
            await self.grant_item(item)
            return
        # Browser-invoked mode: announce the turn, then wait for the wisher's
        # browser to call invoke() (a gradio API request carrying their ZeroGPU
        # quota). No invocation within the window -> the god moves on.
        token = uuid.uuid4().hex
        loop = asyncio.get_running_loop()
        started: asyncio.Future[bool] = loop.create_future()
        done: asyncio.Future[bool] = loop.create_future()
        self._awaiting = {
            "wish_id": item.wish_id,
            "token": token,
            "client_id": item.client_id,
            "item": item,
            "started": started,
            "done": done,
            "begun": False,
        }
        try:
            # SECURITY: the token is NOT broadcast. your_turn carries only the
            # wish_id over the public SSE; the owner's browser fetches the secret
            # token from GET /api/turn (cookie-authenticated, identity-bound).
            # A stranger tailing SSE sees the wish_id but cannot get the token.
            turn_event = {
                "type": "your_turn",
                "wish_id": item.wish_id,
                "window_s": self._invocation_window,
            }
            # Announce in thirds: re-emits cover SSE reconnects and slow tabs
            # (the browser also buffers early arrivals, but belt and braces).
            third = max(self._invocation_window / 3.0, 0.01)
            for _ in range(3):
                await self._safe_emit(turn_event)
                try:
                    await asyncio.wait_for(asyncio.shield(started), third)
                except asyncio.TimeoutError:
                    pass
                # Trust the future, not the absence of a timeout: invoke() can
                # mark the rite started while wait_for is still unwinding.
                if started.done():
                    break
            if not started.done():
                # Close the turn BEFORE the awaited emit below. Otherwise an
                # invoke() arriving during that await would still find this
                # wish awaiting and start a grant for a wish being rejected.
                self._awaiting = None
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": WANDERED_REASON}
                )
                return
            try:
                await asyncio.wait_for(asyncio.shield(done), self._grant_deadline)
            except asyncio.TimeoutError:
                # The invoked grant is hung past any sane wish length; the queue
                # must not stall behind it. (The stray request may still finish
                # server-side — logged, accepted as a rare edge.)
                log.error("grant for %s exceeded deadline; moving on", item.wish_id)
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": INTERRUPTED_REASON}
                )
        finally:
            self._awaiting = None

    def turn_token(self, wish_id: Any, client_id: Any) -> Optional[str]:
        """Return the secret invocation token for the wish currently before the
        god — but ONLY to its owner (identity-bound). The server exposes this
        over a cookie-authenticated route so the token never touches public SSE.
        Returns None when it isn't this caller's turn."""
        awaiting = self._awaiting
        if (
            awaiting is None
            or awaiting["wish_id"] != str(wish_id)
            or awaiting["client_id"] != str(client_id)
        ):
            return None
        return awaiting["token"]

    async def invoke(self, wish_id: Any, token: Any) -> dict:
        """Run the grant for the wish currently before the god. Called from the
        wisher's gradio API request so the GPU work runs in THEIR quota context.
        The token is a secret capability delivered only to the authenticated
        owner (see turn_token).

        Returns ``{"ok": True, "wish_id": ...}`` on completion, or an
        ``ok: False`` dict carrying a reason when the grant crashed.
        Raises QueueError when it isn't this wish's turn or the rite has
        already begun."""
        awaiting = self._awaiting
        if (
            awaiting is None
            or awaiting["wish_id"] != str(wish_id)
            or awaiting["token"] != str(token)
        ):
            raise QueueError(NOT_YOUR_TURN_REASON)
        if awaiting["begun"]:
            raise QueueError(RITE_BEGUN_REASON)
        # Mark the rite begun before any await so a second invoke is refused.
        awaiting["begun"] = True
        if not awaiting["started"].done():
            awaiting["started"].set_result(True)
        try:
            await self.grant_item(awaiting["item"])
            return {"ok": True, "wish_id": str(wish_id)}
        except Exception:
            log.exception("invoked grant for %s crashed", wish_id)
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": str(wish_id), "reason": CRASH_REASON}
            )
            return {"ok": False, "wish_id": str(wish_id), "reason": CRASH_REASON}
        finally:
            # Always release the worker, which is waiting on this future.
            if not awaiting["done"].done():
                awaiting["done"].set_result(True)

    async def grant_item(self, item: _Item) -> None:
        """Moderate -> plan/act -> persist -> emit. The whole rite for one wish;
        runs in the worker task (server mode) or inside the wisher's gradio
        request (browser-invoked mode).

        Emits ``wish_started`` first, then exactly one of ``wish_rejected``
        (moderation denial, or nothing built and no usable turn) or
        ``wish_granted``. Exceptions from the moderator, planner or world
        propagate to the caller; persist failures are logged and ignored."""
        await self._safe_emit({"type": "wish_started", "wish_id": item.wish_id, "text": item.text})
        await self._emit_queue()
        verdict = await self._moderator.check(item.text)
        if not verdict.allowed:
            await self._safe_emit(
                {
                    "type": "wish_rejected",
                    "wish_id": item.wish_id,
                    "reason": verdict.poetic_reason or "the god declines this wish",
                }
            )
            return
        world_summary = self._world.summary()
        calls = 0
        landed = 0  # features actually applied to the world this wish

        async def act(call: dict) -> str:
            """Execute one tool call; emits tool_call + world_delta on success."""
            nonlocal calls, landed
            index = calls
            calls += 1
            if index >= self._max_calls:
                return HANDS_FULL_OBSERVATION
            # SECURITY: the MODEL composes inscribe_wish text — a clean wish can
            # still make it write a disallowed phrase that gets drawn PERMANENTLY
            # onto the shared world. Re-moderate that text before it can land;
            # default-deny so the world never carries an unvetted inscription.
            if isinstance(call, dict) and call.get("tool") == "inscribe_wish":
                ins = (call.get("args") or {}).get("text", "")
                if not self._moderator.check_content(ins).allowed:
                    return "rejected: those words may not be written here; choose gentler ones"
            feature, observation = self._world.apply(item.wish_id, index, call)
            if feature is not None:
                landed += 1
                await self._safe_emit(
                    {
                        "type": "tool_call",
                        "wish_id": item.wish_id,
                        "call_index": index,
                        "tool": feature.tool,
                        "args": dict(feature.args),
                    }
                )
                await self._safe_emit({"type": "world_delta", "feature": feature.to_dict()})
            return observation

        trace = await self._planner.grant(item.text, world_summary, act, self._safe_emit)
        # If the wish built NOTHING and the god never actually planned — every
        # turn an error (GPU/model failure), not a clean "done" — reject honestly
        # and DON'T persist an empty trace that pollutes the shared Genesis Log.
        # A silent empty grant looks broken to the wisher (June 12: "can't see if
        # something happened"). A genuine done-with-no-build leaves a non-error
        # turn, so this only fires on real failure.
        turns = _trace_get(trace, "turns") or []
        # Turns are read through _trace_get so dict turns and object turns are
        # both understood, matching how the trace itself is accessed.
        all_errored = bool(turns) and all(
            str(_trace_get(t, "observation") or "").startswith("error:") for t in turns
        )
        if landed == 0 and (not turns or all_errored):
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": item.wish_id, "reason": BUSY_HEAVENS_REASON}
            )
            return
        # SECURITY: the reading + epitaph are model-composed and persist to the
        # public dataset + the Genesis Log. Re-moderate; redact on denial so no
        # unvetted prose survives in the archive. (The reading already streamed
        # live, but the durable record — what judges and visitors re-read — is
        # clean.) Epitaph shows on the grant card, so a denied one is replaced.
        reading = _trace_get(trace, "reading")
        if reading and not self._moderator.check_content(reading).allowed:
            reading = "the god read this wish in silence."
            _trace_set(trace, "reading", reading)
        epitaph = _trace_get(trace, "epitaph")
        if not epitaph or not self._moderator.check_content(epitaph).allowed:
            # Salvage from the vetted local value, not from the trace: the
            # trace write above is best-effort and may not have taken effect.
            epitaph = _epitaph_from_reading(reading) or FALLBACK_EPITAPH
            _trace_set(trace, "epitaph", epitaph)
        self._enrich_trace(trace, item)
        if self._persist is not None:
            try:
                result = self._persist(trace)
                if inspect.isawaitable(result):
                    await result
            except asyncio.CancelledError:
                raise
            except Exception:
                log.exception("persist failed for %s (grant stands)", item.wish_id)
        self._world.record_epitaph(epitaph)
        await self._safe_emit(
            {
                "type": "wish_granted",
                "wish_id": item.wish_id,
                "epitaph": str(epitaph),
                "epoch": self._world.epoch,
            }
        )

    # --------------------------------------------------------------- helpers
    def _enrich_trace(self, trace: Any, item: _Item) -> None:
        """Fill trace fields the planner cannot know (wish_id, moderation, ...).

        Existing values are kept: dict traces use ``setdefault``, object
        traces are only written where the attribute is missing or falsy
        (``moderation``: missing or None). Failures are logged at debug
        level and otherwise ignored.
        """
        fields = {
            "wish_id": item.wish_id,
            "text": item.text,
            "submitted_at": item.submitted_at,
        }
        try:
            if isinstance(trace, dict):
                for key, value in fields.items():
                    trace.setdefault(key, value)
                trace.setdefault("moderation", {"allowed": True, "category": None})
            elif trace is not None:
                for key, value in fields.items():
                    if not getattr(trace, key, None):
                        setattr(trace, key, value)
                if getattr(trace, "moderation", None) is None:
                    setattr(trace, "moderation", {"allowed": True, "category": None})
        except Exception:  # frozen dataclass etc. — enrichment is best-effort
            log.debug("trace enrichment skipped for %s", item.wish_id, exc_info=True)

    async def _emit_queue(self) -> None:
        """Broadcast the current queue length and the wish being granted."""
        await self._safe_emit(
            {"type": "queue", "length": self._queue.qsize(), "current": self.current}
        )

    async def _safe_emit(self, event: dict) -> None:
        """Broadcast one SSE event; emit failures must never break a grant."""
        try:
            await self._emit(event)
        except asyncio.CancelledError:
            raise
        except Exception:
            log.warning("emit failed for event type %s", event.get("type"), exc_info=True)