Spaces:
Sleeping
Sleeping
| """HTTP/SSE layer tests (AGENT B). Engine and mind are faked — these tests exercise | |
| only the transport: endpoints, SSE hub, persistence, rate limiting, error paths. | |
| Run: | |
| uv run --python 3.12 --with pytest --with fastapi --with httpx --with pydantic \ | |
| python -m pytest tests/test_api.py -x -q | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import sys | |
| import time | |
| import zlib | |
| from pathlib import Path | |
| from typing import Any, Optional | |
| import pytest | |
| from fastapi.testclient import TestClient | |
| sys.path.insert(0, str(Path(__file__).resolve().parents[1])) | |
| from app import create_app # noqa: E402 | |
| from server.persistence import PersistenceService, rebuild_features # noqa: E402 | |
| from server.ratelimit import ClientIdentity, RateLimiter # noqa: E402 | |
| from server.sse import SSEHub # noqa: E402 | |
| # Genesis fixture — mirrors ARCHITECTURE.md epoch-0 state. | |
# Every genesis feature belongs to the synthetic "genesis" wish and is stamped t=0;
# ids are sequential (f_000000, f_000001, ...). Only tool, args and seed vary per row.
GENESIS: list[dict[str, Any]] = [
    {
        "id": f"f_{index:06d}",
        "wish_id": "genesis",
        "tool": tool,
        "args": args,
        "seed": seed,
        "t": 0,
    }
    for index, (tool, args, seed) in enumerate((
        ("set_sky",
         {"palette": "void", "star_density": 0.9, "moons": 1},
         7),
        ("raise_terrain",
         {"lat": 12, "lon": 40, "radius_deg": 28, "height": 0.05, "roughness": 0.7},
         11),
        ("place_structure",
         {"lat": 14, "lon": 38, "kind": "monolith", "scale": 1.4, "hue": 270},
         13),
        ("inscribe_wish",
         {"text": "speak, and it will be made", "style": "orbit"},
         17),
    ))
]
| # ------------------------------------------------------------------------------- fakes | |
def fake_fast_check(text: str) -> Optional[str]:
    """Stand-in for the moderation fast path.

    Returns a poetic refusal when the wish mentions "forbidden" (matched
    case-insensitively) or is longer than 140 characters, and None when the wish
    may proceed. The "forbidden" rule wins when both apply.
    """
    mentions_forbidden = "forbidden" in text.lower()
    too_long = len(text) > 140
    if mentions_forbidden:
        return "The god declines this wish; it reeks of the forbidden."
    return (
        "The god hears only short wishes; carve yours to fewer words."
        if too_long
        else None
    )
class FakePlanner:
    """Scripted planner: one thought, one spawn_flora call, one closing thought.

    ``grant`` returns a dict shaped like a WishTrace so the transport layer has
    something realistic to persist and serve.
    """

    async def grant(self, wish: str, world_summary: str, act, emit) -> dict[str, Any]:
        """Play the fixed script for ``wish`` and return its trace.

        ``emit`` is awaited with each thought-token event and ``act`` with the
        single tool call; whatever ``act`` returns is stored as the turn's
        observation. ``world_summary`` is accepted but not used.
        """
        flora_call = {
            "tool": "spawn_flora",
            "args": {"lat": 10.0, "lon": 20.0, "radius_deg": 5,
                     "kind": "glowgrass", "density": 0.7, "hue": 160},
        }

        await emit({"type": "thought_token", "text": "the god considers… "})
        observation = await act(flora_call)
        await emit({"type": "thought_token", "text": "it is done."})

        only_turn = {
            "thought": "grass, then light",
            "call": flora_call,
            "observation": observation,
        }
        trace: dict[str, Any] = {
            "text": wish,
            "submitted_at": time.time(),
            "moderation": {"allowed": True, "category": None},
            "reading": "a small green wanting",
            "turns": [only_turn],
            "epitaph": "and the meadow shone",
            "model": "fake",
            "backend": "fake",
            "ms_total": 1,
        }
        return trace
class FakeWorld:
    """In-memory world: a feature list plus ``version`` and ``epoch`` counters."""

    def __init__(self, features: Optional[list[dict[str, Any]]] = None) -> None:
        # Copy the input so apply_feature never mutates the caller's list.
        self.features: list[dict[str, Any]] = list(features) if features else []
        self.epoch = 0
        # The version starts at the number of features the world was built with.
        self.version = len(self.features)

    def apply_feature(self, feature: dict[str, Any]) -> None:
        """Append ``feature`` to the world and bump ``version`` by one."""
        self.version += 1
        self.features.append(feature)
class FakeQueueWorker:
    """Test double for engine.queue_worker.

    One background task drains a FIFO of wishes. For each wish it runs the planner,
    applies and broadcasts every feature the planner creates, persists the trace,
    bumps the world epoch and finally announces the grant on the hub.
    """

    def __init__(self, world: FakeWorld, hub: SSEHub,
                 persistence: PersistenceService, planner: FakePlanner) -> None:
        self.world = world
        self.hub = hub
        self.persistence = persistence
        self.planner = planner
        self._pending: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
        self._wish_counter = 0
        # Feature numbering continues after the features the world already holds.
        self._feature_counter = len(world.features)
        self._current: Optional[dict[str, Any]] = None
        self._task: Optional[asyncio.Task[None]] = None

    async def start(self) -> None:
        """Spawn the background task that processes queued wishes."""
        self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        """Cancel the background task (if any) and wait for it to finish."""
        task = self._task
        if task is None:
            return
        task.cancel()
        # return_exceptions=True swallows the resulting CancelledError.
        await asyncio.gather(task, return_exceptions=True)

    async def submit(self, text: str, client_id: str) -> tuple[str, int]:
        """Queue a wish; return its id and the queue size right after enqueueing.

        ``client_id`` is accepted but not used by this fake.
        """
        self._wish_counter += 1
        wish_id = f"w_{self._wish_counter:06d}"
        await self._pending.put({"wish_id": wish_id, "text": text})
        return wish_id, self._pending.qsize()

    def snapshot(self) -> dict[str, Any]:
        """Describe the queue: waiting wishes plus the in-flight one, if any."""
        in_flight = self._current
        if in_flight is None:
            return {"length": self._pending.qsize(), "current": None}
        return {
            "length": self._pending.qsize() + 1,
            "current": {"wish_id": in_flight["wish_id"],
                        "text_preview": in_flight["text"][:40]},
        }

    async def _run(self) -> None:
        """Worker loop: take the next wish, grant it, then clear the in-flight slot."""
        while True:
            wish = await self._pending.get()
            self._current = wish
            await self._await_subscriber()
            await self._grant(wish)
            self._current = None

    async def _await_subscriber(self) -> None:
        """Wait up to 0.5 s for the hub to gain a subscriber.

        Test-only grace: starlette's TestClient buffers a whole response, so the
        SSE test must subscribe AFTER posting; hold the liturgy briefly so the
        subscriber sees every event. The real engine worker has no such wait.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + 0.5
        while self.hub.subscriber_count == 0 and loop.time() < deadline:
            await asyncio.sleep(0.005)

    async def _grant(self, wish: dict[str, Any]) -> None:
        """Run the planner for one wish and publish, persist and announce the result."""
        wish_id = wish["wish_id"]
        await self.hub.emit(
            {"type": "wish_started", "wish_id": wish_id, "text": wish["text"]})

        created: list[dict[str, Any]] = []
        calls_made = 0

        async def act(call: dict[str, Any]) -> str:
            # Turn a planner tool call into a feature, apply it, broadcast it.
            nonlocal calls_made
            seed_source = f"{wish_id}:{calls_made}".encode()
            feature = {
                "id": f"f_{self._feature_counter:06d}",
                "wish_id": wish_id,
                "tool": call["tool"],
                "args": call["args"],
                # Seed is derived from the wish id and the call's index within it.
                "seed": zlib.crc32(seed_source) & 0x7FFFFFFF,
                "t": time.time(),
            }
            self._feature_counter += 1
            calls_made += 1
            self.world.apply_feature(feature)
            created.append(feature)
            await self.hub.emit({"type": "world_delta", "feature": feature})
            return f"ok: {call['tool']}"

        async def emit(event: dict[str, Any]) -> None:
            # Tag planner events with the wish id; the event's own keys take precedence.
            await self.hub.emit({"wish_id": wish_id, **event})

        trace = await self.planner.grant(wish["text"], "a barren world", act, emit)
        trace["wish_id"] = wish_id
        trace["feature_ids"] = [feature["id"] for feature in created]
        await self.persistence.record(trace, created)

        self.world.epoch += 1
        await self.hub.emit({"type": "wish_granted", "wish_id": wish_id,
                             "epitaph": trace["epitaph"],
                             "epoch": self.world.epoch})
| # ---------------------------------------------------------------------------- fixtures | |
@pytest.fixture
def api(tmp_path: Path):
    """Build the app under test, wired to the fakes and a per-test trace directory.

    Registered as a pytest fixture so the endpoint tests can request it by the
    parameter name ``api``. Each test gets its own app, world, hub and limiter:

    - persistence writes under ``tmp_path / "traces"`` and boots from GENESIS;
    - the per-client limiter allows 3 wishes per 3600 s, the IP limiter 100;
    - the static web and panel mounts are disabled.
    """
    persistence = PersistenceService(tmp_path / "traces")
    features = persistence.boot(GENESIS)
    world = FakeWorld(features)
    hub = SSEHub(heartbeat=0.05)  # fast heartbeats pad bounded test streams quickly
    queue = FakeQueueWorker(world, hub, persistence, FakePlanner())
    return create_app(
        world=world, queue=queue, hub=hub, persistence=persistence,
        fast_check=fake_fast_check,
        rate_limiter=RateLimiter(limit=3, window=3600.0),
        ip_rate_limiter=RateLimiter(limit=100, window=3600.0),
        identity=ClientIdentity(secret="test-secret"),
        mount_web=False, mount_panel=False,
    )
| def _poll_state(client: TestClient, predicate, timeout: float = 5.0) -> dict[str, Any]: | |
| deadline = time.time() + timeout | |
| while time.time() < deadline: | |
| state = client.get("/api/state").json() | |
| if predicate(state): | |
| return state | |
| time.sleep(0.02) | |
| raise AssertionError("condition not reached before timeout") | |
| # ------------------------------------------------------------------------------- tests | |
def test_post_wish_returns_id_and_position(api) -> None:
    """A valid wish is accepted with a ``w_`` id and an integer position of at least 1."""
    with TestClient(api) as client:
        reply = client.post("/api/wish", json={"text": "a meadow of glowgrass"})
        assert reply.status_code == 200

        payload = reply.json()
        position = payload["position"]
        assert payload["wish_id"].startswith("w_")
        assert isinstance(position, int)
        assert position >= 1
def test_sse_stream_emits_wish_lifecycle(api) -> None:
    """Post one wish, then read a bounded SSE stream and check the whole lifecycle.

    TestClient buffers whole responses, so the stream is bounded with ?limit=N
    (heartbeats pad it to N) and parsed only after it closes. The fake worker
    briefly holds the wish until a subscriber appears, so every lifecycle event
    is captured.
    """
    prefix = "data: "
    with TestClient(api) as client:
        posted = client.post("/api/wish", json={"text": "raise a violet mountain"})
        assert posted.status_code == 200
        wish_id = posted.json()["wish_id"]

        stream = client.get("/api/stream", params={"limit": 16})
        assert stream.status_code == 200
        assert stream.headers["x-accel-buffering"] == "no"
        assert stream.headers["content-type"].startswith("text/event-stream")

        # Each SSE payload line is "data: <json>"; anything else is framing.
        events = []
        for line in stream.text.splitlines():
            if line.startswith(prefix):
                events.append(json.loads(line[len(prefix):]))
        assert len(events) == 16

        hello, queue_event = events[0], events[1]
        assert hello["type"] == "hello"
        assert hello["world_version"] == len(GENESIS)
        assert hello["epoch"] == 0
        assert queue_event["type"] == "queue"

        # Heartbeats only pad the stream; the lifecycle is everything else.
        lifecycle = [event for event in events[2:] if event["type"] != "heartbeat"]
        assert [event["type"] for event in lifecycle] == [
            "wish_started", "thought_token", "world_delta",
            "thought_token", "wish_granted"]

        started, granted = lifecycle[0], lifecycle[-1]
        assert started["wish_id"] == wish_id

        delta = next(event for event in lifecycle if event["type"] == "world_delta")
        assert delta["feature"]["tool"] == "spawn_flora"
        assert delta["feature"]["wish_id"] == wish_id

        assert granted["epitaph"]
        assert granted["epoch"] == 1
def test_state_reflects_new_feature_and_genesis_log(api) -> None:
    """A granted wish shows up in /api/state, /api/wishes and /api/wishes/{id}.

    The poll waits for the epoch to advance, not merely for the feature count to
    grow. The fake worker applies the feature first and only afterwards persists
    the trace and bumps the epoch, with awaits in between, so a snapshot taken
    right after the feature appears may still show epoch 0 and no recent wish.
    """
    with TestClient(api) as client:
        before = client.get("/api/state").json()
        baseline = len(before["world"]["features"])
        assert baseline == len(GENESIS)
        assert before["queue"] == {"length": 0, "current": None}

        wish_id = client.post(
            "/api/wish", json={"text": "let reeds line a silver pool"}
        ).json()["wish_id"]

        # The epoch bump is the worker's last state change before it announces
        # the grant, so by then the feature is applied and the trace recorded.
        state = _poll_state(
            client,
            lambda s: (s["world"]["epoch"] >= 1
                       and len(s["world"]["features"]) > baseline),
        )
        newest = state["world"]["features"][-1]
        assert newest["wish_id"] == wish_id
        assert newest["seed"] == zlib.crc32(f"{wish_id}:0".encode()) & 0x7FFFFFFF
        assert state["world"]["version"] == baseline + 1
        assert state["world"]["epoch"] == 1
        assert state["wishes_recent"][0]["wish_id"] == wish_id

        index = client.get("/api/wishes").json()
        assert [w["wish_id"] for w in index] == [wish_id]
        assert index[0]["epitaph"] == "and the meadow shone"

        trace = client.get(f"/api/wishes/{wish_id}").json()
        assert trace["wish_id"] == wish_id
        assert trace["turns"]
        assert trace["feature_ids"] == [newest["id"]]
        assert trace["features"][0] == newest  # embedded for boot rebuild

        missing = client.get("/api/wishes/w_999999")
        assert missing.status_code == 404
def test_rate_limit_429_on_fourth_wish(api) -> None:
    """Three wishes pass; the fourth gets 429, a Retry-After header and a text reason."""
    allowed_wishes = [f"small wish {n}" for n in range(3)]
    with TestClient(api) as client:
        for text in allowed_wishes:
            accepted = client.post("/api/wish", json={"text": text})
            assert accepted.status_code == 200, accepted.text

        refused = client.post("/api/wish", json={"text": "one wish too many"})
        assert refused.status_code == 429
        assert "Retry-After" in refused.headers

        reason = refused.json()["detail"]
        # poetic, not a stack trace: a non-empty plain string
        assert isinstance(reason, str)
        assert reason
def test_moderation_fast_path_rejects_with_poetic_reason(api) -> None:
    """A wish refused by the fast check yields 400 with its reason and adds no feature."""
    expected_reason = "The god declines this wish; it reeks of the forbidden."
    with TestClient(api) as client:
        refused = client.post("/api/wish", json={"text": "a forbidden sigil"})
        assert refused.status_code == 400
        assert refused.json()["detail"] == expected_reason

        # rejected wishes never reach the world
        world = client.get("/api/state").json()["world"]
        assert len(world["features"]) == len(GENESIS)
def test_blank_wish_is_unprocessable(api) -> None:
    """Whitespace-only text and a missing ``text`` field are both answered with 422."""
    invalid_bodies = ({"text": " "}, {})
    with TestClient(api) as client:
        for body in invalid_bodies:
            reply = client.post("/api/wish", json=body)
            assert reply.status_code == 422
| # ------------------------------------------------------------------- unit: components | |
def test_sse_hub_drops_oldest_never_blocks() -> None:
    """Publishing 6 events into a 3-slot subscriber queue keeps only the newest 3."""

    async def scenario() -> list[int]:
        hub = SSEHub(heartbeat=0.1, max_queue=3)
        subscriber = hub.subscribe()
        for number in range(6):
            # never raises, never blocks
            hub.publish({"type": "n", "i": number})
        hub.unsubscribe(subscriber)

        survivors: list[int] = []
        while not subscriber.empty():
            event = subscriber.get_nowait()
            survivors.append(event["i"])
        return survivors

    survivors = asyncio.run(scenario())
    assert survivors == [3, 4, 5]  # oldest were dropped
def test_rate_limiter_sliding_window() -> None:
    """Drive RateLimiter with a fake clock and check the window slides hit by hit."""
    current = 0.0

    def clock() -> float:
        # Reads the enclosing ``current``, which the test advances by hand.
        return current

    limiter = RateLimiter(limit=3, window=3600.0, clock=clock)
    key = "client"

    for moment in (0.0, 1000.0, 2000.0):
        current = moment
        allowed, _ = limiter.hit(key)
        assert allowed

    current = 2500.0
    allowed, retry_after = limiter.hit(key)
    assert not allowed
    assert retry_after == 3600.0 - 2500.0  # until the t=0 hit expires

    current = 3600.5  # only the t=0 hit has slid out of the window
    allowed, _ = limiter.hit(key)
    assert allowed
    assert limiter.remaining(key) == 0  # live: t=1000, t=2000, t=3600.5

    allowed, _ = limiter.hit(key)
    assert not allowed  # denied attempts were never recorded, window is exact
def test_identity_cookie_roundtrip_and_tamper() -> None:
    """A minted cookie verifies to its token; altered, malformed or absent ones do not."""
    identity = ClientIdentity(secret="s3cret")
    cookie = identity.mint()

    # The token is the part of the cookie before the first dot.
    token, _, _ = cookie.partition(".")
    assert identity.verify(cookie) == token

    # Swap the final character for a different one to break the cookie.
    replacement = "1" if cookie[-1] == "0" else "0"
    tampered = cookie[:-1] + replacement
    assert identity.verify(tampered) is None

    for bogus in ("garbage", None):
        assert identity.verify(bogus) is None
def test_persistence_rebuild_survives_corruption(tmp_path: Path) -> None:
    """A truncated trailing record is ignored when a new service boots from the log."""
    traces_dir = tmp_path / "traces"
    mist_feature = {
        "id": "f_000004", "wish_id": "w_000001", "tool": "set_weather",
        "args": {"kind": "mist", "intensity": 0.4}, "seed": 19, "t": 1.0,
    }
    mist_trace = {
        "wish_id": "w_000001", "text": "let mist fall", "submitted_at": 1.0,
        "epitaph": "a hush of grey",
    }

    writer = PersistenceService(traces_dir)
    writer.boot(GENESIS)
    asyncio.run(writer.record(mist_trace, [mist_feature]))

    # simulate a crash mid-append
    with writer.path.open("a", encoding="utf-8") as log:
        log.write('{"wish_id": "w_000002", "text": "truncat')

    reader = PersistenceService(traces_dir)
    rebuilt = reader.boot(GENESIS)

    assert rebuilt == GENESIS + [mist_feature]
    assert [trace["wish_id"] for trace in reader.traces()] == ["w_000001"]
    assert reader.find("w_000001")["epitaph"] == "a hush of grey"
    assert reader.find("w_000404") is None
def test_rebuild_skips_traces_without_embedded_features() -> None:
    """A trace that lists feature ids but embeds no features adds nothing on rebuild."""
    # no features array
    legacy_trace = {"wish_id": "w_000009", "feature_ids": ["f_0009"]}
    rebuilt = rebuild_features(GENESIS, [legacy_trace])
    assert rebuilt == GENESIS