# on-chain-anomaly / scripts/data_pipeline.py
# robrtt — first commit (63105b7)
"""
data_pipeline.py
Feature engineering on REAL MEV-Boost relay data.
Data source: public relay APIs (Flashbots, Ultrasound, Agnostic, Aestus, Titan).
All free, no API key needed.
What we actually detect (honestly):
- Anomalous builder payment patterns (unusually high/low proposer payments)
- Builder market concentration shifts
- Gas utilization anomalies (blocks using abnormally high/low gas)
- Cross-relay visibility gaps (blocks seen on fewer relays than expected)
- Potential censorship signals (abnormally low tx count for gas used)
What we do NOT claim to detect:
- Individual sandwich attacks (requires mempool + trace data)
- Specific arbitrage transactions (requires decoded calldata)
- Flashloan-based MEV (requires internal transaction traces)
"""
import os
import sqlite3
import logging
from typing import Optional
import pandas as pd
import numpy as np
from scripts.relay_collector import (
fetch_all_relays,
deduplicate_payloads,
identify_builder,
RELAYS,
)
# NOTE(review): configuring the root logger at import time affects any program
# that imports this module; kept for backward compatibility.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)

# SQLite file lives in a "data" directory that is a sibling of this file's
# directory (i.e. <this dir>/../data/mev_blocks.db).
DB_PATH = os.path.join(os.path.dirname(__file__), os.pardir, "data", "mev_blocks.db")
# ─── Convert relay data → DataFrame ───────────────────────────────────────
def collect_relay_data(
    limit_per_relay: int = 100,
    relays: Optional[dict[str, str]] = None,
) -> pd.DataFrame:
    """
    Pull delivered payloads from the relays and merge them into one table.

    Args:
        limit_per_relay: Maximum number of payloads requested from each relay.
        relays: Relay mapping forwarded unchanged to ``fetch_all_relays``.
            When None, presumably the collector's default relay set is used
            (TODO confirm in relay_collector).

    Returns:
        One row per unique block as produced by ``deduplicate_payloads``, or
        an empty DataFrame when no relay returned any payload.
    """

    def _join_relays(value):
        # Lists are flattened to "a,b,c" so the column can be stored in SQLite.
        return ",".join(value) if isinstance(value, list) else value

    payloads = fetch_all_relays(limit_per_relay=limit_per_relay, relays=relays)
    if not payloads:
        log.warning("No payloads fetched from any relay")
        return pd.DataFrame()

    blocks = pd.DataFrame(deduplicate_payloads(payloads))
    if "relays_seen" in blocks.columns:
        blocks["relays_seen"] = blocks["relays_seen"].apply(_join_relays)

    log.info(f"Collected {len(blocks)} unique blocks from relay data")
    return blocks
# ─── Feature engineering ──────────────────────────────────────────────────
def engineer_features(
    df: pd.DataFrame,
    total_relays: Optional[int] = None,
) -> pd.DataFrame:
    """
    Engineer ML features from REAL relay block records.

    Every feature is derived from the block's own relay/on-chain fields. Some
    features are batch-relative (builder share, payment z-score), so they
    depend on the other rows in ``df``. The input is not modified; a copy
    with the feature columns added is returned.

    Args:
        df: One row per block. Reads the columns ``value_eth``, ``gas_used``,
            ``gas_limit``, ``num_tx``, ``relay_count`` and ``builder_name``.
        total_relays: Denominator for ``relay_coverage``. Defaults to
            ``len(RELAYS)``. Pass the number of relays actually queried when
            the data came from a custom relay subset.

    Returns:
        A copy of ``df`` with these columns added: log_value_eth,
        gas_utilization, log_gas_used, tx_per_gas, value_per_gas,
        value_per_tx, relay_coverage, builder_block_share, log_num_tx,
        payment_zscore, tx_deficit_ratio.
    """
    df = df.copy()

    # ── Payment features ──
    df["log_value_eth"] = np.log1p(df["value_eth"])

    # ── Gas features ──
    # The +1 terms in denominators avoid division by zero.
    df["gas_utilization"] = df["gas_used"] / (df["gas_limit"] + 1)
    df["log_gas_used"] = np.log1p(df["gas_used"].astype(float))
    # Tx count relative to how many 21,000-gas transactions the gas would cover.
    df["tx_per_gas"] = df["num_tx"] / (df["gas_used"] / 21_000 + 1)

    # ── Value efficiency ──
    df["value_per_gas"] = df["value_eth"] / (df["gas_used"] + 1)
    df["value_per_tx"] = df["value_eth"] / (df["num_tx"] + 1)

    # ── Relay visibility ──
    if total_relays is None:
        total_relays = len(RELAYS)
    df["relay_coverage"] = df["relay_count"] / total_relays

    # ── Builder concentration ──
    # Fraction of this batch's blocks built by the same builder. value_counts()
    # drops missing names, so rows without a builder map to NaN -> 0.
    builder_share = df["builder_name"].value_counts() / len(df)
    df["builder_block_share"] = df["builder_name"].map(builder_share).fillna(0.0)

    # ── Transaction density ──
    df["log_num_tx"] = np.log1p(df["num_tx"].astype(float))

    # ── Relative payment (z-score within batch) ──
    mean_val = df["value_eth"].mean()
    std_val = df["value_eth"].std()
    # The sample std (ddof=1) is NaN for fewer than two rows. Treat that as
    # zero spread so a lone block gets a z-score of 0 rather than NaN.
    if pd.isna(std_val):
        std_val = 0.0
    df["payment_zscore"] = (df["value_eth"] - mean_val) / (std_val + 1e-9)

    # ── Censorship signal ──
    # Low tx count relative to gas used could indicate tx filtering.
    # 60_000 gas per tx is the heuristic average used for the expectation;
    # positive values mean fewer txs than expected. Clipped to [-1, 1].
    expected_tx = df["gas_used"] / 60_000
    actual_tx = df["num_tx"].astype(float)
    df["tx_deficit_ratio"] = (expected_tx - actual_tx) / (expected_tx + 1)
    df["tx_deficit_ratio"] = df["tx_deficit_ratio"].clip(-1, 1)

    return df
# ─── Feature columns for ML ──────────────────────────────────────────────
# Feature columns added by engineer_features() and selected by
# get_feature_matrix(). The order here is the column order of the feature
# matrix, so do not reorder entries casually.
FEATURE_COLS = [
    "log_value_eth",        # payment size (log1p of value_eth)
    "gas_utilization",      # gas_used / gas_limit
    "log_gas_used",
    "tx_per_gas",           # tx count vs. 21k-gas transactions the gas would cover
    "value_per_gas",
    "value_per_tx",
    "relay_coverage",       # relay_count / number of relays
    "builder_block_share",  # builder's share of blocks in the batch
    "log_num_tx",
    "payment_zscore",       # batch-relative payment z-score
    "tx_deficit_ratio",     # censorship heuristic, clipped to [-1, 1]
]
def get_feature_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """
    Select the model feature columns from an engineered DataFrame.

    Columns are returned in FEATURE_COLS order, and missing values are filled
    with 0. Feature columns absent from ``df`` are skipped. A warning is
    logged when that happens, because the resulting matrix is narrower than
    FEATURE_COLS and downstream consumers would otherwise not notice.
    """
    available = [c for c in FEATURE_COLS if c in df.columns]
    missing = [c for c in FEATURE_COLS if c not in df.columns]
    if missing:
        log.warning("Feature matrix is missing columns: %s", ", ".join(missing))
    return df[available].fillna(0)
# ─── SQLite storage ───────────────────────────────────────────────────────
def save_to_db(df: pd.DataFrame, db_path: str = DB_PATH) -> None:
    """
    Write ``df`` to the ``blocks`` table of the SQLite database at ``db_path``.

    Any existing ``blocks`` table is replaced (``if_exists="replace"``) and the
    DataFrame index is not stored. The parent directory is created if needed.
    """
    parent = os.path.dirname(db_path)
    # A bare filename has no directory part, and os.makedirs("") would raise.
    if parent:
        os.makedirs(parent, exist_ok=True)
    conn = sqlite3.connect(db_path)
    try:
        df.to_sql("blocks", conn, if_exists="replace", index=False)
    finally:
        # Always release the connection, even if to_sql fails.
        conn.close()
    log.info(f"Saved {len(df)} block records to {db_path}")
def load_from_db(db_path: str = DB_PATH) -> pd.DataFrame:
    """
    Read every row of the ``blocks`` table from the SQLite database at ``db_path``.

    Raises:
        FileNotFoundError: If ``db_path`` does not exist. This is checked up
            front because sqlite3.connect would otherwise silently create an
            empty database file.
    """
    if not os.path.exists(db_path):
        raise FileNotFoundError(f"No database at {db_path}")
    conn = sqlite3.connect(db_path)
    try:
        df = pd.read_sql("SELECT * FROM blocks", conn)
    finally:
        # Close even if the query fails (e.g. the blocks table is missing).
        conn.close()
    log.info(f"Loaded {len(df)} block records from {db_path}")
    return df
# ─── Main pipeline ────────────────────────────────────────────────────────
def run_pipeline(limit_per_relay: int = 100) -> pd.DataFrame:
    """
    Run the end-to-end job: fetch relay data → engineer features → save to DB.

    Only live relay data is used; nothing is synthesized. When no relay data
    could be collected, the empty raw DataFrame is returned and the database
    is not written.
    """
    log.info("Fetching data from MEV-Boost relays...")
    raw = collect_relay_data(limit_per_relay=limit_per_relay)

    if not raw.empty:
        featured = engineer_features(raw)
        save_to_db(featured)
        log.info(f"Pipeline complete: {len(featured)} blocks, {len(FEATURE_COLS)} features")
        return featured

    log.warning("No relay data collected β€” pipeline returns empty DataFrame")
    return raw
if __name__ == "__main__":
    # Script entry point: run the pipeline and print a short summary.
    blocks = run_pipeline(limit_per_relay=100)
    if blocks.empty:
        print("No data collected β€” check network connectivity")
    else:
        print(f"\nDataset shape: {blocks.shape}")
        top_builders = blocks['builder_name'].value_counts().head(10)
        print(f"\nBuilder distribution:\n{top_builders}")
        payment_stats = blocks['value_eth'].describe()
        print(f"\nPayment stats (ETH):\n{payment_stats}")