import os
import sys
import datetime
import numpy as np
import math

from clickhouse_driver import Client as ClickHouseClient

# Add parent to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from models.vocabulary import RETURN_THRESHOLDS, MANIPULATED_CLASS_ID

CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_PORT = int(os.getenv("CLICKHOUSE_PORT", 9000))
CLICKHOUSE_USER = os.getenv("CLICKHOUSE_USER", "default")
CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "")
CLICKHOUSE_DATABASE = os.getenv("CLICKHOUSE_DATABASE", "default")

# Assumed launch price for all tokens (Pump.fun style launches) — TODO confirm.
LAUNCH_PRICE_USD = 0.000004
EPS = 1e-9


def get_client():
    """Build a ClickHouse client from the CLICKHOUSE_* environment variables."""
    return ClickHouseClient(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        user=CLICKHOUSE_USER,
        password=CLICKHOUSE_PASSWORD,
        database=CLICKHOUSE_DATABASE,
    )


def compute_p99_clamps(client):
    """
    Computes P99 percentile clamp values from ClickHouse for fields prone to
    garbage outliers. These values replace hardcoded clamps in data_loader.py.

    Returns a dict of {field_name: p99_value}.
    """
    print(" -> Computing P99 clamp values from trades table...")
    trade_query = """
        SELECT
            quantile(0.99)(abs(slippage)) AS p99_slippage,
            quantile(0.99)(total_usd) AS p99_total_usd
        FROM trades
        WHERE success = 1
    """
    trade_row = client.execute(trade_query)

    print(" -> Computing P99 clamp values from wallet_holdings table...")
    holdings_query = """
        SELECT
            quantile(0.99)(history_bought_cost_sol) AS p99_bought_cost_sol,
            quantile(0.99)(abs(realized_profit_sol)) AS p99_realized_profit_sol
        FROM wallet_holdings
    """
    holdings_row = client.execute(holdings_query)

    clamps = {
        # Defaults as fallback if queries return nothing
        'slippage': 1.0,
        'total_usd': 100000.0,
        'history_bought_cost_sol': 30.0,
        'realized_profit_sol': 150.0,
    }
    if trade_row and trade_row[0]:
        r = trade_row[0]
        # Floors keep clamps sane even if the table is nearly empty.
        clamps['slippage'] = max(float(r[0]), 0.01)
        clamps['total_usd'] = max(float(r[1]), 1.0)
    if holdings_row and holdings_row[0]:
        r = holdings_row[0]
        clamps['history_bought_cost_sol'] = max(float(r[0]), 0.01)
        clamps['realized_profit_sol'] = max(float(r[1]), 0.01)

    print(f" -> P99 Clamps: {clamps}")
    return clamps


def fetch_all_metrics(client):
    """
    Fetches all needed metrics for all tokens in a single query.

    Base Table: MINTS (to ensure we cover all ~50k tokens).
    Definitions:
    - Snipers: Peak Balance Sum of top 70 buyers
    - Bundles: Base Amount Sum of trades in multi-buy slots
    - Dev Hold: Max Peak Balance of Creator

    Returns a list of dicts, one per token, with raw values plus derived
    percentage-of-supply fields (snipers_pct, dev_hold_pct, bundled_pct).
    """
    print(" -> Fetching all token metrics (Unified Query)...")
    query = f"""
    WITH
    -- 1. Aggregated trade stats (Fees, Volume, ATH Time)
    trade_agg AS (
        SELECT
            base_address,
            sum(priority_fee + coin_creator_fee) AS fees_sol,
            sum(total_usd) AS volume_usd,
            count() AS n_trades,
            max(price_usd) AS p_ath,
            argMax(timestamp, price_usd) AS t_ath,
            min(timestamp) AS t0
        FROM trades
        GROUP BY base_address
    ),
    -- 2. Token Metadata from MINTS (Base Source of Truth)
    token_meta AS (
        SELECT
            mint_address AS token_address,
            argMax(creator_address, timestamp) AS creator_address,
            argMax(total_supply, timestamp) AS total_supply,
            argMax(token_decimals, timestamp) AS decimals
        FROM mints
        GROUP BY mint_address
    ),
    -- 3. Returns & Holders (from Token Metrics or manual calc)
    metrics AS (
        SELECT
            token_address,
            argMax(ath_price_usd, updated_at) AS ath_price_usd,
            argMax(unique_holders, updated_at) AS unique_holders
        FROM token_metrics
        GROUP BY token_address
    ),
    -- 4. WALLET PEAKS (normalized balance likely)
    wallet_peaks AS (
        SELECT
            mint_address,
            wallet_address,
            max(current_balance) AS peak_balance
        FROM wallet_holdings
        GROUP BY mint_address, wallet_address
    ),
    -- 5. SNIPERS: Identify sniper addresses (rank <= 70)
    snipers_list AS (
        SELECT base_address, maker
        FROM (
            SELECT
                base_address,
                maker,
                dense_rank() OVER (PARTITION BY base_address ORDER BY min_slot, min_idx) AS buyer_rank
            FROM (
                SELECT
                    base_address,
                    maker,
                    min(slot) AS min_slot,
                    min(transaction_index) AS min_idx
                FROM trades
                WHERE trade_type = 0 -- buy
                GROUP BY base_address, maker
            )
        )
        WHERE buyer_rank <= 70
    ),
    snipers_agg AS (
        SELECT
            s.base_address AS token_address,
            sum(wp.peak_balance) AS snipers_total_peak
        FROM snipers_list s
        JOIN wallet_peaks wp
            ON s.base_address = wp.mint_address AND s.maker = wp.wallet_address
        GROUP BY s.base_address
    ),
    -- 6. BUNDLED: Sum the base_amount of ALL trades that happened in a slot with multiple buys
    bundled_agg AS (
        SELECT
            t.base_address AS token_address,
            sum(t.base_amount) AS bundled_total_peak
        FROM trades t
        WHERE (t.base_address, t.slot) IN (
            SELECT base_address, slot
            FROM trades
            WHERE trade_type = 0 -- buy
            GROUP BY base_address, slot
            HAVING count() > 1
        )
        AND t.trade_type = 0 -- buy
        GROUP BY t.base_address
    ),
    -- 7. DEV HOLD: Creator's Peak Balance
    dev_hold_agg AS (
        SELECT
            t.token_address,
            max(wp.peak_balance) AS dev_peak
        FROM token_meta t
        JOIN wallet_peaks wp
            ON t.token_address = wp.mint_address AND t.creator_address = wp.wallet_address
        GROUP BY t.token_address
    )
    SELECT
        t.token_address,
        -- FIX: fallback is now the trade-derived ATH *price* (p_ath). Previously
        -- the fallback was t_ath (a timestamp), which produced astronomical
        -- returns for tokens missing from token_metrics and silently dropped
        -- them at the ret > 10000 filter downstream.
        (COALESCE(m.ath_price_usd, ta.p_ath, 0) / {LAUNCH_PRICE_USD}) AS ret,
        COALESCE(ta.fees_sol, 0) AS fees_sol,
        COALESCE(ta.volume_usd, 0) AS volume_usd,
        COALESCE(m.unique_holders, 0) AS unique_holders,
        (ta.t_ath - ta.t0) AS time_to_ath_sec,
        COALESCE(s.snipers_total_peak, 0) AS snipers_val,
        COALESCE(b.bundled_total_peak, 0) AS bundled_val,
        COALESCE(d.dev_peak, 0) AS dev_val,
        t.total_supply AS total_supply,
        t.decimals AS decimals
    FROM token_meta t
    LEFT JOIN trade_agg ta ON t.token_address = ta.base_address
    LEFT JOIN metrics m ON t.token_address = m.token_address
    LEFT JOIN snipers_agg s ON t.token_address = s.token_address
    LEFT JOIN bundled_agg b ON t.token_address = b.token_address
    LEFT JOIN dev_hold_agg d ON t.token_address = d.token_address
    """
    rows = client.execute(query)

    # Convert to list of dicts
    cols = [
        "token_address", "ret", "fees_sol", "volume_usd", "unique_holders",
        "time_to_ath_sec", "snipers_val", "bundled_val", "dev_val",
        "total_supply", "decimals"
    ]
    results = []
    print(f" -> Fetched {len(rows)} tokens.")
    for r in rows:
        d = dict(zip(cols, r))
        supply = d["total_supply"]
        decimals = d["decimals"]
        # Normalize supply to whole tokens; wallet_peaks balances are assumed
        # decimal-adjusted, bundled base_amount is assumed raw — TODO confirm.
        try:
            adj_supply = supply / (10 ** decimals) if (supply and decimals is not None) else supply
        except (TypeError, ValueError, OverflowError):
            # Garbage decimals/supply: fall back to the raw value.
            adj_supply = supply

        if adj_supply and adj_supply > 0:
            d["snipers_pct"] = (d["snipers_val"] / adj_supply) * 100
            d["dev_hold_pct"] = (d["dev_val"] / adj_supply) * 100
        else:
            d["snipers_pct"] = 0.0
            d["dev_hold_pct"] = 0.0

        if supply and supply > 0:
            # bundled_val is summed from raw trade base_amount, so divide by raw supply.
            d["bundled_pct"] = (d["bundled_val"] / supply) * 100
        else:
            d["bundled_pct"] = 0.0

        results.append(d)
    return results


def _classify_tokens(data):
    """
    Internal logic: returns (buckets_dict, thresholds_dict, count_manipulated)
    buckets_dict: {class_id: [list of tokens]}
    """
    # 1. Initial Classification by return multiple into RETURN_THRESHOLDS buckets.
    temp_buckets = {i: [] for i in range(len(RETURN_THRESHOLDS))}
    for d in data:
        ret = d["ret"]
        # Returns above 10000x are considered data garbage and dropped entirely.
        if ret > 10000:
            continue
        cid = 0
        found = False
        for i in range(len(RETURN_THRESHOLDS) - 1):
            lower = RETURN_THRESHOLDS[i]
            upper = RETURN_THRESHOLDS[i + 1]
            if lower <= ret < upper:
                cid = i
                found = True
                break
        if found:
            d["class_id_initial"] = cid
            temp_buckets[cid].append(d)
        else:
            # Exactly 10000 (== upper bound) is also dropped; everything else
            # outside the thresholds falls into class 0.
            if ret >= 10000:
                continue
            d["class_id_initial"] = 0
            temp_buckets[0].append(d)

    # 2. Calculate Thresholds (50% of Median) per non-trivial class.
    print("\n -> Calculating Class Medians & Thresholds (Dynamic Outlier Detection)...")
    thresholds = {}
    for i in range(1, len(RETURN_THRESHOLDS) - 1):
        items = temp_buckets.get(i, [])
        if len(items) > 5:
            fees = [x["fees_sol"] for x in items]
            vols = [x["volume_usd"] for x in items]
            holders = [x["unique_holders"] for x in items]
            thresholds[i] = {
                'fees': np.median(fees) * 0.5,
                'vol': np.median(vols) * 0.5,
                'holders': np.median(holders) * 0.5,
            }
        else:
            # Too few samples to estimate medians: disable outlier detection.
            thresholds[i] = {'fees': 0, 'vol': 0, 'holders': 0}

    # 3. Reclassification: tokens below any of their class's thresholds are
    # re-labelled as manipulated/fake.
    final_buckets = {i: [] for i in range(len(RETURN_THRESHOLDS))}
    final_buckets[MANIPULATED_CLASS_ID] = []
    count_manipulated = 0
    for cid, items in temp_buckets.items():
        for d in items:
            final_cid = cid
            if cid > 0 and cid in thresholds:
                t = thresholds[cid]
                if (d["fees_sol"] < t['fees']) or (d["volume_usd"] < t['vol']) or (d["unique_holders"] < t['holders']):
                    final_cid = MANIPULATED_CLASS_ID
                    count_manipulated += 1
            d["class_id_final"] = final_cid
            if final_cid not in final_buckets:
                final_buckets[final_cid] = []
            final_buckets[final_cid].append(d)

    return final_buckets, thresholds, count_manipulated


def get_return_class_map(client):
    """
    Returns (map {token_addr: class_id}, thresholds)
    Used by cache_dataset.py
    """
    data = fetch_all_metrics(client)
    buckets, thresholds, _ = _classify_tokens(data)
    # Flatten buckets to map
    ret_map = {}
    for cid, items in buckets.items():
        for d in items:
            ret_map[d["token_address"]] = cid
    return ret_map, thresholds


def print_stats(name, values):
    """Prints compact stats: mean, p50, p90, p99, nonzero rate."""
    if not values:
        print(f" {name}: No data")
        return
    vals = np.array(values)
    mean = np.mean(vals)
    p50 = np.percentile(vals, 50)
    p90 = np.percentile(vals, 90)
    p99 = np.percentile(vals, 99)
    nonzero = np.count_nonzero(vals)
    nonzero_rate = nonzero / len(vals)
    print(f" {name}: mean={mean:.4f} p50={p50:.4f} p90={p90:.4f} p99={p99:.4f} nonzero_rate={nonzero_rate:.3f} (n={len(vals)})")


def fetch_wallet_pnl_stats(client):
    """Returns a single row of PnL quantile stats, or None if the table is empty."""
    print(" -> Fetching Wallet PnL Quantiles (7d, 30d) - Unique per wallet...")
    # Use argMax to get latest entry per wallet (table is a time-series dump)
    query = """
    WITH unique_wallets AS (
        SELECT
            wallet_address,
            argMax(stats_30d_realized_profit_pnl, updated_at) AS pnl_30d,
            argMax(stats_7d_realized_profit_pnl, updated_at) AS pnl_7d
        FROM wallet_profile_metrics
        GROUP BY wallet_address
    )
    SELECT
        count() AS n,
        countIf(pnl_30d > 0.001) AS pos_30d,
        quantiles(0.5, 0.9, 0.95, 0.99, 0.999)(pnl_30d) AS q_30d,
        max(pnl_30d) AS max_30d,
        countIf(pnl_7d > 0.001) AS pos_7d,
        quantiles(0.5, 0.9, 0.95, 0.99, 0.999)(pnl_7d) AS q_7d,
        max(pnl_7d) AS max_7d
    FROM unique_wallets
    WHERE pnl_30d > -999 OR pnl_7d > -999
    """
    rows = client.execute(query)
    if not rows:
        return None
    return rows[0]


def fetch_trade_stats(client):
    """Returns (n, usd quantiles, supply%% quantiles) for trades, or None."""
    print(" -> Fetching Trade Quantiles (USD & Supply %)...")
    query = """
    SELECT
        count() AS n,
        quantiles(0.5, 0.9, 0.95, 0.99, 0.999)(t.total_usd) AS q_usd,
        quantiles(0.5, 0.9, 0.95, 0.99, 0.999)((t.base_amount / m.total_supply) * 100) AS q_sup
    FROM trades t
    JOIN mints m ON t.base_address = m.mint_address
    WHERE m.total_supply > 0
    """
    rows = client.execute(query)
    if not rows:
        return None
    return rows[0]


def fetch_transfer_stats(client):
    """Returns (n, amount quantiles, supply%% quantiles) for transfers, or None."""
    print(" -> Fetching Transfer Quantiles (Amount & Supply %)...")
    # Assuming 1B supply (1,000,000,000) for all tokens as they are Pump.fun tokens
    # Using 1e6 for decimals adjustment if needed, but 'amount_decimal' is usually raw/decimals
    # If amount_decimal is actual token count:
    query = """
    SELECT
        count() AS n,
        quantiles(0.5, 0.9, 0.95, 0.99, 0.999)(t.amount_decimal) AS q_amt,
        quantiles(0.5, 0.9, 0.95, 0.99, 0.999)((t.amount_decimal / 1000000000) * 100) AS q_sup
    FROM transfers t
    """
    rows = client.execute(query)
    if not rows:
        return None
    return rows[0]


def fetch_kol_stats(client):
    """Returns (total wallets with socials, identified KOL wallets)."""
    print(" -> Fetching KOL stats from wallet_socials...")
    query = """
    SELECT
        uniq(wallet_address) AS total_wallets,
        uniqIf(wallet_address, kolscan_name != '' OR cabalspy_name != '' OR axiom_kol_name != '') AS kols
    FROM wallet_socials
    """
    rows = client.execute(query)
    print(f" (DEBUG) KOL query result: {rows}")
    if rows:
        return rows[0]
    return (0, 0)


def print_quantiles(name, n, pos_rate, q, max_val=None):
    """Prints a labelled quantile block. q is list [p50, p90, p95, p99, p999]."""
    print(f"\n[{name}] (n={n})")
    if pos_rate is not None:
        print(f" Positive Rate: {pos_rate*100:.1f}%")
    print(f" p50={q[0]:.4f}")
    print(f" p90={q[1]:.4f}")
    print(f" p95={q[2]:.4f}")
    print(f" p99={q[3]:.4f}")
    print(f" p99.9={q[4]:.4f}")
    if max_val is not None:
        print(f" Max={max_val:.4f}")


def analyze_thresholds(client):
    """Runs the DB-side distribution analysis and prints quantile reports."""
    print("\n=== THRESHOLD DISTRIBUTION ANALYSIS (DB-Side) ===")

    # 1. PnL
    pnl_row = fetch_wallet_pnl_stats(client)
    if pnl_row:
        n, pos_30d, q_30d, max_30d, pos_7d, q_7d, max_7d = pnl_row
        print_quantiles("Wallet PnL (30d)", n, pos_30d / n if n > 0 else 0, q_30d, max_30d)
        print_quantiles("Wallet PnL (7d)", n, pos_7d / n if n > 0 else 0, q_7d, max_7d)

    # 2. Trades
    trade_row = fetch_trade_stats(client)
    if trade_row:
        n, q_usd, q_sup = trade_row
        # FIX: "Trade USD Size" was printed twice (copy-paste duplicate).
        print_quantiles("Trade USD Size", n, None, q_usd)
        print_quantiles("Trade Supply %", n, None, q_sup)

    # 3. Transfers
    transfer_row = fetch_transfer_stats(client)
    if transfer_row:
        n, q_amt, q_sup = transfer_row
        print_quantiles("Transfer Amount", n, None, q_amt)
        print_quantiles("Transfer Supply %", n, None, q_sup)

    # 4. KOLs
    total, kols = fetch_kol_stats(client)
    if total > 0:
        print("\n[KOL Statistics]")
        print(f" Total Wallets with Socials: {total}")
        print(f" Identified KOLs: {kols}")
        print(f" KOL Ratio: {(kols/total)*100:.2f}%")


def analyze():
    """Entry point: runs threshold analysis, then the segmented distribution report."""
    client = get_client()

    # Run new analysis first
    analyze_thresholds(client)

    data = fetch_all_metrics(client)
    final_buckets, thresholds, count_manipulated = _classify_tokens(data)
    print(f" -> Reclassification Complete. Identified {count_manipulated} manipulated tokens.")

    print("\n=== SEGMENTED DISTRIBUTION ANALYSIS ===")
    # Print Thresholds debug
    for k, t in thresholds.items():
        if t['fees'] > 0:
            print(f" [Class {k}] Thresh: Fees>{t['fees']:.3f} Vol>${t['vol']:.0f} Holders>{t['holders']:.0f}")

    # Manipulated class always prints last.
    sorted_classes = sorted([k for k in final_buckets.keys() if k != MANIPULATED_CLASS_ID]) + [MANIPULATED_CLASS_ID]
    for cid in sorted_classes:
        items = final_buckets.get(cid, [])
        if not items:
            continue
        if cid == MANIPULATED_CLASS_ID:
            label = f"{cid}. MANIPULATED / FAKE (Outliers from 1~4)"
        elif cid < len(RETURN_THRESHOLDS) - 1:
            label = f"{cid}. {RETURN_THRESHOLDS[cid]}x - {RETURN_THRESHOLDS[cid+1]}x"
        else:
            label = f"{cid}. Unknown"

        print(f"\nSEGMENT: {label}")
        print("=" * 50)
        print(f"Tokens in segment: {len(items)}")

        bundled = [x["bundled_pct"] for x in items]
        dev_hold = [x["dev_hold_pct"] for x in items]
        fees = [x["fees_sol"] for x in items]
        snipers = [x["snipers_pct"] for x in items]

        print_stats("bundled_pct", bundled)
        print_stats("dev_hold_pct", dev_hold)
        print_stats("fees_sol", fees)
        print_stats("snipers_pct", snipers)


if __name__ == "__main__":
    analyze()