Spaces:
Running
Running
| """ | |
| T5 — Validation Layer (Standalone) | |
| Runs after every training cycle. Results saved to HF. | |
| 1. Per-expert validation (E1..E5 individually) | |
| 2. Walk-forward OOS (5 rolling windows, no lookahead) | |
| 3. Regime-specific tests (TRENDING / RANGING / VOLATILE) | |
| 4. Benchmark: vs Buy-Hold, vs Random | |
| 5. Paper Trading Sim (lightweight, slippage-aware) | |
| 6. Promotion gate: model only keeps if Sharpe > threshold | |
| """ | |
| from __future__ import annotations | |
| import os, json, math, logging, time | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| import numpy as np | |
| import pandas as pd | |
# Module logger; handlers/levels are configured by the caller (or by the
# __main__ self-test at the bottom of the file).
log = logging.getLogger("validation_layer")

# Hugging Face credentials and repositories, overridable via the environment.
HF_TOKEN = os.getenv("HF_TOKEN", "")
EXPERIENCE_REPO = os.getenv("EXPERIENCE_REPO", "gionuibk/aetheris-experiences")
MODEL_REPO = os.getenv("MODEL_REPO", "gionuibk/aetheris-models")

# Promotion gates: thresholds the validators compare their metrics against.
GATE = dict(
    min_sharpe_oos=0.80,
    min_win_rate=0.52,
    max_drawdown=-0.15,
    min_edge_vs_random=0.03,
    min_folds_pass=3,  # must pass at least 3/5 OOS windows
)
| # ───────────────────────────────────────────────────────────────── | |
| # UTILS | |
| # ───────────────────────────────────────────────────────────────── | |
| def _sharpe(returns: list[float]) -> float: | |
| if len(returns) < 2: return 0.0 | |
| arr = np.array(returns) | |
| return float(np.mean(arr) / (np.std(arr) + 1e-9) * math.sqrt(len(arr))) | |
| def _max_drawdown(equity_curve: list[float]) -> float: | |
| if not equity_curve: return 0.0 | |
| peak = equity_curve[0] | |
| max_dd = 0.0 | |
| for v in equity_curve: | |
| if v > peak: peak = v | |
| dd = (v - peak) / (peak + 1e-9) | |
| if dd < max_dd: max_dd = dd | |
| return round(max_dd, 4) | |
| # ───────────────────────────────────────────────────────────────── | |
| # 1. PER-EXPERT VALIDATOR | |
| # ───────────────────────────────────────────────────────────────── | |
class PerExpertValidator:
    """Test each Expert model on its own held-out data."""

    # Feature columns each expert is evaluated on. Only the columns that are
    # actually present in the input frame are used.
    EXPERT_FEATURE_MAP = {
        "E1": ["obi", "spread_pct", "vpin", "entropy", "cvd_norm", "absorption",
               "tape_speed", "large_trade", "vol_adjusted_spread", "poc_proximity", "vol_skew"],
        "E2": ["ema_cross_fs", "ema_cross_sl", "adx", "plus_di", "minus_di",
               "momentum_zscore", "trend_strength", "vol_surge", "macd_hist", "macd_line"],
        "E3": ["zscore_60", "zscore_300", "rsi", "hurst", "bb_pct_b", "bb_width",
               "vwap_dev", "mean_rev_signal"],
        "E4": ["market_structure", "bos", "wyckoff_phase", "fib_618_prox",
               "fvg_score", "ob_score", "pin_bar", "engulfing", "inside_bar",
               "pivot_dist_h", "pivot_dist_l"],
        "E5": ["funding", "funding_trend", "oi_delta", "ls_ratio",
               "session_hour", "session_asian", "session_london", "session_ny",
               "session_overlap", "day_of_week"],
    }

    def validate_expert(self, model, df: pd.DataFrame,
                        expert_id: str, strategy: str = "general") -> dict:
        """Score *model* on the last 20% of *df* using the expert's features.

        Args:
            model: object exposing ``predict(X)`` and ``predict_proba(X)``.
            df: frame holding feature columns and an integer ``label`` column.
                Rows are taken in the order given (the tail is the hold-out).
            expert_id: key into ``EXPERT_FEATURE_MAP``.
            strategy: free-form tag copied into the result.

        Returns:
            A dict whose ``status`` is one of ``"ok"``, ``"no_features"``,
            ``"no_label"``, ``"insufficient"`` or ``"predict_error"``. Every
            result carries the ``expert`` id. On ``"ok"`` it also holds
            ``n_oos``, ``accuracy``, ``sharpe``, ``win_rate`` and ``n_signals``.
            ``sharpe`` here is a confidence-based score,
            ``(mean(conf) - 0.5) / std(conf) * sqrt(n)``, not a return-based
            Sharpe ratio.
        """
        feat_cols = self.EXPERT_FEATURE_MAP.get(expert_id, [])
        available = [c for c in feat_cols if c in df.columns]
        if not available:
            return {"status": "no_features", "expert": expert_id}
        if "label" not in df.columns:
            # Without labels nothing can be scored; report instead of raising.
            return {"status": "no_label", "expert": expert_id}

        # Held-out: last 20%
        df_oos = df.iloc[int(len(df) * 0.8):]
        if df_oos.empty:
            # Avoids NaN metrics from averaging over zero rows.
            return {"status": "insufficient", "expert": expert_id, "n": 0}

        X = df_oos[available].fillna(0).values.astype(np.float32)
        y = df_oos["label"].values.astype(np.int32)
        try:
            preds = np.asarray(model.predict(X))
            probs = model.predict_proba(X)
            conf = np.max(probs, axis=1)
        except Exception as e:
            return {"status": "predict_error", "expert": expert_id, "error": str(e)}

        acc = float(np.mean(preds == y))
        sharpe_oos = (float(np.mean(conf)) - 0.5) / (float(np.std(conf)) + 1e-9) * math.sqrt(len(conf))
        # A "signal" is any non-flat (non-zero) prediction; win rate is the
        # share of signals whose direction matches the label.
        signal_mask = preds != 0
        n_signals = int(np.count_nonzero(signal_mask))
        n_wins = int(np.count_nonzero(signal_mask & (preds == y)))
        win_rate = n_wins / max(n_signals, 1)
        return {
            "expert": expert_id,
            "strategy": strategy,
            "status": "ok",
            "n_oos": len(y),
            "accuracy": round(acc, 4),
            "sharpe": round(sharpe_oos, 4),
            "win_rate": round(win_rate, 4),
            "n_signals": n_signals,
        }
| # ───────────────────────────────────────────────────────────────── | |
| # 2. WALK-FORWARD VALIDATOR | |
| # ───────────────────────────────────────────────────────────────── | |
class WalkForwardValidator:
    """5-fold walk-forward OOS — no lookahead bias.

    The data is cut into ``N_FOLDS + 1`` equal windows. Fold *i* trains on
    everything before window *i + 1* (an expanding training set) and tests
    on window *i + 1*.
    """

    N_FOLDS = 5

    def validate(self, model, df: pd.DataFrame, feat_cols: list) -> dict:
        """Re-fit a copy of *model* per fold and score it on the next window.

        Args:
            model: estimator exposing ``fit``, ``predict`` and
                ``predict_proba``. It is never fitted in place: each fold
                works on a ``copy.deepcopy`` so the caller's trained model
                is left untouched for the validators that run afterwards.
            df: frame with feature columns and an integer ``label`` column;
                sorted by ``timestamp`` when that column exists.
            feat_cols: feature column names; columns missing from *df* are
                treated as all-zero.

        Returns:
            ``{"status": "insufficient", "n": n}`` for fewer than 200 rows,
            ``{"status": "no_label", "n": n}`` when labels are missing,
            ``{"status": "no_folds"}`` when every fold failed, otherwise a
            summary with ``status`` ``"pass"``/``"fail"``, fold counts,
            average metrics and per-fold ``fold_details``.
        """
        import copy  # local import keeps this block self-contained

        # Sort by time
        if "timestamp" in df.columns:
            df = df.sort_values("timestamp")
        df = df.reset_index(drop=True)
        n = len(df)
        if n < 200:
            return {"status": "insufficient", "n": n}
        if "label" not in df.columns:
            return {"status": "no_label", "n": n}

        # Build the matrices once; reindex supplies NaN for absent feature
        # columns, which fillna(0) then turns into zeros.
        X_all = df.reindex(columns=feat_cols).fillna(0).values.astype(np.float32)
        y_all = df["label"].values.astype(np.int32)

        window = n // (self.N_FOLDS + 1)
        fold_results = []
        for i in range(self.N_FOLDS):
            train_end = window * (i + 1)
            test_start = train_end
            test_end = min(test_start + window, n)
            if test_end - test_start < 30:
                continue
            X_tr, y_tr = X_all[:train_end], y_all[:train_end]
            X_te, y_te = X_all[test_start:test_end], y_all[test_start:test_end]
            try:
                # Fit a private copy: fitting `model` itself would overwrite
                # the trained model that is being validated.
                fold_model = copy.deepcopy(model)
                fold_model.fit(X_tr, y_tr)
                preds = np.asarray(fold_model.predict(X_te))
                probs = fold_model.predict_proba(X_te)
                conf = np.max(probs, axis=1)
                acc = float(np.mean(preds == y_te))
                # Confidence-based score, not a return-based Sharpe ratio.
                sharpe = (float(np.mean(conf)) - 0.5) / (float(np.std(conf)) + 1e-9) * math.sqrt(len(conf))
                signal_mask = preds != 0
                n_signals = int(np.count_nonzero(signal_mask))
                n_wins = int(np.count_nonzero(signal_mask & (preds == y_te)))
                win_rate = n_wins / max(n_signals, 1)
                passed = sharpe >= GATE["min_sharpe_oos"] and win_rate >= GATE["min_win_rate"]
                fold_results.append({
                    "fold": i, "n_train": len(X_tr), "n_test": len(X_te),
                    "acc": round(acc, 4), "sharpe": round(sharpe, 4),
                    "win_rate": round(win_rate, 4), "passed": passed,
                })
            except Exception as e:
                # Best-effort: a failing fold is logged and skipped.
                log.warning(f"WF fold {i}: {e}")

        if not fold_results:
            return {"status": "no_folds"}
        n_pass = sum(1 for f in fold_results if f["passed"])
        avg_sh = sum(f["sharpe"] for f in fold_results) / len(fold_results)
        avg_wr = sum(f["win_rate"] for f in fold_results) / len(fold_results)
        overall_pass = n_pass >= GATE["min_folds_pass"]
        return {
            "status": "pass" if overall_pass else "fail",
            "n_folds": len(fold_results),
            "folds_passed": n_pass,
            "avg_sharpe": round(avg_sh, 4),
            "avg_win_rate": round(avg_wr, 4),
            "fold_details": fold_results,
        }
| # ───────────────────────────────────────────────────────────────── | |
| # 3. REGIME-SPECIFIC VALIDATOR | |
| # ───────────────────────────────────────────────────────────────── | |
class RegimeValidator:
    """Test model separately in each market regime."""

    # Regime name -> function returning a boolean row mask for a frame.
    # The fallback masks are built on df.index so they align with frames
    # whose index is not a default RangeIndex (a positionally-indexed mask
    # makes `df[mask]` raise "Unalignable boolean Series").
    # Without an `adx` column every row counts as TRENDING; the other
    # regimes select nothing when their column is missing.
    REGIMES = {
        "TRENDING": lambda df: (df["adx"] > 25 if "adx" in df.columns
                                else pd.Series(True, index=df.index, dtype=bool)),
        "RANGING": lambda df: (df["adx"] < 20 if "adx" in df.columns
                               else pd.Series(False, index=df.index, dtype=bool)),
        "VOLATILE": lambda df: (df["vol_surge"] > 2.0 if "vol_surge" in df.columns
                                else pd.Series(False, index=df.index, dtype=bool)),
        "QUIET": lambda df: (df["vol_surge"] < 0.7 if "vol_surge" in df.columns
                             else pd.Series(False, index=df.index, dtype=bool)),
    }

    def validate(self, model, df: pd.DataFrame, feat_cols: list) -> dict:
        """Score *model* on the subset of *df* belonging to each regime.

        Args:
            model: object exposing ``predict(X)`` and ``predict_proba(X)``.
            df: frame with feature columns and an integer ``label`` column.
            feat_cols: feature column names; columns missing from *df* are
                treated as all-zero.

        Returns:
            A dict keyed by regime name. Each value is either the metrics
            (``n``, ``accuracy``, ``sharpe``, ``win_rate``, ``n_signals``),
            ``{"status": "insufficient", "n": ...}`` for fewer than 30 rows,
            or ``{"status": "error", "error": ...}`` if scoring raised.
        """
        results = {}
        for regime, mask_fn in self.REGIMES.items():
            try:
                sub = df[mask_fn(df)]
                if len(sub) < 30:
                    results[regime] = {"status": "insufficient", "n": len(sub)}
                    continue
                # reindex yields NaN for absent feature columns; fillna -> 0.
                X = sub.reindex(columns=feat_cols).fillna(0).values.astype(np.float32)
                y = sub["label"].values.astype(np.int32)
                preds = np.asarray(model.predict(X))
                probs = model.predict_proba(X)
                conf = np.max(probs, axis=1)
                acc = float(np.mean(preds == y))
                # Confidence-based score, not a return-based Sharpe ratio.
                sharpe = (float(np.mean(conf)) - 0.5) / (float(np.std(conf)) + 1e-9) * math.sqrt(len(conf))
                signal_mask = preds != 0
                n_signals = int(np.count_nonzero(signal_mask))
                n_wins = int(np.count_nonzero(signal_mask & (preds == y)))
                win_rate = n_wins / max(n_signals, 1)
                results[regime] = {
                    "n": len(y), "accuracy": round(acc, 4),
                    "sharpe": round(sharpe, 4), "win_rate": round(win_rate, 4),
                    "n_signals": n_signals,
                }
            except Exception as e:
                # Best-effort per regime: one failure must not hide the rest.
                results[regime] = {"status": "error", "error": str(e)}
        return results
| # ───────────────────────────────────────────────────────────────── | |
| # 4. BENCHMARK COMPARATOR | |
| # ───────────────────────────────────────────────────────────────── | |
class BenchmarkComparator:
    """Compare model vs Buy-Hold and vs Random."""

    # Label set a uniformly random predictor chooses from.
    CLASSES = (-1, 0, 1)

    def compare(self, model, df: pd.DataFrame, feat_cols: list) -> dict:
        """Compare *model* accuracy on the last 20% of *df* with two baselines.

        The random baseline is the *expected* accuracy of a predictor that
        picks uniformly from ``CLASSES``: (share of labels inside CLASSES) /
        len(CLASSES). A single random draw would carry sampling noise of
        roughly sqrt(p(1-p)/n) — about 3% at 200 rows — which is as large as
        the ``min_edge_vs_random`` gate and made the gate outcome vary from
        run to run.

        Args:
            model: object exposing ``predict(X)``.
            df: frame with feature columns and an integer ``label`` column.
                Rows are taken in the order given (the tail is the hold-out).
            feat_cols: feature column names; columns missing from *df* are
                treated as all-zero.

        Returns:
            On success a dict with ``model_acc``, ``random_acc``,
            ``buyhold_acc``, ``edge_vs_random``, ``edge_vs_buyhold``,
            ``signal_rate``, ``win_rate`` and ``passes_edge_gate``.
            Otherwise ``{"status": "no_label"}``,
            ``{"status": "insufficient", "n": 0}`` or
            ``{"status": "error", "error": ...}``.
        """
        if "label" not in df.columns:
            return {"status": "no_label"}
        df_oos = df.iloc[int(len(df) * 0.8):]
        if df_oos.empty:
            # Avoids NaN accuracies from averaging over zero rows.
            return {"status": "insufficient", "n": 0}

        # reindex yields NaN for absent feature columns; fillna -> 0.
        X = df_oos.reindex(columns=feat_cols).fillna(0).values.astype(np.float32)
        y = df_oos["label"].values.astype(np.int32)
        try:
            preds = np.asarray(model.predict(X))
        except Exception as e:
            return {"status": "error", "error": str(e)}

        model_acc = float(np.mean(preds == y))
        # Exact expectation of a uniform random guesser (see docstring).
        random_acc = float(np.mean(np.isin(y, self.CLASSES))) / len(self.CLASSES)
        # Buy-hold baseline: always predict +1
        buyhold_acc = float(np.mean(y == 1))
        edge_vs_random = model_acc - random_acc
        edge_vs_buyhold = model_acc - buyhold_acc

        # Signal quality: non-flat predictions and how often they are right.
        signal_mask = preds != 0
        n_signals = int(np.count_nonzero(signal_mask))
        n_wins = int(np.count_nonzero(signal_mask & (preds == y)))
        signal_rate = n_signals / max(len(preds), 1)
        win_rate = n_wins / max(n_signals, 1)
        passes_edge = edge_vs_random >= GATE["min_edge_vs_random"]
        return {
            "model_acc": round(model_acc, 4),
            "random_acc": round(random_acc, 4),
            "buyhold_acc": round(buyhold_acc, 4),
            "edge_vs_random": round(edge_vs_random, 4),
            "edge_vs_buyhold": round(edge_vs_buyhold, 4),
            "signal_rate": round(signal_rate, 4),
            "win_rate": round(win_rate, 4),
            "passes_edge_gate": passes_edge,
        }
| # ───────────────────────────────────────────────────────────────── | |
| # 5. PAPER TRADING SIMULATOR | |
| # ───────────────────────────────────────────────────────────────── | |
class PaperTradingSimulator:
    """
    Lightweight paper trading sim using OOS experience data.
    Simulates actual trade P&L with slippage and spread costs.
    No NautilusTrader dependency — runs on historical experience data.
    """

    SLIPPAGE_PCT = 0.0002   # 0.02% slippage per side
    SPREAD_COST = 0.0001    # 0.01% spread cost
    MAX_POSITION = 1.0      # fraction of capital (not used by simulate())
    STOP_LOSS_PCT = 0.005   # 0.5% hard stop

    def simulate(self, model, df: pd.DataFrame, feat_cols: list,
                 initial_capital: float = 10_000.0) -> dict:
        """
        Walk through OOS data chronologically.
        Open/close trades based on model signals.
        Track P&L, drawdown, win rate.

        Per-row rules (``actual_pnl`` is treated as that row's return):
          * flat signal (0): if a position is open and its return is below
            ``-STOP_LOSS_PCT`` it is stopped out, otherwise nothing is booked;
          * signal while flat: open a position in that direction. The entry
            cost (slippage + spread) is deducted from the first return later
            booked for this position;
          * signal opposite to the open position: close it (slippage +
            spread) and re-open in the new direction with a new entry cost;
          * signal equal to the open position: book the row's return minus
            slippage.
        A position still open after the last row is not closed.

        Args:
            model: object exposing ``predict(X)`` returning -1/0/+1 signals.
            df: frame with feature columns and, optionally, ``actual_pnl``
                and ``timestamp``. Missing or NaN ``actual_pnl`` counts as 0.
            feat_cols: feature column names; columns missing from *df* are
                treated as all-zero.
            initial_capital: starting equity.

        Returns:
            ``{"status": "insufficient_oos_data"}`` for fewer than 50 OOS
            rows, ``{"status": "predict_error", "error": ...}`` if the model
            raised, ``{"status": "no_trades"}`` when nothing was booked,
            otherwise the statistics with ``status`` ``"pass"``/``"fail"``.
        """
        ordered = df.sort_values("timestamp") if "timestamp" in df.columns else df
        df_oos = ordered.iloc[int(len(df) * 0.8):]
        if len(df_oos) < 50:
            return {"status": "insufficient_oos_data"}

        # reindex yields NaN for absent feature columns; fillna -> 0.
        X = df_oos.reindex(columns=feat_cols).fillna(0).values.astype(np.float32)
        if "actual_pnl" in df_oos.columns:
            # NaN would otherwise turn capital into NaN for the rest of the run.
            pnl = df_oos["actual_pnl"].fillna(0).to_numpy(dtype=float)
        else:
            pnl = np.zeros(len(df_oos))
        try:
            preds = np.asarray(model.predict(X))
        except Exception as e:
            return {"status": "predict_error", "error": str(e)}

        side_cost = self.SLIPPAGE_PCT + self.SPREAD_COST
        capital = initial_capital
        equity_curve = [capital]
        trades = []
        in_trade = False
        entry_direction = 0
        pending_entry_cost = 0.0  # entry cost not yet deducted from a return

        # tolist() gives plain Python numbers, keeping the report JSON-friendly.
        for pred, actual_pnl_pct in zip(preds.tolist(), pnl.tolist()):
            if pred == 0:
                # No signal — if in trade, check stop
                if in_trade:
                    current_ret = actual_pnl_pct * entry_direction
                    if current_ret < -self.STOP_LOSS_PCT:
                        # Hit stop loss
                        net = current_ret - side_cost - pending_entry_cost
                        pending_entry_cost = 0.0
                        capital *= (1 + net)
                        trades.append({"type": "stop_loss", "ret": net, "capital": capital})
                        equity_curve.append(capital)
                        in_trade = False
                continue

            if not in_trade:
                # Open new trade; its cost is charged with the first booking.
                in_trade = True
                entry_direction = pred
                pending_entry_cost = side_cost
                continue

            if pred != entry_direction:
                # Signal flipped: close current position ...
                net = actual_pnl_pct * entry_direction - side_cost - pending_entry_cost
                capital *= (1 + net)
                trades.append({"type": "close_flip", "ret": net, "capital": capital})
                equity_curve.append(capital)
                # ... and open the opposite one.
                entry_direction = pred
                pending_entry_cost = side_cost
            else:
                # Confirm existing trade
                net = actual_pnl_pct * entry_direction - self.SLIPPAGE_PCT - pending_entry_cost
                pending_entry_cost = 0.0
                capital *= (1 + net)
                trades.append({"type": "confirm", "ret": net, "capital": capital})
                equity_curve.append(capital)

        if not trades:
            return {"status": "no_trades"}

        returns = [t["ret"] for t in trades]
        winning = [r for r in returns if r > 0]
        losing = [r for r in returns if r < 0]
        total_ret = (capital - initial_capital) / initial_capital
        sharpe_sim = _sharpe(returns)
        max_dd = _max_drawdown(equity_curve)
        win_rate_sim = len(winning) / max(len(returns), 1)
        profit_factor = abs(sum(winning)) / max(abs(sum(losing)), 1e-9)
        # max_dd and the gate are both negative: "shallower than the limit".
        passed = (sharpe_sim >= GATE["min_sharpe_oos"]
                  and win_rate_sim >= GATE["min_win_rate"]
                  and max_dd >= GATE["max_drawdown"])
        return {
            "status": "pass" if passed else "fail",
            "initial_capital": initial_capital,
            "final_capital": round(capital, 2),
            "total_return": round(total_ret, 4),
            "sharpe": round(sharpe_sim, 4),
            "max_drawdown": round(max_dd, 4),
            "win_rate": round(win_rate_sim, 4),
            "profit_factor": round(profit_factor, 4),
            "n_trades": len(trades),
            "n_wins": len(winning),
            "n_losses": len(losing),
        }
| # ───────────────────────────────────────────────────────────────── | |
| # 6. MASTER VALIDATOR — ties everything together | |
| # ───────────────────────────────────────────────────────────────── | |
class MasterValidator:
    """
    Run the full validation suite on a model, decide on promotion, and
    persist the report locally plus (best-effort) to the Hugging Face
    experience repository.
    """

    def __init__(self):
        self.per_expert = PerExpertValidator()
        self.walk_fwd = WalkForwardValidator()
        self.regime = RegimeValidator()
        self.benchmark = BenchmarkComparator()
        self.paper = PaperTradingSimulator()

    def run_full_validation(self, model, df_oos: pd.DataFrame,
                            model_name: str, feat_cols: list,
                            expert_id: str = "E1",
                            strategy: str = "general") -> dict:
        """Run every validator, attach the promotion verdict, save the report.

        Returns the report dict; ``report["promotion"]["promote"]`` is True
        when at least three of the four promotion checks hold.
        """
        log.info(f"π§ͺ Validating {model_name} ({len(df_oos):,} OOS rows)")

        # Dict entries are evaluated top to bottom, so the timestamp is taken
        # before any validator runs and the validators run in listed order.
        report = {
            "model": model_name,
            "validated_at": datetime.now(timezone.utc).isoformat(),
            "strategy": strategy,
            "per_expert": self.per_expert.validate_expert(model, df_oos, expert_id, strategy),
            "walk_forward": self.walk_fwd.validate(model, df_oos, feat_cols),
            "regime": self.regime.validate(model, df_oos, feat_cols),
            "benchmark": self.benchmark.compare(model, df_oos, feat_cols),
            "paper_trading": self.paper.simulate(model, df_oos, feat_cols),
        }

        # Overall promotion decision
        wf = report["walk_forward"]
        pt = report["paper_trading"]
        promotion_checks = {
            "wf_pass": wf.get("status") == "pass",
            "edge_vs_random": report["benchmark"].get("passes_edge_gate", False),
            "paper_pass": pt.get("status") == "pass",
            "expert_sharpe": report["per_expert"].get("sharpe", 0) >= GATE["min_sharpe_oos"],
        }
        n_ok = sum(promotion_checks.values())
        promote = n_ok >= 3  # need 3/4 checks
        report["promotion"] = {"promote": promote, "checks": promotion_checks}

        verdict = 'β PROMOTE' if promote else 'β REJECT'
        log.info(f" {verdict} {model_name} | "
                 f"WF={wf.get('avg_sharpe','?')} PT={pt.get('sharpe','?')}")
        self._save_report(model_name, report)
        return report

    def _save_report(self, model_name: str, report: dict):
        """Write the report as JSON next to the process, then try to upload it.

        The upload is best-effort: any failure (missing package, bad token,
        network) is logged as a warning and otherwise ignored.
        """
        safe_name = model_name.replace('/', '_')
        path = f"validation_{safe_name}.json"
        with open(path, "w") as fh:
            json.dump(report, fh, indent=2)

        try:
            from huggingface_hub import HfApi

            stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M")
            HfApi(token=HF_TOKEN).upload_file(
                path_or_fileobj=path,
                token=HF_TOKEN,
                path_in_repo=f"validation/{model_name}_{stamp}.json",
                repo_id=EXPERIENCE_REPO,
                repo_type="dataset",
                commit_message=f"Validation: {model_name}",
            )
        except Exception as e:
            log.warning(f"Cannot push report: {e}")
| # ───────────────────────────────────────────────────────────────── | |
| # Run loop (called from forge_controller or training_engine) | |
| # ───────────────────────────────────────────────────────────────── | |
def validate_after_training(model, df_oos: pd.DataFrame,
                            model_name: str, feat_cols: list,
                            expert_id: str = "E1",
                            strategy: str = "general") -> bool:
    """Run the full validation suite and return the promotion verdict.

    Builds a fresh ``MasterValidator``, runs ``run_full_validation`` with the
    given arguments, and returns ``report["promotion"]["promote"]``.
    """
    outcome = MasterValidator().run_full_validation(
        model, df_oos, model_name, feat_cols, expert_id, strategy
    )
    promotion = outcome["promotion"]
    return promotion["promote"]
if __name__ == "__main__":
    # Quick self-test with synthetic data: random features and labels, a
    # small gradient-boosting model, then the full validation suite.
    logging.basicConfig(level=logging.INFO)
    from sklearn.ensemble import GradientBoostingClassifier

    np.random.seed(42)
    n = 1000
    feat_cols = ["obi", "vpin", "adx", "rsi", "zscore_60", "funding"]

    # Draw order matters for reproducibility: features (in list order),
    # then labels, then per-row P&L.
    synthetic = {}
    for col in feat_cols:
        synthetic[col] = np.random.randn(n)
    df_test = pd.DataFrame(synthetic)
    df_test["label"] = np.random.choice([-1, 0, 1], n)
    df_test["actual_pnl"] = np.random.uniform(-0.01, 0.01, n)
    df_test["timestamp"] = np.arange(n)

    model = GradientBoostingClassifier(n_estimators=30, random_state=42)
    X = df_test[feat_cols].values
    y = df_test["label"].values
    model.fit(X, y)

    mv = MasterValidator()
    report = mv.run_full_validation(model, df_test, "test_model", feat_cols, "E1", "test")

    walk_forward = report["walk_forward"]
    paper_trading = report["paper_trading"]
    summary = {
        "wf_status": walk_forward["status"],
        "wf_sharpe": walk_forward.get("avg_sharpe"),
        "pt_status": paper_trading["status"],
        "pt_sharpe": paper_trading.get("sharpe"),
        "promote": report["promotion"]["promote"],
    }
    print(json.dumps(summary, indent=2))