| import os |
| import sys |
| import datetime |
| import numpy as np |
| import math |
| from clickhouse_driver import Client as ClickHouseClient |
|
|
| |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
| from models.vocabulary import RETURN_THRESHOLDS, MANIPULATED_CLASS_ID |
|
|
| CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "localhost") |
| CLICKHOUSE_PORT = int(os.getenv("CLICKHOUSE_PORT", 9000)) |
| CLICKHOUSE_USER = os.getenv("CLICKHOUSE_USER", "default") |
| CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "") |
| CLICKHOUSE_DATABASE = os.getenv("CLICKHOUSE_DATABASE", "default") |
| LAUNCH_PRICE_USD = 0.000004 |
| EPS = 1e-9 |
|
|
def get_client():
    """Build a ClickHouse client from the module-level CLICKHOUSE_* settings."""
    connection_kwargs = {
        "host": CLICKHOUSE_HOST,
        "port": CLICKHOUSE_PORT,
        "user": CLICKHOUSE_USER,
        "password": CLICKHOUSE_PASSWORD,
        "database": CLICKHOUSE_DATABASE,
    }
    return ClickHouseClient(**connection_kwargs)
|
|
| def compute_p99_clamps(client): |
| """ |
| Computes P99 percentile clamp values from ClickHouse for fields prone to |
| garbage outliers. These values replace hardcoded clamps in data_loader.py. |
| Returns a dict of {field_name: p99_value}. |
| """ |
| print(" -> Computing P99 clamp values from trades table...") |
| trade_query = """ |
| SELECT |
| quantile(0.99)(abs(slippage)) AS p99_slippage, |
| quantile(0.99)(total_usd) AS p99_total_usd |
| FROM trades |
| WHERE success = 1 |
| """ |
| trade_row = client.execute(trade_query) |
|
|
| print(" -> Computing P99 clamp values from wallet_holdings table...") |
| holdings_query = """ |
| SELECT |
| quantile(0.99)(history_bought_cost_sol) AS p99_bought_cost_sol, |
| quantile(0.99)(abs(realized_profit_sol)) AS p99_realized_profit_sol |
| FROM wallet_holdings |
| """ |
| holdings_row = client.execute(holdings_query) |
|
|
| clamps = { |
| |
| 'slippage': 1.0, |
|
|
| 'total_usd': 100000.0, |
| 'history_bought_cost_sol': 30.0, |
| 'realized_profit_sol': 150.0, |
| } |
|
|
| if trade_row and trade_row[0]: |
| r = trade_row[0] |
| clamps['slippage'] = max(float(r[0]), 0.01) |
| clamps['total_usd'] = max(float(r[1]), 1.0) |
|
|
| if holdings_row and holdings_row[0]: |
| r = holdings_row[0] |
| clamps['history_bought_cost_sol'] = max(float(r[0]), 0.01) |
| clamps['realized_profit_sol'] = max(float(r[1]), 0.01) |
|
|
| print(f" -> P99 Clamps: {clamps}") |
| return clamps |
|
|
def fetch_all_metrics(client):
    """
    Fetches all needed metrics for all tokens in a single query.
    Base Table: MINTS (to ensure we cover all ~50k tokens).
    Definitions:
        - Snipers: Peak Balance Sum of top 70 buyers
        - Bundles: Base Amount Sum of trades in multi-buy slots
        - Dev Hold: Max Peak Balance of Creator

    Returns:
        list[dict]: one dict per token with the raw query columns plus the
        derived 'snipers_pct', 'dev_hold_pct' and 'bundled_pct' fields.
    """
    print("  -> Fetching all token metrics (Unified Query)...")
    # NOTE(review): in the 'ret' column below, COALESCE falls back from
    # m.ath_price_usd to ta.t_ath — but t_ath is a timestamp, not a price.
    # That middle fallback looks suspicious; confirm intent before relying on it.
    query = f"""
    WITH
        -- 1. Aggregated trade stats (Fees, Volume, ATH Time)
        trade_agg AS (
            SELECT
                base_address,
                sum(priority_fee + coin_creator_fee) AS fees_sol,
                sum(total_usd) AS volume_usd,
                count() AS n_trades,
                argMax(timestamp, price_usd) AS t_ath,
                min(timestamp) AS t0
            FROM trades
            GROUP BY base_address
        ),

        -- 2. Token Metadata from MINTS (Base Source of Truth)
        token_meta AS (
            SELECT
                mint_address AS token_address,
                argMax(creator_address, timestamp) AS creator_address,
                argMax(total_supply, timestamp) AS total_supply,
                argMax(token_decimals, timestamp) AS decimals
            FROM mints
            GROUP BY mint_address
        ),

        -- 3. Returns & Holders (from Token Metrics or manual calc)
        metrics AS (
            SELECT
                token_address,
                argMax(ath_price_usd, updated_at) as ath_price_usd,
                argMax(unique_holders, updated_at) as unique_holders
            FROM token_metrics
            GROUP BY token_address
        ),

        -- 4. WALLET PEAKS (normalized balance likely)
        wallet_peaks AS (
            SELECT
                mint_address,
                wallet_address,
                max(current_balance) AS peak_balance
            FROM wallet_holdings
            GROUP BY mint_address, wallet_address
        ),

        -- 5. SNIPERS: Identify sniper addresses (rank <= 70)
        snipers_list AS (
            SELECT
                base_address,
                maker
            FROM (
                SELECT
                    base_address,
                    maker,
                    dense_rank() OVER (PARTITION BY base_address ORDER BY min_slot, min_idx) AS buyer_rank
                FROM (
                    SELECT
                        base_address,
                        maker,
                        min(slot) AS min_slot,
                        min(transaction_index) AS min_idx
                    FROM trades
                    WHERE trade_type = 0 -- buy
                    GROUP BY base_address, maker
                )
            )
            WHERE buyer_rank <= 70
        ),
        snipers_agg AS (
            SELECT
                s.base_address AS token_address,
                sum(wp.peak_balance) AS snipers_total_peak
            FROM snipers_list s
            JOIN wallet_peaks wp ON s.base_address = wp.mint_address AND s.maker = wp.wallet_address
            GROUP BY s.base_address
        ),

        -- 6. BUNDLED: Sum the base_amount of ALL trades that happened in a slot with multiple buys
        bundled_agg AS (
            SELECT
                t.base_address AS token_address,
                sum(t.base_amount) AS bundled_total_peak
            FROM trades t
            WHERE (t.base_address, t.slot) IN (
                SELECT base_address, slot
                FROM trades
                WHERE trade_type = 0 -- buy
                GROUP BY base_address, slot
                HAVING count() > 1
            )
            AND t.trade_type = 0 -- buy
            GROUP BY t.base_address
        ),

        -- 7. DEV HOLD: Creator's Peak Balance
        dev_hold_agg AS (
            SELECT
                t.token_address,
                max(wp.peak_balance) AS dev_peak
            FROM token_meta t
            JOIN wallet_peaks wp ON t.token_address = wp.mint_address AND t.creator_address = wp.wallet_address
            GROUP BY t.token_address
        )

    SELECT
        t.token_address,
        (COALESCE(m.ath_price_usd, ta.t_ath, 0) / {LAUNCH_PRICE_USD}) AS ret,

        COALESCE(ta.fees_sol, 0) AS fees_sol,
        COALESCE(ta.volume_usd, 0) AS volume_usd,
        COALESCE(m.unique_holders, 0) AS unique_holders,
        (ta.t_ath - ta.t0) AS time_to_ath_sec,

        COALESCE(s.snipers_total_peak, 0) AS snipers_val,
        COALESCE(b.bundled_total_peak, 0) AS bundled_val,
        COALESCE(d.dev_peak, 0) AS dev_val,

        t.total_supply AS total_supply,
        t.decimals AS decimals

    FROM token_meta t
    LEFT JOIN trade_agg ta ON t.token_address = ta.base_address
    LEFT JOIN metrics m ON t.token_address = m.token_address
    LEFT JOIN snipers_agg s ON t.token_address = s.token_address
    LEFT JOIN bundled_agg b ON t.token_address = b.token_address
    LEFT JOIN dev_hold_agg d ON t.token_address = d.token_address
    """

    rows = client.execute(query)

    # Column order must mirror the SELECT list above.
    cols = [
        "token_address", "ret", "fees_sol", "volume_usd", "unique_holders", "time_to_ath_sec",
        "snipers_val", "bundled_val", "dev_val", "total_supply", "decimals"
    ]
    results = []

    print(f"  -> Fetched {len(rows)} tokens.")

    for r in rows:
        d = dict(zip(cols, r))

        supply = d["total_supply"]
        decimals = d["decimals"]

        # Convert raw on-chain supply to a human-readable amount; fall back to
        # the raw value when decimals is missing or the arithmetic fails on
        # malformed inputs. (Was a bare `except:`; narrowed so real bugs like
        # KeyboardInterrupt/SystemExit are no longer swallowed.)
        try:
            adj_supply = supply / (10 ** decimals) if (supply and decimals is not None) else supply
        except (TypeError, ValueError, OverflowError):
            adj_supply = supply

        # Sniper/dev percentages divide by the decimal-adjusted supply —
        # assumes wallet_holdings balances are already decimal-adjusted (TODO confirm).
        if adj_supply and adj_supply > 0:
            d["snipers_pct"] = (d["snipers_val"] / adj_supply) * 100
            d["dev_hold_pct"] = (d["dev_val"] / adj_supply) * 100
        else:
            d["snipers_pct"] = 0.0
            d["dev_hold_pct"] = 0.0

        # Bundled percentage divides by the RAW supply — assumes
        # trades.base_amount is in raw units (TODO confirm against schema).
        if supply and supply > 0:
            d["bundled_pct"] = (d["bundled_val"] / supply) * 100
        else:
            d["bundled_pct"] = 0.0

        results.append(d)

    return results
|
|
def _classify_tokens(data):
    """
    Internal logic: returns (buckets_dict, thresholds_dict, count_manipulated)
    buckets_dict: {class_id: [list of tokens]}

    Pass 1 buckets tokens by return multiple; pass 2 computes per-class
    activity medians and demotes tokens below half the median (fees, volume
    or holders) to MANIPULATED_CLASS_ID.
    """

    def _bucket_for(ret_value):
        # First half-open range [T[i], T[i+1]) containing ret_value, else None.
        for idx in range(len(RETURN_THRESHOLDS) - 1):
            if RETURN_THRESHOLDS[idx] <= ret_value < RETURN_THRESHOLDS[idx + 1]:
                return idx
        return None

    temp_buckets = {i: [] for i in range(len(RETURN_THRESHOLDS))}

    for token in data:
        ret = token["ret"]
        if ret > 10000:
            continue

        cid = _bucket_for(ret)
        if cid is None:
            # NOTE(review): this guard only matters when ret == 10000 exactly
            # (ret > 10000 was already skipped above) — the > vs >= mismatch
            # looks unintentional; confirm the intended cutoff.
            if ret >= 10000:
                continue
            cid = 0  # anything below the first threshold defaults to class 0

    	# (initial class recorded for later inspection/debugging)
        token["class_id_initial"] = cid
        temp_buckets[cid].append(token)

    print("\n  -> Calculating Class Medians & Thresholds (Dynamic Outlier Detection)...")
    thresholds = {}

    for cls in range(1, len(RETURN_THRESHOLDS) - 1):
        members = temp_buckets.get(cls, [])
        if len(members) > 5:
            # Outlier cutoff = half the class median for each activity metric.
            thresholds[cls] = {
                'fees': np.median([m["fees_sol"] for m in members]) * 0.5,
                'vol': np.median([m["volume_usd"] for m in members]) * 0.5,
                'holders': np.median([m["unique_holders"] for m in members]) * 0.5,
            }
        else:
            # Too few samples to trust a median; disable filtering for this class.
            thresholds[cls] = {'fees': 0, 'vol': 0, 'holders': 0}

    final_buckets = {i: [] for i in range(len(RETURN_THRESHOLDS))}
    final_buckets[MANIPULATED_CLASS_ID] = []

    count_manipulated = 0

    for cls, members in temp_buckets.items():
        limits = thresholds.get(cls)
        for token in members:
            target = cls
            if cls > 0 and limits is not None:
                below_any = (token["fees_sol"] < limits['fees']
                             or token["volume_usd"] < limits['vol']
                             or token["unique_holders"] < limits['holders'])
                if below_any:
                    target = MANIPULATED_CLASS_ID
                    count_manipulated += 1

            token["class_id_final"] = target
            final_buckets.setdefault(target, []).append(token)

    return final_buckets, thresholds, count_manipulated
|
|
def get_return_class_map(client):
    """
    Returns (map {token_addr: class_id}, thresholds)
    Used by cache_dataset.py
    """
    metrics = fetch_all_metrics(client)
    buckets, thresholds, _ = _classify_tokens(metrics)

    # Flatten the per-class buckets into token_address -> final class id.
    ret_map = {
        token["token_address"]: cid
        for cid, members in buckets.items()
        for token in members
    }

    return ret_map, thresholds
|
|
def print_stats(name, values):
    """
    prints compact stats: mean, p50, p90, p99
    """
    if not values:
        print(f"  {name}: No data")
        return

    arr = np.array(values)
    mean = np.mean(arr)
    p50, p90, p99 = (np.percentile(arr, q) for q in (50, 90, 99))
    nonzero_rate = np.count_nonzero(arr) / len(arr)

    print(f"  {name}: mean={mean:.4f} p50={p50:.4f} p90={p90:.4f} p99={p99:.4f} nonzero_rate={nonzero_rate:.3f} (n={len(arr)})")
|
|
def fetch_wallet_pnl_stats(client):
    """Return one row of 7d/30d realized-PnL quantile stats per unique wallet, or None if empty."""
    print("  -> Fetching Wallet PnL Quantiles (7d, 30d) - Unique per wallet...")

    query = """
    WITH unique_wallets AS (
        SELECT
            wallet_address,
            argMax(stats_30d_realized_profit_pnl, updated_at) as pnl_30d,
            argMax(stats_7d_realized_profit_pnl, updated_at) as pnl_7d
        FROM wallet_profile_metrics
        GROUP BY wallet_address
    )
    SELECT
        count() as n,
        countIf(pnl_30d > 0.001) as pos_30d,
        quantiles(0.5, 0.9, 0.95, 0.99, 0.999)(pnl_30d) as q_30d,
        max(pnl_30d) as max_30d,

        countIf(pnl_7d > 0.001) as pos_7d,
        quantiles(0.5, 0.9, 0.95, 0.99, 0.999)(pnl_7d) as q_7d,
        max(pnl_7d) as max_7d
    FROM unique_wallets
    WHERE pnl_30d > -999 OR pnl_7d > -999
    """
    result = client.execute(query)
    return result[0] if result else None
|
|
def fetch_trade_stats(client):
    """Return one row of trade-size quantiles (USD value and % of token supply), or None if empty."""
    print("  -> Fetching Trade Quantiles (USD & Supply %)...")
    query = """
    SELECT
        count() as n,
        quantiles(0.5, 0.9, 0.95, 0.99, 0.999)(t.total_usd) as q_usd,
        quantiles(0.5, 0.9, 0.95, 0.99, 0.999)((t.base_amount / m.total_supply) * 100) as q_sup
    FROM trades t
    JOIN mints m ON t.base_address = m.mint_address
    WHERE m.total_supply > 0
    """
    result = client.execute(query)
    return result[0] if result else None
|
|
def fetch_transfer_stats(client):
    """Return one row of transfer-size quantiles (raw amount and % of supply), or None if empty."""
    print("  -> Fetching Transfer Quantiles (Amount & Supply %)...")

    # NOTE(review): the supply % here divides by a hardcoded 1e9 — presumably
    # a fixed 1B token supply assumption; confirm this matches the mint data.
    query = """
    SELECT
        count() as n,
        quantiles(0.5, 0.9, 0.95, 0.99, 0.999)(t.amount_decimal) as q_amt,
        quantiles(0.5, 0.9, 0.95, 0.99, 0.999)((t.amount_decimal / 1000000000) * 100) as q_sup
    FROM transfers t
    """
    result = client.execute(query)
    return result[0] if result else None
|
|
def fetch_kol_stats(client):
    """Return (total_wallets_with_socials, identified_kols); (0, 0) when the query yields nothing."""
    print("  -> Fetching KOL stats from wallet_socials...")
    query = """
    SELECT
        uniq(wallet_address) as total_wallets,
        uniqIf(wallet_address, kolscan_name != '' OR cabalspy_name != '' OR axiom_kol_name != '') as kols
    FROM wallet_socials
    """
    result = client.execute(query)
    print(f"  (DEBUG) KOL query result: {result}")
    if not result:
        return (0, 0)
    return result[0]
|
|
def print_quantiles(name, n, pos_rate, q, max_val=None):
    """Print a labeled quantile breakdown (p50..p99.9) with optional positive rate and max."""
    print(f"\n[{name}] (n={n})")
    if pos_rate is not None:
        print(f"  Positive Rate: {pos_rate*100:.1f}%")

    # q is expected in order: p50, p90, p95, p99, p99.9
    for label, value in zip(("p50", "p90", "p95", "p99", "p99.9"), q):
        print(f"  {label}={value:.4f}")

    if max_val is not None:
        print(f"  Max={max_val:.4f}")
|
|
def analyze_thresholds(client):
    """
    Runs the DB-side distribution analysis: wallet PnL quantiles, trade and
    transfer size quantiles, and KOL coverage, printing a summary for each.
    """
    print("\n=== THRESHOLD DISTRIBUTION ANALYSIS (DB-Side) ===")

    pnl_row = fetch_wallet_pnl_stats(client)
    if pnl_row:
        n, pos_30d, q_30d, max_30d, pos_7d, q_7d, max_7d = pnl_row
        print_quantiles("Wallet PnL (30d)", n, pos_30d/n if n>0 else 0, q_30d, max_30d)
        print_quantiles("Wallet PnL (7d)", n, pos_7d/n if n>0 else 0, q_7d, max_7d)

    trade_row = fetch_trade_stats(client)
    if trade_row:
        n, q_usd, q_sup = trade_row
        # FIX: the "Trade USD Size" section was printed twice (copy-paste bug).
        print_quantiles("Trade USD Size", n, None, q_usd)
        print_quantiles("Trade Supply %", n, None, q_sup)

    transfer_row = fetch_transfer_stats(client)
    if transfer_row:
        n, q_amt, q_sup = transfer_row
        print_quantiles("Transfer Amount", n, None, q_amt)
        print_quantiles("Transfer Supply %", n, None, q_sup)

    total, kols = fetch_kol_stats(client)
    if total > 0:
        print("\n[KOL Statistics]")
        print(f"  Total Wallets with Socials: {total}")
        print(f"  Identified KOLs: {kols}")
        print(f"  KOL Ratio: {(kols/total)*100:.2f}%")
|
|
|
|
def analyze():
    """Entry point: run DB-side threshold analysis, then per-segment distribution stats."""
    client = get_client()

    analyze_thresholds(client)

    data = fetch_all_metrics(client)
    final_buckets, thresholds, count_manipulated = _classify_tokens(data)

    print(f"  -> Reclassification Complete. Identified {count_manipulated} manipulated tokens.")
    print("\n=== SEGMENTED DISTRIBUTION ANALYSIS ===")

    # Show the dynamic outlier cutoffs for every class where they are active.
    for k, t in thresholds.items():
        if t['fees'] > 0:
            print(f"  [Class {k}] Thresh: Fees>{t['fees']:.3f} Vol>${t['vol']:.0f} Holders>{t['holders']:.0f}")

    # Regular classes in ascending order, with the manipulated bucket last.
    regular_classes = sorted(c for c in final_buckets if c != MANIPULATED_CLASS_ID)
    for cid in regular_classes + [MANIPULATED_CLASS_ID]:
        items = final_buckets.get(cid, [])
        if not items:
            continue

        if cid == MANIPULATED_CLASS_ID:
            label = f"{cid}. MANIPULATED / FAKE (Outliers from 1~4)"
        elif cid < len(RETURN_THRESHOLDS) - 1:
            label = f"{cid}. {RETURN_THRESHOLDS[cid]}x - {RETURN_THRESHOLDS[cid+1]}x"
        else:
            label = f"{cid}. Unknown"

        print(f"\nSEGMENT: {label}")
        print("="*50)
        print(f"Tokens in segment: {len(items)}")

        # Field names double as the printed labels.
        for metric in ("bundled_pct", "dev_hold_pct", "fees_sol", "snipers_pct"):
            print_stats(metric, [x[metric] for x in items])
|
|
# Script entry point: run the full analysis when executed directly.
if __name__ == "__main__":
    analyze()