""" T5 — Validation Layer (Standalone) Runs after every training cycle. Results saved to HF. 1. Per-expert validation (E1..E5 individually) 2. Walk-forward OOS (5 rolling windows, no lookahead) 3. Regime-specific tests (TRENDING / RANGING / VOLATILE) 4. Benchmark: vs Buy-Hold, vs Random 5. Paper Trading Sim (lightweight, slippage-aware) 6. Promotion gate: model only keeps if Sharpe > threshold """ from __future__ import annotations import os, json, math, logging, time from datetime import datetime, timezone from typing import Any import numpy as np import pandas as pd log = logging.getLogger("validation_layer") HF_TOKEN = os.environ.get("HF_TOKEN", "") EXPERIENCE_REPO = os.environ.get("EXPERIENCE_REPO", "gionuibk/aetheris-experiences") MODEL_REPO = os.environ.get("MODEL_REPO", "gionuibk/aetheris-models") # Promotion gates GATE = { "min_sharpe_oos": 0.80, "min_win_rate": 0.52, "max_drawdown": -0.15, "min_edge_vs_random": 0.03, "min_folds_pass": 3, # must pass at least 3/5 OOS windows } # ───────────────────────────────────────────────────────────────── # UTILS # ───────────────────────────────────────────────────────────────── def _sharpe(returns: list[float]) -> float: if len(returns) < 2: return 0.0 arr = np.array(returns) return float(np.mean(arr) / (np.std(arr) + 1e-9) * math.sqrt(len(arr))) def _max_drawdown(equity_curve: list[float]) -> float: if not equity_curve: return 0.0 peak = equity_curve[0] max_dd = 0.0 for v in equity_curve: if v > peak: peak = v dd = (v - peak) / (peak + 1e-9) if dd < max_dd: max_dd = dd return round(max_dd, 4) # ───────────────────────────────────────────────────────────────── # 1. PER-EXPERT VALIDATOR # ───────────────────────────────────────────────────────────────── class PerExpertValidator: """Test each Expert model on its own held-out data.""" EXPERT_FEATURE_MAP = { "E1": ["obi","spread_pct","vpin","entropy","cvd_norm","absorption", "tape_speed","large_trade","vol_adjusted_spread","poc_proximity","vol_skew"], "E2": ["ema_cross_fs","ema_cross_sl","adx","plus_di","minus_di", "momentum_zscore","trend_strength","vol_surge","macd_hist","macd_line"], "E3": ["zscore_60","zscore_300","rsi","hurst","bb_pct_b","bb_width", "vwap_dev","mean_rev_signal"], "E4": ["market_structure","bos","wyckoff_phase","fib_618_prox", "fvg_score","ob_score","pin_bar","engulfing","inside_bar", "pivot_dist_h","pivot_dist_l"], "E5": ["funding","funding_trend","oi_delta","ls_ratio", "session_hour","session_asian","session_london","session_ny", "session_overlap","day_of_week"], } def validate_expert(self, model, df: pd.DataFrame, expert_id: str, strategy: str = "general") -> dict: feat_cols = self.EXPERT_FEATURE_MAP.get(expert_id, []) available = [c for c in feat_cols if c in df.columns] if not available: return {"status": "no_features", "expert": expert_id} # Held-out: last 20% n = len(df) cutoff = int(n * 0.8) df_oos = df.iloc[cutoff:].copy() for c in available: if c not in df_oos.columns: df_oos[c] = 0.0 X = df_oos[available].fillna(0).values.astype(np.float32) y = df_oos["label"].values.astype(np.int32) try: preds = model.predict(X) probs = model.predict_proba(X) conf = np.max(probs, axis=1) except Exception as e: return {"status": "predict_error", "error": str(e)} acc = float(np.mean(preds == y)) sharpe_oos = (float(np.mean(conf)) - 0.5) / (float(np.std(conf)) + 1e-9) * math.sqrt(len(conf)) non_flat = [(t, p) for t, p in zip(y, preds) if p != 0] win_rate = sum(1 for t, p in non_flat if t == p) / max(len(non_flat), 1) return { "expert": expert_id, "strategy": strategy, "status": "ok", "n_oos": len(y), "accuracy": round(acc, 4), "sharpe": round(sharpe_oos, 4), "win_rate": round(win_rate, 4), "n_signals": len(non_flat), } # ───────────────────────────────────────────────────────────────── # 2. WALK-FORWARD VALIDATOR # ───────────────────────────────────────────────────────────────── class WalkForwardValidator: """5-fold rolling OOS — no lookahead bias.""" N_FOLDS = 5 def validate(self, model, df: pd.DataFrame, feat_cols: list) -> dict: # Sort by time df = df.sort_values("timestamp").reset_index(drop=True) if "timestamp" in df.columns else df.reset_index(drop=True) n = len(df) if n < 200: return {"status": "insufficient", "n": n} window = n // (self.N_FOLDS + 1) fold_results = [] for i in range(self.N_FOLDS): train_end = window * (i + 1) test_start = train_end test_end = min(test_start + window, n) if test_end - test_start < 30: continue df_train = df.iloc[:train_end] df_test = df.iloc[test_start:test_end] for c in feat_cols: if c not in df_train.columns: df_train = df_train.copy(); df_train[c] = 0.0 if c not in df_test.columns: df_test = df_test.copy(); df_test[c] = 0.0 X_tr = df_train[feat_cols].fillna(0).values.astype(np.float32) y_tr = df_train["label"].values.astype(np.int32) X_te = df_test[feat_cols].fillna(0).values.astype(np.float32) y_te = df_test["label"].values.astype(np.int32) try: model.fit(X_tr, y_tr) preds = model.predict(X_te) probs = model.predict_proba(X_te) conf = np.max(probs, axis=1) acc = float(np.mean(preds == y_te)) sharpe = (float(np.mean(conf)) - 0.5) / (float(np.std(conf)) + 1e-9) * math.sqrt(len(conf)) non_flat = [(t, p) for t, p in zip(y_te, preds) if p != 0] win_rate = sum(1 for t, p in non_flat if t == p) / max(len(non_flat), 1) passed = sharpe >= GATE["min_sharpe_oos"] and win_rate >= GATE["min_win_rate"] fold_results.append({ "fold": i, "n_train": len(X_tr), "n_test": len(X_te), "acc": round(acc, 4), "sharpe": round(sharpe, 4), "win_rate": round(win_rate, 4), "passed": passed, }) except Exception as e: log.warning(f"WF fold {i}: {e}") if not fold_results: return {"status": "no_folds"} n_pass = sum(1 for f in fold_results if f["passed"]) avg_sh = sum(f["sharpe"] for f in fold_results) / len(fold_results) avg_wr = sum(f["win_rate"] for f in fold_results) / len(fold_results) overall_pass = n_pass >= GATE["min_folds_pass"] return { "status": "pass" if overall_pass else "fail", "n_folds": len(fold_results), "folds_passed": n_pass, "avg_sharpe": round(avg_sh, 4), "avg_win_rate": round(avg_wr, 4), "fold_details": fold_results, } # ───────────────────────────────────────────────────────────────── # 3. REGIME-SPECIFIC VALIDATOR # ───────────────────────────────────────────────────────────────── class RegimeValidator: """Test model separately in each market regime.""" REGIMES = { "TRENDING": lambda df: df["adx"] > 25 if "adx" in df.columns else pd.Series([True]*len(df)), "RANGING": lambda df: df["adx"] < 20 if "adx" in df.columns else pd.Series([False]*len(df)), "VOLATILE": lambda df: df["vol_surge"] > 2.0 if "vol_surge" in df.columns else pd.Series([False]*len(df)), "QUIET": lambda df: df["vol_surge"] < 0.7 if "vol_surge" in df.columns else pd.Series([False]*len(df)), } def validate(self, model, df: pd.DataFrame, feat_cols: list) -> dict: results = {} for regime, mask_fn in self.REGIMES.items(): try: mask = mask_fn(df) sub = df[mask].copy() if len(sub) < 30: results[regime] = {"status": "insufficient", "n": len(sub)} continue for c in feat_cols: if c not in sub.columns: sub[c] = 0.0 X = sub[feat_cols].fillna(0).values.astype(np.float32) y = sub["label"].values.astype(np.int32) preds = model.predict(X) probs = model.predict_proba(X) conf = np.max(probs, axis=1) acc = float(np.mean(preds == y)) sharpe = (float(np.mean(conf)) - 0.5) / (float(np.std(conf)) + 1e-9) * math.sqrt(len(conf)) non_flat = [(t, p) for t, p in zip(y, preds) if p != 0] win_rate = sum(1 for t, p in non_flat if t == p) / max(len(non_flat), 1) results[regime] = { "n": len(y), "accuracy": round(acc, 4), "sharpe": round(sharpe, 4), "win_rate": round(win_rate, 4), "n_signals": len(non_flat), } except Exception as e: results[regime] = {"status": "error", "error": str(e)} return results # ───────────────────────────────────────────────────────────────── # 4. BENCHMARK COMPARATOR # ───────────────────────────────────────────────────────────────── class BenchmarkComparator: """Compare model vs Buy-Hold and vs Random.""" def compare(self, model, df: pd.DataFrame, feat_cols: list) -> dict: df_oos = df.iloc[int(len(df)*0.8):].copy() for c in feat_cols: if c not in df_oos.columns: df_oos[c] = 0.0 X = df_oos[feat_cols].fillna(0).values.astype(np.float32) y = df_oos["label"].values.astype(np.int32) try: preds = model.predict(X) except Exception: return {"status": "error"} model_acc = float(np.mean(preds == y)) random_acc = float(np.mean(np.random.choice([-1,0,1], size=len(y)) == y)) # Buy-hold baseline: always predict +1 buyhold_acc = float(np.mean(np.ones(len(y), dtype=int) == y)) edge_vs_random = model_acc - random_acc edge_vs_buyhold = model_acc - buyhold_acc # Signal quality non_flat = [(t, p) for t, p in zip(y, preds) if p != 0] signal_rate = len(non_flat) / max(len(preds), 1) win_rate = sum(1 for t, p in non_flat if t == p) / max(len(non_flat), 1) passes_edge = edge_vs_random >= GATE["min_edge_vs_random"] return { "model_acc": round(model_acc, 4), "random_acc": round(random_acc, 4), "buyhold_acc": round(buyhold_acc, 4), "edge_vs_random": round(edge_vs_random, 4), "edge_vs_buyhold": round(edge_vs_buyhold, 4), "signal_rate": round(signal_rate, 4), "win_rate": round(win_rate, 4), "passes_edge_gate": passes_edge, } # ───────────────────────────────────────────────────────────────── # 5. PAPER TRADING SIMULATOR # ───────────────────────────────────────────────────────────────── class PaperTradingSimulator: """ Lightweight paper trading sim using OOS experience data. Simulates actual trade P&L with slippage and spread costs. No NautilusTrader dependency — runs on historical experience data. """ SLIPPAGE_PCT = 0.0002 # 0.02% slippage per side SPREAD_COST = 0.0001 # 0.01% spread cost MAX_POSITION = 1.0 # fraction of capital STOP_LOSS_PCT = 0.005 # 0.5% hard stop def simulate(self, model, df: pd.DataFrame, feat_cols: list, initial_capital: float = 10_000.0) -> dict: """ Walk through OOS data chronologically. Open/close trades based on model signals. Track P&L, drawdown, win rate. """ df_oos = df.sort_values("timestamp").iloc[int(len(df)*0.8):].copy() \ if "timestamp" in df.columns else df.iloc[int(len(df)*0.8):].copy() if len(df_oos) < 50: return {"status": "insufficient_oos_data"} for c in feat_cols: if c not in df_oos.columns: df_oos[c] = 0.0 X = df_oos[feat_cols].fillna(0).values.astype(np.float32) y = df_oos["label"].values.astype(np.int32) pnl = df_oos["actual_pnl"].values if "actual_pnl" in df_oos.columns else np.zeros(len(y)) try: preds = model.predict(X) probs = model.predict_proba(X) except Exception as e: return {"status": "predict_error", "error": str(e)} capital = initial_capital equity_curve = [capital] trades = [] in_trade = False entry_direction = 0 for i, (pred, prob, actual_pnl_pct) in enumerate(zip(preds, np.max(probs, axis=1), pnl)): if pred == 0: # No signal — if in trade, check stop if in_trade: current_ret = actual_pnl_pct * entry_direction if current_ret < -self.STOP_LOSS_PCT: # Hit stop loss net = current_ret - self.SLIPPAGE_PCT - self.SPREAD_COST capital *= (1 + net) trades.append({"type": "stop_loss", "ret": net, "capital": capital}) equity_curve.append(capital) in_trade = False continue if not in_trade: # Open new trade cost = self.SLIPPAGE_PCT + self.SPREAD_COST in_trade = True entry_direction = pred else: # Already in trade — if signal flips, close and reopen if pred != entry_direction: # Close current net = actual_pnl_pct * entry_direction - self.SLIPPAGE_PCT - self.SPREAD_COST capital *= (1 + net) trades.append({"type": "close_flip", "ret": net, "capital": capital}) equity_curve.append(capital) # Open new entry_direction = pred else: # Confirm existing trade net = actual_pnl_pct * entry_direction - self.SLIPPAGE_PCT capital *= (1 + net) trades.append({"type": "confirm", "ret": net, "capital": capital}) equity_curve.append(capital) if not trades: return {"status": "no_trades"} returns = [t["ret"] for t in trades] winning = [r for r in returns if r > 0] losing = [r for r in returns if r < 0] total_ret = (capital - initial_capital) / initial_capital sharpe_sim = _sharpe(returns) max_dd = _max_drawdown(equity_curve) win_rate_sim = len(winning) / max(len(returns), 1) profit_factor = abs(sum(winning)) / max(abs(sum(losing)), 1e-9) passed = (sharpe_sim >= GATE["min_sharpe_oos"] and win_rate_sim >= GATE["min_win_rate"] and max_dd >= GATE["max_drawdown"]) return { "status": "pass" if passed else "fail", "initial_capital":initial_capital, "final_capital": round(capital, 2), "total_return": round(total_ret, 4), "sharpe": round(sharpe_sim, 4), "max_drawdown": round(max_dd, 4), "win_rate": round(win_rate_sim, 4), "profit_factor": round(profit_factor, 4), "n_trades": len(trades), "n_wins": len(winning), "n_losses": len(losing), } # ───────────────────────────────────────────────────────────────── # 6. MASTER VALIDATOR — ties everything together # ───────────────────────────────────────────────────────────────── class MasterValidator: """ Run full validation suite on a model. Publishes results to HF. Updates model_registry. """ def __init__(self): self.per_expert = PerExpertValidator() self.walk_fwd = WalkForwardValidator() self.regime = RegimeValidator() self.benchmark = BenchmarkComparator() self.paper = PaperTradingSimulator() def run_full_validation(self, model, df_oos: pd.DataFrame, model_name: str, feat_cols: list, expert_id: str = "E1", strategy: str = "general") -> dict: log.info(f"🧪 Validating {model_name} ({len(df_oos):,} OOS rows)") ts = datetime.now(timezone.utc).isoformat() report = {"model": model_name, "validated_at": ts, "strategy": strategy} report["per_expert"] = self.per_expert.validate_expert(model, df_oos, expert_id, strategy) report["walk_forward"] = self.walk_fwd.validate(model, df_oos, feat_cols) report["regime"] = self.regime.validate(model, df_oos, feat_cols) report["benchmark"] = self.benchmark.compare(model, df_oos, feat_cols) report["paper_trading"] = self.paper.simulate(model, df_oos, feat_cols) # Overall promotion decision wf = report["walk_forward"] bm = report["benchmark"] pt = report["paper_trading"] pe = report["per_expert"] promotion_checks = { "wf_pass": wf.get("status") == "pass", "edge_vs_random": bm.get("passes_edge_gate", False), "paper_pass": pt.get("status") == "pass", "expert_sharpe": pe.get("sharpe", 0) >= GATE["min_sharpe_oos"], } promote = sum(promotion_checks.values()) >= 3 # need 3/4 checks report["promotion"] = {"promote": promote, "checks": promotion_checks} log.info(f" {'✅ PROMOTE' if promote else '❌ REJECT'} {model_name} | " f"WF={wf.get('avg_sharpe','?')} PT={pt.get('sharpe','?')}") self._save_report(model_name, report) return report def _save_report(self, model_name: str, report: dict): path = f"validation_{model_name.replace('/','_')}.json" with open(path, "w") as f: json.dump(report, f, indent=2) try: from huggingface_hub import HfApi api = HfApi(token=HF_TOKEN) ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M") api.upload_file( path_or_fileobj=path, token=HF_TOKEN, path_in_repo=f"validation/{model_name}_{ts}.json", repo_id=EXPERIENCE_REPO, repo_type="dataset", commit_message=f"Validation: {model_name}", ) except Exception as e: log.warning(f"Cannot push report: {e}") # ───────────────────────────────────────────────────────────────── # Run loop (called from forge_controller or training_engine) # ───────────────────────────────────────────────────────────────── def validate_after_training(model, df_oos: pd.DataFrame, model_name: str, feat_cols: list, expert_id: str = "E1", strategy: str = "general") -> bool: """Returns True if model should be promoted.""" mv = MasterValidator() report = mv.run_full_validation(model, df_oos, model_name, feat_cols, expert_id, strategy) return report["promotion"]["promote"] if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Quick self-test with synthetic data from sklearn.ensemble import GradientBoostingClassifier import numpy as np np.random.seed(42) n = 1000 feat_cols = ["obi","vpin","adx","rsi","zscore_60","funding"] df_test = pd.DataFrame({c: np.random.randn(n) for c in feat_cols}) df_test["label"] = np.random.choice([-1,0,1], n) df_test["actual_pnl"] = np.random.uniform(-0.01, 0.01, n) df_test["timestamp"] = np.arange(n) model = GradientBoostingClassifier(n_estimators=30, random_state=42) X = df_test[feat_cols].values; y = df_test["label"].values model.fit(X, y) mv = MasterValidator() report = mv.run_full_validation(model, df_test, "test_model", feat_cols, "E1", "test") print(json.dumps({ "wf_status": report["walk_forward"]["status"], "wf_sharpe": report["walk_forward"].get("avg_sharpe"), "pt_status": report["paper_trading"]["status"], "pt_sharpe": report["paper_trading"].get("sharpe"), "promote": report["promotion"]["promote"], }, indent=2))