"""
data_pipeline.py
Feature engineering on REAL MEV-Boost relay data.

Data source: public relay APIs (Flashbots, Ultrasound, Agnostic, Aestus, Titan).
All free, no API key needed.

What we actually detect (honestly):
  - Anomalous builder payment patterns (unusually high/low proposer payments)
  - Builder market concentration shifts
  - Gas utilization anomalies (blocks using abnormally high/low gas)
  - Cross-relay visibility gaps (blocks seen on fewer relays than expected)
  - Potential censorship signals (abnormally low tx count for gas used)

What we do NOT claim to detect:
  - Individual sandwich attacks (requires mempool + trace data)
  - Specific arbitrage transactions (requires decoded calldata)
  - Flashloan-based MEV (requires internal transaction traces)
"""

import os
import sqlite3
import logging
from contextlib import closing
from typing import Optional

import pandas as pd
import numpy as np

from scripts.relay_collector import (
    fetch_all_relays,
    deduplicate_payloads,
    identify_builder,
    RELAYS,
)

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

DB_PATH = os.path.join(os.path.dirname(__file__), "..", "data", "mev_blocks.db")


# ─── Convert relay data → DataFrame ───────────────────────────────────────

def collect_relay_data(
    limit_per_relay: int = 100,
    relays: Optional[dict[str, str]] = None,
) -> pd.DataFrame:
    """
    Fetch payloads from the relays, deduplicate them, and return a DataFrame.

    Each row = one unique block delivered via MEV-Boost.

    Args:
        limit_per_relay: Forwarded to ``fetch_all_relays``.
        relays: Forwarded to ``fetch_all_relays``; ``None`` leaves the
            choice of relays to that function.

    Returns:
        One row per deduplicated payload, or an empty DataFrame when
        nothing was fetched.
    """
    raw_payloads = fetch_all_relays(
        limit_per_relay=limit_per_relay,
        relays=relays,
    )

    if not raw_payloads:
        log.warning("No payloads fetched from any relay")
        return pd.DataFrame()

    deduped = deduplicate_payloads(raw_payloads)
    df = pd.DataFrame(deduped)

    # Convert relays_seen list to comma-separated string for storage
    # (SQLite cannot store Python lists). Non-list values pass through.
    if "relays_seen" in df.columns:
        df["relays_seen"] = df["relays_seen"].apply(
            lambda x: ",".join(x) if isinstance(x, list) else x
        )

    log.info("Collected %d unique blocks from relay data", len(df))
    return df


# ─── Feature engineering ──────────────────────────────────────────────────

def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Engineer features from REAL relay data.

    Every feature is derived from actual on-chain/relay observations.
    The input frame is not modified; a copy with extra columns is returned.

    Args:
        df: Block records containing at least the columns ``value_eth``,
            ``gas_used``, ``gas_limit``, ``num_tx``, ``relay_count`` and
            ``builder_name``.

    Returns:
        A copy of ``df`` with every column listed in ``FEATURE_COLS`` added.

    Raises:
        KeyError: If any required input column is missing.
    """
    required = (
        "value_eth",
        "gas_used",
        "gas_limit",
        "num_tx",
        "relay_count",
        "builder_name",
    )
    missing = [col for col in required if col not in df.columns]
    if missing:
        # Same exception type a bare column lookup would raise, but naming
        # every missing column at once instead of only the first.
        raise KeyError(f"engineer_features: missing required columns: {missing}")

    df = df.copy()

    # ── Payment features ──
    df["log_value_eth"] = np.log1p(df["value_eth"])

    # ── Gas features ──
    # The +1 terms in the denominators below guard against division by zero.
    df["gas_utilization"] = df["gas_used"] / (df["gas_limit"] + 1)
    df["log_gas_used"] = np.log1p(df["gas_used"].astype(float))
    # NOTE(review): 21_000 is presumably the base gas of a plain transfer,
    # making the denominator the max number of minimal txs — confirm.
    df["tx_per_gas"] = df["num_tx"] / (df["gas_used"] / 21_000 + 1)

    # ── Value efficiency ──
    df["value_per_gas"] = df["value_eth"] / (df["gas_used"] + 1)
    df["value_per_tx"] = df["value_eth"] / (df["num_tx"] + 1)

    # ── Relay visibility ──
    total_relays = len(RELAYS)
    df["relay_coverage"] = df["relay_count"] / total_relays

    # ── Builder concentration ──
    # Share of blocks in THIS batch produced by each row's builder.
    builder_counts = df["builder_name"].value_counts()
    total_blocks = len(df)
    df["builder_block_share"] = df["builder_name"].map(
        lambda b: builder_counts.get(b, 0) / total_blocks
    )

    # ── Transaction density ──
    df["log_num_tx"] = np.log1p(df["num_tx"].astype(float))

    # ── Relative payment (z-score within batch) ──
    mean_val = df["value_eth"].mean()
    std_val = df["value_eth"].std()
    if not np.isfinite(std_val):
        # The sample std of a single-row batch is NaN, which would make the
        # z-score NaN; treat it as zero spread so the epsilon below yields 0.
        std_val = 0.0
    df["payment_zscore"] = (df["value_eth"] - mean_val) / (std_val + 1e-9)

    # ── Censorship signal ──
    # Low tx count relative to gas used could indicate tx filtering.
    # NOTE(review): 60_000 gas per tx is an assumed average — confirm.
    expected_tx = df["gas_used"] / 60_000
    actual_tx = df["num_tx"].astype(float)
    df["tx_deficit_ratio"] = (expected_tx - actual_tx) / (expected_tx + 1)
    df["tx_deficit_ratio"] = df["tx_deficit_ratio"].clip(-1, 1)

    return df


# ─── Feature columns for ML ──────────────────────────────────────────────

FEATURE_COLS = [
    "log_value_eth",
    "gas_utilization",
    "log_gas_used",
    "tx_per_gas",
    "value_per_gas",
    "value_per_tx",
    "relay_coverage",
    "builder_block_share",
    "log_num_tx",
    "payment_zscore",
    "tx_deficit_ratio",
]


def get_feature_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return the ``FEATURE_COLS`` columns present in ``df``, with NaN set to 0.

    Columns absent from ``df`` are silently skipped, so the result may have
    fewer columns than ``FEATURE_COLS``.
    """
    available = [c for c in FEATURE_COLS if c in df.columns]
    return df[available].fillna(0)


# ─── SQLite storage ───────────────────────────────────────────────────────

def save_to_db(df: pd.DataFrame, db_path: str = DB_PATH) -> None:
    """
    Write ``df`` to the ``blocks`` table of the SQLite database at ``db_path``.

    Any existing ``blocks`` table is replaced. The parent directory is
    created when ``db_path`` has one and it does not exist yet.
    """
    parent_dir = os.path.dirname(db_path)
    if parent_dir:
        # dirname is "" for a bare filename, and os.makedirs("") raises.
        os.makedirs(parent_dir, exist_ok=True)

    # closing() guarantees the connection is released even if to_sql raises.
    with closing(sqlite3.connect(db_path)) as conn:
        df.to_sql("blocks", conn, if_exists="replace", index=False)

    log.info("Saved %d block records to %s", len(df), db_path)


def load_from_db(db_path: str = DB_PATH) -> pd.DataFrame:
    """
    Load every row of the ``blocks`` table from the database at ``db_path``.

    Raises:
        FileNotFoundError: If no file exists at ``db_path``.
    """
    if not os.path.exists(db_path):
        raise FileNotFoundError(f"No database at {db_path}")

    # closing() guarantees the connection is released even if the query fails.
    with closing(sqlite3.connect(db_path)) as conn:
        df = pd.read_sql("SELECT * FROM blocks", conn)

    log.info("Loaded %d block records from %s", len(df), db_path)
    return df


# ─── Main pipeline ────────────────────────────────────────────────────────

def run_pipeline(limit_per_relay: int = 100) -> pd.DataFrame:
    """
    Full pipeline: fetch relay data → engineer features → save to DB.

    No synthetic data. If relays are unreachable, returns empty DataFrame
    (and nothing is written to the database).
    """
    log.info("Fetching data from MEV-Boost relays...")
    df_raw = collect_relay_data(limit_per_relay=limit_per_relay)

    if df_raw.empty:
        log.warning("No relay data collected — pipeline returns empty DataFrame")
        return df_raw

    df = engineer_features(df_raw)
    save_to_db(df)

    log.info("Pipeline complete: %d blocks, %d features", len(df), len(FEATURE_COLS))
    return df


if __name__ == "__main__":
    df = run_pipeline(limit_per_relay=100)
    if not df.empty:
        print(f"\nDataset shape: {df.shape}")
        print(f"\nBuilder distribution:\n{df['builder_name'].value_counts().head(10)}")
        print(f"\nPayment stats (ETH):\n{df['value_eth'].describe()}")
    else:
        print("No data collected — check network connectivity")