# HPLL-DataReview / validation_layer.py
# gionuibk's picture
# T5: Full validation layer - WF, regime, benchmark, paper trading
# 0b8c22e verified
"""
T5 — Validation Layer (Standalone)
Runs after every training cycle; results are uploaded to the Hugging Face Hub.
1. Per-expert validation (E1..E5 individually)
2. Walk-forward OOS (5 rolling windows, no lookahead)
3. Regime-specific tests (TRENDING / RANGING / VOLATILE / QUIET)
4. Benchmark: vs Buy-Hold, vs Random
5. Paper Trading Sim (lightweight, slippage-aware)
6. Promotion gate: a model is promoted only if it passes at least 3 of the
   4 checks (walk-forward, edge vs random, paper trading, per-expert Sharpe)
"""
from __future__ import annotations
import os, json, math, logging, time
from datetime import datetime, timezone
from typing import Any
import numpy as np
import pandas as pd
log = logging.getLogger("validation_layer")

# Hugging Face credentials and destination repos; each one can be overridden
# through the environment variable of the same name.
HF_TOKEN = os.getenv("HF_TOKEN", "")
EXPERIENCE_REPO = os.getenv("EXPERIENCE_REPO", "gionuibk/aetheris-experiences")
MODEL_REPO = os.getenv("MODEL_REPO", "gionuibk/aetheris-models")

# Promotion gates: thresholds a model's validation metrics are compared against.
GATE = dict(
    min_sharpe_oos=0.80,       # minimum out-of-sample "sharpe" score
    min_win_rate=0.52,         # minimum win rate
    max_drawdown=-0.15,        # drawdown floor (negative fraction, -15%)
    min_edge_vs_random=0.03,   # accuracy edge required over random guessing
    min_folds_pass=3,          # must pass at least 3/5 OOS windows
)
# ─────────────────────────────────────────────────────────────────
# UTILS
# ─────────────────────────────────────────────────────────────────
def _sharpe(returns: list[float]) -> float:
if len(returns) < 2: return 0.0
arr = np.array(returns)
return float(np.mean(arr) / (np.std(arr) + 1e-9) * math.sqrt(len(arr)))
def _max_drawdown(equity_curve: list[float]) -> float:
if not equity_curve: return 0.0
peak = equity_curve[0]
max_dd = 0.0
for v in equity_curve:
if v > peak: peak = v
dd = (v - peak) / (peak + 1e-9)
if dd < max_dd: max_dd = dd
return round(max_dd, 4)
# ─────────────────────────────────────────────────────────────────
# 1. PER-EXPERT VALIDATOR
# ─────────────────────────────────────────────────────────────────
class PerExpertValidator:
    """Test each Expert model on its own held-out data.

    The model is given only the columns listed for the expert in
    ``EXPERT_FEATURE_MAP`` that are present in the frame, in map order.
    """

    # Feature columns consumed by each expert, keyed by expert id.
    EXPERT_FEATURE_MAP = {
        "E1": ["obi", "spread_pct", "vpin", "entropy", "cvd_norm", "absorption",
               "tape_speed", "large_trade", "vol_adjusted_spread", "poc_proximity", "vol_skew"],
        "E2": ["ema_cross_fs", "ema_cross_sl", "adx", "plus_di", "minus_di",
               "momentum_zscore", "trend_strength", "vol_surge", "macd_hist", "macd_line"],
        "E3": ["zscore_60", "zscore_300", "rsi", "hurst", "bb_pct_b", "bb_width",
               "vwap_dev", "mean_rev_signal"],
        "E4": ["market_structure", "bos", "wyckoff_phase", "fib_618_prox",
               "fvg_score", "ob_score", "pin_bar", "engulfing", "inside_bar",
               "pivot_dist_h", "pivot_dist_l"],
        "E5": ["funding", "funding_trend", "oi_delta", "ls_ratio",
               "session_hour", "session_asian", "session_london", "session_ny",
               "session_overlap", "day_of_week"],
    }

    def validate_expert(self, model, df: pd.DataFrame,
                        expert_id: str, strategy: str = "general") -> dict:
        """Score ``model`` on the held-out tail of ``df`` for one expert.

        Args:
            model: Fitted classifier exposing ``predict`` and ``predict_proba``.
            df: Experience rows with a ``label`` column. Rows are ordered by
                ``timestamp`` (when present) before the tail is taken.
            expert_id: Key into ``EXPERT_FEATURE_MAP``.
            strategy: Free-form tag copied into the result.

        Returns:
            ``{"status": "no_features", ...}`` when none of the expert's
            columns exist, ``{"status": "predict_error", "error": msg}`` when
            the model fails, otherwise a metrics dict with ``status == "ok"``.
        """
        feat_cols = self.EXPERT_FEATURE_MAP.get(expert_id, [])
        available = [c for c in feat_cols if c in df.columns]
        if not available:
            return {"status": "no_features", "expert": expert_id}

        # Held-out: chronologically last 20%. Sorting first (as the walk-forward
        # and paper-trading validators do) keeps the tail truly out-of-sample
        # even when rows arrive unordered.
        if "timestamp" in df.columns:
            df = df.sort_values("timestamp", kind="stable")
        cutoff = int(len(df) * 0.8)
        df_oos = df.iloc[cutoff:]

        # ``available`` only holds existing columns, so no zero-filling is needed.
        X = df_oos[available].fillna(0).values.astype(np.float32)
        y = df_oos["label"].values.astype(np.int32)
        try:
            preds = model.predict(X)
            probs = model.predict_proba(X)
            conf = np.max(probs, axis=1)
        except Exception as e:
            return {"status": "predict_error", "error": str(e)}

        acc = float(np.mean(preds == y))
        # Confidence-based score reported as "sharpe":
        # (mean top-class probability - 0.5) / std * sqrt(n). Not return-based.
        sharpe_oos = (float(np.mean(conf)) - 0.5) / (float(np.std(conf)) + 1e-9) * math.sqrt(len(conf))
        # Win rate over non-flat (pred != 0) signals only.
        non_flat = [(t, p) for t, p in zip(y, preds) if p != 0]
        win_rate = sum(1 for t, p in non_flat if t == p) / max(len(non_flat), 1)
        return {
            "expert": expert_id,
            "strategy": strategy,
            "status": "ok",
            "n_oos": len(y),
            "accuracy": round(acc, 4),
            "sharpe": round(sharpe_oos, 4),
            "win_rate": round(win_rate, 4),
            "n_signals": len(non_flat),
        }
# ─────────────────────────────────────────────────────────────────
# 2. WALK-FORWARD VALIDATOR
# ─────────────────────────────────────────────────────────────────
class WalkForwardValidator:
    """5-fold rolling OOS — no lookahead bias.

    Rows are ordered by ``timestamp`` (when present) and cut into
    ``N_FOLDS + 1`` equal windows. Fold ``i`` trains on windows ``0..i`` and
    tests on window ``i + 1``, so every test row comes after all of its
    training rows. Each fold fits a deep copy of ``model``; the caller's
    model object is never refit.
    """

    N_FOLDS = 5

    def validate(self, model, df: pd.DataFrame, feat_cols: list) -> dict:
        """Run the rolling walk-forward evaluation.

        Args:
            model: Estimator exposing ``fit``, ``predict`` and
                ``predict_proba``. It is deep-copied per fold, never mutated.
            df: Experience rows with a ``label`` column; missing feature
                columns are filled with 0.0.
            feat_cols: Feature columns fed to the model, in order.

        Returns:
            ``{"status": "insufficient", "n": n}`` for fewer than 200 rows,
            ``{"status": "no_folds"}`` when no fold produced a result,
            otherwise a summary whose ``status`` is ``"pass"`` when at least
            ``GATE["min_folds_pass"]`` folds passed, else ``"fail"``.
        """
        import copy  # local import: only needed to isolate per-fold refits

        # Sort by time so each test window strictly follows its training data.
        if "timestamp" in df.columns:
            df = df.sort_values("timestamp", kind="stable")
        # reset_index returns a new frame, so adding columns below never
        # touches the caller's DataFrame.
        df = df.reset_index(drop=True)
        n = len(df)
        if n < 200:
            return {"status": "insufficient", "n": n}

        # Fill missing feature columns once rather than re-copying every fold.
        for c in feat_cols:
            if c not in df.columns:
                df[c] = 0.0

        window = n // (self.N_FOLDS + 1)
        fold_results = []
        for i in range(self.N_FOLDS):
            train_end = window * (i + 1)
            test_end = min(train_end + window, n)
            if test_end - train_end < 30:
                continue
            df_train = df.iloc[:train_end]
            df_test = df.iloc[train_end:test_end]
            X_tr = df_train[feat_cols].fillna(0).values.astype(np.float32)
            y_tr = df_train["label"].values.astype(np.int32)
            X_te = df_test[feat_cols].fillna(0).values.astype(np.float32)
            y_te = df_test["label"].values.astype(np.int32)
            try:
                # Refit a private copy: fitting ``model`` itself would replace the
                # caller's trained model with this fold's partial-history fit
                # (and leak training rows into later OOS checks).
                fold_model = copy.deepcopy(model)
                fold_model.fit(X_tr, y_tr)
                preds = fold_model.predict(X_te)
                probs = fold_model.predict_proba(X_te)
                conf = np.max(probs, axis=1)
                acc = float(np.mean(preds == y_te))
                # Confidence-based score reported as "sharpe" (not return-based).
                sharpe = (float(np.mean(conf)) - 0.5) / (float(np.std(conf)) + 1e-9) * math.sqrt(len(conf))
                non_flat = [(t, p) for t, p in zip(y_te, preds) if p != 0]
                win_rate = sum(1 for t, p in non_flat if t == p) / max(len(non_flat), 1)
                passed = sharpe >= GATE["min_sharpe_oos"] and win_rate >= GATE["min_win_rate"]
                fold_results.append({
                    "fold": i, "n_train": len(X_tr), "n_test": len(X_te),
                    "acc": round(acc, 4), "sharpe": round(sharpe, 4),
                    "win_rate": round(win_rate, 4), "passed": passed,
                })
            except Exception as e:
                log.warning(f"WF fold {i}: {e}")

        if not fold_results:
            return {"status": "no_folds"}
        n_pass = sum(1 for f in fold_results if f["passed"])
        avg_sh = sum(f["sharpe"] for f in fold_results) / len(fold_results)
        avg_wr = sum(f["win_rate"] for f in fold_results) / len(fold_results)
        overall_pass = n_pass >= GATE["min_folds_pass"]
        return {
            "status": "pass" if overall_pass else "fail",
            "n_folds": len(fold_results),
            "folds_passed": n_pass,
            "avg_sharpe": round(avg_sh, 4),
            "avg_win_rate": round(avg_wr, 4),
            "fold_details": fold_results,
        }
# ─────────────────────────────────────────────────────────────────
# 3. REGIME-SPECIFIC VALIDATOR
# ─────────────────────────────────────────────────────────────────
class RegimeValidator:
    """Test model separately in each market regime.

    ``REGIMES`` maps a regime name to a function returning a boolean row mask
    aligned with the frame's index. When the indicator a regime needs is
    missing, the mask is constant: every row for TRENDING, no rows for the
    others. NOTE(review): confirm that treating all rows as TRENDING when
    ``adx`` is absent is intended.
    """

    # Fallback masks are built on ``df.index`` so boolean indexing aligns even
    # when the frame does not carry a default 0..n-1 index (e.g. an OOS slice).
    REGIMES = {
        "TRENDING": lambda df: df["adx"] > 25 if "adx" in df.columns else pd.Series(True, index=df.index, dtype=bool),
        "RANGING": lambda df: df["adx"] < 20 if "adx" in df.columns else pd.Series(False, index=df.index, dtype=bool),
        "VOLATILE": lambda df: df["vol_surge"] > 2.0 if "vol_surge" in df.columns else pd.Series(False, index=df.index, dtype=bool),
        "QUIET": lambda df: df["vol_surge"] < 0.7 if "vol_surge" in df.columns else pd.Series(False, index=df.index, dtype=bool),
    }

    def validate(self, model, df: pd.DataFrame, feat_cols: list) -> dict:
        """Score ``model`` on the rows that belong to each regime.

        Args:
            model: Fitted classifier exposing ``predict`` and ``predict_proba``.
            df: Experience rows with a ``label`` column; missing feature
                columns are filled with 0.0.
            feat_cols: Feature columns fed to the model, in order.

        Returns:
            A dict keyed by regime name. Each value holds ``n``, ``accuracy``,
            ``sharpe``, ``win_rate`` and ``n_signals``; or
            ``{"status": "insufficient", "n": n}`` when fewer than 30 rows
            match; or ``{"status": "error", "error": msg}`` on failure.
        """
        results = {}
        for regime, mask_fn in self.REGIMES.items():
            try:
                mask = mask_fn(df)
                sub = df[mask].copy()
                if len(sub) < 30:
                    results[regime] = {"status": "insufficient", "n": len(sub)}
                    continue
                for c in feat_cols:
                    if c not in sub.columns:
                        sub[c] = 0.0
                X = sub[feat_cols].fillna(0).values.astype(np.float32)
                y = sub["label"].values.astype(np.int32)
                preds = model.predict(X)
                probs = model.predict_proba(X)
                conf = np.max(probs, axis=1)
                acc = float(np.mean(preds == y))
                # Confidence-based score reported as "sharpe" (not return-based).
                sharpe = (float(np.mean(conf)) - 0.5) / (float(np.std(conf)) + 1e-9) * math.sqrt(len(conf))
                non_flat = [(t, p) for t, p in zip(y, preds) if p != 0]
                win_rate = sum(1 for t, p in non_flat if t == p) / max(len(non_flat), 1)
                results[regime] = {
                    "n": len(y), "accuracy": round(acc, 4),
                    "sharpe": round(sharpe, 4), "win_rate": round(win_rate, 4),
                    "n_signals": len(non_flat),
                }
            except Exception as e:
                results[regime] = {"status": "error", "error": str(e)}
        return results
# ─────────────────────────────────────────────────────────────────
# 4. BENCHMARK COMPARATOR
# ─────────────────────────────────────────────────────────────────
class BenchmarkComparator:
    """Compare model vs Buy-Hold and vs Random.

    All scores are computed on the chronologically last 20% of rows (sorted
    by ``timestamp`` when that column exists).
    """

    # Label set a uniform random guesser would draw from.
    RANDOM_CLASSES = (-1, 0, 1)

    def compare(self, model, df: pd.DataFrame, feat_cols: list) -> dict:
        """Score ``model`` against naive baselines on the held-out tail.

        Args:
            model: Fitted classifier exposing ``predict``.
            df: Experience rows with a ``label`` column; missing feature
                columns are filled with 0.0.
            feat_cols: Feature columns fed to the model, in order.

        Returns:
            Model/baseline accuracies, the edges between them, signal
            statistics and ``passes_edge_gate``; or
            ``{"status": "error", "error": msg}`` if ``model.predict`` fails.
        """
        if "timestamp" in df.columns:
            df = df.sort_values("timestamp", kind="stable")
        df_oos = df.iloc[int(len(df) * 0.8):].copy()
        for c in feat_cols:
            if c not in df_oos.columns:
                df_oos[c] = 0.0
        X = df_oos[feat_cols].fillna(0).values.astype(np.float32)
        y = df_oos["label"].values.astype(np.int32)
        try:
            preds = model.predict(X)
        except Exception as e:
            return {"status": "error", "error": str(e)}

        model_acc = float(np.mean(preds == y))
        # Expected accuracy of a uniform random guess over RANDOM_CLASSES: a row
        # is hit with probability 1/k if its label is in the set, else never.
        # Using the expectation instead of one unseeded np.random draw keeps
        # the edge gate (and therefore promotion) deterministic.
        random_acc = float(np.mean(np.isin(y, self.RANDOM_CLASSES))) / len(self.RANDOM_CLASSES)
        # Buy-hold baseline: always predict +1
        buyhold_acc = float(np.mean(y == 1))
        edge_vs_random = model_acc - random_acc
        edge_vs_buyhold = model_acc - buyhold_acc

        # Signal quality over non-flat (pred != 0) predictions.
        non_flat = [(t, p) for t, p in zip(y, preds) if p != 0]
        signal_rate = len(non_flat) / max(len(preds), 1)
        win_rate = sum(1 for t, p in non_flat if t == p) / max(len(non_flat), 1)
        passes_edge = edge_vs_random >= GATE["min_edge_vs_random"]
        return {
            "model_acc": round(model_acc, 4),
            "random_acc": round(random_acc, 4),
            "buyhold_acc": round(buyhold_acc, 4),
            "edge_vs_random": round(edge_vs_random, 4),
            "edge_vs_buyhold": round(edge_vs_buyhold, 4),
            "signal_rate": round(signal_rate, 4),
            "win_rate": round(win_rate, 4),
            "passes_edge_gate": passes_edge,
        }
# ─────────────────────────────────────────────────────────────────
# 5. PAPER TRADING SIMULATOR
# ─────────────────────────────────────────────────────────────────
class PaperTradingSimulator:
    """
    Lightweight paper trading sim using OOS experience data.
    Simulates trade P&L with slippage and spread costs.
    No NautilusTrader dependency — runs on historical experience data.
    """

    SLIPPAGE_PCT = 0.0002   # 0.02% slippage per side
    SPREAD_COST = 0.0001    # 0.01% spread cost
    MAX_POSITION = 1.0      # fraction of capital (not currently used by simulate)
    STOP_LOSS_PCT = 0.005   # 0.5% hard stop

    def simulate(self, model, df: pd.DataFrame, feat_cols: list,
                 initial_capital: float = 10_000.0) -> dict:
        """
        Walk through OOS data chronologically.
        Open/close trades based on model signals.
        Track P&L, drawdown, win rate.

        The OOS slice is the last 20% of rows (after sorting by ``timestamp``
        when present). ``actual_pnl`` is read as a per-row fractional return
        (0 when the column is absent); presumably the return of a long
        position over that row — TODO confirm against the data producer.

        Per row, driven by ``model.predict``:
          * 0 while in a trade: close only if the signed row return is below
            ``-STOP_LOSS_PCT`` (``stop_loss``);
          * non-zero while flat: open in that direction, paying slippage +
            spread up front;
          * opposite non-zero: realize the row and close (``close_flip``),
            then reopen the other way, paying entry costs again;
          * same non-zero: realize the row minus slippage (``confirm``).

        Returns:
            ``{"status": "insufficient_oos_data"}`` (< 50 OOS rows),
            ``{"status": "predict_error", "error": msg}``,
            ``{"status": "no_trades"}``, or a metrics dict whose ``status`` is
            ``"pass"``/``"fail"`` against the ``GATE`` thresholds.
        """
        if "timestamp" in df.columns:
            df = df.sort_values("timestamp", kind="stable")
        df_oos = df.iloc[int(len(df) * 0.8):].copy()
        if len(df_oos) < 50:
            return {"status": "insufficient_oos_data"}
        for c in feat_cols:
            if c not in df_oos.columns:
                df_oos[c] = 0.0
        X = df_oos[feat_cols].fillna(0).values.astype(np.float32)
        pnl = (df_oos["actual_pnl"].values if "actual_pnl" in df_oos.columns
               else np.zeros(len(df_oos)))
        try:
            preds = model.predict(X)
        except Exception as e:
            return {"status": "predict_error", "error": str(e)}

        # Cost of crossing the market once (entry or exit).
        side_cost = self.SLIPPAGE_PCT + self.SPREAD_COST
        capital = initial_capital
        equity_curve = [capital]
        trades = []
        in_trade = False
        entry_direction = 0
        for pred, bar_pnl in zip(preds, pnl):
            if pred == 0:
                # No signal — if in a trade, only the hard stop is checked.
                if in_trade:
                    current_ret = bar_pnl * entry_direction
                    if current_ret < -self.STOP_LOSS_PCT:
                        net = current_ret - side_cost
                        capital *= (1 + net)
                        trades.append({"type": "stop_loss", "ret": net, "capital": capital})
                        equity_curve.append(capital)
                        in_trade = False
                continue
            if not in_trade:
                # Open a new position; entry slippage + spread is paid up front
                # (previously computed but never charged).
                capital *= (1 - side_cost)
                equity_curve.append(capital)
                in_trade = True
                entry_direction = pred
            elif pred != entry_direction:
                # Signal flipped: close the current position...
                net = bar_pnl * entry_direction - side_cost
                capital *= (1 + net)
                trades.append({"type": "close_flip", "ret": net, "capital": capital})
                equity_curve.append(capital)
                # ...and open the opposite one, paying entry costs again.
                capital *= (1 - side_cost)
                equity_curve.append(capital)
                entry_direction = pred
            else:
                # Same signal: confirm the existing trade and realize this row.
                net = bar_pnl * entry_direction - self.SLIPPAGE_PCT
                capital *= (1 + net)
                trades.append({"type": "confirm", "ret": net, "capital": capital})
                equity_curve.append(capital)

        if not trades:
            return {"status": "no_trades"}
        returns = [t["ret"] for t in trades]
        winning = [r for r in returns if r > 0]
        losing = [r for r in returns if r < 0]
        total_ret = (capital - initial_capital) / initial_capital
        sharpe_sim = _sharpe(returns)
        max_dd = _max_drawdown(equity_curve)
        win_rate_sim = len(winning) / max(len(returns), 1)
        profit_factor = abs(sum(winning)) / max(abs(sum(losing)), 1e-9)
        passed = (sharpe_sim >= GATE["min_sharpe_oos"]
                  and win_rate_sim >= GATE["min_win_rate"]
                  and max_dd >= GATE["max_drawdown"])
        return {
            "status": "pass" if passed else "fail",
            "initial_capital": initial_capital,
            "final_capital": round(capital, 2),
            "total_return": round(total_ret, 4),
            "sharpe": round(sharpe_sim, 4),
            "max_drawdown": round(max_dd, 4),
            "win_rate": round(win_rate_sim, 4),
            "profit_factor": round(profit_factor, 4),
            "n_trades": len(trades),
            "n_wins": len(winning),
            "n_losses": len(losing),
        }
# ─────────────────────────────────────────────────────────────────
# 6. MASTER VALIDATOR — ties everything together
# ─────────────────────────────────────────────────────────────────
class MasterValidator:
    """
    Run the full validation suite on a model and decide promotion.

    The report is written to a local JSON file and uploaded (best-effort) to
    the ``EXPERIENCE_REPO`` dataset on the Hugging Face Hub.
    """

    def __init__(self):
        self.per_expert = PerExpertValidator()
        self.walk_fwd = WalkForwardValidator()
        self.regime = RegimeValidator()
        self.benchmark = BenchmarkComparator()
        self.paper = PaperTradingSimulator()

    def run_full_validation(self, model, df_oos: pd.DataFrame,
                            model_name: str, feat_cols: list,
                            expert_id: str = "E1",
                            strategy: str = "general") -> dict:
        """Run every validator on ``model`` and decide whether to promote it.

        Args:
            model: Trained classifier (the promotion candidate).
            df_oos: Experience rows the validators evaluate on.
            model_name: Name used in logs and report file names.
            feat_cols: Feature columns the model was trained on.
            expert_id: Expert key passed to the per-expert validator.
            strategy: Free-form tag stored in the report.

        Returns:
            The full report; ``report["promotion"]["promote"]`` is True when
            at least 3 of the 4 promotion checks pass.
        """
        import copy  # local import: only used to isolate walk-forward refits

        log.info(f"🧪 Validating {model_name} ({len(df_oos):,} OOS rows)")
        ts = datetime.now(timezone.utc).isoformat()
        report = {"model": model_name, "validated_at": ts, "strategy": strategy}
        report["per_expert"] = self.per_expert.validate_expert(model, df_oos, expert_id, strategy)

        # Walk-forward refits the model it receives on every fold. Hand it a
        # copy so the caller's model (the promotion candidate) and the checks
        # below are evaluated on the original fit, not a partial-history refit.
        try:
            wf_model = copy.deepcopy(model)
        except Exception as e:
            log.warning(f"Cannot copy {model_name} for walk-forward; refitting in place: {e}")
            wf_model = model
        report["walk_forward"] = self.walk_fwd.validate(wf_model, df_oos, feat_cols)
        report["regime"] = self.regime.validate(model, df_oos, feat_cols)
        report["benchmark"] = self.benchmark.compare(model, df_oos, feat_cols)
        report["paper_trading"] = self.paper.simulate(model, df_oos, feat_cols)

        # Overall promotion decision
        wf = report["walk_forward"]
        bm = report["benchmark"]
        pt = report["paper_trading"]
        pe = report["per_expert"]
        promotion_checks = {
            "wf_pass": wf.get("status") == "pass",
            "edge_vs_random": bm.get("passes_edge_gate", False),
            "paper_pass": pt.get("status") == "pass",
            # NOTE(review): the per-expert validator feeds only the expert's own
            # columns to ``model``; a model trained on ``feat_cols`` may fail
            # there (no "sharpe" key -> check is False).
            "expert_sharpe": pe.get("sharpe", 0) >= GATE["min_sharpe_oos"],
        }
        promote = sum(promotion_checks.values()) >= 3  # need 3/4 checks
        report["promotion"] = {"promote": promote, "checks": promotion_checks}
        log.info(f" {'✅ PROMOTE' if promote else '❌ REJECT'} {model_name} | "
                 f"WF={wf.get('avg_sharpe','?')} PT={pt.get('sharpe','?')}")
        self._save_report(model_name, report)
        return report

    def _save_report(self, model_name: str, report: dict):
        """Write ``report`` to ``validation_<name>.json`` and push it to HF.

        The local file goes to the current working directory ("/" in the
        name is replaced by "_"). The upload lands under ``validation/`` in
        ``EXPERIENCE_REPO`` with a UTC timestamp suffix; upload failures are
        logged, not raised.
        """
        path = f"validation_{model_name.replace('/','_')}.json"
        with open(path, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        try:
            from huggingface_hub import HfApi
            api = HfApi(token=HF_TOKEN)
            ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M")
            api.upload_file(
                path_or_fileobj=path, token=HF_TOKEN,
                path_in_repo=f"validation/{model_name}_{ts}.json",
                repo_id=EXPERIENCE_REPO, repo_type="dataset",
                commit_message=f"Validation: {model_name}",
            )
        except Exception as e:
            log.warning(f"Cannot push report: {e}")
# ─────────────────────────────────────────────────────────────────
# Run loop (called from forge_controller or training_engine)
# ─────────────────────────────────────────────────────────────────
def validate_after_training(model, df_oos: pd.DataFrame,
                            model_name: str, feat_cols: list,
                            expert_id: str = "E1",
                            strategy: str = "general") -> bool:
    """Validate a freshly trained model and say whether to promote it.

    Thin convenience wrapper around ``MasterValidator.run_full_validation``;
    the full report is saved/uploaded as a side effect of that call.

    Returns:
        True if the model should be promoted, otherwise False.
    """
    validator = MasterValidator()
    full_report = validator.run_full_validation(
        model, df_oos, model_name, feat_cols, expert_id, strategy,
    )
    promotion = full_report["promotion"]
    return promotion["promote"]
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Quick self-test on synthetic data. scikit-learn is only needed here, so
    # it is imported lazily inside the script guard; numpy is already imported
    # at module level.
    from sklearn.ensemble import GradientBoostingClassifier

    np.random.seed(42)
    n_rows = 1000
    demo_features = ["obi", "vpin", "adx", "rsi", "zscore_60", "funding"]

    demo_df = pd.DataFrame({col: np.random.randn(n_rows) for col in demo_features})
    demo_df["label"] = np.random.choice([-1, 0, 1], n_rows)
    demo_df["actual_pnl"] = np.random.uniform(-0.01, 0.01, n_rows)
    demo_df["timestamp"] = np.arange(n_rows)

    demo_model = GradientBoostingClassifier(n_estimators=30, random_state=42)
    demo_model.fit(demo_df[demo_features].values, demo_df["label"].values)

    demo_report = MasterValidator().run_full_validation(
        demo_model, demo_df, "test_model", demo_features, "E1", "test"
    )
    wf_report = demo_report["walk_forward"]
    pt_report = demo_report["paper_trading"]
    summary = {
        "wf_status": wf_report["status"],
        "wf_sharpe": wf_report.get("avg_sharpe"),
        "pt_status": pt_report["status"],
        "pt_sharpe": pt_report.get("sharpe"),
        "promote": demo_report["promotion"]["promote"],
    }
    print(json.dumps(summary, indent=2))