Spaces:
Running on Zero
Running on Zero
"""HTTP/SSE layer tests (AGENT-SERVER). Engine and mind are faked — these tests
exercise only the transport: endpoints, SSE hub, persistence, rate limiting, the
fix gate, error paths. Wire names keep godseed's wish_* per the contract.

Run:
    .venv/bin/python -m pytest tests/test_api.py -x -q
"""
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import sys | |
| import time | |
| import zlib | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| import pytest | |
| from fastapi.testclient import TestClient | |
| sys.path.insert(0, str(Path(__file__).resolve().parents[1])) | |
| from app import ( # noqa: E402 | |
| FIX_COOLDOWN_REASON, | |
| FIX_RATE_REASON, | |
| TRAFFIC_SMOOTH_REASON, | |
| create_app, | |
| ) | |
| from server.persistence import PersistenceService, rebuild_features # noqa: E402 | |
| from server.ratelimit import ClientIdentity, RateLimiter # noqa: E402 | |
| from server.sse import SSEHub # noqa: E402 | |
# Timestamp carried as ``t`` by every genesis event below.
CITY_EPOCH_S = 1781222400.0

# Genesis fixture — a slice of ARCHITECTURE.md epoch-0 shapes (events are
# byte-compatible with godseed Features: id, wish_id, tool, args, seed, t).
# Each row is (tool, args, seed); ids are numbered e_000000.. in row order.
GENESIS = [
    {"id": f"e_{index:06d}", "wish_id": "genesis", "tool": tool,
     "args": args, "seed": seed, "t": CITY_EPOCH_S}
    for index, (tool, args, seed) in enumerate((
        ("lay_road",
         {"cells": [[0, -12], [0, -11], [0, -10]], "klass": "avenue",
          "name": "Main St"},
         7),
        ("lay_road",
         {"cells": [[8, 0], [9, 0]], "klass": "street", "name": "Old Bridge"},
         11),
        ("place_building",
         {"kind": "town_hall", "name": "City Hall", "cx": -2, "cz": -3,
          "w": 2, "d": 2, "floors": 2, "hue": 35, "variant": 0},
         13),
        ("note",
         {"text": "Nemocity is listening. Ask for a building.",
          "kind": "milestone"},
         17),
    ))
]
# Plan payload shared by the fake planner and the assertions that expect it to
# pass through the transport unchanged.
PLAN = {
    "intent": "build",
    "blurb": "A little cafe rises near the park.",
    "buildings": [
        {"kind": "cafe", "name": "Cafe Luna", "near": "the park"},
    ],
    "decline_reason": None,
}
| # ------------------------------------------------------------------------------- fakes | |
class TrafficSmooth(Exception):
    """Engine gate verdict.

    The server matches this exception by TYPE NAME, never by import, so the
    class name itself is part of the contract.
    """
def fake_fast_check(text: str) -> Optional[str]:
    """Moderation fast path (engine contract): a readable refusal, or None to allow.

    The "forbidden" keyword check (case-insensitive) takes precedence over the
    160-character length check.
    """
    verdict: Optional[str] = None
    if "forbidden" in text.lower():
        verdict = "City hall declines this petition; it reeks of the forbidden."
    elif len(text) > 160:
        verdict = "City hall reads only short petitions; trim yours to fewer words."
    return verdict
class FakePlanner:
    """Stand-in planner: publishes the City Hall paperwork (plan), performs one
    place_building call, and hands back a WishTrace-shaped dict."""

    async def grant(self, wish: str, world_summary: str, act, emit) -> dict[str, Any]:
        """Emit the plan event, run the single tool call via ``act``, return the trace."""
        await emit({"type": "plan", "plan": PLAN})

        cafe_args = {"kind": "cafe", "name": "Cafe Luna", "cx": 3, "cz": -2,
                     "w": 1, "d": 1, "floors": 1, "hue": 35, "variant": 2}
        await act({"tool": "place_building", "args": cafe_args})

        # The trace is assembled only after the tool call has been acted on.
        trace: dict[str, Any] = {
            "text": wish,
            "submitted_at": time.time(),
            "moderation": {"allowed": True, "category": None},
            "plan": PLAN,
            "epitaph": PLAN["blurb"],
            "model": "fake",
            "backend": "fake",
            "ms_total": 1,
        }
        return trace
class FakeWorld:
    """In-memory world for the tests: a feature list plus version and epoch counters."""

    def __init__(self, features: Optional[list[dict[str, Any]]] = None) -> None:
        # Copy the seed features so the caller's list is never mutated.
        seeded: list[dict[str, Any]] = list(features) if features else []
        self.features: list[dict[str, Any]] = seeded
        self.version = len(seeded)
        self.epoch = 0

    def apply_feature(self, feature: dict[str, Any]) -> None:
        """Append one feature and bump the version by one."""
        self.features.append(feature)
        self.version = self.version + 1
class FakeQueueWorker:
    """Minimal stand-in for engine.queue_worker: pop -> grant -> persist -> emit,
    plus the City Engineer gate (submit_fix raises TrafficSmooth when smooth)."""

    def __init__(self, world: FakeWorld, hub: SSEHub,
                 persistence: PersistenceService, planner: FakePlanner,
                 traffic_smooth: bool = False) -> None:
        self.world = world
        self.hub = hub
        self.persistence = persistence
        self.planner = planner
        self.traffic_smooth = traffic_smooth
        self._pending: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
        self._wish_counter = 0
        # Feature ids continue after whatever the world already holds.
        self._feature_counter = len(world.features)
        self._current: Optional[dict[str, Any]] = None
        self._task: Optional[asyncio.Task[None]] = None

    async def start(self) -> None:
        """Launch the background worker loop."""
        self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        """Cancel the worker loop, if one was started, and wait for it to finish."""
        task = self._task
        if task is None:
            return
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)

    def _next_wish_id(self) -> str:
        """Advance the wish counter and format the next ``w_NNNNNN`` id."""
        self._wish_counter += 1
        return f"w_{self._wish_counter:06d}"

    async def submit(self, text: str, client_id: str) -> tuple[str, int]:
        """Queue a petition; return its id and the pending-queue size after the put."""
        wish_id = self._next_wish_id()
        await self._pending.put({"wish_id": wish_id, "text": text})
        return wish_id, self._pending.qsize()

    async def submit_fix(self, client_id: str) -> tuple[str, int]:
        """Engineer gate: raise TrafficSmooth when smooth, else return a new id at position 1."""
        if self.traffic_smooth:
            raise TrafficSmooth("static assignment max demandRatio < 0.8")
        return self._next_wish_id(), 1

    def snapshot(self) -> dict[str, Any]:
        """Report the queue length (pending plus in-flight) and the in-flight petition."""
        waiting = self._pending.qsize()
        active = self._current
        if active is None:
            return {"length": waiting, "current": None}
        summary = {"wish_id": active["wish_id"],
                   "text_preview": active["text"][:40]}
        return {"length": waiting + 1, "current": summary}

    async def _run(self) -> None:
        """Worker loop: take one petition at a time and emit its full lifecycle."""
        loop = asyncio.get_running_loop()
        while True:
            wish = await self._pending.get()
            self._current = wish
            wish_id = wish["wish_id"]

            # Test-only grace: starlette's TestClient buffers a whole response, so the
            # SSE test must subscribe AFTER posting; hold the grant briefly so the
            # subscriber sees every event. The real engine worker has no such wait.
            give_up_at = loop.time() + 0.5
            while self.hub.subscriber_count == 0 and loop.time() < give_up_at:
                await asyncio.sleep(0.005)

            await self.hub.emit(
                {"type": "wish_started", "wish_id": wish_id, "text": wish["text"]})

            features: list[dict[str, Any]] = []
            call_index = 0

            async def act(call: dict[str, Any]) -> str:
                """Announce the tool call, apply it as a feature, emit the world delta."""
                nonlocal call_index
                tool, args = call["tool"], call["args"]
                await self.hub.emit({"type": "tool_call", "wish_id": wish_id,
                                     "call_index": call_index,
                                     "tool": tool, "args": args})
                seed_source = f"{wish_id}:{call_index}".encode()
                feature = {
                    "id": f"e_{self._feature_counter:06d}",
                    "wish_id": wish_id,
                    "tool": tool,
                    "args": args,
                    "seed": zlib.crc32(seed_source) & 0x7FFFFFFF,
                    "t": time.time(),
                }
                self._feature_counter += 1
                call_index += 1
                self.world.apply_feature(feature)
                features.append(feature)
                await self.hub.emit({"type": "world_delta", "feature": feature})
                return f"ok: {call['tool']}"

            async def emit(event: dict[str, Any]) -> None:
                """Forward a planner event, defaulting its wish_id to this petition."""
                await self.hub.emit({"wish_id": wish_id, **event})

            trace = await self.planner.grant(wish["text"], "a small city", act, emit)
            trace["wish_id"] = wish_id
            trace["feature_ids"] = [made["id"] for made in features]
            await self.persistence.record(trace, features)

            # The epoch advances only after the trace has been recorded.
            self.world.epoch += 1
            await self.hub.emit({"type": "wish_granted", "wish_id": wish_id,
                                 "epitaph": trace["epitaph"],
                                 "epoch": self.world.epoch})
            self._current = None
| # ---------------------------------------------------------------------------- fixtures | |
def build_api(tmp_path: Path, *, traffic_smooth: bool = False,
              fix_rate_limiter: Optional[RateLimiter] = None,
              fix_cooldown: Optional[RateLimiter] = None):
    """Wire an app around the fakes and return ``(app, queue_worker)``.

    Persistence lives under ``tmp_path / "traces"`` and is booted from GENESIS.
    ``traffic_smooth`` goes to the fake worker; the two fix limiters are passed
    to ``create_app`` as given (None included).
    """
    store = PersistenceService(tmp_path / "traces")
    world = FakeWorld(store.boot(GENESIS))
    hub = SSEHub(heartbeat=0.05)  # fast heartbeats pad bounded test streams quickly
    worker = FakeQueueWorker(world, hub, store, FakePlanner(),
                             traffic_smooth=traffic_smooth)
    settings = dict(
        world=world,
        queue=worker,
        hub=hub,
        persistence=store,
        fast_check=fake_fast_check,
        rate_limiter=RateLimiter(limit=3, window=3600.0),
        ip_rate_limiter=RateLimiter(limit=100, window=3600.0),
        fix_rate_limiter=fix_rate_limiter,
        fix_cooldown=fix_cooldown,
        identity=ClientIdentity(secret="test-secret"),
        mount_web=False,
        mount_panel=False,
    )
    return create_app(**settings), worker
@pytest.fixture
def api(tmp_path: Path):
    """Fixture: a fresh app from ``build_api`` with default settings.

    It must be registered with ``@pytest.fixture``: the tests in this module
    request ``api`` as an argument, and pytest can only resolve that name when
    the function is a fixture. The queue worker that ``build_api`` also returns
    is not needed by those tests and is discarded.
    """
    app, _queue = build_api(tmp_path)
    return app
| def _poll_state(client: TestClient, predicate, timeout: float = 5.0) -> dict[str, Any]: | |
| deadline = time.time() + timeout | |
| while time.time() < deadline: | |
| state = client.get("/api/state").json() | |
| if predicate(state): | |
| return state | |
| time.sleep(0.02) | |
| raise AssertionError("condition not reached before timeout") | |
| # ------------------------------------------------------------------------------- tests | |
def test_post_wish_returns_id_and_position(api) -> None:
    """POST /api/wish answers 200 with a ``w_`` id and an integer position >= 1."""
    with TestClient(api) as client:
        reply = client.post("/api/wish", json={"text": "a ramen shop near the park"})
        assert reply.status_code == 200
        payload = reply.json()
        assert payload["wish_id"].startswith("w_")
        position = payload["position"]
        assert isinstance(position, int) and position >= 1
def test_state_includes_server_now(api) -> None:
    """``server_now`` in /api/state falls between timestamps taken around the request."""
    with TestClient(api) as client:
        earliest = time.time()
        snapshot = client.get("/api/state").json()
        latest = time.time()
        assert earliest <= snapshot["server_now"] <= latest
def test_sse_stream_emits_petition_lifecycle(api) -> None:
    """Post one petition, then read a bounded stream and check its whole lifecycle.

    TestClient buffers whole responses, so the stream is bounded with ?limit=N
    (heartbeats pad it to N) and read after it closes; the fake worker holds the
    petition until the subscription lands, so every lifecycle event is captured.
    """
    data_prefix = "data: "
    with TestClient(api) as client:
        posted = client.post("/api/wish", json={"text": "a cafe by the bridge"})
        assert posted.status_code == 200
        wish_id = posted.json()["wish_id"]

        stream = client.get("/api/stream", params={"limit": 16})
        assert stream.status_code == 200
        assert stream.headers["x-accel-buffering"] == "no"
        assert stream.headers["content-type"].startswith("text/event-stream")

        events = []
        for raw in stream.text.splitlines():
            if raw.startswith(data_prefix):
                events.append(json.loads(raw[len(data_prefix):]))
        assert len(events) == 16

        hello, queue_event = events[0], events[1]
        assert hello["type"] == "hello"
        assert hello["world_version"] == len(GENESIS)
        assert hello["epoch"] == 0
        assert hello["server_now"] == pytest.approx(time.time(), abs=30)
        assert queue_event["type"] == "queue"

        heartbeats = [event for event in events if event["type"] == "heartbeat"]
        assert all(isinstance(beat["server_now"], float) for beat in heartbeats)

        lifecycle = [event for event in events[2:] if event["type"] != "heartbeat"]
        assert [event["type"] for event in lifecycle] == [
            "wish_started", "plan", "tool_call", "world_delta", "wish_granted"]

        started, plan_event = lifecycle[0], lifecycle[1]
        assert started["wish_id"] == wish_id
        assert plan_event["plan"] == PLAN  # raw City Hall paperwork passes through intact

        delta = next(event for event in lifecycle if event["type"] == "world_delta")
        assert delta["feature"]["tool"] == "place_building"
        assert delta["feature"]["wish_id"] == wish_id

        granted = lifecycle[-1]
        assert granted["epitaph"] and granted["epoch"] == 1
def test_state_reflects_new_feature_and_ledger(api) -> None:
    """A granted petition appears in /api/state and in the /api/wishes ledger."""
    with TestClient(api) as client:
        before = client.get("/api/state").json()
        baseline = len(before["world"]["features"])
        assert baseline == len(GENESIS)
        assert before["queue"] == {"length": 0, "current": None}

        wish_id = client.post(
            "/api/wish", json={"text": "a row of townhouses on Elm St"}
        ).json()["wish_id"]

        def grant_finished(snapshot: dict[str, Any]) -> bool:
            # The fake worker applies the feature first, then awaits the trace
            # record, and only afterwards bumps the epoch. Waiting on the
            # feature count alone can catch the state mid-grant, so also wait
            # for the epoch to advance before asserting on epoch and ledger.
            world = snapshot["world"]
            return len(world["features"]) > baseline and world["epoch"] >= 1

        state = _poll_state(client, grant_finished)
        newest = state["world"]["features"][-1]
        assert newest["wish_id"] == wish_id
        assert newest["seed"] == zlib.crc32(f"{wish_id}:0".encode()) & 0x7FFFFFFF
        assert state["world"]["version"] == baseline + 1
        assert state["world"]["epoch"] == 1
        assert state["wishes_recent"][0]["wish_id"] == wish_id

        index = client.get("/api/wishes").json()
        assert [w["wish_id"] for w in index] == [wish_id]
        assert index[0]["epitaph"] == PLAN["blurb"]

        trace = client.get(f"/api/wishes/{wish_id}").json()
        assert trace["wish_id"] == wish_id
        assert trace["plan"] == PLAN and trace["feature_ids"] == [newest["id"]]
        assert trace["features"][0] == newest  # embedded for boot rebuild

        missing = client.get("/api/wishes/w_999999")
        assert missing.status_code == 404
def test_rate_limit_429_on_fourth_wish(api) -> None:
    """Three petitions pass; the fourth gets 429 with Retry-After and a readable detail."""
    with TestClient(api) as client:
        for attempt in range(3):
            accepted = client.post(
                "/api/wish", json={"text": f"small petition {attempt}"})
            assert accepted.status_code == 200, accepted.text

        refused = client.post("/api/wish", json={"text": "one petition too many"})
        assert refused.status_code == 429
        assert "Retry-After" in refused.headers
        reason = refused.json()["detail"]
        assert isinstance(reason, str) and reason  # readable, not a stack trace
def test_moderation_fast_path_rejects_with_readable_reason(api) -> None:
    """A petition refused by the fast check gets 400 with the reason and adds no feature."""
    expected_reason = "City hall declines this petition; it reeks of the forbidden."
    with TestClient(api) as client:
        refused = client.post("/api/wish", json={"text": "a forbidden storefront"})
        assert refused.status_code == 400
        assert refused.json()["detail"] == expected_reason
        # rejected petitions never reach the city
        features = client.get("/api/state").json()["world"]["features"]
        assert len(features) == len(GENESIS)
def test_blank_wish_is_unprocessable(api) -> None:
    """Whitespace-only text and a missing ``text`` field are both answered with 422."""
    with TestClient(api) as client:
        blank = client.post("/api/wish", json={"text": " "})
        assert blank.status_code == 422
        absent = client.post("/api/wish", json={})
        assert absent.status_code == 422
| # ---------------------------------------------------------------------- /api/fix tests | |
def test_fix_409_when_traffic_smooth_and_cooldown_not_engaged(tmp_path: Path) -> None:
    """A fix refused for smooth traffic returns 409 and does not block the next fix."""
    app, worker = build_api(tmp_path, traffic_smooth=True)
    with TestClient(app) as client:
        refused = client.post("/api/fix")
        assert refused.status_code == 409
        assert refused.json()["detail"] == TRAFFIC_SMOOTH_REASON

        # a no-op click must not block the citywide engineer: jam forms, fix lands
        worker.traffic_smooth = False
        accepted = client.post("/api/fix")
        assert accepted.status_code == 200, accepted.text
        payload = accepted.json()
        assert payload["wish_id"].startswith("w_") and payload["position"] >= 1
def test_fix_200_when_jammed_then_global_cooldown_429(tmp_path: Path) -> None:
    """The first fix is accepted; an immediate second one gets 429 with the cooldown reason."""
    app, _worker = build_api(tmp_path)
    with TestClient(app) as client:
        accepted = client.post("/api/fix")
        assert accepted.status_code == 200, accepted.text

        cooled = client.post("/api/fix")
        assert cooled.status_code == 429
        assert cooled.json()["detail"] == FIX_COOLDOWN_REASON
        assert "Retry-After" in cooled.headers
def test_fix_client_rate_limit_two_per_hour(tmp_path: Path) -> None:
    """Two fixes pass for one client; the third gets 429 with the per-client reason."""
    # a permissive global cooldown isolates the per-client 2/hr window
    lenient_cooldown = RateLimiter(limit=100, window=120.0)
    app, _worker = build_api(tmp_path, fix_cooldown=lenient_cooldown)
    with TestClient(app) as client:
        for _attempt in range(2):
            assert client.post("/api/fix").status_code == 200

        refused = client.post("/api/fix")
        assert refused.status_code == 429
        assert refused.json()["detail"] == FIX_RATE_REASON
        assert "Retry-After" in refused.headers
| # ------------------------------------------------------------------- unit: components | |
def test_sse_hub_drops_oldest_never_blocks() -> None:
    """Publishing six events into a max_queue=3 subscription keeps only the last three."""

    async def scenario() -> list[int]:
        hub = SSEHub(heartbeat=0.1, max_queue=3)
        subscription = hub.subscribe()
        for number in range(6):
            hub.publish({"type": "n", "i": number})  # never raises, never blocks
        hub.unsubscribe(subscription)

        survivors: list[int] = []
        while not subscription.empty():
            survivors.append(subscription.get_nowait()["i"])
        return survivors

    assert asyncio.run(scenario()) == [3, 4, 5]  # oldest were dropped
def test_rate_limiter_sliding_window() -> None:
    """Drive a 3-per-3600s limiter with a fake clock and check the sliding window."""
    fake_now = [0.0]  # one-element list so the clock lambda sees updates
    limiter = RateLimiter(limit=3, window=3600.0, clock=lambda: fake_now[0])
    key = "client"

    for moment in (0.0, 1000.0, 2000.0):
        fake_now[0] = moment
        assert limiter.hit(key)[0]

    fake_now[0] = 2500.0
    allowed, retry_after = limiter.hit(key)
    assert not allowed and retry_after == 3600.0 - 2500.0  # until the t=0 hit expires

    fake_now[0] = 3600.5  # only the t=0 hit has slid out of the window
    assert limiter.hit(key)[0]
    assert limiter.remaining(key) == 0  # live: t=1000, t=2000, t=3600.5

    allowed, _ = limiter.hit(key)
    assert not allowed  # denied attempts were never recorded, window is exact
def test_identity_cookie_roundtrip_and_tamper() -> None:
    """A minted cookie verifies to its token; tampered, garbage and None inputs do not."""
    identity = ClientIdentity(secret="s3cret")
    cookie = identity.mint()
    token, _, _rest = cookie.partition(".")
    assert identity.verify(cookie) == token

    # Flip the final character so the cookie no longer matches what was minted.
    flipped = "1" if cookie[-1] == "0" else "0"
    assert identity.verify(cookie[:-1] + flipped) is None

    assert identity.verify("garbage") is None
    assert identity.verify(None) is None
def test_persistence_rebuild_survives_corruption(tmp_path: Path) -> None:
    """A truncated trailing record is ignored when a new service boots from disk."""
    traces_dir = tmp_path / "traces"
    writer = PersistenceService(traces_dir)
    writer.boot(GENESIS)

    cottage = {
        "id": "e_000004", "wish_id": "w_000001", "tool": "place_building",
        "args": {"kind": "house", "name": "Elm Cottage", "cx": -1, "cz": -6,
                 "w": 1, "d": 1, "floors": 1, "hue": 20, "variant": 1},
        "seed": 19, "t": CITY_EPOCH_S + 60.0,
    }
    trace = {"wish_id": "w_000001", "text": "a little house", "submitted_at": 1.0,
             "epitaph": "a home with a clay roof"}
    asyncio.run(writer.record(trace, [cottage]))

    # simulate a crash mid-append
    with writer.path.open("a", encoding="utf-8") as handle:
        handle.write('{"wish_id": "w_000002", "text": "truncat')

    reader = PersistenceService(traces_dir)
    rebuilt = reader.boot(GENESIS)
    assert rebuilt == GENESIS + [cottage]
    assert [entry["wish_id"] for entry in reader.traces()] == ["w_000001"]
    assert reader.find("w_000001")["epitaph"] == "a home with a clay roof"
    assert reader.find("w_000404") is None
def test_rebuild_skips_traces_without_embedded_features() -> None:
    """A trace that carries only feature_ids contributes nothing to the rebuild."""
    legacy_trace = {"wish_id": "w_000009", "feature_ids": ["e_0009"]}  # no features array
    rebuilt = rebuild_features(GENESIS, [legacy_trace])
    assert rebuilt == GENESIS