contract-guard / engine /server.py
eltociear's picture
feat: contract-guard x402 v2 endpoint — EVM contract/token risk check
362b363
Raw
History Blame Contribute Delete
7.97 kB
#!/usr/bin/env python3
"""contract-guard engine — pre-interaction risk signals for an EVM contract/token.
Pure JSON-RPC (no signing, no upstream cost). Given a contract address it reports:
- is it a contract (eth_getCode) or an EOA / self-destructed
- upgradeable proxy detection (EIP-1967 implementation + admin slots)
- ERC20 metadata (name / symbol / decimals / totalSupply via eth_call)
- a risk score + human-readable flags an agent can act on before interacting
Why agents pay for this: the on-chain-data category is the busiest on the x402
discovery layer, but it's all raw RPC proxies. This adds a security verdict
(mutable-logic proxy = rug vector, non-standard token, dead address) on top.
"""
import json
import urllib.request
# Public JSON-RPC endpoints per supported chain; `_rpc` tries them in the
# order listed and moves on to the next one when a request fails.
RPCS = {
    "base": [
        "https://mainnet.base.org",
        "https://base-rpc.publicnode.com",
        "https://base.drpc.org",
    ],
    "ethereum": [
        "https://ethereum-rpc.publicnode.com",
        "https://eth.drpc.org",
    ],
}

# Numeric chain id reported back for each key of RPCS.
CHAIN_IDS = dict(base=8453, ethereum=1)

# Proxy storage slots probed with eth_getStorageAt: the EIP-1967
# implementation and admin slots, plus the legacy zeppelinos implementation
# slot used by older proxies (e.g. USDC).
SLOT_IMPL = "0x360894a13ba1a3210667c828492db98dca3e2076cc3735a920a3ca505d382bbc"
SLOT_ADMIN = "0xb53127684a568b3173ae13b9f8a6016e243e63b6e8ee1178d6a717850b5d6103"
SLOT_LEGACY = "0x7050c9e0f4ca769c69bd3a8ef740bc37934f8e2c036e5a723fd8ee048ed3f8c3"

# 4-byte selectors of the common ERC20 view functions, keyed by function name.
SEL = dict(
    name="0x06fdde03",
    symbol="0x95d89b41",
    decimals="0x313ce567",
    totalSupply="0x18160ddd",
)
def _rpc(rpc_urls, method, params):
    """Send one JSON-RPC 2.0 request, failing over across endpoints.

    Args:
        rpc_urls: A single endpoint URL, or a list of URLs tried in order.
        method: JSON-RPC method name, e.g. ``"eth_getCode"``.
        params: Positional parameter list for the method.

    Returns:
        The ``result`` member of the first response that carries no error
        (``None`` if that member is absent), or ``None`` when ``rpc_urls``
        is empty.

    Raises:
        Exception: The failure recorded for the last endpoint tried when no
            endpoint succeeded. An error reported by the node is raised as
            ``RuntimeError``; transport and parsing failures are re-raised
            as they occurred.
    """
    if isinstance(rpc_urls, str):
        rpc_urls = [rpc_urls]
    body = json.dumps({"jsonrpc": "2.0", "id": 1, "method": method, "params": params}).encode()
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (contract-guard x402)",
    }
    last = None
    for url in rpc_urls:
        # Any failure on one endpoint (network, HTTP, malformed JSON) is
        # deliberately treated as "try the next endpoint".
        try:
            req = urllib.request.Request(url, data=body, headers=headers, method="POST")
            with urllib.request.urlopen(req, timeout=12) as r:
                d = json.loads(r.read().decode())
            err = d.get("error")
            # Some servers send `"error": null` next to a valid result; only a
            # non-null error member marks the call as failed.
            if err is not None:
                # The error member is normally an object with a "message",
                # but tolerate servers that send a bare string or other value.
                if isinstance(err, dict):
                    last = RuntimeError(err.get("message", "rpc error"))
                else:
                    last = RuntimeError(str(err))
                continue
            return d.get("result")
        except Exception as e:
            last = e
            continue
    if last:
        raise last
    return None
def _eth_call(rpc_url, to, data):
    """Run a best-effort ``eth_call`` against the latest block.

    Sends ``data`` as calldata to the contract at ``to`` through ``_rpc``.
    Returns the call's result, or ``None`` when the request fails for any
    reason, so callers can treat a missing or failing view function as
    "no data" instead of handling an exception.
    """
    call_object = {"to": to, "data": data}
    try:
        outcome = _rpc(rpc_url, "eth_call", [call_object, "latest"])
    except Exception:
        outcome = None
    return outcome
def _dec_uint(hexstr):
    """Parse a hex string (as returned by an RPC call) into an int.

    Returns ``None`` for a missing value, for the empty result ``"0x"``,
    and for anything that cannot be parsed as base-16.
    """
    if hexstr and hexstr != "0x":
        try:
            return int(hexstr, 16)
        except Exception:
            # Unparseable payload: report "no value" rather than failing.
            pass
    return None
def _dec_string(hexstr):
    """Decode an ABI string return, falling back to bytes32-style tokens.

    Args:
        hexstr: ``0x``-prefixed hex payload returned by ``eth_call``.

    Returns:
        The decoded text, or ``None`` when the payload is missing, empty
        (``"0x"``), not valid hex, an ABI-encoded empty string, or decodes
        to nothing but NUL bytes.
    """
    if not hexstr or hexstr == "0x":
        return None
    try:
        raw = bytes.fromhex(hexstr[2:])
    except (TypeError, ValueError):
        # Odd-length or non-hex payload: treat as "no value" instead of
        # letting the error escape to the caller.
        return None
    # dynamic string: [offset(32)][length(32)][data]
    if len(raw) >= 64:
        length = int.from_bytes(raw[32:64], "big")
        if 0 < length <= len(raw) - 64:
            s = raw[64:64 + length].decode("utf-8", "replace").strip("\x00")
            if s:
                return s
        elif length == 0 and int.from_bytes(raw[:32], "big") == 32:
            # Canonical encoding of "": the only non-zero byte is the 0x20
            # offset word, which the bytes32 fallback below would misread as
            # a one-space string.
            return None
    # bytes32 fallback (non-standard tokens)
    s = raw.rstrip(b"\x00").decode("utf-8", "replace").strip("\x00")
    return s or None
def _addr_from_slot(hexstr):
    """Extract the address held in a storage-slot word.

    The hex payload is left-padded to a full 32-byte word and its low 20
    bytes are taken as the address. Returns the lower-cased ``0x`` address,
    or ``None`` when the value is missing or the address is all zeros.
    """
    if hexstr:
        word = hexstr[2:].rjust(64, "0")
        candidate = "0x" + word[-40:]
        if int(candidate, 16) != 0:
            return candidate.lower()
    return None
def analyze(address, chain="base"):
    """Collect pre-interaction risk signals for an EVM address.

    Args:
        address: Address to inspect, ``0x`` followed by 40 hex digits
            (surrounding whitespace is ignored).
        chain: Key of ``RPCS`` selecting the network, matched
            case-insensitively; a falsy value means ``"base"``.

    Returns:
        A dict. Invalid input yields only ``{"error": ...}``. An address
        without bytecode yields a ``NOT_A_CONTRACT`` report, an EIP-7702
        delegation designator yields a ``HIGH`` report naming the delegate,
        and any other contract yields the full report: proxy slots, ERC20
        metadata, ``risk_score`` (capped at 100), ``risk_level``, ``flags``
        and ``summary``.

    Raises:
        Exception: Whatever ``_rpc`` raises when an ``eth_getCode`` or
            ``eth_getStorageAt`` lookup fails on every endpoint. The ERC20
            metadata calls go through ``_eth_call`` and never raise.
    """
    chain = (chain or "base").lower()
    if chain not in RPCS:
        return {"error": "unsupported chain '%s' (use: %s)" % (chain, ", ".join(RPCS))}
    a = (address or "").strip()
    if not (a.startswith("0x") and len(a) == 42):
        try:
            int(a, 16)
        except Exception:
            return {"error": "invalid EVM address"}
        return {"error": "invalid EVM address (expected 0x + 40 hex)"}
    # Prefix and length alone are not enough: reject non-hex payloads here
    # rather than sending them to the RPC node and failing with an exception.
    if any(ch not in "0123456789abcdefABCDEF" for ch in a[2:]):
        return {"error": "invalid EVM address (expected 0x + 40 hex)"}
    rpc = RPCS[chain]
    flags = []
    score = 0
    code = _rpc(rpc, "eth_getCode", [a, "latest"])
    is_contract = bool(code) and code != "0x"
    # Two hex characters per byte, after dropping the "0x" prefix.
    bytecode_len = (len(code) - 2) // 2 if is_contract else 0
    if not is_contract:
        return {
            "address": a, "chain": chain, "chain_id": CHAIN_IDS[chain],
            "is_contract": False,
            "risk_level": "NOT_A_CONTRACT",
            "risk_score": 0,
            "flags": ["Address has no bytecode — it is an EOA (wallet) or a self-destructed contract, not a token/contract."],
            "summary": "Not a contract.",
        }
    # EIP-7702 delegated EOA: bytecode is exactly 0xef0100 + 20-byte delegate address
    if code[:8].lower() == "0xef0100":
        delegate = "0x" + code[8:48].lower()
        return {
            "address": a, "chain": chain, "chain_id": CHAIN_IDS[chain],
            "is_contract": False,
            "eip7702_delegated": True,
            "delegate": delegate,
            "risk_level": "HIGH",
            "risk_score": 45,
            "flags": ["EIP-7702 delegated EOA — this wallet has set code delegating control to contract %s. The delegate contract can move assets per the wallet's authorization; treat as smart-account, not a plain wallet." % delegate],
            "summary": "HIGH — EIP-7702 delegated EOA -> %s" % delegate,
        }
    # Proxy detection: EIP-1967 (impl + admin) and legacy zeppelinos slot
    impl = _addr_from_slot(_rpc(rpc, "eth_getStorageAt", [a, SLOT_IMPL, "latest"]))
    admin = _addr_from_slot(_rpc(rpc, "eth_getStorageAt", [a, SLOT_ADMIN, "latest"]))
    if impl is None:
        impl = _addr_from_slot(_rpc(rpc, "eth_getStorageAt", [a, SLOT_LEGACY, "latest"]))
    is_proxy = impl is not None
    if is_proxy:
        score += 40
        flags.append("Upgradeable proxy (EIP-1967): the admin can swap the implementation, so token/transfer logic can change after you interact. impl=%s%s" % (impl, (" admin=%s" % admin) if admin else ""))
    # ERC20 metadata (best-effort: a failing call just leaves the field None)
    name = _dec_string(_eth_call(rpc, a, SEL["name"]))
    symbol = _dec_string(_eth_call(rpc, a, SEL["symbol"]))
    decimals = _dec_uint(_eth_call(rpc, a, SEL["decimals"]))
    supply = _dec_uint(_eth_call(rpc, a, SEL["totalSupply"]))
    looks_erc20 = symbol is not None and decimals is not None
    if not looks_erc20:
        score += 15
        flags.append("No standard ERC20 metadata (symbol/decimals) — non-standard token or a non-token contract; verify intent before approving/swapping.")
    if bytecode_len < 200:
        score += 10
        flags.append("Very small bytecode (%d bytes) — minimal/forwarder contract; confirm it does what you expect." % bytecode_len)
    score = min(100, score)
    level = "CRITICAL" if score >= 50 else "HIGH" if score >= 40 else "MEDIUM" if score >= 15 else "LOW" if score > 0 else "OK"
    return {
        "address": a, "chain": chain, "chain_id": CHAIN_IDS[chain],
        "is_contract": True,
        "bytecode_bytes": bytecode_len,
        "is_proxy": is_proxy,
        "implementation": impl,
        "admin": admin,
        # total_supply is stringified so very large integers survive JSON consumers.
        "token": {"name": name, "symbol": symbol, "decimals": decimals, "total_supply": str(supply) if supply is not None else None},
        "looks_erc20": looks_erc20,
        "risk_level": level,
        "risk_score": score,
        "flags": flags or ["No elevated risk signals from on-chain checks. (Not a substitute for a full audit.)"],
        "summary": "%s — %s%s" % (
            level,
            ("proxy " if is_proxy else "") + (("%s (%s)" % (name, symbol)) if symbol else "contract"),
            " | %d risk flag(s)" % len(flags) if flags else "",
        ),
    }
# Command-line entry point: `server.py [address] [chain]` prints the report
# as indented JSON. Defaults: a fixed sample address on "base".
if __name__ == "__main__":
    import sys

    cli_args = sys.argv[1:]
    if cli_args:
        addr = cli_args[0]
    else:
        addr = "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913"
    chain = cli_args[1] if len(cli_args) >= 2 else "base"
    report = analyze(addr, chain)
    print(json.dumps(report, indent=2))