HPLL-DataReview / hyperliquid_data.py
gionuibk's picture
T2: real Hyperliquid macro data collector
4449814 verified
"""
Hyperliquid Data Fetcher — Real funding, OI, L/S ratio
Runs as a background task in the Coordinator Space once every hour.
Pushes the enrichment data to the HF dataset so workers can use it when computing features.
"""
from __future__ import annotations
import os, json, time, logging, tempfile
from datetime import datetime, timezone
# Module logger.
log = logging.getLogger("hyperliquid_data")
# Hyperliquid info endpoint; every Hyperliquid API call in this module is a JSON POST here (via _post).
HL_API = "https://api.hyperliquid.xyz/info"
# Hugging Face token from the environment; "" when unset (push_to_hf then skips uploading).
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# HF dataset repo that macro snapshots are pushed to and that load_latest_macro reads from.
EXPERIENCE_REPO = os.environ.get("EXPERIENCE_REPO", "gionuibk/aetheris-experiences")
COINS = ["BTC", "ETH", "SOL"]  # Hyperliquid asset names to collect macro data for
def _post(payload: dict, timeout: float = 10) -> dict | list | None:
    """POST ``payload`` as JSON to the Hyperliquid info endpoint.

    Best-effort: any failure (``requests`` not installed, network error,
    non-2xx status, undecodable JSON) is logged as a warning and ``None``
    is returned instead of raising.

    Args:
        payload: Request body, e.g. ``{"type": "metaAndAssetCtxs"}``.
        timeout: Seconds passed through to ``requests.post``.

    Returns:
        The decoded JSON body, or ``None`` on failure. The body is not always
        a dict: the ``metaAndAssetCtxs`` and ``fundingHistory`` responses are
        indexed as lists by the callers in this module.
    """
    try:
        # Imported lazily so the module can be imported without requests installed.
        import requests
        response = requests.post(HL_API, json=payload, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        log.warning(f"HL API error: {e}")
        return None
def _parse_asset_ctxs(data, coins) -> dict:
    """Extract per-coin funding/OI/mark/premium from a ``metaAndAssetCtxs`` response.

    ``data`` is expected to be ``[meta, asset_ctxs]`` where
    ``meta["universe"][i]["name"]`` names the asset described by
    ``asset_ctxs[i]``. Only names in ``coins`` are kept. A missing or
    malformed response yields ``{}``; a coin whose entry cannot be parsed is
    skipped without affecting the others.
    """
    result: dict = {}
    if not isinstance(data, list) or len(data) < 2:
        return result
    meta, ctxs = data[0], data[1]
    if not isinstance(meta, dict) or not isinstance(ctxs, list):
        log.warning("parse metaAndAssetCtxs: unexpected response shape")
        return result
    for i, coin_info in enumerate(meta.get("universe") or []):
        try:
            coin = coin_info.get("name", "")
            if coin not in coins:
                continue
            ctx = ctxs[i] if i < len(ctxs) else {}
            # `or 0` maps both a missing key and a JSON null to 0.0, so one
            # null field cannot abort parsing of the remaining coins.
            result[coin] = {
                "funding": float(ctx.get("funding") or 0),
                "open_interest": float(ctx.get("openInterest") or 0),
                "mark_price": float(ctx.get("markPx") or 0),
                "premium": float(ctx.get("premium") or 0),
            }
        except (AttributeError, TypeError, ValueError) as e:
            log.warning(f"parse metaAndAssetCtxs ({coin_info!r}): {e}")
    return result


def fetch_funding_rates() -> dict:
    """Fetch current funding, open interest, mark price and premium for ``COINS``.

    Issues a single ``metaAndAssetCtxs`` request.

    Returns:
        ``{coin: {"funding", "open_interest", "mark_price", "premium"}}`` with
        float values (missing/null fields become ``0.0``). Coins absent from
        the response, or whose entry cannot be parsed, are omitted; ``{}``
        when the request fails.
    """
    return _parse_asset_ctxs(_post({"type": "metaAndAssetCtxs"}), COINS)
def _parse_funding_history(data, max_entries: int = 10) -> list[dict]:
    """Normalize a ``fundingHistory`` response.

    Returns the newest ``max_entries`` parseable entries as
    ``{"time": int, "funding": float}`` dicts sorted oldest -> newest.
    Entries that are not dicts, or whose time/rate cannot be converted, are
    skipped. A non-list ``data`` (e.g. ``None``) or ``max_entries <= 0``
    yields ``[]``.
    """
    if not isinstance(data, list) or max_entries <= 0:
        return []
    history = []
    for entry in data:
        try:
            history.append({
                "time": int(entry.get("time") or 0),
                "funding": float(entry.get("fundingRate", 0)),
            })
        except (AttributeError, TypeError, ValueError):
            continue  # malformed entry: skip it, best-effort
    # Sort explicitly so first/last really are oldest/newest for compute_funding_trend.
    history.sort(key=lambda h: h["time"])
    return history[-max_entries:]


def fetch_funding_history(coin: str, hours_back: int = 8, max_entries: int = 10) -> list[dict]:
    """Fetch recent funding-rate history for ``coin`` for trend calculation.

    Args:
        coin: Hyperliquid asset name, e.g. ``"BTC"``.
        hours_back: Request history starting this many hours before now.
        max_entries: Keep at most this many of the most recent entries.

    Returns:
        Up to ``max_entries`` ``{"time": int, "funding": float}`` dicts sorted
        oldest -> newest; ``[]`` when the request fails, the response is not a
        list, or ``max_entries <= 0``.
    """
    if max_entries <= 0:
        return []  # nothing to return, so skip the request entirely
    now_ms = int(time.time() * 1000)
    start_ms = now_ms - hours_back * 3600 * 1000
    data = _post({"type": "fundingHistory", "coin": coin, "startTime": start_ms})
    if data is not None and not isinstance(data, list):
        log.warning(f"fundingHistory {coin}: unexpected response type {type(data).__name__}")
    return _parse_funding_history(data, max_entries)
def compute_funding_trend(history: list[dict]) -> float:
    """Return the net change of the funding rate across ``history``.

    Computed as last-minus-first of the ``"funding"`` values (a simple
    endpoint difference, not a regression). Positive = funding rising
    (longs being squeezed more), negative = falling. Fewer than two
    entries give ``0.0``.
    """
    if len(history) >= 2:
        first, *_, last = [entry["funding"] for entry in history]
        return float(last - first)
    return 0.0
def _long_ratio_from_premium(premium: float) -> float:
    """Map a perp premium to an estimated long ratio in [0.25, 0.75].

    premium > 0.0002  -> 0.5 + 200 * premium, capped at 0.75
    premium < -0.0001 -> 0.5 + 200 * premium, floored at 0.25
    otherwise         -> 0.5 (neutral)
    """
    # NOTE(review): the neutral dead-band is asymmetric (+0.0002 / -0.0001);
    # kept as-is — confirm it is intentional.
    if premium > 0.0002:
        return min(0.75, 0.5 + premium * 200)
    if premium < -0.0001:
        return max(0.25, 0.5 + premium * 200)
    return 0.5


def fetch_ls_ratio(coin: str) -> float:
    """Estimate the long ratio for ``coin`` (0.5 = neutral).

    Hyperliquid does not expose a long/short ratio directly. This heuristic
    uses only the ``premium`` field of the coin's ``metaAndAssetCtxs`` entry
    (open interest is not consulted); see ``_long_ratio_from_premium`` for
    the mapping. The working assumption is that a positive premium means
    longs pay, i.e. more longs than shorts.

    Each call issues its own ``metaAndAssetCtxs`` request.

    Returns:
        A float in [0.25, 0.75]; 0.5 when the request fails, the coin is not
        listed, or the response cannot be parsed. A null premium counts as 0.
    """
    data = _post({"type": "metaAndAssetCtxs"})
    if not data or len(data) < 2:
        return 0.5
    try:
        universe = data[0].get("universe") or []
        ctxs = data[1]
        for i, coin_info in enumerate(universe):
            if coin_info.get("name") == coin and i < len(ctxs):
                premium = float(ctxs[i].get("premium") or 0)
                return _long_ratio_from_premium(premium)
    except (AttributeError, KeyError, IndexError, TypeError, ValueError) as e:
        log.warning(f"fetch_ls_ratio {coin}: {e}")
    return 0.5
def collect_all() -> dict:
    """Collect macro data for every coin in ``COINS`` into one snapshot.

    Returns:
        ``{"timestamp": <ISO-8601 UTC>, "coins": {coin: {...}}}`` where each
        coin entry has ``funding``, ``funding_trend``, ``open_interest``,
        ``mark_price``, ``ls_ratio``, ``oi_delta`` and ``fetched_at``.
        Coins with no current data, or whose enrichment raises, are left out
        (with a warning) so one bad coin does not abort the whole snapshot.
    """
    log.info("📡 Collecting Hyperliquid macro data...")
    output = {"timestamp": datetime.now(timezone.utc).isoformat(), "coins": {}}
    current = fetch_funding_rates()
    for coin in COINS:
        cur = current.get(coin)
        if cur is None:
            log.warning(f" ⚠️ {coin}: no data")
            continue
        try:
            hist = fetch_funding_history(coin)
            trend = compute_funding_trend(hist)
            # NOTE(review): fetch_ls_ratio re-requests metaAndAssetCtxs per coin
            # although `cur` already carries the premium it uses.
            ls = fetch_ls_ratio(coin)
        except Exception as e:
            log.warning(f" ⚠️ {coin}: enrichment failed: {e}")
            continue
        output["coins"][coin] = {
            "funding": cur["funding"],
            "funding_trend": trend,
            "open_interest": cur["open_interest"],
            "mark_price": cur["mark_price"],
            "ls_ratio": ls,
            # Placeholder: always 0.0. Nothing in this module compares OI
            # against a previous snapshot yet (TODO).
            "oi_delta": 0.0,
            "fetched_at": output["timestamp"],
        }
        log.info(f" ✅ {coin}: funding={cur['funding']:.5f} oi={cur['open_interest']:.0f} ls={ls:.2f}")
    return output
def push_to_hf(data: dict) -> bool:
    """Upload a macro snapshot to the ``EXPERIENCE_REPO`` HF dataset.

    The same JSON is written twice: ``macro/snapshot_<YYYYmmdd_HHMM>.json``
    (UTC timestamp) as history, and ``macro/latest.json`` for workers that
    only need the newest data.

    Returns:
        True if both uploads succeeded; False if ``HF_TOKEN`` is unset or any
        step failed (the error is logged, not raised). The temporary JSON
        file is always removed, including on failure.
    """
    if not HF_TOKEN:
        log.warning("No HF_TOKEN — skipping push")
        return False
    tmp = None
    try:
        from huggingface_hub import HfApi
        api = HfApi(token=HF_TOKEN)
        ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M")
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False, encoding="utf-8") as f:
            # Record the path before writing so a failing dump is still cleaned up.
            tmp = f.name
            json.dump(data, f, indent=2)
        uploads = (
            (f"macro/snapshot_{ts}.json", f"Macro snapshot {ts}"),
            # Also maintain a "latest" file for workers to read quickly
            ("macro/latest.json", f"Macro latest {ts}"),
        )
        for path_in_repo, message in uploads:
            api.upload_file(
                path_or_fileobj=tmp, token=HF_TOKEN,
                path_in_repo=path_in_repo,
                repo_id=EXPERIENCE_REPO, repo_type="dataset",
                commit_message=message,
            )
        log.info(f"☁️ Pushed macro snapshot {ts}")
        return True
    except Exception as e:
        log.error(f"❌ push_to_hf: {e}")
        return False
    finally:
        if tmp and os.path.exists(tmp):
            os.unlink(tmp)
def load_latest_macro(coin: str = "BTC") -> dict:
    """Return the latest macro features for ``coin`` from ``macro/latest.json``.

    Workers call this to get real funding / OI / L-S ratio. Only the keys
    ``funding``, ``funding_trend``, ``open_interest``, ``ls_ratio`` and
    ``oi_delta`` are returned. Any key missing for the coin falls back to
    its neutral default (0.0, or 0.5 for ``ls_ratio``). If the download or
    parse fails, all defaults are returned and a warning is logged.
    """
    defaults = {"funding": 0.0, "funding_trend": 0.0,
                "open_interest": 0.0, "ls_ratio": 0.5, "oi_delta": 0.0}
    try:
        from huggingface_hub import hf_hub_download
        local = hf_hub_download(
            repo_id=EXPERIENCE_REPO, filename="macro/latest.json",
            repo_type="dataset",
            # NOTE(review): an empty-string token may be sent as-is rather than
            # treated as "no token"; None lets huggingface_hub use its default auth.
            token=HF_TOKEN or None, cache_dir="/tmp/macro_cache",
        )
        with open(local, encoding="utf-8") as f:
            data = json.load(f)
        coin_data = data.get("coins", {}).get(coin) or {}
        return {key: coin_data.get(key, default) for key, default in defaults.items()}
    except Exception as e:
        log.warning(f"load_latest_macro({coin}): using defaults: {e}")
        return defaults
# Data quality constants
REQUIRED_FEATURES = ["obi", "spread_pct", "label"]
# Minimum fraction of non-null cells over the whole DataFrame
# (i.e. at most 30% of all cells may be null).
MIN_FEATURE_COMPLETENESS = 0.70


def check_experience_quality(df, min_numeric_features: int = 5) -> tuple[bool, str]:
    """Validate an experience DataFrame; return ``(is_valid, reason)``.

    Checks run in order and the first failure is reported:

    1. ``df`` is None or has no rows -> ``"empty_file"``
    2. a ``REQUIRED_FEATURES`` column is missing -> ``"missing_required: [...]"``
    3. overall null fraction > ``1 - MIN_FEATURE_COMPLETENESS``
       -> ``"too_many_nulls: xx.xx%"``
    4. ``label`` has fewer than 2 distinct non-null values
       -> ``"degenerate_labels: all=<first label>"``
    5. fewer than ``min_numeric_features`` numeric columns
       -> ``"too_few_features: <n>"``

    Args:
        df: A pandas DataFrame, or None.
        min_numeric_features: Minimum number of numeric columns required.

    Returns:
        ``(True, "ok")`` when every check passes, else ``(False, reason)``.
    """
    if df is None or len(df) == 0:
        return False, "empty_file"
    missing = [col for col in REQUIRED_FEATURES if col not in df.columns]
    if missing:
        return False, f"missing_required: {missing}"
    # Mean over every cell, not per column: one all-null column can still pass.
    null_ratio = df.isnull().mean().mean()
    if null_ratio > (1 - MIN_FEATURE_COMPLETENESS):
        return False, f"too_many_nulls: {null_ratio:.2%}"
    # "label" is guaranteed present by the required-columns check above.
    if df["label"].nunique() < 2:
        return False, f"degenerate_labels: all={df['label'].iloc[0]}"
    # Counts numeric columns only; no schema-version comparison happens here.
    numeric_count = len(df.select_dtypes(include=["number"]).columns)
    if numeric_count < min_numeric_features:
        return False, f"too_few_features: {numeric_count}"
    return True, "ok"
# Schema version tracking
CURRENT_SCHEMA_VERSION = "v4.0"
# Ordered column list for the current schema. Besides model features it
# also contains the "label" target and a "timestamp" column; both are
# included in the n_features count reported by get_schema_info.
SCHEMA_FEATURES = [
    "obi", "spread_pct", "vpin", "entropy", "cvd_norm", "absorption",
    "ema_cross_fs", "ema_cross_sl", "adx", "momentum_zscore", "trend_strength",
    "vol_surge", "atr", "zscore_60", "zscore_300", "hurst", "bb_pct_b",
    "bb_width", "rsi", "mean_rev_signal", "vwap_dev", "market_structure",
    "bos", "pin_bar", "engulfing", "inside_bar", "pivot_dist_h", "pivot_dist_l",
    "fib_618_prox", "funding", "funding_trend", "oi_delta", "ls_ratio",
    "label", "timestamp",
]


def get_schema_info() -> dict:
    """Describe the current experience schema.

    Returns:
        ``{"version": str, "features": list[str], "n_features": int}``.
        ``features`` is a fresh copy, so callers may modify it without
        altering ``SCHEMA_FEATURES`` for the rest of the process.
    """
    features = list(SCHEMA_FEATURES)
    return {
        "version": CURRENT_SCHEMA_VERSION,
        "features": features,
        "n_features": len(features),
    }
# Run loop for coordinator background thread
def run_collection_loop(interval_seconds: int = 3600):
    """Collect and push macro data forever, one cycle every ``interval_seconds``.

    Each cycle calls ``collect_all`` and, if at least one coin was collected,
    ``push_to_hf``. Errors are logged and the loop keeps running. Time spent
    collecting/pushing is subtracted from the sleep so cycles start about
    ``interval_seconds`` apart instead of drifting later. Never returns.

    Raises:
        ValueError: if ``interval_seconds`` is not positive. Without this check,
            0 would hammer the API in a tight loop and a negative value would
            crash in ``time.sleep``.
    """
    if interval_seconds <= 0:
        raise ValueError(f"interval_seconds must be positive, got {interval_seconds}")
    log.info(f"📡 MacroCollector started — interval={interval_seconds}s")
    while True:
        started = time.monotonic()
        try:
            data = collect_all()
            if data["coins"]:
                push_to_hf(data)
            else:
                log.warning("collection_loop: no coin data collected — skipping push")
        except Exception as e:
            log.error(f"❌ collection_loop: {e}")
        elapsed = time.monotonic() - started
        time.sleep(max(0.0, interval_seconds - elapsed))
if __name__ == "__main__":
    # One-shot collection: print the snapshot, then push it.
    logging.basicConfig(level=logging.INFO)
    data = collect_all()
    print(json.dumps(data, indent=2))
    # Same guard as run_collection_loop: never overwrite macro/latest.json
    # with a snapshot that contains no coins.
    if data["coins"]:
        push_to_hf(data)
    else:
        log.warning("No coin data collected — skipping push")