# eth-arb-trainer / train.py
# Author: commanderzee
# Commit 7d54c7c (verified): "hybrid edge-margin policy
# (edge_threshold + entry_price_max cap) — BTC +$1247"
"""
Training entry point for the ARB-MAX v1_arb model.
Flow:
1. Download markets_index + ohlcv + orderbook (filtered to asset's slugs).
2. For each window: build_window_frame @ tick=120, extract features,
compute optimal arb PnL & binary label.
3. Assemble pandas frame, split 90/10, walk-forward 5-fold on 90%.
4. Optuna TPE (seed=42, maximize) with objective =
mean across folds of (sum_flagged_pnl - 0.2 * max_drawdown).
5. Refit best params on full 90%, evaluate on 10% holdout.
6. Save artifacts, upload to HF dataset `commanderzee/{asset}-v1-arb-full`.
"""
from __future__ import annotations
import json
import os
import time
import traceback
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
import lightgbm as lgb
import optuna
from huggingface_hub import HfApi
from huggingface_hub.utils import HfHubHTTPError
from sklearn.metrics import roc_auc_score
from data_loader import (
build_window_frame,
get_window_label,
load_markets_index,
load_ohlcv,
load_orderbook_filtered,
)
from features_ohlcv import FEATURE_NAMES as OHLCV_FEATS, extract as extract_ohlcv
from features_orderbook import (
FEATURE_NAMES as OB_FEATS,
extract as extract_ob,
)
from features_state import FEATURE_NAMES as STATE_FEATS, extract as extract_state
from labels import (
compute_optimal_arb,
simulate_live_policy,
simulate_live_policy_from_eff,
_precompute_effective_arr,
)
from replay import simulate_flagging_pnl
# Column order of the model's feature matrix: OHLCV, then orderbook, then state.
ALL_FEATURES: List[str] = [*OHLCV_FEATS, *OB_FEATS, *STATE_FEATS]
# ---------------------------------------------------------------------------
# Feature assembly per window
# ---------------------------------------------------------------------------
def _extract_all(wf: pd.DataFrame, at_tick: int = 120) -> np.ndarray:
    """Concatenate the OHLCV, orderbook and state features of one window.

    Args:
        wf: Window frame as produced by ``build_window_frame``.
        at_tick: Tick at which every extractor snapshots its features.

    Returns:
        A float32 vector ordered like ``ALL_FEATURES`` (OHLCV, then orderbook,
        then state).

    Raises:
        ValueError: if the extractors return a different number of values than
            ``ALL_FEATURES`` has names. This used to be an ``assert``, which is
            stripped under ``python -O``. Without the check, the row builder's
            ``zip(ALL_FEATURES, feats)`` would silently truncate or misalign
            columns.
    """
    parts = [
        extract_ohlcv(wf, at_tick=at_tick),
        extract_ob(wf, at_tick=at_tick),
        extract_state(wf, at_tick=at_tick),
    ]
    out = np.concatenate(parts, dtype=np.float32)
    if out.shape[0] != len(ALL_FEATURES):
        raise ValueError(
            f"feature vector has {out.shape[0]} values but ALL_FEATURES "
            f"declares {len(ALL_FEATURES)} names"
        )
    return out
# ---------------------------------------------------------------------------
# HF upload retry wrapper
# ---------------------------------------------------------------------------
def _upload_with_retry(
api: HfApi,
folder_path: str,
repo_id: str,
repo_type: str,
hf_token: str,
max_attempts: int = 6,
log: Callable[[str], None] = print,
) -> None:
attempt = 0
while attempt < max_attempts:
try:
api.upload_folder(
folder_path=folder_path,
repo_id=repo_id,
repo_type=repo_type,
token=hf_token,
create_pr=False,
)
return
except HfHubHTTPError as e: # type: ignore[attr-defined]
status = getattr(getattr(e, "response", None), "status_code", None)
if status is not None and (status == 429 or 500 <= status < 600):
sleep_s = min(60.0, 2.0 ** attempt)
log(f"[upload] HTTP {status}; retry in {sleep_s}s")
time.sleep(sleep_s)
attempt += 1
continue
raise
except Exception as e: # noqa: BLE001
sleep_s = min(60.0, 2.0 ** attempt)
log(f"[upload] error {e!r}; retry in {sleep_s}s")
time.sleep(sleep_s)
attempt += 1
raise RuntimeError(f"upload_folder failed for {repo_id} after {max_attempts} attempts")
# ---------------------------------------------------------------------------
# Dataset build
# ---------------------------------------------------------------------------
def _ask_at_tick(eff: np.ndarray, tick: int = 120) -> float:
    """Return the walked-book ask at `tick`, or 1.0 if it is missing or invalid.

    1.0 is used when the array is shorter than ``tick + 1`` or when the value
    is non-finite or non-positive.
    """
    ask = float(eff[tick]) if len(eff) > tick else 1.0
    if not np.isfinite(ask) or ask <= 0:
        return 1.0
    return ask


def _hold_to_resolution_pnl(ask: float, won: bool, size: int = 100) -> float:
    """PnL of buying `size` shares at `ask` and holding them to resolution.

    A winning share pays 1.0 and a losing share pays 0.0. The fee is
    ``0.072 * ask * (1 - ask) * size`` for ``0 < ask < 1`` and zero otherwise,
    so the 1.0 fallback ask from `_ask_at_tick` pays no fee.
    """
    fee = 0.072 * ask * (1.0 - ask) * size if 0 < ask < 1 else 0.0
    payout = 1.0 if won else 0.0
    return (payout - ask) * size - fee


def _window_row(slug: str, slug_ts: int, spot, wf: pd.DataFrame) -> Dict:
    """Build one training row for a window whose outcome label `spot` is known.

    Features are extracted at tick 120. Both sides' walked-book asks for 100
    shares are also snapshotted at tick 120, so the trading policy can compare
    the model's probability with the entry price. The ``*_120`` column names
    encode that tick.
    """
    feats = _extract_all(wf, at_tick=120)
    up_ask = _ask_at_tick(_precompute_effective_arr(wf, "up", size=100), 120)
    dn_ask = _ask_at_tick(_precompute_effective_arr(wf, "dn", size=100), 120)
    up_won = spot == 1
    # PnL per side if entered at tick 120 and held to resolution.
    pnl_up = _hold_to_resolution_pnl(up_ask, won=up_won, size=100)
    pnl_dn = _hold_to_resolution_pnl(dn_ask, won=not up_won, size=100)
    row = {
        "slug": slug,
        "slug_ts": slug_ts,
        "spot_label": spot,
        "binary_label": int(spot == 1),  # 1 if UP wins
        "up_ask_120": up_ask,
        "dn_ask_120": dn_ask,
        "pnl_up": pnl_up,
        "pnl_dn": pnl_dn,
        "realized_pnl": pnl_up,  # kept for schema compat; not used directly
    }
    row.update((name, float(v)) for name, v in zip(ALL_FEATURES, feats))
    return row


def _log_dataset_diagnostics(df: pd.DataFrame, log: Callable[[str], None]) -> None:
    """Log summaries of the buy-UP PnL and of the tick-120 ask distributions."""
    pcts = np.percentile(df["realized_pnl"].to_numpy(), [1, 5, 25, 50, 75, 95, 99])
    log(f"[data] realized_pnl(buy_up_@120) percentiles "
        f"1%={pcts[0]:.2f} 5%={pcts[1]:.2f} 25%={pcts[2]:.2f} "
        f"50%={pcts[3]:.2f} 75%={pcts[4]:.2f} 95%={pcts[5]:.2f} 99%={pcts[6]:.2f}")
    ua = df["up_ask_120"].to_numpy()
    da = df["dn_ask_120"].to_numpy()
    log(f"[data] up_ask_120: median={np.median(ua):.3f} "
        f"5%={np.quantile(ua, 0.05):.3f} 95%={np.quantile(ua, 0.95):.3f} | "
        f"dn_ask_120: median={np.median(da):.3f}")
    # Fraction of windows where at least one side is cheap enough to enter.
    cheap_either_40 = float(((ua <= 0.40) | (da <= 0.40)).mean())
    cheap_either_35 = float(((ua <= 0.35) | (da <= 0.35)).mean())
    log(f"[data] entry opportunity: "
        f"some_side≤$0.40={cheap_either_40:.3f} some_side≤$0.35={cheap_either_35:.3f}")


def _build_training_dataset(
    asset: str,
    hf_token: str,
    cache_dir: Path,
    log: Callable[[str], None] = print,
    ob_batch_size: int = 500,
) -> pd.DataFrame:
    """Build the per-window training dataframe for `asset`.

    Orderbook data is consumed through
    ``data_loader.iter_orderbook_slug_pairs``, which yields
    ``(slug, ob_up, ob_dn)`` per slug. Each slug's tables are released before
    advancing, on every path, and a gc pass runs every 1000 slugs. The reason,
    per the author's note: the orderbook parquet for big assets (~30-40 GB)
    does not fit in a 32 GB Space's RAM.

    Each streamed slug that appears in markets_index and has a non-None label
    produces one row (see `_window_row`). A row holds the directional label,
    the tick-120 asks, the per-side hold-to-resolution PnL and all features.
    Windows with no label, or whose processing raises, are counted as skipped.
    The first three skips that raise are logged with a traceback.

    Args:
        asset: Asset name forwarded to the data loaders.
        hf_token: Hugging Face token forwarded to the data loaders.
        cache_dir: Cache directory forwarded to the data loaders.
        log: Logging callback.
        ob_batch_size: Unused; retained for backward compatibility.

    Returns:
        DataFrame sorted by ``slug_ts`` with a fresh RangeIndex. The sort is
        stable, so ties keep streaming order. ``df.attrs["eff_cache"]`` is a
        list aligned row-for-row with the frame; its ``up_eff``/``dn_eff``
        entries are 1-element placeholder arrays.

    Raises:
        RuntimeError: if no valid training rows are produced.
    """
    import gc
    from data_loader import iter_orderbook_slug_pairs

    log(f"[data] loading markets_index for {asset}")
    markets = load_markets_index(asset, hf_token, cache_dir)
    log(f"[data] {len(markets)} resolved updown-15m markets")
    log(f"[data] loading ohlcv_1s for {asset}")
    ohlcv = load_ohlcv(asset, hf_token, cache_dir)

    slugs = markets["slug"].to_list()
    slug_ts_map = {s: int(t) for s, t in zip(slugs, markets["slug_ts"].to_list())}
    wanted = set(slugs)
    log(f"[data] streaming book_snapshot_5 row-groups (~{len(slugs)} slugs, "
        f"peak ~5 MB per slug)")

    rows: List[Dict] = []
    eff_cache: List[Dict] = []  # appended in lockstep with `rows`
    built = skipped = processed = 0
    for slug, ob_up, ob_dn in iter_orderbook_slug_pairs(
        asset, hf_token, cache_dir, wanted
    ):
        processed += 1
        try:
            slug_ts = slug_ts_map.get(slug)
            if slug_ts is None:
                continue  # not in markets_index; not counted as skipped
            spot = get_window_label(slug_ts, ohlcv)
            if spot is None:
                skipped += 1
                continue
            wf = build_window_frame(slug, slug_ts, ob_up, ob_dn, ohlcv)
            rows.append(_window_row(slug, slug_ts, spot, wf))
            eff_cache.append({
                "slug": slug,
                "spot_label": spot,
                "up_eff": np.zeros(1, dtype=np.float32),  # unused in v4 mode
                "dn_eff": np.zeros(1, dtype=np.float32),
            })
            built += 1
        except Exception as e:  # noqa: BLE001 - one bad window must not abort the run
            skipped += 1
            if skipped <= 3:
                log(f"[data] window error slug={slug}: {e!r}\n"
                    f"{traceback.format_exc()}")
            else:
                log(f"[data] window error slug={slug}: {e!r}")
        finally:
            # Runs on every path, including the `continue`s above: release this
            # slug's tables and keep the periodic gc/progress cadence exact.
            del ob_up, ob_dn
            if processed % 1000 == 0:
                gc.collect()
                log(f"[data] processed={processed} built={built} skipped={skipped}")
    gc.collect()
    log(f"[data] done streaming. processed={processed} built={built} skipped={skipped}")

    df = pd.DataFrame(rows)
    if len(df) == 0:
        raise RuntimeError("No valid training rows produced")
    # Stable sort keeps streaming order for equal slug_ts. Before reset, the
    # index holds each row's original position, which is also its position in
    # eff_cache, so the reorder stays aligned even with duplicate slugs.
    df = df.sort_values("slug_ts", kind="stable")
    eff_cache = [eff_cache[i] for i in df.index]
    df = df.reset_index(drop=True)

    _log_dataset_diagnostics(df, log)
    df.attrs["eff_cache"] = eff_cache
    log(f"[data] final dataset rows: {len(df)}; "
        f"positive rate: {df['binary_label'].mean():.3f}")
    return df
# ---------------------------------------------------------------------------
# Policy-parameter Optuna
# ---------------------------------------------------------------------------
def _policy_optuna(
eff_cache: List[Dict],
n_trials: int = 100,
log: Callable[[str], None] = print,
) -> Dict:
"""Optuna over the trading policy's own parameters (entry_threshold,
combined_threshold, min_tick) — NOT the classifier's hyperparameters.
Objective: mean realized PnL per window across the full pool. Uses the
cached walked-book effective-ask arrays so each trial is pure numpy (~1s
for 16k windows), making 100 trials a few minutes.
"""
if not eff_cache:
log("[policy] no eff_cache available; skipping policy Optuna")
return {"params": {"entry_threshold": 0.40, "combined_threshold": 0.80,
"min_tick": 120}, "mean_pnl": 0.0}
# precompute common arrays
ups = [c["up_eff"] for c in eff_cache]
dns = [c["dn_eff"] for c in eff_cache]
labels = [c["spot_label"] for c in eff_cache]
def _mean_pnl(ent: float, comb: float, tick: int) -> float:
total = 0.0
n = 0
for u, d, lab in zip(ups, dns, labels):
res = simulate_live_policy_from_eff(
u.astype(np.float64), d.astype(np.float64), lab,
entry_threshold=ent, combined_threshold=comb,
size=100, min_tick=tick, binary_threshold=20.0,
)
total += res["realized_pnl"]
n += 1
return total / max(n, 1)
def _obj(trial: optuna.Trial) -> float:
ent = trial.suggest_float("entry_threshold", 0.20, 0.49)
comb = trial.suggest_float("combined_threshold", 0.55, 0.95)
tick = trial.suggest_int("min_tick", 120, 300, step=30)
if comb - ent < 0.10: # combined must be meaningfully > entry
return -1e6
return _mean_pnl(ent, comb, tick)
sampler = optuna.samplers.TPESampler(seed=42)
study = optuna.create_study(direction="maximize", sampler=sampler)
log(f"[policy] Optuna over {len(eff_cache)} windows × {n_trials} trials")
study.optimize(_obj, n_trials=n_trials, show_progress_bar=False)
return {"params": study.best_params, "mean_pnl": float(study.best_value)}
# ---------------------------------------------------------------------------
# Walk-forward folds
# ---------------------------------------------------------------------------
def _walk_forward_folds(n: int, n_folds: int = 5) -> List[Tuple[int, int, int]]:
"""Return list of (train_end, test_start, test_end) triples.
Expanding-window walk-forward over the WF pool [0, n). The pool is split
into (n_folds + 1) contiguous segments. Fold k trains on the first (k+1)
segments and tests on segment (k+1). No overlap with the final holdout
(caller passes `n` = length of the 90% pool, not the full dataset).
Fold k: train [0, (k+1)/(n_folds+1) * n), test [(k+1)/(n_folds+1) * n, (k+2)/(n_folds+1) * n)
"""
out: List[Tuple[int, int, int]] = []
for k in range(n_folds):
train_end = int(round((k + 1) / (n_folds + 1) * n))
test_end = int(round((k + 2) / (n_folds + 1) * n))
test_start = train_end
if test_end <= test_start:
continue
out.append((train_end, test_start, test_end))
return out
# ---------------------------------------------------------------------------
# Optuna objective
# ---------------------------------------------------------------------------
def _train_fold_core(
    X_train: np.ndarray,
    y_train: np.ndarray,
    X_test: np.ndarray,
    y_test: np.ndarray,
    params: Dict,
) -> Dict:
    """Fit LightGBM on one fold's training slice and score its test slice.

    When the training slice has at least 500 rows, its last ~10% is held out
    for early stopping (patience 50). That holdout is only used if it has at
    least 20 rows and both classes. ``params["n_estimators"]`` (default 500)
    becomes ``num_boost_round`` and is stripped from the booster params. No
    PnL or threshold logic lives here, so one trained model can be reused
    across trading-threshold combinations.

    Returns:
        ``{"auc", "booster", "preds"}``. AUC falls back to 0.5 when the test
        labels are single-class or scoring fails.
    """
    n_rows = X_train.shape[0]
    if n_rows >= 500:
        cut = max(200, int(0.9 * n_rows))
    else:
        cut = n_rows
    fit_X, es_X = X_train[:cut], X_train[cut:]
    fit_y, es_y = y_train[:cut], y_train[cut:]

    booster_params = dict(params)
    booster_params.pop("n_estimators", None)
    train_set = lgb.Dataset(fit_X, label=fit_y, feature_name=ALL_FEATURES)

    use_early_stopping = len(es_X) >= 20 and len(set(es_y.tolist())) > 1
    if use_early_stopping:
        valid_sets = [
            lgb.Dataset(
                es_X, label=es_y, feature_name=ALL_FEATURES, reference=train_set
            )
        ]
        callbacks = [lgb.early_stopping(50, verbose=False), lgb.log_evaluation(0)]
    else:
        valid_sets = None
        callbacks = [lgb.log_evaluation(0)]

    booster = lgb.train(
        booster_params,
        train_set,
        num_boost_round=params.get("n_estimators", 500),
        valid_sets=valid_sets,
        callbacks=callbacks,
    )
    preds = booster.predict(
        X_test,
        num_iteration=booster.best_iteration if use_early_stopping else None,
    )
    try:
        if len(set(y_test.tolist())) > 1:
            auc = float(roc_auc_score(y_test, preds))
        else:
            auc = 0.5
    except Exception:
        auc = 0.5
    return {"auc": auc, "booster": booster, "preds": preds}
def _directional_policy_apply(
preds: np.ndarray,
up_asks: np.ndarray,
dn_asks: np.ndarray,
pnl_ups: np.ndarray,
pnl_dns: np.ndarray,
edge_threshold: float,
entry_price_max: float = 1.0,
) -> Tuple[np.ndarray, np.ndarray]:
"""Edge-margin decision rule.
On Polymarket the ask for side S is approximately the market's implied
P(S wins). So `edge_up = model_pred - up_ask` = our estimate minus the
market's — the expected PnL per share before fees. If this exceeds a
threshold on either side, we trade that side. Whichever edge is larger
wins when both clear threshold.
Single-knob policy (one Optuna param) — drops the prior 3-param setup
(p_up_threshold, p_dn_threshold, entry_price_max) which was
susceptible to overfit in Optuna's larger search space."""
n = len(preds)
flag = np.zeros(n, dtype=bool)
pnl = np.zeros(n, dtype=np.float64)
for i in range(n):
edge_up = preds[i] - up_asks[i]
edge_dn = (1.0 - preds[i]) - dn_asks[i]
up_ok = (edge_up >= edge_threshold) and (up_asks[i] <= entry_price_max)
dn_ok = (edge_dn >= edge_threshold) and (dn_asks[i] <= entry_price_max)
if up_ok and (not dn_ok or edge_up >= edge_dn):
flag[i] = True
pnl[i] = pnl_ups[i]
elif dn_ok:
flag[i] = True
pnl[i] = pnl_dns[i]
return flag, pnl
def _lgb_params_from_trial(trial: optuna.Trial) -> Tuple[Dict, Dict]:
params = {
"objective": "binary",
"metric": "binary_logloss",
"verbosity": -1,
"boosting_type": "gbdt",
"num_leaves": trial.suggest_int("num_leaves", 31, 127),
"learning_rate": trial.suggest_float("learning_rate", 0.01, 0.1, log=True),
"feature_fraction": trial.suggest_float("feature_fraction", 0.5, 1.0),
"bagging_fraction": trial.suggest_float("bagging_fraction", 0.5, 1.0),
"bagging_freq": trial.suggest_int("bagging_freq", 1, 5),
"min_child_samples": trial.suggest_int("min_child_samples", 20, 200),
"reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 1.0, log=True),
"reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 1.0, log=True),
"n_estimators": 500,
}
trading = {
# primary: edge-margin (model_pred - market_ask for the chosen side)
"edge_threshold": trial.suggest_float("edge_threshold", 0.03, 0.25),
# safety cap: block high-entry overconfident trades where the
# model's probability is likely uncalibrated. Without this, BTC
# lost -$621 on a run (vs +$390 with the cap).
"entry_price_max": trial.suggest_float("entry_price_max", 0.40, 0.65),
}
return params, trading
# ---------------------------------------------------------------------------
# Calibration table (10 buckets)
# ---------------------------------------------------------------------------
def _calibration_buckets(preds: np.ndarray, y: np.ndarray, n_buckets: int = 10) -> List[Dict]:
preds = np.asarray(preds)
y = np.asarray(y)
edges = np.linspace(0.0, 1.0, n_buckets + 1)
out = []
for i in range(n_buckets):
lo, hi = edges[i], edges[i + 1]
mask = (preds >= lo) & (preds < hi) if i < n_buckets - 1 else (preds >= lo) & (preds <= hi)
n_here = int(mask.sum())
if n_here == 0:
out.append({"bucket": i, "lo": lo, "hi": hi, "n": 0, "predicted": 0.0, "actual": 0.0})
continue
out.append(
{
"bucket": i,
"lo": float(lo),
"hi": float(hi),
"n": n_here,
"predicted": float(preds[mask].mean()),
"actual": float(y[mask].mean()),
}
)
return out
# ---------------------------------------------------------------------------
# Main entry
# ---------------------------------------------------------------------------
def run_training(
    asset: str,
    hf_token: str,
    variant: str = "v1_arb",
    n_trials: int = 150,
    n_folds: int = 5,
    log: Callable[[str], None] = print,
    cache_dir: str = "/tmp/cache",
    output_dir: str = "/tmp/output",
) -> Dict:
    """Train, evaluate and publish the directional model for one asset.

    Steps:
      1. Build the per-window dataset.
      2. Split it chronologically 90/10 into pool and holdout.
      3. Run an Optuna TPE study (seed 42) that jointly samples LightGBM
         hyperparameters and the edge-margin trading policy, scored on
         walk-forward folds of the pool.
      4. Re-run each fold with the best configuration for diagnostics.
      5. Refit on the whole pool and score the holdout.
      6. Write artifacts to `output_dir` and upload them to the HF dataset
         ``commanderzee/{asset.lower()}-v1-arb-full``.

    Args:
        asset: Asset name passed to the data loaders; lower-cased for the
            upload repo id.
        hf_token: Hugging Face token used by the data loaders and the upload.
        variant: Label recorded in the log, optuna_report.json and the return.
        n_trials: Number of Optuna trials.
        n_folds: Number of walk-forward folds over the 90% pool.
        log: Logging callback.
        cache_dir: Cache directory passed to the data loaders (created if
            missing).
        output_dir: Artifact directory (created if missing).

    Returns:
        Summary dict with keys ``asset``, ``variant``, ``n_windows``,
        ``n_pool``, ``n_holdout``, ``best_value``, ``best_trading``,
        ``holdout`` (``auc`` and ``sim``) and ``output_dir``.

    Raises:
        RuntimeError: propagated from ``_build_training_dataset`` when no
            training rows are produced. Upload failures are logged, not raised.
    """
    cache_path = Path(cache_dir)
    out_path = Path(output_dir)
    cache_path.mkdir(parents=True, exist_ok=True)
    out_path.mkdir(parents=True, exist_ok=True)
    log(f"[train] asset={asset} variant={variant} trials={n_trials} folds={n_folds}")
    # 1. Build dataset
    df = _build_training_dataset(asset, hf_token, cache_path, log=log)
    # NOTE(review): the builder already returns rows sorted by slug_ts, so this
    # re-sort is redundant. It is also pandas' default (non-stable) sort, so
    # rows with equal slug_ts could be reordered.
    df = df.sort_values("slug_ts").reset_index(drop=True)
    # DIRECTIONAL: label is "UP wins the window?" (spot_label), 1/0 balanced.
    # Per row we also track entry-price (tick 120) for each side and the
    # realized per-side PnL if we bought 100 shares at that price and held
    # to resolution. Asymmetry: the lower the ask we buy at, the larger the
    # payout on a correct call — this is enforced at trading-policy time
    # via the Optuna-tuned `entry_price_max` cap.
    log(f"[train] directional: up_win_rate={float(df['binary_label'].mean()):.3f} "
        f"mean_pnl_up={float(df['pnl_up'].mean()):.2f} "
        f"mean_pnl_dn={float(df['pnl_dn'].mean()):.2f} "
        f"up_ask_median={float(df['up_ask_120'].median()):.3f} "
        f"dn_ask_median={float(df['dn_ask_120'].median()):.3f}")
    X_all = df[ALL_FEATURES].to_numpy(dtype=np.float32)
    y_all = df["binary_label"].to_numpy(dtype=np.int32)
    up_ask_all = df["up_ask_120"].to_numpy(dtype=np.float64)
    dn_ask_all = df["dn_ask_120"].to_numpy(dtype=np.float64)
    pnl_up_all = df["pnl_up"].to_numpy(dtype=np.float64)
    pnl_dn_all = df["pnl_dn"].to_numpy(dtype=np.float64)
    n = len(df)
    # Chronological split (rows are ordered by slug_ts): first 90% is the
    # walk-forward pool, the last 10% is the untouched holdout.
    split = int(round(0.9 * n))
    X_pool = X_all[:split]
    y_pool = y_all[:split]
    up_ask_pool = up_ask_all[:split]
    dn_ask_pool = dn_ask_all[:split]
    pnl_up_pool = pnl_up_all[:split]
    pnl_dn_pool = pnl_dn_all[:split]
    X_hold = X_all[split:]
    y_hold = y_all[split:]
    up_ask_hold = up_ask_all[split:]
    dn_ask_hold = dn_ask_all[split:]
    pnl_up_hold = pnl_up_all[split:]
    pnl_dn_hold = pnl_dn_all[split:]
    log(f"[train] pool={len(X_pool)} holdout={len(X_hold)}")
    folds = _walk_forward_folds(len(X_pool), n_folds=n_folds)

    def _objective(trial: optuna.Trial) -> float:
        # One trial = one LightGBM config + one trading policy, scored on every
        # usable fold.
        params, trading = _lgb_params_from_trial(trial)
        fold_scores = []
        for (train_end, test_start, test_end) in folds:
            Xt = X_pool[:train_end]
            yt = y_pool[:train_end]
            Xv = X_pool[test_start:test_end]
            yv = y_pool[test_start:test_end]
            uav = up_ask_pool[test_start:test_end]
            dav = dn_ask_pool[test_start:test_end]
            pup_v = pnl_up_pool[test_start:test_end]
            pdn_v = pnl_dn_pool[test_start:test_end]
            # Skip folds that are too small or whose train slice is single-class.
            if len(Xt) < 50 or len(Xv) < 20 or len(set(yt.tolist())) < 2:
                continue
            try:
                res = _train_fold_core(Xt, yt, Xv, yv, params)
            except Exception as e:  # noqa: BLE001
                log(f"[optuna] fold error: {e!r}")
                continue
            flag, pnl = _directional_policy_apply(
                res["preds"], uav, dav, pup_v, pdn_v,
                trading["edge_threshold"],
                trading["entry_price_max"],
            )
            sim = simulate_flagging_pnl(pnl, flag)
            # Drawdown-penalised PnL of the flagged trades.
            score = sim["sum_flagged"] - 0.2 * sim["max_drawdown"]
            fold_scores.append(score)
        # A trial with no usable fold gets a very low score rather than failing.
        if not fold_scores:
            return -1e9
        return float(np.mean(fold_scores))

    sampler = optuna.samplers.TPESampler(seed=42)
    study = optuna.create_study(direction="maximize", sampler=sampler)
    log(f"[optuna] starting with {n_trials} trials")
    study.optimize(_objective, n_trials=n_trials, show_progress_bar=False)
    # The study's params mix trading knobs and LightGBM knobs. Pop the trading
    # ones out and re-add the fixed LightGBM keys. These must stay in sync
    # with _lgb_params_from_trial.
    best_params_trial = dict(study.best_trial.params)
    best_trading = {
        "edge_threshold": float(best_params_trial.pop("edge_threshold")),
        "entry_price_max": float(best_params_trial.pop("entry_price_max")),
    }
    best_lgb_params = {
        "objective": "binary",
        "metric": "binary_logloss",
        "verbosity": -1,
        "boosting_type": "gbdt",
        "n_estimators": 500,
        **best_params_trial,
    }
    log(f"[optuna] best value={study.best_value:.4f} trading={best_trading}")
    log(f"[optuna] best lgb params: {best_params_trial}")
    # Re-train each fold with the best config to record per-fold diagnostics.
    # Unlike _objective, training errors here are not caught.
    fold_metrics: List[Dict] = []
    # NOTE(review): per_fold_thresholds is never populated or used.
    per_fold_thresholds: Dict[str, float] = {}
    for k, (train_end, test_start, test_end) in enumerate(folds):
        Xt = X_pool[:train_end]
        yt = y_pool[:train_end]
        Xv = X_pool[test_start:test_end]
        yv = y_pool[test_start:test_end]
        uav = up_ask_pool[test_start:test_end]
        dav = dn_ask_pool[test_start:test_end]
        pup_v = pnl_up_pool[test_start:test_end]
        pdn_v = pnl_dn_pool[test_start:test_end]
        if len(Xt) < 50 or len(Xv) < 20 or len(set(yt.tolist())) < 2:
            fold_metrics.append({"fold": k, "skipped": True})
            continue
        res = _train_fold_core(Xt, yt, Xv, yv, best_lgb_params)
        flag, pnl = _directional_policy_apply(
            res["preds"], uav, dav, pup_v, pdn_v,
            best_trading["edge_threshold"],
            best_trading["entry_price_max"],
        )
        sim = simulate_flagging_pnl(pnl, flag)
        # counterfactual: "trade every window, always buy cheaper side"
        pnl_all_cheaper = np.where(uav <= dav, pup_v, pdn_v)
        cf = simulate_flagging_pnl(pnl_all_cheaper, np.ones_like(pnl_all_cheaper, dtype=bool))
        fold_metrics.append(
            {
                "fold": k,
                "train_end": train_end,
                "test_start": test_start,
                "test_end": test_end,
                "n_train": len(Xt),
                "n_test": len(Xv),
                "auc": res["auc"],
                "n_flagged": sim["n_flagged"],
                "n_total": sim["n_total"],
                "flagged_ratio": sim["flagged_ratio"],
                "realized_sum_pnl": sim["sum_flagged"],
                "realized_mean_pnl_of_flagged": sim["mean_flagged"],
                "realized_median_pnl_of_flagged": sim["median_flagged"],
                "hit_rate": sim["hit_rate"],
                "max_drawdown": sim["max_drawdown"],
                "flag_all_baseline_sum_pnl": cf["sum_flagged"],
                "calibration": _calibration_buckets(res["preds"], yv),
            }
        )
    # 2. Final refit on full pool, evaluate on holdout
    # NOTE(review): the fold models scored by Optuna were trained through
    # _train_fold_core. That path early-stops on the last 10% of each train
    # slice and strips "n_estimators" from params. This refit trains on the
    # whole pool for the full n_estimators rounds with no early stopping, and
    # passes params that still contain "n_estimators". The holdout model may
    # therefore be larger than the models Optuna scored. Confirm this is
    # intended.
    log(f"[train] refit on pool size={len(X_pool)} → predict holdout size={len(X_hold)}")
    dtrain = lgb.Dataset(X_pool, label=y_pool, feature_name=ALL_FEATURES)
    final_booster = lgb.train(
        best_lgb_params,
        dtrain,
        num_boost_round=best_lgb_params.get("n_estimators", 500),
        callbacks=[lgb.log_evaluation(0)],
    )
    if len(X_hold) > 0:
        hold_preds = final_booster.predict(X_hold)
        hold_flag, hold_pnl = _directional_policy_apply(
            hold_preds, up_ask_hold, dn_ask_hold, pnl_up_hold, pnl_dn_hold,
            best_trading["edge_threshold"],
            best_trading["entry_price_max"],
        )
        hold_sim = simulate_flagging_pnl(hold_pnl, hold_flag)
        # AUC falls back to 0.5 for a single-class holdout or a scoring error.
        try:
            hold_auc = (
                float(roc_auc_score(y_hold, hold_preds))
                if len(set(y_hold.tolist())) > 1
                else 0.5
            )
        except Exception:
            hold_auc = 0.5
        hold_cal = _calibration_buckets(hold_preds, y_hold)
        # Same "always buy the cheaper side" counterfactual as the folds.
        cheaper_hold = np.where(up_ask_hold <= dn_ask_hold, pnl_up_hold, pnl_dn_hold)
        hold_cf = simulate_flagging_pnl(cheaper_hold, np.ones_like(cheaper_hold, dtype=bool))
    else:
        # Empty holdout: emit zeroed metrics with the same keys.
        hold_preds = np.array([])
        hold_sim = {
            "sum_flagged": 0.0, "mean_flagged": 0.0, "median_flagged": 0.0,
            "max_drawdown": 0.0, "hit_rate": 0.0, "n_flagged": 0, "n_total": 0,
            "flagged_ratio": 0.0,
        }
        hold_auc = 0.5
        hold_cal = []
        hold_cf = {"sum_flagged": 0.0}
    # 3. Save artifacts
    model_path = out_path / "lgbm_model.txt"
    final_booster.save_model(str(model_path))
    (out_path / "feature_spec.json").write_text(
        json.dumps(
            {
                "ohlcv_features": list(OHLCV_FEATS),
                "orderbook_features": list(OB_FEATS),
                "state_features": list(STATE_FEATS),
                "all_features": list(ALL_FEATURES),
                "n_features": len(ALL_FEATURES),
            },
            indent=2,
        )
    )
    # NOTE(review): the "notes" text below does not mention the
    # entry_price_max cap, which _directional_policy_apply also enforces.
    (out_path / "thresholds.json").write_text(
        json.dumps(
            {
                "trading": best_trading,
                "notes": "Edge-margin rule: edge_up = model_pred - up_ask; "
                         "edge_dn = (1 - model_pred) - dn_ask. Enter the side "
                         "with the larger edge, iff that edge >= edge_threshold. "
                         "Edge is expected PnL per share before fees.",
            },
            indent=2,
        )
    )
    (out_path / "fold_metrics.json").write_text(json.dumps(fold_metrics, indent=2))
    # Objective values of completed trials. A None value would be written as
    # NaN, which json.dumps emits as non-standard JSON.
    trial_values = [
        t.value if t.value is not None else float("nan")
        for t in study.trials
        if t.state == optuna.trial.TrialState.COMPLETE
    ]
    (out_path / "optuna_report.json").write_text(
        json.dumps(
            {
                "asset": asset,
                "variant": variant,
                "n_trials_requested": n_trials,
                "n_trials_completed": len(trial_values),
                "best_value": float(study.best_value),
                "best_params": dict(study.best_trial.params),
                "trial_values": trial_values,
            },
            indent=2,
        )
    )
    (out_path / "holdout_metrics.json").write_text(
        json.dumps(
            {
                "n_holdout": len(X_hold),
                "auc": hold_auc,
                "sim": hold_sim,
                "calibration": hold_cal,
                "flag_all_baseline_sum_pnl": hold_cf.get("sum_flagged", 0.0),
            },
            indent=2,
        )
    )
    # 4. Upload (best effort: any failure is logged and the summary is still
    # returned).
    try:
        api = HfApi(token=hf_token)
        target_repo = f"commanderzee/{asset.lower()}-v1-arb-full"
        try:
            api.create_repo(repo_id=target_repo, repo_type="dataset", exist_ok=True, token=hf_token)
        except Exception as e:  # noqa: BLE001
            log(f"[upload] create_repo warning: {e!r}")
        _upload_with_retry(api, str(out_path), target_repo, "dataset", hf_token, log=log)
        log(f"[upload] pushed artifacts to {target_repo}")
    except Exception as e:  # noqa: BLE001
        log(f"[upload] FAILED: {e!r}")
        log(traceback.format_exc())
    return {
        "asset": asset,
        "variant": variant,
        "n_windows": int(n),
        "n_pool": int(len(X_pool)),
        "n_holdout": int(len(X_hold)),
        "best_value": float(study.best_value),
        "best_trading": best_trading,
        "holdout": {"auc": hold_auc, "sim": hold_sim},
        "output_dir": str(out_path),
    }
# Public API of this module: the training entry point and the feature order.
__all__ = [
    "run_training",
    "ALL_FEATURES",
]