File size: 13,323 Bytes
88d2f2a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
"""FastAPI application entrypoint for PolyglotAlpha v2."""

from __future__ import annotations

import asyncio
import logging
import os
import time
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from typing import AsyncIterator

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from slowapi import _rate_limit_exceeded_handler
from sqlmodel import func, select

from ..llm import shutdown_anthropic
from ..logging_ctx import install_event_id_filter
from ..persistence import engine as _persistence_engine, init_db, session_scope
from ..persistence.models import Event, EventStatus, FewShotExemplar
from ..pubsub import get_pubsub
from .rate_limit import limiter
from .routes import (
    agents,
    builder_fees,
    events,
    leaderboard,
    operators,
    polymarket,
    sse,
    trigger,
)

# Module-level logger shared by every startup/recovery helper in this module.
logger = logging.getLogger(__name__)


# Non-terminal lifecycle statuses. If an event row is still in any of these
# states on backend startup and is older than the recovery cutoff, the
# previous backend process almost certainly crashed mid-lifecycle and the
# in-memory orchestrator task is gone — sweep these rows to FAILED so the
# UI doesn't display perpetual "running" badges. Stored as the raw
# ``.value`` strings because ``_sweep_stuck_events`` matches them against
# ``Event.status`` with ``in_()``.
_NON_TERMINAL_STATUSES: tuple[str, ...] = (
    EventStatus.PENDING.value,
    EventStatus.AUCTION_OPEN.value,
    EventStatus.AUCTION_SETTLED.value,
    EventStatus.TRANSLATING.value,
    EventStatus.EVALUATING.value,
)


def _env_seconds(name: str, default: float) -> float:
    """Read a non-negative, finite duration in seconds from env var *name*.

    An unset or blank variable yields *default* silently. A value that is
    not a parseable float, is negative, or is non-finite (``nan``/``inf``)
    also yields *default*, with a warning, so one malformed knob cannot
    abort the whole startup-recovery sweep.
    """

    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default
    try:
        value = float(raw)
    except ValueError:
        value = float("nan")
    # NaN fails every comparison, so this single range check rejects
    # unparseable input, nan, inf and negatives alike.
    if not 0.0 <= value < float("inf"):
        logger.warning(
            "startup_recovery: invalid %s=%r; using default %.0fs",
            name,
            raw,
            default,
        )
        return default
    return value


def _sweep_stuck_events() -> int:
    """Mark crashed-in-flight events as FAILED on startup.

    A previous backend process that crashed (OOM, restart, panic) cannot
    finish the lifecycle task it owned. Any row whose status is in
    ``_NON_TERMINAL_STATUSES`` and whose ``triggered_at`` is older than
    ``2 * AUCTION_WINDOW_SECONDS + PANEL_TIMEOUT_SECONDS`` is past every
    legitimate phase budget and is flipped to ``FAILED`` so /events views
    don't show a stuck row. The two knobs default to 60s and 120s; malformed
    values fall back to those defaults instead of skipping the sweep.

    Returns:
        The number of rows updated. Best-effort: any exception is logged
        and swallowed (returning 0) so a sweep failure cannot block app
        startup.
    """

    try:
        auction_s = _env_seconds("AUCTION_WINDOW_SECONDS", 60.0)
        panel_s = _env_seconds("PANEL_TIMEOUT_SECONDS", 120.0)
        # Two auction windows (open + settle drift) plus the panel budget.
        # No extra slack is added on top of these phase budgets.
        # NOTE(review): if several workers share one DB, a healthy lifecycle
        # right at its tail could be swept; consider adding a slack term.
        cutoff_seconds = 2.0 * auction_s + panel_s
        # NOTE(review): assumes ``triggered_at`` is stored as UTC — confirm.
        cutoff = datetime.now(timezone.utc) - timedelta(seconds=cutoff_seconds)

        swept = 0
        with session_scope() as session:
            stmt = select(Event).where(
                Event.status.in_(_NON_TERMINAL_STATUSES),  # type: ignore[attr-defined]
                Event.triggered_at < cutoff,
            )
            for row in session.exec(stmt).all():
                row.status = EventStatus.FAILED.value
                session.add(row)
                swept += 1
        if swept:
            logger.warning(
                "startup_recovery: swept %d stuck event(s) to FAILED "
                "(cutoff=%.0fs, reason=startup_recovery)",
                swept,
                cutoff_seconds,
            )
        else:
            logger.info("startup_recovery: no stuck events found")
        return swept
    except Exception:  # noqa: BLE001 — best-effort, must not block startup
        logger.exception("startup_recovery sweep failed; continuing startup")
        return 0


def _init_ingestion_tables() -> None:
    """Ensure the watcher-only ingestion tables (e.g. ``raw_entries``) exist.

    The ingestion models (``polyglot_alpha.ingestion.models.RawEntry``) are
    registered on a private metadata object, ``_INGESTION_METADATA``, rather
    than ``SQLModel.metadata``, so ``init_db()`` does not create them and the
    first RSS poll would fail with ``no such table: raw_entries``. This hook
    runs ``create_all`` for that metadata against the persistence engine;
    re-running it when the tables already exist is a no-op.

    Best-effort: any failure is logged and swallowed so startup continues.
    """

    try:
        from ..ingestion.models import _INGESTION_METADATA as ingestion_metadata

        ingestion_metadata.create_all(bind=_persistence_engine)
        logger.info("startup_recovery: ingestion metadata create_all completed")
    except Exception:  # noqa: BLE001 — best-effort, must not block startup
        logger.exception(
            "startup_recovery: ingestion metadata create_all failed; continuing"
        )


# When ``few_shot_exemplars`` is empty (fresh DB), auto-ingest the bundled
# ``EXTENDED_EXEMPLARS`` so the LLM judges have ICL examples available out of
# the box. Operators can disable this by setting ``SKIP_AUTO_INGEST_FEW_SHOTS``
# to any value ``_truthy`` accepts (e.g. ``true``, ``1``, ``yes``, ``on``) —
# useful for tests or for clusters that pre-seed via the one-shot script.
_AUTO_INGEST_FEW_SHOTS_ENV: str = "SKIP_AUTO_INGEST_FEW_SHOTS"


def _maybe_auto_ingest_few_shots() -> None:
    """Seed ``few_shot_exemplars`` from ``EXTENDED_EXEMPLARS`` if empty.

    Steps:
      1. Return early when ``SKIP_AUTO_INGEST_FEW_SHOTS`` is truthy.
      2. Count existing rows; return when the table is non-empty or when
         the count query fails.
      3. Otherwise call ``seed_few_shots_from_extended`` and log the number
         of rows it reports inserting.

    Seeding only happens when the row count is zero, so repeated startups do
    not re-seed. Any error is logged and swallowed so seeding cannot block
    startup.
    """

    # Use the shared env-knob parser so values like " true" or "on" skip the
    # seed just as they toggle other knobs, instead of silently seeding.
    if _truthy(os.environ.get(_AUTO_INGEST_FEW_SHOTS_ENV)):
        logger.info(
            "startup_recovery: %s=true; skipping few-shot auto-ingest",
            _AUTO_INGEST_FEW_SHOTS_ENV,
        )
        return
    try:
        with session_scope() as session:
            existing = session.exec(
                select(func.count()).select_from(FewShotExemplar)
            ).one()
            # A scalar select normally yields a bare integer, but depending on
            # dialect/driver it may arrive as a one-column row (tuple, list,
            # or an SQLAlchemy ``Row``, which may not subclass tuple).
            try:
                count = int(existing)
            except TypeError:
                count = int(existing[0])
        if count > 0:
            logger.info(
                "startup_recovery: few_shot_exemplars has %d row(s); skipping seed",
                count,
            )
            return
    except Exception:  # noqa: BLE001 — best-effort
        logger.exception(
            "startup_recovery: failed to count few_shot_exemplars; skipping seed"
        )
        return

    try:
        # Imported lazily so the corpus is only loaded when seeding is needed.
        from ..corpus.few_shots_extended import EXTENDED_EXEMPLARS
        from .._fewshots_seed import seed_few_shots_from_extended

        inserted = seed_few_shots_from_extended(EXTENDED_EXEMPLARS)
        logger.info(
            "startup_recovery: seeded few_shot_exemplars with %d row(s)", inserted
        )
    except Exception:  # noqa: BLE001 — best-effort
        logger.exception(
            "startup_recovery: few-shot auto-ingest failed; continuing startup"
        )


def _truthy(value: str | None) -> bool:
    """Permissive truthy parser for env knobs (1/true/yes/on)."""

    if value is None:
        return False
    return value.strip().lower() in {"1", "true", "yes", "on", "y", "t"}


async def _prewarm_d8_embedding_model() -> None:
    """Pre-load the SBert encoder used by D8 in the background.

    Without this, the first event pays the encoder cold-start cost (W3
    measured ~60s on the first FAISS+SBert hit).

    The blocking ``_load_embedding_model`` call runs in a worker thread via
    ``asyncio.to_thread``. Every outcome is logged under the
    ``d8.model_load:`` prefix: skipped, success with elapsed time, a ``None``
    model (FAILED, with the loader's last recorded error), or an unexpected
    exception. No ``Exception`` escapes, so a failure cannot crash startup.

    Skipped unless ``D8_PREWARM`` is truthy per ``_truthy``. An unset
    variable counts as ``"true"``, so test harnesses must opt out explicitly
    to skip the model download.
    """

    if not _truthy(os.environ.get("D8_PREWARM", "true")):
        logger.info("d8.model_load: skipped (D8_PREWARM disabled)")
        return

    try:
        # Lazy import — keeps the SBert / FAISS deps out of test envs that
        # never start the FastAPI app.
        from polyglot_alpha.judges.style_alignment import (
            d8_duplicate_detection as d8,
        )

        started_at = time.perf_counter()
        encoder = await asyncio.to_thread(d8._load_embedding_model)
        load_seconds = time.perf_counter() - started_at

        if encoder is not None:
            logger.info(
                "d8.model_load: success model=%s elapsed=%.2fs",
                d8.DEFAULT_MODEL_ID,
                load_seconds,
            )
        else:
            reason = d8.get_last_model_load_error() or "unknown"
            logger.error(
                "d8.model_load: FAILED model=%s reason=%s "
                "(D8 will report INSUFFICIENT_DATA per W13-D)",
                d8.DEFAULT_MODEL_ID,
                reason,
            )
    except Exception:  # noqa: BLE001 - must not block startup
        logger.exception(
            "d8.model_load: pre-warm crashed; D8 will lazy-load on first use"
        )


# Safe default origins for local development. Production deployments must
# override via the ``CORS_ORIGINS`` env var (comma-separated list).
# Kept as an explicit literal so the allowlist is easy to audit; used by
# ``_build_cors_origins`` when ``CORS_ORIGINS`` is unset/empty or contains
# the wildcard ``*``.
DEFAULT_CORS_ORIGINS: tuple[str, ...] = (
    "http://localhost:3000",
    "http://localhost:3001",
    "http://127.0.0.1:3000",
    "http://127.0.0.1:3001",
)

# HTTP methods permitted for cross-origin requests; passed to
# ``CORSMiddleware(allow_methods=...)`` in ``create_app``.
ALLOWED_METHODS: tuple[str, ...] = ("GET", "POST", "OPTIONS")


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Lifespan hook: create tables, run startup recovery, warm singletons.

    Startup runs, in order:
      1. Install the ``[event_id=N]`` log filter.
      2. ``init_db()``.
      3. Create the ingestion tables.
      4. Sweep stuck events.
      5. Auto-seed few-shot exemplars.
      6. Warm the pub/sub singleton.
      7. Launch the D8 encoder pre-warm as a background task.

    On shutdown the pre-warm task is cancelled if still running and then
    awaited, so it is never garbage-collected while pending. The shared
    ``AsyncAnthropic`` client is then ``aclose()``-d via
    ``shutdown_anthropic`` so its underlying ``httpx.AsyncClient`` is closed
    *while* the event loop is still alive. Without this the SDK client gets
    finalized post-loop-close and we see ``RuntimeError: Event loop is
    closed`` tracebacks in the backend log on every shutdown.

    ``app`` is unused but required by FastAPI's lifespan signature.
    """

    logger.info("polyglot_alpha: starting up; initializing DB")
    # Install the [event_id=N] correlation-id filter on the root logger so
    # every subsystem's log line carries the active lifecycle id (see
    # polyglot_alpha.logging_ctx). No-op if already installed.
    install_event_id_filter()
    init_db()
    # Create watcher-only ingestion tables (raw_entries) on the same DB
    # so the RSS aggregator can write dedup rows without crashing on first
    # poll. ``init_db()`` does not cover these because they live on a
    # private SQLModel registry — see ingestion/models.py for the
    # rationale.
    _init_ingestion_tables()
    # Recover any events left in non-terminal states by a previously
    # crashed/restarted backend process before warming pub/sub.
    _sweep_stuck_events()
    # Auto-seed FewShotExemplar from the bundled EXTENDED_EXEMPLARS when
    # the table is empty (fresh checkouts). Opt-out via
    # SKIP_AUTO_INGEST_FEW_SHOTS=true.
    _maybe_auto_ingest_few_shots()
    get_pubsub()
    # W13-D: pre-warm the SBert encoder used by D8 so the first event
    # doesn't pay the cold-load tax. Scheduled with ``asyncio.create_task``
    # so it never blocks lifespan startup; the task logs its own outcome
    # under ``d8.model_load:``.
    prewarm_task = asyncio.create_task(
        _prewarm_d8_embedding_model(), name="d8_prewarm"
    )
    try:
        yield
    finally:
        if not prewarm_task.done():
            # Cancelling only stops this coroutine from waiting on the model
            # load. The worker thread started by ``asyncio.to_thread`` cannot
            # be interrupted and runs to completion on its own.
            prewarm_task.cancel()
        logger.info("polyglot_alpha: shutting down")
        try:
            # Await the finished/cancelling task so it is not destroyed while
            # pending and its outcome is retrieved. ``return_exceptions=True``
            # keeps the child's CancelledError from propagating out of here.
            await asyncio.gather(prewarm_task, return_exceptions=True)
        finally:
            await shutdown_anthropic()


def _build_cors_origins() -> list[str]:
    """Parse the ``CORS_ORIGINS`` env var into a list of allowed origins.

    ``CORS_ORIGINS`` is a comma-separated list. Each entry is processed as
    follows:
      - Whitespace is trimmed.
      - A trailing ``/`` is removed, because a browser's ``Origin`` header
        never carries a path, so ``https://app.example/`` would otherwise
        never match.
      - Empty entries are dropped.
      - Duplicates are removed, keeping first-seen order.

    An input that parses to no entries yields an empty list, i.e. no
    cross-origin access.

    Falls back to ``DEFAULT_CORS_ORIGINS`` (with a warning) when the variable
    is unset/empty, or when any entry is the wildcard ``*``. The CORS spec
    forbids ``*`` together with credentials, and Starlette requires explicit
    origins when ``allow_credentials=True``. A wildcard here is therefore
    treated as a misconfiguration rather than passed through.
    """

    raw = os.environ.get("CORS_ORIGINS")
    if not raw:
        return list(DEFAULT_CORS_ORIGINS)
    cleaned = (entry.strip().rstrip("/") for entry in raw.split(","))
    # dict.fromkeys de-duplicates while preserving insertion order.
    parts = list(dict.fromkeys(origin for origin in cleaned if origin))
    if "*" in parts:
        logger.warning(
            "CORS_ORIGINS contains '*' which is incompatible with "
            "allow_credentials=True; falling back to safe defaults"
        )
        return list(DEFAULT_CORS_ORIGINS)
    return parts


def create_app() -> FastAPI:
    """Build and return the PolyglotAlpha v2 FastAPI application.

    The returned app has:
      - ``lifespan`` as its startup/shutdown hook;
      - slowapi rate limiting (limiter on ``app.state``, a handler for
        ``RateLimitExceeded``, and ``SlowAPIMiddleware``);
      - CORS restricted to ``_build_cors_origins()`` and ``ALLOWED_METHODS``;
      - every API router;
      - the meta endpoints ``/health`` and ``/``.
    """

    application = FastAPI(
        title="PolyglotAlpha v2 API",
        version="0.2.0",
        lifespan=lifespan,
    )

    # ----- Rate limiting (slowapi) -----
    # slowapi looks the limiter up on ``app.state``; the exception handler
    # turns RateLimitExceeded into the 429 response.
    application.state.limiter = limiter
    application.add_exception_handler(
        RateLimitExceeded, _rate_limit_exceeded_handler
    )
    application.add_middleware(SlowAPIMiddleware)

    # ----- CORS (hardened) -----
    # Registered after SlowAPI, exactly as before; middleware order matters.
    application.add_middleware(
        CORSMiddleware,
        allow_origins=_build_cors_origins(),
        allow_credentials=True,
        allow_methods=list(ALLOWED_METHODS),
        allow_headers=["*"],
    )

    # Router registration order is preserved (it affects route matching and
    # OpenAPI ordering).
    api_routers = (
        events.router,
        agents.router,
        leaderboard.router,
        builder_fees.router,
        sse.router,
        trigger.router,
        polymarket.router,
        operators.router,
        operators.bid_router,
    )
    for api_router in api_routers:
        application.include_router(api_router)

    @application.get("/health", tags=["meta"])
    def health() -> dict[str, str]:
        return {"status": "ok"}

    @application.get("/", tags=["meta"])
    def root() -> dict[str, str]:
        return {"name": "polyglot-alpha", "version": application.version}

    return application


# Module-level application instance, built once at import time. ASGI servers
# presumably load this attribute (e.g. ``uvicorn <module>:app``) — confirm
# against the deployment config.
app = create_app()