Spaces:
Running
Running
| """Tests for the FastAPI app + every route.""" | |
| from __future__ import annotations | |
| import json | |
| from typing import Any | |
| import httpx | |
| import pytest | |
| from fastapi.testclient import TestClient | |
def _build_app():
    """Build the application under test by calling ``create_app()``.

    The factory is imported at call time instead of at module import time.
    NOTE(review): presumably so fixtures such as ``isolated_db`` can set up
    their environment before the API package is imported — confirm.
    """
    from polyglot_alpha.api.main import create_app

    app = create_app()
    return app
def _force_judges_pass(monkeypatch: pytest.MonkeyPatch) -> None:
    """Force the judge panel to PASS so demo lifecycles complete to SUBMITTED.

    Swaps ``orchestrator._evaluate_with_judges`` for a coroutine that ignores
    its argument and always returns the same passing ``JudgePanelResult``.
    The patch is applied through ``monkeypatch``.

    NOTE(review): no test in the visible part of this file calls this
    helper — confirm whether it is used elsewhere (e.g. from a fixture).
    """
    from polyglot_alpha import orchestrator

    async def passing_judges(_q: dict[str, Any]) -> orchestrator.JudgePanelResult:
        # Build fresh containers on every call so callers never share state.
        scores = {"bleu": 0.9, "comet": 0.85, "mqm": {"score": 0}}
        # Style dimensions d1..d8 all marked as passing.
        style_passes = dict.fromkeys((f"d{i}" for i in range(1, 9)), True)
        return orchestrator.JudgePanelResult(
            translation_scores=scores,
            style_alignment_passes=style_passes,
            overall_score=0.92,
            verdict="PASS",
        )

    monkeypatch.setattr(orchestrator, "_evaluate_with_judges", passing_judges)
def test_health_and_root(isolated_db: str) -> None:
    """``GET /health`` reports ok and ``GET /`` exposes the service name."""
    with TestClient(_build_app()) as client:
        health = client.get("/health")
        assert health.status_code == 200
        assert health.json() == {"status": "ok"}

        root = client.get("/")
        assert root.status_code == 200
        root_body = root.json()
        assert root_body["name"] == "polyglot-alpha"
def test_trigger_runs_full_lifecycle_and_returns_summary(isolated_db: str) -> None:
    """Trigger one event with two mock bids and check the returned summary.

    The response must report status ``SUBMITTED``, name ``0xwinner`` as the
    winner, and be flagged as simulated.
    """
    # Thesis: lowest qualified bid wins. ``0xwinner`` bids the lower
    # amount, so it is the auction winner.
    mock_bids = [
        {"agent_address": "0xwinner", "bid_amount": 2.0},
        {"agent_address": "0xrunner", "bid_amount": 5.0},
    ]
    payload: dict[str, Any] = {
        "title": "Trigger test event",
        "sources": [{"name": "test", "url": "https://example.com/x"}],
        "auction_window_seconds": 0.0,
        "mock_bids": mock_bids,
    }

    with TestClient(_build_app()) as client:
        response = client.post("/trigger/event", json=payload)

        # Include the raw body in the failure message to ease debugging.
        assert response.status_code == 200, response.text
        summary = response.json()
        assert summary["status"] == "SUBMITTED"
        assert summary["winner_address"] == "0xwinner"
        assert summary["is_simulated"] is True
def test_events_routes_list_get_and_bids(isolated_db: str) -> None:
    """Exercise ``/events``, ``/events/{id}`` and ``/events/{id}/bids``.

    One event with two mock bids is seeded through ``POST /trigger/event``;
    the test then checks the list view, the detail view, the bids view and
    the 404 response for an unknown event id.

    NOTE(review): the ``len(events) == 1`` check assumes ``isolated_db``
    provides an empty database for each test — confirm against the fixture.
    """
    app = _build_app()
    with TestClient(app) as client:
        seed = client.post(
            "/trigger/event",
            json={
                "title": "Events route test",
                "sources": [],
                "auction_window_seconds": 0.0,
                "mock_bids": [
                    {"agent_address": "0xA", "bid_amount": 1.0},
                    {"agent_address": "0xB", "bid_amount": 3.0},
                ],
            },
        )
        # Fail fast: without this a broken trigger would only show up later
        # as a confusing "expected 1 event" or lookup failure.
        assert seed.status_code == 200, seed.text

        r = client.get("/events")
        assert r.status_code == 200
        events = r.json()
        assert isinstance(events, list)
        assert len(events) == 1
        event_id = events[0]["id"]

        r = client.get(f"/events/{event_id}")
        assert r.status_code == 200
        # Compare as strings so the check is agnostic to int vs. str ids.
        assert str(r.json()["id"]) == str(event_id)

        r = client.get(f"/events/{event_id}/bids")
        assert r.status_code == 200
        bids = r.json()["items"]
        assert len(bids) == 2
        assert {b["agent_address"] for b in bids} == {"0xA", "0xB"}

        # An id that was never created must be reported as missing.
        r = client.get("/events/99999")
        assert r.status_code == 404
def test_agents_routes(isolated_db: str) -> None:
    """Exercise ``/agents/{address}`` and ``/agents/{address}/history``.

    One event is seeded through ``POST /trigger/event`` with two mock bids;
    the test then checks the winner's profile, the winner's history and the
    404 response for an address that never bid.
    """
    app = _build_app()
    with TestClient(app) as client:
        seed = client.post(
            "/trigger/event",
            json={
                "title": "Agents route test",
                "sources": [],
                "auction_window_seconds": 0.0,
                "mock_bids": [
                    # Lowest qualified bid wins.
                    {"agent_address": "0xWINNER", "bid_amount": 1.0},
                    {"agent_address": "0xLOSER", "bid_amount": 4.0},
                ],
            },
        )
        # Fail fast: a failed seed would otherwise surface as an unrelated
        # 404 or win-count mismatch on the agent routes below.
        assert seed.status_code == 200, seed.text

        r = client.get("/agents/0xWINNER")
        assert r.status_code == 200
        body = r.json()
        assert body["agent_address"] == "0xWINNER"
        assert body["total_wins"] == 1

        r = client.get("/agents/0xWINNER/history")
        assert r.status_code == 200
        history = r.json()
        assert history["wins"], "winner should have at least one win"
        assert history["bids"], "winner should have at least one bid"

        r = client.get("/agents/0xUNKNOWN")
        assert r.status_code == 404
def test_leaderboard_route(isolated_db: str) -> None:
    """Exercise ``/leaderboard`` with two different ``sort_by`` values.

    Two events are seeded through ``POST /trigger/event``. ``0xTOP`` bids in
    both and is expected to rank first when sorting by cumulative fees. The
    second query (``sort_by=total_wins&limit=5``) is only checked for a 200.
    """
    seed_payloads = [
        {
            "title": "Leaderboard run A",
            "sources": [],
            "auction_window_seconds": 0.0,
            "mock_bids": [
                # Lowest qualified bid wins -> 0xTOP (bid=1.0).
                {"agent_address": "0xTOP", "bid_amount": 1.0},
                {"agent_address": "0xMID", "bid_amount": 9.0},
            ],
        },
        {
            "title": "Leaderboard run B",
            "sources": [],
            "auction_window_seconds": 0.0,
            "mock_bids": [
                {"agent_address": "0xTOP", "bid_amount": 7.0},
            ],
        },
    ]

    app = _build_app()
    with TestClient(app) as client:
        for seed_payload in seed_payloads:
            seed = client.post("/trigger/event", json=seed_payload)
            # Fail fast: a failed seed would otherwise surface as a
            # misleading ranking mismatch below.
            assert seed.status_code == 200, seed.text

        r = client.get("/leaderboard?sort_by=cumulative_fees")
        assert r.status_code == 200
        items = r.json()
        assert isinstance(items, list)
        # Guard the index below so an empty board fails with a clear message
        # instead of an IndexError.
        assert items, "leaderboard should not be empty after seeding"
        top = items[0]
        # Both address keys are asserted on the same entry.
        assert top["agent_address"] == "0xTOP"
        assert top["address"] == "0xTOP"
        assert top["rank"] == 1

        r = client.get("/leaderboard?sort_by=total_wins&limit=5")
        assert r.status_code == 200
def test_trigger_validation_errors(isolated_db: str) -> None:
    """An empty trigger payload is rejected with HTTP 422."""
    empty_payload: dict[str, Any] = {}
    with TestClient(_build_app()) as client:
        # Missing title -> 422
        response = client.post("/trigger/event", json=empty_payload)
        assert response.status_code == 422
def test_trigger_event_source_rss_no_title_returns_200(
    isolated_db: str, monkeypatch: pytest.MonkeyPatch
) -> None:
    """``event_source='rss'`` with no title must succeed even when the live
    RSS pipeline is unavailable — it falls back to the hardcoded sample.
    """
    from polyglot_alpha.api.routes import trigger as trigger_route

    async def fake_rss_fetch(_window: int) -> None:
        # Simulate an RSS pipeline that yields nothing.
        return None

    # Make the RSS fetch helper return None so the request exercises the
    # ``hardcoded`` -> in-process fallback chain.
    monkeypatch.setattr(trigger_route, "_fetch_rss_demo_event", fake_rss_fetch)

    request_body = {
        "event_source": "rss",
        "auction_window_seconds": 0.0,
        "mock_bids": [{"agent_address": "0xrss", "bid_amount": 1.0}],
    }

    with TestClient(_build_app()) as client:
        response = client.post("/trigger/event", json=request_body)

        assert response.status_code == 200, response.text
        summary = response.json()
        assert summary["status"] == "SUBMITTED"
        # The summary must carry an integer ``event_id`` for the event that
        # was created from the fallback source.
        assert isinstance(summary.get("event_id"), int)
def test_trigger_event_source_hardcoded_no_title_returns_200(
    isolated_db: str,
) -> None:
    """``event_source='hardcoded'`` loads ``outputs/sample_0.json``.

    No title is supplied; the request is expected to return 200 with status
    ``SUBMITTED``.
    """
    request_body = {
        "event_source": "hardcoded",
        "auction_window_seconds": 0.0,
        "mock_bids": [{"agent_address": "0xhardcoded", "bid_amount": 1.0}],
    }

    with TestClient(_build_app()) as client:
        response = client.post("/trigger/event", json=request_body)

        assert response.status_code == 200, response.text
        summary = response.json()
        assert summary["status"] == "SUBMITTED"
async def test_sse_iterator_emits_lifecycle_events(isolated_db: str) -> None:
    """Drive the SSE iterator directly + run a lifecycle; assert events arrive.

    Note: We test the SSE iterator function rather than streaming over
    httpx's ASGITransport because ASGITransport buffers streamed bodies
    until response completion, which makes long-lived SSE responses
    untestable through the transport. The route function uses this iterator
    verbatim, so the contract is preserved.

    A background task consumes ``_event_iter`` and stops once it sees
    ``polymarket.submitted``. The task is always cancelled and drained in a
    ``finally`` block so a failing lifecycle cannot leave it pending.

    NOTE(review): this is a coroutine test with no visible asyncio marker —
    it presumably relies on an async pytest plugin configured elsewhere
    (e.g. pytest-asyncio auto mode); confirm.
    """
    import asyncio

    from polyglot_alpha.api.routes.sse import _event_iter
    from polyglot_alpha.orchestrator import BidRecord, run_lifecycle

    class _FakeRequest:
        """Minimal request stand-in that never reports a disconnect."""

        async def is_disconnected(self) -> bool:
            return False

    seen: list[dict[str, Any]] = []

    async def reader() -> None:
        # Collect every message until the terminal lifecycle event arrives.
        async for msg in _event_iter(_FakeRequest()):  # type: ignore[arg-type]
            seen.append(msg)
            if msg.get("event") == "polymarket.submitted":
                return

    reader_task = asyncio.create_task(reader())
    try:
        await asyncio.sleep(0.1)  # let the subscriber register
        await run_lifecycle(
            {
                "title": "SSE stream test",
                "sources": [],
                "language": "en",
                "category": "geopolitics",
            },
            auction_window_seconds=0.0,
            mock_bids=[BidRecord(agent_address="0xstream", bid_amount=1.0)],
        )
        await asyncio.wait_for(reader_task, timeout=5.0)
    finally:
        # ``cancel()`` is a no-op on a finished task; ``gather`` with
        # ``return_exceptions=True`` drains it without raising, so the
        # original failure (if any) is the one that propagates.
        reader_task.cancel()
        await asyncio.gather(reader_task, return_exceptions=True)

    types = {m.get("event") for m in seen}
    assert "hello" in types, types
    assert "polymarket.submitted" in types, types
    # The other key transitions should show up too.
    assert "event.created" in types, types
    assert "auction.settled" in types, types
def test_sse_route_registered(isolated_db: str) -> None:
    """Smoke test: the SSE route is wired into the FastAPI app."""
    app = _build_app()

    # Not every route object is guaranteed to expose ``path``; those that
    # do not contribute ``None``, which never matches the expected path.
    registered_paths = set()
    for route in app.routes:
        registered_paths.add(getattr(route, "path", None))

    assert "/sse/events" in registered_paths