# Listing header from the scraped page: file size 9,411 bytes, identifier 4449814.
"""
Hyperliquid Data Fetcher — Real funding, OI, L/S ratio
Runs as a background task in the Coordinator Space once every hour.
Pushes enrichment data to the HF dataset for workers to use when computing features.
"""
from __future__ import annotations
import os, json, time, logging, tempfile
from datetime import datetime, timezone

# Module logger; the name is a fixed string rather than ``__name__``.
log = logging.getLogger("hyperliquid_data")

# Hyperliquid "info" endpoint that ``_post`` sends its requests to.
HL_API = "https://api.hyperliquid.xyz/info"

# Hugging Face access token, read from the environment ("" when unset).
HF_TOKEN = os.getenv("HF_TOKEN", "")

# Dataset repository used for macro snapshots; overridable via the environment.
EXPERIENCE_REPO = os.getenv("EXPERIENCE_REPO", "gionuibk/aetheris-experiences")

# Coins to track.
COINS = ["BTC", "ETH", "SOL"]


def _post(payload: dict, timeout: int = 10) -> dict | None:
    """Send ``payload`` as a JSON POST to ``HL_API`` and return the decoded body.

    Args:
        payload: Request body, passed to ``requests.post`` through ``json=``.
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        The decoded JSON response, or ``None`` when anything goes wrong
        (``requests`` not importable, transport failure, HTTP error status,
        undecodable body). Failures are logged as warnings, never raised.

    NOTE(review): callers in this file index the result like a list
    (``data[0]``, ``data[-10:]``), so the ``dict`` annotation looks narrower
    than actual use — confirm against the API.
    """
    try:
        # Imported lazily so a missing package is handled like any other failure.
        import requests

        response = requests.post(HL_API, json=payload, timeout=timeout)
        response.raise_for_status()
        decoded = response.json()
    except Exception as e:
        log.warning(f"HL API error: {e}")
        return None
    return decoded


def fetch_funding_rates() -> dict:
    """Fetch the current funding snapshot for every coin listed in ``COINS``.

    Issues one ``metaAndAssetCtxs`` request and pairs each ``universe`` entry
    with the context at the same index in the second response element.

    Returns:
        Mapping ``coin -> {"funding", "open_interest", "mark_price",
        "premium"}`` with float values. Coins that are absent from the
        response, or whose context cannot be parsed, are left out. The result
        is empty when the request fails or the response is too short.
    """
    result = {}
    data = _post({"type": "metaAndAssetCtxs"})
    if not data or len(data) < 2:
        return result

    def _as_float(ctx, key):
        # A missing key and an explicit JSON null are both treated as 0.
        value = ctx.get(key)
        return 0.0 if value is None else float(value)

    try:
        universe = data[0].get("universe", [])
        ctxs = data[1]
        for i, coin_info in enumerate(universe):
            coin = coin_info.get("name", "")
            if coin not in COINS:
                continue
            # A universe entry without a matching context yields all-zero values.
            ctx = ctxs[i] if i < len(ctxs) else {}
            try:
                result[coin] = {
                    "funding": _as_float(ctx, "funding"),
                    "open_interest": _as_float(ctx, "openInterest"),
                    "mark_price": _as_float(ctx, "markPx"),
                    "premium": _as_float(ctx, "premium"),
                }
            except (AttributeError, TypeError, ValueError) as e:
                # Guarded per coin so one bad context does not discard the
                # coins that come after it.
                log.warning(f"parse metaAndAssetCtxs: {coin}: {e}")
    except Exception as e:
        # Structural problems (unexpected response shape) end up here.
        log.warning(f"parse metaAndAssetCtxs: {e}")
    return result


def fetch_funding_history(coin: str, hours_back: int = 8) -> list[dict]:
    """Fetch recent funding history for trend calculation.

    Args:
        coin: Coin symbol sent as the ``coin`` field of the request.
        hours_back: Size of the look-back window in hours; the request's
            ``startTime`` is "now minus this many hours" in milliseconds.

    Returns:
        Up to ten ``{"time", "funding"}`` dicts taken from the tail of the
        response, in response order. Returns an empty list when the request
        fails or the response is not a list. Entries that cannot be parsed
        are skipped.
    """
    now_ms = int(time.time() * 1000)
    ago_ms = now_ms - hours_back * 3600 * 1000
    data = _post({"type": "fundingHistory", "coin": coin, "startTime": ago_ms})
    if not data:
        return []
    if not isinstance(data, list):
        # Slicing a non-list response below would raise outside any handler.
        log.warning(
            f"fundingHistory {coin}: unexpected response type {type(data).__name__}"
        )
        return []

    history = []
    for entry in data[-10:]:  # last 10 entries
        try:
            history.append({
                "time": entry.get("time", 0),
                "funding": float(entry.get("fundingRate", 0)),
            })
        except (AttributeError, TypeError, ValueError, OverflowError) as e:
            # Still best-effort, but the skip is no longer silent.
            log.warning(f"fundingHistory {coin}: skipping malformed entry: {e}")
    return history


def compute_funding_trend(history: list[dict]) -> float:
    """Return the change in funding between the first and last history entry.

    A positive value means the last rate is above the first one, a negative
    value means it is below. Fewer than two entries yield ``0.0``.

    Every entry's ``"funding"`` key is read, so an entry without it raises
    ``KeyError`` even when it is neither first nor last.
    """
    if len(history) < 2:
        return 0.0
    # End-point difference only; the values in between do not affect the result.
    earliest, *_, latest = (point["funding"] for point in history)
    return float(latest - earliest)


def fetch_ls_ratio(coin: str) -> float:
    """Estimate the long share of positioning for ``coin``.

    Hyperliquid does not expose a long/short ratio directly, so this is a
    heuristic derived solely from the ``premium`` field of the coin's asset
    context in a ``metaAndAssetCtxs`` response.

    Args:
        coin: Coin symbol to look up in the response's ``universe`` list.

    Returns:
        A value in ``[0.25, 0.75]`` where ``0.5`` means neutral. ``0.5`` is
        also returned when the request fails, the coin is not found, or the
        response cannot be parsed.
    """
    data = _post({"type": "metaAndAssetCtxs"})
    if not data or len(data) < 2:
        return 0.5
    try:
        universe = data[0].get("universe", [])
        ctxs = data[1]
        for i, ci in enumerate(universe):
            if ci.get("name") == coin and i < len(ctxs):
                # Missing, null or empty premium counts as 0 (neutral).
                premium = float(ctxs[i].get("premium") or 0)
                # Heuristic: positive premium = longs pay → more longs than shorts.
                # NOTE(review): the dead band is asymmetric (+0.0002 / -0.0001)
                # — confirm this is intended.
                if premium > 0.0002:
                    return min(0.75, 0.5 + premium * 200)
                elif premium < -0.0001:
                    return max(0.25, 0.5 + premium * 200)
                return 0.5
    except Exception as e:
        # Best-effort: fall through to neutral, but leave a trace in the log.
        log.warning(f"parse ls_ratio {coin}: {e}")
    return 0.5


def collect_all() -> dict:
    """Gather the macro data for every coin in ``COINS`` into one snapshot.

    Returns:
        ``{"timestamp": <UTC ISO-8601 string>, "coins": {coin: {...}}}``.
        Coins missing from the ``fetch_funding_rates`` result are logged and
        left out. ``oi_delta`` is always written as ``0.0`` here.
    """
    log.info("📡 Collecting Hyperliquid macro data...")
    stamp = datetime.now(timezone.utc).isoformat()
    per_coin = {}
    output = {"timestamp": stamp, "coins": per_coin}

    current = fetch_funding_rates()
    for coin in COINS:
        if coin in current:
            snapshot = current[coin]
            trend = compute_funding_trend(fetch_funding_history(coin))
            long_ratio = fetch_ls_ratio(coin)

            per_coin[coin] = dict(
                funding=snapshot["funding"],
                funding_trend=trend,
                open_interest=snapshot["open_interest"],
                mark_price=snapshot["mark_price"],
                ls_ratio=long_ratio,
                oi_delta=0.0,  # placeholder; this function never computes it
                fetched_at=stamp,
            )
            log.info(f"  ✅ {coin}: funding={snapshot['funding']:.5f} oi={snapshot['open_interest']:.0f} ls={long_ratio:.2f}")
        else:
            log.warning(f"  ⚠️ {coin}: no data")

    return output


def push_to_hf(data: dict) -> bool:
    """Push a macro snapshot to the HF dataset.

    The snapshot is serialised to a temporary JSON file and uploaded twice to
    ``EXPERIENCE_REPO``: once under a minute-resolution timestamped name and
    once as ``macro/latest.json``.

    Args:
        data: JSON-serialisable snapshot.

    Returns:
        ``True`` when both uploads succeed. ``False`` when ``HF_TOKEN`` is
        empty or when serialisation or an upload fails; the failure is logged
        and not raised.
    """
    if not HF_TOKEN:
        log.warning("No HF_TOKEN — skipping push")
        return False

    tmp = None
    try:
        from huggingface_hub import HfApi

        api = HfApi(token=HF_TOKEN)
        ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M")
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
            # Record the path before dumping so a failed dump is cleaned up too.
            tmp = f.name
            json.dump(data, f, indent=2)

        # Timestamped archive copy first, then the "latest" file that workers
        # read quickly.
        uploads = (
            (f"macro/snapshot_{ts}.json", f"Macro snapshot {ts}"),
            ("macro/latest.json", f"Macro latest {ts}"),
        )
        for path_in_repo, commit_message in uploads:
            api.upload_file(
                path_or_fileobj=tmp, token=HF_TOKEN,
                path_in_repo=path_in_repo,
                repo_id=EXPERIENCE_REPO, repo_type="dataset",
                commit_message=commit_message,
            )
        log.info(f"☁️  Pushed macro snapshot {ts}")
        return True
    except Exception as e:
        log.error(f"❌ push_to_hf: {e}")
        return False
    finally:
        # Remove the temp file on failure as well as on success.
        if tmp and os.path.exists(tmp):
            try:
                os.unlink(tmp)
            except OSError as e:
                log.warning(f"push_to_hf: could not remove temp file {tmp}: {e}")


def load_latest_macro(coin: str = "BTC") -> dict:
    """Return the latest published macro values for ``coin``.

    Downloads ``macro/latest.json`` from ``EXPERIENCE_REPO`` (cached under
    ``/tmp/macro_cache``) and picks the entry for ``coin``.

    Returns:
        A dict with exactly the keys ``funding``, ``funding_trend``,
        ``open_interest``, ``ls_ratio`` and ``oi_delta``. Keys missing from
        the stored entry take their default: ``0.0`` everywhere except
        ``ls_ratio``, which defaults to ``0.5``. Any failure at all yields
        the full set of defaults.
    """
    fallback = {
        "funding": 0.0,
        "funding_trend": 0.0,
        "open_interest": 0.0,
        "ls_ratio": 0.5,
        "oi_delta": 0.0,
    }
    try:
        from huggingface_hub import hf_hub_download

        cached_path = hf_hub_download(
            repo_id=EXPERIENCE_REPO,
            filename="macro/latest.json",
            repo_type="dataset",
            token=HF_TOKEN,
            cache_dir="/tmp/macro_cache",
        )
        with open(cached_path) as fh:
            snapshot = json.load(fh)
        entry = snapshot.get("coins", {}).get(coin, fallback)
        merged = {}
        for key, default in fallback.items():
            merged[key] = entry.get(key, default)
        return merged
    except Exception:
        return fallback


# Data quality constants
# Columns that must be present for a frame to be accepted.
REQUIRED_FEATURES = ["obi", "spread_pct", "label"]
# Minimum share of non-null cells: a frame is rejected once its overall null
# ratio exceeds 1 - MIN_FEATURE_COMPLETENESS.
MIN_FEATURE_COMPLETENESS = 0.70


def check_experience_quality(df) -> tuple[bool, str]:
    """Decide whether an experience frame is usable.

    Args:
        df: pandas ``DataFrame`` to validate, or ``None``.

    Returns:
        ``(is_valid, reason)``. ``reason`` is ``"ok"`` for a valid frame,
        otherwise one of ``"empty_file"``, ``"missing_required: [...]"``,
        ``"too_many_nulls: <pct>"``, ``"degenerate_labels: all=<value>"`` or
        ``"too_few_features: <n>"``. Checks run in that order and the first
        failure wins.
    """
    if df is None or len(df) == 0:
        return False, "empty_file"

    # Required columns must all be present.
    missing = [f for f in REQUIRED_FEATURES if f not in df.columns]
    if missing:
        return False, f"missing_required: {missing}"

    # Mean of the per-column null fractions.
    null_ratio = df.isnull().mean().mean()
    if null_ratio > (1 - MIN_FEATURE_COMPLETENESS):
        return False, f"too_many_nulls: {null_ratio:.2%}"

    # "label" is guaranteed present by the required-column check above.
    labels = df["label"]
    if labels.nunique() < 2:
        return False, f"degenerate_labels: all={labels.iloc[0]}"

    # Need at least five numeric columns ("label" counts when it is numeric).
    numeric_cols = df.select_dtypes(include=["number"]).columns
    if len(numeric_cols) < 5:
        return False, f"too_few_features: {len(numeric_cols)}"

    return True, "ok"


# Schema version tracking
CURRENT_SCHEMA_VERSION = "v4.0"

# Column names of the current schema, in declaration order.
SCHEMA_FEATURES = [
    "obi",
    "spread_pct",
    "vpin",
    "entropy",
    "cvd_norm",
    "absorption",
    "ema_cross_fs",
    "ema_cross_sl",
    "adx",
    "momentum_zscore",
    "trend_strength",
    "vol_surge",
    "atr",
    "zscore_60",
    "zscore_300",
    "hurst",
    "bb_pct_b",
    "bb_width",
    "rsi",
    "mean_rev_signal",
    "vwap_dev",
    "market_structure",
    "bos",
    "pin_bar",
    "engulfing",
    "inside_bar",
    "pivot_dist_h",
    "pivot_dist_l",
    "fib_618_prox",
    "funding",
    "funding_trend",
    "oi_delta",
    "ls_ratio",
    "label",
    "timestamp",
]


def get_schema_info() -> dict:
    """Describe the current schema.

    Returns:
        Dict with ``version`` (the schema version string), ``features`` (the
        module-level ``SCHEMA_FEATURES`` list itself, not a copy) and
        ``n_features`` (its length, counting every entry including ``label``
        and ``timestamp``).
    """
    info = {"version": CURRENT_SCHEMA_VERSION, "features": SCHEMA_FEATURES}
    info["n_features"] = len(SCHEMA_FEATURES)
    return info


# Run loop for coordinator background thread
def run_collection_loop(interval_seconds: int = 3600):
    """Collect and publish macro data forever, pausing between rounds.

    Args:
        interval_seconds: Sleep duration after each round, in seconds.

    Never returns. An exception raised during a round is logged and the loop
    carries on after the usual sleep.
    """

    def _run_once() -> None:
        # One round: collect, and publish only when at least one coin has data.
        snapshot = collect_all()
        if snapshot["coins"]:
            push_to_hf(snapshot)

    log.info(f"📡 MacroCollector started — interval={interval_seconds}s")
    while True:
        try:
            _run_once()
        except Exception as e:
            log.error(f"❌ collection_loop: {e}")
        time.sleep(interval_seconds)


if __name__ == "__main__":
    # Manual one-shot run: collect once, print the snapshot, then publish it.
    logging.basicConfig(level=logging.INFO)
    data = collect_all()
    print(json.dumps(data, indent=2))
    # Same guard as run_collection_loop: an empty snapshot must not replace
    # macro/latest.json.
    if data["coins"]:
        push_to_hf(data)
    else:
        log.warning("No coin data collected — skipping push")