Spaces:
Sleeping
Sleeping
| """ | |
| data_pipeline.py | |
| Feature engineering on REAL MEV-Boost relay data. | |
| Data source: public relay APIs (Flashbots, Ultrasound, Agnostic, Aestus, Titan). | |
| All free, no API key needed. | |
| What we actually detect (honestly): | |
| - Anomalous builder payment patterns (unusually high/low proposer payments) | |
| - Builder market concentration shifts | |
| - Gas utilization anomalies (blocks using abnormally high/low gas) | |
| - Cross-relay visibility gaps (blocks seen on fewer relays than expected) | |
| - Potential censorship signals (abnormally low tx count for gas used) | |
| What we do NOT claim to detect: | |
| - Individual sandwich attacks (requires mempool + trace data) | |
| - Specific arbitrage transactions (requires decoded calldata) | |
| - Flashloan-based MEV (requires internal transaction traces) | |
| """ | |
| import os | |
| import sqlite3 | |
| import logging | |
| from typing import Optional | |
| import pandas as pd | |
| import numpy as np | |
| from scripts.relay_collector import ( | |
| fetch_all_relays, | |
| deduplicate_payloads, | |
| identify_builder, | |
| RELAYS, | |
| ) | |
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# NOTE: this runs at import time, so importing the module configures logging
# (INFO level, "<time> <level> <message>" format) as a side effect.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Default SQLite file: "<directory of this file>/../data/mev_blocks.db".
# The path is joined as-is and is not normalized.
DB_PATH = os.path.join(os.path.dirname(__file__), "..", "data", "mev_blocks.db")
# ─── Convert relay data → DataFrame ───────────────────────────────────────
def collect_relay_data(
    limit_per_relay: int = 100,
    relays: Optional[dict[str, str]] = None,
) -> pd.DataFrame:
    """Fetch payloads from the relays and return one row per unique block.

    Args:
        limit_per_relay: Forwarded unchanged to ``fetch_all_relays``.
        relays: Optional relay mapping, forwarded unchanged to
            ``fetch_all_relays`` (``None`` lets that function decide).

    Returns:
        A DataFrame built from the deduplicated payloads, or an empty
        DataFrame when nothing was fetched.
    """

    def _flatten(seen):
        # Lists become "a,b,c"; any other value is passed through untouched.
        if isinstance(seen, list):
            return ",".join(seen)
        return seen

    payloads = fetch_all_relays(limit_per_relay=limit_per_relay, relays=relays)
    if not payloads:
        log.warning("No payloads fetched from any relay")
        return pd.DataFrame()

    blocks = pd.DataFrame(deduplicate_payloads(payloads))

    # Store the list of relays as a comma-separated string so the column
    # can be persisted.
    if "relays_seen" in blocks.columns:
        blocks["relays_seen"] = blocks["relays_seen"].apply(_flatten)

    log.info(f"Collected {len(blocks)} unique blocks from relay data")
    return blocks
# ─── Feature engineering ──────────────────────────────────────────────────
def engineer_features(
    df: pd.DataFrame,
    total_relays: Optional[int] = None,
) -> pd.DataFrame:
    """Derive the model features from per-block relay records.

    The input frame is not modified; a copy carrying the additional feature
    columns is returned.

    Args:
        df: One row per block. Must contain the columns ``value_eth``,
            ``gas_used``, ``gas_limit``, ``num_tx``, ``relay_count`` and
            ``builder_name``.
        total_relays: Denominator for ``relay_coverage``. Defaults to
            ``len(RELAYS)``. Pass the size of the custom relay mapping when
            the data was collected from a subset of relays, otherwise the
            coverage is understated.

    Returns:
        A copy of ``df`` with these columns added: ``log_value_eth``,
        ``gas_utilization``, ``log_gas_used``, ``tx_per_gas``,
        ``value_per_gas``, ``value_per_tx``, ``relay_coverage``,
        ``builder_block_share``, ``log_num_tx``, ``payment_zscore`` and
        ``tx_deficit_ratio``.

    Raises:
        ValueError: If ``total_relays`` is given but is not positive.
        KeyError: If a required input column is missing.
    """
    if total_relays is None:
        total_relays = len(RELAYS)
    elif total_relays <= 0:
        raise ValueError("total_relays must be a positive integer")

    df = df.copy()

    # ── Payment features ──
    df["log_value_eth"] = np.log1p(df["value_eth"])

    # ── Gas features ──
    # The "+ 1" in the denominators below guards against division by zero.
    df["gas_utilization"] = df["gas_used"] / (df["gas_limit"] + 1)
    df["log_gas_used"] = np.log1p(df["gas_used"].astype(float))
    # Gas is expressed in units of 21,000 (presumably the cost of a plain
    # ETH transfer; confirm with the model owner).
    df["tx_per_gas"] = df["num_tx"] / (df["gas_used"] / 21_000 + 1)

    # ── Value efficiency ──
    df["value_per_gas"] = df["value_eth"] / (df["gas_used"] + 1)
    df["value_per_tx"] = df["value_eth"] / (df["num_tx"] + 1)

    # ── Relay visibility ──
    df["relay_coverage"] = df["relay_count"] / total_relays

    # ── Builder concentration ──
    # Share of the batch produced by each row's builder. Builders that are
    # missing (NaN) are not counted by value_counts and get a share of 0.
    builder_counts = df["builder_name"].value_counts()
    total_blocks = len(df)
    df["builder_block_share"] = (
        df["builder_name"].map(builder_counts).fillna(0) / total_blocks
    )

    # ── Transaction density ──
    df["log_num_tx"] = np.log1p(df["num_tx"].astype(float))

    # ── Relative payment (z-score within batch) ──
    mean_val = df["value_eth"].mean()
    std_val = df["value_eth"].std()
    if pd.isna(std_val):
        # The sample standard deviation is undefined for fewer than two
        # observations; use 0 so a single-block batch scores 0, not NaN.
        std_val = 0.0
    df["payment_zscore"] = (df["value_eth"] - mean_val) / (std_val + 1e-9)

    # ── Censorship signal ──
    # Low tx count relative to gas used could indicate tx filtering.
    # The expected count treats 60,000 gas as the cost of one transaction.
    expected_tx = df["gas_used"] / 60_000
    actual_tx = df["num_tx"].astype(float)
    deficit = (expected_tx - actual_tx) / (expected_tx + 1)
    df["tx_deficit_ratio"] = deficit.clip(-1, 1)

    return df
# ─── Feature columns for ML ──────────────────────────────────────────────
# Columns handed to the model, in the order they appear in the feature matrix.
FEATURE_COLS = [
    # payment and gas
    "log_value_eth",
    "gas_utilization",
    "log_gas_used",
    "tx_per_gas",
    # value efficiency
    "value_per_gas",
    "value_per_tx",
    # relay visibility and builder concentration
    "relay_coverage",
    "builder_block_share",
    # transaction density, relative payment, censorship signal
    "log_num_tx",
    "payment_zscore",
    "tx_deficit_ratio",
]


def get_feature_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """Return the ``FEATURE_COLS`` columns present in ``df`` with NaN set to 0.

    Feature columns missing from ``df`` are skipped rather than raising, and
    the result keeps the ``FEATURE_COLS`` ordering.
    """
    present = df.columns
    selected = [name for name in FEATURE_COLS if name in present]
    matrix = df[selected]
    return matrix.fillna(0)
# ─── SQLite storage ───────────────────────────────────────────────────────
def save_to_db(df: pd.DataFrame, db_path: str = DB_PATH) -> None:
    """Write ``df`` to the ``blocks`` table of a SQLite database.

    Any existing ``blocks`` table is replaced. The DataFrame index is not
    stored.

    Args:
        df: Records to persist.
        db_path: Path of the SQLite file. Its parent directory is created
            when it does not exist yet.
    """
    # os.makedirs("") raises, so only create a directory when the path
    # actually has a directory component (a bare "blocks.db" has none).
    parent_dir = os.path.dirname(db_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    conn = sqlite3.connect(db_path)
    try:
        df.to_sql("blocks", conn, if_exists="replace", index=False)
    finally:
        # Release the database handle even if the write fails.
        conn.close()
    log.info(f"Saved {len(df)} block records to {db_path}")
def load_from_db(db_path: str = DB_PATH) -> pd.DataFrame:
    """Load every row of the ``blocks`` table from a SQLite database.

    Args:
        db_path: Path of the SQLite file to read.

    Returns:
        The full contents of the ``blocks`` table.

    Raises:
        FileNotFoundError: If ``db_path`` does not exist. Checked up front
            because ``sqlite3.connect`` would otherwise create an empty file.
    """
    if not os.path.exists(db_path):
        raise FileNotFoundError(f"No database at {db_path}")

    conn = sqlite3.connect(db_path)
    try:
        df = pd.read_sql("SELECT * FROM blocks", conn)
    finally:
        # Release the database handle even if the query fails, for example
        # when the table is missing.
        conn.close()
    log.info(f"Loaded {len(df)} block records from {db_path}")
    return df
# ─── Main pipeline ────────────────────────────────────────────────────────
def run_pipeline(limit_per_relay: int = 100) -> pd.DataFrame:
    """Run the full pipeline: fetch relay data → engineer features → save to DB.

    No synthetic data is generated. When no relay data is collected, the
    empty DataFrame is returned as-is and nothing is written to the database.

    Args:
        limit_per_relay: Forwarded to ``collect_relay_data``.

    Returns:
        The feature-enriched DataFrame that was saved, or the empty
        DataFrame from ``collect_relay_data``.
    """
    log.info("Fetching data from MEV-Boost relays...")
    df_raw = collect_relay_data(limit_per_relay=limit_per_relay)
    if df_raw.empty:
        log.warning("No relay data collected — pipeline returns empty DataFrame")
        return df_raw

    df = engineer_features(df_raw)
    save_to_db(df)
    log.info(f"Pipeline complete: {len(df)} blocks, {len(FEATURE_COLS)} features")
    return df
if __name__ == "__main__":
    # Script entry point: run the pipeline once and print a short summary.
    df = run_pipeline(limit_per_relay=100)
    if not df.empty:
        print(f"\nDataset shape: {df.shape}")
        print(f"\nBuilder distribution:\n{df['builder_name'].value_counts().head(10)}")
        print(f"\nPayment stats (ETH):\n{df['value_eth'].describe()}")
    else:
        print("No data collected — check network connectivity")