"""HTTP/SSE layer tests (AGENT-SERVER). Engine and mind are faked — these tests exercise only the transport: endpoints, SSE hub, persistence, rate limiting, the fix gate, error paths. Wire names keep godseed's wish_* per the contract. Run: .venv/bin/python -m pytest tests/test_api.py -x -q """ from __future__ import annotations import asyncio import json import sys import time import zlib from pathlib import Path from typing import Any, Optional import pytest from fastapi.testclient import TestClient sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from app import ( # noqa: E402 FIX_COOLDOWN_REASON, FIX_RATE_REASON, TRAFFIC_SMOOTH_REASON, create_app, ) from server.persistence import PersistenceService, rebuild_features # noqa: E402 from server.ratelimit import ClientIdentity, RateLimiter # noqa: E402 from server.sse import SSEHub # noqa: E402 CITY_EPOCH_S = 1781222400.0 # Genesis fixture — a slice of ARCHITECTURE.md epoch-0 shapes (events are # byte-compatible with godseed Features: id, wish_id, tool, args, seed, t). GENESIS = [ {"id": "e_000000", "wish_id": "genesis", "tool": "lay_road", "args": {"cells": [[0, -12], [0, -11], [0, -10]], "klass": "avenue", "name": "Main St"}, "seed": 7, "t": CITY_EPOCH_S}, {"id": "e_000001", "wish_id": "genesis", "tool": "lay_road", "args": {"cells": [[8, 0], [9, 0]], "klass": "street", "name": "Old Bridge"}, "seed": 11, "t": CITY_EPOCH_S}, {"id": "e_000002", "wish_id": "genesis", "tool": "place_building", "args": {"kind": "town_hall", "name": "City Hall", "cx": -2, "cz": -3, "w": 2, "d": 2, "floors": 2, "hue": 35, "variant": 0}, "seed": 13, "t": CITY_EPOCH_S}, {"id": "e_000003", "wish_id": "genesis", "tool": "note", "args": {"text": "Nemocity is listening. Ask for a building.", "kind": "milestone"}, "seed": 17, "t": CITY_EPOCH_S}, ] PLAN = { "intent": "build", "blurb": "A little cafe rises near the park.", "buildings": [{"kind": "cafe", "name": "Cafe Luna", "near": "the park"}], "decline_reason": None, } # ------------------------------------------------------------------------------- fakes class TrafficSmooth(Exception): """Engine gate verdict; the server matches this by TYPE NAME, never by import.""" def fake_fast_check(text: str) -> Optional[str]: """Moderation fast path: readable reason or None (engine contract).""" if "forbidden" in text.lower(): return "City hall declines this petition; it reeks of the forbidden." if len(text) > 160: return "City hall reads only short petitions; trim yours to fewer words." return None class FakePlanner: """Emits the City Hall paperwork (plan), makes one place_building call, returns a WishTrace-shaped dict.""" async def grant(self, wish: str, world_summary: str, act, emit) -> dict[str, Any]: await emit({"type": "plan", "plan": PLAN}) call = { "tool": "place_building", "args": {"kind": "cafe", "name": "Cafe Luna", "cx": 3, "cz": -2, "w": 1, "d": 1, "floors": 1, "hue": 35, "variant": 2}, } await act(call) return { "text": wish, "submitted_at": time.time(), "moderation": {"allowed": True, "category": None}, "plan": PLAN, "epitaph": PLAN["blurb"], "model": "fake", "backend": "fake", "ms_total": 1, } class FakeWorld: def __init__(self, features: Optional[list[dict[str, Any]]] = None) -> None: self.features: list[dict[str, Any]] = list(features or []) self.version = len(self.features) self.epoch = 0 def apply_feature(self, feature: dict[str, Any]) -> None: self.features.append(feature) self.version += 1 class FakeQueueWorker: """Minimal stand-in for engine.queue_worker: pop -> grant -> persist -> emit, plus the City Engineer gate (submit_fix raises TrafficSmooth when smooth).""" def __init__(self, world: FakeWorld, hub: SSEHub, persistence: PersistenceService, planner: FakePlanner, traffic_smooth: bool = False) -> None: self.world, self.hub, self.persistence, self.planner = ( world, hub, persistence, planner) self.traffic_smooth = traffic_smooth self._pending: asyncio.Queue[dict[str, Any]] = asyncio.Queue() self._wish_counter = 0 self._feature_counter = len(world.features) self._current: Optional[dict[str, Any]] = None self._task: Optional[asyncio.Task[None]] = None async def start(self) -> None: self._task = asyncio.create_task(self._run()) async def stop(self) -> None: if self._task is not None: self._task.cancel() await asyncio.gather(self._task, return_exceptions=True) async def submit(self, text: str, client_id: str) -> tuple[str, int]: self._wish_counter += 1 wish_id = f"w_{self._wish_counter:06d}" await self._pending.put({"wish_id": wish_id, "text": text}) return wish_id, self._pending.qsize() async def submit_fix(self, client_id: str) -> tuple[str, int]: if self.traffic_smooth: raise TrafficSmooth("static assignment max demandRatio < 0.8") self._wish_counter += 1 return f"w_{self._wish_counter:06d}", 1 def snapshot(self) -> dict[str, Any]: current = None if self._current is not None: current = {"wish_id": self._current["wish_id"], "text_preview": self._current["text"][:40]} return {"length": self._pending.qsize() + (1 if current else 0), "current": current} async def _run(self) -> None: while True: wish = await self._pending.get() self._current = wish wish_id = wish["wish_id"] # Test-only grace: starlette's TestClient buffers a whole response, so the # SSE test must subscribe AFTER posting; hold the grant briefly so the # subscriber sees every event. The real engine worker has no such wait. deadline = asyncio.get_running_loop().time() + 0.5 while (self.hub.subscriber_count == 0 and asyncio.get_running_loop().time() < deadline): await asyncio.sleep(0.005) await self.hub.emit( {"type": "wish_started", "wish_id": wish_id, "text": wish["text"]}) features: list[dict[str, Any]] = [] call_index = 0 async def act(call: dict[str, Any]) -> str: nonlocal call_index await self.hub.emit({"type": "tool_call", "wish_id": wish_id, "call_index": call_index, "tool": call["tool"], "args": call["args"]}) feature = { "id": f"e_{self._feature_counter:06d}", "wish_id": wish_id, "tool": call["tool"], "args": call["args"], "seed": zlib.crc32(f"{wish_id}:{call_index}".encode()) & 0x7FFFFFFF, "t": time.time(), } self._feature_counter += 1 call_index += 1 self.world.apply_feature(feature) features.append(feature) await self.hub.emit({"type": "world_delta", "feature": feature}) return f"ok: {call['tool']}" async def emit(event: dict[str, Any]) -> None: await self.hub.emit({"wish_id": wish_id, **event}) trace = await self.planner.grant(wish["text"], "a small city", act, emit) trace["wish_id"] = wish_id trace["feature_ids"] = [f["id"] for f in features] await self.persistence.record(trace, features) self.world.epoch += 1 await self.hub.emit({"type": "wish_granted", "wish_id": wish_id, "epitaph": trace["epitaph"], "epoch": self.world.epoch}) self._current = None # ---------------------------------------------------------------------------- fixtures def build_api(tmp_path: Path, *, traffic_smooth: bool = False, fix_rate_limiter: Optional[RateLimiter] = None, fix_cooldown: Optional[RateLimiter] = None): persistence = PersistenceService(tmp_path / "traces") features = persistence.boot(GENESIS) world = FakeWorld(features) hub = SSEHub(heartbeat=0.05) # fast heartbeats pad bounded test streams quickly queue = FakeQueueWorker(world, hub, persistence, FakePlanner(), traffic_smooth=traffic_smooth) app = create_app( world=world, queue=queue, hub=hub, persistence=persistence, fast_check=fake_fast_check, rate_limiter=RateLimiter(limit=3, window=3600.0), ip_rate_limiter=RateLimiter(limit=100, window=3600.0), fix_rate_limiter=fix_rate_limiter, fix_cooldown=fix_cooldown, identity=ClientIdentity(secret="test-secret"), mount_web=False, mount_panel=False, ) return app, queue @pytest.fixture() def api(tmp_path: Path): app, _queue = build_api(tmp_path) return app def _poll_state(client: TestClient, predicate, timeout: float = 5.0) -> dict[str, Any]: deadline = time.time() + timeout while time.time() < deadline: state = client.get("/api/state").json() if predicate(state): return state time.sleep(0.02) raise AssertionError("condition not reached before timeout") # ------------------------------------------------------------------------------- tests def test_post_wish_returns_id_and_position(api) -> None: with TestClient(api) as client: response = client.post("/api/wish", json={"text": "a ramen shop near the park"}) assert response.status_code == 200 body = response.json() assert body["wish_id"].startswith("w_") assert isinstance(body["position"], int) and body["position"] >= 1 def test_state_includes_server_now(api) -> None: with TestClient(api) as client: before = time.time() state = client.get("/api/state").json() after = time.time() assert before <= state["server_now"] <= after def test_sse_stream_emits_petition_lifecycle(api) -> None: """TestClient buffers whole responses, so the stream is bounded with ?limit=N (heartbeats pad it to N) and read after it closes; the fake worker holds the petition until the subscription lands, so every lifecycle event is captured.""" with TestClient(api) as client: posted = client.post("/api/wish", json={"text": "a cafe by the bridge"}) assert posted.status_code == 200 wish_id = posted.json()["wish_id"] response = client.get("/api/stream", params={"limit": 16}) assert response.status_code == 200 assert response.headers["x-accel-buffering"] == "no" assert response.headers["content-type"].startswith("text/event-stream") events = [ json.loads(line[len("data: "):]) for line in response.text.splitlines() if line.startswith("data: ") ] assert len(events) == 16 assert events[0]["type"] == "hello" assert events[0]["world_version"] == len(GENESIS) assert events[0]["epoch"] == 0 assert events[0]["server_now"] == pytest.approx(time.time(), abs=30) assert events[1]["type"] == "queue" for hb in (e for e in events if e["type"] == "heartbeat"): assert isinstance(hb["server_now"], float) seen = [e for e in events[2:] if e["type"] != "heartbeat"] types = [e["type"] for e in seen] assert types == ["wish_started", "plan", "tool_call", "world_delta", "wish_granted"] assert seen[0]["wish_id"] == wish_id plan = seen[1] assert plan["plan"] == PLAN # raw City Hall paperwork passes through intact delta = next(e for e in seen if e["type"] == "world_delta") assert delta["feature"]["tool"] == "place_building" assert delta["feature"]["wish_id"] == wish_id granted = seen[-1] assert granted["epitaph"] and granted["epoch"] == 1 def test_state_reflects_new_feature_and_ledger(api) -> None: with TestClient(api) as client: before = client.get("/api/state").json() baseline = len(before["world"]["features"]) assert baseline == len(GENESIS) assert before["queue"] == {"length": 0, "current": None} wish_id = client.post( "/api/wish", json={"text": "a row of townhouses on Elm St"} ).json()["wish_id"] state = _poll_state( client, lambda s: len(s["world"]["features"]) > baseline) newest = state["world"]["features"][-1] assert newest["wish_id"] == wish_id assert newest["seed"] == zlib.crc32(f"{wish_id}:0".encode()) & 0x7FFFFFFF assert state["world"]["version"] == baseline + 1 assert state["world"]["epoch"] == 1 assert state["wishes_recent"][0]["wish_id"] == wish_id index = client.get("/api/wishes").json() assert [w["wish_id"] for w in index] == [wish_id] assert index[0]["epitaph"] == PLAN["blurb"] trace = client.get(f"/api/wishes/{wish_id}").json() assert trace["wish_id"] == wish_id assert trace["plan"] == PLAN and trace["feature_ids"] == [newest["id"]] assert trace["features"][0] == newest # embedded for boot rebuild missing = client.get("/api/wishes/w_999999") assert missing.status_code == 404 def test_rate_limit_429_on_fourth_wish(api) -> None: with TestClient(api) as client: for i in range(3): ok = client.post("/api/wish", json={"text": f"small petition {i}"}) assert ok.status_code == 200, ok.text blocked = client.post("/api/wish", json={"text": "one petition too many"}) assert blocked.status_code == 429 assert "Retry-After" in blocked.headers detail = blocked.json()["detail"] assert isinstance(detail, str) and detail # readable, not a stack trace def test_moderation_fast_path_rejects_with_readable_reason(api) -> None: with TestClient(api) as client: response = client.post("/api/wish", json={"text": "a forbidden storefront"}) assert response.status_code == 400 assert response.json()["detail"] == ( "City hall declines this petition; it reeks of the forbidden.") # rejected petitions never reach the city assert len(client.get("/api/state").json()["world"]["features"]) == len(GENESIS) def test_blank_wish_is_unprocessable(api) -> None: with TestClient(api) as client: assert client.post("/api/wish", json={"text": " "}).status_code == 422 assert client.post("/api/wish", json={}).status_code == 422 # ---------------------------------------------------------------------- /api/fix tests def test_fix_409_when_traffic_smooth_and_cooldown_not_engaged(tmp_path: Path) -> None: app, queue = build_api(tmp_path, traffic_smooth=True) with TestClient(app) as client: smooth = client.post("/api/fix") assert smooth.status_code == 409 assert smooth.json()["detail"] == TRAFFIC_SMOOTH_REASON # a no-op click must not block the citywide engineer: jam forms, fix lands queue.traffic_smooth = False fixed = client.post("/api/fix") assert fixed.status_code == 200, fixed.text body = fixed.json() assert body["wish_id"].startswith("w_") and body["position"] >= 1 def test_fix_200_when_jammed_then_global_cooldown_429(tmp_path: Path) -> None: app, _queue = build_api(tmp_path) with TestClient(app) as client: first = client.post("/api/fix") assert first.status_code == 200, first.text second = client.post("/api/fix") assert second.status_code == 429 assert second.json()["detail"] == FIX_COOLDOWN_REASON assert "Retry-After" in second.headers def test_fix_client_rate_limit_two_per_hour(tmp_path: Path) -> None: # a permissive global cooldown isolates the per-client 2/hr window app, _queue = build_api( tmp_path, fix_cooldown=RateLimiter(limit=100, window=120.0)) with TestClient(app) as client: for _ in range(2): assert client.post("/api/fix").status_code == 200 blocked = client.post("/api/fix") assert blocked.status_code == 429 assert blocked.json()["detail"] == FIX_RATE_REASON assert "Retry-After" in blocked.headers # ------------------------------------------------------------------- unit: components def test_sse_hub_drops_oldest_never_blocks() -> None: async def scenario() -> list[int]: hub = SSEHub(heartbeat=0.1, max_queue=3) queue = hub.subscribe() for i in range(6): hub.publish({"type": "n", "i": i}) # never raises, never blocks hub.unsubscribe(queue) kept = [] while not queue.empty(): kept.append(queue.get_nowait()["i"]) return kept assert asyncio.run(scenario()) == [3, 4, 5] # oldest were dropped def test_rate_limiter_sliding_window() -> None: now = [0.0] limiter = RateLimiter(limit=3, window=3600.0, clock=lambda: now[0]) key = "client" for t in (0.0, 1000.0, 2000.0): now[0] = t assert limiter.hit(key)[0] now[0] = 2500.0 allowed, retry_after = limiter.hit(key) assert not allowed and retry_after == 3600.0 - 2500.0 # until the t=0 hit expires now[0] = 3600.5 # only the t=0 hit has slid out of the window assert limiter.hit(key)[0] assert limiter.remaining(key) == 0 # live: t=1000, t=2000, t=3600.5 allowed, _ = limiter.hit(key) assert not allowed # denied attempts were never recorded, window is exact def test_identity_cookie_roundtrip_and_tamper() -> None: identity = ClientIdentity(secret="s3cret") cookie = identity.mint() token = cookie.split(".", 1)[0] assert identity.verify(cookie) == token assert identity.verify(cookie[:-1] + ("0" if cookie[-1] != "0" else "1")) is None assert identity.verify("garbage") is None assert identity.verify(None) is None def test_persistence_rebuild_survives_corruption(tmp_path: Path) -> None: traces_dir = tmp_path / "traces" first = PersistenceService(traces_dir) first.boot(GENESIS) feature = {"id": "e_000004", "wish_id": "w_000001", "tool": "place_building", "args": {"kind": "house", "name": "Elm Cottage", "cx": -1, "cz": -6, "w": 1, "d": 1, "floors": 1, "hue": 20, "variant": 1}, "seed": 19, "t": CITY_EPOCH_S + 60.0} asyncio.run(first.record( {"wish_id": "w_000001", "text": "a little house", "submitted_at": 1.0, "epitaph": "a home with a clay roof"}, [feature], )) # simulate a crash mid-append with first.path.open("a", encoding="utf-8") as fh: fh.write('{"wish_id": "w_000002", "text": "truncat') second = PersistenceService(traces_dir) rebuilt = second.boot(GENESIS) assert rebuilt == GENESIS + [feature] assert [t["wish_id"] for t in second.traces()] == ["w_000001"] assert second.find("w_000001")["epitaph"] == "a home with a clay roof" assert second.find("w_000404") is None def test_rebuild_skips_traces_without_embedded_features() -> None: legacy = {"wish_id": "w_000009", "feature_ids": ["e_0009"]} # no features array assert rebuild_features(GENESIS, [legacy]) == GENESIS