polyglot-alpha / scripts /simulate_ema.py
licaomeng
deploy: main@8970ffb → HF Spaces (2026-05-27T05:19Z)
88d2f2a
"""W14-C/W14-CONTRACT-PREP: Simulate ReputationRegistry EMA score under demo conditions.
Mirrors the Solidity logic in ``contracts/src/ReputationRegistry.sol`` so we
can probe the formula without spinning up an EVM. All values are 1e18-scaled
to match on-chain fixed-point.
Two contract versions are simulated:
* ``v1`` — the **deployed** ReputationRegistry. Has the unit-scale bug
described in W9-B / W14-C: ``_fillSignal`` divides USDC-6-decimal units by
``FEE_SCALE=100`` then treats the result as a 1e18 fixed-point number, so
``fillSignal`` is permanently clamped at ``FILL_MIN=0.5`` for any realistic
fee. Initial score on first touch is 1.0 (``ONE``).
* ``v2`` — the **proposed** (not-yet-deployed) ReputationRegistry. Applies
two fixes:
- β: rescale fees with ``(cumFees * 1e12) / FEE_SCALE`` so 6-decimal USDC
→ 1e18 fixed-point before the ln() input.
- α: initial score on first touch is 0.5 (``HALF``) instead of 1.0 so
the first ``_recompute`` does not strictly decrease the score.
Run::
python scripts/simulate_ema.py # default: side-by-side v1 vs v2
python scripts/simulate_ema.py --version v1
python scripts/simulate_ema.py --version v2
"""
from __future__ import annotations
import argparse
import math
from dataclasses import dataclass
ONE = 10 ** 18
HALF = 5 * 10 ** 17 # 0.5 — v2 initial score
DECAY = 85 * 10 ** 16 # 0.85
SIGNAL_W = 15 * 10 ** 16 # 0.15
FILL_MIN = 5 * 10 ** 17 # 0.5
FILL_MAX = 2 * 10 ** 18 # 2.0
FEE_SCALE = 100
SAT_X = 6_389_056_098_930_650_407 # e^2 - 1 in 1e18 units
LN2 = 693_147_180_559_945_309
USDC_DECIMALS = 6
USDC_TO_1E18 = 10 ** 12 # 1e18 / 1e6 — v2 β fix
def mul_div(a: int, b: int, d: int) -> int:
return (a * b) // d
def clamp(v: int, lo: int, hi: int) -> int:
return lo if v < lo else hi if v > hi else v
def fill_signal(cumulative_fees_units: int, *, version: str = "v1") -> int:
"""Port of ``_fillSignal``.
``cumulative_fees_units`` is in 6-decimal USDC base units (the exact value
``BuilderFeeRouter.updateOnFee`` passes to the registry).
* ``v1`` reproduces the deployed bug: ``x = units / FEE_SCALE`` and then
treats ``x`` as 1e18 fixed-point.
* ``v2`` applies the β fix: ``x = (units * 1e12) / FEE_SCALE`` so the input
is correctly rescaled to 1e18 fixed-point first.
"""
if cumulative_fees_units == 0:
return FILL_MIN
if version == "v1":
x = cumulative_fees_units // FEE_SCALE
elif version == "v2":
x = mul_div(cumulative_fees_units, USDC_TO_1E18, FEE_SCALE)
else:
raise ValueError(f"unknown version: {version!r}")
if x >= SAT_X:
return FILL_MAX
if x > ONE:
num = x - ONE
den = SAT_X - ONE
t = mul_div(num, ONE, den)
interp = LN2 + mul_div(2 * ONE - LN2, t, ONE)
return clamp(interp, FILL_MIN, FILL_MAX)
x2 = mul_div(x, x, ONE)
x3 = mul_div(x2, x, ONE)
x4 = mul_div(x3, x, ONE)
pos = x + x3 // 3
neg = x2 // 2 + x4 // 4
ln = pos - neg if pos > neg else 0
return clamp(ln, FILL_MIN, FILL_MAX)
@dataclass
class Rep:
total_bids: int = 0
total_wins: int = 0
total_quality_passes: int = 0
cumulative_fees_units: int = 0
score: int = 0 # set lazily on first touch via _lazy_init
initialized: bool = False
def recompute(r: Rep, *, version: str = "v1") -> int:
win_rate = ONE if r.total_bids == 0 else mul_div(r.total_wins, ONE, r.total_bids)
quality_rate = ONE if r.total_wins == 0 else mul_div(r.total_quality_passes, ONE, r.total_wins)
fs = fill_signal(r.cumulative_fees_units, version=version)
wq = mul_div(win_rate, quality_rate, ONE)
signal = mul_div(wq, fs, ONE)
decayed = mul_div(r.score, DECAY, ONE)
weighted = mul_div(signal, SIGNAL_W, ONE)
return decayed + weighted
def _lazy_init(r: Rep, *, version: str) -> None:
if not r.initialized:
# α-fix: v1 inits to 1.0 (ONE); v2 inits to 0.5 (HALF).
r.score = ONE if version == "v1" else HALF
r.initialized = True
def update_on_auction(r: Rep, won: bool, *, version: str = "v1") -> None:
_lazy_init(r, version=version)
r.total_bids += 1
if won:
r.total_wins += 1
r.score = recompute(r, version=version)
def update_on_quality(r: Rep, passed: bool, *, version: str = "v1") -> None:
_lazy_init(r, version=version)
if passed:
r.total_quality_passes += 1
r.score = recompute(r, version=version)
def update_on_fee(r: Rep, fee_usdc: float, *, version: str = "v1") -> None:
_lazy_init(r, version=version)
units = int(round(fee_usdc * 10 ** USDC_DECIMALS))
r.cumulative_fees_units += units
r.score = recompute(r, version=version)
def score_f(score_1e18: int) -> float:
return score_1e18 / ONE
# -----------------------------------------------------------------------------
# Scenarios
# -----------------------------------------------------------------------------
def scenario(
label: str,
wins: int,
fee_each: float,
*,
quality_passes_each: bool = True,
version: str = "v1",
) -> dict:
r = Rep()
for _ in range(wins):
update_on_auction(r, won=True, version=version)
update_on_quality(r, passed=quality_passes_each, version=version)
update_on_fee(r, fee_each, version=version)
return {
"label": label,
"wins": wins,
"fee_each_usdc": fee_each,
"cum_fees_usdc": wins * fee_each,
"win_rate": r.total_wins / max(r.total_bids, 1),
"quality_rate": r.total_quality_passes / max(r.total_wins, 1) if r.total_wins else 1.0,
"fill_signal": score_f(fill_signal(r.cumulative_fees_units, version=version)),
"final_score": score_f(r.score),
}
def reproduce_w9b(*, version: str = "v1") -> dict:
"""Reproduce W9-B observed sequence: clean win, quality pass, $0.9 fee."""
r = Rep()
update_on_auction(r, won=True, version=version)
s1 = score_f(r.score)
update_on_quality(r, passed=True, version=version)
s2 = score_f(r.score)
update_on_fee(r, 0.9, version=version)
s3 = score_f(r.score)
return {"after_auction": s1, "after_quality": s2, "after_fee": s3}
# -----------------------------------------------------------------------------
# Output helpers
# -----------------------------------------------------------------------------
SCENARIOS = [
("fresh agent, 1 win, $0.9 fee", 1, 0.9),
("fresh agent, 10 wins, $0.9 each", 10, 0.9),
("fresh agent, 100 wins, $0.9 each", 100, 0.9),
("fresh agent, 1 win, $1000 fee", 1, 1000.0),
("fresh agent, 1 win, $10000 fee", 1, 10000.0),
("fresh agent, 50 wins, $5 each", 50, 5.0),
]
def _print_version(version: str) -> None:
print(f"\n=== version: {version} ===")
w9b = reproduce_w9b(version=version)
print(f" W9-B sequence (win + quality-pass + $0.9 fee):")
for k, v in w9b.items():
print(f" {k:>18}: {v:.6f}")
print(f" scenarios (auction-won + quality-pass + fee each event):")
print(f" {'scenario':<42} {'cumFees':>10} {'fillSig':>9} {'score':>8}")
for label, wins, fee in SCENARIOS:
s = scenario(label, wins, fee, version=version)
print(
f" {s['label']:<42} ${s['cum_fees_usdc']:>8.2f} "
f"{s['fill_signal']:>9.4f} {s['final_score']:>8.4f}"
)
def _print_side_by_side() -> None:
"""Compact v1 vs v2 comparison table — the W14-C-mandated artifact."""
print()
print("=" * 80)
print("v1 (deployed, buggy) vs v2 (proposed fix) — side-by-side")
print("=" * 80)
rows = []
rows.append(("W9-B: 1 win + quality + $0.9 fee",
reproduce_w9b(version="v1")["after_fee"],
reproduce_w9b(version="v2")["after_fee"]))
for label, wins, fee in SCENARIOS:
sv1 = scenario(label, wins, fee, version="v1")
sv2 = scenario(label, wins, fee, version="v2")
rows.append((label, sv1["final_score"], sv2["final_score"]))
print(f"{'Scenario':<42} | {'v1 score':>8} | {'v2 score':>8}")
print("-" * 42 + "-+-" + "-" * 9 + "-+-" + "-" * 9)
for label, v1, v2 in rows:
print(f"{label:<42} | {v1:>8.4f} | {v2:>8.4f}")
print()
print("Notes:")
print(" * v1 fillSignal collapses to FILL_MIN=0.5 for any realistic fee")
print(" because USDC 6-decimal units pass through `units/100` and are then")
print(" misinterpreted as a 1e18 fixed-point number (off by 1e12).")
print(" * v1 first touch seeds score=1.0 (max), but the per-event signal is")
print(" bounded around `winRate*qualityRate*0.5 = 0.5` mid-range, so the")
print(" first update strictly *subtracts* (1.0 -> 0.7529 even on a clean win).")
print(" * v2 β-fix rescales fees to 1e18 before the ln() input, so fillSignal")
print(" spans the [0.5, 2.0] band naturally.")
print(" * v2 α-fix seeds first touch at 0.5 (HALF) so the first event nets")
print(" UP for a clean winner instead of dropping from a maxed-out prior.")
# -----------------------------------------------------------------------------
# Diagnostic: why v1 fillSignal collapses
# -----------------------------------------------------------------------------
def _print_collapse_diagnostic() -> None:
print("\n[diagnostic] Why v1 fillSignal collapses to 0.5 (FILL_MIN) for demo fees:")
print(" contract does: x = cumFeesUnits / FEE_SCALE(=100)")
print(" then ln(1+x) where it ASSUMES x is 1e18-fixed-point")
print(" but cumFeesUnits is in USDC 6-decimal units (usdc_to_units)")
for fee in [0.9, 9.0, 90.0, 1000.0, 1_000_000.0]:
units = int(fee * 10 ** USDC_DECIMALS)
x_after_scale_v1 = units // FEE_SCALE
x_after_scale_v2 = mul_div(units, USDC_TO_1E18, FEE_SCALE)
x_as_float_v1 = x_after_scale_v1 / ONE
x_as_float_v2 = x_after_scale_v2 / ONE
print(
f" fee=${fee:>10.2f} units={units:>14} "
f"v1 x_post_scale={x_after_scale_v1:>14} (~{x_as_float_v1:.2e}) "
f"v2 x_post_scale={x_after_scale_v2:>14} (~{x_as_float_v2:.4f})"
)
def main() -> None:
parser = argparse.ArgumentParser(
description="Simulate ReputationRegistry EMA score (v1=deployed, v2=proposed fix)"
)
parser.add_argument(
"--version",
choices=("v1", "v2", "both"),
default="both",
help="Which contract version to simulate (default: both — prints side-by-side table).",
)
args = parser.parse_args()
print("=" * 80)
print("W14-CONTRACT-PREP: EMA reputation simulation")
print(" mirrors contracts/src/ReputationRegistry.sol (v1 + v2)")
print("=" * 80)
if args.version in ("v1", "both"):
_print_version("v1")
if args.version in ("v2", "both"):
_print_version("v2")
if args.version == "both":
_print_side_by_side()
_print_collapse_diagnostic()
if __name__ == "__main__":
main()