godseed / tests /test_queue_browser.py
AndresCarreon's picture
FIX zerogpu NVML crash: defer cuda placement into the GPU context (no startup CUDA init); reject failed wishes honestly instead of empty-granting
5789bc7 verified
Raw
History Blame Contribute Delete
9.87 kB
"""Browser-invoked queue mode — the ZeroGPU handshake (your_turn -> invoke).
Covers the paths the live Space depends on but end-to-end tests can't exercise
deterministically: wrong/stale tokens, the wandered-wisher timeout, double
invocation, and worker continuity after a skipped wish.
"""
from __future__ import annotations
import asyncio
import pytest
from engine.genesis import genesis_features
from engine.moderation import Moderator
from engine.queue_worker import (
NOT_YOUR_TURN_REASON,
RITE_BEGUN_REASON,
WANDERED_REASON,
QueueError,
QueueWorker,
)
from engine.world import World
def make_world() -> World:
return World.load(genesis_features())
class SlowPlanner:
"""Grants after a short sleep so double-invoke can race the rite."""
def __init__(self, delay: float = 0.0):
self.delay = delay
async def grant(self, wish, world_summary, act, emit):
if self.delay:
await asyncio.sleep(self.delay)
await act({"tool": "raise_terrain",
"args": {"lat": 10, "lon": 10, "radius_deg": 5, "height": 0.05, "roughness": 0.4}})
return {"reading": "so be it", "turns": [], "epitaph": f"granted: {wish[:20]}"}
async def _wait_for(events, type_, wish_id=None, timeout=5.0):
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
while loop.time() < deadline:
for event in events:
if event.get("type") == type_ and (wish_id is None or event.get("wish_id") == wish_id):
return event
await asyncio.sleep(0.01)
raise AssertionError(f"timeout waiting for {type_} ({wish_id})")
def _worker(events, planner=None, **kwargs) -> QueueWorker:
async def emit(event):
events.append(event)
return QueueWorker(
make_world(), Moderator(), planner or SlowPlanner(), emit,
browser_invoked=True, **kwargs,
)
def test_handshake_tokens_and_grant():
async def run():
events: list[dict] = []
worker = _worker(events)
await worker.start()
try:
wish_id, _ = await worker.submit("gentle green hills", "c1")
turn = await _wait_for(events, "your_turn", wish_id)
# SECURITY: the token must NOT be broadcast over SSE
assert "token" not in turn and turn["window_s"] > 0
# the token is identity-bound: only the owner (c1) can fetch it
assert worker.turn_token(wish_id, "stranger") is None
assert worker.turn_token("w_999999", "c1") is None
token = worker.turn_token(wish_id, "c1")
assert token
with pytest.raises(QueueError) as exc:
await worker.invoke(wish_id, "not-the-token")
assert exc.value.reason == NOT_YOUR_TURN_REASON
with pytest.raises(QueueError):
await worker.invoke("w_999999", token)
result = await worker.invoke(wish_id, token)
assert result == {"ok": True, "wish_id": wish_id}
granted = await _wait_for(events, "wish_granted", wish_id)
assert granted["epitaph"].startswith("granted:")
# the turn is spent: a replayed invocation is refused (which of the
# two refusals depends on whether the worker task has been
# scheduled to clear the slot yet — both are correct)
with pytest.raises(QueueError) as late:
await worker.invoke(wish_id, token)
assert late.value.reason in (NOT_YOUR_TURN_REASON, RITE_BEGUN_REASON)
# and only one grant ever landed
assert sum(1 for e in events if e.get("type") == "wish_granted") == 1
finally:
await worker.stop()
asyncio.run(run())
def test_wandered_wisher_is_skipped_and_queue_continues():
async def run():
events: list[dict] = []
worker = _worker(events, invocation_window=0.1)
await worker.start()
try:
ghost_id, _ = await worker.submit("a wish nobody returns for", "ghost")
await _wait_for(events, "your_turn", ghost_id)
rejected = await _wait_for(events, "wish_rejected", ghost_id)
assert rejected["reason"] == WANDERED_REASON
# the god moved on: the next wish gets its turn and can be granted
next_id, _ = await worker.submit("a patient wish", "present")
await _wait_for(events, "your_turn", next_id)
await worker.invoke(next_id, worker.turn_token(next_id, "present"))
await _wait_for(events, "wish_granted", next_id)
finally:
await worker.stop()
asyncio.run(run())
def test_double_invoke_during_rite_is_refused():
async def run():
events: list[dict] = []
worker = _worker(events, planner=SlowPlanner(delay=0.3))
await worker.start()
try:
wish_id, _ = await worker.submit("slow sacred work", "c1")
await _wait_for(events, "your_turn", wish_id)
token = worker.turn_token(wish_id, "c1")
first = asyncio.create_task(worker.invoke(wish_id, token))
await asyncio.sleep(0.05) # the rite has begun
with pytest.raises(QueueError) as exc:
await worker.invoke(wish_id, token)
assert exc.value.reason == RITE_BEGUN_REASON
assert (await first) == {"ok": True, "wish_id": wish_id}
await _wait_for(events, "wish_granted", wish_id)
finally:
await worker.stop()
asyncio.run(run())
def test_crashing_grant_reports_and_frees_the_queue():
class Bomb:
async def grant(self, wish, world_summary, act, emit):
raise RuntimeError("boom")
async def run():
events: list[dict] = []
worker = _worker(events, planner=Bomb())
await worker.start()
try:
wish_id, _ = await worker.submit("explode quietly", "c1")
await _wait_for(events, "your_turn", wish_id)
result = await worker.invoke(wish_id, worker.turn_token(wish_id, "c1"))
assert result["ok"] is False
assert "boom" not in result["reason"] # poetic, never a stack trace
rejected = await _wait_for(events, "wish_rejected", wish_id)
assert "boom" not in rejected["reason"]
# slot freed once the worker resumes; the same client may wish again
await asyncio.sleep(0.05)
again_id, _ = await worker.submit("try once more", "c1")
await _wait_for(events, "your_turn", again_id)
finally:
await worker.stop()
asyncio.run(run())
def test_inscribe_text_is_re_moderated_before_landing():
"""A clean wish must not let the model write a slur permanently onto the
shared world via inscribe_wish (verify-fleet critical)."""
class Inscriber:
async def grant(self, wish, world_summary, act, emit):
ok = await act({"tool": "inscribe_wish",
"args": {"text": "a gentle blessing for travelers", "style": "orbit"}})
bad = await act({"tool": "inscribe_wish",
"args": {"text": "kill all of them", "style": "stone"}})
return {"reading": "r", "turns": [], "epitaph": "done", "ok": ok, "bad": bad}
async def run():
events: list[dict] = []
world = make_world()
captured = {}
async def emit(e):
events.append(e)
planner = Inscriber()
async def persist(trace):
captured.update(trace if isinstance(trace, dict) else {})
worker = QueueWorker(world, Moderator(), planner, emit, persist=persist,
browser_invoked=False)
await worker.start()
try:
wid, _ = await worker.submit("bless the travelers", "c1")
await _wait_for(events, "wish_granted", wid)
finally:
await worker.stop()
inscriptions = [e["feature"]["args"]["text"]
for e in events
if e.get("type") == "world_delta"
and e["feature"]["tool"] == "inscribe_wish"]
assert "a gentle blessing for travelers" in inscriptions
assert all("kill" not in t for t in inscriptions)
asyncio.run(run())
def test_all_errored_wish_is_rejected_not_granted_empty():
"""A wish where every turn errored (GPU/model down) must reject honestly,
not silently 'grant' an empty wish (June 12 live incident)."""
from engine.queue_worker import BUSY_HEAVENS_REASON
class DeadModel:
async def grant(self, wish, world_summary, act, emit):
# planner returns its fallback reading + all-error turns, no act() calls
return {"reading": "the god is silent.", "epitaph": "",
"turns": [{"thought": "", "call": None, "observation": "error: backend failure; turn skipped"},
{"thought": "", "call": None, "observation": "error: backend failure; turn skipped"}]}
async def run():
events, persisted = [], []
async def emit(e):
events.append(e)
def persist(t):
persisted.append(t)
worker = QueueWorker(make_world(), Moderator(), DeadModel(), emit, persist=persist,
browser_invoked=False)
await worker.start()
try:
wid, _ = await worker.submit("found a town here", "c1")
rejected = await _wait_for(events, "wish_rejected", wid)
assert rejected["reason"] == BUSY_HEAVENS_REASON
finally:
await worker.stop()
# nothing granted, nothing persisted to pollute the log
assert not any(e.get("type") == "wish_granted" for e in events)
assert persisted == []
asyncio.run(run())