# oracle/scripts/analyze_data_distribution.py
# Uploaded via huggingface_hub (commit 9dd732c).
import os
import sys
import datetime
import math
from collections import defaultdict
import statistics
# Add parent directory to path to import modules
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from data.data_fetcher import DataFetcher
from clickhouse_driver import Client as ClickHouseClient
from neo4j import GraphDatabase
from dotenv import load_dotenv
# Load environment variables from a local .env file (no-op if absent).
load_dotenv()
# --- Configuration ---
# ClickHouse connection settings; this store holds the mints/migrations/trades
# and token_metrics tables that main() aggregates over.
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_PORT = int(os.getenv("CLICKHOUSE_PORT", 9000))
# NOTE: the `or` form also falls back when the env var is set but EMPTY,
# unlike the two-argument os.getenv default used on the lines above/below.
CLICKHOUSE_USER = os.getenv("CLICKHOUSE_USER") or "default"
CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD") or ""
CLICKHOUSE_DATABASE = os.getenv("CLICKHOUSE_DATABASE", "default")
# Neo4j settings: not used by main() below (all stats come from ClickHouse);
# presumably kept for parity with sibling scripts — confirm before removing.
NEO4J_URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD", "password")
def get_percentile(data, p):
    """Return the p-th quantile of *data* via linear interpolation.

    Args:
        data: sequence of numbers; may be unsorted and is never mutated.
        p: quantile as a fraction in [0, 1], e.g. 0.5 for the median.

    Returns:
        The interpolated quantile value, or 0.0 for empty input (callers
        treat "no data" as a zero statistic).
    """
    if not data:
        return 0.0
    # Sort a copy: the original sorted the caller's list in place, which is
    # a surprising side effect for a read-only statistics helper.
    ordered = sorted(data)
    k = (len(ordered) - 1) * p
    f = math.floor(k)
    c = math.ceil(k)
    if f == c:
        # k landed exactly on an index — no interpolation needed.
        return ordered[int(k)]
    d0 = ordered[int(f)]
    d1 = ordered[int(c)]
    # Linear interpolation between the two bracketing ranks.
    return d0 + (d1 - d0) * (k - f)
def _connect_clickhouse():
    """Return a ClickHouse client built from module config, or exit(1).

    Neo4j is deliberately not connected: every statistic below is an
    aggregate SQL query against ClickHouse.
    """
    print("INFO: Connecting to Databases...")
    try:
        return ClickHouseClient(
            host=CLICKHOUSE_HOST,
            port=CLICKHOUSE_PORT,
            user=CLICKHOUSE_USER,
            password=CLICKHOUSE_PASSWORD,
            database=CLICKHOUSE_DATABASE,
        )
    except Exception as e:
        print(f"ERROR: Failed to connect to Clickhouse: {e}")
        sys.exit(1)


def _report_migration_stats(client):
    """Print total vs. migrated token counts (mints / migrations tables)."""
    print("INFO: Counting Total Tokens (Mints table)...")
    total_tokens = client.execute("SELECT count() FROM mints")[0][0]
    print("INFO: Counting Migrated Tokens (Migrations table)...")
    migrated_count = client.execute("SELECT count() FROM migrations")[0][0]
    print("\n--- General Population ---")
    print(f"Total Tokens: {total_tokens}")
    if total_tokens > 0:
        print(f"Migrated: {migrated_count} ({migrated_count/total_tokens*100:.2f}%)")
        print(f"Not Migrated: {total_tokens - migrated_count} ({(total_tokens - migrated_count)/total_tokens*100:.2f}%)")
    else:
        print("Migrated: 0 (0.00%)")


def _fetch_volume_metrics(client):
    """Return parallel lists (volumes, ath_prices), one entry per token.

    Reads the token_metrics_latest snapshot first; if it is empty, falls
    back to an argMax aggregation over the token_metrics history table
    (latest row per token by updated_at).
    """
    print("\nINFO: Fetching metrics per token...")
    # Typo fixed in the log message below ("Calculates" -> "Calculating").
    print("INFO: Calculating Volume & ATH Distribution from token_metrics_latest...")
    rows = client.execute(
        """
        SELECT
            total_volume_usd,
            ath_price_usd
        FROM token_metrics_latest
        WHERE total_volume_usd > 0
        """
    )
    if not rows:
        print("WARN: No metric data found in token_metrics_latest. Trying token_metrics...")
        rows = client.execute(
            """
            SELECT
                argMax(total_volume_usd, updated_at),
                argMax(ath_price_usd, updated_at)
            FROM token_metrics
            GROUP BY token_address
            """
        )
    volumes = [float(r[0]) for r in rows]
    ath_prices = [float(r[1]) for r in rows]
    return volumes, ath_prices


def _report_volume_distribution(volumes):
    """Print min/max and key percentiles of per-token USD volume."""
    print("\n--- Volume USD Distribution (Per Token) ---")
    print(f"Min: ${volumes[0]:.2f}")
    print(f"10th %ile: ${get_percentile(volumes, 0.1):.2f}")
    print(f"25th %ile: ${get_percentile(volumes, 0.25):.2f}")
    print(f"50th %ile: ${get_percentile(volumes, 0.5):.2f} (Median)")
    print(f"75th %ile: ${get_percentile(volumes, 0.75):.2f}")
    print(f"90th %ile: ${get_percentile(volumes, 0.9):.2f}")
    print(f"95th %ile: ${get_percentile(volumes, 0.95):.2f}")
    print(f"99th %ile: ${get_percentile(volumes, 0.99):.2f}")
    print(f"Max: ${volumes[-1]:.2f}")


def _report_fee_distribution(client):
    """Print the per-token distribution of priority + bribe fees (SOL)."""
    print("\nINFO: Calculating Aggregated Fees per Token (Priority + Bribe)...")
    rows_fees = client.execute(
        """
        SELECT
            base_address,
            sum(priority_fee) + sum(bribe_fee) as total_fees_sol
        FROM trades
        GROUP BY base_address
        HAVING total_fees_sol > 0
        """
    )
    fees = sorted(float(r[1]) for r in rows_fees)
    if not fees:
        print("WARN: No fee data found.")
        return
    print(f"\n--- Total Fees Spent (SOL) Distribution (Per Token) ---")
    print(f"Min: {fees[0]:.4f} SOL")
    print(f"50th %ile: {get_percentile(fees, 0.5):.4f} SOL")
    print(f"75th %ile: {get_percentile(fees, 0.75):.4f} SOL")
    print(f"90th %ile: {get_percentile(fees, 0.9):.4f} SOL")
    print(f"95th %ile: {get_percentile(fees, 0.95):.4f} SOL")
    print(f"Max: {fees[-1]:.4f} SOL")
    # Bucket counts; note the ">= 1.0" bucket is labelled "> 1.0" below,
    # matching the original output exactly.
    count_low_fees = sum(1 for f in fees if f < 0.1)
    count_mid_fees = sum(1 for f in fees if 0.1 <= f < 1.0)
    count_high_fees = sum(1 for f in fees if f >= 1.0)
    print(f"\n--- Fee Thresholds Analysis ---")
    print(f"Tokens < 0.1 SOL Fees: {count_low_fees} ({count_low_fees/len(fees)*100:.1f}%)")
    print(f"Tokens 0.1 - 1.0 SOL: {count_mid_fees} ({count_mid_fees/len(fees)*100:.1f}%)")
    print(f"Tokens > 1.0 SOL Fees: {count_high_fees} ({count_high_fees/len(fees)*100:.1f}%)")


def _report_volume_thresholds(volumes):
    """Print how many tokens fall under/over candidate volume thresholds."""
    n = len(volumes)
    count_under_1k = sum(1 for v in volumes if v < 1000)
    count_over_20k = sum(1 for v in volumes if v > 20000)
    count_over_500k = sum(1 for v in volumes if v > 500000)
    print(f"\n--- Potential Thresholds Analysis ---")
    print(f"Tokens < $1k Vol ('Instant Garbage'?): {count_under_1k} ({count_under_1k/n*100:.1f}%)")
    print(f"Tokens > $20k Vol ('Contenders'?): {count_over_20k} ({count_over_20k/n*100:.1f}%)")
    print(f"Tokens > $500k Vol ('Alpha'?): {count_over_500k} ({count_over_500k/n*100:.1f}%)")


def main():
    """Fetch aggregate token stats from ClickHouse and print distribution reports.

    Removed from the original: two dead SQL string assignments (an unused
    JOIN-based metrics query and an unused quantiles query) that were never
    executed — the assignment was immediately overwritten.
    """
    clickhouse_client = _connect_clickhouse()
    print("INFO: Fetching Aggregate Statistics from ClickHouse...")
    print(" This may take a moment depending on dataset size...")
    _report_migration_stats(clickhouse_client)
    volumes, ath_prices = _fetch_volume_metrics(clickhouse_client)
    if not volumes:
        print("WARN: No metric data found. Exiting.")
        return
    volumes.sort()
    # NOTE(review): ath_prices is collected and sorted but never reported —
    # looks like an unfinished ATH distribution section; confirm intent.
    ath_prices.sort()
    _report_volume_distribution(volumes)
    _report_fee_distribution(clickhouse_client)
    _report_volume_thresholds(volumes)


if __name__ == "__main__":
    main()