Spaces:
Sleeping
Sleeping
"""/trigger demo endpoint — kicks off the orchestrator for a sample event."""
| from __future__ import annotations | |
| import json | |
| import logging | |
| import math | |
| import os | |
| import re | |
| import uuid | |
| from datetime import datetime, timedelta, timezone | |
| from pathlib import Path | |
| from typing import Any | |
| from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, Response, status | |
| from pydantic import BaseModel, Field, ValidationError, field_validator | |
| from sqlmodel import select | |
| from ...orchestrator import BidRecord, create_pending_event, run_lifecycle | |
| from ...persistence import session_scope | |
| from ...persistence.models import Event | |
| from ...pubsub import get_pubsub | |
| from ..rate_limit import limiter | |
logger = logging.getLogger(__name__)

# All routes declared in this module are mounted under ``/trigger``.
router = APIRouter(prefix="/trigger", tags=["trigger"])

# ``parents[3]`` is the fourth ancestor directory of this file's resolved
# path; the bundled demo sample lives under ``outputs/`` beneath it.
_REPO_ROOT = Path(__file__).resolve().parents[3]
_HARDCODED_SAMPLE_PATH = _REPO_ROOT / "outputs" / "sample_0.json"

# Last-resort demo headline used when neither the RSS pipeline nor the
# bundled sample_0.json are available. Lets the demo button always
# produce a 200 even on a freshly cloned checkout.
_DEMO_FALLBACK_TITLE: str = (
    "Will the People's Bank of China announce a Reserve Requirement Ratio "
    "cut before December 31, 2026?"
)
_DEMO_FALLBACK_SOURCES: list[dict[str, str]] = [
    {
        "name": "pbc.gov.cn",
        "url": "http://www.pbc.gov.cn/",
        "language": "zh",
    }
]

# Placeholder title written into the PENDING events row while the
# background RSS fetch + Haiku scoring is still in flight. Once the
# real headline lands, the row is updated and an ``event.updated``
# message is published so SSE listeners refresh the title in the UI.
# The trailing character is a horizontal ellipsis (U+2026), written as an
# escape so it survives re-encoding of this file.
_RSS_PLACEHOLDER_TITLE: str = "Fetching latest non-English news\u2026"
# ---------------------------------------------------------------------------
# Input size limits (DoS hardening)
# ---------------------------------------------------------------------------
MAX_TITLE_LENGTH: int = 500
MAX_BIDS_PER_REQUEST: int = 20
MAX_SOURCES_PER_REQUEST: int = 10
MIN_AUCTION_WINDOW_S: float = 0.0
MAX_AUCTION_WINDOW_S: float = 300.0

# Bid amount sanity bounds — positive, sane upper bound.
MIN_BID_AMOUNT: float = 0.0001
MAX_BID_AMOUNT: float = 10000.0

# Stake bounds (defensive — same upper bound as bid).
MIN_STAKE_AMOUNT: float = 0.0
MAX_STAKE_AMOUNT: float = 10000.0

# Reputation is a normalized score in ``[0.0, 1.0]``.
MIN_REPUTATION: float = 0.0
MAX_REPUTATION: float = 1.0

# Agent address pattern. Hex form (``0x[a-fA-F0-9]+``) is the canonical
# Ethereum-ish shape; the pattern is deliberately looser and accepts any
# ASCII letters, digits and underscores after the ``0x`` prefix so that
# test/demo identifiers (``0xagent``, ``0xllama_agent``) also pass, since
# the orchestrator treats the address as an opaque identifier.
_AGENT_ADDRESS_RE: re.Pattern[str] = re.compile(r"^0x[a-zA-Z0-9_]+$")
class TriggerSource(BaseModel):
    """One news source attached to a trigger request.

    ``name`` and ``url`` are required, non-empty and length-capped;
    ``language`` is a short tag that defaults to ``"en"``.
    """

    name: str = Field(..., min_length=1, max_length=200)
    url: str = Field(..., min_length=1, max_length=2000)
    language: str = Field(default="en", max_length=16)
class TriggerBid(BaseModel):
    """Pydantic-validated bid record. Mirrors :class:`BidRecord`.

    Numeric fields are range-checked through ``Field`` bounds and
    additionally rejected when non-finite; ``agent_address`` is stripped
    and must match ``_AGENT_ADDRESS_RE``.
    """

    agent_address: str = Field(..., min_length=1, max_length=128)
    bid_amount: float = Field(
        ..., ge=MIN_BID_AMOUNT, le=MAX_BID_AMOUNT
    )
    stake_amount: float = Field(
        default=5.0, ge=MIN_STAKE_AMOUNT, le=MAX_STAKE_AMOUNT
    )
    candidate_hash: str | None = Field(default=None, max_length=128)
    tx_hash: str | None = Field(default=None, max_length=128)
    reputation: float = Field(
        default=1.0, ge=MIN_REPUTATION, le=MAX_REPUTATION
    )

    # NOTE(review): the validators below carried no registration decorator,
    # so Pydantic never invoked them. They are wired up here with
    # ``field_validator`` (already imported by this module). The field lists
    # are inferred from the method names / parameter types — confirm.
    @field_validator("agent_address")
    @classmethod
    def _validate_agent_address(cls, value: str) -> str:
        """Strip surrounding whitespace and enforce the address pattern.

        Raises:
            ValueError: if the stripped value is empty or does not match
                ``_AGENT_ADDRESS_RE``.
        """
        value = value.strip()
        if not value:
            raise ValueError("agent_address must be non-empty")
        if not _AGENT_ADDRESS_RE.match(value):
            raise ValueError(
                "agent_address must match ^0x[a-zA-Z0-9_]+$"
            )
        return value

    @field_validator("bid_amount", "stake_amount", "reputation")
    @classmethod
    def _reject_non_finite(cls, value: float) -> float:
        """Reject ``nan`` / ``inf`` for the numeric bid fields.

        Raises:
            ValueError: if ``value`` is not a finite number.
        """
        # Pydantic accepts ``float('nan')`` / ``float('inf')`` even with
        # ``ge``/``le`` bounds on some versions; reject them explicitly.
        if not math.isfinite(value):
            raise ValueError("value must be a finite number")
        return value
# Accepted values for the request's ``event_source`` field.
_VALID_EVENT_SOURCES: tuple[str, ...] = ("user_payload", "hardcoded", "rss")

# Lifecycle ``mode`` values accepted by ``POST /trigger/event``. ``live`` runs
# the real LLM + Arc tx + RSS + judge panel; ``mock`` short-circuits each
# subsystem with deterministic fixtures (other W5 agents own those impls).
_VALID_EVENT_MODES: tuple[str, ...] = ("live", "mock")

# Environment variable consulted when a request omits ``mode``, and the value
# used when that variable is unset or invalid.
_DEFAULT_EVENT_MODE_ENV: str = "DEFAULT_EVENT_MODE"
_FALLBACK_DEFAULT_EVENT_MODE: str = "live"
# W23: when ``DISABLE_LIVE=true`` the deployment is locked to mock-only mode
# (Hugging Face Spaces free tier, no API keys). Incoming ``mode=live`` requests
# are silently rewritten to ``"mock"`` and a header ``X-Live-Disabled: true``
# is set on the response so the client can surface the override in the UI.
_DISABLE_LIVE_ENV: str = "DISABLE_LIVE"


def _is_live_disabled() -> bool:
    """Return ``True`` when the deployment is locked to mock-only mode.

    The ``DISABLE_LIVE`` environment variable is read on every call.
    After trimming and lower-casing, the values ``1``, ``true``, ``yes``
    and ``on`` enable the lock; anything else (including unset or empty)
    leaves live mode available.
    """
    raw = os.environ.get(_DISABLE_LIVE_ENV, "false")
    # ``raw or "false"`` treats an explicitly empty variable as "not set".
    return (raw or "false").strip().lower() in ("1", "true", "yes", "on")
def _resolve_default_event_mode() -> str:
    """Return the env-configured default mode, falling back to ``"live"``.

    Reads ``DEFAULT_EVENT_MODE`` on every call, trims and lower-cases it,
    and returns it when it is one of ``_VALID_EVENT_MODES``. An unset or
    empty variable yields the fallback silently; any other unrecognised
    value yields the fallback and logs a warning with the raw value.
    """
    raw = os.environ.get(_DEFAULT_EVENT_MODE_ENV, _FALLBACK_DEFAULT_EVENT_MODE)
    candidate = (raw or _FALLBACK_DEFAULT_EVENT_MODE).strip().lower()
    if candidate in _VALID_EVENT_MODES:
        return candidate

    logger.warning(
        "trigger: %s=%r is invalid; falling back to %r",
        _DEFAULT_EVENT_MODE_ENV,
        raw,
        _FALLBACK_DEFAULT_EVENT_MODE,
    )
    return _FALLBACK_DEFAULT_EVENT_MODE
class TriggerRequest(BaseModel):
    """Request body for the trigger endpoint.

    Carries the event description (title, sources, language, category),
    where the event should come from (``event_source``), and per-request
    overrides for the auction and lifecycle (``auction_mode``,
    ``auction_window_seconds``, ``mock_bids``, ``mode``).
    """

    # ``title`` is required for ``user_payload`` mode but becomes optional
    # for ``rss``/``hardcoded`` where the backend fills it in. Only the
    # maximum length is enforced here; the handler rejects a missing or
    # blank title when ``event_source='user_payload'``.
    title: str | None = Field(default=None, max_length=MAX_TITLE_LENGTH)
    sources: list[TriggerSource] = Field(
        default_factory=list, max_length=MAX_SOURCES_PER_REQUEST
    )
    language: str = Field(default="en", max_length=16)
    category: str = Field(default="geopolitics", max_length=64)
    event_source: str = Field(
        default="user_payload",
        description=(
            "Where the event comes from. ``user_payload`` (default) trusts "
            "the request body. ``hardcoded`` reads outputs/sample_0.json. "
            "``rss`` fetches the freshest cross-referenced cluster from "
            "the configured RSS sources and falls back to ``hardcoded`` "
            "when no cluster confirms within ``rss_window_minutes``."
        ),
    )
    rss_window_minutes: int = Field(
        default=60, ge=1, le=24 * 60,
        description="How far back to look for RSS items when event_source='rss'.",
    )
    auction_mode: str | None = Field(
        default=None,
        description=(
            "Override AUCTION_MODE for this lifecycle. ``real`` drives 4 "
            "agent bids; ``mock`` uses the legacy deterministic bid path."
        ),
    )
    confirm_real_polymarket: bool = Field(
        default=False,
        description=(
            "Operator opt-in for live Polymarket submission. Without it, "
            "real-mode degrades to a ``blocked`` SubmissionResult."
        ),
    )
    auction_window_seconds: float | None = Field(
        default=0.0,
        ge=MIN_AUCTION_WINDOW_S,
        le=MAX_AUCTION_WINDOW_S,
        description=(
            "Auction window in seconds. Defaults to 0 in the demo endpoint "
            "so the lifecycle completes synchronously."
        ),
    )
    mock_bids: list[TriggerBid] | None = Field(
        default=None,
        max_length=MAX_BIDS_PER_REQUEST,
        description=(
            "Optional override list of {agent_address, bid_amount, ...}. "
            "When omitted, the orchestrator drives a real on-chain auction "
            "across the 4 reference agents (auction_mode='real')."
        ),
    )
    # ``mode`` is the W5 lifecycle mode (``"live"`` | ``"mock"``). We accept
    # it as a free-form string here and validate in the handler so the
    # 400-on-invalid contract isn't masked by Pydantic's 422 default.
    # When omitted, the handler falls back to the ``DEFAULT_EVENT_MODE``
    # env var (and ultimately to ``"live"``) to preserve back-compat.
    mode: str | None = Field(
        default=None,
        max_length=16,
        description=(
            "Lifecycle execution mode for this event. ``live`` (default) "
            "runs the real LLM + Arc tx + RSS + judge panel. ``mock`` "
            "short-circuits each subsystem with deterministic fixtures. "
            "Invalid values return HTTP 400."
        ),
    )

    # NOTE(review): the three validators below carried no registration
    # decorator, so Pydantic never invoked them. They are wired up here with
    # ``field_validator`` (already imported by this module); the target
    # field of each is inferred from its name — confirm.
    @field_validator("event_source")
    @classmethod
    def _validate_event_source(cls, value: str) -> str:
        """Normalise ``event_source`` and check it against the allow-list.

        Raises:
            ValueError: if the trimmed, lower-cased value is not one of
                ``_VALID_EVENT_SOURCES``.
        """
        v = (value or "user_payload").strip().lower()
        if v not in _VALID_EVENT_SOURCES:
            raise ValueError(
                f"event_source must be one of {_VALID_EVENT_SOURCES}"
            )
        return v

    @field_validator("auction_mode")
    @classmethod
    def _validate_auction_mode(cls, value: str | None) -> str | None:
        """Normalise ``auction_mode``; ``None`` means "no override".

        Raises:
            ValueError: if a value is given and it is neither ``real`` nor
                ``mock`` after trimming and lower-casing.
        """
        if value is None:
            return None
        v = value.strip().lower()
        if v not in ("real", "mock"):
            raise ValueError("auction_mode must be 'real' or 'mock'")
        return v

    run_in_background: bool = Field(
        default=False,
        description=(
            "If true the lifecycle runs as a FastAPI BackgroundTask and "
            "the endpoint returns immediately with {scheduled: true}."
        ),
    )

    @field_validator("mock_bids")
    @classmethod
    def _reject_duplicate_addresses(
        cls, value: list[TriggerBid] | None
    ) -> list[TriggerBid] | None:
        """Ensure every bid in ``mock_bids`` has a distinct agent address.

        ``None`` and an empty list pass through unchanged.

        Raises:
            ValueError: on the first repeated ``agent_address``.
        """
        if not value:
            return value
        seen: set[str] = set()
        for bid in value:
            if bid.agent_address in seen:
                raise ValueError(
                    f"duplicate agent_address in mock_bids: {bid.agent_address}"
                )
            seen.add(bid.agent_address)
        return value
def _load_hardcoded_sample() -> dict[str, Any] | None:
    """Read ``outputs/sample_0.json`` and project it into the trigger shape.

    Returns:
        A dict with ``title``, ``sources``, ``language``, ``category`` and
        ``summary`` keys, or ``None`` if the file is missing, unreadable,
        not valid JSON, not a JSON object, or has no usable title — so the
        caller can fall back to the in-process demo defaults.
    """
    # A missing file is the expected state on a fresh checkout, so it is
    # handled quietly rather than logged as a read failure.
    if not _HARDCODED_SAMPLE_PATH.exists():
        return None
    try:
        with _HARDCODED_SAMPLE_PATH.open("r", encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError) as exc:
        # ``ValueError`` covers both ``json.JSONDecodeError`` and the
        # ``UnicodeDecodeError`` raised for bytes that are not valid UTF-8.
        logger.warning(
            "trigger: failed to read hardcoded sample (%s): %s",
            _HARDCODED_SAMPLE_PATH,
            exc,
        )
        return None

    # Valid JSON whose root is a list/scalar is still malformed for our
    # purposes; without this guard the ``.get`` calls below would raise.
    if not isinstance(data, dict):
        logger.warning(
            "trigger: hardcoded sample (%s) is not a JSON object",
            _HARDCODED_SAMPLE_PATH,
        )
        return None

    title = str(data.get("title") or "").strip()
    if not title:
        return None

    language = str(data.get("source_language") or "zh")
    resolution = data.get("resolution_source") or ""
    sources: list[dict[str, str]] = []
    if resolution:
        sources.append(
            {
                "name": "resolution_source",
                "url": str(resolution),
                "language": language,
            }
        )
    return {
        "title": title,
        # Copy the fallback list so callers cannot mutate the module constant.
        "sources": sources or list(_DEMO_FALLBACK_SOURCES),
        "language": language,
        "category": str(data.get("category") or "geopolitics"),
        "summary": data.get("source_news") or data.get("description"),
    }
def _fallback_demo_event() -> dict[str, Any]:
    """Return the last-resort hardcoded demo event (never fails).

    The result is built purely from module constants; ``sources`` is a
    fresh list on every call so callers may append to it freely.
    """
    return {
        "title": _DEMO_FALLBACK_TITLE,
        "sources": list(_DEMO_FALLBACK_SOURCES),
        "language": "zh",
        "category": "policy/china",
    }
async def _fetch_rss_demo_event(window_minutes: int) -> dict[str, Any] | None:
    """Best-effort RSS fetch → Haiku event scoring → raw cluster passthrough.

    Post-pivot contract (2026-05-26): the marketplace MUST NOT write any
    Polymarket question text. Each agent (seeder or external) frames its
    own question during the auction. This function therefore returns the
    RAW news cluster (title, summary, sources) plus a ``scoring`` dict
    with quality / category / entities metadata — nothing more.

    1. Poll the registered RSS sources once.
    2. Filter to items within ``window_minutes``; if none qualify, use
       every polled item instead.
    3. Ask Claude Haiku 4.5 to score the cluster (quality, category,
       entities, credibility, timeliness) — NOT to write a question.
    4. If ``event_quality_score`` is below the auction threshold, return
       ``None`` so the caller degrades to the hardcoded fallback.
    5. Else return ``{title, sources, language, category, summary,
       scoring, rss_health}`` built from the first article in the pool.

    Mock-mode short-circuit (W5-A3): when the lifecycle contextvar
    ``event_mode == "mock"`` (set by the trigger handler / orchestrator
    after :func:`set_event_mode`), we skip the live RSS poll entirely
    and pick a canned multi-language cluster from
    :mod:`polyglot_alpha.ingestion.fixtures`. The fixture result has the
    same keys as the live path except ``rss_health``.

    Args:
        window_minutes: Recency window, in minutes, for step 2.

    Returns:
        The event dict described above, or ``None`` when the ingestion
        modules cannot be imported, the poll fails or yields nothing, the
        scoring call fails, or the score is below the threshold.

    Raises:
        rss_aggregator.RSSAggregatorError: re-raised from the poll so the
            caller can mark the event as failed instead of degrading.
    """
    # ---- W5-A3 fixture short-circuit ----
    try:
        from polyglot_alpha.logging_ctx import get_event_mode
    except ImportError:  # pragma: no cover - defensive
        get_event_mode = None  # type: ignore[assignment]

    if get_event_mode is not None and get_event_mode() == "mock":
        try:
            from polyglot_alpha.ingestion.fixtures import pick_mock_cluster

            cluster = pick_mock_cluster()
            logger.info(
                "trigger: mock-mode short-circuit — fixture title=%r",
                (cluster.get("title") or "")[:80],
            )
            return {
                "title": cluster["title"],
                "sources": list(cluster.get("sources") or []),
                "language": cluster.get("language", "zh"),
                "category": cluster.get("category", "geopolitics"),
                "summary": cluster.get("summary"),
                "scoring": cluster.get("scoring"),
            }
        except (FileNotFoundError, RuntimeError, ImportError) as exc:
            logger.warning(
                "trigger: mock fixture load failed (%s); falling through to RSS",
                exc,
            )

    try:
        from polyglot_alpha.ingestion import (
            cross_reference,
            news_summarizer,
            rss_aggregator,
        )
    except ImportError as exc:
        logger.info("trigger: ingestion modules unavailable (%s)", exc)
        return None

    # ---- 1. Poll RSS ----
    rss_health: dict[str, Any] | None = None
    try:
        sources = rss_aggregator.load_sources()
        raw_events, report = await rss_aggregator.poll_sources_once_with_report(
            sources
        )
        rss_health = report.to_dict()
    except rss_aggregator.RSSAggregatorError as exc:
        # Fail-loud path: no fixture fallback. Re-raise so the trigger
        # handler can mark the event FAILED with reason=rss_all_unreachable.
        logger.error(
            "trigger: rss_all_unreachable — %s",
            exc,
        )
        raise
    except Exception as exc:  # broad — outbound HTTP / parse / FS
        logger.warning("trigger: RSS poll failed (%s)", exc)
        return None

    if not raw_events:
        logger.info(
            "trigger: RSS poll returned no events (rss_health=%s)",
            rss_health,
        )
        return None

    # ---- 2. Recency filter ----
    try:
        recent = cross_reference.filter_recent(
            raw_events, window=timedelta(minutes=window_minutes)
        )
    except Exception as exc:  # pragma: no cover - defensive
        logger.warning("trigger: recency filter crashed (%s)", exc)
        recent = []

    # If nothing landed in the requested window, widen to "anything we got"
    # so the demo button still surfaces a real headline rather than the
    # canned fallback. The 5-min DB dedup downstream prevents duplicates.
    pool = recent if recent else raw_events

    # ---- 3. Format for Haiku and score the cluster ----
    articles: list[dict[str, Any]] = [
        {
            "title": ev.title,
            "summary": ev.summary,
            "source": ev.source,
            "published": ev.published_at.isoformat() if ev.published_at else "",
            "url": ev.url,
            "language": ev.language,
        }
        for ev in pool[:25]  # cap so prompt stays small
    ]
    if not articles:
        return None

    # The scoring call is guarded like the poll above: a scorer failure
    # degrades to ``None`` (caller falls back to the hardcoded sample)
    # instead of escaping from the background task.
    try:
        scoring = await news_summarizer.score_event_for_auction(articles)
    except Exception as exc:  # broad — outbound LLM call / response parsing
        logger.warning("trigger: event scoring failed (%s)", exc)
        return None

    # ---- 4. Threshold check — reject low-quality clusters ----
    if scoring.event_quality_score < news_summarizer.MIN_AUCTION_QUALITY:
        logger.info(
            "trigger: cluster rejected — score=%.2f reason=%r",
            scoring.event_quality_score,
            scoring.rejection_reason,
        )
        return None

    # ---- 5. Pass through the RAW top article — no question rewriting ----
    top = articles[0]
    language = top.get("language") or "zh"
    return {
        "title": top["title"],
        "sources": [
            {
                "name": top["source"],
                "url": top["url"] or "",
                "language": language,
            }
        ],
        "language": language,
        "category": scoring.primary_category or "geopolitics",
        "summary": scoring.raw_summary or top["summary"],
        "scoring": scoring.as_dict(),
        # W13-C: per-source health snapshot for the UI Phase 1 panel and
        # for downstream logging / diagnostics. Always present when the
        # live RSS path executes (absent for mock / hardcoded / payload).
        "rss_health": rss_health,
    }
def _coerce_bids(raw: list[TriggerBid] | None) -> list[BidRecord] | None:
    """Convert validated :class:`TriggerBid` Pydantic models into orchestrator
    :class:`BidRecord` dataclasses.

    This is a straight 1:1 field projection; no checks are performed here,
    so all validation is expected to have happened on the Pydantic side.

    Args:
        raw: The request's ``mock_bids`` list, or ``None`` when omitted.

    Returns:
        ``None`` when ``raw`` is ``None`` (meaning "no override"), otherwise
        a new list with one ``BidRecord`` per input bid, in the same order.
    """
    if raw is None:
        return None
    return [
        BidRecord(
            agent_address=bid.agent_address,
            bid_amount=bid.bid_amount,
            stake_amount=bid.stake_amount,
            candidate_hash=bid.candidate_hash,
            tx_hash=bid.tx_hash,
            reputation=bid.reputation,
        )
        for bid in raw
    ]
| async def trigger_event( | |
| request: Request, | |
| payload: TriggerRequest, | |
| background_tasks: BackgroundTasks, | |
| response: Response, | |
| ) -> dict[str, Any]: | |
| # --- Resolve the W5 lifecycle mode (live | mock) --------------------- | |
| # Validate explicitly here so we can return HTTP 400 (not 422) on | |
| # ``mode='bogus'`` per the W5-A1 contract. If the client omits ``mode`` | |
| # entirely we fall back to ``DEFAULT_EVENT_MODE`` and then to ``"live"``. | |
| if payload.mode is None: | |
| lifecycle_mode = _resolve_default_event_mode() | |
| else: | |
| candidate = (payload.mode or "").strip().lower() | |
| if candidate not in _VALID_EVENT_MODES: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=( | |
| f"mode must be one of {list(_VALID_EVENT_MODES)}; " | |
| f"got {payload.mode!r}" | |
| ), | |
| ) | |
| lifecycle_mode = candidate | |
| # --- W23: DISABLE_LIVE deployment override --------------------------- | |
| # When the deployment env locks live mode off (e.g. Hugging Face Spaces | |
| # demo with no API keys), silently rewrite any ``live`` request to | |
| # ``mock`` rather than 400-ing the client. The UI may still attempt to | |
| # send ``live`` because the toggle is hidden / stale; the header tells | |
| # it that we overrode the choice server-side. | |
| if _is_live_disabled() and lifecycle_mode == "live": | |
| logger.info( | |
| "DISABLE_LIVE active: rewriting mode=live -> mock for event " | |
| "title=%r", | |
| (payload.title or "")[:80], | |
| ) | |
| lifecycle_mode = "mock" | |
| response.headers["X-Live-Disabled"] = "true" | |
| # --- W5-A3 mock short-circuit ---------------------------------------- | |
| # In ``mode='mock'`` the lifecycle never touches the live RSS feed, | |
| # Haiku scorer, or any external network. We pick one of the 5 bundled | |
| # multi-language news clusters and run it through the full lifecycle. | |
| # This bypasses the ``event_source`` branching below entirely because | |
| # we already have the resolved event body (title, sources, language, | |
| # category, summary, scoring) in hand from the fixture. | |
| if lifecycle_mode == "mock": | |
| from ...ingestion.fixtures import pick_mock_cluster # local import to | |
| # keep cold-start light on the live path | |
| try: | |
| cluster = pick_mock_cluster() | |
| except (FileNotFoundError, RuntimeError) as exc: | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"mock fixture load failed: {exc}", | |
| ) from exc | |
| try: | |
| bids = _coerce_bids(payload.mock_bids) | |
| except ValidationError as exc: | |
| raise HTTPException( | |
| status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, | |
| detail=exc.errors(), | |
| ) from exc | |
| # Build event_dict in the same shape ``_fetch_rss_demo_event`` | |
| # would have produced β :func:`run_lifecycle` is the source of | |
| # truth on the contract so we just pass the fixture through. | |
| mock_event_dict: dict[str, Any] = { | |
| "title": cluster["title"], | |
| "sources": list(cluster.get("sources") or []), | |
| "language": cluster.get("language", "zh"), | |
| "category": cluster.get("category", "geopolitics"), | |
| } | |
| if cluster.get("summary") is not None: | |
| mock_event_dict["summary"] = cluster["summary"] | |
| if isinstance(cluster.get("scoring"), dict): | |
| mock_event_dict["scoring"] = cluster["scoring"] | |
| # Salt the title with a per-click token so repeated mock triggers | |
| # always produce a NEW lifecycle (content_hash dedup would | |
| # otherwise collapse every click in the same minute back onto the | |
| # first event_id, breaking the demo button's "fresh run on each | |
| # click" expectation). Suffix is hidden inside square brackets so | |
| # the UI still shows a clean headline. | |
| unique_token = uuid.uuid4().hex[:8] | |
| mock_event_dict["title"] = f"{mock_event_dict['title']} [mock:{unique_token}]" | |
| prep = await create_pending_event(mock_event_dict, mode=lifecycle_mode) | |
| precreated_event_id_mock: int = int(prep["event_id"]) | |
| async def _mock_runner() -> None: | |
| await run_lifecycle( | |
| mock_event_dict, | |
| auction_window_seconds=payload.auction_window_seconds, | |
| mock_bids=bids, | |
| # Force ``auction_mode='mock'`` in mock mode so the | |
| # auction sub-system also uses its deterministic seeder | |
| # path (no external agent dispatch / on-chain calls). | |
| auction_mode=payload.auction_mode or "mock", | |
| confirm_real_polymarket=payload.confirm_real_polymarket, | |
| precreated_event_id=precreated_event_id_mock, | |
| mode=lifecycle_mode, | |
| ) | |
| background_tasks.add_task(_mock_runner) | |
| return { | |
| "event_id": precreated_event_id_mock, | |
| "status": "PENDING", | |
| "scheduled": True, | |
| "title": mock_event_dict["title"], | |
| "mode": lifecycle_mode, | |
| } | |
| # --- Resolve the event body depending on event_source ----------------- | |
| # ``user_payload`` (default) β trust the body. Title is mandatory. | |
| # ``hardcoded`` β read outputs/sample_0.json. | |
| # ``rss`` β fetch latest RSS cluster; on any | |
| # failure degrade to ``hardcoded`` and | |
| # finally to a baked-in fallback. | |
| source_mode = (payload.event_source or "user_payload").lower() | |
| resolved_title: str | None = payload.title | |
| resolved_sources: list[dict[str, Any]] = [ | |
| s.model_dump() for s in payload.sources | |
| ] | |
| resolved_language: str = payload.language | |
| resolved_category: str = payload.category | |
| resolved_summary: Any = None | |
| resolved_scoring: dict[str, Any] | None = None | |
| if source_mode == "rss": | |
| # Demo-grade fast path for RSS: we do NOT block the HTTP response | |
| # on the 5-15 s RSS fetch + Haiku scoring. Instead we pre-create a | |
| # PENDING event row with a placeholder title and schedule a single | |
| # BackgroundTask that: | |
| # 1. Runs ``_fetch_rss_demo_event(...)`` (RSS poll + Haiku score) | |
| # 2. UPDATEs the events row with the real title / sources / lang. | |
| # 3. Re-publishes ``event.created`` so SSE listeners refresh. | |
| # 4. Calls ``run_lifecycle(precreated_event_id=event_id)``. | |
| # This lets the UI navigate to ``/events/{id}`` in <200 ms and | |
| # render the news-fetch step as a regular SSE timeline phase. | |
| try: | |
| bids = _coerce_bids(payload.mock_bids) | |
| except ValidationError as exc: | |
| raise HTTPException( | |
| status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, | |
| detail=exc.errors(), | |
| ) from exc | |
| # Salt the placeholder title with a per-click unique token so every | |
| # button click creates a NEW event row (no dedup on placeholders). | |
| # The earlier 5-min-epoch-bucket salt caused repeated clicks within | |
| # 5 minutes to all map to the same content_hash and dedup back to | |
| # the first event_id β that broke the user's expectation of "every | |
| # click runs a fresh demo". Real-content dedup still happens later | |
| # in the background task when run_lifecycle computes the content_hash | |
| # over the resolved RSS headline + sources. | |
| unique_token = uuid.uuid4().hex | |
| placeholder_title = f"{_RSS_PLACEHOLDER_TITLE} [{unique_token}]" | |
| placeholder_dict: dict[str, Any] = { | |
| "title": placeholder_title, | |
| "sources": resolved_sources or list(_DEMO_FALLBACK_SOURCES), | |
| "language": payload.language or "zh", | |
| "category": payload.category or "geopolitics", | |
| } | |
| prep = await create_pending_event(placeholder_dict, mode=lifecycle_mode) | |
| precreated_event_id_rss: int = int(prep["event_id"]) | |
| async def _rss_then_lifecycle() -> None: | |
| # Import lazily so the failure path can introspect the exception | |
| # type without forcing import at trigger handler entry. | |
| from polyglot_alpha.ingestion import rss_aggregator as _rss | |
| try: | |
| rss_event = await _fetch_rss_demo_event(payload.rss_window_minutes) | |
| except _rss.RSSAggregatorError as exc: | |
| # Fail-loud: mark the placeholder event as FAILED with a | |
| # descriptive reason + the full rss_health payload so the | |
| # UI can render "live mode unreachable" instead of falling | |
| # through to a misleading hardcoded headline. | |
| logger.error( | |
| "trigger: rss_all_unreachable for event_id=%s β %s", | |
| precreated_event_id_rss, | |
| exc, | |
| ) | |
| try: | |
| with session_scope() as session: | |
| row = session.get(Event, precreated_event_id_rss) | |
| if row is not None: | |
| row.status = "FAILED" | |
| row.title = ( | |
| "RSS unreachable β live mode degraded" | |
| ) | |
| # Persist the health snapshot via the | |
| # reserved meta marker (see | |
| # ``_extract_rss_health_meta`` in events.py). | |
| existing_sources = list(row.sources or []) | |
| existing_sources = [ | |
| s for s in existing_sources | |
| if not ( | |
| isinstance(s, dict) | |
| and s.get("name") == "_meta:rss_health" | |
| ) | |
| ] | |
| existing_sources.append( | |
| { | |
| "name": "_meta:rss_health", | |
| "url": "", | |
| "language": "_meta", | |
| "health": exc.report.to_dict(), | |
| "reason": "rss_all_unreachable", | |
| } | |
| ) | |
| row.sources = existing_sources | |
| session.add(row) | |
| except Exception as inner_exc: # pragma: no cover - defensive | |
| logger.warning( | |
| "trigger: failed to mark event FAILED after rss " | |
| "unreachable (%s)", inner_exc, | |
| ) | |
| # Publish an event.updated so SSE listeners refresh. | |
| try: | |
| hub = get_pubsub() | |
| await hub.publish( | |
| "event.updated", | |
| { | |
| "event_id": precreated_event_id_rss, | |
| "title": "RSS unreachable β live mode degraded", | |
| "status": "FAILED", | |
| "rss_health": exc.report.to_dict(), | |
| "reason": "rss_all_unreachable", | |
| }, | |
| ) | |
| except Exception: # pragma: no cover - defensive | |
| pass | |
| return | |
| if rss_event is None: | |
| rss_event = _load_hardcoded_sample() or _fallback_demo_event() | |
| logger.info( | |
| "trigger: event_source=rss degraded to hardcoded fallback" | |
| ) | |
| real_title = rss_event.get("title") or _DEMO_FALLBACK_TITLE | |
| real_sources = list( | |
| rss_event.get("sources") or _DEMO_FALLBACK_SOURCES | |
| ) | |
| real_language = str(rss_event.get("language") or "zh") | |
| real_category = str(rss_event.get("category") or "geopolitics") | |
| real_summary = rss_event.get("summary") | |
| real_scoring = rss_event.get("scoring") | |
| # W13-C: stash the per-source health snapshot on the event so | |
| # the UI Phase 1 panel can show "live: 7/8 sources reachable". | |
| rss_health_meta = rss_event.get("rss_health") | |
| if isinstance(rss_health_meta, dict): | |
| real_sources = [ | |
| s for s in real_sources | |
| if not ( | |
| isinstance(s, dict) | |
| and s.get("name") == "_meta:rss_health" | |
| ) | |
| ] | |
| real_sources.append( | |
| { | |
| "name": "_meta:rss_health", | |
| "url": "", | |
| "language": "_meta", | |
| "health": rss_health_meta, | |
| } | |
| ) | |
| # UPDATE the pre-created event row in-place so subsequent | |
| # ``GET /events/{id}`` calls return the real headline. | |
| try: | |
| with session_scope() as session: | |
| row = session.get(Event, precreated_event_id_rss) | |
| if row is not None: | |
| row.title = real_title | |
| row.sources = real_sources | |
| row.language = real_language | |
| session.add(row) | |
| except Exception as exc: # pragma: no cover - defensive | |
| logger.warning( | |
| "trigger: failed to update RSS placeholder row id=%s: %s", | |
| precreated_event_id_rss, | |
| exc, | |
| ) | |
| # Re-publish ``event.created`` with the real title so SSE | |
| # subscribers can refresh the header. We include both the new | |
| # title and the original placeholder so UI can detect updates. | |
| try: | |
| hub = get_pubsub() | |
| await hub.publish( | |
| "event.updated", | |
| { | |
| "event_id": precreated_event_id_rss, | |
| "title": real_title, | |
| "scoring": real_scoring if isinstance(real_scoring, dict) else None, | |
| }, | |
| ) | |
| except Exception as exc: # pragma: no cover - defensive | |
| logger.warning( | |
| "trigger: pubsub republish failed (%s)", exc | |
| ) | |
| # Now run the lifecycle with the real event_dict. We pass | |
| # ``precreated_event_id`` so the orchestrator adopts the same | |
| # row rather than inserting a duplicate. | |
| final_event_dict: dict[str, Any] = { | |
| "title": real_title, | |
| "sources": real_sources, | |
| "language": real_language, | |
| "category": real_category, | |
| } | |
| if real_summary is not None: | |
| final_event_dict["summary"] = real_summary | |
| if isinstance(real_scoring, dict): | |
| final_event_dict["scoring"] = real_scoring | |
| await run_lifecycle( | |
| final_event_dict, | |
| auction_window_seconds=payload.auction_window_seconds, | |
| mock_bids=bids, | |
| auction_mode=payload.auction_mode, | |
| confirm_real_polymarket=payload.confirm_real_polymarket, | |
| precreated_event_id=precreated_event_id_rss, | |
| mode=lifecycle_mode, | |
| ) | |
| background_tasks.add_task(_rss_then_lifecycle) | |
| return { | |
| "event_id": precreated_event_id_rss, | |
| "status": "PENDING", | |
| "scheduled": True, | |
| "title": placeholder_title, | |
| "mode": lifecycle_mode, | |
| } | |
| elif source_mode == "hardcoded": | |
| hard = _load_hardcoded_sample() or _fallback_demo_event() | |
| resolved_title = hard.get("title") or _DEMO_FALLBACK_TITLE | |
| resolved_sources = list(hard.get("sources") or _DEMO_FALLBACK_SOURCES) | |
| resolved_language = str(hard.get("language") or "zh") | |
| resolved_category = str(hard.get("category") or "policy/china") | |
| resolved_summary = hard.get("summary") | |
| else: | |
| # ``user_payload`` -> title is required. | |
| if not resolved_title or not resolved_title.strip(): | |
| raise HTTPException( | |
| status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, | |
| detail=[ | |
| { | |
| "loc": ["body", "title"], | |
| "msg": "title is required when event_source='user_payload'", | |
| "type": "value_error.missing", | |
| } | |
| ], | |
| ) | |
| event_dict: dict[str, Any] = { | |
| "title": resolved_title, | |
| "sources": resolved_sources, | |
| "language": resolved_language, | |
| "category": resolved_category, | |
| } | |
| if resolved_summary is not None: | |
| event_dict["summary"] = resolved_summary | |
| if resolved_scoring is not None: | |
| event_dict["scoring"] = resolved_scoring | |
| try: | |
| bids = _coerce_bids(payload.mock_bids) | |
| except ValidationError as exc: | |
| raise HTTPException( | |
| status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, | |
| detail=exc.errors(), | |
| ) from exc | |
| # 5-minute sliding-window dedup. The orchestrator's content_hash dedup | |
| # is permanent (per `compute_content_hash` over title+sources+language) | |
| # which makes the RSS demo unusable since the same Haiku-picked | |
| # headline can recur for ~30 min. Pre-check the events table for a row | |
| # with the same title created in the last 5 minutes — if hit, short- | |
| # circuit to that event_id (acts like dedup but doesn't 409). Older | |
| # duplicates used to be salted with a 5-min epoch bucket so they | |
| # progressed as a fresh lifecycle; that salt has since been removed | |
| # (see the NOTE after this block). | |
| if resolved_title: | |
| recent_cutoff = datetime.now(tz=timezone.utc) - timedelta(minutes=5) | |
| with session_scope() as session: | |
| same_title_recent = session.exec( | |
| select(Event) | |
| .where(Event.title == resolved_title) | |
| .where(Event.triggered_at >= recent_cutoff.replace(tzinfo=None)) | |
| .order_by(Event.triggered_at.desc()) | |
| ).first() | |
| if same_title_recent is not None and same_title_recent.id is not None: | |
| logger.info( | |
| "trigger: 5-min dedup hit β reusing event_id=%s for title=%r", | |
| same_title_recent.id, | |
| resolved_title[:80], | |
| ) | |
| return { | |
| "event_id": same_title_recent.id, | |
| "status": same_title_recent.status, | |
| "scheduled": False, | |
| "deduped": True, | |
| "mode": same_title_recent.mode or "live", | |
| } | |
| # NOTE: the previous ``source_mode == "rss"`` 5-min-bucket salt that | |
| # lived here was removed when the RSS branch was inverted into a | |
| # placeholder + BackgroundTask flow (the fetch now runs *after* the | |
| # response). RSS triggers never reach this block. | |
| # Demo-grade fast path: pre-create the PENDING event row so the caller | |
| # gets ``event_id`` back in ~10 ms and can navigate to ``/events/{id}`` | |
| # while the 60-90 s lifecycle (auction → translation → 11-judge panel → | |
| # Arc anchor → Polymarket) runs as a FastAPI BackgroundTask. The UI | |
| # detail page subscribes to SSE and animates the Timeline as each | |
| # lifecycle phase publishes its event. | |
| prep = await create_pending_event(event_dict, mode=lifecycle_mode) | |
| if prep.get("deduped"): | |
| # Permanent content_hash dedup still fires for identical payloads | |
| # re-submitted >5 min apart (past the title pre-check above). Surface | |
| # as 200 with the original event_id so the demo button still | |
| # navigates the user somewhere useful. | |
| logger.info( | |
| "trigger: content_hash dedup β reusing event_id=%s", | |
| prep.get("event_id"), | |
| ) | |
| return { | |
| "event_id": prep.get("event_id"), | |
| "status": prep.get("status"), | |
| "scheduled": False, | |
| "deduped": True, | |
| "mode": prep.get("mode") or "live", | |
| } | |
| precreated_event_id: int = int(prep["event_id"]) | |
| if payload.run_in_background: | |
| async def _runner() -> None: | |
| await run_lifecycle( | |
| event_dict, | |
| auction_window_seconds=payload.auction_window_seconds, | |
| mock_bids=bids, | |
| auction_mode=payload.auction_mode, | |
| confirm_real_polymarket=payload.confirm_real_polymarket, | |
| precreated_event_id=precreated_event_id, | |
| mode=lifecycle_mode, | |
| ) | |
| background_tasks.add_task(_runner) | |
| return { | |
| "scheduled": True, | |
| "event_id": precreated_event_id, | |
| "status": "PENDING", | |
| "title": resolved_title, | |
| "mode": lifecycle_mode, | |
| } | |
| # Default demo path: schedule lifecycle in the background so the | |
| # endpoint returns event_id immediately. The UI navigates to the | |
| # detail page on event_id and watches SSE for phase transitions. | |
| async def _bg_runner() -> None: | |
| await run_lifecycle( | |
| event_dict, | |
| auction_window_seconds=payload.auction_window_seconds, | |
| mock_bids=bids, | |
| auction_mode=payload.auction_mode, | |
| confirm_real_polymarket=payload.confirm_real_polymarket, | |
| precreated_event_id=precreated_event_id, | |
| mode=lifecycle_mode, | |
| ) | |
| background_tasks.add_task(_bg_runner) | |
| return { | |
| "event_id": precreated_event_id, | |
| "status": "PENDING", | |
| "scheduled": True, | |
| "mode": lifecycle_mode, | |
| } | |