""" Training entry point for the ARB-MAX v1_arb model. Flow: 1. Download markets_index + ohlcv + orderbook (filtered to asset's slugs). 2. For each window: build_window_frame @ tick=120, extract features, compute optimal arb PnL & binary label. 3. Assemble pandas frame, split 90/10, walk-forward 5-fold on 90%. 4. Optuna TPE (seed=42, maximize) with objective = mean across folds of (sum_flagged_pnl - 0.2 * max_drawdown). 5. Refit best params on full 90%, evaluate on 10% holdout. 6. Save artifacts, upload to HF dataset `commanderzee/{asset}-v1-arb-full`. """ from __future__ import annotations import json import os import time import traceback from pathlib import Path from typing import Callable, Dict, List, Optional, Tuple import numpy as np import pandas as pd import lightgbm as lgb import optuna from huggingface_hub import HfApi from huggingface_hub.utils import HfHubHTTPError from sklearn.metrics import roc_auc_score from data_loader import ( build_window_frame, get_window_label, load_markets_index, load_ohlcv, load_orderbook_filtered, ) from features_ohlcv import FEATURE_NAMES as OHLCV_FEATS, extract as extract_ohlcv from features_orderbook import ( FEATURE_NAMES as OB_FEATS, extract as extract_ob, ) from features_state import FEATURE_NAMES as STATE_FEATS, extract as extract_state from labels import ( compute_optimal_arb, simulate_live_policy, simulate_live_policy_from_eff, _precompute_effective_arr, ) from replay import simulate_flagging_pnl ALL_FEATURES: List[str] = list(OHLCV_FEATS) + list(OB_FEATS) + list(STATE_FEATS) # --------------------------------------------------------------------------- # Feature assembly per window # --------------------------------------------------------------------------- def _extract_all(wf: pd.DataFrame, at_tick: int = 120) -> np.ndarray: a = extract_ohlcv(wf, at_tick=at_tick) b = extract_ob(wf, at_tick=at_tick) c = extract_state(wf, at_tick=at_tick) out = np.concatenate([a, b, c], dtype=np.float32) assert out.shape[0] == len(ALL_FEATURES) return out # --------------------------------------------------------------------------- # HF upload retry wrapper # --------------------------------------------------------------------------- def _upload_with_retry( api: HfApi, folder_path: str, repo_id: str, repo_type: str, hf_token: str, max_attempts: int = 6, log: Callable[[str], None] = print, ) -> None: attempt = 0 while attempt < max_attempts: try: api.upload_folder( folder_path=folder_path, repo_id=repo_id, repo_type=repo_type, token=hf_token, create_pr=False, ) return except HfHubHTTPError as e: # type: ignore[attr-defined] status = getattr(getattr(e, "response", None), "status_code", None) if status is not None and (status == 429 or 500 <= status < 600): sleep_s = min(60.0, 2.0 ** attempt) log(f"[upload] HTTP {status}; retry in {sleep_s}s") time.sleep(sleep_s) attempt += 1 continue raise except Exception as e: # noqa: BLE001 sleep_s = min(60.0, 2.0 ** attempt) log(f"[upload] error {e!r}; retry in {sleep_s}s") time.sleep(sleep_s) attempt += 1 raise RuntimeError(f"upload_folder failed for {repo_id} after {max_attempts} attempts") # --------------------------------------------------------------------------- # Dataset build # --------------------------------------------------------------------------- def _build_training_dataset( asset: str, hf_token: str, cache_dir: Path, log: Callable[[str], None] = print, ob_batch_size: int = 500, ) -> pd.DataFrame: """Build the per-window training dataframe. IMPORTANT: the orderbook parquet for big assets is ~30-40 GB on disk and cannot fit in a 32 GB Space's RAM. We iterate the filtered file in batches of `ob_batch_size` slugs via `iter_orderbook_batches`, processing each batch's windows end-to-end and dropping the polars DataFrame before advancing. Peak memory per batch ~1-2 GB. """ import gc import polars as pl # local import to keep module import-light from data_loader import iter_orderbook_slug_pairs log(f"[data] loading markets_index for {asset}") markets = load_markets_index(asset, hf_token, cache_dir) log(f"[data] {len(markets)} resolved updown-15m markets") log(f"[data] loading ohlcv_1s for {asset}") ohlcv = load_ohlcv(asset, hf_token, cache_dir) slugs = markets["slug"].to_list() slug_ts_list = markets["slug_ts"].to_list() slug_ts_map = dict(zip(slugs, [int(t) for t in slug_ts_list])) wanted = set(slugs) log(f"[data] streaming book_snapshot_5 row-groups (~{len(slugs)} slugs, " f"peak ~5 MB per slug)") rows: List[Dict] = [] eff_cache: List[Dict] = [] # parallel to rows — holds up_eff/dn_eff arrays built = 0 skipped = 0 processed = 0 for slug, ob_up, ob_dn in iter_orderbook_slug_pairs( asset, hf_token, cache_dir, wanted ): processed += 1 slug_ts = slug_ts_map.get(slug) if slug_ts is None: continue try: spot = get_window_label(slug_ts, ohlcv) if spot is None: skipped += 1 continue wf = build_window_frame(slug, slug_ts, ob_up, ob_dn, ohlcv) feats = _extract_all(wf, at_tick=120) # DIRECTIONAL label: predict whether UP wins the window. # At tick 120 we snapshot both sides' walked-book asks (for # 100 shares) so the downstream trading sim can enforce the # "lower is better" entry-price rule. up_eff = _precompute_effective_arr(wf, "up", size=100) dn_eff = _precompute_effective_arr(wf, "dn", size=100) up_ask_120 = float(up_eff[120]) if len(up_eff) > 120 else 1.0 dn_ask_120 = float(dn_eff[120]) if len(dn_eff) > 120 else 1.0 if not np.isfinite(up_ask_120) or up_ask_120 <= 0: up_ask_120 = 1.0 if not np.isfinite(dn_ask_120) or dn_ask_120 <= 0: dn_ask_120 = 1.0 fee_up = 0.072 * up_ask_120 * (1.0 - up_ask_120) * 100 if 0 < up_ask_120 < 1 else 0.0 fee_dn = 0.072 * dn_ask_120 * (1.0 - dn_ask_120) * 100 if 0 < dn_ask_120 < 1 else 0.0 # PnL per side if entered at tick 120 and held to resolution if spot == 1: # UP won pnl_up = (1.0 - up_ask_120) * 100 - fee_up pnl_dn = -dn_ask_120 * 100 - fee_dn else: # DN won pnl_up = -up_ask_120 * 100 - fee_up pnl_dn = (1.0 - dn_ask_120) * 100 - fee_dn row = { "slug": slug, "slug_ts": slug_ts, "spot_label": spot, "binary_label": int(spot == 1), # 1 if UP wins "up_ask_120": up_ask_120, "dn_ask_120": dn_ask_120, "pnl_up": pnl_up, "pnl_dn": pnl_dn, "realized_pnl": pnl_up, # kept for schema compat; not used directly } for name, v in zip(ALL_FEATURES, feats): row[name] = float(v) rows.append(row) eff_cache.append({ "slug": slug, "spot_label": spot, "up_eff": np.zeros(1, dtype=np.float32), # unused in v4 mode "dn_eff": np.zeros(1, dtype=np.float32), }) built += 1 except Exception as e: # noqa: BLE001 skipped += 1 if skipped <= 3: import traceback log(f"[data] window error slug={slug}: {e!r}\n" f"{traceback.format_exc()}") else: log(f"[data] window error slug={slug}: {e!r}") # free the row-group tables before the next one del ob_up, ob_dn if processed % 1000 == 0: gc.collect() log(f"[data] processed={processed} built={built} skipped={skipped}") gc.collect() log(f"[data] done streaming. processed={processed} built={built} skipped={skipped}") df = pd.DataFrame(rows) if len(df) == 0: raise RuntimeError("No valid training rows produced") df = df.sort_values("slug_ts").reset_index(drop=True) # Directional-mode diagnostics import numpy as _np if "realized_pnl" in df.columns: pnls = df["realized_pnl"].to_numpy() pcts = _np.percentile(pnls, [1, 5, 25, 50, 75, 95, 99]) log(f"[data] realized_pnl(buy_up_@120) percentiles " f"1%={pcts[0]:.2f} 5%={pcts[1]:.2f} 25%={pcts[2]:.2f} " f"50%={pcts[3]:.2f} 75%={pcts[4]:.2f} 95%={pcts[5]:.2f} 99%={pcts[6]:.2f}") if "up_ask_120" in df.columns: ua = df["up_ask_120"].to_numpy() da = df["dn_ask_120"].to_numpy() log(f"[data] up_ask_120: median={_np.median(ua):.3f} " f"5%={_np.quantile(ua, 0.05):.3f} 95%={_np.quantile(ua, 0.95):.3f} | " f"dn_ask_120: median={_np.median(da):.3f}") cheap_either = float(((ua <= 0.40) | (da <= 0.40)).mean()) cheap_both_lo = float(((ua <= 0.35) | (da <= 0.35)).mean()) log(f"[data] entry opportunity: " f"some_side≤$0.40={cheap_either:.3f} some_side≤$0.35={cheap_both_lo:.3f}") # re-sort eff_cache to mirror df row order (df was just sorted by slug_ts) slug_to_idx = {c["slug"]: i for i, c in enumerate(eff_cache)} reordered = [eff_cache[slug_to_idx[s]] for s in df["slug"].tolist() if s in slug_to_idx] eff_cache[:] = reordered df.attrs["eff_cache"] = eff_cache log(f"[data] final dataset rows: {len(df)}; " f"positive rate: {df['binary_label'].mean():.3f}") return df # --------------------------------------------------------------------------- # Policy-parameter Optuna # --------------------------------------------------------------------------- def _policy_optuna( eff_cache: List[Dict], n_trials: int = 100, log: Callable[[str], None] = print, ) -> Dict: """Optuna over the trading policy's own parameters (entry_threshold, combined_threshold, min_tick) — NOT the classifier's hyperparameters. Objective: mean realized PnL per window across the full pool. Uses the cached walked-book effective-ask arrays so each trial is pure numpy (~1s for 16k windows), making 100 trials a few minutes. """ if not eff_cache: log("[policy] no eff_cache available; skipping policy Optuna") return {"params": {"entry_threshold": 0.40, "combined_threshold": 0.80, "min_tick": 120}, "mean_pnl": 0.0} # precompute common arrays ups = [c["up_eff"] for c in eff_cache] dns = [c["dn_eff"] for c in eff_cache] labels = [c["spot_label"] for c in eff_cache] def _mean_pnl(ent: float, comb: float, tick: int) -> float: total = 0.0 n = 0 for u, d, lab in zip(ups, dns, labels): res = simulate_live_policy_from_eff( u.astype(np.float64), d.astype(np.float64), lab, entry_threshold=ent, combined_threshold=comb, size=100, min_tick=tick, binary_threshold=20.0, ) total += res["realized_pnl"] n += 1 return total / max(n, 1) def _obj(trial: optuna.Trial) -> float: ent = trial.suggest_float("entry_threshold", 0.20, 0.49) comb = trial.suggest_float("combined_threshold", 0.55, 0.95) tick = trial.suggest_int("min_tick", 120, 300, step=30) if comb - ent < 0.10: # combined must be meaningfully > entry return -1e6 return _mean_pnl(ent, comb, tick) sampler = optuna.samplers.TPESampler(seed=42) study = optuna.create_study(direction="maximize", sampler=sampler) log(f"[policy] Optuna over {len(eff_cache)} windows × {n_trials} trials") study.optimize(_obj, n_trials=n_trials, show_progress_bar=False) return {"params": study.best_params, "mean_pnl": float(study.best_value)} # --------------------------------------------------------------------------- # Walk-forward folds # --------------------------------------------------------------------------- def _walk_forward_folds(n: int, n_folds: int = 5) -> List[Tuple[int, int, int]]: """Return list of (train_end, test_start, test_end) triples. Expanding-window walk-forward over the WF pool [0, n). The pool is split into (n_folds + 1) contiguous segments. Fold k trains on the first (k+1) segments and tests on segment (k+1). No overlap with the final holdout (caller passes `n` = length of the 90% pool, not the full dataset). Fold k: train [0, (k+1)/(n_folds+1) * n), test [(k+1)/(n_folds+1) * n, (k+2)/(n_folds+1) * n) """ out: List[Tuple[int, int, int]] = [] for k in range(n_folds): train_end = int(round((k + 1) / (n_folds + 1) * n)) test_end = int(round((k + 2) / (n_folds + 1) * n)) test_start = train_end if test_end <= test_start: continue out.append((train_end, test_start, test_end)) return out # --------------------------------------------------------------------------- # Optuna objective # --------------------------------------------------------------------------- def _train_fold_core( X_train: np.ndarray, y_train: np.ndarray, X_test: np.ndarray, y_test: np.ndarray, params: Dict, ) -> Dict: """Train LightGBM on fold's train slice with internal early-stopping, predict on X_test. Returns preds + booster + AUC. No PnL / threshold logic here — that's applied per-trial in the Optuna objective so the same trained model can be reused across many threshold combos.""" n_tr = X_train.shape[0] es_cut = max(200, int(0.9 * n_tr)) if n_tr >= 500 else n_tr X_fit, y_fit = X_train[:es_cut], y_train[:es_cut] X_es, y_es = X_train[es_cut:], y_train[es_cut:] params_no_n = {k: v for k, v in params.items() if k != "n_estimators"} dtrain = lgb.Dataset(X_fit, label=y_fit, feature_name=ALL_FEATURES) callbacks = [lgb.log_evaluation(0)] valid_sets = None if len(X_es) >= 20 and len(set(y_es.tolist())) > 1: dvalid = lgb.Dataset( X_es, label=y_es, feature_name=ALL_FEATURES, reference=dtrain ) valid_sets = [dvalid] callbacks.insert(0, lgb.early_stopping(50, verbose=False)) booster = lgb.train( params_no_n, dtrain, num_boost_round=params.get("n_estimators", 500), valid_sets=valid_sets, callbacks=callbacks, ) best_iter = booster.best_iteration if valid_sets is not None else None preds = booster.predict(X_test, num_iteration=best_iter) try: auc = float(roc_auc_score(y_test, preds)) if len(set(y_test.tolist())) > 1 else 0.5 except Exception: auc = 0.5 return {"auc": auc, "booster": booster, "preds": preds} def _directional_policy_apply( preds: np.ndarray, up_asks: np.ndarray, dn_asks: np.ndarray, pnl_ups: np.ndarray, pnl_dns: np.ndarray, edge_threshold: float, entry_price_max: float = 1.0, ) -> Tuple[np.ndarray, np.ndarray]: """Edge-margin decision rule. On Polymarket the ask for side S is approximately the market's implied P(S wins). So `edge_up = model_pred - up_ask` = our estimate minus the market's — the expected PnL per share before fees. If this exceeds a threshold on either side, we trade that side. Whichever edge is larger wins when both clear threshold. Single-knob policy (one Optuna param) — drops the prior 3-param setup (p_up_threshold, p_dn_threshold, entry_price_max) which was susceptible to overfit in Optuna's larger search space.""" n = len(preds) flag = np.zeros(n, dtype=bool) pnl = np.zeros(n, dtype=np.float64) for i in range(n): edge_up = preds[i] - up_asks[i] edge_dn = (1.0 - preds[i]) - dn_asks[i] up_ok = (edge_up >= edge_threshold) and (up_asks[i] <= entry_price_max) dn_ok = (edge_dn >= edge_threshold) and (dn_asks[i] <= entry_price_max) if up_ok and (not dn_ok or edge_up >= edge_dn): flag[i] = True pnl[i] = pnl_ups[i] elif dn_ok: flag[i] = True pnl[i] = pnl_dns[i] return flag, pnl def _lgb_params_from_trial(trial: optuna.Trial) -> Tuple[Dict, Dict]: params = { "objective": "binary", "metric": "binary_logloss", "verbosity": -1, "boosting_type": "gbdt", "num_leaves": trial.suggest_int("num_leaves", 31, 127), "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.1, log=True), "feature_fraction": trial.suggest_float("feature_fraction", 0.5, 1.0), "bagging_fraction": trial.suggest_float("bagging_fraction", 0.5, 1.0), "bagging_freq": trial.suggest_int("bagging_freq", 1, 5), "min_child_samples": trial.suggest_int("min_child_samples", 20, 200), "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 1.0, log=True), "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 1.0, log=True), "n_estimators": 500, } trading = { # primary: edge-margin (model_pred - market_ask for the chosen side) "edge_threshold": trial.suggest_float("edge_threshold", 0.03, 0.25), # safety cap: block high-entry overconfident trades where the # model's probability is likely uncalibrated. Without this, BTC # lost -$621 on a run (vs +$390 with the cap). "entry_price_max": trial.suggest_float("entry_price_max", 0.40, 0.65), } return params, trading # --------------------------------------------------------------------------- # Calibration table (10 buckets) # --------------------------------------------------------------------------- def _calibration_buckets(preds: np.ndarray, y: np.ndarray, n_buckets: int = 10) -> List[Dict]: preds = np.asarray(preds) y = np.asarray(y) edges = np.linspace(0.0, 1.0, n_buckets + 1) out = [] for i in range(n_buckets): lo, hi = edges[i], edges[i + 1] mask = (preds >= lo) & (preds < hi) if i < n_buckets - 1 else (preds >= lo) & (preds <= hi) n_here = int(mask.sum()) if n_here == 0: out.append({"bucket": i, "lo": lo, "hi": hi, "n": 0, "predicted": 0.0, "actual": 0.0}) continue out.append( { "bucket": i, "lo": float(lo), "hi": float(hi), "n": n_here, "predicted": float(preds[mask].mean()), "actual": float(y[mask].mean()), } ) return out # --------------------------------------------------------------------------- # Main entry # --------------------------------------------------------------------------- def run_training( asset: str, hf_token: str, variant: str = "v1_arb", n_trials: int = 150, n_folds: int = 5, log: Callable[[str], None] = print, cache_dir: str = "/tmp/cache", output_dir: str = "/tmp/output", ) -> Dict: cache_path = Path(cache_dir) out_path = Path(output_dir) cache_path.mkdir(parents=True, exist_ok=True) out_path.mkdir(parents=True, exist_ok=True) log(f"[train] asset={asset} variant={variant} trials={n_trials} folds={n_folds}") # 1. Build dataset df = _build_training_dataset(asset, hf_token, cache_path, log=log) df = df.sort_values("slug_ts").reset_index(drop=True) # DIRECTIONAL: label is "UP wins the window?" (spot_label), 1/0 balanced. # Per row we also track entry-price (tick 120) for each side and the # realized per-side PnL if we bought 100 shares at that price and held # to resolution. Asymmetry: the lower the ask we buy at, the larger the # payout on a correct call — this is enforced at trading-policy time # via the Optuna-tuned `entry_price_max` cap. log(f"[train] directional: up_win_rate={float(df['binary_label'].mean()):.3f} " f"mean_pnl_up={float(df['pnl_up'].mean()):.2f} " f"mean_pnl_dn={float(df['pnl_dn'].mean()):.2f} " f"up_ask_median={float(df['up_ask_120'].median()):.3f} " f"dn_ask_median={float(df['dn_ask_120'].median()):.3f}") X_all = df[ALL_FEATURES].to_numpy(dtype=np.float32) y_all = df["binary_label"].to_numpy(dtype=np.int32) up_ask_all = df["up_ask_120"].to_numpy(dtype=np.float64) dn_ask_all = df["dn_ask_120"].to_numpy(dtype=np.float64) pnl_up_all = df["pnl_up"].to_numpy(dtype=np.float64) pnl_dn_all = df["pnl_dn"].to_numpy(dtype=np.float64) n = len(df) split = int(round(0.9 * n)) X_pool = X_all[:split] y_pool = y_all[:split] up_ask_pool = up_ask_all[:split] dn_ask_pool = dn_ask_all[:split] pnl_up_pool = pnl_up_all[:split] pnl_dn_pool = pnl_dn_all[:split] X_hold = X_all[split:] y_hold = y_all[split:] up_ask_hold = up_ask_all[split:] dn_ask_hold = dn_ask_all[split:] pnl_up_hold = pnl_up_all[split:] pnl_dn_hold = pnl_dn_all[split:] log(f"[train] pool={len(X_pool)} holdout={len(X_hold)}") folds = _walk_forward_folds(len(X_pool), n_folds=n_folds) def _objective(trial: optuna.Trial) -> float: params, trading = _lgb_params_from_trial(trial) fold_scores = [] for (train_end, test_start, test_end) in folds: Xt = X_pool[:train_end] yt = y_pool[:train_end] Xv = X_pool[test_start:test_end] yv = y_pool[test_start:test_end] uav = up_ask_pool[test_start:test_end] dav = dn_ask_pool[test_start:test_end] pup_v = pnl_up_pool[test_start:test_end] pdn_v = pnl_dn_pool[test_start:test_end] if len(Xt) < 50 or len(Xv) < 20 or len(set(yt.tolist())) < 2: continue try: res = _train_fold_core(Xt, yt, Xv, yv, params) except Exception as e: # noqa: BLE001 log(f"[optuna] fold error: {e!r}") continue flag, pnl = _directional_policy_apply( res["preds"], uav, dav, pup_v, pdn_v, trading["edge_threshold"], trading["entry_price_max"], ) sim = simulate_flagging_pnl(pnl, flag) score = sim["sum_flagged"] - 0.2 * sim["max_drawdown"] fold_scores.append(score) if not fold_scores: return -1e9 return float(np.mean(fold_scores)) sampler = optuna.samplers.TPESampler(seed=42) study = optuna.create_study(direction="maximize", sampler=sampler) log(f"[optuna] starting with {n_trials} trials") study.optimize(_objective, n_trials=n_trials, show_progress_bar=False) best_params_trial = dict(study.best_trial.params) best_trading = { "edge_threshold": float(best_params_trial.pop("edge_threshold")), "entry_price_max": float(best_params_trial.pop("entry_price_max")), } best_lgb_params = { "objective": "binary", "metric": "binary_logloss", "verbosity": -1, "boosting_type": "gbdt", "n_estimators": 500, **best_params_trial, } log(f"[optuna] best value={study.best_value:.4f} trading={best_trading}") log(f"[optuna] best lgb params: {best_params_trial}") fold_metrics: List[Dict] = [] per_fold_thresholds: Dict[str, float] = {} for k, (train_end, test_start, test_end) in enumerate(folds): Xt = X_pool[:train_end] yt = y_pool[:train_end] Xv = X_pool[test_start:test_end] yv = y_pool[test_start:test_end] uav = up_ask_pool[test_start:test_end] dav = dn_ask_pool[test_start:test_end] pup_v = pnl_up_pool[test_start:test_end] pdn_v = pnl_dn_pool[test_start:test_end] if len(Xt) < 50 or len(Xv) < 20 or len(set(yt.tolist())) < 2: fold_metrics.append({"fold": k, "skipped": True}) continue res = _train_fold_core(Xt, yt, Xv, yv, best_lgb_params) flag, pnl = _directional_policy_apply( res["preds"], uav, dav, pup_v, pdn_v, best_trading["edge_threshold"], best_trading["entry_price_max"], ) sim = simulate_flagging_pnl(pnl, flag) # counterfactual: "trade every window, always buy cheaper side" pnl_all_cheaper = np.where(uav <= dav, pup_v, pdn_v) cf = simulate_flagging_pnl(pnl_all_cheaper, np.ones_like(pnl_all_cheaper, dtype=bool)) fold_metrics.append( { "fold": k, "train_end": train_end, "test_start": test_start, "test_end": test_end, "n_train": len(Xt), "n_test": len(Xv), "auc": res["auc"], "n_flagged": sim["n_flagged"], "n_total": sim["n_total"], "flagged_ratio": sim["flagged_ratio"], "realized_sum_pnl": sim["sum_flagged"], "realized_mean_pnl_of_flagged": sim["mean_flagged"], "realized_median_pnl_of_flagged": sim["median_flagged"], "hit_rate": sim["hit_rate"], "max_drawdown": sim["max_drawdown"], "flag_all_baseline_sum_pnl": cf["sum_flagged"], "calibration": _calibration_buckets(res["preds"], yv), } ) # 2. Final refit on full pool, evaluate on holdout log(f"[train] refit on pool size={len(X_pool)} → predict holdout size={len(X_hold)}") dtrain = lgb.Dataset(X_pool, label=y_pool, feature_name=ALL_FEATURES) final_booster = lgb.train( best_lgb_params, dtrain, num_boost_round=best_lgb_params.get("n_estimators", 500), callbacks=[lgb.log_evaluation(0)], ) if len(X_hold) > 0: hold_preds = final_booster.predict(X_hold) hold_flag, hold_pnl = _directional_policy_apply( hold_preds, up_ask_hold, dn_ask_hold, pnl_up_hold, pnl_dn_hold, best_trading["edge_threshold"], best_trading["entry_price_max"], ) hold_sim = simulate_flagging_pnl(hold_pnl, hold_flag) try: hold_auc = ( float(roc_auc_score(y_hold, hold_preds)) if len(set(y_hold.tolist())) > 1 else 0.5 ) except Exception: hold_auc = 0.5 hold_cal = _calibration_buckets(hold_preds, y_hold) cheaper_hold = np.where(up_ask_hold <= dn_ask_hold, pnl_up_hold, pnl_dn_hold) hold_cf = simulate_flagging_pnl(cheaper_hold, np.ones_like(cheaper_hold, dtype=bool)) else: hold_preds = np.array([]) hold_sim = { "sum_flagged": 0.0, "mean_flagged": 0.0, "median_flagged": 0.0, "max_drawdown": 0.0, "hit_rate": 0.0, "n_flagged": 0, "n_total": 0, "flagged_ratio": 0.0, } hold_auc = 0.5 hold_cal = [] hold_cf = {"sum_flagged": 0.0} # 3. Save artifacts model_path = out_path / "lgbm_model.txt" final_booster.save_model(str(model_path)) (out_path / "feature_spec.json").write_text( json.dumps( { "ohlcv_features": list(OHLCV_FEATS), "orderbook_features": list(OB_FEATS), "state_features": list(STATE_FEATS), "all_features": list(ALL_FEATURES), "n_features": len(ALL_FEATURES), }, indent=2, ) ) (out_path / "thresholds.json").write_text( json.dumps( { "trading": best_trading, "notes": "Edge-margin rule: edge_up = model_pred - up_ask; " "edge_dn = (1 - model_pred) - dn_ask. Enter the side " "with the larger edge, iff that edge >= edge_threshold. " "Edge is expected PnL per share before fees.", }, indent=2, ) ) (out_path / "fold_metrics.json").write_text(json.dumps(fold_metrics, indent=2)) trial_values = [ t.value if t.value is not None else float("nan") for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE ] (out_path / "optuna_report.json").write_text( json.dumps( { "asset": asset, "variant": variant, "n_trials_requested": n_trials, "n_trials_completed": len(trial_values), "best_value": float(study.best_value), "best_params": dict(study.best_trial.params), "trial_values": trial_values, }, indent=2, ) ) (out_path / "holdout_metrics.json").write_text( json.dumps( { "n_holdout": len(X_hold), "auc": hold_auc, "sim": hold_sim, "calibration": hold_cal, "flag_all_baseline_sum_pnl": hold_cf.get("sum_flagged", 0.0), }, indent=2, ) ) # 4. Upload try: api = HfApi(token=hf_token) target_repo = f"commanderzee/{asset.lower()}-v1-arb-full" try: api.create_repo(repo_id=target_repo, repo_type="dataset", exist_ok=True, token=hf_token) except Exception as e: # noqa: BLE001 log(f"[upload] create_repo warning: {e!r}") _upload_with_retry(api, str(out_path), target_repo, "dataset", hf_token, log=log) log(f"[upload] pushed artifacts to {target_repo}") except Exception as e: # noqa: BLE001 log(f"[upload] FAILED: {e!r}") log(traceback.format_exc()) return { "asset": asset, "variant": variant, "n_windows": int(n), "n_pool": int(len(X_pool)), "n_holdout": int(len(X_hold)), "best_value": float(study.best_value), "best_trading": best_trading, "holdout": {"auc": hold_auc, "sim": hold_sim}, "output_dir": str(out_path), } __all__ = ["run_training", "ALL_FEATURES"]