#!/usr/bin/env python3
"""
Preprocess distribution statistics for OHLC normalization and token history coverage.
This script:
1. Computes global mean/std figures for price/volume so downstream code can normalize.
2. Prints descriptive stats about how much price history (in seconds) each token has,
helping decide which horizons are realistic.
All configuration is done via environment variables (see below).
"""
import os
import pathlib
import sys
from typing import List
import numpy as np
import clickhouse_connect
# --- Configuration (override via env vars if needed) ---
# ClickHouse connection settings (defaults target a local HTTP-port server).
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_PORT = int(os.getenv("CLICKHOUSE_PORT", "8123"))
CLICKHOUSE_USERNAME = os.getenv("CLICKHOUSE_USERNAME", "default")
CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "")
CLICKHOUSE_DATABASE = os.getenv("CLICKHOUSE_DATABASE", "default")
# Destination for the computed normalization statistics (.npz archive).
OUTPUT_PATH = pathlib.Path(os.getenv("OHLC_STATS_PATH", "ohlc_stats.npz"))
# Filter thresholds; the SQL uses strict '>' so trades at exactly these
# values (including zero) are excluded.
MIN_PRICE_USD = float(os.getenv("OHLC_MIN_PRICE_USD", "0.0"))
MIN_VOLUME_USD = float(os.getenv("OHLC_MIN_VOLUME_USD", "0.0"))
# Optional comma-separated allowlist of base-token addresses.
TOKEN_ADDRESSES_ENV = os.getenv("OHLC_TOKEN_ADDRESSES", "")
# An empty parse yields an empty tuple, which is falsy -> None (no filter).
TOKEN_ADDRESSES = tuple(addr.strip() for addr in TOKEN_ADDRESSES_ENV.split(",") if addr.strip()) or None
def build_where_clause(token_addresses=None) -> List[str]:
    """Build the list of SQL WHERE conditions shared by both queries.

    Args:
        token_addresses: Optional tuple of base-token addresses to restrict
            the queries to. When ``None`` (the default), falls back to the
            module-level ``TOKEN_ADDRESSES`` parsed from the
            ``OHLC_TOKEN_ADDRESSES`` env var, preserving the original
            zero-argument behavior.

    Returns:
        Condition strings using ``%(name)s`` placeholders, intended to be
        joined with ``" AND "`` and bound via query parameters.
    """
    if token_addresses is None:
        token_addresses = TOKEN_ADDRESSES
    # Strict '>' comparisons: zero-price / zero-volume trades never qualify.
    clauses = ["t.price_usd > %(min_price)s", "t.total_usd > %(min_vol)s"]
    if token_addresses:
        clauses.append("t.base_address IN %(token_addresses)s")
    return clauses
def build_stats_query(where_sql: str) -> str:
    """Render the aggregate mean/std query for prices and trade value.

    Args:
        where_sql: Pre-joined condition string spliced after ``WHERE``.

    Returns:
        A single-row SELECT over ``trades`` joined to ``mints`` (the inner
        join limits stats to tokens present in ``mints``), computing mean
        and population stddev for USD price, native price, and per-trade
        USD value.
    """
    sql = f"""
    SELECT
        AVG(t.price_usd) AS mean_price_usd,
        stddevPop(t.price_usd) AS std_price_usd,
        AVG(t.price) AS mean_price_native,
        stddevPop(t.price) AS std_price_native,
        AVG(t.total_usd) AS mean_trade_value_usd,
        stddevPop(t.total_usd) AS std_trade_value_usd
    FROM trades AS t
    INNER JOIN mints AS m
        ON m.mint_address = t.base_address
    WHERE {where_sql}
    """
    return sql
def build_history_query(where_sql: str) -> str:
    """Render the per-token history-coverage query.

    Args:
        where_sql: Pre-joined condition string spliced after ``WHERE``.

    Returns:
        A SELECT grouping trades by base token, yielding first/last trade
        Unix timestamps and their difference (``history_seconds``).
    """
    sql = f"""
    SELECT
        t.base_address AS token_address,
        toUnixTimestamp(min(t.timestamp)) AS first_ts,
        toUnixTimestamp(max(t.timestamp)) AS last_ts,
        toUnixTimestamp(max(t.timestamp)) - toUnixTimestamp(min(t.timestamp)) AS history_seconds
    FROM trades AS t
    INNER JOIN mints AS m
        ON m.mint_address = t.base_address
    WHERE {where_sql}
    GROUP BY token_address
    """
    return sql
def summarize_histories(histories: np.ndarray) -> None:
    """Print descriptive statistics of per-token history durations.

    Args:
        histories: 1-D array of history lengths in seconds. An empty array
            only prints a notice and returns.
    """
    if not histories.size:
        print("No token history stats available (no qualifying trades).")
        return

    def as_human(sec: float) -> str:
        # Show raw seconds plus hour/day equivalents for quick reading.
        hrs = sec / 3600.0
        return f"{sec:.0f}s ({hrs:.2f}h / {hrs / 24.0:.2f}d)"

    print("\nToken history coverage (seconds):")
    print(f" Tokens analyzed: {int(histories.size)}")
    print(f" Min history: {as_human(histories.min())}")
    print(f" Median history: {as_human(float(np.median(histories)))}")
    print(f" Mean history: {as_human(histories.mean())}")
    print(f" 90th percentile: {as_human(float(np.percentile(histories, 90)))}")
    print(f" Max history: {as_human(histories.max())}")
def main() -> int:
    """Compute and persist normalization stats, then print history coverage.

    Connects to ClickHouse, aggregates mean/std for USD price, native price,
    and per-trade USD value, writes them to ``OUTPUT_PATH`` as an .npz
    archive, and prints per-token history-length statistics.

    Returns:
        Process exit code: 0 on success, 1 if the stats query yields no rows.
    """
    where_clauses = build_where_clause()
    # "1" is an always-true condition in case no filters apply.
    where_sql = " AND ".join(where_clauses) if where_clauses else "1"
    params: dict[str, object] = {
        "min_price": max(MIN_PRICE_USD, 0.0),
        "min_vol": max(MIN_VOLUME_USD, 0.0),
    }
    if TOKEN_ADDRESSES:
        params["token_addresses"] = TOKEN_ADDRESSES
    client = clickhouse_connect.get_client(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        username=CLICKHOUSE_USERNAME,
        password=CLICKHOUSE_PASSWORD,
        database=CLICKHOUSE_DATABASE,
    )
    # Fix: ensure the client's HTTP session is released on every exit path
    # (early return, exception, or success) -- previously it leaked.
    try:
        # --- Price/volume stats ---
        stats_query = build_stats_query(where_sql)
        stats_result = client.query(stats_query, parameters=params)
        if not stats_result.result_rows:
            print("ERROR: Stats query returned no rows. Check filters / connectivity.", file=sys.stderr)
            return 1
        (
            mean_price_usd,
            std_price_usd,
            mean_price_native,
            std_price_native,
            mean_trade_value_usd,
            std_trade_value_usd,
        ) = stats_result.result_rows[0]
        # NULL (or zero) aggregates fall back to mean=0 / std=1 so downstream
        # normalization degrades to a no-op instead of dividing by zero.
        stats = {
            "mean_price_usd": float(mean_price_usd or 0.0),
            "std_price_usd": float(std_price_usd or 1.0),
            "mean_price_native": float(mean_price_native or 0.0),
            "std_price_native": float(std_price_native or 1.0),
            "mean_trade_value_usd": float(mean_trade_value_usd or 0.0),
            "std_trade_value_usd": float(std_trade_value_usd or 1.0),
        }
        OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
        np.savez(OUTPUT_PATH, **stats)
        print(f"Saved stats to {OUTPUT_PATH.resolve()}:")
        for key, value in stats.items():
            print(f" {key}: {value:.6f}")
        # --- Token history coverage ---
        history_query = build_history_query(where_sql)
        history_result = client.query(history_query, parameters=params)
        # row[3] is history_seconds; skip NULLs defensively.
        history_seconds = np.array(
            [float(row[3]) for row in history_result.result_rows if row[3] is not None],
            dtype=np.float64,
        )
        summarize_histories(history_seconds)
        return 0
    finally:
        client.close()
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())