"""
Hyperliquid Data Fetcher — real funding, OI, and an approximate L/S ratio.

Runs as a background task in the Coordinator Space, once per hour by default
(see ``run_collection_loop``). Pushes enrichment data to a Hugging Face
dataset so that workers can use it when computing features.
"""
from __future__ import annotations

import json
import logging
import os
import tempfile
import time
from collections.abc import Iterable
from datetime import datetime, timezone

log = logging.getLogger("hyperliquid_data")

HL_API = "https://api.hyperliquid.xyz/info"
HF_TOKEN = os.environ.get("HF_TOKEN", "")
EXPERIENCE_REPO = os.environ.get("EXPERIENCE_REPO", "gionuibk/aetheris-experiences")
COINS = ["BTC", "ETH", "SOL"]  # coins to track

# Open interest seen on the previous collect_all() call, keyed by coin; used to
# fill "oi_delta". Held in process memory only, so it is empty after a restart
# and the first cycle after startup reports oi_delta = 0.0.
_PREV_OI: dict[str, float] = {}


def _post(payload: dict, timeout: int = 10) -> dict | list | None:
    """POST *payload* to the Hyperliquid info endpoint and return the decoded JSON.

    Best-effort: returns None (after logging a warning) on any failure —
    ``requests`` not installed, network error, non-2xx status, or a body that
    is not valid JSON.
    """
    try:
        import requests

        r = requests.post(HL_API, json=payload, timeout=timeout)
        r.raise_for_status()
        return r.json()
    except Exception as e:  # network boundary: callers treat None as "no data"
        log.warning(f"HL API error: {e}")
        return None


def _to_float(value, default: float = 0.0) -> float:
    """Convert an API field to float, mapping None (absent or JSON null) to *default*.

    Raises ValueError/TypeError for values float() cannot parse, so the caller
    can skip that record instead of storing a bogus number.
    """
    return default if value is None else float(value)


def _fetch_asset_ctxs(coins: Iterable[str]) -> dict[str, dict]:
    """Return the raw ``metaAndAssetCtxs`` context entry for each requested coin.

    The response is read as ``[meta, ctxs]`` where ``meta["universe"][i]`` and
    ``ctxs[i]`` describe the same asset. Coins missing from the response, or
    without a matching context entry, are omitted rather than filled with zeros.
    """
    wanted = set(coins)
    data = _post({"type": "metaAndAssetCtxs"})
    if not isinstance(data, list) or len(data) < 2:
        return {}
    try:
        universe = data[0].get("universe", [])
        ctxs = data[1]
        return {
            info.get("name", ""): ctx
            for info, ctx in zip(universe, ctxs)
            if info.get("name", "") in wanted
        }
    except (AttributeError, TypeError) as e:
        log.warning(f"parse metaAndAssetCtxs: {e}")
        return {}


def fetch_funding_rates() -> dict:
    """Fetch the current funding, open interest, mark price and premium for COINS.

    Returns ``{coin: {"funding", "open_interest", "mark_price", "premium"}}``.
    Each coin is parsed independently, so one malformed entry no longer drops
    the others. Null/missing fields become 0.0; unparseable ones skip the coin.
    """
    result: dict[str, dict] = {}
    for coin, ctx in _fetch_asset_ctxs(COINS).items():
        try:
            result[coin] = {
                "funding": _to_float(ctx.get("funding")),
                "open_interest": _to_float(ctx.get("openInterest")),
                "mark_price": _to_float(ctx.get("markPx")),
                "premium": _to_float(ctx.get("premium")),
            }
        except (AttributeError, TypeError, ValueError) as e:
            log.warning(f"parse metaAndAssetCtxs: {e}")
    return result


def fetch_funding_history(coin: str, hours_back: int = 8) -> list[dict]:
    """Fetch recent funding history for trend calculation.

    Requests entries since ``now - hours_back`` hours and keeps at most the last
    10, each as ``{"time": ..., "funding": float}``. Returns [] when the API
    fails or answers with anything other than a list; malformed entries are skipped.
    """
    now_ms = int(time.time() * 1000)
    start_ms = now_ms - hours_back * 3600 * 1000
    data = _post({"type": "fundingHistory", "coin": coin, "startTime": start_ms})
    if not isinstance(data, list):
        return []
    history = []
    for entry in data[-10:]:  # last 10 entries
        try:
            history.append({
                "time": entry.get("time", 0),
                "funding": float(entry.get("fundingRate", 0)),
            })
        except (AttributeError, TypeError, ValueError):
            continue
    return history


def compute_funding_trend(history: list[dict]) -> float:
    """Return last-minus-first funding rate over *history*.

    Positive = funding rising (longs being squeezed more), negative = falling,
    0.0 when fewer than two points are available.
    """
    if len(history) < 2:
        return 0.0
    rates = [h["funding"] for h in history]
    # Simple linear trend: last - first
    return float(rates[-1] - rates[0])


def _premium_to_ls_ratio(premium: float) -> float:
    """Map a premium to an estimated long ratio in [0.25, 0.75] (0.5 = neutral).

    Heuristic: a positive premium is read as longs paying, i.e. more longs than
    shorts. Small premiums inside the (asymmetric) dead band map to neutral.
    """
    if premium > 0.0002:
        return min(0.75, 0.5 + premium * 200)
    if premium < -0.0001:
        return max(0.25, 0.5 + premium * 200)
    return 0.5


def fetch_ls_ratio(coin: str, premium: float | None = None) -> float:
    """Estimate the long ratio for *coin* (0.5 = neutral).

    Hyperliquid does not expose an L/S ratio directly, so this is derived only
    from the asset's ``premium`` field. Pass *premium* when it is already known
    (e.g. from ``fetch_funding_rates``) to avoid another ``metaAndAssetCtxs``
    request; otherwise it is fetched. Returns 0.5 when no data is available.
    """
    if premium is None:
        ctx = _fetch_asset_ctxs((coin,)).get(coin)
        if ctx is None:
            return 0.5
        try:
            premium = _to_float(ctx.get("premium"))
        except (AttributeError, TypeError, ValueError):
            return 0.5
    return _premium_to_ls_ratio(premium)


def compute_oi_delta(previous_oi: float | None, current_oi: float) -> float:
    """Return the fractional change in open interest, ``(current - previous) / previous``.

    Returns 0.0 when there is no usable baseline (None, zero or negative).
    NOTE(review): relative change was chosen so the value is comparable across
    coins; confirm this matches what the feature pipeline expects.
    """
    if not previous_oi or previous_oi <= 0:
        return 0.0
    return float((current_oi - previous_oi) / previous_oi)


def collect_all() -> dict:
    """Collect all macro data and return it as one unified dict.

    Shape: ``{"timestamp": iso8601_utc, "coins": {coin: {...}}}``. Coins with no
    current data are logged and omitted. ``oi_delta`` compares against the open
    interest recorded by the previous call in this process.
    """
    log.info("📡 Collecting Hyperliquid macro data...")
    output = {"timestamp": datetime.now(timezone.utc).isoformat(), "coins": {}}
    current = fetch_funding_rates()

    for coin in COINS:
        cur = current.get(coin)
        if cur is None:
            log.warning(f" ⚠️ {coin}: no data")
            continue

        hist = fetch_funding_history(coin)
        trend = compute_funding_trend(hist)
        # Reuse the premium from the snapshot above instead of refetching it.
        ls = fetch_ls_ratio(coin, premium=cur["premium"])
        oi = cur["open_interest"]
        oi_delta = compute_oi_delta(_PREV_OI.get(coin), oi)
        _PREV_OI[coin] = oi

        output["coins"][coin] = {
            "funding": cur["funding"],
            "funding_trend": trend,
            "open_interest": oi,
            "mark_price": cur["mark_price"],
            "ls_ratio": ls,
            "oi_delta": oi_delta,
            "fetched_at": output["timestamp"],
        }
        log.info(f" ✅ {coin}: funding={cur['funding']:.5f} oi={cur['open_interest']:.0f} ls={ls:.2f}")
    return output


def push_to_hf(data: dict) -> bool:
    """Push a macro snapshot to the HF dataset EXPERIENCE_REPO.

    Uploads *data* as ``macro/snapshot_<UTC %Y%m%d_%H%M>.json`` and overwrites
    ``macro/latest.json``. Returns True only if both uploads succeed; returns
    False (and logs) without raising otherwise. The local temp file is always
    removed, including on failure.
    """
    if not HF_TOKEN:
        log.warning("No HF_TOKEN — skipping push")
        return False
    tmp = None
    try:
        from huggingface_hub import HfApi

        api = HfApi(token=HF_TOKEN)
        ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M")
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
            tmp = f.name  # record the path first so a failed dump is still cleaned up
            json.dump(data, f, indent=2)
        api.upload_file(
            path_or_fileobj=tmp,
            token=HF_TOKEN,
            path_in_repo=f"macro/snapshot_{ts}.json",
            repo_id=EXPERIENCE_REPO,
            repo_type="dataset",
            commit_message=f"Macro snapshot {ts}",
        )
        # Also maintain a "latest" file for workers to read quickly
        api.upload_file(
            path_or_fileobj=tmp,
            token=HF_TOKEN,
            path_in_repo="macro/latest.json",
            repo_id=EXPERIENCE_REPO,
            repo_type="dataset",
            commit_message=f"Macro latest {ts}",
        )
        log.info(f"☁️ Pushed macro snapshot {ts}")
        return True
    except Exception as e:
        log.error(f"❌ push_to_hf: {e}")
        return False
    finally:
        if tmp and os.path.exists(tmp):
            os.unlink(tmp)


def load_latest_macro(coin: str = "BTC") -> dict:
    """Return the latest funding / funding_trend / OI / L-S ratio / oi_delta for *coin*.

    Workers call this. Reads ``macro/latest.json`` from EXPERIENCE_REPO (cached
    under /tmp/macro_cache). Any failure, or a missing coin or key, falls back
    to the defaults below (zeros, ls_ratio 0.5).
    """
    DEFAULTS = {"funding": 0.0, "funding_trend": 0.0, "open_interest": 0.0,
                "ls_ratio": 0.5, "oi_delta": 0.0}
    try:
        from huggingface_hub import hf_hub_download

        local = hf_hub_download(
            repo_id=EXPERIENCE_REPO,
            filename="macro/latest.json",
            repo_type="dataset",
            token=HF_TOKEN,
            cache_dir="/tmp/macro_cache",
        )
        with open(local) as f:
            data = json.load(f)
        coin_data = data.get("coins", {}).get(coin, DEFAULTS)
        return {k: coin_data.get(k, v) for k, v in DEFAULTS.items()}
    except Exception as e:  # best-effort: workers must keep running without macro data
        log.debug(f"load_latest_macro fallback to defaults: {e}")
        return DEFAULTS


# Data quality constants
REQUIRED_FEATURES = ["obi", "spread_pct", "label"]
# A frame is rejected when more than (1 - this) of all its cells are null,
# i.e. at least 70% of cells must be non-null.
MIN_FEATURE_COMPLETENESS = 0.70


def check_experience_quality(df) -> tuple[bool, str]:
    """Validate an experience DataFrame; return ``(is_valid, reason)``.

    Rejects, in order: None/empty frames, frames missing any REQUIRED_FEATURES
    column, frames whose overall null ratio exceeds 1 - MIN_FEATURE_COMPLETENESS,
    frames with fewer than two distinct labels, and frames with fewer than five
    numeric columns. ``reason`` is "ok" when valid.
    """
    if df is None or len(df) == 0:
        return False, "empty_file"

    # Check required features present
    missing = [f for f in REQUIRED_FEATURES if f not in df.columns]
    if missing:
        return False, f"missing_required: {missing}"

    # Fraction of null cells across the whole frame
    null_ratio = df.isnull().mean().mean()
    if null_ratio > (1 - MIN_FEATURE_COMPLETENESS):
        return False, f"too_many_nulls: {null_ratio:.2%}"

    # Check label distribution (reject if all same label)
    if "label" in df.columns:
        unique_labels = df["label"].nunique()
        if unique_labels < 2:
            return False, f"degenerate_labels: all={df['label'].iloc[0]}"

    # Require a minimum number of numeric feature columns
    numeric_cols = df.select_dtypes(include=["number"]).columns
    if len(numeric_cols) < 5:
        return False, f"too_few_features: {len(numeric_cols)}"

    return True, "ok"


# Schema version tracking
CURRENT_SCHEMA_VERSION = "v4.0"
SCHEMA_FEATURES = [
    "obi", "spread_pct", "vpin", "entropy", "cvd_norm", "absorption",
    "ema_cross_fs", "ema_cross_sl", "adx", "momentum_zscore", "trend_strength",
    "vol_surge", "atr", "zscore_60", "zscore_300", "hurst", "bb_pct_b",
    "bb_width", "rsi", "mean_rev_signal", "vwap_dev", "market_structure", "bos",
    "pin_bar", "engulfing", "inside_bar", "pivot_dist_h", "pivot_dist_l",
    "fib_618_prox", "funding", "funding_trend", "oi_delta", "ls_ratio",
    "label", "timestamp",
]


def get_schema_info() -> dict:
    """Return the current schema version, feature list (a copy) and feature count."""
    return {
        "version": CURRENT_SCHEMA_VERSION,
        # Copy so callers cannot mutate the module-level schema definition.
        "features": list(SCHEMA_FEATURES),
        "n_features": len(SCHEMA_FEATURES),
    }


# Run loop for coordinator background thread
def run_collection_loop(interval_seconds: int = 3600):
    """Run forever, collecting and pushing macro data every *interval_seconds*.

    Errors in one cycle are logged with a traceback and do not stop the loop.
    """
    log.info(f"📡 MacroCollector started — interval={interval_seconds}s")
    while True:
        try:
            data = collect_all()
            if data["coins"]:
                push_to_hf(data)
        except Exception as e:
            log.exception(f"❌ collection_loop: {e}")
        time.sleep(interval_seconds)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    data = collect_all()
    print(json.dumps(data, indent=2))
    push_to_hf(data)