"""Browser-invoked queue mode — the ZeroGPU handshake (your_turn -> invoke). Covers the paths the live Space depends on but end-to-end tests can't exercise deterministically: wrong/stale tokens, the wandered-wisher timeout, double invocation, and worker continuity after a skipped wish. """ from __future__ import annotations import asyncio import pytest from engine.genesis import genesis_features from engine.moderation import Moderator from engine.queue_worker import ( NOT_YOUR_TURN_REASON, RITE_BEGUN_REASON, WANDERED_REASON, QueueError, QueueWorker, ) from engine.world import World def make_world() -> World: return World.load(genesis_features()) class SlowPlanner: """Grants after a short sleep so double-invoke can race the rite.""" def __init__(self, delay: float = 0.0): self.delay = delay async def grant(self, wish, world_summary, act, emit): if self.delay: await asyncio.sleep(self.delay) await act({"tool": "raise_terrain", "args": {"lat": 10, "lon": 10, "radius_deg": 5, "height": 0.05, "roughness": 0.4}}) return {"reading": "so be it", "turns": [], "epitaph": f"granted: {wish[:20]}"} async def _wait_for(events, type_, wish_id=None, timeout=5.0): loop = asyncio.get_running_loop() deadline = loop.time() + timeout while loop.time() < deadline: for event in events: if event.get("type") == type_ and (wish_id is None or event.get("wish_id") == wish_id): return event await asyncio.sleep(0.01) raise AssertionError(f"timeout waiting for {type_} ({wish_id})") def _worker(events, planner=None, **kwargs) -> QueueWorker: async def emit(event): events.append(event) return QueueWorker( make_world(), Moderator(), planner or SlowPlanner(), emit, browser_invoked=True, **kwargs, ) def test_handshake_tokens_and_grant(): async def run(): events: list[dict] = [] worker = _worker(events) await worker.start() try: wish_id, _ = await worker.submit("gentle green hills", "c1") turn = await _wait_for(events, "your_turn", wish_id) # SECURITY: the token must NOT be broadcast over SSE assert "token" not in turn and turn["window_s"] > 0 # the token is identity-bound: only the owner (c1) can fetch it assert worker.turn_token(wish_id, "stranger") is None assert worker.turn_token("w_999999", "c1") is None token = worker.turn_token(wish_id, "c1") assert token with pytest.raises(QueueError) as exc: await worker.invoke(wish_id, "not-the-token") assert exc.value.reason == NOT_YOUR_TURN_REASON with pytest.raises(QueueError): await worker.invoke("w_999999", token) result = await worker.invoke(wish_id, token) assert result == {"ok": True, "wish_id": wish_id} granted = await _wait_for(events, "wish_granted", wish_id) assert granted["epitaph"].startswith("granted:") # the turn is spent: a replayed invocation is refused (which of the # two refusals depends on whether the worker task has been # scheduled to clear the slot yet — both are correct) with pytest.raises(QueueError) as late: await worker.invoke(wish_id, token) assert late.value.reason in (NOT_YOUR_TURN_REASON, RITE_BEGUN_REASON) # and only one grant ever landed assert sum(1 for e in events if e.get("type") == "wish_granted") == 1 finally: await worker.stop() asyncio.run(run()) def test_wandered_wisher_is_skipped_and_queue_continues(): async def run(): events: list[dict] = [] worker = _worker(events, invocation_window=0.1) await worker.start() try: ghost_id, _ = await worker.submit("a wish nobody returns for", "ghost") await _wait_for(events, "your_turn", ghost_id) rejected = await _wait_for(events, "wish_rejected", ghost_id) assert rejected["reason"] == WANDERED_REASON # the god moved on: the next wish gets its turn and can be granted next_id, _ = await worker.submit("a patient wish", "present") await _wait_for(events, "your_turn", next_id) await worker.invoke(next_id, worker.turn_token(next_id, "present")) await _wait_for(events, "wish_granted", next_id) finally: await worker.stop() asyncio.run(run()) def test_double_invoke_during_rite_is_refused(): async def run(): events: list[dict] = [] worker = _worker(events, planner=SlowPlanner(delay=0.3)) await worker.start() try: wish_id, _ = await worker.submit("slow sacred work", "c1") await _wait_for(events, "your_turn", wish_id) token = worker.turn_token(wish_id, "c1") first = asyncio.create_task(worker.invoke(wish_id, token)) await asyncio.sleep(0.05) # the rite has begun with pytest.raises(QueueError) as exc: await worker.invoke(wish_id, token) assert exc.value.reason == RITE_BEGUN_REASON assert (await first) == {"ok": True, "wish_id": wish_id} await _wait_for(events, "wish_granted", wish_id) finally: await worker.stop() asyncio.run(run()) def test_crashing_grant_reports_and_frees_the_queue(): class Bomb: async def grant(self, wish, world_summary, act, emit): raise RuntimeError("boom") async def run(): events: list[dict] = [] worker = _worker(events, planner=Bomb()) await worker.start() try: wish_id, _ = await worker.submit("explode quietly", "c1") await _wait_for(events, "your_turn", wish_id) result = await worker.invoke(wish_id, worker.turn_token(wish_id, "c1")) assert result["ok"] is False assert "boom" not in result["reason"] # poetic, never a stack trace rejected = await _wait_for(events, "wish_rejected", wish_id) assert "boom" not in rejected["reason"] # slot freed once the worker resumes; the same client may wish again await asyncio.sleep(0.05) again_id, _ = await worker.submit("try once more", "c1") await _wait_for(events, "your_turn", again_id) finally: await worker.stop() asyncio.run(run()) def test_inscribe_text_is_re_moderated_before_landing(): """A clean wish must not let the model write a slur permanently onto the shared world via inscribe_wish (verify-fleet critical).""" class Inscriber: async def grant(self, wish, world_summary, act, emit): ok = await act({"tool": "inscribe_wish", "args": {"text": "a gentle blessing for travelers", "style": "orbit"}}) bad = await act({"tool": "inscribe_wish", "args": {"text": "kill all of them", "style": "stone"}}) return {"reading": "r", "turns": [], "epitaph": "done", "ok": ok, "bad": bad} async def run(): events: list[dict] = [] world = make_world() captured = {} async def emit(e): events.append(e) planner = Inscriber() async def persist(trace): captured.update(trace if isinstance(trace, dict) else {}) worker = QueueWorker(world, Moderator(), planner, emit, persist=persist, browser_invoked=False) await worker.start() try: wid, _ = await worker.submit("bless the travelers", "c1") await _wait_for(events, "wish_granted", wid) finally: await worker.stop() inscriptions = [e["feature"]["args"]["text"] for e in events if e.get("type") == "world_delta" and e["feature"]["tool"] == "inscribe_wish"] assert "a gentle blessing for travelers" in inscriptions assert all("kill" not in t for t in inscriptions) asyncio.run(run()) def test_all_errored_wish_is_rejected_not_granted_empty(): """A wish where every turn errored (GPU/model down) must reject honestly, not silently 'grant' an empty wish (June 12 live incident).""" from engine.queue_worker import BUSY_HEAVENS_REASON class DeadModel: async def grant(self, wish, world_summary, act, emit): # planner returns its fallback reading + all-error turns, no act() calls return {"reading": "the god is silent.", "epitaph": "", "turns": [{"thought": "", "call": None, "observation": "error: backend failure; turn skipped"}, {"thought": "", "call": None, "observation": "error: backend failure; turn skipped"}]} async def run(): events, persisted = [], [] async def emit(e): events.append(e) def persist(t): persisted.append(t) worker = QueueWorker(make_world(), Moderator(), DeadModel(), emit, persist=persist, browser_invoked=False) await worker.start() try: wid, _ = await worker.submit("found a town here", "c1") rejected = await _wait_for(events, "wish_rejected", wid) assert rejected["reason"] == BUSY_HEAVENS_REASON finally: await worker.stop() # nothing granted, nothing persisted to pollute the log assert not any(e.get("type") == "wish_granted" for e in events) assert persisted == [] asyncio.run(run())