| import os |
| import argparse |
| from typing import List, Optional, Sequence, Tuple |
|
|
| from dotenv import load_dotenv |
| from clickhouse_driver import Client as ClickHouseClient |
|
|
|
|
| def parse_args() -> argparse.Namespace: |
| parser = argparse.ArgumentParser( |
| description="Fast SQL-based hyperparameter analysis (trades-only) for seq_len + horizons." |
| ) |
| parser.add_argument("--token_address", type=str, default=None, help="Analyze a single token address.") |
| parser.add_argument( |
| "--windows_min", |
| type=str, |
| default="5,10,30,60", |
| help="Comma-separated trade-count windows in minutes (e.g. '5,10,30,60').", |
| ) |
| parser.add_argument( |
| "--min_price_usd", |
| type=float, |
| default=0.0, |
| help="Treat trades with price_usd <= min_price_usd as invalid (default: 0.0).", |
| ) |
| return parser.parse_args() |
|
|
|
|
| def _parse_windows(windows_min: str) -> List[int]: |
| out: List[int] = [] |
| for part in (windows_min or "").split(","): |
| part = part.strip() |
| if not part: |
| continue |
| out.append(int(part)) |
| out = sorted(set([w for w in out if w > 0])) |
| if not out: |
| raise ValueError("No valid --windows_min provided.") |
| return out |
|
|
|
|
| def _connect_clickhouse_from_env() -> ClickHouseClient: |
| ch_host = os.getenv("CLICKHOUSE_HOST", "localhost") |
| ch_port = int(os.getenv("CLICKHOUSE_NATIVE_PORT", "9000")) |
| ch_user = os.getenv("CLICKHOUSE_USER", None) |
| ch_pass = os.getenv("CLICKHOUSE_PASSWORD", None) |
| ch_db = os.getenv("CLICKHOUSE_DB", None) |
|
|
| kwargs = {"host": ch_host, "port": ch_port} |
| if ch_user: |
| kwargs["user"] = ch_user |
| if ch_pass: |
| kwargs["password"] = ch_pass |
| if ch_db: |
| kwargs["database"] = ch_db |
| return ClickHouseClient(**kwargs) |
|
|
|
|
| def _quantile_levels() -> Sequence[float]: |
| |
| return (0.25, 0.5, 0.75, 0.90, 0.95, 0.99) |
|
|
|
|
| def _fmt_q_tuple(q: Tuple[float, ...]) -> str: |
| |
| labels = ["25%", "50%", "75%", "90%", "95%", "99%"] |
| parts = [] |
| for lbl, v in zip(labels, q): |
| parts.append(f"{lbl}: {float(v):.2f}") |
| return " | ".join(parts) |
|
|
|
|
| def _print_row(prefix: str, mean_v: float, q_tuple: Tuple[float, ...], max_v: float) -> None: |
| print(f"[{prefix}]") |
| print(f" Mean: {float(mean_v):.2f} | Median: {float(q_tuple[1]):.2f} | Max: {float(max_v):.2f}") |
| print(f" {_fmt_q_tuple(q_tuple)}") |
|
|
|
|
| def fetch_aggregated_stats_sql( |
| ch: ClickHouseClient, |
| windows_min: List[int], |
| min_price_usd: float, |
| token_address: Optional[str] = None, |
| ) -> List[tuple]: |
| """ |
| One ClickHouse query that computes distribution statistics directly (no per-token loop in Python). |
| Returns two groups: |
| - grp='all' |
| - grp='subset' where trades_full > 50 and lifespan_sec > 300 (5 minutes) |
| """ |
| q_levels = _quantile_levels() |
| q_levels_sql = ", ".join(str(q) for q in q_levels) |
|
|
| per_token_window_exprs = [] |
| agg_window_exprs = [] |
| for w in windows_min: |
| sec = int(w) * 60 |
| per_token_window_exprs.append( |
| f"countIf(is_valid AND (trade_ts - mint_ts) <= {sec}) AS trades_{w}m" |
| ) |
| agg_window_exprs.append( |
| f"avg(trades_{w}m) AS trades_{w}m_mean," |
| f" quantilesExact({q_levels_sql})(trades_{w}m) AS trades_{w}m_q," |
| f" max(trades_{w}m) AS trades_{w}m_max" |
| ) |
|
|
| params = {"min_price": float(min_price_usd)} |
| token_filter = "" |
| if token_address: |
| token_filter = "AND m.mint_address = %(token)s" |
| params["token"] = token_address |
|
|
| |
| query = f""" |
| WITH |
| per_token AS ( |
| SELECT |
| m.mint_address AS mint_address, |
| toUnixTimestamp(m.timestamp) AS mint_ts, |
| countIf(is_valid) AS trades_full, |
| (maxIf(trade_ts, is_valid) - mint_ts) AS lifespan_sec, |
| (toUnixTimestamp(argMaxIf(t.timestamp, t.price_usd, is_valid)) - mint_ts) AS time_to_ath_sec, |
| {", ".join(per_token_window_exprs)} |
| FROM mints AS m |
| INNER JOIN |
| ( |
| SELECT |
| base_address, |
| timestamp, |
| toUnixTimestamp(timestamp) AS trade_ts, |
| price_usd, |
| (price_usd > %(min_price)s) AS is_valid |
| FROM trades |
| WHERE base_address IN (SELECT mint_address FROM mints) |
| ) AS t |
| ON t.base_address = m.mint_address |
| WHERE 1=1 |
| {token_filter} |
| GROUP BY |
| mint_address, |
| mint_ts |
| HAVING |
| trades_full > 0 |
| ) |
| SELECT |
| grp, |
| count() AS tokens, |
| |
| avg(trades_full) AS trades_full_mean, |
| quantilesExact({q_levels_sql})(trades_full) AS trades_full_q, |
| max(trades_full) AS trades_full_max, |
| |
| avg(lifespan_sec / 60.0) AS lifespan_min_mean, |
| quantilesExact({q_levels_sql})(lifespan_sec / 60.0) AS lifespan_min_q, |
| max(lifespan_sec / 60.0) AS lifespan_min_max, |
| |
| avg(time_to_ath_sec / 60.0) AS tta_min_mean, |
| quantilesExact({q_levels_sql})(time_to_ath_sec / 60.0) AS tta_min_q, |
| max(time_to_ath_sec / 60.0) AS tta_min_max, |
| |
| {", ".join(agg_window_exprs)} |
| FROM per_token |
| ARRAY JOIN ['all', 'subset'] AS grp |
| WHERE (grp = 'all') |
| OR (grp = 'subset' AND trades_full > 50 AND lifespan_sec > 300) |
| GROUP BY grp |
| ORDER BY grp |
| """ |
|
|
| return ch.execute(query, params) |
|
|
|
|
| def fetch_single_token_sql( |
| ch: ClickHouseClient, |
| windows_min: List[int], |
| min_price_usd: float, |
| token_address: str, |
| ) -> Optional[tuple]: |
| per_token_window_exprs = [] |
| for w in windows_min: |
| sec = int(w) * 60 |
| per_token_window_exprs.append( |
| f"countIf(is_valid AND (trade_ts - mint_ts) <= {sec}) AS trades_{w}m" |
| ) |
|
|
| params = {"min_price": float(min_price_usd), "token": token_address} |
| query = f""" |
| SELECT |
| m.mint_address AS mint_address, |
| toUnixTimestamp(m.timestamp) AS mint_ts, |
| countIf(is_valid) AS trades_full, |
| (maxIf(trade_ts, is_valid) - mint_ts) AS lifespan_sec, |
| (toUnixTimestamp(argMaxIf(t.timestamp, t.price_usd, is_valid)) - mint_ts) AS time_to_ath_sec, |
| {", ".join(per_token_window_exprs)} |
| FROM mints AS m |
| INNER JOIN |
| ( |
| SELECT |
| base_address, |
| timestamp, |
| toUnixTimestamp(timestamp) AS trade_ts, |
| price_usd, |
| (price_usd > %(min_price)s) AS is_valid |
| FROM trades |
| WHERE base_address = %(token)s |
| ) AS t |
| ON t.base_address = m.mint_address |
| WHERE m.mint_address = %(token)s |
| GROUP BY |
| mint_address, |
| mint_ts |
| HAVING |
| trades_full > 0 |
| """ |
| rows = ch.execute(query, params) |
| return rows[0] if rows else None |
|
|
|
|
| def main() -> None: |
| load_dotenv() |
| args = parse_args() |
| windows_min = _parse_windows(args.windows_min) |
|
|
| print("--- Hyperparameter Calibration Analysis (FAST SQL) ---") |
| print(f"Windows (min): {windows_min}") |
| print(f"Valid trade filter: price_usd > {float(args.min_price_usd)}") |
|
|
| ch = _connect_clickhouse_from_env() |
| if args.token_address: |
| row = fetch_single_token_sql( |
| ch=ch, |
| windows_min=windows_min, |
| min_price_usd=float(args.min_price_usd), |
| token_address=args.token_address, |
| ) |
| if not row: |
| print("Token not found (or no valid trades).") |
| return |
|
|
| mint_addr = row[0] |
| trades_full = int(row[2]) |
| lifespan_min = float(row[3]) / 60.0 |
| tta_min = float(row[4]) / 60.0 |
| print("\n" + "=" * 40) |
| print("RESULTS (SINGLE TOKEN)") |
| print("=" * 40) |
| print(f"Token: {mint_addr}") |
| print(f"Valid trades: {trades_full}") |
| print(f"Lifespan (min): {lifespan_min:.2f}") |
| print(f"Time to ATH (min): {tta_min:.2f}") |
| for i, w in enumerate(windows_min): |
| print(f"Trades in first {w}m: {int(row[5 + i])}") |
| else: |
| rows = fetch_aggregated_stats_sql( |
| ch=ch, |
| windows_min=windows_min, |
| min_price_usd=float(args.min_price_usd), |
| token_address=None, |
| ) |
| if not rows: |
| print("No tokens found with valid trades.") |
| return |
|
|
| print("\n" + "=" * 40) |
| print("RESULTS (DISTRIBUTION)") |
| print("=" * 40) |
|
|
| |
| |
| |
| |
| |
| |
| for row in rows: |
| grp = row[0] |
| tokens = int(row[1]) |
| print(f"\n--- Group: {grp} (tokens={tokens}) ---") |
|
|
| _print_row("Trades (Full History, Valid Only)", row[2], row[3], row[4]) |
| print("") |
| _print_row("Token Lifespan (Minutes)", row[5], row[6], row[7]) |
| print("") |
| _print_row("Time to ATH (Minutes)", row[8], row[9], row[10]) |
|
|
| cursor = 11 |
| for w in windows_min: |
| mean_v = row[cursor] |
| q_v = row[cursor + 1] |
| max_v = row[cursor + 2] |
| cursor += 3 |
| print("") |
| _print_row(f"Trades in First {w} Minutes (Valid Only)", mean_v, q_v, max_v) |
|
|
| print("\nRecommendation Logic (Trades-only):") |
| print("- Horizons: look at Time-to-ATH p90/p95 (all vs subset).") |
| print("- Max seq len: look at Trades-in-first-(max horizon) p95/p99.") |
| print(" Then add headroom for non-trade events (transfers/pool/liquidity/etc).") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|