# polyglot-alpha/tests/run_pass_path_audit.py
# licaomeng
# deploy: main@8970ffb → HF Spaces (2026-05-27T05:19Z)
# 88d2f2a
"""End-to-end PASS-path audit harness.
Drives 5 events through the orchestrator's full happy path
(PENDING -> AUCTION_OPEN -> AUCTION_SETTLED -> TRANSLATING ->
EVALUATING -> COMMITTED -> SUBMITTED) WITHOUT spending real money on
Anthropic LLM calls. Uses :mod:`tests._pass_path_mocks` to:
* Patch :func:`polyglot_alpha.judges.panel.evaluate` -> canned PASS verdict.
* Patch :class:`polyglot_alpha.llm.AnthropicLLM` -> no-network stand-in.
* Patch :func:`polyglot_alpha.llm.make_llm` / :func:`complete` / :func:`complete_json`.
* Refuse to construct a real ``AsyncAnthropic`` client.
On-chain TXs (open auction, settle, commit question, recordFill 90/10
split) still execute against the Arc testnet using the configured
operator wallet — gas is free testnet ETH.
Polymarket submission is forced to ``POLYMARKET_MODE=dry_run`` so no
real market is created.
The script runs in the SAME process as :func:`run_lifecycle`, writing
to the shared ``polyglot_alpha.db`` SQLite file the FastAPI backend
reads. The backend therefore observes the new rows through its normal
read endpoints (``GET /events/{id}`` etc.). SSE is per-process, so the
backend's hub won't see this run's events; instead we capture our own
pubsub events locally and stash them into the audit JSON.
Outputs: ``outputs/audit_event_{event_id}.json`` per event +
``outputs/audit_summary.json``.
"""
from __future__ import annotations
import asyncio
import contextlib
import json
import logging
import os
import sys
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Make sure the package root is importable when invoked directly.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from tests import _pass_path_mocks as _mocks_mod # noqa: E402
from tests._pass_path_mocks import install_mocks, uninstall_mocks # noqa: E402
# Tone down the orchestrator's chatty INFO logging so the audit runner's
# own output stays readable. Errors / warnings still surface.
logging.basicConfig(level=logging.WARNING, format="%(levelname)s %(name)s: %(message)s")
# ---------------------------------------------------------------------------
# Audit constants
# ---------------------------------------------------------------------------
#: Number of lifecycles ``_main`` drives, one after another.
NUM_EVENTS: int = 5
#: Directory that receives the per-event audit JSON files and the summary.
OUTPUT_DIR: Path = ROOT / "outputs"
#: Timeout, in seconds, applied to each ``run_lifecycle`` call in ``_run_one``.
PER_EVENT_TIMEOUT_S: float = 180.0
#: Headline reused for every audit event. The orchestrator wraps it into a
#: P1-shape question internally (``"Will X by December 31, YYYY?"``). We use
#: ``user_payload``-style headlines so the RSS / Haiku ingestion path is
#: skipped entirely (mission constraint: bypass RSS+Haiku to avoid LLM cost).
#: ``_run_one`` appends a unique ``[audit-<idx>-<salt>]`` suffix per event.
BASE_TITLE: str = (
    "Will the FOMC raise rates by 25bp at the June 2026 meeting?"
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _short(value: Any, length: int = 80) -> str:
    """Render ``value`` as a string of at most ``length`` characters.

    Args:
        value: Any object; ``None`` is rendered as the empty string and
            everything else goes through ``str()``.
        length: Maximum number of characters in the result, truncation
            marker included.

    Returns:
        The text unchanged when it fits, otherwise a prefix followed by
        ``"..."`` so that the total is exactly ``length`` characters. When
        ``length`` is too small to hold any text plus the marker, the text
        is hard-cut to ``length`` characters instead.
    """
    if value is None:
        return ""
    text = str(value)
    if len(text) <= length:
        return text
    marker = "..."
    if length <= len(marker):
        # No room for a prefix plus the marker; a plain cut still honours
        # the cap (negative lengths are treated as zero).
        return text[: max(length, 0)]
    # Reserve room for the marker so the cap includes it.
    return text[: length - len(marker)] + marker
def _winner_address() -> str:
    """Return the address used for the simulated winning translator.

    ``HACKATHON_WALLET_ADDRESS`` is used when it is set to a 42-character
    string beginning with ``0x`` (only prefix and length are checked, not
    the checksum). According to the original note, a real-looking address
    makes the orchestrator attempt the on-chain ``recordFill_with_split``
    for the 90/10 split rather than the simulated-only fallback.

    Returns:
        The configured wallet address, or a fixed deterministic placeholder
        when the variable is unset, empty, or not address-shaped.
    """
    placeholder = "0xdeadbeef00000000000000000000000000000001"
    configured = os.environ.get("HACKATHON_WALLET_ADDRESS")
    if not configured:
        return placeholder
    address_shaped = configured.startswith("0x") and len(configured) == 42
    return configured if address_shaped else placeholder
def _treasury_address() -> str:
    """Return the treasury address taken from the environment.

    The first non-empty value among ``PLATFORM_TREASURY_ADDRESS`` and
    ``HACKATHON_WALLET_ADDRESS`` (checked in that order) wins; when neither
    is set, a fixed placeholder address is returned.
    """
    for variable in ("PLATFORM_TREASURY_ADDRESS", "HACKATHON_WALLET_ADDRESS"):
        candidate = os.environ.get(variable)
        if candidate:
            return candidate
    return "0x000000000000000000000000000000000000dead"
# ---------------------------------------------------------------------------
# DB capture
# ---------------------------------------------------------------------------
def _dump_event_rows(event_id: int) -> dict[str, Any]:
    """Snapshot every persisted row tied to ``event_id``.

    Opens ``ROOT / "polyglot_alpha.db"`` with raw sqlite3 in read-only mode
    (``mode=ro``) so the dump never collides with the backend's writes.
    String columns that look like a JSON array/object are decoded into
    nested lists/dicts on a best-effort basis.

    Args:
        event_id: Primary key of the ``events`` row to snapshot.

    Returns:
        A dict mapping table name to a list of row dicts. The tables keyed
        directly by ``event_id`` are ``events``, ``bids``, ``auctions``,
        ``translations``, ``quality_scores``, ``questions`` and
        ``polymarket_submissions``. ``builder_fee_events`` is resolved via
        the submissions' ``market_id``; ``agent_reputation`` holds the rows
        of every address that placed a bid on the event.

    Raises:
        sqlite3.Error: If the database cannot be opened or a query fails.
            The connection is closed before the error propagates.
    """
    import sqlite3

    db_path = ROOT / "polyglot_alpha.db"
    con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
    con.row_factory = sqlite3.Row

    def _maybe_json(value: Any) -> Any:
        # Only strings shaped like "[...]" / "{...}" are worth a parse
        # attempt; anything that fails to decode is kept verbatim.
        if (
            isinstance(value, str)
            and value
            and value[0] in "[{"
            and value[-1] in "]}"
        ):
            try:
                return json.loads(value)
            except json.JSONDecodeError:
                return value
        return value

    def _query(sql: str, *args: Any) -> list[dict[str, Any]]:
        return [
            {column: _maybe_json(cell) for column, cell in dict(row).items()}
            for row in con.execute(sql, args).fetchall()
        ]

    # try/finally guarantees the connection is released even when a query
    # raises (e.g. a table is missing from the schema).
    try:
        snapshot: dict[str, Any] = {
            "events": _query("SELECT * FROM events WHERE id=?", event_id),
            "bids": _query(
                "SELECT * FROM bids WHERE event_id=? ORDER BY id", event_id
            ),
            "auctions": _query(
                "SELECT * FROM auctions WHERE event_id=?", event_id
            ),
            "translations": _query(
                "SELECT * FROM translations WHERE event_id=?", event_id
            ),
            "quality_scores": _query(
                "SELECT * FROM quality_scores WHERE event_id=?", event_id
            ),
            "questions": _query(
                "SELECT * FROM questions WHERE event_id=?", event_id
            ),
            "polymarket_submissions": _query(
                "SELECT * FROM polymarket_submissions WHERE event_id=?",
                event_id,
            ),
        }

        # Builder-fee events are keyed by market_id, not event_id. Look up
        # the market_id from the polymarket_submissions rows first. Ids are
        # de-duplicated (order preserved) so two submissions sharing a
        # market do not double-count its fee rows.
        market_ids = list(
            dict.fromkeys(
                sub.get("market_id")
                for sub in snapshot["polymarket_submissions"]
                if sub.get("market_id")
            )
        )
        fee_rows: list[dict[str, Any]] = []
        for market_id in market_ids:
            fee_rows.extend(
                _query(
                    "SELECT * FROM builder_fee_events WHERE market_id=? ORDER BY id",
                    market_id,
                )
            )
        snapshot["builder_fee_events"] = fee_rows

        # Reputation rows for every bidder on this event; each address is
        # queried once even if it placed several bids.
        bidder_addresses = list(
            dict.fromkeys(
                bid.get("agent_address")
                for bid in snapshot["bids"]
                if bid.get("agent_address")
            )
        )
        rep_rows: list[dict[str, Any]] = []
        for address in bidder_addresses:
            rep_rows.extend(
                _query(
                    "SELECT * FROM agent_reputation WHERE agent_address=?",
                    address,
                )
            )
        snapshot["agent_reputation"] = rep_rows
        return snapshot
    finally:
        con.close()
# ---------------------------------------------------------------------------
# Pubsub capture
# ---------------------------------------------------------------------------
class _AuditSink:
    """Local recorder for everything sent through ``PubSub.publish``.

    :meth:`install` swaps ``polyglot_alpha.pubsub.PubSub.publish`` for a
    wrapper that appends a timestamped record to :attr:`events` and then
    awaits the original implementation; :meth:`uninstall` puts the original
    back. Both calls are no-ops when repeated.
    """

    def __init__(self) -> None:
        # Captured records: {"timestamp", "topic", "payload"} dicts.
        self.events: list[dict[str, Any]] = []
        self._installed = False
        # Original ``PubSub.publish``; set by install().
        self._orig: Any = None

    def install(self) -> None:
        """Patch ``PubSub.publish`` so every publish is also recorded here."""
        if self._installed:
            return
        from polyglot_alpha import pubsub as pubsub_mod

        self._orig = pubsub_mod.PubSub.publish

        async def _wrapped(
            hub: Any, topic: str, payload: dict[str, Any]
        ) -> None:
            # Record first, then delegate, so the capture exists even if
            # the real publish raises.
            record = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "topic": topic,
                "payload": payload,
            }
            self.events.append(record)
            await self._orig(hub, topic, payload)

        pubsub_mod.PubSub.publish = _wrapped  # type: ignore[assignment]
        self._installed = True

    def uninstall(self) -> None:
        """Restore the original ``PubSub.publish`` if it was patched."""
        if not self._installed:
            return
        from polyglot_alpha import pubsub as pubsub_mod

        pubsub_mod.PubSub.publish = self._orig  # type: ignore[assignment]
        self._installed = False

    def for_event(self, event_id: int) -> list[dict[str, Any]]:
        """Return captured records whose payload carries this ``event_id``."""
        return [
            record
            for record in self.events
            if (record.get("payload") or {}).get("event_id") == event_id
        ]

    def reset(self) -> None:
        """Drop every captured record."""
        self.events.clear()
# ---------------------------------------------------------------------------
# Per-event runner
# ---------------------------------------------------------------------------
def _first_row(rows: dict[str, Any], table: str) -> dict[str, Any] | None:
    """Return the first dumped row of ``table``, or ``None`` when it is empty."""
    matches = rows[table]
    return matches[0] if matches else None


def _subsystem_status(
    rows: dict[str, Any], pub_events: list[dict[str, Any]], winner: str
) -> dict[str, Any]:
    """Derive the per-subsystem audit flags from DB rows and pubsub events."""
    event_row = _first_row(rows, "events")
    auction_row = _first_row(rows, "auctions")
    quality_row = _first_row(rows, "quality_scores")
    question_row = _first_row(rows, "questions")
    polymarket_row = _first_row(rows, "polymarket_submissions")
    fee_rows = rows["builder_fee_events"]

    # The 90/10 split: two fee rows, fee_amount totals to 1.0.
    fee_amounts = [float(f.get("fee_amount") or 0.0) for f in fee_rows]
    fee_total = sum(fee_amounts)
    topics = [ev["topic"] for ev in pub_events]
    # Hex addresses that differ only in letter case denote the same account,
    # so the winner is matched case-insensitively.
    winner_key = winner.lower()

    return {
        "rss_bypassed": True,  # user_payload-style title; no RSS poll triggered
        "db_event_written": event_row is not None,
        "auction_opened": "auction.opened" in topics,
        "submit_bid_count": topics.count("bid.submitted"),
        "auction_settled": auction_row is not None
        and auction_row.get("settlement_tx_hash") is not None,
        "translation_persisted": len(rows["translations"]) > 0,
        "judges_pass_verdict": (
            quality_row is not None and quality_row.get("verdict") == "PASS"
        ),
        "commit_question_persisted": question_row is not None
        and question_row.get("question_id_onchain") is not None,
        "commit_tx_hash_nonnull": question_row is not None
        and question_row.get("tx_hash") is not None,
        "polymarket_submitted": polymarket_row is not None
        and polymarket_row.get("market_id") is not None,
        "builder_fee_split_present": len(fee_rows) >= 2,
        "builder_fee_90_leg": any(abs(a - 0.9) < 1e-6 for a in fee_amounts),
        "builder_fee_10_leg": any(abs(a - 0.1) < 1e-6 for a in fee_amounts),
        "builder_fee_total_1usdc": abs(fee_total - 1.0) < 1e-6,
        "reputation_updated": any(
            str(r.get("agent_address") or "").lower() == winner_key
            and (
                (r.get("total_wins") or 0) >= 1
                or float(r.get("cumulative_fees") or 0) > 0
            )
            for r in rows["agent_reputation"]
        ),
    }


async def _run_one(idx: int, sink: _AuditSink) -> dict[str, Any]:
    """Drive one PASS-path lifecycle and return the audit JSON for it.

    Args:
        idx: 1-based position of this event within the audit run; used in
            the title suffix and the ``correlation_id``.
        sink: Pubsub recorder; it is reset before the lifecycle starts and
            queried for this event's records afterwards.

    Returns:
        The full audit dict (summary, first-seen timestamp per topic,
        subsystem flags, TX hashes, DB rows, pubsub records). When
        ``run_lifecycle`` times out or returns no ``event_id``, a minimal
        dict carrying an ``"error"`` key is returned instead.

    Raises:
        Any exception other than a timeout raised by ``run_lifecycle`` or
        by the DB dump propagates to the caller.
    """
    from polyglot_alpha.orchestrator import BidRecord, run_lifecycle

    salt = uuid.uuid4().hex[:8]
    title = f"{BASE_TITLE} [audit-{idx}-{salt}]"
    event_dict: dict[str, Any] = {
        "title": title,
        "sources": [
            {
                "name": "audit-source",
                "url": f"https://audit.example/event/{salt}",
                "language": "en",
            }
        ],
        "language": "en",
        "category": "macro",
        "summary": (
            "The June 17-18, 2026 FOMC meeting decides rate policy. "
            "Audit synthetic source."
        ),
    }

    winner = _winner_address()
    # (address, bid_amount, reputation): the winner carries the lowest bid
    # and the highest reputation; the other two are synthetic competitors.
    bid_specs = (
        (winner, 0.45, 0.95),
        ("0x" + "a" * 40, 0.55, 0.9),
        ("0x" + "b" * 40, 0.75, 0.85),
    )
    mock_bids = [
        BidRecord(
            agent_address=address,
            bid_amount=bid_amount,
            stake_amount=5.0,
            candidate_hash=None,
            tx_hash=None,
            reputation=reputation,
        )
        for address, bid_amount, reputation in bid_specs
    ]

    sink.reset()
    t0 = time.monotonic()
    summary: dict[str, Any] = {}
    try:
        # Auction mode 'mock' deterministically produces a PASS-shaped
        # final_question via _run_translator_pipeline's fallback, while
        # the panel.evaluate patch returns a canned PASS verdict so the
        # commit + Polymarket + builder-fee path runs.
        summary = await asyncio.wait_for(
            run_lifecycle(
                event_dict,
                auction_window_seconds=0.0,
                mock_bids=mock_bids,
                auction_mode="mock",
                confirm_real_polymarket=False,
            ),
            timeout=PER_EVENT_TIMEOUT_S,
        )
    except asyncio.TimeoutError:
        summary = {
            "status": "TIMEOUT",
            "error": f"run_lifecycle exceeded {PER_EVENT_TIMEOUT_S:.0f}s",
        }
    wallclock = time.monotonic() - t0

    event_id = summary.get("event_id")
    if event_id is None:
        # Couldn't locate the event row; emit a minimal failure dump.
        return {
            "audit_index": idx,
            "title": title,
            "wallclock_s": wallclock,
            "summary": summary,
            "error": "event_id missing from run_lifecycle result",
        }

    # Capture all DB rows and pubsub records belonging to this event.
    rows = _dump_event_rows(int(event_id))
    pub_events = sink.for_event(int(event_id))

    # First-seen timestamp for each topic published for this event.
    phases_seen: dict[str, str] = {}
    for ev in pub_events:
        phases_seen.setdefault(ev["topic"], ev["timestamp"])

    # Arc TX hashes captured.
    auction_row = _first_row(rows, "auctions")
    question_row = _first_row(rows, "questions")
    tx_hashes = {
        "open_auction": summary.get("open_tx_hash"),
        "settle_auction": (
            auction_row.get("settlement_tx_hash") if auction_row else None
        ),
        "submit_bids": [b.get("tx_hash") for b in rows["bids"]],
        "commit_question": (
            question_row.get("tx_hash") if question_row else None
        ),
        "builder_fee_arc": [
            f.get("arc_tx_hash") for f in rows["builder_fee_events"]
        ],
    }

    return {
        "audit_index": idx,
        "event_id": event_id,
        "title": title,
        "wallclock_s": wallclock,
        "summary": summary,
        "phase_timestamps": phases_seen,
        "subsystem_status": _subsystem_status(rows, pub_events, winner),
        "tx_hashes": tx_hashes,
        "db_rows": rows,
        "pub_events": pub_events,
        "correlation_id": f"audit-{idx}-{salt}",
    }
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
async def _main() -> None:
    """Run ``NUM_EVENTS`` audit lifecycles and write the JSON reports.

    Installs the LLM/judge mocks and the pubsub recorder, drives the events
    one at a time through ``_run_one``, writes
    ``audit_event_{event_id}.json`` for each event that produced an integer
    ``event_id``, then writes ``audit_summary.json`` and prints a short
    recap. The mocks and the pubsub patch are removed even when an event
    raises; in that case the exception propagates and no summary is
    written.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    # Stamped up front so "started_at" is the start of the run, not the
    # moment the summary is assembled.
    started_at = datetime.now(timezone.utc).isoformat()
    install_mocks()
    sink = _AuditSink()
    audits: list[dict[str, Any]] = []
    # Paths of per-event files that were actually written.
    written_files: list[str] = []
    overall_t0 = time.monotonic()
    try:
        # Inside the try so a failing install still unwinds the mocks.
        sink.install()
        for i in range(1, NUM_EVENTS + 1):
            print(f"[audit] === Event {i}/{NUM_EVENTS} ===")
            audit = await _run_one(i, sink)
            audits.append(audit)
            event_id = audit.get("event_id", "?")
            status = (audit.get("summary") or {}).get("status", "?")
            wc = audit.get("wallclock_s", 0.0)
            print(
                f"[audit] event {event_id} -> {status} in {wc:.1f}s",
                flush=True,
            )
            # Persist per-event audit JSON immediately so a crash mid-run
            # doesn't lose earlier data.
            if isinstance(event_id, int):
                out_path = OUTPUT_DIR / f"audit_event_{event_id}.json"
                out_path.write_text(
                    json.dumps(audit, indent=2, default=str),
                    encoding="utf-8",
                )
                written_files.append(str(out_path))
                print(f"[audit] wrote {out_path}")
    finally:
        sink.uninstall()
        uninstall_mocks()

    overall = {
        "started_at": started_at,
        "wallclock_total_s": time.monotonic() - overall_t0,
        "event_count": len(audits),
        "panel_evaluate_calls": _mocks_mod.panel_evaluate_calls,
        "mock_llm_calls": _mocks_mod.mock_llm_calls,
        "mock_llm_first_log": list(_mocks_mod.mock_llm_log[:20]),
        # Only files that exist on disk are listed.
        "audit_files": written_files,
        "per_event": [
            {
                "audit_index": a["audit_index"],
                "event_id": a.get("event_id"),
                "status": (a.get("summary") or {}).get("status"),
                "subsystem_status": a.get("subsystem_status"),
                "wallclock_s": a.get("wallclock_s"),
            }
            for a in audits
        ],
    }
    summary_path = OUTPUT_DIR / "audit_summary.json"
    summary_path.write_text(
        json.dumps(overall, indent=2, default=str), encoding="utf-8"
    )
    print(f"[audit] wrote {summary_path}")

    # Stdout summary table
    print("\n[audit] ===== final =====")
    print(f"[audit] panel.evaluate calls (mocked): {_mocks_mod.panel_evaluate_calls}")
    print(f"[audit] mock_llm calls : {_mocks_mod.mock_llm_calls}")
    submitted = sum(
        1
        for a in audits
        if (a.get("summary") or {}).get("status") == "SUBMITTED"
    )
    print(f"[audit] SUBMITTED : {submitted}/{len(audits)}")
if __name__ == "__main__":
    # Script entry point: Ctrl-C ends the run quietly, without a traceback.
    try:
        asyncio.run(_main())
    except KeyboardInterrupt:
        pass