Spaces:
Sleeping
Sleeping
| """ | |
| Training entry point for the ARB-MAX v1_arb model. | |
| Flow: | |
| 1. Download markets_index + ohlcv + orderbook (filtered to asset's slugs). | |
| 2. For each window: build_window_frame @ tick=120, extract features, | |
| compute optimal arb PnL & binary label. | |
| 3. Assemble pandas frame, split 90/10, walk-forward 5-fold on 90%. | |
| 4. Optuna TPE (seed=42, maximize) with objective = | |
| mean across folds of (sum_flagged_pnl - 0.2 * max_drawdown). | |
| 5. Refit best params on full 90%, evaluate on 10% holdout. | |
| 6. Save artifacts, upload to HF dataset `commanderzee/{asset}-v1-arb-full`. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import time | |
| import traceback | |
| from pathlib import Path | |
| from typing import Callable, Dict, List, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| import lightgbm as lgb | |
| import optuna | |
| from huggingface_hub import HfApi | |
| from huggingface_hub.utils import HfHubHTTPError | |
| from sklearn.metrics import roc_auc_score | |
| from data_loader import ( | |
| build_window_frame, | |
| get_window_label, | |
| load_markets_index, | |
| load_ohlcv, | |
| load_orderbook_filtered, | |
| ) | |
| from features_ohlcv import FEATURE_NAMES as OHLCV_FEATS, extract as extract_ohlcv | |
| from features_orderbook import ( | |
| FEATURE_NAMES as OB_FEATS, | |
| extract as extract_ob, | |
| ) | |
| from features_state import FEATURE_NAMES as STATE_FEATS, extract as extract_state | |
| from labels import ( | |
| compute_optimal_arb, | |
| simulate_live_policy, | |
| simulate_live_policy_from_eff, | |
| _precompute_effective_arr, | |
| ) | |
| from replay import simulate_flagging_pnl | |
# Canonical feature order: OHLCV features, then order-book features, then
# state features. `_extract_all` concatenates extractor outputs in this order.
ALL_FEATURES: List[str] = [*OHLCV_FEATS, *OB_FEATS, *STATE_FEATS]
| # --------------------------------------------------------------------------- | |
| # Feature assembly per window | |
| # --------------------------------------------------------------------------- | |
def _extract_all(wf: pd.DataFrame, at_tick: int = 120) -> np.ndarray:
    """Concatenate OHLCV, order-book and state features for one window.

    Args:
        wf: per-window frame produced by ``build_window_frame``.
        at_tick: tick index handed to every feature extractor.

    Returns:
        float32 feature vector laid out in ``ALL_FEATURES`` order.

    Raises:
        ValueError: if the extractors' combined output length differs from
            ``len(ALL_FEATURES)`` (e.g. a feature module changed its output
            without updating its ``FEATURE_NAMES``).
    """
    parts = (
        extract_ohlcv(wf, at_tick=at_tick),
        extract_ob(wf, at_tick=at_tick),
        extract_state(wf, at_tick=at_tick),
    )
    out = np.concatenate(parts, dtype=np.float32)
    # Explicit check instead of `assert`: asserts are stripped under
    # `python -O`, and a silent length mismatch would mislabel every column.
    if out.shape[0] != len(ALL_FEATURES):
        raise ValueError(
            f"feature vector has {out.shape[0]} entries, "
            f"expected {len(ALL_FEATURES)}"
        )
    return out
| # --------------------------------------------------------------------------- | |
| # HF upload retry wrapper | |
| # --------------------------------------------------------------------------- | |
def _upload_with_retry(
    api: HfApi,
    folder_path: str,
    repo_id: str,
    repo_type: str,
    hf_token: str,
    max_attempts: int = 6,
    log: Callable[[str], None] = print,
) -> None:
    """Upload ``folder_path`` to a Hub repo, retrying transient failures.

    HTTP 429 and 5xx responses, as well as any non-HTTP exception, are retried
    with exponential backoff (1 s, 2 s, 4 s, ... capped at 60 s). Any other
    HTTP status is re-raised immediately. No sleep happens after the final
    attempt.

    Raises:
        HfHubHTTPError: on a non-retryable HTTP status.
        RuntimeError: once ``max_attempts`` attempts have failed; the last
            underlying error is chained as ``__cause__``.
    """
    last_error: Optional[BaseException] = None
    for attempt in range(max_attempts):
        try:
            api.upload_folder(
                folder_path=folder_path,
                repo_id=repo_id,
                repo_type=repo_type,
                token=hf_token,
                create_pr=False,
            )
            return
        except HfHubHTTPError as e:  # type: ignore[attr-defined]
            status = getattr(getattr(e, "response", None), "status_code", None)
            if status is None or not (status == 429 or 500 <= status < 600):
                raise
            last_error = e
            reason = f"HTTP {status}"
        except Exception as e:  # noqa: BLE001
            last_error = e
            reason = f"error {e!r}"
        # Sleeping after the last attempt only delays the inevitable failure.
        if attempt + 1 < max_attempts:
            sleep_s = min(60.0, 2.0 ** attempt)
            log(f"[upload] {reason}; retry in {sleep_s}s")
            time.sleep(sleep_s)
    raise RuntimeError(
        f"upload_folder failed for {repo_id} after {max_attempts} attempts"
    ) from last_error
| # --------------------------------------------------------------------------- | |
| # Dataset build | |
| # --------------------------------------------------------------------------- | |
def _ask_at_tick(eff: np.ndarray, tick: int = 120) -> float:
    """Return ``eff[tick]`` as a float, or 1.0 if absent, non-finite or <= 0."""
    ask = float(eff[tick]) if len(eff) > tick else 1.0
    # 1.0 doubles as the "no usable ask" sentinel.
    if not np.isfinite(ask) or ask <= 0:
        return 1.0
    return ask


def _hold_to_resolution_pnl(ask: float, won: bool, size: int = 100) -> float:
    """PnL of buying ``size`` shares at ``ask`` and holding to resolution.

    A winning share is worth 1.0 at resolution and a losing one 0.0. The fee
    ``0.072 * ask * (1 - ask) * size`` is charged only when 0 < ask < 1.
    """
    fee = 0.072 * ask * (1.0 - ask) * size if 0 < ask < 1 else 0.0
    payoff = (1.0 - ask) * size if won else -ask * size
    return payoff - fee


def _window_row(slug, slug_ts, ob_up, ob_dn, ohlcv) -> Optional[Dict]:
    """Build one directional training row; return None if the window is unlabeled.

    ``binary_label`` is 1 when UP wins the window. At tick 120 both sides'
    walked-book asks (for 100 shares) are snapshotted so the downstream trading
    sim can enforce the "lower is better" entry-price rule.
    """
    spot = get_window_label(slug_ts, ohlcv)
    if spot is None:
        return None
    wf = build_window_frame(slug, slug_ts, ob_up, ob_dn, ohlcv)
    feats = _extract_all(wf, at_tick=120)
    up_ask_120 = _ask_at_tick(_precompute_effective_arr(wf, "up", size=100), 120)
    dn_ask_120 = _ask_at_tick(_precompute_effective_arr(wf, "dn", size=100), 120)
    up_won = spot == 1
    pnl_up = _hold_to_resolution_pnl(up_ask_120, up_won)
    pnl_dn = _hold_to_resolution_pnl(dn_ask_120, not up_won)
    row: Dict = {
        "slug": slug,
        "slug_ts": slug_ts,
        "spot_label": spot,
        "binary_label": int(up_won),  # 1 if UP wins
        "up_ask_120": up_ask_120,
        "dn_ask_120": dn_ask_120,
        "pnl_up": pnl_up,
        "pnl_dn": pnl_dn,
        "realized_pnl": pnl_up,  # kept for schema compat; not used directly
    }
    row.update(zip(ALL_FEATURES, map(float, feats)))
    return row


def _build_training_dataset(
    asset: str,
    hf_token: str,
    cache_dir: Path,
    log: Callable[[str], None] = print,
    ob_batch_size: int = 500,
) -> pd.DataFrame:
    """Build the per-window training dataframe (one row per labeled market).

    The orderbook parquet for big assets is ~30-40 GB on disk and cannot fit in
    a 32 GB Space's RAM, so the book is streamed one slug at a time via
    ``data_loader.iter_orderbook_slug_pairs``. Each slug's window is processed
    end-to-end, and the local up/down table references are released before
    advancing.

    Args:
        asset: asset symbol used to locate the datasets.
        hf_token: Hub token passed to the loaders.
        cache_dir: local download cache.
        log: progress logger.
        ob_batch_size: unused; kept for call-site compatibility (the streaming
            iterator yields one slug per step).

    Returns:
        DataFrame sorted by ``slug_ts`` with label, entry-price and PnL columns
        plus every name in ``ALL_FEATURES``. ``df.attrs["eff_cache"]`` is a list
        parallel to the rows (placeholder effective-ask arrays in this mode).

    Raises:
        RuntimeError: if no window produced a valid row.
    """
    import gc
    from data_loader import iter_orderbook_slug_pairs

    log(f"[data] loading markets_index for {asset}")
    markets = load_markets_index(asset, hf_token, cache_dir)
    log(f"[data] {len(markets)} resolved updown-15m markets")
    log(f"[data] loading ohlcv_1s for {asset}")
    ohlcv = load_ohlcv(asset, hf_token, cache_dir)
    slugs = markets["slug"].to_list()
    slug_ts_map = {s: int(t) for s, t in zip(slugs, markets["slug_ts"].to_list())}
    wanted = set(slugs)
    log(f"[data] streaming book_snapshot_5 row-groups (~{len(slugs)} slugs, "
        f"peak ~5 MB per slug)")

    rows: List[Dict] = []
    eff_cache: List[Dict] = []  # parallel to rows
    built = skipped = errors = processed = 0
    for slug, ob_up, ob_dn in iter_orderbook_slug_pairs(
        asset, hf_token, cache_dir, wanted
    ):
        processed += 1
        slug_ts = slug_ts_map.get(slug)
        if slug_ts is not None:
            try:
                row = _window_row(slug, slug_ts, ob_up, ob_dn, ohlcv)
            except Exception as e:  # noqa: BLE001 -- one bad window must not kill the run
                skipped += 1
                errors += 1
                # Full tracebacks only for the first few *errors* (unlabeled
                # windows no longer consume this budget).
                if errors <= 3:
                    log(f"[data] window error slug={slug}: {e!r}\n"
                        f"{traceback.format_exc()}")
                else:
                    log(f"[data] window error slug={slug}: {e!r}")
            else:
                if row is None:
                    skipped += 1
                else:
                    rows.append(row)
                    eff_cache.append({
                        "slug": slug,
                        "spot_label": row["spot_label"],
                        "up_eff": np.zeros(1, dtype=np.float32),  # unused in v4 mode
                        "dn_eff": np.zeros(1, dtype=np.float32),
                    })
                    built += 1
        # Release this slug's tables before the next one; runs on every path.
        del ob_up, ob_dn
        if processed % 1000 == 0:
            gc.collect()
            log(f"[data] processed={processed} built={built} skipped={skipped}")
    gc.collect()
    log(f"[data] done streaming. processed={processed} built={built} skipped={skipped}")

    df = pd.DataFrame(rows)
    if len(df) == 0:
        raise RuntimeError("No valid training rows produced")
    # Stable sort; the pre-sort RangeIndex labels are positions in `rows`, so
    # they reorder eff_cache in lockstep with the frame (even with duplicate slugs).
    df = df.sort_values("slug_ts", kind="mergesort")
    eff_cache = [eff_cache[i] for i in df.index]
    df = df.reset_index(drop=True)

    # Directional-mode diagnostics
    pcts = np.percentile(df["realized_pnl"].to_numpy(), [1, 5, 25, 50, 75, 95, 99])
    log(f"[data] realized_pnl(buy_up_@120) percentiles "
        f"1%={pcts[0]:.2f} 5%={pcts[1]:.2f} 25%={pcts[2]:.2f} "
        f"50%={pcts[3]:.2f} 75%={pcts[4]:.2f} 95%={pcts[5]:.2f} 99%={pcts[6]:.2f}")
    ua = df["up_ask_120"].to_numpy()
    da = df["dn_ask_120"].to_numpy()
    log(f"[data] up_ask_120: median={np.median(ua):.3f} "
        f"5%={np.quantile(ua, 0.05):.3f} 95%={np.quantile(ua, 0.95):.3f} | "
        f"dn_ask_120: median={np.median(da):.3f}")
    cheap_040 = float(((ua <= 0.40) | (da <= 0.40)).mean())
    cheap_035 = float(((ua <= 0.35) | (da <= 0.35)).mean())
    log(f"[data] entry opportunity: "
        f"some_side≤$0.40={cheap_040:.3f} some_side≤$0.35={cheap_035:.3f}")

    df.attrs["eff_cache"] = eff_cache
    log(f"[data] final dataset rows: {len(df)}; "
        f"positive rate: {df['binary_label'].mean():.3f}")
    return df
| # --------------------------------------------------------------------------- | |
| # Policy-parameter Optuna | |
| # --------------------------------------------------------------------------- | |
def _policy_optuna(
    eff_cache: List[Dict],
    n_trials: int = 100,
    log: Callable[[str], None] = print,
) -> Dict:
    """Tune the live trading policy's own knobs with Optuna.

    Searches ``entry_threshold``, ``combined_threshold`` and ``min_tick`` (not
    the classifier's hyperparameters) to maximise the mean realized PnL per
    window. Each window of ``eff_cache`` is replayed through
    ``simulate_live_policy_from_eff`` using its cached effective-ask arrays.

    Args:
        eff_cache: dicts carrying ``up_eff``, ``dn_eff`` and ``spot_label``.
        n_trials: number of Optuna trials (TPE sampler, seed 42).
        log: progress logger.

    Returns:
        ``{"params": best_params, "mean_pnl": best_value}``. With an empty
        ``eff_cache`` the search is skipped and fixed defaults with
        ``mean_pnl == 0.0`` are returned.
    """
    if not eff_cache:
        log("[policy] no eff_cache available; skipping policy Optuna")
        fallback = {"entry_threshold": 0.40, "combined_threshold": 0.80,
                    "min_tick": 120}
        return {"params": fallback, "mean_pnl": 0.0}

    windows = [(c["up_eff"], c["dn_eff"], c["spot_label"]) for c in eff_cache]

    def _mean_pnl(ent: float, comb: float, tick: int) -> float:
        pnls = [
            simulate_live_policy_from_eff(
                up.astype(np.float64), dn.astype(np.float64), label,
                entry_threshold=ent, combined_threshold=comb,
                size=100, min_tick=tick, binary_threshold=20.0,
            )["realized_pnl"]
            for up, dn, label in windows
        ]
        return sum(pnls, 0.0) / max(len(pnls), 1)

    def _obj(trial: optuna.Trial) -> float:
        ent = trial.suggest_float("entry_threshold", 0.20, 0.49)
        comb = trial.suggest_float("combined_threshold", 0.55, 0.95)
        tick = trial.suggest_int("min_tick", 120, 300, step=30)
        # Infeasible region: combined must exceed entry by at least 0.10.
        if comb - ent < 0.10:
            return -1e6
        return _mean_pnl(ent, comb, tick)

    study = optuna.create_study(
        direction="maximize", sampler=optuna.samplers.TPESampler(seed=42)
    )
    log(f"[policy] Optuna over {len(eff_cache)} windows × {n_trials} trials")
    study.optimize(_obj, n_trials=n_trials, show_progress_bar=False)
    return {"params": study.best_params, "mean_pnl": float(study.best_value)}
| # --------------------------------------------------------------------------- | |
| # Walk-forward folds | |
| # --------------------------------------------------------------------------- | |
def _walk_forward_folds(n: int, n_folds: int = 5) -> List[Tuple[int, int, int]]:
    """Expanding-window walk-forward split of the pool ``[0, n)``.

    The pool is cut into ``n_folds + 1`` contiguous segments. Fold k trains on
    segments ``0..k`` and tests on segment ``k + 1``, i.e. train
    ``[0, b_{k+1})`` and test ``[b_{k+1}, b_{k+2})`` with
    ``b_j = round(j / (n_folds + 1) * n)``. Folds whose test segment is empty
    are dropped. Callers pass ``n`` = length of the 90% pool, so no fold
    touches the final holdout.

    Returns:
        ``(train_end, test_start, test_end)`` triples; ``test_start`` always
        equals ``train_end``.
    """
    segments = n_folds + 1
    bounds = [int(round(j / segments * n)) for j in range(1, n_folds + 2)]
    return [(lo, lo, hi) for lo, hi in zip(bounds, bounds[1:]) if hi > lo]
| # --------------------------------------------------------------------------- | |
| # Optuna objective | |
| # --------------------------------------------------------------------------- | |
def _train_fold_core(
    X_train: np.ndarray,
    y_train: np.ndarray,
    X_test: np.ndarray,
    y_test: np.ndarray,
    params: Dict,
) -> Dict:
    """Fit LightGBM on one fold's training slice and predict ``X_test``.

    When the training slice has >= 500 rows, its trailing 10% is held back as
    an early-stopping set (patience 50). That set is used only if it has >= 20
    rows and both classes; otherwise the booster runs the full
    ``params["n_estimators"]`` rounds (default 500). No PnL/threshold logic
    lives here, so one trained model can be re-scored under many policy
    thresholds.

    Args:
        params: LightGBM params. ``n_estimators`` is not forwarded to LightGBM;
            it is used as ``num_boost_round``.

    Returns:
        ``{"auc": float, "booster": lgb.Booster, "preds": np.ndarray}``. AUC is
        0.5 when ``y_test`` has a single class or the score cannot be computed.
    """
    n_tr = X_train.shape[0]
    # Trailing 10% as the early-stopping split (the rows are presumably
    # time-ordered, so this is the most recent data -- caller's contract).
    es_cut = int(0.9 * n_tr) if n_tr >= 500 else n_tr
    X_fit, y_fit = X_train[:es_cut], y_train[:es_cut]
    X_es, y_es = X_train[es_cut:], y_train[es_cut:]
    lgb_params = {key: val for key, val in params.items() if key != "n_estimators"}

    dtrain = lgb.Dataset(X_fit, label=y_fit, feature_name=ALL_FEATURES)
    callbacks = [lgb.log_evaluation(0)]
    valid_sets = None
    if len(X_es) >= 20 and len(np.unique(y_es)) > 1:
        dvalid = lgb.Dataset(
            X_es, label=y_es, feature_name=ALL_FEATURES, reference=dtrain
        )
        valid_sets = [dvalid]
        callbacks = [lgb.early_stopping(50, verbose=False)] + callbacks
    booster = lgb.train(
        lgb_params, dtrain,
        num_boost_round=params.get("n_estimators", 500),
        valid_sets=valid_sets, callbacks=callbacks,
    )
    best_iter = booster.best_iteration if valid_sets is not None else None
    preds = booster.predict(X_test, num_iteration=best_iter)

    auc = 0.5
    if len(np.unique(y_test)) > 1:
        try:
            auc = float(roc_auc_score(y_test, preds))
        except ValueError:  # e.g. NaN predictions; keep the neutral score
            auc = 0.5
    return {"auc": auc, "booster": booster, "preds": preds}
def _directional_policy_apply(
    preds: np.ndarray,
    up_asks: np.ndarray,
    dn_asks: np.ndarray,
    pnl_ups: np.ndarray,
    pnl_dns: np.ndarray,
    edge_threshold: float,
    entry_price_max: float = 1.0,
) -> Tuple[np.ndarray, np.ndarray]:
    """Apply the edge-margin entry rule to a batch of windows.

    On Polymarket the ask for side S is approximately the market's implied
    P(S wins). So ``edge_up = pred - up_ask`` and
    ``edge_dn = (1 - pred) - dn_ask`` are our estimate minus the market's,
    i.e. expected PnL per share before fees.

    A side is eligible when its edge >= ``edge_threshold`` and its ask <=
    ``entry_price_max``. If both sides are eligible, the larger edge wins, with
    UP winning ties. The policy has two Optuna-tuned knobs (``edge_threshold``,
    ``entry_price_max``). This replaced an earlier 3-param setup
    (p_up_threshold, p_dn_threshold, entry_price_max) that was susceptible to
    overfit.

    Returns:
        ``(flag, pnl)``: a boolean mask of traded windows, and the float64 PnL
        of the chosen side (0.0 where no trade is taken).
    """
    preds = np.asarray(preds, dtype=np.float64)
    up_asks = np.asarray(up_asks, dtype=np.float64)
    dn_asks = np.asarray(dn_asks, dtype=np.float64)
    pnl_ups = np.asarray(pnl_ups, dtype=np.float64)
    pnl_dns = np.asarray(pnl_dns, dtype=np.float64)

    edge_up = preds - up_asks
    edge_dn = (1.0 - preds) - dn_asks
    # NaN edges compare False, so such windows are never traded.
    up_ok = (edge_up >= edge_threshold) & (up_asks <= entry_price_max)
    dn_ok = (edge_dn >= edge_threshold) & (dn_asks <= entry_price_max)
    take_up = up_ok & (~dn_ok | (edge_up >= edge_dn))
    take_dn = dn_ok & ~take_up
    flag = take_up | take_dn
    pnl = np.where(take_up, pnl_ups, np.where(take_dn, pnl_dns, 0.0))
    return flag, pnl
def _lgb_params_from_trial(trial: optuna.Trial) -> Tuple[Dict, Dict]:
    """Sample one trial's LightGBM hyperparameters and trading knobs.

    Returns:
        ``(lgb_params, trading)``. ``lgb_params`` is a binary-objective GBDT
        config with ``n_estimators`` fixed at 500. ``trading`` holds
        ``edge_threshold`` and ``entry_price_max`` for the edge-margin policy.
    """
    # suggest_* calls keep their original order on purpose: a seeded sampler
    # may draw different values if the call order changes.
    num_leaves = trial.suggest_int("num_leaves", 31, 127)
    learning_rate = trial.suggest_float("learning_rate", 0.01, 0.1, log=True)
    feature_fraction = trial.suggest_float("feature_fraction", 0.5, 1.0)
    bagging_fraction = trial.suggest_float("bagging_fraction", 0.5, 1.0)
    bagging_freq = trial.suggest_int("bagging_freq", 1, 5)
    min_child_samples = trial.suggest_int("min_child_samples", 20, 200)
    reg_alpha = trial.suggest_float("reg_alpha", 1e-8, 1.0, log=True)
    reg_lambda = trial.suggest_float("reg_lambda", 1e-8, 1.0, log=True)
    # Primary knob: edge margin (model_pred - market_ask for the chosen side).
    edge_threshold = trial.suggest_float("edge_threshold", 0.03, 0.25)
    # Safety cap: block high-entry overconfident trades where the model's
    # probability is likely uncalibrated. Without this, BTC lost -$621 on a
    # run (vs +$390 with the cap).
    entry_price_max = trial.suggest_float("entry_price_max", 0.40, 0.65)

    lgb_params = dict(
        objective="binary",
        metric="binary_logloss",
        verbosity=-1,
        boosting_type="gbdt",
        num_leaves=num_leaves,
        learning_rate=learning_rate,
        feature_fraction=feature_fraction,
        bagging_fraction=bagging_fraction,
        bagging_freq=bagging_freq,
        min_child_samples=min_child_samples,
        reg_alpha=reg_alpha,
        reg_lambda=reg_lambda,
        n_estimators=500,
    )
    trading = dict(edge_threshold=edge_threshold, entry_price_max=entry_price_max)
    return lgb_params, trading
| # --------------------------------------------------------------------------- | |
| # Calibration table (10 buckets) | |
| # --------------------------------------------------------------------------- | |
def _calibration_buckets(preds: np.ndarray, y: np.ndarray, n_buckets: int = 10) -> List[Dict]:
    """Reliability table: mean prediction vs. observed positive rate per bin.

    Predictions are binned into ``n_buckets`` equal-width bins over [0, 1].
    Every bin is half-open ``[lo, hi)`` except the last, which is closed so
    that ``pred == 1.0`` is counted. Predictions outside [0, 1] (or NaN) fall
    into no bin.

    Returns:
        One dict per bin with ``bucket``, ``lo``, ``hi``, ``n``, ``predicted``
        (mean prediction) and ``actual`` (mean label). Both means are 0.0 for an
        empty bin. Every number is a plain Python int/float, whether or not the
        bin is empty.
    """
    preds = np.asarray(preds)
    y = np.asarray(y)
    edges = np.linspace(0.0, 1.0, n_buckets + 1)
    out: List[Dict] = []
    for i in range(n_buckets):
        lo, hi = float(edges[i]), float(edges[i + 1])
        below_hi = preds <= hi if i == n_buckets - 1 else preds < hi
        mask = (preds >= lo) & below_hi
        n_here = int(mask.sum())
        out.append(
            {
                "bucket": i,
                "lo": lo,
                "hi": hi,
                "n": n_here,
                "predicted": float(preds[mask].mean()) if n_here else 0.0,
                "actual": float(y[mask].mean()) if n_here else 0.0,
            }
        )
    return out
| # --------------------------------------------------------------------------- | |
| # Main entry | |
| # --------------------------------------------------------------------------- | |
def _write_json(path: Path, payload: object) -> None:
    """Serialize ``payload`` as 2-space-indented JSON into ``path``."""
    path.write_text(json.dumps(payload, indent=2))


def _upload_artifacts(
    asset: str, hf_token: str, out_path: Path, log: Callable[[str], None]
) -> None:
    """Best-effort push of ``out_path`` to ``commanderzee/{asset}-v1-arb-full``.

    Failures are logged with a traceback instead of raised, so a Hub outage does
    not fail a run whose artifacts are already written to disk.
    """
    try:
        api = HfApi(token=hf_token)
        target_repo = f"commanderzee/{asset.lower()}-v1-arb-full"
        try:
            api.create_repo(repo_id=target_repo, repo_type="dataset", exist_ok=True, token=hf_token)
        except Exception as e:  # noqa: BLE001
            log(f"[upload] create_repo warning: {e!r}")
        _upload_with_retry(api, str(out_path), target_repo, "dataset", hf_token, log=log)
        log(f"[upload] pushed artifacts to {target_repo}")
    except Exception as e:  # noqa: BLE001
        log(f"[upload] FAILED: {e!r}")
        log(traceback.format_exc())


def _fold_is_trainable(y_train: np.ndarray, n_test: int) -> bool:
    """True if a fold has >= 50 train rows, >= 20 test rows and both classes in train."""
    return len(y_train) >= 50 and n_test >= 20 and len(np.unique(y_train)) >= 2


def run_training(
    asset: str,
    hf_token: str,
    variant: str = "v1_arb",
    n_trials: int = 150,
    n_folds: int = 5,
    log: Callable[[str], None] = print,
    cache_dir: str = "/tmp/cache",
    output_dir: str = "/tmp/output",
) -> Dict:
    """End-to-end training run for one asset.

    Steps:
      1. Build the per-window dataset.
      2. Split it chronologically 90/10 into a walk-forward pool and a holdout.
      3. Tune LightGBM hyperparameters jointly with the edge-margin trading
         knobs via Optuna. The objective is the mean over folds of
         ``sum_flagged - 0.2 * max_drawdown``.
      4. Re-evaluate the best trial on every fold.
      5. Refit on the whole pool and score the holdout.
      6. Write artifacts to ``output_dir`` and push them to the Hub
         (best-effort).

    The final refit uses the median early-stopped iteration of the best-params
    fold models, so the holdout model matches what was cross-validated. If no
    fold early-stopped, it falls back to ``n_estimators``.

    Returns:
        Summary dict: asset, variant, sizes, best Optuna value, best trading
        knobs, holdout AUC/sim and the output directory.
    """
    cache_path = Path(cache_dir)
    out_path = Path(output_dir)
    cache_path.mkdir(parents=True, exist_ok=True)
    out_path.mkdir(parents=True, exist_ok=True)
    log(f"[train] asset={asset} variant={variant} trials={n_trials} folds={n_folds}")

    # 1. Build dataset. Every split below relies on chronological order, so
    #    sort defensively even though the builder already does.
    df = _build_training_dataset(asset, hf_token, cache_path, log=log)
    df = df.sort_values("slug_ts").reset_index(drop=True)
    # DIRECTIONAL: label is "UP wins the window?" (spot_label). Per row we also
    # track each side's tick-120 entry price and the per-side PnL of buying 100
    # shares there and holding to resolution. The lower the ask, the larger the
    # payout on a correct call; this is enforced at trading-policy time via the
    # Optuna-tuned `entry_price_max` cap.
    log(f"[train] directional: up_win_rate={float(df['binary_label'].mean()):.3f} "
        f"mean_pnl_up={float(df['pnl_up'].mean()):.2f} "
        f"mean_pnl_dn={float(df['pnl_dn'].mean()):.2f} "
        f"up_ask_median={float(df['up_ask_120'].median()):.3f} "
        f"dn_ask_median={float(df['dn_ask_120'].median()):.3f}")

    X_all = df[ALL_FEATURES].to_numpy(dtype=np.float32)
    y_all = df["binary_label"].to_numpy(dtype=np.int32)
    up_ask_all = df["up_ask_120"].to_numpy(dtype=np.float64)
    dn_ask_all = df["dn_ask_120"].to_numpy(dtype=np.float64)
    pnl_up_all = df["pnl_up"].to_numpy(dtype=np.float64)
    pnl_dn_all = df["pnl_dn"].to_numpy(dtype=np.float64)

    n = len(df)
    split = int(round(0.9 * n))
    X_pool, X_hold = X_all[:split], X_all[split:]
    y_pool, y_hold = y_all[:split], y_all[split:]
    up_ask_pool, up_ask_hold = up_ask_all[:split], up_ask_all[split:]
    dn_ask_pool, dn_ask_hold = dn_ask_all[:split], dn_ask_all[split:]
    pnl_up_pool, pnl_up_hold = pnl_up_all[:split], pnl_up_all[split:]
    pnl_dn_pool, pnl_dn_hold = pnl_dn_all[:split], pnl_dn_all[split:]
    log(f"[train] pool={len(X_pool)} holdout={len(X_hold)}")
    folds = _walk_forward_folds(len(X_pool), n_folds=n_folds)

    def _fold_views(train_end: int, test_start: int, test_end: int) -> Tuple:
        """Slice the pool into (Xt, yt, Xv, yv, up_ask, dn_ask, pnl_up, pnl_dn)."""
        test = slice(test_start, test_end)
        return (
            X_pool[:train_end], y_pool[:train_end], X_pool[test], y_pool[test],
            up_ask_pool[test], dn_ask_pool[test], pnl_up_pool[test], pnl_dn_pool[test],
        )

    def _objective(trial: optuna.Trial) -> float:
        params, trading = _lgb_params_from_trial(trial)
        fold_scores = []
        for fold in folds:
            Xt, yt, Xv, yv, uav, dav, pup_v, pdn_v = _fold_views(*fold)
            if not _fold_is_trainable(yt, len(Xv)):
                continue
            try:
                res = _train_fold_core(Xt, yt, Xv, yv, params)
            except Exception as e:  # noqa: BLE001
                log(f"[optuna] fold error: {e!r}")
                continue
            flag, pnl = _directional_policy_apply(
                res["preds"], uav, dav, pup_v, pdn_v,
                trading["edge_threshold"],
                trading["entry_price_max"],
            )
            sim = simulate_flagging_pnl(pnl, flag)
            fold_scores.append(sim["sum_flagged"] - 0.2 * sim["max_drawdown"])
        if not fold_scores:
            return -1e9
        return float(np.mean(fold_scores))

    sampler = optuna.samplers.TPESampler(seed=42)
    study = optuna.create_study(direction="maximize", sampler=sampler)
    log(f"[optuna] starting with {n_trials} trials")
    study.optimize(_objective, n_trials=n_trials, show_progress_bar=False)

    best_params_trial = dict(study.best_trial.params)
    best_trading = {
        "edge_threshold": float(best_params_trial.pop("edge_threshold")),
        "entry_price_max": float(best_params_trial.pop("entry_price_max")),
    }
    best_lgb_params = {
        "objective": "binary",
        "metric": "binary_logloss",
        "verbosity": -1,
        "boosting_type": "gbdt",
        "n_estimators": 500,
        **best_params_trial,
    }
    log(f"[optuna] best value={study.best_value:.4f} trading={best_trading}")
    log(f"[optuna] best lgb params: {best_params_trial}")

    # Per-fold re-evaluation of the best trial (same tolerance as the objective).
    fold_metrics: List[Dict] = []
    fold_best_iters: List[int] = []
    for k, (train_end, test_start, test_end) in enumerate(folds):
        Xt, yt, Xv, yv, uav, dav, pup_v, pdn_v = _fold_views(train_end, test_start, test_end)
        if not _fold_is_trainable(yt, len(Xv)):
            fold_metrics.append({"fold": k, "skipped": True})
            continue
        try:
            res = _train_fold_core(Xt, yt, Xv, yv, best_lgb_params)
        except Exception as e:  # noqa: BLE001
            log(f"[eval] fold {k} error: {e!r}")
            fold_metrics.append({"fold": k, "skipped": True, "error": repr(e)})
            continue
        # best_iteration > 0 only when early stopping was active for this fold.
        if res["booster"].best_iteration > 0:
            fold_best_iters.append(int(res["booster"].best_iteration))
        flag, pnl = _directional_policy_apply(
            res["preds"], uav, dav, pup_v, pdn_v,
            best_trading["edge_threshold"],
            best_trading["entry_price_max"],
        )
        sim = simulate_flagging_pnl(pnl, flag)
        # counterfactual: "trade every window, always buy cheaper side"
        pnl_all_cheaper = np.where(uav <= dav, pup_v, pdn_v)
        cf = simulate_flagging_pnl(pnl_all_cheaper, np.ones_like(pnl_all_cheaper, dtype=bool))
        fold_metrics.append(
            {
                "fold": k,
                "train_end": train_end,
                "test_start": test_start,
                "test_end": test_end,
                "n_train": len(Xt),
                "n_test": len(Xv),
                "auc": res["auc"],
                "n_flagged": sim["n_flagged"],
                "n_total": sim["n_total"],
                "flagged_ratio": sim["flagged_ratio"],
                "realized_sum_pnl": sim["sum_flagged"],
                "realized_mean_pnl_of_flagged": sim["mean_flagged"],
                "realized_median_pnl_of_flagged": sim["median_flagged"],
                "hit_rate": sim["hit_rate"],
                "max_drawdown": sim["max_drawdown"],
                "flag_all_baseline_sum_pnl": cf["sum_flagged"],
                "calibration": _calibration_buckets(res["preds"], yv),
            }
        )

    # 2. Final refit on full pool, evaluate on holdout. `n_estimators` is an
    #    alias of LightGBM's num_iterations, so it is passed as num_boost_round
    #    rather than inside params (matching _train_fold_core).
    max_rounds = int(best_lgb_params.get("n_estimators", 500))
    refit_rounds = max(1, int(np.median(fold_best_iters))) if fold_best_iters else max_rounds
    refit_params = {key: val for key, val in best_lgb_params.items() if key != "n_estimators"}
    log(f"[train] refit on pool size={len(X_pool)} → predict holdout size={len(X_hold)}")
    log(f"[train] refit num_boost_round={refit_rounds} "
        f"(median early-stopped fold iteration; fallback {max_rounds})")
    dtrain = lgb.Dataset(X_pool, label=y_pool, feature_name=ALL_FEATURES)
    final_booster = lgb.train(
        refit_params,
        dtrain,
        num_boost_round=refit_rounds,
        callbacks=[lgb.log_evaluation(0)],
    )
    if len(X_hold) > 0:
        hold_preds = final_booster.predict(X_hold)
        hold_flag, hold_pnl = _directional_policy_apply(
            hold_preds, up_ask_hold, dn_ask_hold, pnl_up_hold, pnl_dn_hold,
            best_trading["edge_threshold"],
            best_trading["entry_price_max"],
        )
        hold_sim = simulate_flagging_pnl(hold_pnl, hold_flag)
        hold_auc = 0.5
        if len(np.unique(y_hold)) > 1:
            try:
                hold_auc = float(roc_auc_score(y_hold, hold_preds))
            except ValueError:
                hold_auc = 0.5
        hold_cal = _calibration_buckets(hold_preds, y_hold)
        cheaper_hold = np.where(up_ask_hold <= dn_ask_hold, pnl_up_hold, pnl_dn_hold)
        hold_cf = simulate_flagging_pnl(cheaper_hold, np.ones_like(cheaper_hold, dtype=bool))
    else:
        hold_sim = {
            "sum_flagged": 0.0, "mean_flagged": 0.0, "median_flagged": 0.0,
            "max_drawdown": 0.0, "hit_rate": 0.0, "n_flagged": 0, "n_total": 0,
            "flagged_ratio": 0.0,
        }
        hold_auc = 0.5
        hold_cal = []
        hold_cf = {"sum_flagged": 0.0}

    # 3. Save artifacts
    final_booster.save_model(str(out_path / "lgbm_model.txt"))
    _write_json(out_path / "feature_spec.json", {
        "ohlcv_features": list(OHLCV_FEATS),
        "orderbook_features": list(OB_FEATS),
        "state_features": list(STATE_FEATS),
        "all_features": list(ALL_FEATURES),
        "n_features": len(ALL_FEATURES),
    })
    _write_json(out_path / "thresholds.json", {
        "trading": best_trading,
        "notes": "Edge-margin rule: edge_up = model_pred - up_ask; "
                 "edge_dn = (1 - model_pred) - dn_ask. Enter the side "
                 "with the larger edge, iff that edge >= edge_threshold. "
                 "Edge is expected PnL per share before fees.",
    })
    _write_json(out_path / "fold_metrics.json", fold_metrics)
    trial_values = [
        t.value if t.value is not None else float("nan")
        for t in study.trials
        if t.state == optuna.trial.TrialState.COMPLETE
    ]
    _write_json(out_path / "optuna_report.json", {
        "asset": asset,
        "variant": variant,
        "n_trials_requested": n_trials,
        "n_trials_completed": len(trial_values),
        "best_value": float(study.best_value),
        "best_params": dict(study.best_trial.params),
        "trial_values": trial_values,
        "refit_num_boost_round": refit_rounds,
    })
    _write_json(out_path / "holdout_metrics.json", {
        "n_holdout": len(X_hold),
        "auc": hold_auc,
        "sim": hold_sim,
        "calibration": hold_cal,
        "flag_all_baseline_sum_pnl": hold_cf.get("sum_flagged", 0.0),
    })

    # 4. Upload (best-effort)
    _upload_artifacts(asset, hf_token, out_path, log)

    return {
        "asset": asset,
        "variant": variant,
        "n_windows": int(n),
        "n_pool": int(len(X_pool)),
        "n_holdout": int(len(X_hold)),
        "best_value": float(study.best_value),
        "best_trading": best_trading,
        "holdout": {"auc": hold_auc, "sim": hold_sim},
        "output_dir": str(out_path),
    }
# Public surface of this module: the training entry point and the feature-name list.
__all__ = [
    "run_training",
    "ALL_FEATURES",
]