Spaces:
Running
Running
| """FastAPI application entrypoint for PolyglotAlpha v2.""" | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| import os | |
| import time | |
| from contextlib import asynccontextmanager | |
| from datetime import datetime, timedelta, timezone | |
| from typing import AsyncIterator | |
| from fastapi import FastAPI | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from slowapi.errors import RateLimitExceeded | |
| from slowapi.middleware import SlowAPIMiddleware | |
| from slowapi import _rate_limit_exceeded_handler | |
| from sqlmodel import func, select | |
| from ..llm import shutdown_anthropic | |
| from ..logging_ctx import install_event_id_filter | |
| from ..persistence import engine as _persistence_engine, init_db, session_scope | |
| from ..persistence.models import Event, EventStatus, FewShotExemplar | |
| from ..pubsub import get_pubsub | |
| from .rate_limit import limiter | |
| from .routes import ( | |
| agents, | |
| builder_fees, | |
| events, | |
| leaderboard, | |
| operators, | |
| polymarket, | |
| sse, | |
| trigger, | |
| ) | |
logger = logging.getLogger(__name__)

# Lifecycle statuses that are not terminal. A row still sitting in one of
# these states when the backend starts, and older than the recovery cutoff,
# was almost certainly orphaned by a previous backend process that crashed
# mid-lifecycle (its in-memory orchestrator task no longer exists). The
# startup sweep flips such rows to FAILED so the UI does not show a
# perpetual "running" badge.
_NON_TERMINAL_STATUSES: tuple[str, ...] = tuple(
    status.value
    for status in (
        EventStatus.PENDING,
        EventStatus.AUCTION_OPEN,
        EventStatus.AUCTION_SETTLED,
        EventStatus.TRANSLATING,
        EventStatus.EVALUATING,
    )
)
def _env_seconds(name: str, default: float) -> float:
    """Read a finite, non-negative number of seconds from env var *name*.

    Returns *default* when the variable is unset or blank. When the value is
    unparsable, negative, infinite or NaN, logs a warning and returns
    *default*, so one bad knob cannot disable startup recovery entirely (or
    push the cutoff into the future and sweep live rows).
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    try:
        value = float(raw)
    except ValueError:
        logger.warning(
            "startup_recovery: %s=%r is not a number; using default %.0fs",
            name,
            raw,
            default,
        )
        return default
    # NaN fails both comparisons, so this single check rejects NaN, inf
    # and negative values alike.
    if not 0.0 <= value < float("inf"):
        logger.warning(
            "startup_recovery: %s=%r is out of range; using default %.0fs",
            name,
            raw,
            default,
        )
        return default
    return value


def _sweep_stuck_events() -> int:
    """Mark events orphaned by a crashed backend process as FAILED on startup.

    A previous backend process that crashed (OOM, restart, panic) cannot
    finish the lifecycle task it owned. Any row still in a non-terminal
    status (``_NON_TERMINAL_STATUSES``) whose ``triggered_at`` is older than
    ``2 * AUCTION_WINDOW_SECONDS + PANEL_TIMEOUT_SECONDS`` (defaults 60s and
    120s) is past every legitimate phase budget and is flipped to ``FAILED``
    so /events views don't show a stuck row.

    Returns:
        The number of rows updated (0 when nothing matched or on error).

    Best-effort: any exception is logged and swallowed so a sweep failure
    cannot block app startup. Malformed timing env vars fall back to their
    defaults instead of aborting the sweep.
    """
    try:
        auction_s = _env_seconds("AUCTION_WINDOW_SECONDS", 60.0)
        panel_s = _env_seconds("PANEL_TIMEOUT_SECONDS", 120.0)
        # 2 * auction (open + settle drift) + panel. This is exactly the sum
        # of the phase budgets; no extra slack is added on top.
        cutoff_seconds = 2.0 * auction_s + panel_s
        cutoff = datetime.now(timezone.utc) - timedelta(seconds=cutoff_seconds)
        with session_scope() as session:
            stmt = select(Event).where(
                Event.status.in_(_NON_TERMINAL_STATUSES),  # type: ignore[attr-defined]
                Event.triggered_at < cutoff,
            )
            stuck_rows = session.exec(stmt).all()
            for row in stuck_rows:
                row.status = EventStatus.FAILED.value
                session.add(row)
            swept = len(stuck_rows)
        if swept:
            logger.warning(
                "startup_recovery: swept %d stuck event(s) to FAILED "
                "(cutoff=%.0fs, reason=startup_recovery)",
                swept,
                cutoff_seconds,
            )
        else:
            logger.info("startup_recovery: no stuck events found")
        return swept
    except Exception:  # noqa: BLE001 — best-effort, must not block startup
        logger.exception("startup_recovery sweep failed; continuing startup")
        return 0
def _init_ingestion_tables() -> None:
    """Ensure the ingestion-only tables (such as ``raw_entries``) exist.

    ``polyglot_alpha.ingestion.models.RawEntry`` is registered on a private
    SQLModel metadata object instead of ``SQLModel.metadata``, so
    ``init_db()`` never creates it. Without this hook the first RSS poll
    fails with ``no such table: raw_entries``. ``create_all`` leaves
    already-present tables alone, so running this on every startup is safe.
    Any failure (including an import error) is logged and suppressed so it
    cannot abort application startup.
    """
    try:
        # Imported inside the try so an ingestion import failure is caught
        # by the best-effort handler below.
        from ..ingestion.models import _INGESTION_METADATA as ingestion_metadata

        ingestion_metadata.create_all(_persistence_engine)
        logger.info("startup_recovery: ingestion metadata create_all completed")
    except Exception:  # noqa: BLE001 — best-effort, must not block startup
        logger.exception(
            "startup_recovery: ingestion metadata create_all failed; continuing"
        )
# When ``few_shot_exemplars`` is empty (fresh DB), auto-ingest the bundled
# ``EXTENDED_EXEMPLARS`` so the LLM judges have in-context (ICL) examples
# available out of the box. Operators can disable this by setting
# ``SKIP_AUTO_INGEST_FEW_SHOTS`` to a truthy value such as ``true`` (useful
# for tests or for clusters that pre-seed via the one-shot script).
_AUTO_INGEST_FEW_SHOTS_ENV: str = "SKIP_AUTO_INGEST_FEW_SHOTS"


def _count_scalar(result: object) -> int:
    """Coerce the result of a ``SELECT count(*)`` ``.one()`` into an ``int``.

    Depending on dialect/driver the value may be a bare scalar or a
    one-element row. Rows are accepted as any ``Sequence`` (tuple, list,
    or a SQLAlchemy ``Row``), not only ``tuple``/``list``. Strings/bytes are
    excluded so a textual count is still parsed by ``int()``.
    """
    from collections.abc import Sequence

    if isinstance(result, Sequence) and not isinstance(result, (str, bytes)):
        return int(result[0])
    return int(result)  # type: ignore[call-overload]


def _maybe_auto_ingest_few_shots() -> None:
    """Seed ``few_shot_exemplars`` from ``EXTENDED_EXEMPLARS`` if it is empty.

    Skipped when ``SKIP_AUTO_INGEST_FEW_SHOTS`` is truthy. The flag is parsed
    with ``_truthy``, like the other env knobs in this module. Otherwise the
    table is counted and seeding only happens when the count is zero, which
    makes repeated startups idempotent. Every error is logged and swallowed:
    seeding is best-effort and must not block startup.
    """
    if _truthy(os.environ.get(_AUTO_INGEST_FEW_SHOTS_ENV)):
        logger.info(
            "startup_recovery: %s=true; skipping few-shot auto-ingest",
            _AUTO_INGEST_FEW_SHOTS_ENV,
        )
        return
    try:
        with session_scope() as session:
            existing = session.exec(
                select(func.count()).select_from(FewShotExemplar)
            ).one()
            count = _count_scalar(existing)
            if count > 0:
                logger.info(
                    "startup_recovery: few_shot_exemplars has %d row(s); skipping seed",
                    count,
                )
                return
    except Exception:  # noqa: BLE001 — best-effort
        logger.exception(
            "startup_recovery: failed to count few_shot_exemplars; skipping seed"
        )
        return
    try:
        # Lazy imports: the exemplar corpus and seeder are only needed when
        # the table is actually empty.
        from ..corpus.few_shots_extended import EXTENDED_EXEMPLARS
        from .._fewshots_seed import seed_few_shots_from_extended

        inserted = seed_few_shots_from_extended(EXTENDED_EXEMPLARS)
        logger.info(
            "startup_recovery: seeded few_shot_exemplars with %d row(s)", inserted
        )
    except Exception:  # noqa: BLE001 — best-effort
        logger.exception(
            "startup_recovery: few-shot auto-ingest failed; continuing startup"
        )
| def _truthy(value: str | None) -> bool: | |
| """Permissive truthy parser for env knobs (1/true/yes/on).""" | |
| if value is None: | |
| return False | |
| return value.strip().lower() in {"1", "true", "yes", "on", "y", "t"} | |
async def _prewarm_d8_embedding_model() -> None:
    """Load the D8 SBert embedding model ahead of the first event.

    Without this the first event pays the cold-start cost (W3 measured ~60s
    on the first FAISS+SBert hit). The lifespan hook schedules this
    coroutine with ``asyncio.create_task``, and the blocking load itself
    runs through ``asyncio.to_thread``. Any ``Exception`` is logged and
    swallowed: a failure here must never crash startup. D8 then reports
    INSUFFICIENT_DATA (W13-D) on the first event instead of silently
    passing. Set ``D8_PREWARM`` to a falsy value (it defaults to ``"true"``)
    so test harnesses can skip the model download.
    """
    enabled = _truthy(os.environ.get("D8_PREWARM", "true"))
    if not enabled:
        logger.info("d8.model_load: skipped (D8_PREWARM disabled)")
        return
    try:
        # Imported here, not at module top, so the SBert / FAISS deps stay
        # out of test environments that never start the FastAPI app.
        from polyglot_alpha.judges.style_alignment import (
            d8_duplicate_detection as d8_module,
        )

        started_at = time.perf_counter()
        model = await asyncio.to_thread(d8_module._load_embedding_model)
        load_seconds = time.perf_counter() - started_at
        if model is not None:
            logger.info(
                "d8.model_load: success model=%s elapsed=%.2fs",
                d8_module.DEFAULT_MODEL_ID,
                load_seconds,
            )
        else:
            reason = d8_module.get_last_model_load_error() or "unknown"
            logger.error(
                "d8.model_load: FAILED model=%s reason=%s "
                "(D8 will report INSUFFICIENT_DATA per W13-D)",
                d8_module.DEFAULT_MODEL_ID,
                reason,
            )
    except Exception:  # noqa: BLE001 - must not block startup
        logger.exception(
            "d8.model_load: pre-warm crashed; D8 will lazy-load on first use"
        )
# Default CORS origins, intended for local development only: ports 3000 and
# 3001 on both ``localhost`` and ``127.0.0.1``, i.e.
#   http://localhost:3000, http://localhost:3001,
#   http://127.0.0.1:3000, http://127.0.0.1:3001
# Production deployments must override these via the ``CORS_ORIGINS`` env
# var (comma-separated list).
DEFAULT_CORS_ORIGINS: tuple[str, ...] = tuple(
    f"http://{host}:{port}"
    for host in ("localhost", "127.0.0.1")
    for port in (3000, 3001)
)

# HTTP methods the CORS middleware allows for cross-origin requests.
ALLOWED_METHODS: tuple[str, ...] = ("GET", "POST", "OPTIONS")
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Application lifespan: run startup recovery, serve, then clean up.

    Startup steps, in order:

    1. install the ``[event_id=N]`` log-correlation filter;
    2. ``init_db()`` plus the ingestion-only tables (``raw_entries``);
    3. sweep events orphaned by a crashed previous process to FAILED;
    4. seed ``few_shot_exemplars`` when empty (opt-out via
       ``SKIP_AUTO_INGEST_FEW_SHOTS``);
    5. warm the pub/sub singleton;
    6. schedule the D8 embedding-model pre-warm as a background task.

    On shutdown the pre-warm task is cancelled if still pending, and the
    shared ``AsyncAnthropic`` client is explicitly closed via
    ``shutdown_anthropic()`` *while* the event loop is still alive. Without
    that, the SDK's underlying ``httpx.AsyncClient`` is finalized after the
    loop closes, and every shutdown logs ``RuntimeError: Event loop is
    closed`` tracebacks.

    Decorated with ``@asynccontextmanager`` because Starlette/FastAPI expect
    ``lifespan`` to return an async context manager; passing a bare
    async-generator function is a deprecated fallback.
    """
    logger.info("polyglot_alpha: starting up; initializing DB")
    # Install the [event_id=N] correlation-id filter on the root logger so
    # every subsystem's log line carries the active lifecycle id (see
    # polyglot_alpha.logging_ctx). No-op if already installed.
    install_event_id_filter()
    init_db()
    # ``init_db()`` does not create the watcher-only ingestion tables
    # (``raw_entries``) because they live on a private SQLModel registry
    # (see ingestion/models.py). Create them here so the RSS aggregator can
    # write dedup rows without crashing on its first poll.
    _init_ingestion_tables()
    # Recover any events left in non-terminal states by a previously
    # crashed/restarted backend process before warming pub/sub.
    _sweep_stuck_events()
    # Auto-seed FewShotExemplar from the bundled EXTENDED_EXEMPLARS when the
    # table is empty (fresh checkouts). Opt-out via
    # SKIP_AUTO_INGEST_FEW_SHOTS=true.
    _maybe_auto_ingest_few_shots()
    get_pubsub()
    # W13-D: pre-warm the SBert encoder used by D8 so the first event
    # doesn't pay the cold-load tax. Scheduled as a background task so it
    # never blocks lifespan startup; it logs its own outcome under
    # ``d8.model_load:``.
    prewarm_task = asyncio.create_task(
        _prewarm_d8_embedding_model(), name="d8_prewarm"
    )
    try:
        yield
    finally:
        # Fire-and-forget: the pre-warm task is never awaited during
        # shutdown. If it is still pending, cancel it so no pending task is
        # left behind. Cancellation only abandons the await. The worker
        # thread started by ``asyncio.to_thread`` cannot be interrupted and
        # keeps running until the model load returns.
        if not prewarm_task.done():
            prewarm_task.cancel()
        logger.info("polyglot_alpha: shutting down")
        await shutdown_anthropic()
def _build_cors_origins() -> list[str]:
    """Return the CORS allow-list derived from the ``CORS_ORIGINS`` env var.

    ``CORS_ORIGINS`` is a comma-separated list of origins. Each entry is
    whitespace-stripped and has any trailing ``/`` removed. Browsers send
    ``Origin`` without a trailing slash, so an entry like
    ``https://app.example.com/`` would otherwise never match.

    Falls back to ``DEFAULT_CORS_ORIGINS`` when:

    * the variable is unset, empty, or contains no non-blank entries
      (e.g. ``" "`` or ``","``). This is treated the same as unset rather
      than producing an empty allow-list that blocks every origin.
    * any entry is the wildcard ``*``. Browsers reject a wildcard
      ``Access-Control-Allow-Origin`` on credentialed requests, and the app
      uses ``allow_credentials=True``. A warning is logged so the
      misconfiguration is visible.

    Returns:
        A new list of origin strings (never shared with the defaults tuple).
    """
    raw = os.environ.get("CORS_ORIGINS") or ""
    origins = [
        cleaned
        for cleaned in (entry.strip().rstrip("/") for entry in raw.split(","))
        if cleaned
    ]
    if not origins:
        return list(DEFAULT_CORS_ORIGINS)
    if "*" in origins:
        logger.warning(
            "CORS_ORIGINS contains '*' which is incompatible with "
            "allow_credentials=True; falling back to safe defaults"
        )
        return list(DEFAULT_CORS_ORIGINS)
    return origins
def create_app() -> FastAPI:
    """Build and configure the PolyglotAlpha v2 FastAPI application.

    Wires, in order: slowapi rate limiting (limiter on ``app.state``, the
    429 exception handler, ``SlowAPIMiddleware``), hardened CORS, every API
    router, and the ``/health`` and ``/`` endpoints.

    Returns:
        The configured ``FastAPI`` instance, using ``lifespan`` for
        startup/shutdown.
    """
    app = FastAPI(
        title="PolyglotAlpha v2 API",
        version="0.2.0",
        lifespan=lifespan,
    )

    # ----- Rate limiting (slowapi) -----
    # Register the limiter on the app state, install the middleware, and
    # wire the 429 handler so RateLimitExceeded responses are returned
    # automatically.
    app.state.limiter = limiter
    app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
    app.add_middleware(SlowAPIMiddleware)

    # ----- CORS (hardened) -----
    # Added after SlowAPIMiddleware, so it is the outer layer (Starlette
    # treats the most recently added middleware as outermost) and also
    # decorates responses produced by the rate limiter.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=_build_cors_origins(),
        allow_credentials=True,
        allow_methods=list(ALLOWED_METHODS),
        allow_headers=["*"],
    )

    # Registration order is preserved from the original explicit calls.
    for router in (
        events.router,
        agents.router,
        leaderboard.router,
        builder_fees.router,
        sse.router,
        trigger.router,
        polymarket.router,
        operators.router,
        operators.bid_router,
    ):
        app.include_router(router)

    # NOTE(review): these handlers previously had no route decorator, so
    # they were never registered (dead code). The paths "/health" and "/"
    # are assumed from the handler names; confirm against load-balancer
    # probes / clients.
    @app.get("/health")
    def health() -> dict[str, str]:
        """Liveness probe: always reports ``{"status": "ok"}``."""
        return {"status": "ok"}

    @app.get("/")
    def root() -> dict[str, str]:
        """Identify the service by name and API version."""
        return {"name": "polyglot-alpha", "version": app.version}

    return app
# Module-level application instance, built once at import time (presumably
# the object an ASGI server is pointed at, e.g. ``<module>:app``; confirm).
app: FastAPI = create_app()