# Source page: model-tester / src/evaluator.py — Arkm20, "Create evaluator.py", commit aee9ef8 (verified)
"""
evaluator.py — Multi-dimensional model scoring engine.
Scores a model bundle across 6 dimensions and produces a letter grade.
"""
import logging
from dataclasses import dataclass, field
from typing import Callable, Optional
import numpy as np
import pandas as pd
from src.features import build_features, construct_labels, compute_confluence
from src.data_loader import extract_market_series
from src.registry import ArtifactBundle, predict_proba
logger = logging.getLogger("SniperEval")
# (minimum score, letter grade) pairs, listed from the highest cut-off down.
# score_to_grade() returns the first entry whose cut-off the score reaches.
GRADE_THRESHOLDS = [
    (95, "A+"),
    (90, "A"),
    (85, "A-"),
    (80, "B+"),
    (75, "B"),
    (70, "B-"),
    (65, "C+"),
    (60, "C"),
    (55, "C-"),
    (50, "D+"),
    (45, "D"),
    (0, "F"),
]

# Default contribution of each scoring dimension to the overall score.
DIMENSION_WEIGHTS = {
    "discrimination": 0.20,
    "feature_health": 0.20,
    "signal_stability": 0.15,
    "calibration": 0.15,
    "regime_robustness": 0.15,
    "asymmetry": 0.15,
}
# ---------------------------------------------------------------------------
# Result containers
# ---------------------------------------------------------------------------
@dataclass
class DimensionResult:
    """Outcome of scoring a single evaluation dimension."""

    # Dimension identifier; run_evaluation uses it to look up the weight.
    name: str
    # Dimension score on a 0–100 scale.
    score: float
    # Default weight, used when no override is supplied for this name.
    weight: float
    # Metric values backing the score.
    details: dict = field(default_factory=dict)
    # Human-readable warning strings raised while scoring.
    flags: list = field(default_factory=list)
@dataclass
class EvalResult:
    """Complete output of one evaluation run."""

    # Weighted overall score and the letter grade derived from it.
    overall_score: float
    grade: str
    # Per-dimension results (list[DimensionResult]).
    dimensions: list
    # Pooled predicted probabilities and their matching labels.
    oof_proba: np.ndarray
    oof_labels: np.ndarray
    # Per-feature PSI / NaN / inf table from the feature-health dimension.
    feature_psi: pd.DataFrame
    # Reliability-diagram data from the calibration dimension.
    reliability_bins: dict
    # Per-regime AUC details from the regime-robustness dimension.
    regime_scores: dict
    n_samples: int
    n_positives: int
    # (first date, last date) of the evaluated samples, as strings.
    eval_date_range: tuple
    warnings: list = field(default_factory=list)

    @property
    def dimension_dict(self) -> dict:
        """Return the dimension results keyed by dimension name."""
        by_name = {}
        for dim in self.dimensions:
            by_name[dim.name] = dim
        return by_name
def score_to_grade(score: float) -> str:
    """Map a numeric score to its letter grade.

    Returns the grade of the first ``GRADE_THRESHOLDS`` entry whose cut-off
    is less than or equal to ``score``; "F" when no cut-off is reached
    (for example a negative or NaN score).
    """
    matching = (grade for threshold, grade in GRADE_THRESHOLDS if score >= threshold)
    return next(matching, "F")
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def run_evaluation(
    ticker_data: dict[str, pd.DataFrame],
    bundle: ArtifactBundle,
    pt_multiplier: float = 3.0,
    sl_multiplier: float = 0.5,
    atr_period: int = 20,
    horizon: int = 15,
    dimension_weights: Optional[dict] = None,
    progress_cb: Optional[Callable] = None,
) -> EvalResult:
    """Evaluate a model bundle on every ticker and grade it on six dimensions.

    For each ticker whose symbol does not start with "^", features and labels
    are built, rows with a negative label or a NaN feature are dropped, and
    the bundle's predictions are collected. The pooled predictions are then
    passed to the six ``_score_*`` functions and combined into a weighted
    overall score and a letter grade.

    Args:
        ticker_data: Mapping of ticker symbol to the DataFrame handed to
            ``build_features`` and ``construct_labels``.
        bundle: Model artifacts. ``feature_list`` and ``has_regime_models``
            are read here; the bundle itself is passed to ``predict_proba``.
        pt_multiplier: Forwarded to ``construct_labels`` and to the
            asymmetry score.
        sl_multiplier: Forwarded to ``construct_labels`` and to the
            asymmetry score.
        atr_period: Forwarded to ``construct_labels``.
        horizon: Forwarded to ``construct_labels``.
        dimension_weights: Optional ``{dimension name: weight}`` override;
            ``DIMENSION_WEIGHTS`` is used when this is falsy.
        progress_cb: Optional callable invoked as ``progress_cb(msg, frac)``.

    Returns:
        An ``EvalResult`` holding the overall score, grade, per-dimension
        results and the pooled predictions and labels.

    Raises:
        RuntimeError: If no ticker yielded usable rows and predictions.
    """
    def _cb(msg, frac=None):
        # Report to the optional callback and always mirror to the log.
        if progress_cb:
            progress_cb(msg, frac)
        logger.info(msg)

    weights = dimension_weights or DIMENSION_WEIGHTS
    vix_data, sp500_data = extract_market_series(ticker_data)
    feature_list = bundle.feature_list
    # Symbols starting with "^" are not scored as tickers.
    process_tickers = [t for t in ticker_data if not t.startswith("^")]

    # -----------------------------------------------------------------------
    # 1. Build features + labels for all tickers
    # -----------------------------------------------------------------------
    _cb("Building features and labels for evaluation dataset...", 0.38)
    all_labels, all_probas, all_dates = [], [], []
    raw_feat_frames = []  # for PSI computation (not aligned to feature_list)
    n_tickers = len(process_tickers)

    for i, ticker in enumerate(process_tickers):
        if i % 50 == 0:
            _cb(f"Processing {ticker} ({i+1}/{n_tickers})...",
                0.38 + 0.25 * i / max(1, n_tickers))
        df = ticker_data[ticker]
        try:
            feat = build_features(df, vix_data=vix_data, sp500_data=sp500_data)
            labels, _ = construct_labels(
                df, pt_multiplier=pt_multiplier, sl_multiplier=sl_multiplier,
                atr_period=atr_period, horizon=horizon,
            )
        except Exception as e:
            logger.warning(f"Feature/label build failed for {ticker}: {e}")
            continue

        feat_cols = feat.columns.tolist()
        combined = pd.concat([feat, labels.rename("label")], axis=1)
        # Keep rows with a non-negative label and no NaN in any built feature.
        combined = combined[combined["label"] >= 0].dropna(subset=feat_cols, how="any")
        if len(combined) < 30:
            continue
        raw_feat_frames.append(combined[feat_cols])

        if feature_list:
            missing = [f for f in feature_list if f not in combined.columns]
            if missing:
                logger.warning(
                    "%s: %d model feature(s) missing, filled with 0.0: %s",
                    ticker, len(missing), ", ".join(map(str, missing[:5])),
                )
            # Align to the model's feature order. Features the ticker lacks
            # become 0.0 columns; previously they were added to `feat` after
            # `combined` had been built, so selecting them raised KeyError.
            feat_aligned = combined.reindex(columns=list(feature_list), fill_value=0.0)
        else:
            feat_aligned = combined[feat_cols]
        feat_clean = feat_aligned.fillna(0).replace([float("inf"), float("-inf")], 0)

        try:
            # NOTE(review): the regime flags are constant for the whole frame
            # (S&P data present => above SMA, VIX never high) rather than
            # derived per date — confirm this is intended.
            probas = predict_proba(
                bundle, feat_clean,
                use_regime=bundle.has_regime_models,
                sp500_above_sma=(sp500_data is not None),
                vix_high=False,
            )
        except Exception as e:
            logger.warning(f"Prediction failed for {ticker}: {e}")
            continue

        all_labels.append(combined["label"].values)
        all_probas.append(probas)
        all_dates.extend(feat_clean.index.tolist())

    if not all_labels:
        raise RuntimeError("No valid data produced for evaluation.")

    y_all = np.concatenate(all_labels)
    p_all = np.concatenate(all_probas)
    dates_all = np.array(all_dates)
    n_samples = len(y_all)
    n_positives = int(y_all.sum())
    _cb(f"Dataset ready: {n_samples:,} samples, {n_positives} positives ({n_positives/n_samples:.1%} rate)", 0.64)

    # -----------------------------------------------------------------------
    # 2. Score each dimension
    # -----------------------------------------------------------------------
    dimension_results = []

    # --- Dimension 1: Discrimination ---
    _cb("Scoring: Discrimination...", 0.65)
    dimension_results.append(_score_discrimination(p_all, y_all))

    # --- Dimension 2: Feature health ---
    _cb("Scoring: Feature health (PSI)...", 0.68)
    feat_df_all = pd.concat(raw_feat_frames, ignore_index=True) if raw_feat_frames else pd.DataFrame()
    feature_cols = feature_list if feature_list else (list(feat_df_all.columns) if not feat_df_all.empty else [])
    dim_feat, feat_psi_df = _score_feature_health(feat_df_all, feature_cols)
    dimension_results.append(dim_feat)

    # --- Dimension 3: Signal stability ---
    _cb("Scoring: Signal stability...", 0.72)
    dimension_results.append(_score_signal_stability(p_all, dates_all, y_all))

    # --- Dimension 4: Calibration ---
    _cb("Scoring: Calibration (ECE)...", 0.76)
    dim_cal, rel_bins = _score_calibration(p_all, y_all)
    dimension_results.append(dim_cal)

    # --- Dimension 5: Regime robustness ---
    _cb("Scoring: Regime robustness...", 0.80)
    dim_reg, regime_scores = _score_regime_robustness(
        p_all, y_all, dates_all, sp500_data, vix_data
    )
    dimension_results.append(dim_reg)

    # --- Dimension 6: Asymmetry capture ---
    _cb("Scoring: Asymmetry capture...", 0.85)
    dimension_results.append(_score_asymmetry(p_all, y_all, pt_multiplier, sl_multiplier))

    # -----------------------------------------------------------------------
    # 3. Weighted overall score
    # -----------------------------------------------------------------------
    # A dimension absent from `weights` falls back to its own default weight;
    # dividing by the total keeps the result on the 0–100 scale.
    total_weight = sum(weights.get(d.name, d.weight) for d in dimension_results)
    overall = sum(
        d.score * weights.get(d.name, d.weight) for d in dimension_results
    ) / max(total_weight, 1e-9)
    grade = score_to_grade(overall)
    _cb(f"Evaluation complete. Score: {overall:.1f} ({grade})", 0.95)

    date_range = (str(min(dates_all))[:10], str(max(dates_all))[:10]) if len(dates_all) > 0 else ("", "")
    return EvalResult(
        overall_score=round(overall, 2),
        grade=grade,
        dimensions=dimension_results,
        oof_proba=p_all,
        oof_labels=y_all,
        feature_psi=feat_psi_df,
        reliability_bins=rel_bins,
        regime_scores=regime_scores,
        n_samples=n_samples,
        n_positives=n_positives,
        eval_date_range=date_range,
    )
# ---------------------------------------------------------------------------
# Dimension scorers
# ---------------------------------------------------------------------------
def _score_discrimination(probas: np.ndarray, labels: np.ndarray) -> DimensionResult:
    """Score ranking quality from ROC AUC, average precision and top-K lift.

    The score blends three 0–100 components: AUC rescaled from [0.5, 1]
    (40%), lift of the top 3% over the base rate rescaled from [1×, 5×]
    (35%), and average precision relative to the base rate (25%).
    """
    from sklearn.metrics import roc_auc_score, average_precision_score

    warnings_found = []

    # Either metric falls back to its "no skill" value if sklearn rejects the input.
    try:
        auc = roc_auc_score(labels, probas)
    except Exception:
        auc = 0.5
    try:
        ap = average_precision_score(labels, probas)
    except Exception:
        ap = float(labels.mean())

    # Precision among the top K% highest-scored samples.
    ranked = np.sort(probas)
    n_obs = len(probas)
    prec_at = {}
    for rate in [0.01, 0.03, 0.05, 0.10]:
        k = max(1, int(n_obs * rate))
        selected = probas >= ranked[-k]
        if selected.sum() > 0:
            precision = float(labels[selected].mean())
        else:
            precision = 0.0
        prec_at[f"prec_at_{int(rate*100)}pct"] = round(precision, 4)

    metrics = {"auc": round(auc, 4), "ap": round(ap, 4), **prec_at}

    # Lift of the top 3% over the baseline positive rate.
    base_rate = float(labels.mean())
    lift_at3 = prec_at.get("prec_at_3pct", base_rate) / max(base_rate, 1e-6)

    auc_score = max(0, (auc - 0.5) / 0.5) * 100  # 0.5 → 0, 1.0 → 100
    lift_score = min(100, max(0, (lift_at3 - 1.0) / 4.0 * 100))  # 1× → 0, 5× → 100
    ap_norm = min(100, max(0, (ap - base_rate) / max(1 - base_rate, 0.01) * 100))
    blended = 0.40 * auc_score + 0.35 * lift_score + 0.25 * ap_norm

    if auc < 0.55:
        warnings_found.append("AUC near random — model lacks discrimination power")
    if lift_at3 < 1.5:
        warnings_found.append("Lift at top 3% below 1.5× — precision advantage is weak")

    return DimensionResult(
        name="discrimination",
        score=round(blended, 2),
        weight=0.20,
        details=metrics,
        flags=warnings_found,
    )
def _score_feature_health(feat_df: pd.DataFrame, feature_cols: list) -> tuple:
    """Score feature quality from NaN rates and a PSI drift proxy.

    Every name in ``feature_cols`` gets a NaN rate, an inf rate and a PSI
    computed between the first 70% and the last 30% of ``feat_df`` rows.
    A feature absent from ``feat_df`` is reported as "🔴 Missing" and is
    penalised like a drifting one.

    Args:
        feat_df: Feature values, one column per feature.
        feature_cols: Names of the features to assess.

    Returns:
        ``(DimensionResult, psi_df)``. ``psi_df`` has the columns Feature,
        NaN Rate, Inf Rate, PSI and Status, sorted by PSI descending. When
        there is nothing to assess the score is a neutral 50 and ``psi_df``
        is empty.
    """
    if feat_df.empty or not feature_cols:
        empty_psi = pd.DataFrame(columns=["Feature", "NaN Rate", "Inf Rate", "PSI", "Status"])
        return DimensionResult(name="feature_health", score=50.0, weight=0.20,
                               details={"note": "no feature data"}, flags=[]), empty_psi

    # PSI: split first 70% vs last 30% as proxy for train vs eval drift.
    # NOTE(review): the split is by row position, so it only approximates
    # drift over time if the caller supplies rows in time order — confirm.
    split = int(len(feat_df) * 0.7)

    rows = []
    for col in feature_cols:
        if col not in feat_df.columns:
            rows.append({"Feature": col, "NaN Rate": 1.0, "Inf Rate": 0.0, "PSI": 1.0, "Status": "🔴 Missing"})
            continue
        series = feat_df[col]
        nan_rate = float(series.isna().mean())
        inf_rate = float(np.isinf(series.replace([None], np.nan).fillna(0)).mean())
        psi = _compute_psi(series.iloc[:split], series.iloc[split:])
        if psi > 0.2 or nan_rate > 0.15:
            status = "🔴 Drift"
        elif psi > 0.1 or nan_rate > 0.05:
            status = "🟡 Watch"
        else:
            status = "🟢 OK"
        rows.append({
            "Feature": col, "NaN Rate": round(nan_rate, 4),
            "Inf Rate": round(inf_rate, 4), "PSI": round(psi, 4),
            "Status": status,
        })

    psi_df = pd.DataFrame(rows).sort_values("PSI", ascending=False).reset_index(drop=True)
    drift_count = (psi_df["Status"] == "🔴 Drift").sum()
    missing_count = (psi_df["Status"] == "🔴 Missing").sum()
    # Missing features are red as well; counting only "🔴 Drift" let a model
    # whose inputs were absent keep a perfect score.
    red_count = drift_count + missing_count
    yellow_count = (psi_df["Status"] == "🟡 Watch").sum()
    total_feats = len(feature_cols)

    # Each red feature costs up to 70 points in total, each yellow up to 20.
    score = 100 - (red_count / max(total_feats, 1)) * 70 - (yellow_count / max(total_feats, 1)) * 20
    score = max(0.0, min(100.0, score))

    flags = []
    if drift_count > 0:
        top_drifters = psi_df[psi_df["Status"] == "🔴 Drift"]["Feature"].head(3).tolist()
        flags.append(f"{drift_count} feature(s) show significant drift: {', '.join(top_drifters)}")
    if missing_count > 0:
        top_missing = psi_df[psi_df["Status"] == "🔴 Missing"]["Feature"].head(3).tolist()
        flags.append(
            f"{missing_count} expected feature(s) missing from evaluation data: "
            f"{', '.join(map(str, top_missing))}"
        )
    if yellow_count > 5:
        flags.append(f"{yellow_count} features showing moderate drift — monitor closely")

    return DimensionResult(
        name="feature_health", score=round(score, 2), weight=0.20,
        details={"red_features": int(red_count), "yellow_features": int(yellow_count),
                 "missing_features": int(missing_count),
                 "total_features": total_feats},
        flags=flags
    ), psi_df
def _compute_psi(expected: pd.Series, actual: pd.Series, n_bins: int = 10) -> float:
"""Population Stability Index between two distributions."""
try:
combined = pd.concat([expected, actual]).dropna().replace([float("inf"), float("-inf")], np.nan).dropna()
if len(combined) < 20:
return 0.0
bins = np.percentile(combined, np.linspace(0, 100, n_bins + 1))
bins = np.unique(bins)
if len(bins) < 3:
return 0.0
exp_counts = np.histogram(expected.dropna(), bins=bins)[0] + 1e-6
act_counts = np.histogram(actual.dropna(), bins=bins)[0] + 1e-6
exp_pct = exp_counts / exp_counts.sum()
act_pct = act_counts / act_counts.sum()
psi = np.sum((act_pct - exp_pct) * np.log(act_pct / exp_pct))
return float(max(0.0, psi))
except Exception:
return 0.0
def _score_signal_stability(probas: np.ndarray, dates: np.ndarray, labels: np.ndarray) -> DimensionResult:
    """Score how stable the model's scores are from day to day.

    Two components are blended: the standard deviation of absolute
    day-over-day changes in the daily mean score (60%) and the fraction of
    days that each hold more than 10% of all samples (40%).

    Args:
        probas: Predicted probabilities, one per sample.
        dates: Sample dates, parseable by ``pd.to_datetime``.
        labels: Unused; kept so the signature matches the other scorers' call.

    Returns:
        A ``DimensionResult`` named "signal_stability". On any failure the
        score is a neutral 50 and the error text is stored in ``details``.
    """
    details = {}
    flags = []
    try:
        date_series = pd.Series(probas, index=pd.to_datetime(dates))
        by_day = date_series.groupby(date_series.index.date)
        daily_mean = by_day.mean()
        day_over_day_changes = daily_mean.diff().abs().dropna()

        # A sample standard deviation needs at least two changes, i.e. three
        # active days. Below that pandas returns NaN, which used to leak into
        # `details` and silently zero the variance component.
        has_history = len(day_over_day_changes) >= 2
        dod_variance = float(day_over_day_changes.std()) if has_history else 0.0
        dod_mean = float(day_over_day_changes.mean()) if len(day_over_day_changes) else 0.0

        # Signal clustering: what fraction of days have > 10% of all signals?
        daily_counts = by_day.count()
        total = daily_counts.sum()
        clustering = float((daily_counts / total > 0.10).mean()) if total > 0 else 0.0

        details = {
            "dod_score_std": round(dod_variance, 4),
            "dod_score_mean": round(dod_mean, 4),
            "signal_clustering": round(clustering, 4),
            "n_active_days": len(daily_mean),
        }

        # Score: penalize high variance and extreme clustering.
        if has_history:
            variance_score = max(0, 100 - dod_variance * 500)
        else:
            # Not measurable: use the same neutral value as the failure path.
            variance_score = 50.0
            flags.append("Fewer than 3 active days — day-over-day variance cannot be measured")
        cluster_score = max(0, 100 - clustering * 200)
        score = 0.6 * variance_score + 0.4 * cluster_score

        if dod_variance > 0.05:
            flags.append(f"High day-over-day score variance ({dod_variance:.3f}) — signals may be unstable")
        if clustering > 0.3:
            flags.append("Signals cluster on few days — may be picking up macro noise")
    except Exception as e:
        logger.warning("Signal stability scoring failed: %s", e)
        score = 50.0
        details = {"error": str(e)}

    return DimensionResult(
        name="signal_stability", score=round(score, 2), weight=0.15,
        details=details, flags=flags
    )
def _score_calibration(probas: np.ndarray, labels: np.ndarray, n_bins: int = 10) -> tuple:
    """Score probability calibration via Expected Calibration Error (ECE).

    Predictions are grouped into ``n_bins`` equal-width bins on [0, 1]. The
    ECE is the sample-weighted mean absolute gap between each bin's mean
    prediction and its observed positive rate.

    Args:
        probas: Predicted probabilities.
        labels: Observed labels, aligned with ``probas``.
        n_bins: Number of equal-width probability bins.

    Returns:
        ``(DimensionResult, reliability_bins)`` where ``reliability_bins``
        holds the lists "bin_centers", "actual_freqs" and "bin_counts".
    """
    flags = []
    bin_edges = np.linspace(0, 1, n_bins + 1)
    bin_centers = []
    actual_freqs = []
    bin_counts = []

    for i in range(n_bins):
        lo, hi = bin_edges[i], bin_edges[i + 1]
        if i == n_bins - 1:
            # Close the last bin on the right so a prediction of exactly 1.0
            # is counted instead of falling outside every bin.
            mask = (probas >= lo) & (probas <= hi)
        else:
            mask = (probas >= lo) & (probas < hi)
        n_in_bin = int(mask.sum())
        if n_in_bin == 0:
            # Empty bin: place it on the diagonal; its zero count keeps it
            # out of the ECE.
            midpoint = (lo + hi) / 2
            bin_centers.append(midpoint)
            actual_freqs.append(midpoint)
            bin_counts.append(0)
            continue
        bin_centers.append(float(probas[mask].mean()))
        actual_freqs.append(float(labels[mask].mean()))
        bin_counts.append(n_in_bin)

    # ECE
    n = max(len(labels), 1)  # avoid dividing by zero on empty input
    ece = sum(
        abs(freq - center) * count / n
        for freq, center, count in zip(actual_freqs, bin_centers, bin_counts)
    )

    reliability_bins = {
        "bin_centers": bin_centers,
        "actual_freqs": actual_freqs,
        "bin_counts": bin_counts,
    }

    # Score: ECE 0 → 100, ECE 0.1 → 50, ECE 0.2+ → 0
    score = max(0, 100 - ece * 500)
    details = {
        "ece": round(ece, 4),
        "mean_predicted": round(float(probas.mean()), 4),
        "actual_positive_rate": round(float(labels.mean()), 4),
    }

    if ece > 0.08:
        flags.append(f"High ECE ({ece:.3f}) — probabilities are poorly calibrated")
    if abs(probas.mean() - labels.mean()) > 0.05:
        flags.append("Mean predicted probability significantly differs from actual positive rate")

    return DimensionResult(
        name="calibration", score=round(score, 2), weight=0.15,
        details=details, flags=flags
    ), reliability_bins
def _score_regime_robustness(
    probas: np.ndarray, labels: np.ndarray, dates: np.ndarray,
    sp500_data, vix_data, sma_period: int = 200, vix_threshold: float = 20.0
) -> tuple:
    """Score how consistent the AUC is across four market regimes.

    Each sample is assigned a regime from two flags looked up as of its
    date (most recent earlier observation): S&P 500 above its
    ``sma_period`` rolling mean (bull) or not (bear), and VIX above
    ``vix_threshold`` (high) or not (low). The score rewards a high mean
    AUC and penalises the spread between the best and worst regime.

    Args:
        probas: Predicted probabilities.
        labels: Observed labels, aligned with ``probas``.
        dates: Sample dates, parseable by ``pd.to_datetime``.
        sp500_data: S&P 500 series indexed by date, or None.
            NOTE(review): assumed to be a 1-D pandas Series — confirm.
        vix_data: VIX series indexed by date, or None (same assumption).
        sma_period: Window of the rolling mean used for the bull/bear flag.
        vix_threshold: VIX level above which the regime is "high VIX".

    Returns:
        ``(DimensionResult, regime_scores)`` where ``regime_scores`` maps
        each regime name to its AUC, sample count and a note when the AUC
        could not be computed.
    """
    from sklearn.metrics import roc_auc_score

    flags = []
    regime_scores = {}
    aucs = []
    dates_dt = pd.to_datetime(dates)
    n_obs = len(dates_dt)

    # Defaults when a series is missing or the lookup fails: bull, low VIX.
    mkt = np.ones(n_obs, dtype=int)
    vix_r = np.zeros(n_obs, dtype=int)

    if sp500_data is not None:
        try:
            # The rolling mean is computed once; it used to be recomputed for
            # every sample. Rows where the SMA is still NaN compare as False
            # (bear), as before.
            sma = sp500_data.rolling(sma_period).mean()
            above_sma = np.asarray(sp500_data > sma)
            pos = sp500_data.index.get_indexer(dates_dt, method="ffill")
            found = pos >= 0  # -1 means no observation on or before the date
            mkt[found] = above_sma[pos[found]].astype(int)
        except Exception as e:
            logger.warning("S&P 500 regime lookup failed; assuming bull market: %s", e)

    if vix_data is not None:
        try:
            high_vix = np.asarray(vix_data > vix_threshold)
            pos = vix_data.index.get_indexer(dates_dt, method="ffill")
            found = pos >= 0
            vix_r[found] = high_vix[pos[found]].astype(int)
        except Exception as e:
            logger.warning("VIX regime lookup failed; assuming low VIX: %s", e)

    # 0=bear/low, 1=bear/high, 2=bull/low, 3=bull/high
    regimes = mkt * 2 + vix_r

    regime_labels = {
        0: "Bear / Low VIX",
        1: "Bear / High VIX",
        2: "Bull / Low VIX",
        3: "Bull / High VIX",
    }
    for reg_id, reg_name in regime_labels.items():
        mask = regimes == reg_id
        n_regime = int(mask.sum())
        if n_regime < 30:
            regime_scores[reg_name] = {"auc": None, "n": n_regime, "note": "insufficient data"}
            continue
        if labels[mask].sum() < 5:
            regime_scores[reg_name] = {"auc": None, "n": n_regime, "note": "too few positives"}
            continue
        try:
            auc = float(roc_auc_score(labels[mask], probas[mask]))
            regime_scores[reg_name] = {
                "auc": round(auc, 4),
                "n": n_regime,
                "positive_rate": round(float(labels[mask].mean()), 4),
            }
            aucs.append(auc)
        except Exception:
            regime_scores[reg_name] = {"auc": None, "n": n_regime, "note": "error"}

    spread = None
    if len(aucs) >= 2:
        spread = max(aucs) - min(aucs)
        mean_auc = np.mean(aucs)
        # Score: high mean AUC + low spread = good
        mean_score = max(0, (mean_auc - 0.5) / 0.5) * 100
        spread_penalty = min(50, spread * 200)
        score = max(0, mean_score - spread_penalty)
        if spread > 0.15:
            flags.append(f"High regime variance (spread={spread:.3f}) — model fragile across market conditions")
    elif len(aucs) == 1:
        score = max(0, (aucs[0] - 0.5) / 0.5) * 100
    else:
        score = 40.0
        flags.append("Insufficient data to evaluate regime robustness")

    return DimensionResult(
        name="regime_robustness", score=round(score, 2), weight=0.15,
        details={"regime_aucs": {k: v.get("auc") for k, v in regime_scores.items()},
                 "auc_spread": round(spread, 4) if spread is not None else None},
        flags=flags
    ), regime_scores
def _score_asymmetry(
    probas: np.ndarray, labels: np.ndarray,
    pt_multiplier: float, sl_multiplier: float,
) -> DimensionResult:
    """Score how well the top-decile signals exploit the PT/SL payoff.

    The top 10% of predictions (at least 10 samples, never more than are
    available) form the signal set. Its win rate is turned into an expected
    value per trade, assuming every win pays ``pt_multiplier`` and every
    loss costs ``sl_multiplier``. The score is the mean of the normalised
    expected value and the win-rate margin over breakeven.

    Args:
        probas: Predicted probabilities.
        labels: Observed labels, aligned with ``probas``.
        pt_multiplier: Payoff of a winning trade.
        sl_multiplier: Cost of a losing trade.

    Returns:
        A ``DimensionResult`` named "asymmetry"; score 30 with a flag when
        no top-decile signals can be selected.
    """
    flags = []
    theoretical_ratio = pt_multiplier / max(sl_multiplier, 0.01)

    n_obs = len(probas)
    if n_obs > 0:
        # Clamp to the sample size: with fewer than 10 samples the fixed
        # minimum of 10 used to index past the start of the sorted array.
        top_k = min(n_obs, max(10, int(n_obs * 0.10)))
        top_thresh = np.sort(probas)[-top_k]
        top_mask = probas >= top_thresh
        n_top = top_mask.sum()
    else:
        n_top = 0

    if n_top == 0:
        return DimensionResult(
            name="asymmetry", score=30.0, weight=0.15,
            details={"note": "no top-decile signals"},
            flags=["No signals above top-decile threshold"]
        )

    top_win_rate = float(labels[top_mask].mean())
    top_loss_rate = 1.0 - top_win_rate

    # Simulate payoff ratio using PT/SL multipliers
    simulated_avg_win = pt_multiplier
    simulated_avg_loss = sl_multiplier
    payoff_ratio = simulated_avg_win / max(simulated_avg_loss, 0.01)

    # Expected value per trade, in the same units as the multipliers.
    ev = top_win_rate * simulated_avg_win - top_loss_rate * simulated_avg_loss
    # Normalise by the best possible EV (every trade wins).
    max_ev = pt_multiplier
    ev_score = max(0, min(100, ev / max(max_ev, 0.01) * 100))

    # Win rate at which the expected value is zero; the denominator is
    # guarded so two zero multipliers do not raise ZeroDivisionError.
    breakeven_wr = sl_multiplier / max(pt_multiplier + sl_multiplier, 1e-9)
    if top_win_rate > breakeven_wr:
        wr_score = min(100, (top_win_rate - breakeven_wr) / (1 - breakeven_wr) * 100)
    else:
        wr_score = 0.0

    score = 0.50 * ev_score + 0.50 * wr_score
    details = {
        "theoretical_payoff_ratio": round(theoretical_ratio, 2),
        "simulated_payoff_ratio": round(payoff_ratio, 2),
        "top_decile_win_rate": round(top_win_rate, 4),
        "breakeven_win_rate": round(breakeven_wr, 4),
        "expected_value_atr": round(ev, 4),
        "n_top_decile_signals": int(n_top),
    }

    if top_win_rate < breakeven_wr:
        flags.append(
            f"Top-decile win rate ({top_win_rate:.1%}) below breakeven ({breakeven_wr:.1%}) "
            f"for {pt_multiplier}×PT / {sl_multiplier}×SL"
        )
    if ev < 0:
        flags.append("Negative expected value in top decile — signals do not capture asymmetry")

    return DimensionResult(
        name="asymmetry", score=round(score, 2), weight=0.15,
        details=details, flags=flags
    )