godseed / tests /test_api.py
AndresCarreon's picture
GODSEED v0
0594db3 verified
Raw
History Blame Contribute Delete
15.9 kB
"""HTTP/SSE layer tests (AGENT B). Engine and mind are faked — these tests exercise
only the transport: endpoints, SSE hub, persistence, rate limiting, error paths.
Run:
uv run --python 3.12 --with pytest --with fastapi --with httpx --with pydantic \
python -m pytest tests/test_api.py -x -q
"""
from __future__ import annotations
import asyncio
import json
import sys
import time
import zlib
from pathlib import Path
from typing import Any, Optional
import pytest
from fastapi.testclient import TestClient
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from app import create_app # noqa: E402
from server.persistence import PersistenceService, rebuild_features # noqa: E402
from server.ratelimit import ClientIdentity, RateLimiter # noqa: E402
from server.sse import SSEHub # noqa: E402
# Genesis fixture — mirrors ARCHITECTURE.md epoch-0 state.
# Four seed features, every one tagged wish_id "genesis" with t == 0. Each entry
# carries the same keys: id, wish_id, tool, args, seed, t. The tests below use
# len(GENESIS) as the baseline feature count of a freshly booted world.
GENESIS = [
# f_000000: sky
{"id": "f_000000", "wish_id": "genesis", "tool": "set_sky",
"args": {"palette": "void", "star_density": 0.9, "moons": 1}, "seed": 7, "t": 0},
# f_000001: terrain
{"id": "f_000001", "wish_id": "genesis", "tool": "raise_terrain",
"args": {"lat": 12, "lon": 40, "radius_deg": 28, "height": 0.05, "roughness": 0.7},
"seed": 11, "t": 0},
# f_000002: a monolith structure
{"id": "f_000002", "wish_id": "genesis", "tool": "place_structure",
"args": {"lat": 14, "lon": 38, "kind": "monolith", "scale": 1.4, "hue": 270},
"seed": 13, "t": 0},
# f_000003: inscribed text
{"id": "f_000003", "wish_id": "genesis", "tool": "inscribe_wish",
"args": {"text": "speak, and it will be made", "style": "orbit"}, "seed": 17, "t": 0},
]
# ------------------------------------------------------------------------------- fakes
def fake_fast_check(text: str) -> Optional[str]:
    """Fake moderation fast path (engine contract: poetic reason, or None).

    Returns a refusal string when *text* contains "forbidden" (case-insensitive)
    or is longer than 140 characters; returns ``None`` when the wish is allowed.
    The "forbidden" check takes precedence over the length check.
    """
    lowered = text.lower()
    too_long = len(text) > 140
    if "forbidden" in lowered:
        return "The god declines this wish; it reeks of the forbidden."
    return (
        "The god hears only short wishes; carve yours to fewer words."
        if too_long
        else None
    )
class FakePlanner:
    """Planner double: emits a thought, performs one spawn_flora call, emits a
    second thought, then returns a WishTrace-shaped dict."""

    async def grant(self, wish: str, world_summary: str, act, emit) -> dict[str, Any]:
        """Grant *wish* with a fixed script.

        ``emit`` is awaited with each thought event and ``act`` is awaited with
        the single tool call; whatever ``act`` returns is stored as the turn's
        observation. ``world_summary`` is accepted but not used.
        """
        flora_call = {
            "tool": "spawn_flora",
            "args": {"lat": 10.0, "lon": 20.0, "radius_deg": 5,
                     "kind": "glowgrass", "density": 0.7, "hue": 160},
        }

        await emit({"type": "thought_token", "text": "the god considers… "})
        seen = await act(flora_call)
        await emit({"type": "thought_token", "text": "it is done."})

        only_turn = {"thought": "grass, then light", "call": flora_call,
                     "observation": seen}
        trace: dict[str, Any] = {
            "text": wish,
            "submitted_at": time.time(),
            "moderation": {"allowed": True, "category": None},
            "reading": "a small green wanting",
            "turns": [only_turn],
            "epitaph": "and the meadow shone",
            "model": "fake",
            "backend": "fake",
            "ms_total": 1,
        }
        return trace
class FakeWorld:
    """World double: a list of features plus ``version`` and ``epoch`` counters."""

    def __init__(self, features: Optional[list[dict[str, Any]]] = None) -> None:
        # Work on a copy so apply_feature never mutates the caller's list.
        initial: list[dict[str, Any]] = list(features) if features else []
        self.features: list[dict[str, Any]] = initial
        self.version = len(initial)
        self.epoch = 0

    def apply_feature(self, feature: dict[str, Any]) -> None:
        """Append *feature* to the world and advance ``version`` by one."""
        self.version = self.version + 1
        self.features.append(feature)
class FakeQueueWorker:
    """Minimal stand-in for engine.queue_worker: pop -> grant -> persist -> emit.

    Wishes are queued by :meth:`submit` and processed one at a time by a
    background task created in :meth:`start`. The wish currently being granted
    is exposed through :meth:`snapshot` as ``current``.
    """

    # Test-only grace period (seconds) and poll step used while waiting for a
    # first SSE subscriber before a wish's events are emitted.
    _SUBSCRIBER_GRACE = 0.5
    _SUBSCRIBER_POLL = 0.005

    def __init__(self, world: FakeWorld, hub: SSEHub,
                 persistence: PersistenceService, planner: FakePlanner) -> None:
        self.world, self.hub, self.persistence, self.planner = (
            world, hub, persistence, planner)
        self._pending: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
        self._wish_counter = 0
        # Feature ids continue after whatever the world already holds.
        self._feature_counter = len(world.features)
        self._current: Optional[dict[str, Any]] = None
        self._task: Optional[asyncio.Task[None]] = None

    async def start(self) -> None:
        """Spawn the background task that drains the pending queue."""
        self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        """Cancel the background task (if any) and wait for it to finish."""
        if self._task is not None:
            self._task.cancel()
            # return_exceptions=True absorbs the CancelledError (or any error
            # the task already died with) instead of re-raising it here.
            await asyncio.gather(self._task, return_exceptions=True)

    async def submit(self, text: str, client_id: str) -> tuple[str, int]:
        """Queue a wish; return ``(wish_id, pending queue size after the put)``.

        ``client_id`` is accepted but not used by this fake.
        """
        self._wish_counter += 1
        wish_id = f"w_{self._wish_counter:06d}"
        await self._pending.put({"wish_id": wish_id, "text": text})
        return wish_id, self._pending.qsize()

    def snapshot(self) -> dict[str, Any]:
        """Return ``{"length", "current"}``; length counts the in-flight wish too."""
        current = None
        if self._current is not None:
            current = {"wish_id": self._current["wish_id"],
                       "text_preview": self._current["text"][:40]}
        return {"length": self._pending.qsize() + (1 if current else 0),
                "current": current}

    async def _run(self) -> None:
        """Process queued wishes forever, one at a time."""
        while True:
            wish = await self._pending.get()
            self._current = wish
            try:
                await self._grant_one(wish)
            finally:
                # Clear the in-flight marker even when the grant raises or the
                # task is cancelled, so snapshot() never reports a phantom wish.
                self._current = None

    async def _wait_for_subscriber(self) -> None:
        """Hold briefly until the hub has a subscriber or the grace expires."""
        # Test-only grace: starlette's TestClient buffers a whole response, so the
        # SSE test must subscribe AFTER posting; hold the liturgy briefly so the
        # subscriber sees every event. The real engine worker has no such wait.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._SUBSCRIBER_GRACE
        while self.hub.subscriber_count == 0 and loop.time() < deadline:
            await asyncio.sleep(self._SUBSCRIBER_POLL)

    async def _grant_one(self, wish: dict[str, Any]) -> None:
        """Run one wish: announce, plan/act, persist the trace, announce grant."""
        wish_id = wish["wish_id"]
        await self._wait_for_subscriber()
        await self.hub.emit(
            {"type": "wish_started", "wish_id": wish_id, "text": wish["text"]})
        features: list[dict[str, Any]] = []
        call_index = 0

        async def act(call: dict[str, Any]) -> str:
            """Turn a planner tool call into a feature, apply and broadcast it."""
            nonlocal call_index
            feature = {
                "id": f"f_{self._feature_counter:06d}",
                "wish_id": wish_id,
                "tool": call["tool"],
                "args": call["args"],
                # Seed is derived from the wish id and the call's position
                # within the wish; masked to a non-negative 31-bit value.
                "seed": zlib.crc32(f"{wish_id}:{call_index}".encode()) & 0x7FFFFFFF,
                "t": time.time(),
            }
            self._feature_counter += 1
            call_index += 1
            self.world.apply_feature(feature)
            features.append(feature)
            await self.hub.emit({"type": "world_delta", "feature": feature})
            return f"ok: {call['tool']}"

        async def emit(event: dict[str, Any]) -> None:
            """Forward a planner event to the hub, tagged with the wish id."""
            # Keys in `event` win over the default wish_id if both are present.
            await self.hub.emit({"wish_id": wish_id, **event})

        trace = await self.planner.grant(wish["text"], "a barren world", act, emit)
        trace["wish_id"] = wish_id
        trace["feature_ids"] = [f["id"] for f in features]
        # Persist before bumping the epoch, so an observer that sees the new
        # epoch can rely on the trace having been handed to persistence.
        await self.persistence.record(trace, features)
        self.world.epoch += 1
        await self.hub.emit({"type": "wish_granted", "wish_id": wish_id,
                             "epitaph": trace["epitaph"],
                             "epoch": self.world.epoch})
# ---------------------------------------------------------------------------- fixtures
@pytest.fixture()
def api(tmp_path: Path):
    """Build the app under test via ``create_app``, wired to this module's fakes.

    Persistence writes under the per-test ``tmp_path``; the world starts from
    whatever ``PersistenceService.boot(GENESIS)`` returns. The per-client rate
    limiter allows 3 hits per hour and the per-IP limiter 100 per hour.
    """
    store = PersistenceService(tmp_path / "traces")
    planet = FakeWorld(store.boot(GENESIS))
    # fast heartbeats pad bounded test streams quickly
    sse_hub = SSEHub(heartbeat=0.05)
    worker = FakeQueueWorker(planet, sse_hub, store, FakePlanner())
    wiring = dict(
        world=planet,
        queue=worker,
        hub=sse_hub,
        persistence=store,
        fast_check=fake_fast_check,
        rate_limiter=RateLimiter(limit=3, window=3600.0),
        ip_rate_limiter=RateLimiter(limit=100, window=3600.0),
        identity=ClientIdentity(secret="test-secret"),
        mount_web=False,
        mount_panel=False,
    )
    return create_app(**wiring)
def _poll_state(client: TestClient, predicate, timeout: float = 5.0) -> dict[str, Any]:
deadline = time.time() + timeout
while time.time() < deadline:
state = client.get("/api/state").json()
if predicate(state):
return state
time.sleep(0.02)
raise AssertionError("condition not reached before timeout")
# ------------------------------------------------------------------------------- tests
def test_post_wish_returns_id_and_position(api) -> None:
    """POST /api/wish answers 200 with a ``w_`` id and an integer position >= 1."""
    payload = {"text": "a meadow of glowgrass"}
    with TestClient(api) as client:
        reply = client.post("/api/wish", json=payload)
        assert reply.status_code == 200
        data = reply.json()
        position = data["position"]
        assert data["wish_id"].startswith("w_")
        assert isinstance(position, int) and position >= 1
def test_sse_stream_emits_wish_lifecycle(api) -> None:
    """TestClient buffers whole responses, so the stream is bounded with ?limit=N
    (heartbeats pad it to N) and read after it closes; the fake worker holds the
    wish until the subscription lands, so every lifecycle event is captured."""
    prefix = "data: "
    expected_lifecycle = ["wish_started", "thought_token", "world_delta",
                          "thought_token", "wish_granted"]
    with TestClient(api) as client:
        posted = client.post("/api/wish", json={"text": "raise a violet mountain"})
        assert posted.status_code == 200
        wish_id = posted.json()["wish_id"]

        stream = client.get("/api/stream", params={"limit": 16})
        assert stream.status_code == 200
        assert stream.headers["x-accel-buffering"] == "no"
        assert stream.headers["content-type"].startswith("text/event-stream")

        # Decode every SSE data line into its JSON payload.
        frames = []
        for line in stream.text.splitlines():
            if line.startswith(prefix):
                frames.append(json.loads(line[len(prefix):]))
        assert len(frames) == 16

        # The stream opens with a hello frame, then a queue frame.
        hello, queue_frame = frames[0], frames[1]
        assert hello["type"] == "hello"
        assert hello["world_version"] == len(GENESIS)
        assert hello["epoch"] == 0
        assert queue_frame["type"] == "queue"

        # Everything after that, heartbeats aside, is the wish lifecycle.
        lifecycle = [f for f in frames[2:] if f["type"] != "heartbeat"]
        kinds = [f["type"] for f in lifecycle]
        assert kinds == expected_lifecycle

        assert lifecycle[0]["wish_id"] == wish_id

        delta = lifecycle[kinds.index("world_delta")]
        assert delta["feature"]["tool"] == "spawn_flora"
        assert delta["feature"]["wish_id"] == wish_id

        granted = lifecycle[-1]
        assert granted["epitaph"]
        assert granted["epoch"] == 1
def test_state_reflects_new_feature_and_genesis_log(api) -> None:
    """A granted wish shows up in /api/state, /api/wishes and /api/wishes/{id}.

    Starts from the genesis-only world, posts one wish, waits for the grant to
    complete, then checks the new feature, the world counters, the recent-wish
    list, the wish index, the stored trace, and the 404 for an unknown wish id.
    """
    with TestClient(api) as client:
        before = client.get("/api/state").json()
        baseline = len(before["world"]["features"])
        assert baseline == len(GENESIS)
        assert before["queue"] == {"length": 0, "current": None}
        wish_id = client.post(
            "/api/wish", json={"text": "let reeds line a silver pool"}
        ).json()["wish_id"]
        # The fake worker applies the feature mid-grant, then records the trace,
        # and only afterwards bumps the epoch. Waiting on the feature count alone
        # could return a snapshot taken between those steps and make the epoch /
        # wishes_recent / trace assertions below race the worker, so wait for
        # the epoch to advance as well.
        state = _poll_state(
            client,
            lambda s: (len(s["world"]["features"]) > baseline
                       and s["world"]["epoch"] >= 1))
        newest = state["world"]["features"][-1]
        assert newest["wish_id"] == wish_id
        # Seed is the masked CRC32 of "<wish_id>:<call index>"; this is call 0.
        assert newest["seed"] == zlib.crc32(f"{wish_id}:0".encode()) & 0x7FFFFFFF
        assert state["world"]["version"] == baseline + 1
        assert state["world"]["epoch"] == 1
        assert state["wishes_recent"][0]["wish_id"] == wish_id
        index = client.get("/api/wishes").json()
        assert [w["wish_id"] for w in index] == [wish_id]
        assert index[0]["epitaph"] == "and the meadow shone"
        trace = client.get(f"/api/wishes/{wish_id}").json()
        assert trace["wish_id"] == wish_id
        assert trace["turns"] and trace["feature_ids"] == [newest["id"]]
        assert trace["features"][0] == newest  # embedded for boot rebuild
        missing = client.get("/api/wishes/w_999999")
        assert missing.status_code == 404
def test_rate_limit_429_on_fourth_wish(api) -> None:
    """Three wishes are accepted; the fourth gets 429 with Retry-After."""
    allowed_texts = [f"small wish {n}" for n in range(3)]
    with TestClient(api) as client:
        for text in allowed_texts:
            accepted = client.post("/api/wish", json={"text": text})
            assert accepted.status_code == 200, accepted.text
        refused = client.post("/api/wish", json={"text": "one wish too many"})
        assert refused.status_code == 429
        assert "Retry-After" in refused.headers
        reason = refused.json()["detail"]
        # poetic, not a stack trace
        assert isinstance(reason, str) and reason
def test_moderation_fast_path_rejects_with_poetic_reason(api) -> None:
    """A wish refused by the fast check yields 400 and leaves the world as-is."""
    expected_reason = "The god declines this wish; it reeks of the forbidden."
    with TestClient(api) as client:
        rejected = client.post("/api/wish", json={"text": "a forbidden sigil"})
        assert rejected.status_code == 400
        assert rejected.json()["detail"] == expected_reason
        # rejected wishes never reach the world
        world = client.get("/api/state").json()["world"]
        assert len(world["features"]) == len(GENESIS)
def test_blank_wish_is_unprocessable(api) -> None:
    """A whitespace-only text and a missing text field are both answered 422."""
    invalid_bodies = ({"text": " "}, {})
    with TestClient(api) as client:
        for body in invalid_bodies:
            reply = client.post("/api/wish", json=body)
            assert reply.status_code == 422
# ------------------------------------------------------------------- unit: components
def test_sse_hub_drops_oldest_never_blocks() -> None:
    """Publishing 6 events to a subscriber capped at 3 keeps only the last 3."""

    async def scenario() -> list[int]:
        hub = SSEHub(heartbeat=0.1, max_queue=3)
        inbox = hub.subscribe()
        for n in range(6):
            # never raises, never blocks
            hub.publish({"type": "n", "i": n})
        hub.unsubscribe(inbox)
        survivors: list[int] = []
        while not inbox.empty():
            event = inbox.get_nowait()
            survivors.append(event["i"])
        return survivors

    kept = asyncio.run(scenario())
    # oldest were dropped
    assert kept == [3, 4, 5]
def test_rate_limiter_sliding_window() -> None:
    """Drive RateLimiter with a fake clock through fill, deny, slide and refill."""
    clock = {"t": 0.0}
    limiter = RateLimiter(limit=3, window=3600.0, clock=lambda: clock["t"])
    key = "client"

    # Fill the window with three hits.
    for moment in (0.0, 1000.0, 2000.0):
        clock["t"] = moment
        granted, _ = limiter.hit(key)
        assert granted

    # A fourth hit inside the window is denied.
    clock["t"] = 2500.0
    allowed, retry_after = limiter.hit(key)
    assert not allowed
    # until the t=0 hit expires
    assert retry_after == 3600.0 - 2500.0

    # only the t=0 hit has slid out of the window
    clock["t"] = 3600.5
    granted, _ = limiter.hit(key)
    assert granted
    # live: t=1000, t=2000, t=3600.5
    assert limiter.remaining(key) == 0

    # denied attempts were never recorded, window is exact
    allowed, _ = limiter.hit(key)
    assert not allowed
def test_identity_cookie_roundtrip_and_tamper() -> None:
    """verify() returns the token of a minted cookie and None for anything else."""
    identity = ClientIdentity(secret="s3cret")
    cookie = identity.mint()
    # The token is everything before the first dot of the cookie.
    token, _, _ = cookie.partition(".")
    assert identity.verify(cookie) == token

    # Flip the final character to a different one to break the cookie.
    replacement = "1" if cookie[-1] == "0" else "0"
    tampered = cookie[:-1] + replacement
    assert identity.verify(tampered) is None

    for bogus in ("garbage", None):
        assert identity.verify(bogus) is None
def test_persistence_rebuild_survives_corruption(tmp_path: Path) -> None:
    """A second service boots cleanly past a truncated trailing record."""
    traces_dir = tmp_path / "traces"
    mist = {"id": "f_000004", "wish_id": "w_000001", "tool": "set_weather",
            "args": {"kind": "mist", "intensity": 0.4}, "seed": 19, "t": 1.0}
    mist_trace = {"wish_id": "w_000001", "text": "let mist fall",
                  "submitted_at": 1.0, "epitaph": "a hush of grey"}

    writer = PersistenceService(traces_dir)
    writer.boot(GENESIS)
    asyncio.run(writer.record(mist_trace, [mist]))

    # simulate a crash mid-append
    with writer.path.open("a", encoding="utf-8") as log:
        log.write('{"wish_id": "w_000002", "text": "truncat')

    reader = PersistenceService(traces_dir)
    rebuilt = reader.boot(GENESIS)
    assert rebuilt == GENESIS + [mist]
    recorded_ids = [entry["wish_id"] for entry in reader.traces()]
    assert recorded_ids == ["w_000001"]
    assert reader.find("w_000001")["epitaph"] == "a hush of grey"
    assert reader.find("w_000404") is None
def test_rebuild_skips_traces_without_embedded_features() -> None:
    """A trace with feature_ids but no features array adds nothing on rebuild."""
    # no features array
    legacy_trace = {"wish_id": "w_000009", "feature_ids": ["f_0009"]}
    rebuilt = rebuild_features(GENESIS, [legacy_trace])
    assert rebuilt == GENESIS