Spaces:
Sleeping
Sleeping
FIX zerogpu NVML crash: defer cuda placement into the GPU context (no startup CUDA init); reject failed wishes honestly instead of empty-granting
"""Browser-invoked queue mode — the ZeroGPU handshake (your_turn -> invoke).

Covers the paths the live Space depends on but end-to-end tests can't exercise
deterministically: wrong/stale tokens, the wandered-wisher timeout, double
invocation, and worker continuity after a skipped wish.
"""
| from __future__ import annotations | |
| import asyncio | |
| import pytest | |
| from engine.genesis import genesis_features | |
| from engine.moderation import Moderator | |
| from engine.queue_worker import ( | |
| NOT_YOUR_TURN_REASON, | |
| RITE_BEGUN_REASON, | |
| WANDERED_REASON, | |
| QueueError, | |
| QueueWorker, | |
| ) | |
| from engine.world import World | |
def make_world() -> World:
    """Return the ``World`` obtained by loading the genesis feature set."""
    features = genesis_features()
    return World.load(features)
class SlowPlanner:
    """Stub planner: optionally sleep, perform one terrain act, then grant.

    The optional delay keeps the rite in flight long enough for a second
    invocation to race it in the double-invoke test.
    """

    def __init__(self, delay: float = 0.0):
        # Seconds to sleep before acting; a falsy value skips the sleep.
        self.delay = delay

    async def grant(self, wish, world_summary, act, emit):
        """Sleep for ``self.delay`` if set, call ``act`` once, and grant.

        ``world_summary`` and ``emit`` are accepted but not used here.

        Returns:
            A dict with ``reading``, ``turns`` and ``epitaph`` keys; the
            epitaph embeds the first 20 characters of ``wish``.
        """
        pause = self.delay
        if pause:
            await asyncio.sleep(pause)

        terrain_call = {
            "tool": "raise_terrain",
            "args": {
                "lat": 10,
                "lon": 10,
                "radius_deg": 5,
                "height": 0.05,
                "roughness": 0.4,
            },
        }
        await act(terrain_call)

        epitaph = f"granted: {wish[:20]}"
        return {"reading": "so be it", "turns": [], "epitaph": epitaph}
| async def _wait_for(events, type_, wish_id=None, timeout=5.0): | |
| loop = asyncio.get_running_loop() | |
| deadline = loop.time() + timeout | |
| while loop.time() < deadline: | |
| for event in events: | |
| if event.get("type") == type_ and (wish_id is None or event.get("wish_id") == wish_id): | |
| return event | |
| await asyncio.sleep(0.01) | |
| raise AssertionError(f"timeout waiting for {type_} ({wish_id})") | |
def _worker(events, planner=None, **kwargs) -> QueueWorker:
    """Build a browser-invoked ``QueueWorker`` that records events in ``events``.

    Args:
        events: List that every emitted event is appended to.
        planner: Planner to use; a falsy value selects a default ``SlowPlanner()``.
        **kwargs: Extra keyword arguments forwarded to ``QueueWorker``.
    """

    async def record(event):
        events.append(event)

    world = make_world()
    moderator = Moderator()
    chosen_planner = planner or SlowPlanner()
    return QueueWorker(world, moderator, chosen_planner, record, browser_invoked=True, **kwargs)
def test_handshake_tokens_and_grant():
    """Exercise the your_turn -> invoke handshake end to end.

    Checks that the broadcast ``your_turn`` event carries no token, that
    ``turn_token`` only answers for the owning client and a known wish id,
    that a wrong token or unknown wish is refused, and that a replay after
    the grant is refused without a second ``wish_granted`` event.
    """

    async def scenario():
        seen: list[dict] = []
        worker = _worker(seen)
        await worker.start()
        try:
            wish_id, _ = await worker.submit("gentle green hills", "c1")
            your_turn = await _wait_for(seen, "your_turn", wish_id)

            # SECURITY: the token must NOT be broadcast over SSE
            assert "token" not in your_turn
            assert your_turn["window_s"] > 0

            # the token is identity-bound: only the owner (c1) can fetch it
            assert worker.turn_token(wish_id, "stranger") is None
            assert worker.turn_token("w_999999", "c1") is None
            token = worker.turn_token(wish_id, "c1")
            assert token

            with pytest.raises(QueueError) as wrong_token:
                await worker.invoke(wish_id, "not-the-token")
            assert wrong_token.value.reason == NOT_YOUR_TURN_REASON

            with pytest.raises(QueueError):
                await worker.invoke("w_999999", token)

            outcome = await worker.invoke(wish_id, token)
            assert outcome == {"ok": True, "wish_id": wish_id}
            granted = await _wait_for(seen, "wish_granted", wish_id)
            assert granted["epitaph"].startswith("granted:")

            # the turn is spent: a replayed invocation is refused (which of
            # the two refusals depends on whether the worker task has been
            # scheduled to clear the slot yet — both are correct)
            with pytest.raises(QueueError) as replay:
                await worker.invoke(wish_id, token)
            assert replay.value.reason in (NOT_YOUR_TURN_REASON, RITE_BEGUN_REASON)

            # and only one grant ever landed
            grant_events = [e for e in seen if e.get("type") == "wish_granted"]
            assert len(grant_events) == 1
        finally:
            await worker.stop()

    asyncio.run(scenario())
def test_wandered_wisher_is_skipped_and_queue_continues():
    """A wish that is never invoked is rejected, and the next wish proceeds.

    Uses a 0.1 s invocation window so the first wish gets ``wish_rejected``
    with ``WANDERED_REASON``; a second wish must then reach ``your_turn``
    and be granted after being invoked.
    """

    async def scenario():
        seen: list[dict] = []
        worker = _worker(seen, invocation_window=0.1)
        await worker.start()
        try:
            ghost_id, _ = await worker.submit("a wish nobody returns for", "ghost")
            await _wait_for(seen, "your_turn", ghost_id)
            skipped = await _wait_for(seen, "wish_rejected", ghost_id)
            assert skipped["reason"] == WANDERED_REASON

            # the god moved on: the next wish gets its turn and can be granted
            next_id, _ = await worker.submit("a patient wish", "present")
            await _wait_for(seen, "your_turn", next_id)
            next_token = worker.turn_token(next_id, "present")
            await worker.invoke(next_id, next_token)
            await _wait_for(seen, "wish_granted", next_id)
        finally:
            await worker.stop()

    asyncio.run(scenario())
def test_double_invoke_during_rite_is_refused():
    """A second invoke while the first is still running is refused.

    The planner sleeps 0.3 s, so the second invoke (issued after 0.05 s)
    arrives mid-rite and must fail with ``RITE_BEGUN_REASON``; the first
    invoke still completes and the wish is granted.
    """

    async def scenario():
        seen: list[dict] = []
        slow_planner = SlowPlanner(delay=0.3)
        worker = _worker(seen, planner=slow_planner)
        await worker.start()
        try:
            wish_id, _ = await worker.submit("slow sacred work", "c1")
            await _wait_for(seen, "your_turn", wish_id)
            token = worker.turn_token(wish_id, "c1")

            in_flight = asyncio.create_task(worker.invoke(wish_id, token))
            await asyncio.sleep(0.05)  # the rite has begun

            with pytest.raises(QueueError) as refused:
                await worker.invoke(wish_id, token)
            assert refused.value.reason == RITE_BEGUN_REASON

            first_outcome = await in_flight
            assert first_outcome == {"ok": True, "wish_id": wish_id}
            await _wait_for(seen, "wish_granted", wish_id)
        finally:
            await worker.stop()

    asyncio.run(scenario())
def test_crashing_grant_reports_and_frees_the_queue():
    """A planner that raises yields a rejection and does not wedge the queue.

    The raw exception text ("boom") must not appear in either the invoke
    result or the ``wish_rejected`` event, and the same client must be able
    to submit again and reach ``your_turn``.
    """

    class Bomb:
        """Planner whose grant always raises."""

        async def grant(self, wish, world_summary, act, emit):
            raise RuntimeError("boom")

    async def scenario():
        seen: list[dict] = []
        worker = _worker(seen, planner=Bomb())
        await worker.start()
        try:
            wish_id, _ = await worker.submit("explode quietly", "c1")
            await _wait_for(seen, "your_turn", wish_id)
            token = worker.turn_token(wish_id, "c1")
            outcome = await worker.invoke(wish_id, token)
            assert outcome["ok"] is False
            assert "boom" not in outcome["reason"]  # poetic, never a stack trace

            rejection = await _wait_for(seen, "wish_rejected", wish_id)
            assert "boom" not in rejection["reason"]

            # slot freed once the worker resumes; the same client may wish again
            await asyncio.sleep(0.05)
            retry_id, _ = await worker.submit("try once more", "c1")
            await _wait_for(seen, "your_turn", retry_id)
        finally:
            await worker.stop()

    asyncio.run(scenario())
def test_inscribe_text_is_re_moderated_before_landing():
    """A clean wish must not let the model write a slur permanently onto the
    shared world via inscribe_wish (verify-fleet critical).

    The planner attempts one benign and one violent inscription; only the
    benign text may show up among the emitted ``world_delta`` inscriptions.
    """

    class Inscriber:
        """Planner that tries a harmless inscription, then a harmful one."""

        async def grant(self, wish, world_summary, act, emit):
            benign_call = {
                "tool": "inscribe_wish",
                "args": {"text": "a gentle blessing for travelers", "style": "orbit"},
            }
            ok = await act(benign_call)
            harmful_call = {
                "tool": "inscribe_wish",
                "args": {"text": "kill all of them", "style": "stone"},
            }
            bad = await act(harmful_call)
            return {"reading": "r", "turns": [], "epitaph": "done", "ok": ok, "bad": bad}

    async def scenario():
        seen: list[dict] = []
        world = make_world()
        captured = {}

        async def emit(e):
            seen.append(e)

        planner = Inscriber()

        async def persist(trace):
            # Only dict traces are merged; anything else contributes nothing.
            captured.update(trace if isinstance(trace, dict) else {})

        worker = QueueWorker(
            world,
            Moderator(),
            planner,
            emit,
            persist=persist,
            browser_invoked=False,
        )
        await worker.start()
        try:
            wid, _ = await worker.submit("bless the travelers", "c1")
            await _wait_for(seen, "wish_granted", wid)
        finally:
            await worker.stop()

        inscriptions = []
        for e in seen:
            if e.get("type") == "world_delta" and e["feature"]["tool"] == "inscribe_wish":
                inscriptions.append(e["feature"]["args"]["text"])

        assert "a gentle blessing for travelers" in inscriptions
        assert not any("kill" in text for text in inscriptions)

    asyncio.run(scenario())
def test_all_errored_wish_is_rejected_not_granted_empty():
    """A wish where every turn errored (GPU/model down) must reject honestly,
    not silently 'grant' an empty wish (June 12 live incident).

    Expects a ``wish_rejected`` event with ``BUSY_HEAVENS_REASON``, no
    ``wish_granted`` event at all, and nothing handed to ``persist``.
    """
    from engine.queue_worker import BUSY_HEAVENS_REASON

    class DeadModel:
        """Planner that never acts and reports only errored turns."""

        async def grant(self, wish, world_summary, act, emit):
            # planner returns its fallback reading + all-error turns, no act() calls
            errored_turn = "error: backend failure; turn skipped"
            return {
                "reading": "the god is silent.",
                "epitaph": "",
                "turns": [
                    {"thought": "", "call": None, "observation": errored_turn},
                    {"thought": "", "call": None, "observation": errored_turn},
                ],
            }

    async def scenario():
        seen = []
        persisted = []

        async def emit(e):
            seen.append(e)

        def persist(t):
            persisted.append(t)

        worker = QueueWorker(
            make_world(),
            Moderator(),
            DeadModel(),
            emit,
            persist=persist,
            browser_invoked=False,
        )
        await worker.start()
        try:
            wid, _ = await worker.submit("found a town here", "c1")
            rejection = await _wait_for(seen, "wish_rejected", wid)
            assert rejection["reason"] == BUSY_HEAVENS_REASON
        finally:
            await worker.stop()

        # nothing granted, nothing persisted to pollute the log
        granted_types = [e for e in seen if e.get("type") == "wish_granted"]
        assert not granted_types
        assert persisted == []

    asyncio.run(scenario())