"""Pipeline dispatch — glue from orchestrator to the 5-layer translation pipeline + 3-seeder evaluate/bid stage. This module is what ``orchestrator.py`` imports as ``from .agents import dispatch``. When the import (or the underlying chain on-chain stack) fails, the orchestrator falls back to a static template emitter that always outputs ``"Will X by December 31, 2026?"``. To keep the orchestrator's real pipeline alive we therefore: * Make sure the module imports cleanly even when the LLM keys are missing (``make_llm`` already returns a :class:`MockLLM` in that case). * Provide both the orchestrator-facing surface (``run_for_winner`` + :class:`PipelineResult`) **and** the standalone surface the rest of the product uses (``collect_bids_inline`` + a ``polymarket.types.Question`` shaped ``run_pipeline``). * Tolerate per-agent LLM failures during ``collect_bids_inline`` — one bad agent must not kill the whole auction. The public surface is enumerated in :data:`__all__` at the bottom. """ from __future__ import annotations import asyncio import hashlib import json import logging import uuid from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any, Callable, Optional from eth_account import Account from .. import analysts, quality_eval, synthesizer, translators from ..chain.auction_client import AuctionClient from ..llm import LLMCallable, make_llm from ..schemas import Question as SchemaQuestion, event_dict_to_model from . import AGENT_REGISTRY from .wallets import load_or_derive_wallet logger = logging.getLogger(__name__) _REPO_ROOT = Path(__file__).resolve().parents[2] _WALLETS_PATH = _REPO_ROOT / "outputs" / "agent_wallets.json" _DEFAULT_AUCTION_WINDOW_SECONDS: float = 30.0 _DEFAULT_PIPELINE_TIMEOUT_SECONDS: float = 120.0 # --------------------------------------------------------------------------- # PipelineResult shape (mirrors orchestrator.PipelineResult by-field). # --------------------------------------------------------------------------- @dataclass class PipelineResult: """What :func:`run_for_winner` returns to the orchestrator.""" final_question: dict[str, Any] pipeline_trace_ipfs: Optional[str] candidate_hash: str # --------------------------------------------------------------------------- # Agent-name resolution # --------------------------------------------------------------------------- def _load_wallet_map() -> dict[str, str]: """Return ``{lowercased_address: agent_name}`` from outputs/agent_wallets.json.""" if not _WALLETS_PATH.exists(): return {} try: data = json.loads(_WALLETS_PATH.read_text()) except (OSError, json.JSONDecodeError): return {} out: dict[str, str] = {} for name, info in data.items(): addr = info.get("address") if isinstance(info, dict) else None if addr: out[str(addr).lower()] = name return out def resolve_agent_name(agent_address: str) -> str: """Map a winner address back to a seeder slot. Falls back to ``"gemini-v2"`` (Seeder Alpha) so the pipeline always has a valid model binding even if the on-chain winner is an unregistered demo address. The "-v2" suffix is the W16-B rotation (2026-05-27): pre-rotation slots had on-chain reputation below the auction's 0.7 ``submitBid`` gate. """ default = "gemini-v2" if not agent_address: return default wallets = _load_wallet_map() return wallets.get(agent_address.lower(), default) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _coerce_event(event_dict: dict[str, Any]) -> dict[str, Any]: """Pad missing fields so :func:`event_dict_to_model` does not blow up.""" title = ( event_dict.get("title_zh") or event_dict.get("title") or "PolyglotAlpha demo event" ) body = ( event_dict.get("body_zh") or event_dict.get("body") or event_dict.get("summary") or title ) cutoff_ts = int(event_dict.get("cutoff_ts") or 0) if not cutoff_ts: cutoff_ts = int(datetime.now(timezone.utc).timestamp()) + 30 * 24 * 3600 return { **event_dict, "event_id": str( event_dict.get("event_id") or event_dict.get("eventId") or "" ), "title_zh": title, "body_zh": body, "url": event_dict.get("url") or "", "cutoff_ts": cutoff_ts, } def _throwaway_pk() -> str: """Generate a fresh, never-funded private key for eval-only agents. :class:`BaseTranslatorAgent` requires a non-empty ``wallet_pk`` so it can derive an address. For the in-process auction we just want to size a bid; we never broadcast a transaction with this key. """ return Account.create().key.hex() def _candidate_hash_for_agent( agent_name: str, evaluation: Any, event_dict: dict[str, Any] ) -> str: """Deterministic per-agent candidate hash for the bid payload.""" body = { "agent": agent_name, "bid_amount_usdc": getattr(evaluation, "bid_amount_usdc", None), "confidence": getattr(evaluation, "confidence", None), "title": event_dict.get("title") or event_dict.get("title_zh"), } return hashlib.sha256( json.dumps(body, sort_keys=True, separators=(",", ":")).encode() ).hexdigest() def _polymarket_question_from_schema( event_dict: dict[str, Any], question: SchemaQuestion, ) -> Any: """Lift a ``schemas.Question`` into a ``polymarket.types.Question``. Imported lazily so a missing ``polymarket`` package does not break the orchestrator's import of this module. """ from ..polymarket.types import Question as PolymarketQuestion # local import qid = str( event_dict.get("event_id") or event_dict.get("question_id") or question.event_id or uuid.uuid4().hex ) return PolymarketQuestion( question_id=qid, text=question.question_en, category=event_dict.get("category", "geopolitics"), resolution_source=event_dict.get("resolution_source") or "operator", end_date_iso=question.end_date_iso, ) def _build_final_question_dict( event_dict: dict[str, Any], question: SchemaQuestion, ) -> dict[str, Any]: """Project the agent's :class:`Question` into the orchestrator's wire shape.""" title = (question.question_en or "").strip() cutoff_dt = datetime.now(timezone.utc).replace(month=12, day=31, microsecond=0) cutoff_human = cutoff_dt.strftime("%B %d, %Y") # Only prepend the "Will ... by ?" template if the candidate does # NOT already start with "Will" (case-insensitive). Otherwise we end up # with "Will Will the People's Bank ..." doubled prefixes when the LLM # already produced a P1-shape title. if not title.lower().startswith("will "): title = f"Will {title.rstrip('?')} by {cutoff_human}?" return { "title": title, "description": event_dict.get("summary") or event_dict.get("title") or title, "resolution_criteria": question.resolution_criteria, "resolution_source": event_dict.get("resolution_source") or "operator", "cutoff_ts": question.end_date_iso or cutoff_dt.isoformat(), "end_date_iso": question.end_date_iso, "category": event_dict.get("category", "geopolitics"), "source_news": event_dict.get("title") or event_dict.get("title_zh") or "", "source_language": event_dict.get("language", "zh"), "target_language": "en", "outcomes": ["Yes", "No"], "confidence": question.confidence, "quality_score": question.quality_score, } # --------------------------------------------------------------------------- # Pipeline driver # --------------------------------------------------------------------------- async def _run_pipeline_schema( event_dict: dict[str, Any], agent_name: str, llm_factory: Optional[Callable[[], LLMCallable]] = None, ) -> SchemaQuestion: """Run analysts -> translators -> synthesizer -> quality_eval. Returns the internal :class:`schemas.Question` (with confidence and quality_score populated). All four stages execute as in :meth:`BaseTranslatorAgent.run_pipeline` but with an explicit ``llm_factory`` injection point for tests. """ cls = AGENT_REGISTRY.get(agent_name) or AGENT_REGISTRY["gemini-v2"] model_id = cls.MODEL_ID factory = llm_factory or (lambda: make_llm(model_id)) llm = factory() coerced = _coerce_event(event_dict) event = event_dict_to_model(coerced) # Layer 1: source analysts (parallel). reports = await analysts.run_analysts(event, llm) # Layer 2: translator debate (parallel candidates). candidates = await translators.propose_candidates(event, reports, llm) # Layer 3: synthesizer (pick / merge). question = synthesizer.synthesize(event, candidates) # Layer 4: quality eval (internal sanity check, NOT the 11-judge panel). score = quality_eval.score_question(question) question.confidence = score.score question.quality_score = score.score return question async def run_pipeline( event: dict[str, Any], *, winner_agent_name: str, llm_factory: Optional[Callable[[], LLMCallable]] = None, ) -> Any: """Execute the 5-layer translation pipeline using the winning agent. Layer 1: Source Analysts (``analysts.run_analysts``) Layer 2: Translator Debate (``translators.propose_candidates``) Layer 3: Synthesizer (``synthesizer.synthesize``) Layer 4: Quality Evaluation (``quality_eval.score_question`` — internal sanity check, NOT the 11-judge panel) Layer 5: Final ``polymarket.types.Question`` construction (this module) Returns a ``polymarket.types.Question`` Pydantic model ready for submission. The full layer trace is attached as the model's ``layer_trace`` extra attribute so callers (UI / API) can inspect each stage. """ question = await _run_pipeline_schema( event, winner_agent_name, llm_factory=llm_factory ) pm_question = _polymarket_question_from_schema(event, question) # Stash a layer trace for UI rendering. ``polymarket.types.Question`` is a # Pydantic v2 model with default config; setting an attribute is enough # for our consumers, but we also expose it via ``model_extra`` if the # model allows extras. layer_trace = { "analyst_reports": [ r.model_dump() if hasattr(r, "model_dump") else dict(r) for r in getattr(question, "_analyst_reports", []) or [] ], "synthesized": question.model_dump(), "quality_score": question.quality_score, "confidence": question.confidence, "winner_agent": winner_agent_name, } try: object.__setattr__(pm_question, "layer_trace", layer_trace) except (AttributeError, ValueError): # pragma: no cover - Pydantic strict pass return pm_question # --------------------------------------------------------------------------- # Bid collection (4 agents in parallel) # --------------------------------------------------------------------------- def _resolve_agent_signing_key(agent_name: str) -> Optional[str]: """Return the agent's deterministic private key for on-chain ``placeBid``. Tries (in order): ``_WALLET_PRIVATE_KEY`` env, then deterministic derivation from ``HACKATHON_WALLET_PRIVATE_KEY``. Returns ``None`` when neither is available (e.g. unit tests without operator PK) — callers fall back to a throwaway eval-only keypair and skip the on-chain ``placeBid`` for that bid. """ try: return load_or_derive_wallet(agent_name).private_key except (RuntimeError, ValueError): return None async def _place_bid_on_chain( agent_name: str, agent_pk: str, auction_event_id: Any, bid_amount: float, candidate_hash_hex: str, ) -> str: """Sign + send ``TranslationAuction.placeBid`` for one agent. Returns the 0x-prefixed tx hash. Raises on RPC / signing failure so the caller can record the bid as failed instead of pretending the on-chain write succeeded. """ if auction_event_id is None or auction_event_id == "": raise ValueError( f"dispatch._place_bid_on_chain: missing auction event_id for {agent_name}" ) # W11 bug-C guard: log the canonical bytes32 we are about to write # ``submitBid`` against so it can be diffed against the orchestrator's # ``getBid`` lookup site (which now also logs at DEBUG). Same helper # on both sides => same bytes32 => same storage slot. from ..onchain import event_id_to_bytes32 # local import — avoid cycles logger.debug( "BID ENCODING: agent=%s event_id=%s -> bytes32=0x%s (submitBid)", agent_name, auction_event_id, event_id_to_bytes32(auction_event_id).hex(), ) client = AuctionClient() return await client.submit_bid( event_id=auction_event_id, bid_amount_usdc=bid_amount, candidate_hash=candidate_hash_hex, agent_pk=agent_pk, ) async def _safe_agent_bid( agent_name: str, agent_cls: type, event_dict: dict[str, Any], *, auction_event_id: Any = None, ) -> Optional[dict[str, Any]]: """Drive one agent's pre-bid evaluation + on-chain ``submitBid``. W9-E: this function now returns ``None`` when the on-chain ``submitBid`` does NOT land successfully (insufficient gas, reverted on the reputation gate, window closed, etc.). That way the orchestrator's bid pool only contains seeders whose bids actually made it into ``TranslationAuction`` storage — settle picks a winner from REAL on-chain bidders, not from a DB-only synthesis. On any LLM / construction failure we **propagate** the exception so the orchestrator records the bid as failed and moves on — no synthetic fallback bid is ever returned. After a successful evaluation, the agent's wallet calls ``ensure_registered()`` (idempotent — a no-op when already registered) and then signs+sends a real ``submitBid`` transaction on ``TranslationAuction``. We block on ``wait_for_transaction_receipt`` to confirm the tx mined with ``status=1`` (i.e. did not revert on ``"window closed"`` / ``"reputation gate"`` / ``"not registered"``) before reporting the bid as landed. """ coerced = _coerce_event(event_dict) # Prefer the agent's real signing key (so ``submitBid`` is signed by the # same address used everywhere else). Fall back to a throwaway PK only # for evaluation purposes when the operator PK is unavailable. real_pk = _resolve_agent_signing_key(agent_name) agent_pk = real_pk or _throwaway_pk() agent = agent_cls(wallet_pk=agent_pk) evaluation = await agent.evaluate_event(coerced) bid_amount = float(evaluation.bid_amount_usdc) candidate_hash_hex = _candidate_hash_for_agent( agent_name, evaluation, coerced ) result: dict[str, Any] = { "agent_address": agent.address, "agent_name": agent_name, "bid_amount": bid_amount, "candidate_hash": candidate_hash_hex, "reputation": float(getattr(evaluation, "estimated_quality", 1.0)), "confidence": float(evaluation.confidence), "expected_cost_usdc": float(evaluation.expected_cost_usdc), "llm_model": agent_cls.MODEL_ID, } # The orchestrator's real-auction path ALWAYS passes # ``auction_event_id`` explicitly (the integer DB event id used to # derive bytes32 in ``auction_client``). Callers that pass ``None`` # (e.g. unit tests, CLI dry-run, eval-only metrics) are signalling # "don't touch chain" — we return the bid metadata without a # ``tx_hash`` so the caller can still inspect bid spread, candidate # hash, reputation, etc. The orchestrator filters out tx-hash-less # bids before passing them into the auction settle pool, so this # cannot accidentally enter on-chain auction state. if auction_event_id is None: logger.info( "dispatch.collect_bids_inline: agent=%s eval-only (no " "auction_event_id supplied); returning bid without tx_hash", agent_name, ) return result if real_pk is None or auction_event_id in (None, ""): logger.warning( "dispatch.collect_bids_inline: agent=%s skipped on-chain " "submitBid (no signing key); dropping bid", agent_name, ) return None chain_event_id = auction_event_id # ------------------------------------------------------------------ # Ensure the seeder is registered on-chain (defensive — most live # deployments register the seeders once via scripts/register_agents.py # but the invariant is "submitBid requires registered"). Errors are # logged but not fatal; the subsequent submitBid will revert if the # agent is still unregistered, and we will surface that below. # ------------------------------------------------------------------ try: reg_tx = await agent.ensure_registered() if reg_tx: logger.info( "dispatch.collect_bids_inline: agent=%s registered on-chain " "tx=%s", agent_name, reg_tx, ) except Exception as exc: # pragma: no cover - depends on chain state logger.warning( "dispatch.collect_bids_inline: agent=%s ensure_registered " "failed: %s; will still attempt submitBid", agent_name, exc, ) # ------------------------------------------------------------------ # Submit + confirm bid on-chain # ------------------------------------------------------------------ try: tx_hash = await _place_bid_on_chain( agent_name=agent_name, agent_pk=real_pk, auction_event_id=chain_event_id, bid_amount=bid_amount, candidate_hash_hex=candidate_hash_hex, ) except Exception as exc: logger.warning( "dispatch.collect_bids_inline: agent=%s submitBid send failed: %s; " "dropping bid (will not enter on-chain auction state)", agent_name, exc, ) return None # Confirm the tx actually mined with status=1. A status=0 receipt # means submitBid reverted (window closed / reputation gate / not # registered) — the bid never entered ``TranslationAuction`` storage, # so we drop it from the orchestrator's bid pool. try: confirmed = await _confirm_bid_tx(agent.onchain.w3, tx_hash) except Exception as exc: # pragma: no cover - RPC error logger.warning( "dispatch.collect_bids_inline: agent=%s submitBid receipt " "lookup failed (%s); dropping bid", agent_name, exc, ) return None if not confirmed: logger.warning( "dispatch.collect_bids_inline: agent=%s submitBid reverted on " "chain (tx=%s); dropping bid", agent_name, tx_hash, ) return None result["tx_hash"] = tx_hash logger.info( "dispatch.collect_bids_inline: agent=%s submitBid landed " "(bid=%.4f USDC, tx=%s)", agent_name, bid_amount, tx_hash, ) return result async def _confirm_bid_tx(w3, tx_hash: str, *, timeout_s: float = 30.0) -> bool: """Poll ``eth_getTransactionReceipt`` until mined; return ``True`` iff ``status == 1`` (submitBid did not revert).""" loop = asyncio.get_running_loop() def _wait() -> Any: return w3.eth.wait_for_transaction_receipt(tx_hash, timeout=timeout_s) receipt = await loop.run_in_executor(None, _wait) return int(getattr(receipt, "status", 0)) == 1 async def collect_bids_inline( event: dict[str, Any], *, window_seconds: float = _DEFAULT_AUCTION_WINDOW_SECONDS, auction_event_id: Any = None, ) -> list[dict[str, Any]]: """Run all 3 reference seeders in parallel; each seeder's bid goes on-chain. Each agent drives its real ``evaluate_event`` (the LLM-backed pricing call) and, when its wallet is resolvable + ``auction_event_id`` is supplied, signs and sends ``TranslationAuction.placeBid`` so the resulting ``bids.tx_hash`` reflects an actual chain write rather than NULL. Each returned dict has at least:: { "agent_address": str, "agent_name": str, "bid_amount": float, # USDC "candidate_hash": str, "reputation": float, # 0-1 "confidence": float, # 0-1 "expected_cost_usdc": float, "llm_model": str, "tx_hash": str | absent, # 0x... when placeBid succeeded } Agents whose ``evaluate_event`` raises are **dropped** — no synthetic placeholder bid is returned. The orchestrator therefore proceeds with however many real bids were produced (0-3) instead of pretending all three seeders voted. """ items = list(AGENT_REGISTRY.items()) # [(name, cls), ...] — 3 entries. tasks = [ asyncio.create_task( _safe_agent_bid(name, cls, event, auction_event_id=auction_event_id) ) for name, cls in items ] bids: list[dict[str, Any]] = [] try: done, pending = await asyncio.wait( tasks, timeout=max(window_seconds, 0.0), return_when=asyncio.ALL_COMPLETED, ) for task in done: try: result = task.result() except Exception as exc: # task itself raised — log + skip logger.warning( "dispatch.collect_bids_inline: agent task crashed: %s", exc, ) continue # W9-E: _safe_agent_bid returns None when the on-chain # submitBid did NOT land (insufficient gas / reverted / window # closed). Drop those so the orchestrator's bid pool reflects # the real on-chain auction state. if result is None: continue bids.append(result) for task in pending: task.cancel() except asyncio.CancelledError: for task in tasks: task.cancel() raise return bids # --------------------------------------------------------------------------- # Orchestrator-facing entry point # --------------------------------------------------------------------------- async def run_for_winner( event_dict: dict[str, Any], winner_address: str, ) -> PipelineResult: """Orchestrator-facing entry point. Resolves the agent class from ``winner_address`` and drives the full 5-layer pipeline. **No fallback translation.** If the LLM call fails (quota, timeout, parse error, network), the exception is propagated to the orchestrator so the lifecycle records the failure and skips the on-chain commit step. The previous implementation emitted a synthetic ``"Will ? by 2026-12-31"`` placeholder on failure which was then committed on-chain and judged by the 11-judge panel as if it were a real translation — that path is removed. """ agent_name = resolve_agent_name(winner_address) logger.info( "dispatch.run_for_winner: address=%s -> agent=%s", winner_address, agent_name, ) question = await asyncio.wait_for( _run_pipeline_schema(event_dict, agent_name), timeout=_DEFAULT_PIPELINE_TIMEOUT_SECONDS, ) final_question = _build_final_question_dict(event_dict, question) candidate_hash = hashlib.sha256( json.dumps(final_question, sort_keys=True).encode() ).hexdigest() return PipelineResult( final_question=final_question, pipeline_trace_ipfs=f"ipfs://pipeline/{agent_name}/{candidate_hash[:12]}", candidate_hash=candidate_hash, ) __all__ = [ "PipelineResult", "collect_bids_inline", "resolve_agent_name", "run_for_winner", "run_pipeline", ]