Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """contract-guard engine — pre-interaction risk signals for an EVM contract/token. | |
| Pure JSON-RPC (no signing, no upstream cost). Given a contract address it reports: | |
| - is it a contract (eth_getCode) or an EOA / self-destructed | |
| - upgradeable proxy detection (EIP-1967 implementation + admin slots) | |
| - ERC20 metadata (name / symbol / decimals / totalSupply via eth_call) | |
| - a risk score + human-readable flags an agent can act on before interacting | |
| Why agents pay for this: the on-chain-data category is the busiest on the x402 | |
| discovery layer, but it's all raw RPC proxies. This adds a security verdict | |
| (mutable-logic proxy = rug vector, non-standard token, dead address) on top. | |
| """ | |
| import json | |
| import urllib.request | |
# Public JSON-RPC endpoints per supported chain, listed in the order they are tried.
RPCS = {
    "base": [
        "https://mainnet.base.org",
        "https://base-rpc.publicnode.com",
        "https://base.drpc.org",
    ],
    "ethereum": [
        "https://ethereum-rpc.publicnode.com",
        "https://eth.drpc.org",
    ],
}

# Numeric chain id reported back to the caller for each supported chain name.
CHAIN_IDS = {
    "base": 8453,
    "ethereum": 1,
}

# Proxy storage slots: the EIP-1967 implementation and admin slots, plus the
# legacy zeppelinos implementation slot used by older proxies (e.g. USDC).
SLOT_IMPL = "0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc"
SLOT_ADMIN = "0xb53127684a568b3173ae13b9f8a6016e243e63b6e8ee1178d6a717850b5d6103"
SLOT_LEGACY = "0x7050c9e0f4ca769c69bd3a8ef740bc37934f8e2c036e5a723fd8ee048ed3f8c3"

# 4-byte selectors of the common ERC20 view functions, keyed by function name.
SEL = dict(
    name="0x06fdde03",
    symbol="0x95d89b41",
    decimals="0x313ce567",
    totalSupply="0x18160ddd",
)
| def _rpc(rpc_urls, method, params): | |
| if isinstance(rpc_urls, str): | |
| rpc_urls = [rpc_urls] | |
| body = json.dumps({"jsonrpc": "2.0", "id": 1, "method": method, "params": params}).encode() | |
| last = None | |
| for url in rpc_urls: | |
| try: | |
| req = urllib.request.Request(url, data=body, headers={ | |
| "Content-Type": "application/json", | |
| "User-Agent": "Mozilla/5.0 (contract-guard x402)", | |
| }, method="POST") | |
| with urllib.request.urlopen(req, timeout=12) as r: | |
| d = json.loads(r.read().decode()) | |
| if "error" in d: | |
| last = RuntimeError(d["error"].get("message", "rpc error")) | |
| continue | |
| return d.get("result") | |
| except Exception as e: | |
| last = e | |
| continue | |
| if last: | |
| raise last | |
| return None | |
| def _eth_call(rpc_url, to, data): | |
| try: | |
| return _rpc(rpc_url, "eth_call", [{"to": to, "data": data}, "latest"]) | |
| except Exception: | |
| return None | |
| def _dec_uint(hexstr): | |
| if not hexstr or hexstr == "0x": | |
| return None | |
| try: | |
| return int(hexstr, 16) | |
| except Exception: | |
| return None | |
| def _dec_string(hexstr): | |
| """Decode an ABI string return, falling back to bytes32-style tokens.""" | |
| if not hexstr or hexstr == "0x": | |
| return None | |
| raw = bytes.fromhex(hexstr[2:]) | |
| # dynamic string: [offset(32)][length(32)][data] | |
| if len(raw) >= 64: | |
| try: | |
| length = int.from_bytes(raw[32:64], "big") | |
| if 0 < length <= len(raw) - 64: | |
| s = raw[64:64 + length].decode("utf-8", "replace").strip("\x00") | |
| if s: | |
| return s | |
| except Exception: | |
| pass | |
| # bytes32 fallback (non-standard tokens) | |
| s = raw.rstrip(b"\x00").decode("utf-8", "replace").strip("\x00") | |
| return s or None | |
| def _addr_from_slot(hexstr): | |
| if not hexstr: | |
| return None | |
| h = hexstr[2:].rjust(64, "0") | |
| addr = "0x" + h[-40:] | |
| return None if int(addr, 16) == 0 else addr.lower() | |
def analyze(address, chain="base"):
    """Collect pre-interaction risk signals for an EVM address.

    Args:
        address: "0x"-prefixed, 40-hex-digit address; surrounding whitespace
            is ignored.
        chain: Key of RPCS naming the chain to query (case-insensitive); a
            falsy value means "base".

    Returns:
        A dict. On bad input it holds a single "error" key. For an address
        without bytecode, or an EIP-7702 delegated EOA, it is a short verdict
        with "is_contract" False. Otherwise it is the full report: proxy
        detection, ERC20 metadata, "risk_score" (0-100), "risk_level",
        "flags" and "summary".

    Raises:
        Exception: Whatever _rpc raises when the eth_getCode or
            eth_getStorageAt requests fail on every endpoint. ERC20 metadata
            calls go through _eth_call and never raise.
    """
    chain = (chain or "base").lower()
    if chain not in RPCS:
        return {"error": "unsupported chain '%s' (use: %s)" % (chain, ", ".join(RPCS))}

    # A non-string address is treated like an empty one instead of crashing on .strip().
    a = address.strip() if isinstance(address, str) else ""
    if not (a.startswith("0x") and len(a) == 42):
        try:
            int(a, 16)
        except Exception:
            return {"error": "invalid EVM address"}
        return {"error": "invalid EVM address (expected 0x + 40 hex)"}
    # Right shape is not enough: reject non-hex digits before spending an RPC call.
    if any(ch not in "0123456789abcdefABCDEF" for ch in a[2:]):
        return {"error": "invalid EVM address (expected 0x + 40 hex)"}

    rpc = RPCS[chain]
    flags = []
    score = 0

    code = _rpc(rpc, "eth_getCode", [a, "latest"])
    is_contract = bool(code) and code != "0x"
    bytecode_len = (len(code) - 2) // 2 if is_contract else 0
    if not is_contract:
        return {
            "address": a, "chain": chain, "chain_id": CHAIN_IDS[chain],
            "is_contract": False,
            "risk_level": "NOT_A_CONTRACT",
            "risk_score": 0,
            "flags": ["Address has no bytecode — it is an EOA (wallet) or a self-destructed contract, not a token/contract."],
            "summary": "Not a contract.",
        }

    # EIP-7702 delegated EOA: bytecode is exactly 0xef0100 + 20-byte delegate address
    if code[:8].lower() == "0xef0100":
        delegate = "0x" + code[8:48].lower()
        return {
            "address": a, "chain": chain, "chain_id": CHAIN_IDS[chain],
            "is_contract": False,
            "eip7702_delegated": True,
            "delegate": delegate,
            "risk_level": "HIGH",
            "risk_score": 45,
            "flags": ["EIP-7702 delegated EOA — this wallet has set code delegating control to contract %s. The delegate contract can move assets per the wallet's authorization; treat as smart-account, not a plain wallet." % delegate],
            "summary": "HIGH — EIP-7702 delegated EOA -> %s" % delegate,
        }

    # Proxy detection: EIP-1967 (impl + admin) and legacy zeppelinos slot
    impl = _addr_from_slot(_rpc(rpc, "eth_getStorageAt", [a, SLOT_IMPL, "latest"]))
    admin = _addr_from_slot(_rpc(rpc, "eth_getStorageAt", [a, SLOT_ADMIN, "latest"]))
    proxy_kind = "EIP-1967"
    if impl is None:
        impl = _addr_from_slot(_rpc(rpc, "eth_getStorageAt", [a, SLOT_LEGACY, "latest"]))
        # Label the flag with the slot that actually matched.
        proxy_kind = "legacy zeppelinos slot"
    is_proxy = impl is not None
    if is_proxy:
        score += 40
        flags.append(
            "Upgradeable proxy (%s): the admin can swap the implementation, so token/transfer logic can change after you interact. impl=%s%s"
            % (proxy_kind, impl, (" admin=%s" % admin) if admin else "")
        )

    # ERC20 metadata (best effort: a failed call yields None)
    name = _dec_string(_eth_call(rpc, a, SEL["name"]))
    symbol = _dec_string(_eth_call(rpc, a, SEL["symbol"]))
    decimals = _dec_uint(_eth_call(rpc, a, SEL["decimals"]))
    supply = _dec_uint(_eth_call(rpc, a, SEL["totalSupply"]))
    looks_erc20 = symbol is not None and decimals is not None
    if not looks_erc20:
        score += 15
        flags.append("No standard ERC20 metadata (symbol/decimals) — non-standard token or a non-token contract; verify intent before approving/swapping.")

    if bytecode_len < 200:
        score += 10
        flags.append("Very small bytecode (%d bytes) — minimal/forwarder contract; confirm it does what you expect." % bytecode_len)

    score = min(100, score)
    if score >= 50:
        level = "CRITICAL"
    elif score >= 40:
        level = "HIGH"
    elif score >= 15:
        level = "MEDIUM"
    elif score > 0:
        level = "LOW"
    else:
        level = "OK"

    # Avoid rendering "None (SYM)" when only the symbol could be read.
    if symbol:
        token_label = ("%s (%s)" % (name, symbol)) if name else symbol
    else:
        token_label = "contract"

    return {
        "address": a, "chain": chain, "chain_id": CHAIN_IDS[chain],
        "is_contract": True,
        "bytecode_bytes": bytecode_len,
        "is_proxy": is_proxy,
        "implementation": impl,
        "admin": admin,
        "token": {"name": name, "symbol": symbol, "decimals": decimals, "total_supply": str(supply) if supply is not None else None},
        "looks_erc20": looks_erc20,
        "risk_level": level,
        "risk_score": score,
        "flags": flags or ["No elevated risk signals from on-chain checks. (Not a substitute for a full audit.)"],
        "summary": "%s — %s%s" % (
            level,
            ("proxy " if is_proxy else "") + token_label,
            " | %d risk flag(s)" % len(flags) if flags else "",
        ),
    }
if __name__ == "__main__":
    import sys

    # CLI usage: <script> [address] [chain]; both arguments are optional.
    cli_args = sys.argv[1:]
    addr = cli_args[0] if cli_args else "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913"
    chain = cli_args[1] if len(cli_args) > 1 else "base"
    report = analyze(addr, chain)
    print(json.dumps(report, indent=2))