# Source: nemocity / tests/test_api.py (uploaded by AndresCarreon)
# Commit: "NEMOCITY v0 — mock backend, gradio 6.16.0 (pre-SSR)" — d72231c, verified
# Size: 19.8 kB
# (File-viewer page chrome captured with the file: Raw · History · Blame ·
# Contribute · Delete)
"""HTTP/SSE layer tests (AGENT-SERVER). Engine and mind are faked — these tests
exercise only the transport: endpoints, SSE hub, persistence, rate limiting, the
fix gate, error paths. Wire names keep godseed's wish_* per the contract.
Run:
.venv/bin/python -m pytest tests/test_api.py -x -q
"""
from __future__ import annotations
import asyncio
import json
import sys
import time
import zlib
from pathlib import Path
from typing import Any, Optional
import pytest
from fastapi.testclient import TestClient
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from app import ( # noqa: E402
FIX_COOLDOWN_REASON,
FIX_RATE_REASON,
TRAFFIC_SMOOTH_REASON,
create_app,
)
from server.persistence import PersistenceService, rebuild_features # noqa: E402
from server.ratelimit import ClientIdentity, RateLimiter # noqa: E402
from server.sse import SSEHub # noqa: E402
# `t` stamp shared by every genesis feature below.
CITY_EPOCH_S = 1781222400.0

# Genesis fixture: a slice of the ARCHITECTURE.md epoch-0 shapes. Each event is
# byte-compatible with a godseed Feature (id, wish_id, tool, args, seed, t).
GENESIS = [
    {
        "id": "e_000000",
        "wish_id": "genesis",
        "tool": "lay_road",
        "args": {
            "cells": [[0, -12], [0, -11], [0, -10]],
            "klass": "avenue",
            "name": "Main St",
        },
        "seed": 7,
        "t": CITY_EPOCH_S,
    },
    {
        "id": "e_000001",
        "wish_id": "genesis",
        "tool": "lay_road",
        "args": {
            "cells": [[8, 0], [9, 0]],
            "klass": "street",
            "name": "Old Bridge",
        },
        "seed": 11,
        "t": CITY_EPOCH_S,
    },
    {
        "id": "e_000002",
        "wish_id": "genesis",
        "tool": "place_building",
        "args": {
            "kind": "town_hall",
            "name": "City Hall",
            "cx": -2,
            "cz": -3,
            "w": 2,
            "d": 2,
            "floors": 2,
            "hue": 35,
            "variant": 0,
        },
        "seed": 13,
        "t": CITY_EPOCH_S,
    },
    {
        "id": "e_000003",
        "wish_id": "genesis",
        "tool": "note",
        "args": {
            "text": "Nemocity is listening. Ask for a building.",
            "kind": "milestone",
        },
        "seed": 17,
        "t": CITY_EPOCH_S,
    },
]

# The plan the fake planner emits and echoes back in its trace.
PLAN = {
    "intent": "build",
    "blurb": "A little cafe rises near the park.",
    "buildings": [
        {"kind": "cafe", "name": "Cafe Luna", "near": "the park"},
    ],
    "decline_reason": None,
}
# ------------------------------------------------------------------------------- fakes
class TrafficSmooth(Exception):
    """Verdict of the engine's fix gate: there is no jam to fix.

    The server recognises this exception by its TYPE NAME rather than by
    importing the class, which is why a local stand-in with the same name
    is sufficient here.
    """
def fake_fast_check(text: str) -> Optional[str]:
    """Stand-in for the moderation fast path.

    Returns a human-readable refusal when the petition mentions "forbidden"
    (case-insensitively) or is longer than 160 characters, and ``None`` when
    the petition may proceed. The keyword check takes precedence.
    """
    verdict: Optional[str] = None
    if "forbidden" in text.lower():
        verdict = "City hall declines this petition; it reeks of the forbidden."
    elif len(text) > 160:
        verdict = "City hall reads only short petitions; trim yours to fewer words."
    return verdict
class FakePlanner:
    """Scripted planner: publishes the City Hall paperwork (the plan), issues a
    single ``place_building`` call, and hands back a WishTrace-shaped dict."""

    async def grant(self, wish: str, world_summary: str, act, emit) -> dict[str, Any]:
        """Grant ``wish`` with the canned plan.

        ``emit`` is awaited once with the plan event and ``act`` once with the
        building call, in that order; ``world_summary`` is ignored.
        """
        await emit({"type": "plan", "plan": PLAN})

        cafe_args = {"kind": "cafe", "name": "Cafe Luna", "cx": 3, "cz": -2,
                     "w": 1, "d": 1, "floors": 1, "hue": 35, "variant": 2}
        await act({"tool": "place_building", "args": cafe_args})

        trace: dict[str, Any] = {
            "text": wish,
            "submitted_at": time.time(),
            "moderation": {"allowed": True, "category": None},
            "plan": PLAN,
            "epitaph": PLAN["blurb"],
            "model": "fake",
            "backend": "fake",
            "ms_total": 1,
        }
        return trace
class FakeWorld:
    """In-memory world double: a feature list plus version and epoch counters."""

    def __init__(self, features: Optional[list[dict[str, Any]]] = None) -> None:
        # Copy the seed list so later appends never touch the caller's list.
        self.features: list[dict[str, Any]] = list(features) if features else []
        # The version starts at the number of seeded features.
        self.version = len(self.features)
        self.epoch = 0

    def apply_feature(self, feature: dict[str, Any]) -> None:
        """Append ``feature`` and advance the version by one."""
        self.features.append(feature)
        self.version = self.version + 1
class FakeQueueWorker:
    """Test double for engine.queue_worker.

    The background task pops one wish at a time, lets the planner grant it,
    persists the trace and announces each step on the hub. It also carries the
    City Engineer gate: ``submit_fix`` raises ``TrafficSmooth`` while
    ``traffic_smooth`` is set.
    """

    def __init__(self, world: FakeWorld, hub: SSEHub,
                 persistence: PersistenceService, planner: FakePlanner,
                 traffic_smooth: bool = False) -> None:
        self.world = world
        self.hub = hub
        self.persistence = persistence
        self.planner = planner
        self.traffic_smooth = traffic_smooth
        self._pending: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
        self._wish_counter = 0
        # Feature ids continue after the features the world already holds.
        self._feature_counter = len(world.features)
        self._current: Optional[dict[str, Any]] = None
        self._task: Optional[asyncio.Task[None]] = None

    async def start(self) -> None:
        """Launch the background loop on the running event loop."""
        self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        """Cancel the background loop, if started, and wait for it to finish."""
        task = self._task
        if task is None:
            return
        task.cancel()
        # return_exceptions=True keeps the CancelledError from propagating.
        await asyncio.gather(task, return_exceptions=True)

    async def submit(self, text: str, client_id: str) -> tuple[str, int]:
        """Queue a wish; return its id and the queue size after enqueueing."""
        self._wish_counter += 1
        ticket = {"wish_id": f"w_{self._wish_counter:06d}", "text": text}
        await self._pending.put(ticket)
        return ticket["wish_id"], self._pending.qsize()

    async def submit_fix(self, client_id: str) -> tuple[str, int]:
        """Accept a fix request unless traffic is smooth.

        Raises ``TrafficSmooth`` when the gate is closed; otherwise returns a
        fresh wish id with position 1. Nothing is queued for a fix.
        """
        if self.traffic_smooth:
            raise TrafficSmooth("static assignment max demandRatio < 0.8")
        self._wish_counter += 1
        return f"w_{self._wish_counter:06d}", 1

    def snapshot(self) -> dict[str, Any]:
        """Describe the queue: pending count (plus the in-flight wish) and a
        40-character preview of the wish being granted, if any."""
        in_flight = self._current
        if in_flight is None:
            return {"length": self._pending.qsize(), "current": None}
        summary = {"wish_id": in_flight["wish_id"],
                   "text_preview": in_flight["text"][:40]}
        return {"length": self._pending.qsize() + 1, "current": summary}

    async def _run(self) -> None:
        """Forever: take the next wish, grant it, clear the in-flight marker."""
        while True:
            wish = await self._pending.get()
            self._current = wish
            wish_id = wish["wish_id"]
            await self._hold_for_subscriber()
            await self._grant(wish_id, wish)
            self._current = None

    async def _hold_for_subscriber(self) -> None:
        """Wait up to half a second for the hub to gain a subscriber.

        Test-only grace: starlette's TestClient buffers a whole response, so
        the SSE test has to subscribe AFTER posting. Holding the grant briefly
        lets that subscriber see every event. The real engine worker has no
        such wait.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + 0.5
        while self.hub.subscriber_count == 0 and loop.time() < deadline:
            await asyncio.sleep(0.005)

    async def _grant(self, wish_id: str, wish: dict[str, Any]) -> None:
        """Run one wish through the planner, then persist and announce it."""
        await self.hub.emit(
            {"type": "wish_started", "wish_id": wish_id, "text": wish["text"]})
        built: list[dict[str, Any]] = []
        calls_made = 0

        async def act(call: dict[str, Any]) -> str:
            # One planner tool call becomes one world feature.
            nonlocal calls_made
            await self.hub.emit({
                "type": "tool_call",
                "wish_id": wish_id,
                "call_index": calls_made,
                "tool": call["tool"],
                "args": call["args"],
            })
            seed_source = f"{wish_id}:{calls_made}".encode()
            feature = {
                "id": f"e_{self._feature_counter:06d}",
                "wish_id": wish_id,
                "tool": call["tool"],
                "args": call["args"],
                # Seed is derived from the wish id and call index only.
                "seed": zlib.crc32(seed_source) & 0x7FFFFFFF,
                "t": time.time(),
            }
            self._feature_counter += 1
            calls_made += 1
            self.world.apply_feature(feature)
            built.append(feature)
            await self.hub.emit({"type": "world_delta", "feature": feature})
            return f"ok: {call['tool']}"

        async def emit(event: dict[str, Any]) -> None:
            # Tag planner events with the wish id; the event's own keys win.
            await self.hub.emit({"wish_id": wish_id, **event})

        trace = await self.planner.grant(wish["text"], "a small city", act, emit)
        trace["wish_id"] = wish_id
        trace["feature_ids"] = [feature["id"] for feature in built]
        await self.persistence.record(trace, built)
        self.world.epoch += 1
        await self.hub.emit({
            "type": "wish_granted",
            "wish_id": wish_id,
            "epitaph": trace["epitaph"],
            "epoch": self.world.epoch,
        })
# ---------------------------------------------------------------------------- fixtures
def build_api(tmp_path: Path, *, traffic_smooth: bool = False,
              fix_rate_limiter: Optional[RateLimiter] = None,
              fix_cooldown: Optional[RateLimiter] = None):
    """Assemble the app around fakes and return ``(app, queue_worker)``.

    Persistence lives under ``tmp_path / "traces"`` and is booted from GENESIS.
    Wishes are limited to 3 per hour per client and 100 per hour per IP. The
    two fix limiters are passed through untouched, so ``None`` leaves the
    choice to ``create_app``. The web and panel mounts are disabled.
    """
    store = PersistenceService(tmp_path / "traces")
    world = FakeWorld(store.boot(GENESIS))
    # fast heartbeats pad bounded test streams quickly
    hub = SSEHub(heartbeat=0.05)
    worker = FakeQueueWorker(
        world, hub, store, FakePlanner(), traffic_smooth=traffic_smooth)

    wish_limiter = RateLimiter(limit=3, window=3600.0)
    ip_limiter = RateLimiter(limit=100, window=3600.0)
    app = create_app(
        world=world,
        queue=worker,
        hub=hub,
        persistence=store,
        fast_check=fake_fast_check,
        rate_limiter=wish_limiter,
        ip_rate_limiter=ip_limiter,
        fix_rate_limiter=fix_rate_limiter,
        fix_cooldown=fix_cooldown,
        identity=ClientIdentity(secret="test-secret"),
        mount_web=False,
        mount_panel=False,
    )
    return app, worker
@pytest.fixture()
def api(tmp_path: Path):
    """App built by ``build_api`` with default settings; the worker handle is
    discarded."""
    return build_api(tmp_path)[0]
def _poll_state(client: TestClient, predicate, timeout: float = 5.0) -> dict[str, Any]:
    """Poll ``GET /api/state`` until ``predicate(state)`` is truthy.

    Args:
        client: test client used to fetch ``/api/state``.
        predicate: callable that receives the decoded state dict.
        timeout: seconds to keep polling before giving up.

    Returns:
        The first state for which ``predicate`` returned a truthy value.

    Raises:
        AssertionError: when the condition is still unmet once ``timeout``
            has elapsed.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while True:
        # Poll before looking at the deadline, so the state is checked at
        # least once (even with timeout <= 0) and once more after the final
        # sleep rather than giving up on stale information.
        state = client.get("/api/state").json()
        if predicate(state):
            return state
        if time.monotonic() >= deadline:
            raise AssertionError("condition not reached before timeout")
        time.sleep(0.02)
# ------------------------------------------------------------------------------- tests
def test_post_wish_returns_id_and_position(api) -> None:
    """POST /api/wish answers 200 with a ``w_``-prefixed id and a position >= 1."""
    with TestClient(api) as client:
        reply = client.post("/api/wish", json={"text": "a ramen shop near the park"})
        assert reply.status_code == 200
        payload = reply.json()
        assert payload["wish_id"].startswith("w_")
        position = payload["position"]
        assert isinstance(position, int)
        assert position >= 1
def test_state_includes_server_now(api) -> None:
    """``server_now`` in /api/state falls between the clock readings taken just
    before and just after the request."""
    with TestClient(api) as client:
        lower = time.time()
        payload = client.get("/api/state").json()
        upper = time.time()
        assert lower <= payload["server_now"] <= upper
def test_sse_stream_emits_petition_lifecycle(api) -> None:
    """A posted petition's whole lifecycle shows up on /api/stream.

    TestClient buffers whole responses, so the stream is bounded with ?limit=N
    (heartbeats pad it to N) and read once it closes. The fake worker holds the
    petition until the subscription lands, so every lifecycle event is captured.
    """
    with TestClient(api) as client:
        posted = client.post("/api/wish", json={"text": "a cafe by the bridge"})
        assert posted.status_code == 200
        wish_id = posted.json()["wish_id"]

        stream = client.get("/api/stream", params={"limit": 16})
        assert stream.status_code == 200
        assert stream.headers["x-accel-buffering"] == "no"
        assert stream.headers["content-type"].startswith("text/event-stream")

        # Keep only the SSE data lines and decode their JSON payloads.
        prefix = "data: "
        events = []
        for raw in stream.text.splitlines():
            if raw.startswith(prefix):
                events.append(json.loads(raw[len(prefix):]))
        assert len(events) == 16

        hello = events[0]
        assert hello["type"] == "hello"
        assert hello["world_version"] == len(GENESIS)
        assert hello["epoch"] == 0
        assert hello["server_now"] == pytest.approx(time.time(), abs=30)
        assert events[1]["type"] == "queue"

        assert all(isinstance(e["server_now"], float)
                   for e in events if e["type"] == "heartbeat")

        # Everything after hello/queue, heartbeats aside, is the lifecycle.
        lifecycle = [e for e in events[2:] if e["type"] != "heartbeat"]
        assert [e["type"] for e in lifecycle] == [
            "wish_started", "plan", "tool_call", "world_delta", "wish_granted"]
        assert lifecycle[0]["wish_id"] == wish_id

        # raw City Hall paperwork passes through intact
        assert lifecycle[1]["plan"] == PLAN

        delta = next(e for e in lifecycle if e["type"] == "world_delta")
        assert delta["feature"]["tool"] == "place_building"
        assert delta["feature"]["wish_id"] == wish_id

        granted = lifecycle[-1]
        assert granted["epitaph"]
        assert granted["epoch"] == 1
def test_state_reflects_new_feature_and_ledger(api) -> None:
    """A granted wish appears in /api/state and in the /api/wishes ledger.

    Checks the appended feature (owner, deterministic seed), the world version
    and epoch, the recent-wishes list, the ledger index, the full trace with
    its embedded features, and the 404 for an unknown wish id.
    """
    with TestClient(api) as client:
        before = client.get("/api/state").json()
        baseline = len(before["world"]["features"])
        assert baseline == len(GENESIS)
        assert before["queue"] == {"length": 0, "current": None}

        wish_id = client.post(
            "/api/wish", json={"text": "a row of townhouses on Elm St"}
        ).json()["wish_id"]

        # The fake worker applies the feature first and only afterwards awaits
        # persistence and bumps the epoch. Polling on the feature count alone
        # could return a state from between those steps and make the epoch and
        # ledger assertions below flaky, so wait for the epoch bump as well.
        state = _poll_state(
            client,
            lambda s: (len(s["world"]["features"]) > baseline
                       and s["world"]["epoch"] >= 1))

        newest = state["world"]["features"][-1]
        assert newest["wish_id"] == wish_id
        assert newest["seed"] == zlib.crc32(f"{wish_id}:0".encode()) & 0x7FFFFFFF
        assert state["world"]["version"] == baseline + 1
        assert state["world"]["epoch"] == 1
        assert state["wishes_recent"][0]["wish_id"] == wish_id

        index = client.get("/api/wishes").json()
        assert [w["wish_id"] for w in index] == [wish_id]
        assert index[0]["epitaph"] == PLAN["blurb"]

        trace = client.get(f"/api/wishes/{wish_id}").json()
        assert trace["wish_id"] == wish_id
        assert trace["plan"] == PLAN
        assert trace["feature_ids"] == [newest["id"]]
        assert trace["features"][0] == newest  # embedded for boot rebuild

        missing = client.get("/api/wishes/w_999999")
        assert missing.status_code == 404
def test_rate_limit_429_on_fourth_wish(api) -> None:
    """Three wishes pass; the fourth gets 429 with Retry-After and a readable
    detail string."""
    with TestClient(api) as client:
        for n in range(3):
            accepted = client.post("/api/wish", json={"text": f"small petition {n}"})
            assert accepted.status_code == 200, accepted.text

        rejected = client.post("/api/wish", json={"text": "one petition too many"})
        assert rejected.status_code == 429
        assert "Retry-After" in rejected.headers
        # readable, not a stack trace
        detail = rejected.json()["detail"]
        assert isinstance(detail, str)
        assert detail
def test_moderation_fast_path_rejects_with_readable_reason(api) -> None:
    """A petition the fast check refuses gets 400 with the check's own reason,
    and the world keeps only its genesis features."""
    expected_reason = "City hall declines this petition; it reeks of the forbidden."
    with TestClient(api) as client:
        refused = client.post("/api/wish", json={"text": "a forbidden storefront"})
        assert refused.status_code == 400
        assert refused.json()["detail"] == expected_reason

        # rejected petitions never reach the city
        world = client.get("/api/state").json()["world"]
        assert len(world["features"]) == len(GENESIS)
def test_blank_wish_is_unprocessable(api) -> None:
    """Whitespace-only text and a body without ``text`` are both answered 422."""
    with TestClient(api) as client:
        for body in ({"text": " "}, {}):
            assert client.post("/api/wish", json=body).status_code == 422
# ---------------------------------------------------------------------- /api/fix tests
def test_fix_409_when_traffic_smooth_and_cooldown_not_engaged(tmp_path: Path) -> None:
    """A fix on smooth traffic is a 409 that does not start the cooldown: once
    the gate opens, the very next fix is accepted."""
    app, worker = build_api(tmp_path, traffic_smooth=True)
    with TestClient(app) as client:
        refused = client.post("/api/fix")
        assert refused.status_code == 409
        assert refused.json()["detail"] == TRAFFIC_SMOOTH_REASON

        # a no-op click must not block the citywide engineer: jam forms, fix lands
        worker.traffic_smooth = False
        accepted = client.post("/api/fix")
        assert accepted.status_code == 200, accepted.text
        payload = accepted.json()
        assert payload["wish_id"].startswith("w_")
        assert payload["position"] >= 1
def test_fix_200_when_jammed_then_global_cooldown_429(tmp_path: Path) -> None:
    """With the app's default fix limiters, the first fix is accepted and an
    immediate second one gets 429 with the cooldown reason and Retry-After."""
    app, _worker = build_api(tmp_path)
    with TestClient(app) as client:
        accepted = client.post("/api/fix")
        assert accepted.status_code == 200, accepted.text

        cooled = client.post("/api/fix")
        assert cooled.status_code == 429
        assert cooled.json()["detail"] == FIX_COOLDOWN_REASON
        assert "Retry-After" in cooled.headers
def test_fix_client_rate_limit_two_per_hour(tmp_path: Path) -> None:
    """Two fixes from one client pass; the third gets 429 with the per-client
    rate reason and Retry-After."""
    # a permissive global cooldown isolates the per-client 2/hr window
    lenient_cooldown = RateLimiter(limit=100, window=120.0)
    app, _worker = build_api(tmp_path, fix_cooldown=lenient_cooldown)
    with TestClient(app) as client:
        for _attempt in range(2):
            assert client.post("/api/fix").status_code == 200

        rejected = client.post("/api/fix")
        assert rejected.status_code == 429
        assert rejected.json()["detail"] == FIX_RATE_REASON
        assert "Retry-After" in rejected.headers
# ------------------------------------------------------------------- unit: components
def test_sse_hub_drops_oldest_never_blocks() -> None:
    """Publishing six events to a subscriber capped at three keeps the newest
    three."""

    async def scenario() -> list[int]:
        hub = SSEHub(heartbeat=0.1, max_queue=3)
        inbox = hub.subscribe()
        for n in range(6):
            # never raises, never blocks
            hub.publish({"type": "n", "i": n})
        hub.unsubscribe(inbox)

        survivors: list[int] = []
        while not inbox.empty():
            survivors.append(inbox.get_nowait()["i"])
        return survivors

    # oldest were dropped
    assert asyncio.run(scenario()) == [3, 4, 5]
def test_rate_limiter_sliding_window() -> None:
    """Drive a 3-per-3600s limiter with a fake clock: hits expire one by one
    and denied attempts are not counted against the window."""
    clock_now = {"t": 0.0}

    def fake_clock() -> float:
        return clock_now["t"]

    limiter = RateLimiter(limit=3, window=3600.0, clock=fake_clock)
    key = "client"

    for moment in (0.0, 1000.0, 2000.0):
        clock_now["t"] = moment
        assert limiter.hit(key)[0]

    clock_now["t"] = 2500.0
    allowed, retry_after = limiter.hit(key)
    assert not allowed
    # until the t=0 hit expires
    assert retry_after == 3600.0 - 2500.0

    # only the t=0 hit has slid out of the window
    clock_now["t"] = 3600.5
    assert limiter.hit(key)[0]
    # live: t=1000, t=2000, t=3600.5
    assert limiter.remaining(key) == 0

    allowed, _ = limiter.hit(key)
    # denied attempts were never recorded, window is exact
    assert not allowed
def test_identity_cookie_roundtrip_and_tamper() -> None:
    """A minted cookie verifies to the part before its first dot; a cookie with
    its last character changed, a junk string, and ``None`` all verify to
    ``None``."""
    identity = ClientIdentity(secret="s3cret")
    cookie = identity.mint()
    token = cookie.partition(".")[0]
    assert identity.verify(cookie) == token

    # Swap the final character for a different one.
    replacement = "1" if cookie[-1] == "0" else "0"
    tampered = cookie[:-1] + replacement
    assert identity.verify(tampered) is None

    assert identity.verify("garbage") is None
    assert identity.verify(None) is None
def test_persistence_rebuild_survives_corruption(tmp_path: Path) -> None:
    """A truncated trailing record in the trace file is ignored on the next
    boot: the recorded feature is rebuilt and only the complete trace is
    listed."""
    traces_dir = tmp_path / "traces"
    writer = PersistenceService(traces_dir)
    writer.boot(GENESIS)

    cottage = {
        "id": "e_000004",
        "wish_id": "w_000001",
        "tool": "place_building",
        "args": {"kind": "house", "name": "Elm Cottage", "cx": -1, "cz": -6,
                 "w": 1, "d": 1, "floors": 1, "hue": 20, "variant": 1},
        "seed": 19,
        "t": CITY_EPOCH_S + 60.0,
    }
    trace = {"wish_id": "w_000001", "text": "a little house", "submitted_at": 1.0,
             "epitaph": "a home with a clay roof"}
    asyncio.run(writer.record(trace, [cottage]))

    # simulate a crash mid-append
    with writer.path.open("a", encoding="utf-8") as handle:
        handle.write('{"wish_id": "w_000002", "text": "truncat')

    reader = PersistenceService(traces_dir)
    rebuilt = reader.boot(GENESIS)
    assert rebuilt == GENESIS + [cottage]
    assert [t["wish_id"] for t in reader.traces()] == ["w_000001"]
    assert reader.find("w_000001")["epitaph"] == "a home with a clay roof"
    assert reader.find("w_000404") is None
def test_rebuild_skips_traces_without_embedded_features() -> None:
    """A trace that lists ``feature_ids`` but has no ``features`` array adds
    nothing to the rebuilt world."""
    # no features array
    legacy_trace = {"wish_id": "w_000009", "feature_ids": ["e_0009"]}
    rebuilt = rebuild_features(GENESIS, [legacy_trace])
    assert rebuilt == GENESIS