# oracle/scripts/analyze_distribution.py
# (Header artifact from a Hugging Face file page, converted to a comment:
#  uploaded by zirobtc, commit 3780496, "Upload folder using huggingface_hub".)
import os
import sys
import datetime
import numpy as np
import math
from clickhouse_driver import Client as ClickHouseClient
# Add parent to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from models.vocabulary import RETURN_THRESHOLDS, MANIPULATED_CLASS_ID
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_PORT = int(os.getenv("CLICKHOUSE_PORT", 9000))
CLICKHOUSE_USER = os.getenv("CLICKHOUSE_USER", "default")
CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "")
CLICKHOUSE_DATABASE = os.getenv("CLICKHOUSE_DATABASE", "default")
LAUNCH_PRICE_USD = 0.000004
EPS = 1e-9
def get_client():
return ClickHouseClient(
host=CLICKHOUSE_HOST,
port=CLICKHOUSE_PORT,
user=CLICKHOUSE_USER,
password=CLICKHOUSE_PASSWORD,
database=CLICKHOUSE_DATABASE
)
def compute_p99_clamps(client):
"""
Computes P99 percentile clamp values from ClickHouse for fields prone to
garbage outliers. These values replace hardcoded clamps in data_loader.py.
Returns a dict of {field_name: p99_value}.
"""
print(" -> Computing P99 clamp values from trades table...")
trade_query = """
SELECT
quantile(0.99)(abs(slippage)) AS p99_slippage,
quantile(0.99)(total_usd) AS p99_total_usd
FROM trades
WHERE success = 1
"""
trade_row = client.execute(trade_query)
print(" -> Computing P99 clamp values from wallet_holdings table...")
holdings_query = """
SELECT
quantile(0.99)(history_bought_cost_sol) AS p99_bought_cost_sol,
quantile(0.99)(abs(realized_profit_sol)) AS p99_realized_profit_sol
FROM wallet_holdings
"""
holdings_row = client.execute(holdings_query)
clamps = {
# Defaults as fallback if queries return nothing
'slippage': 1.0,
'total_usd': 100000.0,
'history_bought_cost_sol': 30.0,
'realized_profit_sol': 150.0,
}
if trade_row and trade_row[0]:
r = trade_row[0]
clamps['slippage'] = max(float(r[0]), 0.01)
clamps['total_usd'] = max(float(r[1]), 1.0)
if holdings_row and holdings_row[0]:
r = holdings_row[0]
clamps['history_bought_cost_sol'] = max(float(r[0]), 0.01)
clamps['realized_profit_sol'] = max(float(r[1]), 0.01)
print(f" -> P99 Clamps: {clamps}")
return clamps
def fetch_all_metrics(client):
"""
Fetches all needed metrics for all tokens in a single query.
Base Table: MINTS (to ensure we cover all ~50k tokens).
Definitions:
- Snipers: Peak Balance Sum of top 70 buyers
- Bundles: Base Amount Sum of trades in multi-buy slots
- Dev Hold: Max Peak Balance of Creator
"""
print(" -> Fetching all token metrics (Unified Query)...")
query = f"""
WITH
-- 1. Aggregated trade stats (Fees, Volume, ATH Time)
trade_agg AS (
SELECT
base_address,
sum(priority_fee + coin_creator_fee) AS fees_sol,
sum(total_usd) AS volume_usd,
count() AS n_trades,
argMax(timestamp, price_usd) AS t_ath,
min(timestamp) AS t0
FROM trades
GROUP BY base_address
),
-- 2. Token Metadata from MINTS (Base Source of Truth)
token_meta AS (
SELECT
mint_address AS token_address,
argMax(creator_address, timestamp) AS creator_address,
argMax(total_supply, timestamp) AS total_supply,
argMax(token_decimals, timestamp) AS decimals
FROM mints
GROUP BY mint_address
),
-- 3. Returns & Holders (from Token Metrics or manual calc)
metrics AS (
SELECT
token_address,
argMax(ath_price_usd, updated_at) as ath_price_usd,
argMax(unique_holders, updated_at) as unique_holders
FROM token_metrics
GROUP BY token_address
),
-- 4. WALLET PEAKS (normalized balance likely)
wallet_peaks AS (
SELECT
mint_address,
wallet_address,
max(current_balance) AS peak_balance
FROM wallet_holdings
GROUP BY mint_address, wallet_address
),
-- 5. SNIPERS: Identify sniper addresses (rank <= 70)
snipers_list AS (
SELECT
base_address,
maker
FROM (
SELECT
base_address,
maker,
dense_rank() OVER (PARTITION BY base_address ORDER BY min_slot, min_idx) AS buyer_rank
FROM (
SELECT
base_address,
maker,
min(slot) AS min_slot,
min(transaction_index) AS min_idx
FROM trades
WHERE trade_type = 0 -- buy
GROUP BY base_address, maker
)
)
WHERE buyer_rank <= 70
),
snipers_agg AS (
SELECT
s.base_address AS token_address,
sum(wp.peak_balance) AS snipers_total_peak
FROM snipers_list s
JOIN wallet_peaks wp ON s.base_address = wp.mint_address AND s.maker = wp.wallet_address
GROUP BY s.base_address
),
-- 6. BUNDLED: Sum the base_amount of ALL trades that happened in a slot with multiple buys
bundled_agg AS (
SELECT
t.base_address AS token_address,
sum(t.base_amount) AS bundled_total_peak
FROM trades t
WHERE (t.base_address, t.slot) IN (
SELECT base_address, slot
FROM trades
WHERE trade_type = 0 -- buy
GROUP BY base_address, slot
HAVING count() > 1
)
AND t.trade_type = 0 -- buy
GROUP BY t.base_address
),
-- 7. DEV HOLD: Creator's Peak Balance
dev_hold_agg AS (
SELECT
t.token_address,
max(wp.peak_balance) AS dev_peak
FROM token_meta t
JOIN wallet_peaks wp ON t.token_address = wp.mint_address AND t.creator_address = wp.wallet_address
GROUP BY t.token_address
)
SELECT
t.token_address,
(COALESCE(m.ath_price_usd, ta.t_ath, 0) / {LAUNCH_PRICE_USD}) AS ret,
COALESCE(ta.fees_sol, 0) AS fees_sol,
COALESCE(ta.volume_usd, 0) AS volume_usd,
COALESCE(m.unique_holders, 0) AS unique_holders,
(ta.t_ath - ta.t0) AS time_to_ath_sec,
COALESCE(s.snipers_total_peak, 0) AS snipers_val,
COALESCE(b.bundled_total_peak, 0) AS bundled_val,
COALESCE(d.dev_peak, 0) AS dev_val,
t.total_supply AS total_supply,
t.decimals AS decimals
FROM token_meta t
LEFT JOIN trade_agg ta ON t.token_address = ta.base_address
LEFT JOIN metrics m ON t.token_address = m.token_address
LEFT JOIN snipers_agg s ON t.token_address = s.token_address
LEFT JOIN bundled_agg b ON t.token_address = b.token_address
LEFT JOIN dev_hold_agg d ON t.token_address = d.token_address
"""
rows = client.execute(query)
# Convert to list of dicts
cols = [
"token_address", "ret", "fees_sol", "volume_usd", "unique_holders", "time_to_ath_sec",
"snipers_val", "bundled_val", "dev_val", "total_supply", "decimals"
]
results = []
print(f" -> Fetched {len(rows)} tokens.")
for r in rows:
d = dict(zip(cols, r))
supply = d["total_supply"]
decimals = d["decimals"]
try:
adj_supply = supply / (10 ** decimals) if (supply and decimals is not None) else supply
except:
adj_supply = supply
if adj_supply and adj_supply > 0:
d["snipers_pct"] = (d["snipers_val"] / adj_supply) * 100
d["dev_hold_pct"] = (d["dev_val"] / adj_supply) * 100
else:
d["snipers_pct"] = 0.0
d["dev_hold_pct"] = 0.0
if supply and supply > 0:
d["bundled_pct"] = (d["bundled_val"] / supply) * 100
else:
d["bundled_pct"] = 0.0
results.append(d)
return results
def _classify_tokens(data):
"""
Internal logic: returns (buckets_dict, thresholds_dict, count_manipulated)
buckets_dict: {class_id: [list of tokens]}
"""
# 1. Initial Classification
temp_buckets = {i: [] for i in range(len(RETURN_THRESHOLDS))}
for d in data:
ret = d["ret"]
if ret > 10000: continue
cid = 0
found = False
for i in range(len(RETURN_THRESHOLDS) - 1):
lower = RETURN_THRESHOLDS[i]
upper = RETURN_THRESHOLDS[i+1]
if ret >= lower and ret < upper:
cid = i
found = True
break
if found:
d["class_id_initial"] = cid
temp_buckets[cid].append(d)
else:
if ret >= 10000: continue
d["class_id_initial"] = 0
temp_buckets[0].append(d)
# 2. Calculate Thresholds (50% of Median)
print("\n -> Calculating Class Medians & Thresholds (Dynamic Outlier Detection)...")
thresholds = {}
for i in range(1, len(RETURN_THRESHOLDS)-1):
items = temp_buckets.get(i, [])
if len(items) > 5:
fees = [x["fees_sol"] for x in items]
vols = [x["volume_usd"] for x in items]
holders = [x["unique_holders"] for x in items]
med_fees = np.median(fees)
med_vol = np.median(vols)
med_holders = np.median(holders)
thresholds[i] = {
'fees': med_fees * 0.5,
'vol': med_vol * 0.5,
'holders': med_holders * 0.5
}
else:
thresholds[i] = {'fees': 0, 'vol': 0, 'holders': 0}
# 3. Reclassification
final_buckets = {i: [] for i in range(len(RETURN_THRESHOLDS))}
final_buckets[MANIPULATED_CLASS_ID] = []
count_manipulated = 0
for cid, items in temp_buckets.items():
for d in items:
final_cid = cid
if cid > 0 and cid in thresholds:
t = thresholds[cid]
if (d["fees_sol"] < t['fees']) or (d["volume_usd"] < t['vol']) or (d["unique_holders"] < t['holders']):
final_cid = MANIPULATED_CLASS_ID
count_manipulated += 1
d["class_id_final"] = final_cid
if final_cid not in final_buckets:
final_buckets[final_cid] = []
final_buckets[final_cid].append(d)
return final_buckets, thresholds, count_manipulated
def get_return_class_map(client):
"""
Returns (map {token_addr: class_id}, thresholds)
Used by cache_dataset.py
"""
data = fetch_all_metrics(client)
buckets, thresholds, _ = _classify_tokens(data)
# Flatten buckets to map
ret_map = {}
for cid, items in buckets.items():
for d in items:
ret_map[d["token_address"]] = cid
return ret_map, thresholds
def print_stats(name, values):
"""
prints compact stats: mean, p50, p90, p99
"""
if not values:
print(f" {name}: No data")
return
vals = np.array(values)
mean = np.mean(vals)
p50 = np.percentile(vals, 50)
p90 = np.percentile(vals, 90)
p99 = np.percentile(vals, 99)
nonzero = np.count_nonzero(vals)
nonzero_rate = nonzero / len(vals)
print(f" {name}: mean={mean:.4f} p50={p50:.4f} p90={p90:.4f} p99={p99:.4f} nonzero_rate={nonzero_rate:.3f} (n={len(vals)})")
def fetch_wallet_pnl_stats(client):
print(" -> Fetching Wallet PnL Quantiles (7d, 30d) - Unique per wallet...")
# Use argMax to get latest entry per wallet (table is a time-series dump)
query = """
WITH unique_wallets AS (
SELECT
wallet_address,
argMax(stats_30d_realized_profit_pnl, updated_at) as pnl_30d,
argMax(stats_7d_realized_profit_pnl, updated_at) as pnl_7d
FROM wallet_profile_metrics
GROUP BY wallet_address
)
SELECT
count() as n,
countIf(pnl_30d > 0.001) as pos_30d,
quantiles(0.5, 0.9, 0.95, 0.99, 0.999)(pnl_30d) as q_30d,
max(pnl_30d) as max_30d,
countIf(pnl_7d > 0.001) as pos_7d,
quantiles(0.5, 0.9, 0.95, 0.99, 0.999)(pnl_7d) as q_7d,
max(pnl_7d) as max_7d
FROM unique_wallets
WHERE pnl_30d > -999 OR pnl_7d > -999
"""
rows = client.execute(query)
if not rows: return None
return rows[0]
def fetch_trade_stats(client):
print(" -> Fetching Trade Quantiles (USD & Supply %)...")
query = """
SELECT
count() as n,
quantiles(0.5, 0.9, 0.95, 0.99, 0.999)(t.total_usd) as q_usd,
quantiles(0.5, 0.9, 0.95, 0.99, 0.999)((t.base_amount / m.total_supply) * 100) as q_sup
FROM trades t
JOIN mints m ON t.base_address = m.mint_address
WHERE m.total_supply > 0
"""
rows = client.execute(query)
if not rows: return None
return rows[0]
def fetch_transfer_stats(client):
print(" -> Fetching Transfer Quantiles (Amount & Supply %)...")
# Assuming 1B supply (1,000,000,000) for all tokens as they are Pump.fun tokens
# Using 1e6 for decimals adjustment if needed, but 'amount_decimal' is usually raw/decimals
# If amount_decimal is actual token count:
query = """
SELECT
count() as n,
quantiles(0.5, 0.9, 0.95, 0.99, 0.999)(t.amount_decimal) as q_amt,
quantiles(0.5, 0.9, 0.95, 0.99, 0.999)((t.amount_decimal / 1000000000) * 100) as q_sup
FROM transfers t
"""
rows = client.execute(query)
if not rows: return None
return rows[0]
def fetch_kol_stats(client):
print(" -> Fetching KOL stats from wallet_socials...")
query = """
SELECT
uniq(wallet_address) as total_wallets,
uniqIf(wallet_address, kolscan_name != '' OR cabalspy_name != '' OR axiom_kol_name != '') as kols
FROM wallet_socials
"""
rows = client.execute(query)
print(f" (DEBUG) KOL query result: {rows}")
if rows:
return rows[0]
return (0, 0)
def print_quantiles(name, n, pos_rate, q, max_val=None):
# q is list [p50, p90, p95, p99, p999]
print(f"\n[{name}] (n={n})")
if pos_rate is not None:
print(f" Positive Rate: {pos_rate*100:.1f}%")
print(f" p50={q[0]:.4f}")
print(f" p90={q[1]:.4f}")
print(f" p95={q[2]:.4f}")
print(f" p99={q[3]:.4f}")
print(f" p99.9={q[4]:.4f}")
if max_val is not None:
print(f" Max={max_val:.4f}")
def analyze_thresholds(client):
print("\n=== THRESHOLD DISTRIBUTION ANALYSIS (DB-Side) ===")
# 1. PnL
pnl_row = fetch_wallet_pnl_stats(client)
if pnl_row:
n, pos_30d, q_30d, max_30d, pos_7d, q_7d, max_7d = pnl_row
print_quantiles("Wallet PnL (30d)", n, pos_30d/n if n>0 else 0, q_30d, max_30d)
print_quantiles("Wallet PnL (7d)", n, pos_7d/n if n>0 else 0, q_7d, max_7d)
# 2. Trades
trade_row = fetch_trade_stats(client)
if trade_row:
n, q_usd, q_sup = trade_row
print_quantiles("Trade USD Size", n, None, q_usd)
print_quantiles("Trade USD Size", n, None, q_usd)
print_quantiles("Trade Supply %", n, None, q_sup)
# 3. Transfers
transfer_row = fetch_transfer_stats(client)
if transfer_row:
n, q_amt, q_sup = transfer_row
print_quantiles("Transfer Amount", n, None, q_amt)
print_quantiles("Transfer Supply %", n, None, q_sup)
# 4. KOLs
total, kols = fetch_kol_stats(client)
if total > 0:
print("\n[KOL Statistics]")
print(f" Total Wallets with Socials: {total}")
print(f" Identified KOLs: {kols}")
print(f" KOL Ratio: {(kols/total)*100:.2f}%")
def analyze():
client = get_client()
# Run new analysis first
analyze_thresholds(client)
data = fetch_all_metrics(client)
final_buckets, thresholds, count_manipulated = _classify_tokens(data)
print(f" -> Reclassification Complete. Identified {count_manipulated} manipulated tokens.")
print("\n=== SEGMENTED DISTRIBUTION ANALYSIS ===")
# Print Thresholds debug
for k, t in thresholds.items():
if t['fees'] > 0:
print(f" [Class {k}] Thresh: Fees>{t['fees']:.3f} Vol>${t['vol']:.0f} Holders>{t['holders']:.0f}")
sorted_classes = sorted([k for k in final_buckets.keys() if k != MANIPULATED_CLASS_ID]) + [MANIPULATED_CLASS_ID]
for cid in sorted_classes:
items = final_buckets.get(cid, [])
if not items: continue
if cid == MANIPULATED_CLASS_ID:
label = f"{cid}. MANIPULATED / FAKE (Outliers from {1}~{4})"
elif cid < len(RETURN_THRESHOLDS)-1:
label = f"{cid}. {RETURN_THRESHOLDS[cid]}x - {RETURN_THRESHOLDS[cid+1]}x"
else:
label = f"{cid}. Unknown"
print(f"\nSEGMENT: {label}")
print("="*50)
print(f"Tokens in segment: {len(items)}")
bundled = [x["bundled_pct"] for x in items]
dev_hold = [x["dev_hold_pct"] for x in items]
fees = [x["fees_sol"] for x in items]
snipers = [x["snipers_pct"] for x in items]
print_stats("bundled_pct", bundled)
print_stats("dev_hold_pct", dev_hold)
print_stats("fees_sol", fees)
print_stats("snipers_pct", snipers)
if __name__ == "__main__":
analyze()