polyglot-alpha / scripts /verify_chain_consistency.py
licaomeng
deploy: main@8970ffb → HF Spaces (2026-05-27T05:19Z)
88d2f2a
"""Verify chain <-> SQLite DB consistency for a single event_id.
Standalone audit script — no FastAPI dependency. Given an ``event_id``, it
walks the five chain-touching phases of the PolyglotAlpha lifecycle and
asserts that what is stored in ``polyglot_alpha.db`` matches the on-chain
state reachable via the Arc-testnet RPC.
Phases checked:
2. Auction — ``TranslationAuction.getAuction`` winner + winning bid
vs ``auctions`` row + settlement tx receipt status.
4. Judges — ``JudgePanel`` attestation tx receipt status (W9-A
column ``events.judges_attestation_tx`` is optional;
reported as ``[pending W9-A]`` when missing).
5. Anchor — ``QuestionRegistry.questions(qid).titleHash`` vs
``questions.title_hash`` + tx receipt status.
7. Fee split — ``BuilderFeeRouter.getCumulativeFees`` deltas vs the
two ``builder_fee_events`` rows for the winner / treasury
legs (+ tx receipt status on each).
8. Reputation — ``ReputationRegistry.getStats(winner)`` vs the
``agent_reputation`` row for the winner (W9-B columns
are optional; reported as ``[pending W9-B]`` when
missing). For deltas we currently report only the
present-state snapshot — the script is idempotent and
can be re-run as a sanity check.
Mock events (any tx_hash that starts with ``0xsim_``) are reported as
``N/A (mock event)`` for that phase and don't fail the run.
Run with the project's virtualenv::
.venv/bin/python scripts/verify_chain_consistency.py 112
.venv/bin/python scripts/verify_chain_consistency.py 112 --verbose
Exit code is ``0`` if every non-skipped phase matches, ``1`` otherwise.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import sqlite3
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Optional
from web3 import Web3
from web3.contract import Contract
# ---------------------------------------------------------------------------
# Project layout + env loading (no dotenv dependency)
# ---------------------------------------------------------------------------
_REPO_ROOT: Path = Path(__file__).resolve().parent.parent
_DB_PATH: Path = _REPO_ROOT / "polyglot_alpha.db"
_FOUNDRY_OUT: Path = _REPO_ROOT / "contracts" / "out"
# Sentinel prefix for synthetic ("0xsim_...") tx hashes produced by mock-mode
# lifecycles — kept in sync with ``polyglot_alpha.chain.sim_helpers``.
_SIM_TX_HASH_PREFIX: str = "0xsim_"
# Expected 90 / 10 router split (Path A) — see ``record_fill_with_split``.
_WINNER_SHARE: float = 0.90
_TREASURY_SHARE: float = 0.10
# Tolerance for floating-point USDC deltas. 1e-6 USDC == 1 base unit at
# 6 decimals, so anything tighter than this is meaningless.
_USDC_DELTA_TOL: float = 1e-4
def _load_env_file(path: Path) -> None:
"""Populate ``os.environ`` from a ``.env`` file (best-effort, idempotent).
Existing ``os.environ`` values take precedence so callers can still
override via the shell. Silently ignored if the file does not exist.
"""
if not path.exists():
return
for raw_line in path.read_text().splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
_load_env_file(_REPO_ROOT / ".env")
# ---------------------------------------------------------------------------
# Result types
# ---------------------------------------------------------------------------
@dataclass
class PhaseResult:
"""One phase's audit verdict."""
name: str
status: str = "PASS" # "PASS" | "FAIL" | "SKIP"
db_lines: list[str] = field(default_factory=list)
chain_lines: list[str] = field(default_factory=list)
notes: list[str] = field(default_factory=list)
reason: str = ""
@property
def emoji(self) -> str:
return {"PASS": "PASS", "FAIL": "FAIL", "SKIP": "SKIP"}.get(self.status, "?")
# ---------------------------------------------------------------------------
# Helpers — DB
# ---------------------------------------------------------------------------
def _db_connect() -> sqlite3.Connection:
# Open in read-only mode via URI so we can safely audit while the
# FastAPI backend may still hold a WAL write lock. ``immutable=0``
# (the default) keeps WAL visibility so our snapshot is up to date.
uri = f"file:{_DB_PATH}?mode=ro"
conn = sqlite3.connect(uri, uri=True)
conn.row_factory = sqlite3.Row
return conn
def _db_row(conn: sqlite3.Connection, sql: str, params: tuple) -> Optional[sqlite3.Row]:
cur = conn.execute(sql, params)
return cur.fetchone()
def _db_rows(
conn: sqlite3.Connection, sql: str, params: tuple
) -> list[sqlite3.Row]:
return list(conn.execute(sql, params))
def _table_has_column(conn: sqlite3.Connection, table: str, column: str) -> bool:
cur = conn.execute(f"PRAGMA table_info({table})")
return any(row[1] == column for row in cur.fetchall())
# ---------------------------------------------------------------------------
# Helpers — formatting
# ---------------------------------------------------------------------------
def _short(value: Optional[str], head: int = 6, tail: int = 4) -> str:
if not value:
return "<none>"
if len(value) <= head + tail + 3:
return value
return f"{value[:head]}...{value[-tail:]}"
def _is_sim_hash(value: Optional[str]) -> bool:
return bool(value) and value.lower().startswith(_SIM_TX_HASH_PREFIX)
def _coerce_bytes32(value: Optional[str]) -> bytes:
"""Match the orchestrator's coercion in ``chain.auction_client._event_id_bytes``
/ ``chain.question_registry._coerce_bytes32`` so the on-chain key matches.
"""
if not value:
return b"\x00" * 32
raw = value[2:] if value.startswith("0x") else value
try:
as_bytes = bytes.fromhex(raw)
except ValueError:
return Web3.keccak(text=value)
if len(as_bytes) == 32:
return as_bytes
if len(as_bytes) < 32:
return as_bytes.rjust(32, b"\x00")
return as_bytes[:32]
def _event_id_to_bytes32(event_id: int) -> bytes:
"""Replicate ``chain.auction_client._event_id_bytes`` for an int event_id.
The orchestrator calls ``event_id_from_event(str(event_id))`` which
keccak-hashes the decimal string representation.
"""
return Web3.keccak(text=str(event_id))
def _units_to_usdc(units: int, decimals: int = 6) -> float:
return units / (10 ** decimals)
# ---------------------------------------------------------------------------
# Helpers — chain
# ---------------------------------------------------------------------------
def _load_abi(contract_name: str) -> list[dict]:
path = _FOUNDRY_OUT / f"{contract_name}.sol" / f"{contract_name}.json"
with path.open("r", encoding="utf-8") as fh:
return json.load(fh)["abi"]
@dataclass
class ChainHandles:
w3: Web3
auction: Contract
question_registry: Contract
builder_fee_router: Contract
reputation: Contract
judge_panel: Optional[Contract]
def _build_chain_handles() -> ChainHandles:
rpc_url = os.environ.get("ARC_TESTNET_RPC", "https://rpc.testnet.arc.network")
w3 = Web3(Web3.HTTPProvider(rpc_url, request_kwargs={"timeout": 15}))
auction_addr = os.environ["TRANSLATION_AUCTION_ADDRESS"]
qr_addr = os.environ["QUESTION_REGISTRY_ADDRESS"]
bfr_addr = os.environ["BUILDER_FEE_ROUTER_ADDRESS"]
rep_addr = os.environ["REPUTATION_REGISTRY_ADDRESS"]
judge_panel_addr = os.environ.get("JUDGE_PANEL_ADDRESS")
auction = w3.eth.contract(
address=Web3.to_checksum_address(auction_addr),
abi=_load_abi("TranslationAuction"),
)
qr = w3.eth.contract(
address=Web3.to_checksum_address(qr_addr),
abi=_load_abi("QuestionRegistry"),
)
bfr = w3.eth.contract(
address=Web3.to_checksum_address(bfr_addr),
abi=_load_abi("BuilderFeeRouter"),
)
rep = w3.eth.contract(
address=Web3.to_checksum_address(rep_addr),
abi=_load_abi("ReputationRegistry"),
)
judge_panel: Optional[Contract] = None
if judge_panel_addr:
try:
judge_panel = w3.eth.contract(
address=Web3.to_checksum_address(judge_panel_addr),
abi=_load_abi("JudgePanel"),
)
except FileNotFoundError:
judge_panel = None
return ChainHandles(
w3=w3,
auction=auction,
question_registry=qr,
builder_fee_router=bfr,
reputation=rep,
judge_panel=judge_panel,
)
def _tx_status(w3: Web3, tx_hash: Optional[str]) -> Optional[int]:
"""Return the on-chain status (1=success, 0=revert) for ``tx_hash``.
Returns ``None`` if the receipt cannot be fetched (RPC error or pending).
"""
if not tx_hash or _is_sim_hash(tx_hash):
return None
try:
receipt = w3.eth.get_transaction_receipt(tx_hash)
except Exception:
return None
if receipt is None:
return None
return int(getattr(receipt, "status", receipt.get("status", 0)))
def _fmt_status(status: Optional[int]) -> str:
if status is None:
return "unknown"
return "success" if status == 1 else "revert"
# ---------------------------------------------------------------------------
# Phase verifiers
# ---------------------------------------------------------------------------
def verify_auction(
event_id: int,
conn: sqlite3.Connection,
chain: ChainHandles,
) -> PhaseResult:
res = PhaseResult(name="Phase 2 Auction", status="PASS")
row = _db_row(conn, "SELECT * FROM auctions WHERE event_id = ?", (event_id,))
if row is None:
res.status = "SKIP"
res.reason = "no auctions row for this event"
return res
db_winner: Optional[str] = row["winner_address"]
db_bid: Optional[float] = row["winning_bid"]
settle_tx: Optional[str] = row["settlement_tx_hash"]
res.db_lines.append(
f"winner={_short(db_winner)} bid={db_bid} settle_tx={_short(settle_tx)}"
)
if _is_sim_hash(settle_tx):
res.status = "SKIP"
res.reason = "N/A (mock event — sim tx hash)"
return res
if not settle_tx:
res.status = "FAIL"
res.reason = "DB row missing settlement_tx_hash"
return res
# Read TranslationAuction.getAuction(eventId) -> (eventHash, deadline,
# winner, winningBid, settled, opened, bidderCount).
try:
eid = _event_id_to_bytes32(event_id)
on_chain = chain.auction.functions.getAuction(eid).call()
except Exception as exc:
res.status = "FAIL"
res.reason = f"getAuction RPC call failed: {exc}"
return res
onchain_winner: str = on_chain[2]
onchain_winning_bid_units: int = int(on_chain[3])
onchain_settled: bool = bool(on_chain[4])
onchain_winning_bid_usdc = _units_to_usdc(onchain_winning_bid_units)
tx_status = _tx_status(chain.w3, settle_tx)
res.chain_lines.append(
f"winner={_short(onchain_winner)} bid={onchain_winning_bid_usdc} "
f"settled={onchain_settled} tx_status={_fmt_status(tx_status)}"
)
# The "lookalike" mock winners (e.g. 0xkkkk...) recorded in the DB are
# *off-chain* fixtures used by the orchestrator's bid-generation fallback
# and never actually existed on chain. Detect those so we don't report
# a misleading mismatch.
looks_like_placeholder = bool(
db_winner and (db_winner.lower().count(db_winner.lower()[2:3]) > 25)
)
if looks_like_placeholder:
res.notes.append(
"DB winner is a placeholder fixture (0xkkkk... pattern); "
"on-chain winner is the real settled bidder"
)
if tx_status != 1:
res.status = "FAIL"
res.reason = f"settle tx receipt status={_fmt_status(tx_status)}"
return res
if not onchain_settled:
res.status = "FAIL"
res.reason = "on-chain auction is not marked settled"
return res
# Winner address match (case-insensitive — DB stores mixed case,
# chain returns checksum). Skip the strict match when the DB row is
# a placeholder fixture.
if db_winner and not looks_like_placeholder:
if db_winner.lower() != onchain_winner.lower():
res.status = "FAIL"
res.reason = (
f"winner mismatch: db={db_winner} chain={onchain_winner}"
)
return res
# Winning-bid match (allow 1e-6 USDC tolerance).
if db_bid is not None and onchain_winning_bid_units > 0:
if abs(onchain_winning_bid_usdc - float(db_bid)) > _USDC_DELTA_TOL:
res.status = "FAIL"
res.reason = (
f"winning_bid mismatch: db={db_bid} "
f"chain={onchain_winning_bid_usdc}"
)
return res
return res
def verify_anchor(
event_id: int,
conn: sqlite3.Connection,
chain: ChainHandles,
) -> PhaseResult:
res = PhaseResult(name="Phase 5 Anchor")
row = _db_row(
conn,
"SELECT id, event_id, question_id_onchain, title_hash, tx_hash "
"FROM questions WHERE event_id = ? ORDER BY id ASC LIMIT 1",
(event_id,),
)
if row is None:
res.status = "SKIP"
res.reason = "no questions row for this event"
return res
commit_tx: Optional[str] = row["tx_hash"]
title_hash: Optional[str] = row["title_hash"]
qid_hex: Optional[str] = row["question_id_onchain"]
res.db_lines.append(
f"commit_tx={_short(commit_tx)} title_hash={_short(title_hash)} "
f"qid={_short(qid_hex)}"
)
if _is_sim_hash(commit_tx):
res.status = "SKIP"
res.reason = "N/A (mock event — sim tx hash)"
return res
if not commit_tx:
res.status = "FAIL"
res.reason = "DB questions row missing tx_hash"
return res
tx_status = _tx_status(chain.w3, commit_tx)
# Decode qid_hex (e.g. "0x000...2c") into an integer to feed
# questions(uint256). The orchestrator pads to 40 hex chars on the
# *left* so int(qid, 16) is the correct decoder.
onchain_title_hash: Optional[str] = None
qid_int: Optional[int] = None
if qid_hex:
try:
qid_int = int(qid_hex, 16)
q = chain.question_registry.functions.questions(qid_int).call()
onchain_title_hash = q[0].hex()
except Exception as exc:
res.notes.append(f"questions({qid_hex}) read failed: {exc}")
res.chain_lines.append(
f"qid={qid_int} on_chain_title_hash={_short(onchain_title_hash)} "
f"tx_status={_fmt_status(tx_status)}"
)
res.status = "PASS"
if tx_status != 1:
res.status = "FAIL"
res.reason = f"commit tx receipt status={_fmt_status(tx_status)}"
return res
# Compare the on-chain title hash with what was stored in DB. The
# orchestrator records ``candidate_hash`` (the LLM output digest) into
# ``questions.title_hash``; the contract receives the same digest as
# the ``titleHash`` field of registerQuestion.
if title_hash and onchain_title_hash:
db_normalised = title_hash.lower().lstrip("0x").rjust(64, "0")
chain_normalised = onchain_title_hash.lower().lstrip("0x").rjust(64, "0")
if db_normalised != chain_normalised:
res.status = "FAIL"
res.reason = (
f"title_hash mismatch: db={_short(title_hash)} "
f"chain={_short(onchain_title_hash)}"
)
return res
return res
def verify_judges(
event_id: int,
conn: sqlite3.Connection,
chain: ChainHandles,
) -> PhaseResult:
res = PhaseResult(name="Phase 4 Judges")
# W9-A introduces ``events.judges_attestation_tx``. If absent (the W9-A
# branch has not landed yet) we report the pending state and move on.
if not _table_has_column(conn, "events", "judges_attestation_tx"):
res.status = "SKIP"
res.reason = "[pending W9-A] events.judges_attestation_tx column not present"
# We still print the DB-side state we can read so the operator
# knows the quality_scores row exists.
qrow = _db_row(
conn,
"SELECT verdict, overall_score FROM quality_scores WHERE event_id = ?",
(event_id,),
)
if qrow is not None:
res.db_lines.append(
f"verdict={qrow['verdict']} overall_score={qrow['overall_score']}"
)
return res
row = _db_row(
conn,
"SELECT judges_attestation_tx FROM events WHERE id = ?",
(event_id,),
)
if row is None or not row["judges_attestation_tx"]:
res.status = "SKIP"
res.reason = "judges_attestation_tx is NULL"
return res
j_tx: str = row["judges_attestation_tx"]
res.db_lines.append(f"judges_attestation_tx={_short(j_tx)}")
if _is_sim_hash(j_tx):
res.status = "SKIP"
res.reason = "N/A (mock event — sim tx hash)"
return res
tx_status = _tx_status(chain.w3, j_tx)
res.chain_lines.append(f"tx_status={_fmt_status(tx_status)}")
res.status = "PASS" if tx_status == 1 else "FAIL"
if res.status == "FAIL":
res.reason = f"attestation tx status={_fmt_status(tx_status)}"
return res
def verify_fee_split(
event_id: int,
conn: sqlite3.Connection,
chain: ChainHandles,
) -> PhaseResult:
res = PhaseResult(name="Phase 7 Fee Split")
# Find this event's market(s) -> fee legs.
submissions = _db_rows(
conn,
"SELECT id, market_id FROM polymarket_submissions WHERE event_id = ?",
(event_id,),
)
if not submissions:
res.status = "SKIP"
res.reason = "no polymarket_submissions row for this event"
return res
market_ids = [s["market_id"] for s in submissions if s["market_id"]]
if not market_ids:
res.status = "SKIP"
res.reason = "polymarket_submissions has NULL market_id"
return res
placeholders = ",".join("?" for _ in market_ids)
fee_rows = _db_rows(
conn,
f"SELECT id, market_id, fill_amount, fee_amount, translator_address, "
f"arc_tx_hash, is_simulated FROM builder_fee_events "
f"WHERE market_id IN ({placeholders}) ORDER BY id ASC",
tuple(market_ids),
)
if not fee_rows:
res.status = "SKIP"
res.reason = "no builder_fee_events rows for this event's markets"
return res
# Group by translator_address.
legs_by_addr: dict[str, list[sqlite3.Row]] = {}
for r in fee_rows:
legs_by_addr.setdefault(r["translator_address"], []).append(r)
# Build a summary line.
leg_summary = ", ".join(
f"{_short(addr)}=${sum(float(r['fee_amount']) for r in legs):.4f}"
for addr, legs in legs_by_addr.items()
)
res.db_lines.append(f"{len(fee_rows)} legs [{leg_summary}]")
# Check tx receipt status for each leg.
all_sim = all(_is_sim_hash(r["arc_tx_hash"]) for r in fee_rows)
if all_sim:
res.status = "SKIP"
res.reason = "N/A (mock event — all sim tx hashes)"
return res
leg_statuses: list[tuple[int, str]] = []
for r in fee_rows:
leg_statuses.append((r["id"], _fmt_status(_tx_status(chain.w3, r["arc_tx_hash"]))))
failed = [lid for lid, st in leg_statuses if st != "success"]
res.chain_lines.append(
"leg tx receipts: "
+ ", ".join(f"id={lid}:{st}" for lid, st in leg_statuses)
)
# Read cumulative fees on-chain for each translator address that
# received a leg. We can only report the present-state value (no
# snapshot of "before" survives the lifecycle), so the check is
# "cumulative >= sum of legs we credited". If a delta is requested,
# the caller should snapshot before they fire Phase 7.
cum_lines: list[str] = []
fee_mismatch = False
for addr, legs in legs_by_addr.items():
expected = sum(float(r["fee_amount"]) for r in legs)
try:
raw = chain.builder_fee_router.functions.getCumulativeFees(
Web3.to_checksum_address(addr)
).call()
cum_usdc = _units_to_usdc(int(raw))
except Exception as exc:
cum_lines.append(f"{_short(addr)} cum=<rpc-fail: {exc}>")
fee_mismatch = True
continue
cum_lines.append(
f"{_short(addr)} cum=${cum_usdc:.6f} legs_sum=${expected:.6f}"
)
# The cumulative balance is *lifetime*: it should be >= the legs
# we are auditing. A strictly-less value is a clear inconsistency.
if cum_usdc + _USDC_DELTA_TOL < expected:
fee_mismatch = True
res.chain_lines.append("cumulative: " + " | ".join(cum_lines))
if failed:
res.status = "FAIL"
res.reason = f"{len(failed)} fee leg(s) without success receipt: {failed}"
return res
if fee_mismatch:
res.status = "FAIL"
res.reason = (
"cumulative fees on chain are less than the sum of DB legs; "
"tx hash recorded but on-chain state did not change"
)
return res
# Sanity-check the 90/10 split when there are exactly two legs.
if len(fee_rows) == 2:
sorted_legs = sorted(fee_rows, key=lambda r: -float(r["fee_amount"]))
winner_amount = float(sorted_legs[0]["fee_amount"])
treasury_amount = float(sorted_legs[1]["fee_amount"])
total = winner_amount + treasury_amount
if total > 0:
winner_ratio = winner_amount / total
if not math.isclose(winner_ratio, _WINNER_SHARE, abs_tol=0.01):
res.notes.append(
f"unexpected split ratio: winner={winner_ratio:.4f} "
f"expected ~{_WINNER_SHARE:.2f}"
)
res.status = "PASS"
return res
def verify_reputation(
event_id: int,
conn: sqlite3.Connection,
chain: ChainHandles,
) -> PhaseResult:
res = PhaseResult(name="Phase 8 Reputation")
# The winner address comes from the auctions row (DB side of truth).
arow = _db_row(
conn,
"SELECT winner_address FROM auctions WHERE event_id = ?",
(event_id,),
)
if arow is None or not arow["winner_address"]:
res.status = "SKIP"
res.reason = "no winner_address recorded in auctions"
return res
winner: str = arow["winner_address"]
# Skip the obvious placeholder fixtures (0xkkkk... etc).
looks_like_placeholder = bool(
len(winner) >= 32 and winner.lower().count(winner.lower()[2:3]) > 25
)
if looks_like_placeholder:
res.status = "SKIP"
res.reason = f"winner is a placeholder fixture: {winner}"
return res
# DB side — agent_reputation row.
rep_row = _db_row(
conn,
"SELECT * FROM agent_reputation WHERE LOWER(agent_address) = LOWER(?)",
(winner,),
)
# W9-B adds richer columns (``auction_count`` / ``quality_count`` /
# ``fee_total``). Probe for the actual column names so we degrade
# gracefully if W9-B has not landed.
w9b_cols = [
c for c in ("auction_count", "quality_count", "fee_total")
if _table_has_column(conn, "agent_reputation", c)
]
has_w9b = bool(w9b_cols)
if rep_row is None:
res.db_lines.append("no agent_reputation row for winner")
elif has_w9b:
parts = [f"{c}={rep_row[c]}" for c in w9b_cols]
res.db_lines.append(" ".join(parts))
else:
res.db_lines.append(
f"[pending W9-B] total_bids={rep_row['total_bids']} "
f"total_wins={rep_row['total_wins']} "
f"avg_quality={rep_row['avg_quality']} "
f"cumulative_fees={rep_row['cumulative_fees']}"
)
# Chain side — ReputationRegistry.getStats(winner).
try:
stats = chain.reputation.functions.getStats(
Web3.to_checksum_address(winner)
).call()
(
total_bids,
total_wins,
total_quality_passes,
cumulative_fees_units,
score_units,
) = (int(x) for x in stats)
cum_fees_usdc = _units_to_usdc(cumulative_fees_units)
score_float = score_units / 10 ** 18
res.chain_lines.append(
f"total_bids={total_bids} total_wins={total_wins} "
f"quality_passes={total_quality_passes} "
f"cum_fees=${cum_fees_usdc:.6f} score={score_float:.4f}"
)
except Exception as exc:
res.status = "FAIL"
res.reason = f"getStats RPC call failed: {exc}"
return res
# Match check — DB cumulative_fees vs chain cum_fees_usdc + total_wins vs DB.
res.status = "PASS"
if rep_row is None:
# DB has no row but chain does — that's an inconsistency if any
# non-zero stats are on chain.
if total_bids > 0 or total_wins > 0 or cumulative_fees_units > 0:
res.status = "FAIL"
res.reason = (
f"chain has stats (bids={total_bids}, wins={total_wins}, "
f"fees=${cum_fees_usdc:.6f}) but DB has no agent_reputation row"
)
return res
db_cum = float(rep_row["cumulative_fees"])
if abs(db_cum - cum_fees_usdc) > _USDC_DELTA_TOL:
res.notes.append(
f"cumulative_fees drift: db=${db_cum:.6f} chain=${cum_fees_usdc:.6f} "
f"(delta=${cum_fees_usdc - db_cum:+.6f})"
)
# We treat this as a soft mismatch — many DB updates are local-only
# bookkeeping and the chain ALWAYS lags by some amount of un-confirmed
# transactions. Surfaces in the report but does not fail the run.
return res
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _resolve_event(
conn: sqlite3.Connection, event_id: int
) -> Optional[sqlite3.Row]:
return _db_row(
conn,
"SELECT id, status, mode, content_hash, title FROM events WHERE id = ?",
(event_id,),
)
def _print_phase(res: PhaseResult, verbose: bool) -> None:
print(f"\n{res.name}")
if res.status == "SKIP":
print(f" SKIP — {res.reason}")
if verbose and res.db_lines:
for line in res.db_lines:
print(f" DB: {line}")
return
for line in res.db_lines:
print(f" DB: {line}")
for line in res.chain_lines:
print(f" Chain: {line}")
for note in res.notes:
print(f" NOTE: {note}")
if res.status == "PASS":
print(" RESULT: PASS")
else:
print(f" RESULT: FAIL -- {res.reason}")
def main() -> int:
parser = argparse.ArgumentParser(
description="Verify chain<->DB consistency for one event_id"
)
parser.add_argument("event_id", type=int, help="event_id to verify")
parser.add_argument(
"--verbose",
"-v",
action="store_true",
help="show DB context lines even on SKIP",
)
args = parser.parse_args()
event_id: int = args.event_id
if not _DB_PATH.exists():
print(f"FATAL: DB not found at {_DB_PATH}", file=sys.stderr)
return 2
conn = _db_connect()
try:
evt = _resolve_event(conn, event_id)
if evt is None:
print(f"FATAL: no events row with id={event_id}", file=sys.stderr)
return 2
winner_row = _db_row(
conn, "SELECT winner_address FROM auctions WHERE event_id = ?", (event_id,)
)
winner = winner_row["winner_address"] if winner_row else None
print(f"verify_chain_consistency.py {event_id}\n")
print(f"Event {event_id} -- verifying chain <-> DB consistency")
print("=" * 64)
print(
f"Mode: {evt['mode']} | Status: {evt['status']} | "
f"Winner: {_short(winner)}"
)
try:
chain = _build_chain_handles()
except Exception as exc:
print(f"FATAL: failed to build chain handles: {exc}", file=sys.stderr)
return 2
phases = [
verify_auction(event_id, conn, chain),
verify_judges(event_id, conn, chain),
verify_anchor(event_id, conn, chain),
verify_fee_split(event_id, conn, chain),
verify_reputation(event_id, conn, chain),
]
for p in phases:
_print_phase(p, verbose=args.verbose)
# Final tally.
passed = sum(1 for p in phases if p.status == "PASS")
failed = sum(1 for p in phases if p.status == "FAIL")
skipped = sum(1 for p in phases if p.status == "SKIP")
checked = passed + failed
print()
if failed == 0:
print(
f"OVERALL: PASS {passed}/{checked} phases consistent "
f"({skipped} skipped)"
)
return 0
print(
f"OVERALL: FAIL {failed} of {checked} phases inconsistent "
f"({skipped} skipped)"
)
return 1
finally:
conn.close()
if __name__ == "__main__":
sys.exit(main())