Spaces:
Running
Running
| """W14-C/W14-CONTRACT-PREP: Simulate ReputationRegistry EMA score under demo conditions. | |
| Mirrors the Solidity logic in ``contracts/src/ReputationRegistry.sol`` so we | |
| can probe the formula without spinning up an EVM. All values are 1e18-scaled | |
| to match on-chain fixed-point. | |
| Two contract versions are simulated: | |
| * ``v1`` — the **deployed** ReputationRegistry. Has the unit-scale bug | |
| described in W9-B / W14-C: ``_fillSignal`` divides USDC-6-decimal units by | |
| ``FEE_SCALE=100`` then treats the result as a 1e18 fixed-point number, so | |
| ``fillSignal`` is permanently clamped at ``FILL_MIN=0.5`` for any realistic | |
| fee. Initial score on first touch is 1.0 (``ONE``). | |
| * ``v2`` — the **proposed** (not-yet-deployed) ReputationRegistry. Applies | |
| two fixes: | |
| - β: rescale fees with ``(cumFees * 1e12) / FEE_SCALE`` so 6-decimal USDC | |
| → 1e18 fixed-point before the ln() input. | |
| - α: initial score on first touch is 0.5 (``HALF``) instead of 1.0 so | |
| the first ``_recompute`` does not strictly decrease the score. | |
| Run:: | |
| python scripts/simulate_ema.py # default: side-by-side v1 vs v2 | |
| python scripts/simulate_ema.py --version v1 | |
| python scripts/simulate_ema.py --version v2 | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import math | |
| from dataclasses import dataclass | |
| ONE = 10 ** 18 | |
| HALF = 5 * 10 ** 17 # 0.5 — v2 initial score | |
| DECAY = 85 * 10 ** 16 # 0.85 | |
| SIGNAL_W = 15 * 10 ** 16 # 0.15 | |
| FILL_MIN = 5 * 10 ** 17 # 0.5 | |
| FILL_MAX = 2 * 10 ** 18 # 2.0 | |
| FEE_SCALE = 100 | |
| SAT_X = 6_389_056_098_930_650_407 # e^2 - 1 in 1e18 units | |
| LN2 = 693_147_180_559_945_309 | |
| USDC_DECIMALS = 6 | |
| USDC_TO_1E18 = 10 ** 12 # 1e18 / 1e6 — v2 β fix | |
| def mul_div(a: int, b: int, d: int) -> int: | |
| return (a * b) // d | |
| def clamp(v: int, lo: int, hi: int) -> int: | |
| return lo if v < lo else hi if v > hi else v | |
| def fill_signal(cumulative_fees_units: int, *, version: str = "v1") -> int: | |
| """Port of ``_fillSignal``. | |
| ``cumulative_fees_units`` is in 6-decimal USDC base units (the exact value | |
| ``BuilderFeeRouter.updateOnFee`` passes to the registry). | |
| * ``v1`` reproduces the deployed bug: ``x = units / FEE_SCALE`` and then | |
| treats ``x`` as 1e18 fixed-point. | |
| * ``v2`` applies the β fix: ``x = (units * 1e12) / FEE_SCALE`` so the input | |
| is correctly rescaled to 1e18 fixed-point first. | |
| """ | |
| if cumulative_fees_units == 0: | |
| return FILL_MIN | |
| if version == "v1": | |
| x = cumulative_fees_units // FEE_SCALE | |
| elif version == "v2": | |
| x = mul_div(cumulative_fees_units, USDC_TO_1E18, FEE_SCALE) | |
| else: | |
| raise ValueError(f"unknown version: {version!r}") | |
| if x >= SAT_X: | |
| return FILL_MAX | |
| if x > ONE: | |
| num = x - ONE | |
| den = SAT_X - ONE | |
| t = mul_div(num, ONE, den) | |
| interp = LN2 + mul_div(2 * ONE - LN2, t, ONE) | |
| return clamp(interp, FILL_MIN, FILL_MAX) | |
| x2 = mul_div(x, x, ONE) | |
| x3 = mul_div(x2, x, ONE) | |
| x4 = mul_div(x3, x, ONE) | |
| pos = x + x3 // 3 | |
| neg = x2 // 2 + x4 // 4 | |
| ln = pos - neg if pos > neg else 0 | |
| return clamp(ln, FILL_MIN, FILL_MAX) | |
| class Rep: | |
| total_bids: int = 0 | |
| total_wins: int = 0 | |
| total_quality_passes: int = 0 | |
| cumulative_fees_units: int = 0 | |
| score: int = 0 # set lazily on first touch via _lazy_init | |
| initialized: bool = False | |
| def recompute(r: Rep, *, version: str = "v1") -> int: | |
| win_rate = ONE if r.total_bids == 0 else mul_div(r.total_wins, ONE, r.total_bids) | |
| quality_rate = ONE if r.total_wins == 0 else mul_div(r.total_quality_passes, ONE, r.total_wins) | |
| fs = fill_signal(r.cumulative_fees_units, version=version) | |
| wq = mul_div(win_rate, quality_rate, ONE) | |
| signal = mul_div(wq, fs, ONE) | |
| decayed = mul_div(r.score, DECAY, ONE) | |
| weighted = mul_div(signal, SIGNAL_W, ONE) | |
| return decayed + weighted | |
| def _lazy_init(r: Rep, *, version: str) -> None: | |
| if not r.initialized: | |
| # α-fix: v1 inits to 1.0 (ONE); v2 inits to 0.5 (HALF). | |
| r.score = ONE if version == "v1" else HALF | |
| r.initialized = True | |
| def update_on_auction(r: Rep, won: bool, *, version: str = "v1") -> None: | |
| _lazy_init(r, version=version) | |
| r.total_bids += 1 | |
| if won: | |
| r.total_wins += 1 | |
| r.score = recompute(r, version=version) | |
| def update_on_quality(r: Rep, passed: bool, *, version: str = "v1") -> None: | |
| _lazy_init(r, version=version) | |
| if passed: | |
| r.total_quality_passes += 1 | |
| r.score = recompute(r, version=version) | |
| def update_on_fee(r: Rep, fee_usdc: float, *, version: str = "v1") -> None: | |
| _lazy_init(r, version=version) | |
| units = int(round(fee_usdc * 10 ** USDC_DECIMALS)) | |
| r.cumulative_fees_units += units | |
| r.score = recompute(r, version=version) | |
| def score_f(score_1e18: int) -> float: | |
| return score_1e18 / ONE | |
| # ----------------------------------------------------------------------------- | |
| # Scenarios | |
| # ----------------------------------------------------------------------------- | |
| def scenario( | |
| label: str, | |
| wins: int, | |
| fee_each: float, | |
| *, | |
| quality_passes_each: bool = True, | |
| version: str = "v1", | |
| ) -> dict: | |
| r = Rep() | |
| for _ in range(wins): | |
| update_on_auction(r, won=True, version=version) | |
| update_on_quality(r, passed=quality_passes_each, version=version) | |
| update_on_fee(r, fee_each, version=version) | |
| return { | |
| "label": label, | |
| "wins": wins, | |
| "fee_each_usdc": fee_each, | |
| "cum_fees_usdc": wins * fee_each, | |
| "win_rate": r.total_wins / max(r.total_bids, 1), | |
| "quality_rate": r.total_quality_passes / max(r.total_wins, 1) if r.total_wins else 1.0, | |
| "fill_signal": score_f(fill_signal(r.cumulative_fees_units, version=version)), | |
| "final_score": score_f(r.score), | |
| } | |
| def reproduce_w9b(*, version: str = "v1") -> dict: | |
| """Reproduce W9-B observed sequence: clean win, quality pass, $0.9 fee.""" | |
| r = Rep() | |
| update_on_auction(r, won=True, version=version) | |
| s1 = score_f(r.score) | |
| update_on_quality(r, passed=True, version=version) | |
| s2 = score_f(r.score) | |
| update_on_fee(r, 0.9, version=version) | |
| s3 = score_f(r.score) | |
| return {"after_auction": s1, "after_quality": s2, "after_fee": s3} | |
| # ----------------------------------------------------------------------------- | |
| # Output helpers | |
| # ----------------------------------------------------------------------------- | |
| SCENARIOS = [ | |
| ("fresh agent, 1 win, $0.9 fee", 1, 0.9), | |
| ("fresh agent, 10 wins, $0.9 each", 10, 0.9), | |
| ("fresh agent, 100 wins, $0.9 each", 100, 0.9), | |
| ("fresh agent, 1 win, $1000 fee", 1, 1000.0), | |
| ("fresh agent, 1 win, $10000 fee", 1, 10000.0), | |
| ("fresh agent, 50 wins, $5 each", 50, 5.0), | |
| ] | |
| def _print_version(version: str) -> None: | |
| print(f"\n=== version: {version} ===") | |
| w9b = reproduce_w9b(version=version) | |
| print(f" W9-B sequence (win + quality-pass + $0.9 fee):") | |
| for k, v in w9b.items(): | |
| print(f" {k:>18}: {v:.6f}") | |
| print(f" scenarios (auction-won + quality-pass + fee each event):") | |
| print(f" {'scenario':<42} {'cumFees':>10} {'fillSig':>9} {'score':>8}") | |
| for label, wins, fee in SCENARIOS: | |
| s = scenario(label, wins, fee, version=version) | |
| print( | |
| f" {s['label']:<42} ${s['cum_fees_usdc']:>8.2f} " | |
| f"{s['fill_signal']:>9.4f} {s['final_score']:>8.4f}" | |
| ) | |
| def _print_side_by_side() -> None: | |
| """Compact v1 vs v2 comparison table — the W14-C-mandated artifact.""" | |
| print() | |
| print("=" * 80) | |
| print("v1 (deployed, buggy) vs v2 (proposed fix) — side-by-side") | |
| print("=" * 80) | |
| rows = [] | |
| rows.append(("W9-B: 1 win + quality + $0.9 fee", | |
| reproduce_w9b(version="v1")["after_fee"], | |
| reproduce_w9b(version="v2")["after_fee"])) | |
| for label, wins, fee in SCENARIOS: | |
| sv1 = scenario(label, wins, fee, version="v1") | |
| sv2 = scenario(label, wins, fee, version="v2") | |
| rows.append((label, sv1["final_score"], sv2["final_score"])) | |
| print(f"{'Scenario':<42} | {'v1 score':>8} | {'v2 score':>8}") | |
| print("-" * 42 + "-+-" + "-" * 9 + "-+-" + "-" * 9) | |
| for label, v1, v2 in rows: | |
| print(f"{label:<42} | {v1:>8.4f} | {v2:>8.4f}") | |
| print() | |
| print("Notes:") | |
| print(" * v1 fillSignal collapses to FILL_MIN=0.5 for any realistic fee") | |
| print(" because USDC 6-decimal units pass through `units/100` and are then") | |
| print(" misinterpreted as a 1e18 fixed-point number (off by 1e12).") | |
| print(" * v1 first touch seeds score=1.0 (max), but the per-event signal is") | |
| print(" bounded around `winRate*qualityRate*0.5 = 0.5` mid-range, so the") | |
| print(" first update strictly *subtracts* (1.0 -> 0.7529 even on a clean win).") | |
| print(" * v2 β-fix rescales fees to 1e18 before the ln() input, so fillSignal") | |
| print(" spans the [0.5, 2.0] band naturally.") | |
| print(" * v2 α-fix seeds first touch at 0.5 (HALF) so the first event nets") | |
| print(" UP for a clean winner instead of dropping from a maxed-out prior.") | |
| # ----------------------------------------------------------------------------- | |
| # Diagnostic: why v1 fillSignal collapses | |
| # ----------------------------------------------------------------------------- | |
| def _print_collapse_diagnostic() -> None: | |
| print("\n[diagnostic] Why v1 fillSignal collapses to 0.5 (FILL_MIN) for demo fees:") | |
| print(" contract does: x = cumFeesUnits / FEE_SCALE(=100)") | |
| print(" then ln(1+x) where it ASSUMES x is 1e18-fixed-point") | |
| print(" but cumFeesUnits is in USDC 6-decimal units (usdc_to_units)") | |
| for fee in [0.9, 9.0, 90.0, 1000.0, 1_000_000.0]: | |
| units = int(fee * 10 ** USDC_DECIMALS) | |
| x_after_scale_v1 = units // FEE_SCALE | |
| x_after_scale_v2 = mul_div(units, USDC_TO_1E18, FEE_SCALE) | |
| x_as_float_v1 = x_after_scale_v1 / ONE | |
| x_as_float_v2 = x_after_scale_v2 / ONE | |
| print( | |
| f" fee=${fee:>10.2f} units={units:>14} " | |
| f"v1 x_post_scale={x_after_scale_v1:>14} (~{x_as_float_v1:.2e}) " | |
| f"v2 x_post_scale={x_after_scale_v2:>14} (~{x_as_float_v2:.4f})" | |
| ) | |
| def main() -> None: | |
| parser = argparse.ArgumentParser( | |
| description="Simulate ReputationRegistry EMA score (v1=deployed, v2=proposed fix)" | |
| ) | |
| parser.add_argument( | |
| "--version", | |
| choices=("v1", "v2", "both"), | |
| default="both", | |
| help="Which contract version to simulate (default: both — prints side-by-side table).", | |
| ) | |
| args = parser.parse_args() | |
| print("=" * 80) | |
| print("W14-CONTRACT-PREP: EMA reputation simulation") | |
| print(" mirrors contracts/src/ReputationRegistry.sol (v1 + v2)") | |
| print("=" * 80) | |
| if args.version in ("v1", "both"): | |
| _print_version("v1") | |
| if args.version in ("v2", "both"): | |
| _print_version("v2") | |
| if args.version == "both": | |
| _print_side_by_side() | |
| _print_collapse_diagnostic() | |
| if __name__ == "__main__": | |
| main() | |