#!/usr/bin/env python3 """ ETL Pipeline: Download epoch Parquet files, ingest into ClickHouse, and delete local files. Usage: python scripts/ingest_epoch.py --epoch 851 Environment Variables: HF_TOKEN: Hugging Face token for downloading private datasets. CLICKHOUSE_HOST, CLICKHOUSE_HTTP_PORT (or legacy CLICKHOUSE_PORT), CLICKHOUSE_NATIVE_PORT, CLICKHOUSE_USER, CLICKHOUSE_PASSWORD, CLICKHOUSE_DATABASE NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD, NEO4J_MERGE_BATCH_SIZE NEO4J_MERGE_BOLT_PORT, NEO4J_MERGE_HTTP_PORT, NEO4J_MERGE_TEMP_ROOT """ import argparse import os import sys import time from pathlib import Path import clickhouse_connect from huggingface_hub import snapshot_download from tqdm import tqdm # Hugging Face config REPO_ID = "zirobtc/pump-fun-dataset" REPO_TYPE = "model" DEFAULT_DEST_DIR = "./data/pump_fun" DEFAULT_SCHEMA_FILE = "./onchain.sql" CLICKHOUSE_INSERT_SETTINGS = "max_insert_threads=1,max_block_size=65536" NEO4J_TARGET_DB = "neo4j" NEO4J_TEMP_DB_PREFIX = "epoch" # Parquet file stems -> ClickHouse table names # Maps the file stem to the target table. Usually they match. PARQUET_TABLE_MAP = { "wallet_profiles": "wallet_profiles", "wallet_holdings": "wallet_holdings", "trades": "trades", "transfers": "transfers", "burns": "burns", "tokens": "tokens", "mints": "mints", "liquidity": "liquidity", "pool_creations": "pool_creations", "token_metrics": "token_metrics", "wallet_profile_metrics": "wallet_profile_metrics", "migrations": "migrations", "fee_collections": "fee_collections", "supply_locks": "supply_locks", "supply_lock_actions": "supply_lock_actions", "known_wallets": "known_wallets", } # Neo4j dump filename pattern NEO4J_FILENAME = "neo4j_epoch_{epoch}.dump" # Social files (off-chain, not epoch based) SOCIAL_FILES = [ "wallet_socials_1763057853.parquet", "wallet_socials_2.parquet", "wallet_socials_3.parquet", ] def _load_dotenv_if_missing(env_path: Path) -> None: if not env_path.exists(): return for line in env_path.read_text().splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip() value = value.strip().strip('"').strip("'") if key and key not in os.environ: os.environ[key] = value _load_dotenv_if_missing(Path(".env")) # ClickHouse connection defaults (can be overridden by env vars) CH_HOST = os.getenv("CLICKHOUSE_HOST", "localhost") CH_HTTP_PORT = int(os.getenv("CLICKHOUSE_HTTP_PORT", os.getenv("CLICKHOUSE_PORT", "8123"))) CH_NATIVE_PORT = int(os.getenv("CLICKHOUSE_NATIVE_PORT", "9000")) CH_USER = os.getenv("CLICKHOUSE_USER", "default") CH_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "") CH_DATABASE = os.getenv("CLICKHOUSE_DATABASE", "default") NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687") NEO4J_USER = os.getenv("NEO4J_USER") NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD") NEO4J_MERGE_BATCH_SIZE = int(os.getenv("NEO4J_MERGE_BATCH_SIZE", "50000")) NEO4J_MERGE_LOG_EVERY = int(os.getenv("NEO4J_MERGE_LOG_EVERY", "10")) NEO4J_MERGE_RETRIES = int(os.getenv("NEO4J_MERGE_RETRIES", "5")) NEO4J_MERGE_RETRY_SLEEP = float(os.getenv("NEO4J_MERGE_RETRY_SLEEP", "5")) NEO4J_MERGE_PARALLEL = os.getenv("NEO4J_MERGE_PARALLEL", "true").lower() == "true" NEO4J_MERGE_BOLT_PORT = int(os.getenv("NEO4J_MERGE_BOLT_PORT", "7688")) NEO4J_MERGE_HTTP_PORT = int(os.getenv("NEO4J_MERGE_HTTP_PORT", "7475")) NEO4J_MERGE_TEMP_ROOT = os.getenv("NEO4J_MERGE_TEMP_ROOT", "/tmp/neo4j_merge") NEO4J_MERGE_HEAP_INITIAL = os.getenv("NEO4J_MERGE_HEAP_INITIAL") NEO4J_MERGE_HEAP_MAX = os.getenv("NEO4J_MERGE_HEAP_MAX") NEO4J_MERGE_PAGECACHE = os.getenv("NEO4J_MERGE_PAGECACHE") def _find_free_port(start_port: int) -> int: import socket port = start_port while True: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: sock.bind(("127.0.0.1", port)) return port except OSError: port += 1 def _run_neo4j_cmd( argv: list[str], run_as: str | None = None, env: dict[str, str] | None = None, ) -> "subprocess.CompletedProcess[str]": import pwd import subprocess full_argv = argv if env: env_prefix = ["env"] + [f"{k}={v}" for k, v in env.items()] full_argv = env_prefix + full_argv if run_as is None: try: neo4j_uid = pwd.getpwnam("neo4j").pw_uid except KeyError: neo4j_uid = None if neo4j_uid is not None and os.geteuid() != neo4j_uid: full_argv = ["sudo", "-u", "neo4j"] + full_argv else: if run_as != "root": full_argv = ["sudo", "-u", run_as] + full_argv return subprocess.run(full_argv, capture_output=True, text=True) def _neo4j_process_owner() -> str | None: import re import subprocess status = _run_neo4j_cmd(["neo4j", "status", "--verbose"]) combined = (status.stdout + status.stderr) match = re.search(r"pid\\s+(\\d+)", combined) if not match: return None pid = match.group(1) proc = subprocess.run(["ps", "-o", "user=", "-p", pid], capture_output=True, text=True) if proc.returncode != 0: return None return proc.stdout.strip() or None def _neo4j_is_running() -> bool: result = _run_neo4j_cmd(["neo4j", "status"]) if result.returncode != 0: return False return "running" in (result.stdout + result.stderr).lower() def _ensure_neo4j_log_writable() -> None: import pwd conf_path = Path(os.getenv("NEO4J_CONF", "/etc/neo4j/neo4j.conf")) if not conf_path.exists(): return logs_dir = None for line in conf_path.read_text().splitlines(): line = line.strip() if not line or line.startswith("#"): continue if line.startswith("server.directories.logs="): logs_dir = line.split("=", 1)[1].strip() break if not logs_dir: return logs_path = Path(logs_dir) try: logs_path.mkdir(parents=True, exist_ok=True) except OSError: return try: neo4j_user = pwd.getpwnam("neo4j") except KeyError: return if os.geteuid() != 0: if not os.access(logs_path, os.W_OK): print(f" āš ļø Neo4j logs dir not writable: {logs_path}") return try: for path in [logs_path] + list(logs_path.glob("*")): os.chown(path, neo4j_user.pw_uid, neo4j_user.pw_gid) except OSError: pass def _ensure_neo4j_data_writable() -> None: import pwd conf_path = Path(os.getenv("NEO4J_CONF", "/etc/neo4j/neo4j.conf")) if not conf_path.exists(): return data_dir = None for line in conf_path.read_text().splitlines(): line = line.strip() if not line or line.startswith("#"): continue if line.startswith("server.directories.data="): data_dir = line.split("=", 1)[1].strip() break if not data_dir: return data_path = Path(data_dir) try: neo4j_user = pwd.getpwnam("neo4j") except KeyError: return if os.geteuid() != 0: if not os.access(data_path, os.W_OK): print(f" āš ļø Neo4j data dir not writable: {data_path}") return try: import subprocess subprocess.run(["chown", "-R", f"{neo4j_user.pw_uid}:{neo4j_user.pw_gid}", str(data_path)], check=True) except Exception: pass def _wait_for_bolt( uri: str, auth: tuple[str, str] | None = None, database: str = NEO4J_TARGET_DB, timeout_sec: int = 60, ) -> None: from neo4j import GraphDatabase start = time.time() while True: try: driver = GraphDatabase.driver(uri, auth=auth) with driver.session(database=database) as session: session.run("RETURN 1").consume() driver.close() return except Exception: if time.time() - start > timeout_sec: raise RuntimeError(f"Timed out waiting for Neo4j at {uri}") time.sleep(1) def _neo4j_driver(): from neo4j import GraphDatabase if NEO4J_USER and NEO4J_PASSWORD: return GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) return GraphDatabase.driver(NEO4J_URI, auth=None) def build_patterns(epoch: int) -> list[str]: """Build the list of file patterns to download for a given epoch.""" epoch_str = str(epoch) parquet_patterns = [f"{stem}_epoch_{epoch_str}.parquet" for stem in PARQUET_TABLE_MAP.keys()] neo4j_pattern = NEO4J_FILENAME.format(epoch=epoch_str) return parquet_patterns + [neo4j_pattern] def download_epoch(epoch: int, dest_dir: Path, token: str | None) -> None: """Download epoch artifacts from Hugging Face.""" patterns = build_patterns(epoch) dest_dir.mkdir(parents=True, exist_ok=True) print(f"šŸ“„ Downloading epoch {epoch} from {REPO_ID}...") snapshot_download( repo_id=REPO_ID, repo_type=REPO_TYPE, local_dir=str(dest_dir), local_dir_use_symlinks=False, allow_patterns=patterns, resume_download=True, token=token, ) print("āœ… Download complete.") def ingest_parquet(client, table_name: str, parquet_path: Path, dry_run: bool = False) -> bool: """ Ingest a Parquet file into a ClickHouse table. Returns True on success. """ if dry_run: print(f" [DRY-RUN] insert {parquet_path.name} -> {table_name}") return True try: with parquet_path.open("rb") as fh: magic = fh.read(4) if magic != b"PAR1": print(f" āš ļø Skipping {parquet_path.name}: not a Parquet file.") return False # clickhouse-connect (HTTP) doesn't support FROM INFILE; prefer streaming inserts. # Using insert_file can still be memory-heavy for large Parquet on some setups. import subprocess insert_query = f"INSERT INTO {table_name} FORMAT Parquet SETTINGS {CLICKHOUSE_INSERT_SETTINGS}" infile_query = f"INSERT INTO {table_name} FROM INFILE '{parquet_path.resolve()}' FORMAT Parquet" try: cmd = [ "clickhouse-client", "--host", CH_HOST, "--port", str(CH_NATIVE_PORT), "--user", CH_USER, "--password", CH_PASSWORD, "--database", CH_DATABASE, "--query", infile_query, ] subprocess.run(cmd, check=True) return True except FileNotFoundError: raise RuntimeError( "clickhouse-client not found. Install clickhouse-client for native Parquet inserts." ) except Exception as e: print(f" āŒ Failed to ingest {parquet_path.name}: {e}") return False def init_clickhouse_schema(schema_path: Path, dry_run: bool = False) -> bool: if not schema_path.exists(): print(f" āŒ ClickHouse schema file not found: {schema_path}") return False if dry_run: print(f" [DRY-RUN] init schema from {schema_path}") return True import subprocess cmd = [ "clickhouse-client", "--host", CH_HOST, "--port", str(CH_NATIVE_PORT), "--user", CH_USER, "--password", CH_PASSWORD, "--database", CH_DATABASE, "--multiquery", ] try: with schema_path.open("rb") as fh: subprocess.run(cmd, stdin=fh, check=True) print("āœ… ClickHouse schema initialized.") return True except FileNotFoundError: print(" āŒ clickhouse-client not found. Install it to initialize the schema.") return False except subprocess.CalledProcessError as e: print(f" āŒ Failed to initialize schema: {e}") return False def run_etl( epoch: int, dest_dir: Path, client, dry_run: bool = False, token: str | None = None, skip_neo4j: bool = False, skip_clickhouse: bool = False, merge_neo4j: bool = False, ) -> None: """ Full ETL pipeline: 1. Use local Parquet files (no download) 2. Ingest into ClickHouse 3. Keep local files (no deletion) """ if not dest_dir.exists(): raise FileNotFoundError(f"Epoch directory not found: {dest_dir}") if not skip_clickhouse: # Step 2: Ingest each Parquet file print(f"\nšŸ“¤ Ingesting Parquet files into ClickHouse...") for stem, table_name in tqdm(PARQUET_TABLE_MAP.items(), desc="Ingesting"): parquet_path = dest_dir / f"{stem}_epoch_{epoch}.parquet" if not parquet_path.exists(): print(f" āš ļø Skipping {stem}: file not found.") continue ingest_parquet(client, table_name, parquet_path, dry_run=dry_run) print("\nāœ… ClickHouse ingestion complete.") else: print("\nā„¹ļø ClickHouse ingestion skipped.") # Step 3: Ingest Socials (if not skipping CH) if not skip_clickhouse: # dest_dir is .../epoch_X, so parent is .../pump_fun ingest_socials(client, dest_dir.parent, dry_run=dry_run) # Step 4: Neo4j dump neo4j_path = dest_dir / NEO4J_FILENAME.format(epoch=epoch) if neo4j_path.exists() and not skip_neo4j: # Auto-detect: if target DB is empty, use direct load instead of merge # This prevents the merge-on-empty-DB hang when the first epoch uses --merge-neo4j if merge_neo4j and not dry_run: try: _driver = _neo4j_driver() with _driver.session(database=NEO4J_TARGET_DB) as _session: _count = _session.run("MATCH (n) RETURN count(n) AS c").single()["c"] _driver.close() if _count == 0: print(f" [AUTO] Target DB is empty ({_count} nodes). Switching to direct load instead of merge.") merge_neo4j = False else: print(f" [AUTO] Target DB has {_count} nodes. Proceeding with merge.") except Exception as e: print(f" [WARN] Could not check target DB node count: {e}. Proceeding with merge flag as-is.") if merge_neo4j: ok = merge_neo4j_epoch_dump(epoch, neo4j_path, dry_run=dry_run) if not ok: print(" āŒ Neo4j merge failed. Aborting.") sys.exit(1) else: ok = ingest_neo4j_dump(neo4j_path, database=NEO4J_TARGET_DB, dry_run=dry_run) if not ok: print(" āŒ Neo4j dump load failed. Aborting.") sys.exit(1) elif neo4j_path.exists() and skip_neo4j: print(f"\nā„¹ļø Neo4j dump found but skipped: {neo4j_path}") print("\nšŸŽ‰ Full ETL pipeline complete.") def ingest_socials(client, root_dir: Path, dry_run: bool = False) -> None: """Ingest the static/off-chain wallet social files.""" social_dir = root_dir / "socials" if not social_dir.exists(): print(f"\nā„¹ļø Socials directory not found at {social_dir}. Skipping social ingestion.") return print(f"\nšŸ‘„ Ingesting Wallet Socials from {social_dir}...") for filename in SOCIAL_FILES: parquet_path = social_dir / filename if not parquet_path.exists(): print(f" āš ļø Skipping {filename}: file not found.") continue # Target table is always 'wallet_socials' for these files ingest_parquet(client, "wallet_socials", parquet_path, dry_run=dry_run) print("āœ… Wallet Socials ingestion complete.") def ingest_neo4j_dump(dump_path: Path, database: str = "neo4j", dry_run: bool = False) -> bool: """ Load a Neo4j dump file into the database. Requires neo4j-admin CLI and the Neo4j service to be stopped. Returns True on success. """ import subprocess if not dump_path.exists(): print(f" āš ļø Neo4j dump not found: {dump_path}") return False import shutil expected_dump_name = f"{database}.dump" load_dir = dump_path.parent temp_load_dir = None if dump_path.name != expected_dump_name: temp_load_dir = dump_path.parent / f"_neo4j_load_{database}" temp_load_dir.mkdir(parents=True, exist_ok=True) load_dump_path = temp_load_dir / expected_dump_name shutil.copy2(dump_path, load_dump_path) load_dir = temp_load_dir # neo4j-admin database load requires a directory containing .dump # For Neo4j 5.x: neo4j-admin database load --from-path= --overwrite-destination cmd = [ "neo4j-admin", "database", "load", f"--from-path={load_dir.resolve()}", "--overwrite-destination", database, ] if dry_run: print(f" [DRY-RUN] {' '.join(cmd)}") return True print(f"šŸ”„ Loading Neo4j dump into database '{database}'...") print(" āš ļø Neo4j will be stopped for offline load.") was_running = False owner = None try: if not dry_run: _ensure_neo4j_log_writable() was_running = _neo4j_is_running() if was_running: owner = _neo4j_process_owner() or "root" stop_result = _run_neo4j_cmd(["neo4j", "stop"], run_as=owner) if stop_result.returncode != 0: print(f" āŒ Failed to stop Neo4j: {stop_result.stderr.strip()}") return False _ensure_neo4j_data_writable() # Explicit chown to fix permission mismatch when Neo4j was started as root if os.geteuid() == 0: subprocess.run(["chown", "-R", "neo4j:neo4j", "/var/lib/neo4j/data"], check=False) if dry_run: print(f" [DRY-RUN] {' '.join(cmd)}") return True result = _run_neo4j_cmd(cmd) if result.returncode != 0: raise subprocess.CalledProcessError(result.returncode, cmd, output=result.stdout, stderr=result.stderr) print(" āœ… Neo4j dump loaded successfully.") return True except FileNotFoundError: print(" āŒ neo4j-admin not found. Install it to load the dump locally.") if temp_load_dir and not dry_run: shutil.rmtree(temp_load_dir, ignore_errors=True) return False except subprocess.CalledProcessError as e: print(f" āŒ Failed to load Neo4j dump: {e.stderr}") if temp_load_dir and not dry_run: shutil.rmtree(temp_load_dir, ignore_errors=True) return False finally: if not dry_run and was_running: owner = owner or "root" start_result = _run_neo4j_cmd(["neo4j", "start"], run_as=owner) if start_result.returncode != 0: print(f" āš ļø Failed to start Neo4j: {start_result.stderr.strip()}") def _run_merge_batch(tx, query: str, rows: list[dict]) -> None: tx.run(query, rows=rows) def _ensure_indexes(session) -> None: """Create indexes for efficient MERGE operations.""" indexes = [ "CREATE INDEX wallet_address IF NOT EXISTS FOR (w:Wallet) ON (w.address)", "CREATE INDEX token_address IF NOT EXISTS FOR (t:Token) ON (t.address)", ] for idx_query in indexes: try: session.run(idx_query) except Exception: pass try: session.run("CALL db.awaitIndexes(300)") except Exception: pass def _bulk_merge_apoc( temp_session, target_session, match_query: str, merge_query: str, label: str, total: int | None = None, ) -> None: """ Fast bulk merge using APOC periodic.iterate if available. Falls back to optimized batching if APOC not installed. """ has_apoc = False try: result = target_session.run("RETURN apoc.version() AS version") result.consume() has_apoc = True except Exception: pass if has_apoc: _bulk_merge_with_apoc(temp_session, target_session, match_query, merge_query, label, total) else: _bulk_merge_optimized(temp_session, target_session, match_query, merge_query, label, total) def _bulk_merge_with_apoc( temp_session, target_session, match_query: str, merge_query: str, label: str, total: int | None = None, ) -> None: """Use APOC periodic.iterate for maximum speed.""" from neo4j.exceptions import DatabaseUnavailable print(f" šŸš€ {label}: using APOC bulk merge (total: {total or 'unknown'})") all_rows = [] retries = 0 while True: try: result = temp_session.run(match_query) all_rows = [record.data() for record in result] break except DatabaseUnavailable: retries += 1 if retries > NEO4J_MERGE_RETRIES: raise print(f" āš ļø {label}: database unavailable, retry {retries}/{NEO4J_MERGE_RETRIES}") time.sleep(NEO4J_MERGE_RETRY_SLEEP) if not all_rows: print(f" āœ… {label}: 0 (no data)") return apoc_query = f""" CALL apoc.periodic.iterate( 'UNWIND $rows AS t RETURN t', '{merge_query.replace("UNWIND $rows AS t ", "")}', {{batchSize: {NEO4J_MERGE_BATCH_SIZE}, parallel: {str(NEO4J_MERGE_PARALLEL).lower()}, params: {{rows: $rows}}}} ) YIELD batches, total, errorMessages RETURN batches, total, errorMessages """ try: result = target_session.run(apoc_query, rows=all_rows) record = result.single() if record: batches = record["batches"] processed = record["total"] errors = record["errorMessages"] if errors: print(f" āš ļø {label}: {processed} merged with errors: {errors[:200]}") else: print(f" āœ… {label}: {processed} merged in {batches} batches") else: print(f" āœ… {label}: {len(all_rows)} merged") except Exception as e: print(f" āš ļø APOC failed for {label}, falling back to optimized batch: {e}") _bulk_merge_optimized(temp_session, target_session, match_query, merge_query, label, total) def _bulk_merge_optimized( temp_session, target_session, match_query: str, merge_query: str, label: str, total: int | None = None, ) -> None: """Optimized batching without SKIP pagination (O(n) instead of O(n²)).""" from neo4j.exceptions import DatabaseUnavailable print(f" šŸ“¦ {label}: loading all data into memory for fast merge...") all_rows = [] retries = 0 while True: try: result = temp_session.run(match_query) all_rows = [record.data() for record in result] break except DatabaseUnavailable: retries += 1 if retries > NEO4J_MERGE_RETRIES: raise print(f" āš ļø {label}: database unavailable, retry {retries}/{NEO4J_MERGE_RETRIES}") time.sleep(NEO4J_MERGE_RETRY_SLEEP) total_rows = len(all_rows) if total_rows == 0: print(f" āœ… {label}: 0 (no data)") return print(f" šŸ”„ {label}: merging {total_rows} records...") processed = 0 batches = 0 for i in range(0, total_rows, NEO4J_MERGE_BATCH_SIZE): batch = all_rows[i:i + NEO4J_MERGE_BATCH_SIZE] retries = 0 while True: try: target_session.execute_write(_run_merge_batch, merge_query, batch) break except DatabaseUnavailable: retries += 1 if retries > NEO4J_MERGE_RETRIES: raise print(f" āš ļø {label}: database unavailable, retry {retries}/{NEO4J_MERGE_RETRIES}") time.sleep(NEO4J_MERGE_RETRY_SLEEP) processed += len(batch) batches += 1 if batches % NEO4J_MERGE_LOG_EVERY == 0: pct = (processed / total_rows) * 100 print(f" šŸ”„ {label}: {processed}/{total_rows} ({pct:.1f}%)") print(f" āœ… {label}: {processed}/{total_rows} (100%)") def _stream_merge( temp_session, target_session, match_query: str, merge_query: str, label: str, total: int | None = None, ) -> None: """Wrapper that uses optimized bulk merge.""" _bulk_merge_apoc(temp_session, target_session, match_query, merge_query, label, total) def merge_neo4j_epoch_dump(epoch: int, dump_path: Path, dry_run: bool = False) -> bool: """ Merge relationships from an epoch dump into the target DB by keeping the oldest timestamp. Relationship uniqueness is enforced by (start, end, type) only. """ import shutil import subprocess if not dump_path.exists(): print(f" āš ļø Neo4j dump not found: {dump_path}") return temp_db = "neo4j" expected_dump_name = f"{temp_db}.dump" load_dir = dump_path.parent temp_load_dir = None if dump_path.name != expected_dump_name: temp_load_dir = dump_path.parent / f"_neo4j_load_{NEO4J_TEMP_DB_PREFIX}-{epoch}" temp_load_dir.mkdir(parents=True, exist_ok=True) load_dump_path = temp_load_dir / expected_dump_name shutil.copy2(dump_path, load_dump_path) load_dir = temp_load_dir print(f"\n🧩 Merging Neo4j dump into '{NEO4J_TARGET_DB}' via temp instance...") temp_root = Path(NEO4J_MERGE_TEMP_ROOT) / f"{NEO4J_TEMP_DB_PREFIX}-{epoch}" temp_conf_dir = temp_root / "conf" temp_data_dir = temp_root / "data" temp_logs_dir = temp_root / "logs" temp_run_dir = temp_root / "run" temp_import_dir = temp_root / "import" if dry_run: print(f" [DRY-RUN] setup temp instance at {temp_root}") return driver = None temp_driver = None try: if temp_root.exists(): shutil.rmtree(temp_root, ignore_errors=True) temp_conf_dir.mkdir(parents=True, exist_ok=True) temp_data_dir.mkdir(parents=True, exist_ok=True) temp_logs_dir.mkdir(parents=True, exist_ok=True) temp_run_dir.mkdir(parents=True, exist_ok=True) temp_import_dir.mkdir(parents=True, exist_ok=True) base_conf = Path(os.getenv("NEO4J_CONF", "/etc/neo4j/neo4j.conf")) if not base_conf.exists(): print(f" āŒ Neo4j config not found: {base_conf}") return bolt_port = _find_free_port(NEO4J_MERGE_BOLT_PORT) http_port = _find_free_port(NEO4J_MERGE_HTTP_PORT) overrides = { "server.directories.data": str(temp_data_dir), "server.directories.logs": str(temp_logs_dir), "server.directories.run": str(temp_run_dir), "server.directories.import": str(temp_import_dir), "server.bolt.listen_address": f"127.0.0.1:{bolt_port}", "server.bolt.advertised_address": f"127.0.0.1:{bolt_port}", "server.http.listen_address": f"127.0.0.1:{http_port}", "server.http.advertised_address": f"127.0.0.1:{http_port}", "server.https.enabled": "false", "dbms.security.auth_enabled": "false", } if NEO4J_MERGE_HEAP_INITIAL: overrides["server.memory.heap.initial_size"] = NEO4J_MERGE_HEAP_INITIAL if NEO4J_MERGE_HEAP_MAX: overrides["server.memory.heap.max_size"] = NEO4J_MERGE_HEAP_MAX if NEO4J_MERGE_PAGECACHE: overrides["server.memory.pagecache.size"] = NEO4J_MERGE_PAGECACHE conf_lines = [] for line in base_conf.read_text().splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#") or "=" not in stripped: conf_lines.append(line) continue key, _ = stripped.split("=", 1) if key in overrides: continue conf_lines.append(line) conf_lines.append("") conf_lines.append("# temp merge overrides") for key, value in overrides.items(): conf_lines.append(f"{key}={value}") conf_text = "\n".join(conf_lines) + "\n" (temp_conf_dir / "neo4j.conf").write_text(conf_text) if os.geteuid() == 0: import subprocess try: subprocess.run(["chown", "-R", "neo4j:adm", str(temp_root)], check=True) # Also fix main Neo4j data dir permissions (lock file may be owned by root) subprocess.run(["chown", "-R", "neo4j:neo4j", "/var/lib/neo4j/data"], check=False) except Exception: pass temp_env = { "NEO4J_CONF": str(temp_conf_dir), "NEO4J_HOME": os.getenv("NEO4J_HOME", "/usr/share/neo4j"), } load_cmd = [ "neo4j-admin", "database", "load", f"--from-path={load_dir.resolve()}", "--overwrite-destination", temp_db, ] _run_neo4j_cmd(["neo4j", "stop"], run_as="neo4j", env=temp_env) load_result = _run_neo4j_cmd(load_cmd, run_as="neo4j", env=temp_env) if load_result.returncode != 0: raise subprocess.CalledProcessError(load_result.returncode, load_cmd, output=load_result.stdout, stderr=load_result.stderr) start_result = _run_neo4j_cmd(["neo4j", "start"], run_as="neo4j", env=temp_env) if start_result.returncode != 0: print(f" āŒ Failed to start temp Neo4j: {start_result.stderr.strip()}") return temp_bolt_uri = f"bolt://127.0.0.1:{bolt_port}" _wait_for_bolt(temp_bolt_uri, auth=None, database="neo4j") if not _neo4j_is_running(): start_result = _run_neo4j_cmd(["neo4j", "start"], run_as="root") if start_result.returncode != 0: start_result = _run_neo4j_cmd(["neo4j", "start"], run_as="neo4j") if start_result.returncode != 0: print(f" āŒ Failed to start Neo4j: {start_result.stderr.strip()}") return _wait_for_bolt( NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD) if NEO4J_USER and NEO4J_PASSWORD else None, ) driver = _neo4j_driver() from neo4j import GraphDatabase temp_driver = GraphDatabase.driver(temp_bolt_uri, auth=None) wallet_wallet_types = [ "BUNDLE_TRADE", "TRANSFERRED_TO", "COORDINATED_ACTIVITY", "COPIED_TRADE", ] wallet_token_types = [ "MINTED", "SNIPED", "LOCKED_SUPPLY", "BURNED", "PROVIDED_LIQUIDITY", "TOP_TRADER_OF", "WHALE_OF", ] with temp_driver.session(database="neo4j") as temp_session, driver.session(database=NEO4J_TARGET_DB) as target_session: print(" šŸ“‡ Ensuring indexes exist on target database...") _ensure_indexes(target_session) def _count(query: str) -> int: return temp_session.run(query).single().value() wallet_count = _count("MATCH (w:Wallet) RETURN count(w)") _stream_merge( temp_session, target_session, "MATCH (w:Wallet) RETURN w.address AS address", "UNWIND $rows AS t MERGE (w:Wallet {address: t.address})", "wallets", total=wallet_count, ) token_count = _count("MATCH (t:Token) RETURN count(t)") _stream_merge( temp_session, target_session, "MATCH (t:Token) RETURN t.address AS address, " "CASE WHEN 'created_ts' IN keys(t) THEN t.created_ts ELSE null END AS created_ts", "UNWIND $rows AS t MERGE (k:Token {address: t.address}) " "ON CREATE SET k.created_ts = t.created_ts " "ON MATCH SET k.created_ts = CASE WHEN k.created_ts IS NULL OR " "t.created_ts < k.created_ts THEN t.created_ts ELSE k.created_ts END", "tokens", total=token_count, ) def process_wallet_wallet_rel(rel_type: str) -> None: rel_total = _count( f"MATCH (a:Wallet)-[r:{rel_type}]->(b:Wallet) " "WHERE a.address IS NOT NULL AND b.address IS NOT NULL " "WITH a.address AS source, b.address AS target " "RETURN count(DISTINCT [source, target])" ) match_query = ( f"MATCH (a:Wallet)-[r:{rel_type}]->(b:Wallet) " "WHERE a.address IS NOT NULL AND b.address IS NOT NULL " "WITH a.address AS source, b.address AS target, " "min(coalesce(r.timestamp, 0)) AS timestamp " "RETURN DISTINCT source, target, timestamp" ) merge_query = ( "UNWIND $rows AS t " "MERGE (a:Wallet {address: t.source}) " "MERGE (b:Wallet {address: t.target}) " f"MERGE (a)-[r:{rel_type}]->(b) " "ON CREATE SET r.timestamp = t.timestamp " "ON MATCH SET r.timestamp = CASE WHEN t.timestamp < r.timestamp THEN t.timestamp ELSE r.timestamp END" ) _stream_merge( temp_session, target_session, match_query, merge_query, rel_type.lower(), total=rel_total, ) def process_wallet_token_rel(rel_type: str) -> None: rel_total = _count( f"MATCH (w:Wallet)-[r:{rel_type}]->(t:Token) " "WHERE w.address IS NOT NULL AND t.address IS NOT NULL " "WITH w.address AS source, t.address AS target " "RETURN count(DISTINCT [source, target])" ) match_query = ( f"MATCH (w:Wallet)-[r:{rel_type}]->(t:Token) " "WHERE w.address IS NOT NULL AND t.address IS NOT NULL " "WITH w.address AS source, t.address AS target, " "min(coalesce(r.timestamp, 0)) AS timestamp " "RETURN DISTINCT source, target, timestamp" ) merge_query = ( "UNWIND $rows AS t " "MERGE (w:Wallet {address: t.source}) " "MERGE (k:Token {address: t.target}) " f"MERGE (w)-[r:{rel_type}]->(k) " "ON CREATE SET r.timestamp = t.timestamp " "ON MATCH SET r.timestamp = CASE WHEN t.timestamp < r.timestamp THEN t.timestamp ELSE r.timestamp END" ) _stream_merge( temp_session, target_session, match_query, merge_query, rel_type.lower(), total=rel_total, ) print(f"\n šŸ“Š Processing {len(wallet_wallet_types)} wallet-wallet relationship types...") for rel_type in wallet_wallet_types: process_wallet_wallet_rel(rel_type) print(f"\n šŸ“Š Processing {len(wallet_token_types)} wallet-token relationship types...") for rel_type in wallet_token_types: process_wallet_token_rel(rel_type) print(" āœ… Merge complete.") return True except subprocess.CalledProcessError as e: print(f" āŒ Failed to merge Neo4j dump: {e.stderr}") return False except Exception as e: print(f" āŒ Failed to merge Neo4j dump: {e}") return False finally: if temp_driver: temp_driver.close() if driver: driver.close() if not dry_run: temp_env = { "NEO4J_CONF": str(temp_conf_dir), "NEO4J_HOME": os.getenv("NEO4J_HOME", "/usr/share/neo4j"), } _run_neo4j_cmd(["neo4j", "stop"], run_as="neo4j", env=temp_env) if temp_load_dir and not dry_run: shutil.rmtree(temp_load_dir, ignore_errors=True) if temp_root.exists() and not dry_run: shutil.rmtree(temp_root, ignore_errors=True) def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="ETL: Download, Ingest, Delete epoch Parquet files.") parser.add_argument("--epoch", type=int, required=True, help="Epoch number to process (e.g., 851)") parser.add_argument("-c", "--skip-clickhouse", action="store_true", help="Skip ClickHouse ingestion") parser.add_argument("-m", "--merge-neo4j", action="store_true", help="Merge Neo4j dump into existing graph") parser.add_argument("--dry-run", action="store_true", help="Print queries without executing") parser.add_argument("-n", "--skip-neo4j", action="store_true", help="Skip Neo4j dump loading") parser.add_argument("--token", type=str, default=None, help="Hugging Face token (or set HF_TOKEN env var)") return parser.parse_args() def main() -> None: args = parse_args() token = args.token or os.environ.get("HF_TOKEN") dest_dir = Path(DEFAULT_DEST_DIR).expanduser() / f"epoch_{args.epoch}" # Connect to ClickHouse print(f"šŸ”Œ Connecting to ClickHouse at {CH_HOST}:{CH_HTTP_PORT}...") try: client = clickhouse_connect.get_client( host=CH_HOST, port=CH_HTTP_PORT, username=CH_USER, password=CH_PASSWORD, database=CH_DATABASE, ) except Exception as e: print(f"āŒ Failed to connect to ClickHouse: {e}") sys.exit(1) # Always ensure schemas exist (CREATE TABLE IF NOT EXISTS is idempotent) if not args.skip_clickhouse: print("šŸ“‹ Ensuring ClickHouse schemas exist...") for schema_file in ["./onchain.sql", "./offchain.sql"]: schema_path = Path(schema_file).expanduser() if schema_path.exists(): init_clickhouse_schema(schema_path, dry_run=args.dry_run) run_etl( epoch=args.epoch, dest_dir=dest_dir, client=client, dry_run=args.dry_run, token=token, skip_neo4j=args.skip_neo4j, skip_clickhouse=args.skip_clickhouse, merge_neo4j=args.merge_neo4j, ) if __name__ == "__main__": main()