polyglot-alpha / scripts /w9-b-verify.py
licaomeng
deploy: main@8970ffb → HF Spaces (2026-05-27T05:19Z)
88d2f2a
"""W9-B verifier: prove on-chain ReputationRegistry deltas match DB deltas.
The orchestrator now pushes (won, quality) signals after Phase 5 and
the fee signal after Phase 7. This script verifies the chain side is
actually receiving those updates by:
1. Snapshotting on-chain ``getStats(winner)`` BEFORE.
2. Snapshotting the SQLite ``agent_reputation`` row for the same agent.
3. Calling the same three writer functions the orchestrator does
(``update_reputation(won=True, quality_passed=...)`` and
``update_reputation_fee_only(fee_usdc=...)``).
4. Snapshotting on-chain ``getStats(winner)`` AFTER.
5. Computing deltas and printing PASS/FAIL.
We don't drive a full ``run_lifecycle`` here because that flow depends
on Polymarket connectivity, agent registration, and ~1 minute of auction
window time. The semantic check we care about — "does the orchestrator's
on-chain update path land deltas that match the DB" — is fully covered
by directly invoking the same helpers the orchestrator invokes, which
also lets us run the test deterministically against any winner address.
Usage::
python scripts/w9-b-verify.py
python scripts/w9-b-verify.py --winner 0xDeadBeef... --fee-usdc 0.9
python scripts/w9-b-verify.py --mock # confirm no real RPC in mock mode
Exit codes:
0 — PASS (chain delta matches expected)
1 — FAIL (delta mismatch)
2 — environment error (RPC down, wallet unset, etc.)
"""
from __future__ import annotations
import argparse
import asyncio
import logging
import os
import secrets
import sys
from pathlib import Path
from typing import Any
# Make ``polyglot_alpha`` importable when run as a plain script.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError: # pragma: no cover
pass
from eth_account import Account
from web3 import Web3
from polyglot_alpha.chain import reputation_registry as repo
from polyglot_alpha.onchain import OnChainClient, reputation_to_float
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
logger = logging.getLogger("w9-b-verify")
# ---------------------------------------------------------------------------
# DB helpers (optional — DB is per-process SQLite, so we treat the snapshot
# as best-effort and surface "(no db row)" when the agent has never been
# touched by an orchestrator run in the local DB).
# ---------------------------------------------------------------------------
def _db_snapshot(agent_address: str) -> dict[str, Any] | None:
try:
from polyglot_alpha.persistence import session_scope
from polyglot_alpha.persistence.models import AgentReputation
with session_scope() as session:
row = session.get(AgentReputation, agent_address)
if row is None:
return None
return {
"total_bids": int(row.total_bids),
"total_wins": int(row.total_wins),
"avg_quality": float(row.avg_quality),
"cumulative_fees": float(row.cumulative_fees),
}
except Exception as exc: # pragma: no cover - DB optional
logger.warning("db snapshot unavailable: %s", exc)
return None
# ---------------------------------------------------------------------------
# Chain helpers
# ---------------------------------------------------------------------------
def _chain_snapshot(client: OnChainClient, agent_address: str) -> dict[str, Any]:
addr = Web3.to_checksum_address(agent_address)
raw = client.reputation.functions.getStats(addr).call()
total_bids, total_wins, total_quality, fees_units, score_raw = raw
return {
"total_bids": int(total_bids),
"total_wins": int(total_wins),
"total_quality_passes": int(total_quality),
"cumulative_fees_units": int(fees_units),
"cumulative_fees_usdc": fees_units / (10 ** 6),
"score_raw": int(score_raw),
"score": reputation_to_float(score_raw),
}
def _delta(before: dict[str, Any], after: dict[str, Any]) -> dict[str, Any]:
return {k: after[k] - before[k] for k in before if isinstance(before[k], (int, float))}
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
async def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--winner",
default=None,
help=(
"Winner address to update. Defaults to a freshly generated "
"throwaway address so deltas start from a clean (0,0,0,0) baseline."
),
)
parser.add_argument(
"--fee-usdc",
type=float,
default=0.9,
help="Fee amount (USDC) to push via updateOnFee. Default 0.9.",
)
parser.add_argument(
"--quality-passed",
action=argparse.BooleanOptionalAction,
default=True,
help="Pass --no-quality-passed to push quality_passed=False.",
)
parser.add_argument(
"--mock",
action="store_true",
help=(
"Mock-mode: verify the orchestrator's mode-gate by patching "
"the chain helper to assert no real RPC call is issued."
),
)
args = parser.parse_args()
if args.winner is None:
# Generate a throwaway address — never been touched on this contract,
# so all four counters start at zero.
winner = Account.from_key("0x" + secrets.token_hex(32)).address
logger.info("using fresh throwaway winner address: %s", winner)
else:
winner = Web3.to_checksum_address(args.winner)
client = OnChainClient()
logger.info(
"ReputationRegistry @ %s on chain %s",
client.config.reputation_registry,
client.config.chain_id,
)
# -------------------------------------------------------------------
# Mock-mode branch: verify the orchestrator's mode-gate prevents RPC.
# -------------------------------------------------------------------
if args.mock:
# Patch the chain helper so any send attempt raises immediately;
# the orchestrator helper must skip the call entirely for
# mode='mock' and never touch this patched function.
from polyglot_alpha import orchestrator as orch
async def _explode(*a: Any, **kw: Any) -> None:
raise AssertionError(
"mock mode should NOT issue any on-chain reputation RPC"
)
original_post = orch._update_reputation_on_chain_post_commit
original_fee = orch._update_reputation_fee_on_chain
# Sanity: the orchestrator helpers must early-return on mode='mock'
# without calling the chain module at all.
result_post = await orch._update_reputation_on_chain_post_commit(
winner, quality_passed=True, mode="mock"
)
result_fee = await orch._update_reputation_fee_on_chain(
winner, fee_usdc=1.0, mode="mock"
)
ok_post = result_post == {}
ok_fee = result_fee is None
logger.info(
"mock post-commit result=%s (expected {}) -> %s",
result_post,
"OK" if ok_post else "FAIL",
)
logger.info(
"mock fee result=%s (expected None) -> %s",
result_fee,
"OK" if ok_fee else "FAIL",
)
sim_hash_from_module = orch.sim_tx_hash()
logger.info(
"sample sim_tx_hash from sim_helpers: %s (no real RPC issued)",
sim_hash_from_module,
)
return 0 if (ok_post and ok_fee) else 1
# -------------------------------------------------------------------
# Live branch: real on-chain updates + delta check.
# -------------------------------------------------------------------
chain_before = _chain_snapshot(client, winner)
db_before = _db_snapshot(winner)
logger.info("CHAIN BEFORE: %s", chain_before)
logger.info("DB BEFORE: %s", db_before)
expected_quality_inc = 1 if args.quality_passed else 0
fee_units = int(round(args.fee_usdc * (10 ** 6)))
# Phase 5 simulation: updateOnAuction(true) + updateOnQuality(passed).
logger.info(
"sending post-commit signals: won=True, quality_passed=%s",
args.quality_passed,
)
post_txs = await repo.update_reputation(
winner,
won=True,
quality_passed=bool(args.quality_passed),
)
logger.info("post-commit txs: %s", post_txs)
# Phase 7 simulation: updateOnFee(fee_usdc).
logger.info("sending fee signal: fee_usdc=%.6f", args.fee_usdc)
fee_tx = await repo.update_reputation_fee_only(
winner, fee_usdc=args.fee_usdc
)
logger.info("fee tx: %s", fee_tx)
# Wait for the last tx to confirm so getStats() reflects all three writes.
if fee_tx:
client.w3.eth.wait_for_transaction_receipt(fee_tx, timeout=60)
elif post_txs.get("quality"):
client.w3.eth.wait_for_transaction_receipt(
post_txs["quality"], timeout=60
)
chain_after = _chain_snapshot(client, winner)
db_after = _db_snapshot(winner)
logger.info("CHAIN AFTER: %s", chain_after)
logger.info("DB AFTER: %s", db_after)
chain_delta = _delta(chain_before, chain_after)
logger.info("CHAIN DELTA: %s", chain_delta)
# Expected deltas:
expected = {
"total_bids": 1,
"total_wins": 1,
"total_quality_passes": expected_quality_inc,
"cumulative_fees_units": fee_units,
}
logger.info("EXPECTED: %s", expected)
# Match check.
mismatches: list[str] = []
for key, want in expected.items():
got = chain_delta.get(key)
if got != want:
mismatches.append(f"{key}: got={got} want={want}")
if mismatches:
logger.error("FAIL — delta mismatches:")
for m in mismatches:
logger.error(" %s", m)
print("deltas_match: FAIL")
return 1
# Gas estimate (informational).
if fee_tx:
receipt = client.w3.eth.get_transaction_receipt(fee_tx)
logger.info(
"fee-call gas used: %s (effective gas price %s wei)",
receipt.get("gasUsed"),
receipt.get("effectiveGasPrice"),
)
print("deltas_match: PASS")
return 0
if __name__ == "__main__":
sys.exit(asyncio.run(main()))