Spaces:
Running
Running
"""
Hyperliquid Data Fetcher — real funding rate and open interest, plus an estimated long/short ratio.

Runs as a background task in the Coordinator Space every hour and pushes the
enrichment data to the HF dataset so workers can use it when computing features.
"""
| from __future__ import annotations | |
| import os, json, time, logging, tempfile | |
| from datetime import datetime, timezone | |
log = logging.getLogger("hyperliquid_data")

# Hyperliquid public info endpoint; every query in this module is a JSON POST to it.
HL_API = "https://api.hyperliquid.xyz/info"

# Hub credentials and target dataset are read from the environment at import time.
HF_TOKEN = os.getenv("HF_TOKEN", "")
EXPERIENCE_REPO = os.getenv("EXPERIENCE_REPO", "gionuibk/aetheris-experiences")

# Markets whose funding / OI snapshot is collected and published.
COINS = ["BTC", "ETH", "SOL"]
def _post(payload: dict, timeout: int = 10) -> dict | list | None:
    """POST ``payload`` to the Hyperliquid info endpoint and return the decoded JSON.

    Best-effort by design: any failure (``requests`` not installed, network
    error, non-2xx status, undecodable JSON) is logged as a warning and
    reported as ``None`` so callers can fall back instead of crashing.

    Args:
        payload: JSON request body, e.g. ``{"type": "metaAndAssetCtxs"}``.
        timeout: Timeout passed through to ``requests.post``.

    Returns:
        Whatever JSON value the endpoint returned — the callers in this module
        index it as a list — or ``None`` on failure.
    """
    try:
        # Imported lazily so a missing dependency degrades to a logged warning.
        import requests

        resp = requests.post(HL_API, json=payload, timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        log.warning(f"HL API error: {e}")
        return None
def fetch_funding_rates() -> dict:
    """Fetch the current funding / OI / mark-price snapshot for every coin in ``COINS``.

    Issues a single ``metaAndAssetCtxs`` request. The response is treated as a
    two-element list ``[meta, asset_ctxs]`` in which ``meta["universe"][i]`` and
    ``asset_ctxs[i]`` describe the same asset.

    Returns:
        ``{coin: {"funding", "open_interest", "mark_price", "premium"}}`` with
        float values. A coin that is absent from the response, has no matching
        context entry, or whose entry cannot be parsed is omitted (callers treat
        a missing coin as "no data"). ``{}`` when the request fails or the
        response is not shaped as expected.
    """
    result: dict = {}
    data = _post({"type": "metaAndAssetCtxs"})
    # None (request failed), an error object, or a short list: nothing to parse.
    if not isinstance(data, list) or len(data) < 2:
        return result
    meta, ctxs = data[0], data[1]
    if not isinstance(meta, dict) or not isinstance(ctxs, list):
        log.warning("parse metaAndAssetCtxs: unexpected response shape")
        return result

    wanted = set(COINS)
    # zip() drops universe entries with no matching context instead of
    # zero-filling them and publishing fake zeros as real data.
    for coin_info, ctx in zip(meta.get("universe") or [], ctxs):
        # Per-entry try: one malformed asset must not discard the other coins.
        try:
            coin = coin_info.get("name", "")
            if coin not in wanted:
                continue
            # `or 0` also maps explicit JSON nulls to 0 (float(None) would raise).
            result[coin] = {
                "funding": float(ctx.get("funding") or 0),
                "open_interest": float(ctx.get("openInterest") or 0),
                "mark_price": float(ctx.get("markPx") or 0),
                "premium": float(ctx.get("premium") or 0),
            }
        except (AttributeError, TypeError, ValueError) as e:
            log.warning(f"parse metaAndAssetCtxs: {e}")
    return result
def fetch_funding_history(coin: str, hours_back: int = 8, limit: int = 10) -> list[dict]:
    """Fetch recent funding-rate prints for ``coin`` (used for trend calculation).

    Args:
        coin: Asset name as used by Hyperliquid, e.g. ``"BTC"``.
        hours_back: Look-back window in hours, sent as ``startTime``.
        limit: Maximum number of entries to keep, taken from the end of the
            response. ``limit <= 0`` returns ``[]``.

    Returns:
        Up to ``limit`` dicts ``{"time": ..., "funding": float}`` in the order
        the API returned them. NOTE(review): ``compute_funding_trend`` assumes
        oldest -> newest — confirm the endpoint's ordering. Entries that cannot
        be parsed are skipped. ``[]`` on request failure or a non-list response.
    """
    now_ms = int(time.time() * 1000)
    start_ms = now_ms - hours_back * 3600 * 1000
    data = _post({"type": "fundingHistory", "coin": coin, "startTime": start_ms})
    if not isinstance(data, list):
        # None means _post already logged the failure; any other value (e.g. an
        # error object) would previously have crashed on the slice below.
        if data is not None:
            log.warning(f"fundingHistory {coin}: unexpected response type {type(data).__name__}")
        return []
    if limit <= 0:
        return []

    history = []
    for entry in data:
        try:
            history.append({
                "time": entry.get("time", 0),
                "funding": float(entry.get("fundingRate", 0)),
            })
        except (AttributeError, TypeError, ValueError):
            continue  # skip a malformed entry rather than failing the whole history
    # Parse first, then trim, so malformed tail entries don't shrink the window.
    return history[-limit:]
def compute_funding_trend(history: list[dict]) -> float:
    """Return how much the funding rate moved across ``history``.

    The trend is the last entry's ``"funding"`` minus the first entry's, so a
    positive value means funding rose over the window and a negative value
    means it fell. Histories with fewer than two entries give ``0.0``.
    """
    if len(history) >= 2:
        first, *_, last = [point["funding"] for point in history]
        return float(last - first)
    return 0.0
def fetch_ls_ratio(coin: str, premium: float | None = None) -> float:
    """Estimate the long share of positioning for ``coin`` (0.5 = neutral).

    Hyperliquid does not expose a long/short ratio, so this maps the perp
    ``premium`` (positive premium = longs pay = more longs) onto a bounded
    ratio. Open interest is not used:

    * premium >  0.0002 -> 0.5 + 200 * premium, capped at 0.75
    * premium < -0.0001 -> 0.5 + 200 * premium, floored at 0.25
    * otherwise         -> 0.5

    NOTE(review): the neutral band is asymmetric (+0.0002 vs -0.0001);
    confirm that is intended.

    Args:
        coin: Asset name, e.g. ``"BTC"``.
        premium: Premium to map directly. When ``None`` (the default) the
            current premium is fetched with a ``metaAndAssetCtxs`` request;
            pass ``fetch_funding_rates()[coin]["premium"]`` to reuse an
            existing snapshot and skip that extra request.

    Returns:
        The estimated long ratio in ``[0.25, 0.75]``. 0.5 if the premium cannot
        be obtained.
    """
    if premium is None:
        data = _post({"type": "metaAndAssetCtxs"})
        if not data or len(data) < 2:
            return 0.5
        try:
            universe = data[0].get("universe", [])
            ctxs = data[1]
            for i, ci in enumerate(universe):
                if ci.get("name") == coin and i < len(ctxs):
                    # `or 0`: a JSON null premium is treated as neutral.
                    premium = float(ctxs[i].get("premium") or 0)
                    break
        except Exception as e:
            log.warning(f"fetch_ls_ratio {coin}: {e}")
            return 0.5
        if premium is None:  # coin not listed in the response
            return 0.5

    if premium > 0.0002:
        return min(0.75, 0.5 + premium * 200)
    if premium < -0.0001:
        return max(0.25, 0.5 + premium * 200)
    return 0.5
def collect_all() -> dict:
    """Build one macro snapshot covering every coin in ``COINS``.

    Returns:
        ``{"timestamp": <UTC ISO-8601 string>, "coins": {coin: {...}}}``. Each
        coin entry holds funding, funding_trend, open_interest, mark_price,
        ls_ratio, oi_delta and fetched_at. Coins missing from
        ``fetch_funding_rates()`` are logged and left out of ``"coins"``.
    """
    log.info("📡 Collecting Hyperliquid macro data...")
    stamp = datetime.now(timezone.utc).isoformat()
    per_coin: dict = {}
    snapshot = {"timestamp": stamp, "coins": per_coin}

    latest = fetch_funding_rates()
    for coin in COINS:
        if coin not in latest:
            log.warning(f" ⚠️ {coin}: no data")
            continue
        row = latest[coin]
        trend = compute_funding_trend(fetch_funding_history(coin))
        long_ratio = fetch_ls_ratio(coin)
        per_coin[coin] = {
            "funding": row["funding"],
            "funding_trend": trend,
            "open_interest": row["open_interest"],
            "mark_price": row["mark_price"],
            "ls_ratio": long_ratio,
            # NOTE(review): nothing in this module ever fills oi_delta in
            # (despite an earlier comment claiming the next call does) — it is
            # always published as 0.0.
            "oi_delta": 0.0,
            "fetched_at": stamp,
        }
        log.info(f" ✅ {coin}: funding={row['funding']:.5f} oi={row['open_interest']:.0f} ls={long_ratio:.2f}")
    return snapshot
def push_to_hf(data: dict) -> bool:
    """Upload a macro snapshot to the HF dataset ``EXPERIENCE_REPO``.

    ``data`` is written to a temporary JSON file, which is uploaded twice:
    as the timestamped archive ``macro/snapshot_<YYYYmmdd_HHMM>.json`` and as
    ``macro/latest.json`` (the file ``load_latest_macro`` reads).

    Returns:
        True if both uploads succeeded. False when ``HF_TOKEN`` is unset or any
        step failed; errors are logged, never raised.
    """
    if not HF_TOKEN:
        log.warning("No HF_TOKEN — skipping push")
        return False
    tmp = None
    try:
        from huggingface_hub import HfApi

        api = HfApi(token=HF_TOKEN)
        ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M")
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
            # Record the path before writing so a failing dump is cleaned up too.
            tmp = f.name
            json.dump(data, f, indent=2)
        api.upload_file(
            path_or_fileobj=tmp, token=HF_TOKEN,
            path_in_repo=f"macro/snapshot_{ts}.json",
            repo_id=EXPERIENCE_REPO, repo_type="dataset",
            commit_message=f"Macro snapshot {ts}",
        )
        # Also maintain a "latest" file for workers to read quickly.
        api.upload_file(
            path_or_fileobj=tmp, token=HF_TOKEN,
            path_in_repo="macro/latest.json",
            repo_id=EXPERIENCE_REPO, repo_type="dataset",
            commit_message=f"Macro latest {ts}",
        )
        log.info(f"☁️ Pushed macro snapshot {ts}")
        return True
    except Exception as e:
        log.error(f"❌ push_to_hf: {e}")
        return False
    finally:
        # Runs on every path: previously the temp file leaked whenever an
        # upload raised, once per hourly cycle.
        if tmp and os.path.exists(tmp):
            try:
                os.unlink(tmp)
            except OSError as e:
                log.warning(f"could not remove temp file {tmp}: {e}")
def load_latest_macro(coin: str = "BTC") -> dict:
    """Return the most recently published macro features for ``coin``.

    Workers call this to get funding / OI / long-short ratio. It downloads
    ``macro/latest.json`` from ``EXPERIENCE_REPO`` (cache dir
    ``/tmp/macro_cache``) and returns exactly the keys ``funding``,
    ``funding_trend``, ``open_interest``, ``ls_ratio`` and ``oi_delta``.

    Keys missing for the coin take neutral defaults: 0.0, except ``ls_ratio``
    which defaults to 0.5. If the file cannot be fetched or parsed, the full
    default dict is returned and the reason is logged at DEBUG level. Never
    raises.
    """
    defaults = {"funding": 0.0, "funding_trend": 0.0,
                "open_interest": 0.0, "ls_ratio": 0.5, "oi_delta": 0.0}
    try:
        from huggingface_hub import hf_hub_download

        local = hf_hub_download(
            repo_id=EXPERIENCE_REPO, filename="macro/latest.json",
            repo_type="dataset",
            # HF_TOKEN defaults to "". Pass None so huggingface_hub does not
            # treat the empty string as an explicit token.
            # NOTE(review): confirm against the installed huggingface_hub version.
            token=HF_TOKEN or None,
            cache_dir="/tmp/macro_cache",
        )
        with open(local, encoding="utf-8") as f:
            data = json.load(f)
        coin_data = data.get("coins", {}).get(coin)
        if not isinstance(coin_data, dict):
            return defaults
        return {k: coin_data.get(k, v) for k, v in defaults.items()}
    except Exception as e:
        # Best-effort by design, but no longer silent.
        log.debug(f"load_latest_macro({coin}): using defaults: {e}")
        return defaults
# Data quality constants
# Columns every experience file must contain.
REQUIRED_FEATURES = ["obi", "spread_pct", "label"]
# Minimum fraction of non-null cells across the whole frame. Files whose overall
# null ratio exceeds 1 - MIN_FEATURE_COMPLETENESS are rejected. This is a null
# check: zero values still count as present.
MIN_FEATURE_COMPLETENESS = 0.70
# Minimum number of numeric columns (the label included) a file must have.
MIN_NUMERIC_FEATURES = 5


def check_experience_quality(df) -> tuple[bool, str]:
    """Validate an experience DataFrame before it is used.

    Checks run in this order, and the first failure wins:

    1. ``df`` is not None and has rows                     -> "empty_file"
    2. every column in ``REQUIRED_FEATURES`` is present     -> "missing_required: [...]"
    3. overall null ratio <= 1 - MIN_FEATURE_COMPLETENESS   -> "too_many_nulls: NN.NN%"
    4. ``label`` has at least two distinct non-null values  -> "degenerate_labels: all=<v>"
    5. at least MIN_NUMERIC_FEATURES numeric columns        -> "too_few_features: N"

    Returns:
        ``(is_valid, reason)``, where ``reason`` is ``"ok"`` on success.
    """
    if df is None or len(df) == 0:
        return False, "empty_file"

    missing = [col for col in REQUIRED_FEATURES if col not in df.columns]
    if missing:
        return False, f"missing_required: {missing}"

    # Averaging per-column null fractions gives the overall null-cell fraction,
    # since every column has the same number of rows.
    null_ratio = df.isnull().mean().mean()
    if null_ratio > (1 - MIN_FEATURE_COMPLETENESS):
        return False, f"too_many_nulls: {null_ratio:.2%}"

    # Guard kept in case "label" is ever dropped from REQUIRED_FEATURES.
    if "label" in df.columns and df["label"].nunique() < 2:
        return False, f"degenerate_labels: all={df['label'].iloc[0]}"

    numeric_cols = df.select_dtypes(include=["number"]).columns
    if len(numeric_cols) < MIN_NUMERIC_FEATURES:
        return False, f"too_few_features: {len(numeric_cols)}"
    return True, "ok"
# Schema version tracking
CURRENT_SCHEMA_VERSION = "v4.0"
# Column names making up the current experience-file schema. Note that this
# includes the "label" target and the "timestamp" column, not only model inputs.
SCHEMA_FEATURES = [
    "obi", "spread_pct", "vpin", "entropy", "cvd_norm", "absorption",
    "ema_cross_fs", "ema_cross_sl", "adx", "momentum_zscore", "trend_strength",
    "vol_surge", "atr", "zscore_60", "zscore_300", "hurst", "bb_pct_b",
    "bb_width", "rsi", "mean_rev_signal", "vwap_dev", "market_structure",
    "bos", "pin_bar", "engulfing", "inside_bar", "pivot_dist_h", "pivot_dist_l",
    "fib_618_prox", "funding", "funding_trend", "oi_delta", "ls_ratio",
    "label", "timestamp",
]


def get_schema_info() -> dict:
    """Describe the current experience-file schema.

    Returns:
        ``{"version": CURRENT_SCHEMA_VERSION, "features": [...],
        "n_features": int}``. ``features`` is a fresh copy of
        ``SCHEMA_FEATURES``, and ``n_features`` counts every listed column,
        including "label" and "timestamp".
    """
    return {
        "version": CURRENT_SCHEMA_VERSION,
        # Copy so callers cannot mutate the module-level schema via the result.
        "features": list(SCHEMA_FEATURES),
        "n_features": len(SCHEMA_FEATURES),
    }
# Run loop for coordinator background thread
def run_collection_loop(interval_seconds: int = 3600, stop_event=None):
    """Collect and publish macro data every ``interval_seconds``.

    Each cycle calls ``collect_all()`` and, when at least one coin has data,
    ``push_to_hf()``. Any exception is logged with its traceback and the loop
    carries on with the next cycle.

    Args:
        interval_seconds: Pause between the end of one cycle and the start of
            the next.
        stop_event: Optional ``threading.Event`` (anything with ``is_set()``
            and ``wait(timeout)``). When given, the loop exits once the event
            is set, and the pause between cycles ends immediately. With the
            default ``None`` the loop runs forever, as before.
    """
    log.info(f"📡 MacroCollector started — interval={interval_seconds}s")
    while stop_event is None or not stop_event.is_set():
        try:
            data = collect_all()
            # Never publish an empty snapshot over macro/latest.json.
            if data["coins"]:
                push_to_hf(data)
        except Exception as e:
            log.exception(f"❌ collection_loop: {e}")
        if stop_event is None:
            time.sleep(interval_seconds)
        elif stop_event.wait(interval_seconds):
            break
    log.info("MacroCollector stopped")
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    data = collect_all()
    print(json.dumps(data, indent=2))
    # Same guard as run_collection_loop: an empty snapshot would overwrite
    # macro/latest.json and make workers fall back to defaults.
    if data["coins"]:
        push_to_hf(data)
    else:
        log.warning("No coin data collected — not pushing")