| |
| """ |
| ETL Pipeline: Ingest local epoch Parquet files into ClickHouse and load or merge the epoch's Neo4j dump (local files are kept; downloading is available via download_epoch but not run by main). |
| |
| Usage: |
| python scripts/ingest_epoch.py --epoch 851 |
| |
| Environment Variables: |
| HF_TOKEN: Hugging Face token for downloading private datasets. |
| CLICKHOUSE_HOST, CLICKHOUSE_HTTP_PORT (or legacy CLICKHOUSE_PORT), CLICKHOUSE_NATIVE_PORT, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD, CLICKHOUSE_DATABASE |
| NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD, NEO4J_MERGE_BATCH_SIZE |
| NEO4J_MERGE_BOLT_PORT, NEO4J_MERGE_HTTP_PORT, NEO4J_MERGE_TEMP_ROOT |
| """ |
|
|
| import argparse |
| import os |
| import sys |
| import time |
| from pathlib import Path |
|
|
| import clickhouse_connect |
| from huggingface_hub import snapshot_download |
| from tqdm import tqdm |
|
|
| |
# Hugging Face coordinates for the epoch artifact repository.
REPO_ID = "zirobtc/pump-fun-dataset"
REPO_TYPE = "model"
# Local working directory that epoch_<N>/ subdirectories live under.
DEFAULT_DEST_DIR = "./data/pump_fun"
DEFAULT_SCHEMA_FILE = "./onchain.sql"
# Settings string for ClickHouse inserts (conservative, single insert thread).
CLICKHOUSE_INSERT_SETTINGS = "max_insert_threads=1,max_block_size=65536"
# Neo4j database that epoch dumps are loaded/merged into.
NEO4J_TARGET_DB = "neo4j"
# Name prefix for the temporary merge instance's directories.
NEO4J_TEMP_DB_PREFIX = "epoch"


# Maps a Parquet file stem ("<stem>_epoch_<N>.parquet") to its ClickHouse table.
# Stems and table names happen to coincide, but the mapping keeps them decoupled.
PARQUET_TABLE_MAP = {
    "wallet_profiles": "wallet_profiles",
    "wallet_holdings": "wallet_holdings",
    "trades": "trades",
    "transfers": "transfers",
    "burns": "burns",
    "tokens": "tokens",
    "mints": "mints",
    "liquidity": "liquidity",
    "pool_creations": "pool_creations",
    "token_metrics": "token_metrics",
    "wallet_profile_metrics": "wallet_profile_metrics",
    "migrations": "migrations",
    "fee_collections": "fee_collections",
    "supply_locks": "supply_locks",
    "supply_lock_actions": "supply_lock_actions",
    "known_wallets": "known_wallets",
}


# Filename template for the per-epoch Neo4j dump artifact.
NEO4J_FILENAME = "neo4j_epoch_{epoch}.dump"


# Static/off-chain wallet-social Parquet files (not per-epoch).
SOCIAL_FILES = [
    "wallet_socials_1763057853.parquet",
    "wallet_socials_2.parquet",
    "wallet_socials_3.parquet",
]
|
|
| def _load_dotenv_if_missing(env_path: Path) -> None: |
| if not env_path.exists(): |
| return |
| for line in env_path.read_text().splitlines(): |
| line = line.strip() |
| if not line or line.startswith("#") or "=" not in line: |
| continue |
| key, value = line.split("=", 1) |
| key = key.strip() |
| value = value.strip().strip('"').strip("'") |
| if key and key not in os.environ: |
| os.environ[key] = value |
|
|
|
|
# Load .env (if present) before reading configuration; never clobbers
# variables already set in the process environment.
_load_dotenv_if_missing(Path(".env"))


# ClickHouse connection settings. The HTTP port is used by clickhouse_connect,
# the native port by the clickhouse-client CLI; CLICKHOUSE_PORT is a legacy
# alias for the HTTP port.
CH_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")
CH_HTTP_PORT = int(os.getenv("CLICKHOUSE_HTTP_PORT", os.getenv("CLICKHOUSE_PORT", "8123")))
CH_NATIVE_PORT = int(os.getenv("CLICKHOUSE_NATIVE_PORT", "9000"))
CH_USER = os.getenv("CLICKHOUSE_USER", "default")
CH_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "")
CH_DATABASE = os.getenv("CLICKHOUSE_DATABASE", "default")
# Target Neo4j connection; auth is only used when both user and password are set.
NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USER")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
# Merge tuning knobs: batch sizing, progress logging cadence, and retry policy
# while a freshly-started database is still coming up.
NEO4J_MERGE_BATCH_SIZE = int(os.getenv("NEO4J_MERGE_BATCH_SIZE", "50000"))
NEO4J_MERGE_LOG_EVERY = int(os.getenv("NEO4J_MERGE_LOG_EVERY", "10"))
NEO4J_MERGE_RETRIES = int(os.getenv("NEO4J_MERGE_RETRIES", "5"))
NEO4J_MERGE_RETRY_SLEEP = float(os.getenv("NEO4J_MERGE_RETRY_SLEEP", "5"))
NEO4J_MERGE_PARALLEL = os.getenv("NEO4J_MERGE_PARALLEL", "true").lower() == "true"
# Preferred ports for the temporary merge instance (first free port at or
# above these values is used) and its on-disk scratch root.
NEO4J_MERGE_BOLT_PORT = int(os.getenv("NEO4J_MERGE_BOLT_PORT", "7688"))
NEO4J_MERGE_HTTP_PORT = int(os.getenv("NEO4J_MERGE_HTTP_PORT", "7475"))
NEO4J_MERGE_TEMP_ROOT = os.getenv("NEO4J_MERGE_TEMP_ROOT", "/tmp/neo4j_merge")
# Optional JVM memory overrides for the temp instance (None = inherit config).
NEO4J_MERGE_HEAP_INITIAL = os.getenv("NEO4J_MERGE_HEAP_INITIAL")
NEO4J_MERGE_HEAP_MAX = os.getenv("NEO4J_MERGE_HEAP_MAX")
NEO4J_MERGE_PAGECACHE = os.getenv("NEO4J_MERGE_PAGECACHE")
|
|
|
|
| def _find_free_port(start_port: int) -> int: |
| import socket |
| port = start_port |
| while True: |
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: |
| sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| try: |
| sock.bind(("127.0.0.1", port)) |
| return port |
| except OSError: |
| port += 1 |
|
|
|
|
| def _run_neo4j_cmd( |
| argv: list[str], |
| run_as: str | None = None, |
| env: dict[str, str] | None = None, |
| ) -> "subprocess.CompletedProcess[str]": |
| import pwd |
| import subprocess |
| full_argv = argv |
| if env: |
| env_prefix = ["env"] + [f"{k}={v}" for k, v in env.items()] |
| full_argv = env_prefix + full_argv |
| if run_as is None: |
| try: |
| neo4j_uid = pwd.getpwnam("neo4j").pw_uid |
| except KeyError: |
| neo4j_uid = None |
| if neo4j_uid is not None and os.geteuid() != neo4j_uid: |
| full_argv = ["sudo", "-u", "neo4j"] + full_argv |
| else: |
| if run_as != "root": |
| full_argv = ["sudo", "-u", run_as] + full_argv |
| return subprocess.run(full_argv, capture_output=True, text=True) |
|
|
|
|
def _neo4j_process_owner() -> str | None:
    """Return the OS user that owns the running Neo4j process, or None if unknown.

    Parses the PID from `neo4j status --verbose` output, then asks `ps` for
    the owning user of that PID.
    """
    import re
    import subprocess

    status = _run_neo4j_cmd(["neo4j", "status", "--verbose"])
    combined = status.stdout + status.stderr
    # BUG FIX: the pattern was r"pid\\s+(\\d+)"; in a raw string "\\s"/"\\d"
    # match literal backslashes, so real output like "pid 1234" never matched
    # and this function always returned None.
    match = re.search(r"pid\s+(\d+)", combined)
    if not match:
        return None
    pid = match.group(1)
    proc = subprocess.run(["ps", "-o", "user=", "-p", pid], capture_output=True, text=True)
    if proc.returncode != 0:
        return None
    return proc.stdout.strip() or None
|
|
|
|
def _neo4j_is_running() -> bool:
    """True when `neo4j status` exits cleanly and its output mentions "running".

    NOTE: "not running" also contains the substring "running"; a zero exit
    status is what actually distinguishes the two in practice.
    """
    status = _run_neo4j_cmd(["neo4j", "status"])
    combined = (status.stdout + status.stderr).lower()
    return status.returncode == 0 and "running" in combined
|
|
|
|
def _ensure_neo4j_log_writable() -> None:
    """
    Best-effort: make the Neo4j logs directory writable by the `neo4j` user.

    Reads `server.directories.logs` from the active neo4j.conf. Silently
    returns when the config file, the setting, or the `neo4j` account is
    missing, or when the directory cannot be created. Chowns only when
    running as root; otherwise merely warns if the directory is unwritable.
    """
    import pwd
    conf_path = Path(os.getenv("NEO4J_CONF", "/etc/neo4j/neo4j.conf"))
    if not conf_path.exists():
        return
    # Locate the logs-directory setting in the config.
    logs_dir = None
    for line in conf_path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("server.directories.logs="):
            logs_dir = line.split("=", 1)[1].strip()
            break
    if not logs_dir:
        return
    logs_path = Path(logs_dir)
    try:
        logs_path.mkdir(parents=True, exist_ok=True)
    except OSError:
        return
    try:
        neo4j_user = pwd.getpwnam("neo4j")
    except KeyError:
        return
    if os.geteuid() != 0:
        # Not root: cannot chown, so just surface the problem and bail.
        if not os.access(logs_path, os.W_OK):
            print(f" β οΈ Neo4j logs dir not writable: {logs_path}")
        return
    try:
        # Root: hand the logs dir and its direct children to the neo4j user
        # (glob("*") is non-recursive — nested files are left as-is).
        for path in [logs_path] + list(logs_path.glob("*")):
            os.chown(path, neo4j_user.pw_uid, neo4j_user.pw_gid)
    except OSError:
        pass
|
|
|
|
def _ensure_neo4j_data_writable() -> None:
    """
    Best-effort: make the Neo4j data directory owned by the `neo4j` user.

    Reads `server.directories.data` from the active neo4j.conf. Silently
    returns when the config file, the setting, or the `neo4j` account is
    missing. When root, recursively chowns via the external `chown -R`
    (unlike the logs helper, which chowns only one level deep in Python);
    otherwise just warns if the directory is not writable.
    """
    import pwd
    conf_path = Path(os.getenv("NEO4J_CONF", "/etc/neo4j/neo4j.conf"))
    if not conf_path.exists():
        return
    # Locate the data-directory setting in the config.
    data_dir = None
    for line in conf_path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("server.directories.data="):
            data_dir = line.split("=", 1)[1].strip()
            break
    if not data_dir:
        return
    data_path = Path(data_dir)
    try:
        neo4j_user = pwd.getpwnam("neo4j")
    except KeyError:
        return
    if os.geteuid() != 0:
        if not os.access(data_path, os.W_OK):
            print(f" β οΈ Neo4j data dir not writable: {data_path}")
        return
    try:
        import subprocess
        subprocess.run(["chown", "-R", f"{neo4j_user.pw_uid}:{neo4j_user.pw_gid}", str(data_path)], check=True)
    except Exception:
        # Ownership fix is best-effort; the load itself will surface failures.
        pass
|
|
|
|
def _wait_for_bolt(
    uri: str,
    auth: tuple[str, str] | None = None,
    database: str = NEO4J_TARGET_DB,
    timeout_sec: int = 60,
) -> None:
    """
    Block until a Neo4j bolt endpoint answers a trivial query.

    Polls roughly once per second; raises RuntimeError when timeout_sec
    seconds elapse without a successful round trip.
    """
    from neo4j import GraphDatabase
    deadline = time.time() + timeout_sec
    while True:
        driver = None
        try:
            driver = GraphDatabase.driver(uri, auth=auth)
            with driver.session(database=database) as session:
                session.run("RETURN 1").consume()
            return
        except Exception:
            if time.time() > deadline:
                raise RuntimeError(f"Timed out waiting for Neo4j at {uri}")
            time.sleep(1)
        finally:
            # BUG FIX: the previous version only closed the driver on success,
            # leaking a driver (and its connection pool) per failed attempt.
            if driver is not None:
                driver.close()
|
|
|
|
def _neo4j_driver():
    """Create a driver for the target Neo4j instance; auth only when both creds are set."""
    from neo4j import GraphDatabase

    credentials = (NEO4J_USER, NEO4J_PASSWORD) if NEO4J_USER and NEO4J_PASSWORD else None
    return GraphDatabase.driver(NEO4J_URI, auth=credentials)
|
|
|
|
def build_patterns(epoch: int) -> list[str]:
    """Build the list of file patterns to download for a given epoch."""
    suffix = f"_epoch_{epoch}.parquet"
    patterns = [stem + suffix for stem in PARQUET_TABLE_MAP]
    patterns.append(NEO4J_FILENAME.format(epoch=epoch))
    return patterns
|
|
|
|
def download_epoch(epoch: int, dest_dir: Path, token: str | None) -> None:
    """Fetch one epoch's Parquet files and Neo4j dump from Hugging Face into dest_dir."""
    dest_dir.mkdir(parents=True, exist_ok=True)
    wanted = build_patterns(epoch)

    print(f"π₯ Downloading epoch {epoch} from {REPO_ID}...")
    snapshot_download(
        repo_id=REPO_ID,
        repo_type=REPO_TYPE,
        local_dir=str(dest_dir),
        local_dir_use_symlinks=False,
        allow_patterns=wanted,
        resume_download=True,
        token=token,
    )
    print("✅ Download complete.")
|
|
|
|
def ingest_parquet(client, table_name: str, parquet_path: Path, dry_run: bool = False) -> bool:
    """
    Ingest a Parquet file into a ClickHouse table via the clickhouse-client CLI.

    Validates the Parquet magic bytes, then streams the file with a native
    `INSERT ... FROM INFILE`. The `client` argument is unused here but kept
    for interface compatibility with callers.
    Returns True on success (or dry run), False on skip/failure.
    """
    if dry_run:
        print(f" [DRY-RUN] insert {parquet_path.name} -> {table_name}")
        return True

    try:
        # Cheap sanity check: every Parquet file starts with the PAR1 magic.
        with parquet_path.open("rb") as fh:
            magic = fh.read(4)
        if magic != b"PAR1":
            print(f" β οΈ Skipping {parquet_path.name}: not a Parquet file.")
            return False

        import subprocess
        # NOTE: an unused HTTP-format INSERT query (built from
        # CLICKHOUSE_INSERT_SETTINGS) was removed; only the INFILE path runs.
        infile_query = f"INSERT INTO {table_name} FROM INFILE '{parquet_path.resolve()}' FORMAT Parquet"
        try:
            cmd = [
                "clickhouse-client",
                "--host", CH_HOST,
                "--port", str(CH_NATIVE_PORT),
                "--user", CH_USER,
                "--password", CH_PASSWORD,
                "--database", CH_DATABASE,
                "--query", infile_query,
            ]
            subprocess.run(cmd, check=True)
            return True
        except FileNotFoundError:
            # Re-raised with context; caught (and logged) by the outer handler.
            raise RuntimeError(
                "clickhouse-client not found. Install clickhouse-client for native Parquet inserts."
            )
    except Exception as e:
        print(f" β Failed to ingest {parquet_path.name}: {e}")
        return False
|
|
|
|
def init_clickhouse_schema(schema_path: Path, dry_run: bool = False) -> bool:
    """Apply a SQL schema file through clickhouse-client. Returns True on success."""
    if not schema_path.exists():
        print(f" β ClickHouse schema file not found: {schema_path}")
        return False
    if dry_run:
        print(f" [DRY-RUN] init schema from {schema_path}")
        return True

    import subprocess
    command = [
        "clickhouse-client",
        "--host", CH_HOST,
        "--port", str(CH_NATIVE_PORT),
        "--user", CH_USER,
        "--password", CH_PASSWORD,
        "--database", CH_DATABASE,
        "--multiquery",
    ]
    try:
        # Feed the whole schema file on stdin; --multiquery runs every statement.
        with schema_path.open("rb") as schema_stream:
            subprocess.run(command, stdin=schema_stream, check=True)
    except FileNotFoundError:
        print(" β clickhouse-client not found. Install it to initialize the schema.")
        return False
    except subprocess.CalledProcessError as e:
        print(f" β Failed to initialize schema: {e}")
        return False
    print("✅ ClickHouse schema initialized.")
    return True
|
|
|
|
def run_etl(
    epoch: int,
    dest_dir: Path,
    client,
    dry_run: bool = False,
    token: str | None = None,
    skip_neo4j: bool = False,
    skip_clickhouse: bool = False,
    merge_neo4j: bool = False,
) -> None:
    """
    Full ETL pipeline for one epoch:
      1. Ingest the local epoch Parquet files into ClickHouse (no download).
      2. Ingest the static wallet-socials files (unless ClickHouse is skipped).
      3. Load or merge the epoch's Neo4j dump, if present.
    Local files are kept (no deletion). Calls sys.exit(1) on Neo4j failure.

    `token` is accepted for interface parity with the download path but is
    unused here.
    """
    if not dest_dir.exists():
        raise FileNotFoundError(f"Epoch directory not found: {dest_dir}")

    if not skip_clickhouse:
        print(f"\nπ€ Ingesting Parquet files into ClickHouse...")
        for stem, table_name in tqdm(PARQUET_TABLE_MAP.items(), desc="Ingesting"):
            parquet_path = dest_dir / f"{stem}_epoch_{epoch}.parquet"
            if not parquet_path.exists():
                print(f" β οΈ Skipping {stem}: file not found.")
                continue
            # Per-file failures are logged inside ingest_parquet; the loop continues.
            ingest_parquet(client, table_name, parquet_path, dry_run=dry_run)
        print("\n✅ ClickHouse ingestion complete.")
    else:
        print("\nβΉοΈ ClickHouse ingestion skipped.")

    if not skip_clickhouse:
        # Off-chain socials live beside the epoch dirs (dest_dir.parent/socials).
        ingest_socials(client, dest_dir.parent, dry_run=dry_run)

    neo4j_path = dest_dir / NEO4J_FILENAME.format(epoch=epoch)
    if neo4j_path.exists() and not skip_neo4j:
        if merge_neo4j and not dry_run:
            # Auto-downgrade: merging into an empty target graph is pointless
            # (and slower), so fall back to a direct offline load in that case.
            try:
                _driver = _neo4j_driver()
                with _driver.session(database=NEO4J_TARGET_DB) as _session:
                    _count = _session.run("MATCH (n) RETURN count(n) AS c").single()["c"]
                _driver.close()
                if _count == 0:
                    print(f" [AUTO] Target DB is empty ({_count} nodes). Switching to direct load instead of merge.")
                    merge_neo4j = False
                else:
                    print(f" [AUTO] Target DB has {_count} nodes. Proceeding with merge.")
            except Exception as e:
                print(f" [WARN] Could not check target DB node count: {e}. Proceeding with merge flag as-is.")

        if merge_neo4j:
            ok = merge_neo4j_epoch_dump(epoch, neo4j_path, dry_run=dry_run)
            if not ok:
                print(" β Neo4j merge failed. Aborting.")
                sys.exit(1)
        else:
            ok = ingest_neo4j_dump(neo4j_path, database=NEO4J_TARGET_DB, dry_run=dry_run)
            if not ok:
                print(" β Neo4j dump load failed. Aborting.")
                sys.exit(1)
    elif neo4j_path.exists() and skip_neo4j:
        print(f"\nβΉοΈ Neo4j dump found but skipped: {neo4j_path}")

    print("\nπ Full ETL pipeline complete.")
|
|
|
|
def ingest_socials(client, root_dir: Path, dry_run: bool = False) -> None:
    """Ingest the static/off-chain wallet social Parquet files from root_dir/socials."""
    social_dir = root_dir / "socials"
    if not social_dir.exists():
        print(f"\nβΉοΈ Socials directory not found at {social_dir}. Skipping social ingestion.")
        return

    print(f"\nπ₯ Ingesting Wallet Socials from {social_dir}...")
    for filename in SOCIAL_FILES:
        parquet_path = social_dir / filename
        if not parquet_path.exists():
            # BUG FIX: the skip message printed the literal "(unknown)"
            # instead of the missing file's name.
            print(f" β οΈ Skipping {filename}: file not found.")
            continue
        ingest_parquet(client, "wallet_socials", parquet_path, dry_run=dry_run)

    print("✅ Wallet Socials ingestion complete.")
|
|
|
|
def ingest_neo4j_dump(dump_path: Path, database: str = "neo4j", dry_run: bool = False) -> bool:
    """
    Load a Neo4j dump into `database` with `neo4j-admin database load`.

    The Neo4j service is stopped for the offline load and restarted afterwards
    (only if it was running beforehand). Because neo4j-admin derives the
    database name from the dump file name, a dump not named "<database>.dump"
    is first copied into a temp directory under that name.
    Returns True on success, False on failure.
    """
    import subprocess

    if not dump_path.exists():
        print(f" β οΈ Neo4j dump not found: {dump_path}")
        return False

    import shutil

    expected_dump_name = f"{database}.dump"
    load_dir = dump_path.parent
    temp_load_dir = None
    if dump_path.name != expected_dump_name:
        temp_load_dir = dump_path.parent / f"_neo4j_load_{database}"
        temp_load_dir.mkdir(parents=True, exist_ok=True)
        load_dump_path = temp_load_dir / expected_dump_name
        shutil.copy2(dump_path, load_dump_path)
        load_dir = temp_load_dir

    cmd = [
        "neo4j-admin", "database", "load",
        f"--from-path={load_dir.resolve()}",
        "--overwrite-destination",
        database,
    ]

    # Dry-run exits here, before any stop/start side effects.
    # NOTE(review): temp_load_dir is not removed on this path or on success.
    if dry_run:
        print(f" [DRY-RUN] {' '.join(cmd)}")
        return True

    print(f"π Loading Neo4j dump into database '{database}'...")
    print(" β οΈ Neo4j will be stopped for offline load.")

    was_running = False
    owner = None
    try:
        if not dry_run:  # NOTE(review): always true here — dry_run returned above.
            _ensure_neo4j_log_writable()
            was_running = _neo4j_is_running()
            if was_running:
                # Stop as the same user that owns the process, so the PID file
                # and lock files remain manageable.
                owner = _neo4j_process_owner() or "root"
                stop_result = _run_neo4j_cmd(["neo4j", "stop"], run_as=owner)
                if stop_result.returncode != 0:
                    print(f" β Failed to stop Neo4j: {stop_result.stderr.strip()}")
                    return False
            _ensure_neo4j_data_writable()
            # Belt-and-braces ownership repair when running as root.
            if os.geteuid() == 0:
                subprocess.run(["chown", "-R", "neo4j:neo4j", "/var/lib/neo4j/data"], check=False)

        if dry_run:  # NOTE(review): unreachable duplicate of the earlier check.
            print(f" [DRY-RUN] {' '.join(cmd)}")
            return True

        result = _run_neo4j_cmd(cmd)
        if result.returncode != 0:
            raise subprocess.CalledProcessError(result.returncode, cmd, output=result.stdout, stderr=result.stderr)
        print(" ✅ Neo4j dump loaded successfully.")
        return True
    except FileNotFoundError:
        print(" β neo4j-admin not found. Install it to load the dump locally.")
        if temp_load_dir and not dry_run:
            shutil.rmtree(temp_load_dir, ignore_errors=True)
        return False
    except subprocess.CalledProcessError as e:
        print(f" β Failed to load Neo4j dump: {e.stderr}")
        if temp_load_dir and not dry_run:
            shutil.rmtree(temp_load_dir, ignore_errors=True)
        return False
    finally:
        # Restart Neo4j if we stopped it, even when the load failed.
        if not dry_run and was_running:
            owner = owner or "root"
            start_result = _run_neo4j_cmd(["neo4j", "start"], run_as=owner)
            if start_result.returncode != 0:
                print(f" β οΈ Failed to start Neo4j: {start_result.stderr.strip()}")
|
|
|
|
| def _run_merge_batch(tx, query: str, rows: list[dict]) -> None: |
| tx.run(query, rows=rows) |
|
|
|
|
| def _ensure_indexes(session) -> None: |
| """Create indexes for efficient MERGE operations.""" |
| indexes = [ |
| "CREATE INDEX wallet_address IF NOT EXISTS FOR (w:Wallet) ON (w.address)", |
| "CREATE INDEX token_address IF NOT EXISTS FOR (t:Token) ON (t.address)", |
| ] |
| for idx_query in indexes: |
| try: |
| session.run(idx_query) |
| except Exception: |
| pass |
| try: |
| session.run("CALL db.awaitIndexes(300)") |
| except Exception: |
| pass |
|
|
|
|
def _bulk_merge_apoc(
    temp_session,
    target_session,
    match_query: str,
    merge_query: str,
    label: str,
    total: int | None = None,
) -> None:
    """
    Bulk-merge rows read from the temp instance into the target instance.

    Probes the target for APOC via `apoc.version()`; when present, delegates
    to the APOC fast path, otherwise to the plain batched fallback.
    """
    try:
        target_session.run("RETURN apoc.version() AS version").consume()
        apoc_available = True
    except Exception:
        apoc_available = False

    merger = _bulk_merge_with_apoc if apoc_available else _bulk_merge_optimized
    merger(temp_session, target_session, match_query, merge_query, label, total)
|
|
|
|
def _bulk_merge_with_apoc(
    temp_session,
    target_session,
    match_query: str,
    merge_query: str,
    label: str,
    total: int | None = None,
) -> None:
    """
    Merge rows via apoc.periodic.iterate for maximum throughput.

    Reads every row from the temp instance into memory (retrying while the
    freshly-started database is still unavailable), then hands the full list
    to APOC, which batches — and optionally parallelizes — server-side.
    Falls back to _bulk_merge_optimized if the APOC call itself fails.
    """
    from neo4j.exceptions import DatabaseUnavailable

    print(f" π {label}: using APOC bulk merge (total: {total or 'unknown'})")

    # Pull all rows up front; the temp DB may still be starting up.
    all_rows = []
    retries = 0
    while True:
        try:
            result = temp_session.run(match_query)
            all_rows = [record.data() for record in result]
            break
        except DatabaseUnavailable:
            retries += 1
            if retries > NEO4J_MERGE_RETRIES:
                raise
            print(f" β οΈ {label}: database unavailable, retry {retries}/{NEO4J_MERGE_RETRIES}")
            time.sleep(NEO4J_MERGE_RETRY_SLEEP)

    if not all_rows:
        print(f" ✅ {label}: 0 (no data)")
        return

    # The merge query starts with "UNWIND $rows AS t "; APOC's iterate already
    # yields one row `t` per inner invocation, so strip that prefix here.
    apoc_query = f"""
    CALL apoc.periodic.iterate(
        'UNWIND $rows AS t RETURN t',
        '{merge_query.replace("UNWIND $rows AS t ", "")}',
        {{batchSize: {NEO4J_MERGE_BATCH_SIZE}, parallel: {str(NEO4J_MERGE_PARALLEL).lower()}, params: {{rows: $rows}}}}
    )
    YIELD batches, total, errorMessages
    RETURN batches, total, errorMessages
    """

    try:
        result = target_session.run(apoc_query, rows=all_rows)
        record = result.single()
        if record:
            batches = record["batches"]
            processed = record["total"]
            errors = record["errorMessages"]
            if errors:
                print(f" β οΈ {label}: {processed} merged with errors: {errors[:200]}")
            else:
                print(f" ✅ {label}: {processed} merged in {batches} batches")
        else:
            print(f" ✅ {label}: {len(all_rows)} merged")
    except Exception as e:
        print(f" β οΈ APOC failed for {label}, falling back to optimized batch: {e}")
        _bulk_merge_optimized(temp_session, target_session, match_query, merge_query, label, total)
|
|
|
|
def _bulk_merge_optimized(
    temp_session,
    target_session,
    match_query: str,
    merge_query: str,
    label: str,
    total: int | None = None,
) -> None:
    """
    Batched fallback merge without APOC.

    Loads all rows into memory once and slices them into fixed-size batches —
    avoiding SKIP/LIMIT pagination, which would be O(n^2) over the result set.
    Retries both the read and each batch write while the database reports
    itself unavailable (it may still be starting up).
    """
    from neo4j.exceptions import DatabaseUnavailable

    print(f" π¦ {label}: loading all data into memory for fast merge...")

    all_rows = []
    retries = 0
    while True:
        try:
            result = temp_session.run(match_query)
            all_rows = [record.data() for record in result]
            break
        except DatabaseUnavailable:
            retries += 1
            if retries > NEO4J_MERGE_RETRIES:
                raise
            print(f" β οΈ {label}: database unavailable, retry {retries}/{NEO4J_MERGE_RETRIES}")
            time.sleep(NEO4J_MERGE_RETRY_SLEEP)

    total_rows = len(all_rows)
    if total_rows == 0:
        print(f" ✅ {label}: 0 (no data)")
        return

    print(f" π {label}: merging {total_rows} records...")

    processed = 0
    batches = 0
    for i in range(0, total_rows, NEO4J_MERGE_BATCH_SIZE):
        batch = all_rows[i:i + NEO4J_MERGE_BATCH_SIZE]
        # Retry the whole batch transaction on transient unavailability.
        retries = 0
        while True:
            try:
                target_session.execute_write(_run_merge_batch, merge_query, batch)
                break
            except DatabaseUnavailable:
                retries += 1
                if retries > NEO4J_MERGE_RETRIES:
                    raise
                print(f" β οΈ {label}: database unavailable, retry {retries}/{NEO4J_MERGE_RETRIES}")
                time.sleep(NEO4J_MERGE_RETRY_SLEEP)

        processed += len(batch)
        batches += 1
        if batches % NEO4J_MERGE_LOG_EVERY == 0:
            pct = (processed / total_rows) * 100
            print(f" π {label}: {processed}/{total_rows} ({pct:.1f}%)")

    print(f" ✅ {label}: {processed}/{total_rows} (100%)")
|
|
|
|
def _stream_merge(
    temp_session,
    target_session,
    match_query: str,
    merge_query: str,
    label: str,
    total: int | None = None,
) -> None:
    """Thin wrapper kept for call-site readability; delegates to the bulk merge."""
    _bulk_merge_apoc(
        temp_session,
        target_session,
        match_query,
        merge_query,
        label,
        total,
    )
|
|
|
|
def merge_neo4j_epoch_dump(epoch: int, dump_path: Path, dry_run: bool = False) -> bool:
    """
    Merge an epoch's Neo4j dump into the target database.

    The dump is loaded into a throwaway Neo4j instance (its own config, data
    dirs, and ports) and its Wallet/Token nodes plus relationships are MERGEd
    into the target graph, keeping the oldest timestamp per relationship.
    Relationship uniqueness is enforced by (start, end, type) only.

    Returns True on success, False on failure. BUG FIX: the original returned
    bare `None` from several paths (missing dump, dry-run, missing config,
    failed starts) despite the declared `-> bool`; the dry-run path in
    particular made the caller treat a successful dry run as a failure and
    sys.exit(1).
    """
    import shutil
    import subprocess

    if not dump_path.exists():
        print(f" β οΈ Neo4j dump not found: {dump_path}")
        return False

    # neo4j-admin derives the database name from the dump file name, so copy
    # the epoch dump to "<temp_db>.dump" when needed.
    temp_db = "neo4j"
    expected_dump_name = f"{temp_db}.dump"
    load_dir = dump_path.parent
    temp_load_dir = None
    if dump_path.name != expected_dump_name:
        temp_load_dir = dump_path.parent / f"_neo4j_load_{NEO4J_TEMP_DB_PREFIX}-{epoch}"
        temp_load_dir.mkdir(parents=True, exist_ok=True)
        load_dump_path = temp_load_dir / expected_dump_name
        shutil.copy2(dump_path, load_dump_path)
        load_dir = temp_load_dir

    print(f"\nπ§© Merging Neo4j dump into '{NEO4J_TARGET_DB}' via temp instance...")

    # Scratch layout for the temporary instance.
    temp_root = Path(NEO4J_MERGE_TEMP_ROOT) / f"{NEO4J_TEMP_DB_PREFIX}-{epoch}"
    temp_conf_dir = temp_root / "conf"
    temp_data_dir = temp_root / "data"
    temp_logs_dir = temp_root / "logs"
    temp_run_dir = temp_root / "run"
    temp_import_dir = temp_root / "import"

    if dry_run:
        print(f" [DRY-RUN] setup temp instance at {temp_root}")
        return True

    driver = None
    temp_driver = None
    try:
        # Start from a clean scratch tree.
        if temp_root.exists():
            shutil.rmtree(temp_root, ignore_errors=True)
        for directory in (temp_conf_dir, temp_data_dir, temp_logs_dir, temp_run_dir, temp_import_dir):
            directory.mkdir(parents=True, exist_ok=True)

        # Clone the system config, overriding directories, ports, and auth.
        base_conf = Path(os.getenv("NEO4J_CONF", "/etc/neo4j/neo4j.conf"))
        if not base_conf.exists():
            print(f" β Neo4j config not found: {base_conf}")
            return False
        bolt_port = _find_free_port(NEO4J_MERGE_BOLT_PORT)
        http_port = _find_free_port(NEO4J_MERGE_HTTP_PORT)
        overrides = {
            "server.directories.data": str(temp_data_dir),
            "server.directories.logs": str(temp_logs_dir),
            "server.directories.run": str(temp_run_dir),
            "server.directories.import": str(temp_import_dir),
            "server.bolt.listen_address": f"127.0.0.1:{bolt_port}",
            "server.bolt.advertised_address": f"127.0.0.1:{bolt_port}",
            "server.http.listen_address": f"127.0.0.1:{http_port}",
            "server.http.advertised_address": f"127.0.0.1:{http_port}",
            "server.https.enabled": "false",
            "dbms.security.auth_enabled": "false",
        }
        if NEO4J_MERGE_HEAP_INITIAL:
            overrides["server.memory.heap.initial_size"] = NEO4J_MERGE_HEAP_INITIAL
        if NEO4J_MERGE_HEAP_MAX:
            overrides["server.memory.heap.max_size"] = NEO4J_MERGE_HEAP_MAX
        if NEO4J_MERGE_PAGECACHE:
            overrides["server.memory.pagecache.size"] = NEO4J_MERGE_PAGECACHE
        conf_lines = []
        for line in base_conf.read_text().splitlines():
            stripped = line.strip()
            if not stripped or stripped.startswith("#") or "=" not in stripped:
                conf_lines.append(line)
                continue
            key, _ = stripped.split("=", 1)
            if key in overrides:
                continue  # dropped: replaced by the override block below
            conf_lines.append(line)
        conf_lines.append("")
        conf_lines.append("# temp merge overrides")
        for key, value in overrides.items():
            conf_lines.append(f"{key}={value}")
        conf_text = "\n".join(conf_lines) + "\n"
        (temp_conf_dir / "neo4j.conf").write_text(conf_text)

        if os.geteuid() == 0:
            try:
                subprocess.run(["chown", "-R", "neo4j:adm", str(temp_root)], check=True)
                # Also repair main data-dir ownership (best effort).
                subprocess.run(["chown", "-R", "neo4j:neo4j", "/var/lib/neo4j/data"], check=False)
            except Exception:
                pass

        temp_env = {
            "NEO4J_CONF": str(temp_conf_dir),
            "NEO4J_HOME": os.getenv("NEO4J_HOME", "/usr/share/neo4j"),
        }

        load_cmd = [
            "neo4j-admin", "database", "load",
            f"--from-path={load_dir.resolve()}",
            "--overwrite-destination",
            temp_db,
        ]
        # Make sure nothing is running against the temp dirs, then load offline.
        _run_neo4j_cmd(["neo4j", "stop"], run_as="neo4j", env=temp_env)
        load_result = _run_neo4j_cmd(load_cmd, run_as="neo4j", env=temp_env)
        if load_result.returncode != 0:
            raise subprocess.CalledProcessError(load_result.returncode, load_cmd, output=load_result.stdout, stderr=load_result.stderr)

        start_result = _run_neo4j_cmd(["neo4j", "start"], run_as="neo4j", env=temp_env)
        if start_result.returncode != 0:
            print(f" β Failed to start temp Neo4j: {start_result.stderr.strip()}")
            return False

        temp_bolt_uri = f"bolt://127.0.0.1:{bolt_port}"
        _wait_for_bolt(temp_bolt_uri, auth=None, database="neo4j")

        # Ensure the main (target) instance is up as well.
        if not _neo4j_is_running():
            start_result = _run_neo4j_cmd(["neo4j", "start"], run_as="root")
            if start_result.returncode != 0:
                start_result = _run_neo4j_cmd(["neo4j", "start"], run_as="neo4j")
            if start_result.returncode != 0:
                print(f" β Failed to start Neo4j: {start_result.stderr.strip()}")
                return False
        _wait_for_bolt(
            NEO4J_URI,
            auth=(NEO4J_USER, NEO4J_PASSWORD) if NEO4J_USER and NEO4J_PASSWORD else None,
        )
        driver = _neo4j_driver()
        from neo4j import GraphDatabase
        temp_driver = GraphDatabase.driver(temp_bolt_uri, auth=None)

        # Relationship types to merge, grouped by endpoint labels.
        wallet_wallet_types = [
            "BUNDLE_TRADE",
            "TRANSFERRED_TO",
            "COORDINATED_ACTIVITY",
            "COPIED_TRADE",
        ]
        wallet_token_types = [
            "MINTED",
            "SNIPED",
            "LOCKED_SUPPLY",
            "BURNED",
            "PROVIDED_LIQUIDITY",
            "TOP_TRADER_OF",
            "WHALE_OF",
        ]

        with temp_driver.session(database="neo4j") as temp_session, driver.session(database=NEO4J_TARGET_DB) as target_session:
            print(" π Ensuring indexes exist on target database...")
            _ensure_indexes(target_session)

            def _count(query: str) -> int:
                """Run a COUNT query against the temp instance and return the scalar."""
                return temp_session.run(query).single().value()

            # Nodes first, so relationship MERGEs mostly hit existing nodes.
            wallet_count = _count("MATCH (w:Wallet) RETURN count(w)")
            _stream_merge(
                temp_session,
                target_session,
                "MATCH (w:Wallet) RETURN w.address AS address",
                "UNWIND $rows AS t MERGE (w:Wallet {address: t.address})",
                "wallets",
                total=wallet_count,
            )
            token_count = _count("MATCH (t:Token) RETURN count(t)")
            _stream_merge(
                temp_session,
                target_session,
                "MATCH (t:Token) RETURN t.address AS address, "
                "CASE WHEN 'created_ts' IN keys(t) THEN t.created_ts ELSE null END AS created_ts",
                "UNWIND $rows AS t MERGE (k:Token {address: t.address}) "
                "ON CREATE SET k.created_ts = t.created_ts "
                "ON MATCH SET k.created_ts = CASE WHEN k.created_ts IS NULL OR "
                "t.created_ts < k.created_ts THEN t.created_ts ELSE k.created_ts END",
                "tokens",
                total=token_count,
            )

            def process_wallet_wallet_rel(rel_type: str) -> None:
                """Merge one Wallet->Wallet relationship type, keeping the oldest timestamp."""
                rel_total = _count(
                    f"MATCH (a:Wallet)-[r:{rel_type}]->(b:Wallet) "
                    "WHERE a.address IS NOT NULL AND b.address IS NOT NULL "
                    "WITH a.address AS source, b.address AS target "
                    "RETURN count(DISTINCT [source, target])"
                )
                match_query = (
                    f"MATCH (a:Wallet)-[r:{rel_type}]->(b:Wallet) "
                    "WHERE a.address IS NOT NULL AND b.address IS NOT NULL "
                    "WITH a.address AS source, b.address AS target, "
                    "min(coalesce(r.timestamp, 0)) AS timestamp "
                    "RETURN DISTINCT source, target, timestamp"
                )
                merge_query = (
                    "UNWIND $rows AS t "
                    "MERGE (a:Wallet {address: t.source}) "
                    "MERGE (b:Wallet {address: t.target}) "
                    f"MERGE (a)-[r:{rel_type}]->(b) "
                    "ON CREATE SET r.timestamp = t.timestamp "
                    "ON MATCH SET r.timestamp = CASE WHEN t.timestamp < r.timestamp THEN t.timestamp ELSE r.timestamp END"
                )
                _stream_merge(
                    temp_session,
                    target_session,
                    match_query,
                    merge_query,
                    rel_type.lower(),
                    total=rel_total,
                )

            def process_wallet_token_rel(rel_type: str) -> None:
                """Merge one Wallet->Token relationship type, keeping the oldest timestamp."""
                rel_total = _count(
                    f"MATCH (w:Wallet)-[r:{rel_type}]->(t:Token) "
                    "WHERE w.address IS NOT NULL AND t.address IS NOT NULL "
                    "WITH w.address AS source, t.address AS target "
                    "RETURN count(DISTINCT [source, target])"
                )
                match_query = (
                    f"MATCH (w:Wallet)-[r:{rel_type}]->(t:Token) "
                    "WHERE w.address IS NOT NULL AND t.address IS NOT NULL "
                    "WITH w.address AS source, t.address AS target, "
                    "min(coalesce(r.timestamp, 0)) AS timestamp "
                    "RETURN DISTINCT source, target, timestamp"
                )
                merge_query = (
                    "UNWIND $rows AS t "
                    "MERGE (w:Wallet {address: t.source}) "
                    "MERGE (k:Token {address: t.target}) "
                    f"MERGE (w)-[r:{rel_type}]->(k) "
                    "ON CREATE SET r.timestamp = t.timestamp "
                    "ON MATCH SET r.timestamp = CASE WHEN t.timestamp < r.timestamp THEN t.timestamp ELSE r.timestamp END"
                )
                _stream_merge(
                    temp_session,
                    target_session,
                    match_query,
                    merge_query,
                    rel_type.lower(),
                    total=rel_total,
                )

            print(f"\n π Processing {len(wallet_wallet_types)} wallet-wallet relationship types...")
            for rel_type in wallet_wallet_types:
                process_wallet_wallet_rel(rel_type)

            print(f"\n π Processing {len(wallet_token_types)} wallet-token relationship types...")
            for rel_type in wallet_token_types:
                process_wallet_token_rel(rel_type)

        print(" ✅ Merge complete.")
        return True
    except subprocess.CalledProcessError as e:
        print(f" β Failed to merge Neo4j dump: {e.stderr}")
        return False
    except Exception as e:
        print(f" β Failed to merge Neo4j dump: {e}")
        return False
    finally:
        # Always tear down: close drivers, stop the temp instance, remove scratch.
        if temp_driver:
            temp_driver.close()
        if driver:
            driver.close()
        if not dry_run:
            temp_env = {
                "NEO4J_CONF": str(temp_conf_dir),
                "NEO4J_HOME": os.getenv("NEO4J_HOME", "/usr/share/neo4j"),
            }
            _run_neo4j_cmd(["neo4j", "stop"], run_as="neo4j", env=temp_env)
        if temp_load_dir and not dry_run:
            shutil.rmtree(temp_load_dir, ignore_errors=True)
        if temp_root.exists() and not dry_run:
            shutil.rmtree(temp_root, ignore_errors=True)
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the epoch ETL run."""
    parser = argparse.ArgumentParser(description="ETL: Download, Ingest, Delete epoch Parquet files.")
    parser.add_argument("--epoch", type=int, required=True, help="Epoch number to process (e.g., 851)")
    # Boolean switches, registered in the original option order.
    for flags, help_text in (
        (("-c", "--skip-clickhouse"), "Skip ClickHouse ingestion"),
        (("-m", "--merge-neo4j"), "Merge Neo4j dump into existing graph"),
        (("--dry-run",), "Print queries without executing"),
        (("-n", "--skip-neo4j"), "Skip Neo4j dump loading"),
    ):
        parser.add_argument(*flags, action="store_true", help=help_text)
    parser.add_argument("--token", type=str, default=None, help="Hugging Face token (or set HF_TOKEN env var)")
    return parser.parse_args()
|
|
|
|
def main() -> None:
    """Entry point: connect to ClickHouse, ensure schemas, then run the ETL."""
    args = parse_args()
    # CLI --token wins over the HF_TOKEN environment variable.
    token = args.token or os.environ.get("HF_TOKEN")

    # Epoch artifacts are expected under ./data/pump_fun/epoch_<N>/ —
    # note: nothing here downloads them; they must already exist locally.
    dest_dir = Path(DEFAULT_DEST_DIR).expanduser() / f"epoch_{args.epoch}"

    # Connect up front even when --skip-clickhouse is given (the client object
    # is still passed through the pipeline).
    print(f"π Connecting to ClickHouse at {CH_HOST}:{CH_HTTP_PORT}...")
    try:
        client = clickhouse_connect.get_client(
            host=CH_HOST,
            port=CH_HTTP_PORT,
            username=CH_USER,
            password=CH_PASSWORD,
            database=CH_DATABASE,
        )
    except Exception as e:
        print(f"β Failed to connect to ClickHouse: {e}")
        sys.exit(1)

    # Apply both schema files when present; missing files are silently skipped.
    if not args.skip_clickhouse:
        print("π Ensuring ClickHouse schemas exist...")
        for schema_file in ["./onchain.sql", "./offchain.sql"]:
            schema_path = Path(schema_file).expanduser()
            if schema_path.exists():
                init_clickhouse_schema(schema_path, dry_run=args.dry_run)

    run_etl(
        epoch=args.epoch,
        dest_dir=dest_dir,
        client=client,
        dry_run=args.dry_run,
        token=token,
        skip_neo4j=args.skip_neo4j,
        skip_clickhouse=args.skip_clickhouse,
        merge_neo4j=args.merge_neo4j,
    )
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|