"""
evaluator.py — Multi-dimensional model scoring engine.

Scores a model bundle across 6 dimensions and produces a letter grade.
"""

import logging
from dataclasses import dataclass, field
from typing import Callable, Optional

import numpy as np
import pandas as pd

from src.features import build_features, construct_labels, compute_confluence
from src.data_loader import extract_market_series
from src.registry import ArtifactBundle, predict_proba

logger = logging.getLogger("SniperEval")

# (minimum score, grade) pairs, ordered from best to worst so the first match wins.
GRADE_THRESHOLDS = [
    (95, "A+"), (90, "A"), (85, "A-"),
    (80, "B+"), (75, "B"), (70, "B-"),
    (65, "C+"), (60, "C"), (55, "C-"),
    (50, "D+"), (45, "D"), (0, "F"),
]

# Default contribution of each dimension to the overall score.
DIMENSION_WEIGHTS = {
    "discrimination": 0.20,
    "feature_health": 0.20,
    "signal_stability": 0.15,
    "calibration": 0.15,
    "regime_robustness": 0.15,
    "asymmetry": 0.15,
}


# ---------------------------------------------------------------------------
# Result containers
# ---------------------------------------------------------------------------

@dataclass
class DimensionResult:
    """Score, default weight, diagnostics and warnings for one dimension."""

    name: str
    score: float  # 0–100
    weight: float
    details: dict = field(default_factory=dict)
    flags: list = field(default_factory=list)  # warning strings


@dataclass
class EvalResult:
    """Aggregate output of :func:`run_evaluation`."""

    overall_score: float
    grade: str
    dimensions: list  # list[DimensionResult]
    oof_proba: np.ndarray
    oof_labels: np.ndarray
    feature_psi: pd.DataFrame
    reliability_bins: dict
    regime_scores: dict
    n_samples: int
    n_positives: int
    eval_date_range: tuple
    warnings: list = field(default_factory=list)

    @property
    def dimension_dict(self) -> dict:
        """Return the dimension results keyed by dimension name."""
        return {d.name: d for d in self.dimensions}


def score_to_grade(score: float) -> str:
    """Map a numeric score to a letter grade using ``GRADE_THRESHOLDS``.

    Scores that match no threshold (negative or NaN) grade as "F".
    """
    for threshold, grade in GRADE_THRESHOLDS:
        if score >= threshold:
            return grade
    return "F"


# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------

def run_evaluation(
    ticker_data: dict[str, pd.DataFrame],
    bundle: ArtifactBundle,
    pt_multiplier: float = 3.0,
    sl_multiplier: float = 0.5,
    atr_period: int = 20,
    horizon: int = 15,
    dimension_weights: Optional[dict] = None,
    progress_cb: Optional[Callable] = None,
) -> EvalResult:
    """Score ``bundle`` on every non-index ticker and grade the result.

    Args:
        ticker_data: Frames keyed by ticker. Tickers starting with "^" are
            not scored; market series are taken from this mapping via
            ``extract_market_series``.
        bundle: Model bundle passed to ``predict_proba``.
        pt_multiplier: Profit-target multiplier forwarded to label
            construction and the asymmetry scorer.
        sl_multiplier: Stop-loss multiplier, forwarded likewise.
        atr_period: ATR period forwarded to ``construct_labels``.
        horizon: Label horizon forwarded to ``construct_labels``.
        dimension_weights: Optional per-dimension weights; falls back to
            ``DIMENSION_WEIGHTS`` when omitted or empty.
        progress_cb: Optional ``callable(message, fraction)`` progress hook.

    Returns:
        The populated :class:`EvalResult`.

    Raises:
        RuntimeError: If no ticker produced usable rows.
    """

    def _cb(msg, frac=None):
        if progress_cb:
            progress_cb(msg, frac)
        logger.info(msg)

    weights = dimension_weights or DIMENSION_WEIGHTS
    vix_data, sp500_data = extract_market_series(ticker_data)
    feature_list = bundle.feature_list
    process_tickers = [t for t in ticker_data if not t.startswith("^")]
    n_tickers = len(process_tickers)

    # -----------------------------------------------------------------------
    # 1. Build features + labels for all tickers
    # -----------------------------------------------------------------------
    _cb("Building features and labels for evaluation dataset...", 0.38)
    all_labels, all_probas, all_dates = [], [], []
    raw_feat_frames = []  # for PSI computation (unfiltered)

    for i, ticker in enumerate(process_tickers):
        if i % 50 == 0:
            _cb(
                f"Processing {ticker} ({i+1}/{n_tickers})...",
                0.38 + 0.25 * i / max(1, n_tickers),
            )
        df = ticker_data[ticker]
        try:
            feat = build_features(df, vix_data=vix_data, sp500_data=sp500_data)
            labels, _ = construct_labels(
                df,
                pt_multiplier=pt_multiplier,
                sl_multiplier=sl_multiplier,
                atr_period=atr_period,
                horizon=horizon,
            )
        except Exception as e:
            # Best effort: one bad ticker must not abort the whole evaluation.
            logger.warning("Feature/label build failed for %s: %s", ticker, e)
            continue

        feat_cols = feat.columns.tolist()
        combined = pd.concat([feat, labels.rename("label")], axis=1)
        # Keep rows with a non-negative label and a complete feature vector.
        combined = combined[combined["label"] >= 0].dropna(subset=feat_cols, how="any")
        if len(combined) < 30:
            continue
        raw_feat_frames.append(combined[feat_cols])

        if feature_list:
            # Align to the model's feature order. Features the bundle expects
            # but build_features did not produce are filled with 0.0 (the
            # previous code added them to `feat` only, then hit a KeyError
            # when selecting them from `combined`).
            feat_aligned = combined.reindex(columns=feature_list, fill_value=0.0)
        else:
            feat_aligned = combined[feat_cols]

        feat_clean = feat_aligned.fillna(0).replace([float("inf"), float("-inf")], 0)
        try:
            probas = predict_proba(
                bundle,
                feat_clean,
                use_regime=bundle.has_regime_models,
                sp500_above_sma=(sp500_data is not None),
                vix_high=False,
            )
        except Exception as e:
            logger.warning("Prediction failed for %s: %s", ticker, e)
            continue

        all_labels.append(combined["label"].values)
        all_probas.append(probas)
        all_dates.extend(feat_clean.index.tolist())

    if not all_labels:
        raise RuntimeError("No valid data produced for evaluation.")

    y_all = np.concatenate(all_labels)
    p_all = np.concatenate(all_probas)
    dates_all = np.array(all_dates)
    n_samples = len(y_all)
    n_positives = int(y_all.sum())
    _cb(
        f"Dataset ready: {n_samples:,} samples, {n_positives} positives "
        f"({n_positives/n_samples:.1%} rate)",
        0.64,
    )

    # -----------------------------------------------------------------------
    # 2. Score each dimension
    # -----------------------------------------------------------------------
    dimension_results = []

    # --- Dimension 1: Discrimination ---
    _cb("Scoring: Discrimination...", 0.65)
    dimension_results.append(_score_discrimination(p_all, y_all))

    # --- Dimension 2: Feature health ---
    _cb("Scoring: Feature health (PSI)...", 0.68)
    feat_df_all = (
        pd.concat(raw_feat_frames, ignore_index=True) if raw_feat_frames else pd.DataFrame()
    )
    if feature_list:
        feature_cols = feature_list
    else:
        feature_cols = list(feat_df_all.columns) if not feat_df_all.empty else []
    dim_feat, feat_psi_df = _score_feature_health(feat_df_all, feature_cols)
    dimension_results.append(dim_feat)

    # --- Dimension 3: Signal stability ---
    _cb("Scoring: Signal stability...", 0.72)
    dimension_results.append(_score_signal_stability(p_all, dates_all, y_all))

    # --- Dimension 4: Calibration ---
    _cb("Scoring: Calibration (ECE)...", 0.76)
    dim_cal, rel_bins = _score_calibration(p_all, y_all)
    dimension_results.append(dim_cal)

    # --- Dimension 5: Regime robustness ---
    _cb("Scoring: Regime robustness...", 0.80)
    dim_reg, regime_scores = _score_regime_robustness(
        p_all, y_all, dates_all, sp500_data, vix_data
    )
    dimension_results.append(dim_reg)

    # --- Dimension 6: Asymmetry capture ---
    _cb("Scoring: Asymmetry capture...", 0.85)
    dimension_results.append(_score_asymmetry(p_all, y_all, pt_multiplier, sl_multiplier))

    # -----------------------------------------------------------------------
    # 3. Weighted overall score
    # -----------------------------------------------------------------------
    # A dimension absent from `weights` falls back to its own default weight.
    total_weight = sum(weights.get(d.name, d.weight) for d in dimension_results)
    overall = sum(
        d.score * weights.get(d.name, d.weight) for d in dimension_results
    ) / max(total_weight, 1e-9)
    grade = score_to_grade(overall)
    _cb(f"Evaluation complete. Score: {overall:.1f} ({grade})", 0.95)

    if len(dates_all) > 0:
        date_range = (str(min(dates_all))[:10], str(max(dates_all))[:10])
    else:
        date_range = ("", "")

    return EvalResult(
        overall_score=round(overall, 2),
        grade=grade,
        dimensions=dimension_results,
        oof_proba=p_all,
        oof_labels=y_all,
        feature_psi=feat_psi_df,
        reliability_bins=rel_bins,
        regime_scores=regime_scores,
        n_samples=n_samples,
        n_positives=n_positives,
        eval_date_range=date_range,
    )


# ---------------------------------------------------------------------------
# Dimension scorers
# ---------------------------------------------------------------------------

def _score_discrimination(probas: np.ndarray, labels: np.ndarray) -> DimensionResult:
    """Score ranking quality from ROC AUC, average precision and top-K lift.

    The score blends 40% AUC (0.5 → 0, 1.0 → 100), 35% lift at the top 3%
    (1× → 0, 5× → 100) and 25% average precision above the base rate.
    """
    from sklearn.metrics import roc_auc_score, average_precision_score

    flags = []
    try:
        auc = roc_auc_score(labels, probas)
    except Exception:
        auc = 0.5  # treat an uncomputable AUC as random
    try:
        ap = average_precision_score(labels, probas)
    except Exception:
        ap = float(labels.mean())

    # Precision at top K%. Sort once; every rate reads its cut-off from it.
    sorted_probas = np.sort(probas)
    prec_at = {}
    for rate in (0.01, 0.03, 0.05, 0.10):
        k = max(1, int(len(probas) * rate))
        thresh = sorted_probas[-k]
        picks = probas >= thresh
        prec = float(labels[picks].mean()) if picks.sum() > 0 else 0.0
        prec_at[f"prec_at_{int(rate*100)}pct"] = round(prec, 4)

    details = {"auc": round(auc, 4), "ap": round(ap, 4), **prec_at}

    # Baseline positive rate
    base_rate = float(labels.mean())
    lift_at3 = prec_at.get("prec_at_3pct", base_rate) / max(base_rate, 1e-6)

    # Score: weight AUC and lift
    auc_score = max(0, (auc - 0.5) / 0.5) * 100  # 0.5 → 0, 1.0 → 100
    lift_score = min(100, max(0, (lift_at3 - 1.0) / 4.0 * 100))  # 1× → 0, 5× → 100
    ap_norm = min(100, max(0, (ap - base_rate) / max(1 - base_rate, 0.01) * 100))
    score = 0.40 * auc_score + 0.35 * lift_score + 0.25 * ap_norm

    if auc < 0.55:
        flags.append("AUC near random — model lacks discrimination power")
    if lift_at3 < 1.5:
        flags.append("Lift at top 3% below 1.5× — precision advantage is weak")

    return DimensionResult(
        name="discrimination",
        score=round(score, 2),
        weight=DIMENSION_WEIGHTS["discrimination"],
        details=details,
        flags=flags,
    )


def _score_feature_health(feat_df: pd.DataFrame, feature_cols: list) -> tuple:
    """PSI and NaN/inf rates per feature. Returns (DimensionResult, psi_df).

    PSI compares the first 70% of rows against the last 30%. The split is
    positional, so it is only a drift proxy for whatever row order the
    caller supplies. Features listed in ``feature_cols`` but absent from
    ``feat_df`` are reported as "🔴 Missing" and count as red features.
    """
    if feat_df.empty or not feature_cols:
        empty_psi = pd.DataFrame(columns=["Feature", "NaN Rate", "Inf Rate", "PSI", "Status"])
        return DimensionResult(
            name="feature_health",
            score=50.0,
            weight=DIMENSION_WEIGHTS["feature_health"],
            details={"note": "no feature data"},
            flags=[],
        ), empty_psi

    # PSI: split first 70% vs last 30% as proxy for train vs eval drift
    split = int(len(feat_df) * 0.7)
    rows = []

    for col in feature_cols:
        if col not in feat_df.columns:
            rows.append({
                "Feature": col, "NaN Rate": 1.0, "Inf Rate": 0.0,
                "PSI": 1.0, "Status": "🔴 Missing",
            })
            continue

        series = feat_df[col]
        nan_rate = float(series.isna().mean())
        inf_rate = float(np.isinf(series.replace([None], np.nan).fillna(0)).mean())
        psi = _compute_psi(series.iloc[:split], series.iloc[split:])

        if psi > 0.2 or nan_rate > 0.15:
            status = "🔴 Drift"
        elif psi > 0.1 or nan_rate > 0.05:
            status = "🟡 Watch"
        else:
            status = "🟢 OK"

        rows.append({
            "Feature": col,
            "NaN Rate": round(nan_rate, 4),
            "Inf Rate": round(inf_rate, 4),
            "PSI": round(psi, 4),
            "Status": status,
        })

    psi_df = pd.DataFrame(rows).sort_values("PSI", ascending=False).reset_index(drop=True)
    statuses = psi_df["Status"]
    drift_count = int((statuses == "🔴 Drift").sum())
    missing_count = int((statuses == "🔴 Missing").sum())
    # A feature the model expects but cannot get is at least as bad as a
    # drifting one, so both kinds of red status are penalized.
    red_count = drift_count + missing_count
    yellow_count = int((statuses == "🟡 Watch").sum())
    total_feats = len(feature_cols)

    score = (
        100
        - (red_count / max(total_feats, 1)) * 70
        - (yellow_count / max(total_feats, 1)) * 20
    )
    score = max(0.0, min(100.0, score))

    flags = []
    if missing_count > 0:
        missing_names = psi_df.loc[statuses == "🔴 Missing", "Feature"].head(3).tolist()
        flags.append(
            f"{missing_count} expected feature(s) missing from evaluation data: "
            f"{', '.join(missing_names)}"
        )
    if drift_count > 0:
        top_drifters = psi_df.loc[statuses == "🔴 Drift", "Feature"].head(3).tolist()
        flags.append(
            f"{drift_count} feature(s) show significant drift: {', '.join(top_drifters)}"
        )
    if yellow_count > 5:
        flags.append(f"{yellow_count} features showing moderate drift — monitor closely")

    return DimensionResult(
        name="feature_health",
        score=round(score, 2),
        weight=DIMENSION_WEIGHTS["feature_health"],
        details={
            "red_features": red_count,
            "missing_features": missing_count,
            "yellow_features": yellow_count,
            "total_features": total_feats,
        },
        flags=flags,
    ), psi_df


def _compute_psi(expected: pd.Series, actual: pd.Series, n_bins: int = 10) -> float:
    """Population Stability Index between two distributions.

    Bin edges are the percentiles of the pooled finite values. Returns 0.0
    when there are fewer than 20 pooled values, fewer than two distinct
    bins, or the computation fails.
    """
    try:
        combined = (
            pd.concat([expected, actual])
            .dropna()
            .replace([float("inf"), float("-inf")], np.nan)
            .dropna()
        )
        if len(combined) < 20:
            return 0.0
        bins = np.unique(np.percentile(combined, np.linspace(0, 100, n_bins + 1)))
        if len(bins) < 3:
            return 0.0
        # The small constant keeps empty bins from producing log(0) or x/0.
        exp_counts = np.histogram(expected.dropna(), bins=bins)[0] + 1e-6
        act_counts = np.histogram(actual.dropna(), bins=bins)[0] + 1e-6
        exp_pct = exp_counts / exp_counts.sum()
        act_pct = act_counts / act_counts.sum()
        psi = np.sum((act_pct - exp_pct) * np.log(act_pct / exp_pct))
        return float(max(0.0, psi))
    except Exception as e:
        # Best effort: an unscorable feature is reported as "no drift".
        logger.debug("PSI computation failed: %s", e)
        return 0.0


def _score_signal_stability(
    probas: np.ndarray, dates: np.ndarray, labels: np.ndarray
) -> DimensionResult:
    """
    Measures day-over-day score variance and signal clustering.
    High variance = noisy / unstable signals.

    ``labels`` is accepted for signature parity with the other scorers and
    is not used. Any failure yields a neutral score of 50 with the error
    message in ``details``.
    """
    details = {}
    flags = []
    try:
        date_series = pd.Series(probas, index=pd.to_datetime(dates))
        by_day = date_series.groupby(date_series.index.date)
        daily_mean = by_day.mean()
        day_over_day_changes = daily_mean.diff().abs().dropna()

        # std() needs two changes and mean() needs one; with fewer they are
        # NaN, which used to leak into `details` and zero the variance score.
        n_changes = len(day_over_day_changes)
        dod_variance = float(day_over_day_changes.std()) if n_changes >= 2 else 0.0
        dod_mean = float(day_over_day_changes.mean()) if n_changes >= 1 else 0.0
        if n_changes < 2:
            flags.append("Too few active days to measure day-over-day score variance")

        # Signal clustering: what fraction of days have > 10% of all signals?
        daily_counts = by_day.count()
        total = daily_counts.sum()
        clustering = float((daily_counts / total > 0.10).mean()) if total > 0 else 0.0

        details = {
            "dod_score_std": round(dod_variance, 4),
            "dod_score_mean": round(dod_mean, 4),
            "signal_clustering": round(clustering, 4),
            "n_active_days": len(daily_mean),
        }

        # Score: penalize high variance and extreme clustering
        variance_score = max(0, 100 - dod_variance * 500)
        cluster_score = max(0, 100 - clustering * 200)
        score = 0.6 * variance_score + 0.4 * cluster_score

        if dod_variance > 0.05:
            flags.append(
                f"High day-over-day score variance ({dod_variance:.3f}) — signals may be unstable"
            )
        if clustering > 0.3:
            flags.append("Signals cluster on few days — may be picking up macro noise")
    except Exception as e:
        score = 50.0
        details = {"error": str(e)}

    return DimensionResult(
        name="signal_stability",
        score=round(score, 2),
        weight=DIMENSION_WEIGHTS["signal_stability"],
        details=details,
        flags=flags,
    )


def _score_calibration(probas: np.ndarray, labels: np.ndarray, n_bins: int = 10) -> tuple:
    """
    Expected Calibration Error and reliability diagram data.
    Returns (DimensionResult, reliability_bins_dict).

    Bins are equal-width over [0, 1]. Empty bins are reported on the
    diagonal (predicted == actual) with a count of 0.
    """
    flags = []
    bin_edges = np.linspace(0, 1, n_bins + 1)
    bin_centers = []
    actual_freqs = []
    bin_counts = []

    for i in range(n_bins):
        lo, hi = bin_edges[i], bin_edges[i + 1]
        if i == n_bins - 1:
            # Close the last bin on the right so probabilities of exactly
            # 1.0 are not silently excluded from the ECE.
            mask = (probas >= lo) & (probas <= hi)
        else:
            mask = (probas >= lo) & (probas < hi)
        if mask.sum() == 0:
            midpoint = (lo + hi) / 2
            bin_centers.append(midpoint)
            actual_freqs.append(midpoint)
            bin_counts.append(0)
            continue
        bin_centers.append(float(probas[mask].mean()))
        actual_freqs.append(float(labels[mask].mean()))
        bin_counts.append(int(mask.sum()))

    # ECE: count-weighted mean gap between predicted and observed frequency.
    n = max(len(labels), 1)  # guard against division by zero on empty input
    ece = sum(
        abs(freq - center) * count / n
        for freq, center, count in zip(actual_freqs, bin_centers, bin_counts)
    )

    reliability_bins = {
        "bin_centers": bin_centers,
        "actual_freqs": actual_freqs,
        "bin_counts": bin_counts,
    }

    # Score: ECE 0 → 100, ECE 0.1 → 50, ECE 0.2+ → 0
    score = max(0, 100 - ece * 500)

    details = {
        "ece": round(ece, 4),
        "mean_predicted": round(float(probas.mean()), 4),
        "actual_positive_rate": round(float(labels.mean()), 4),
    }

    if ece > 0.08:
        flags.append(f"High ECE ({ece:.3f}) — probabilities are poorly calibrated")
    if abs(probas.mean() - labels.mean()) > 0.05:
        flags.append("Mean predicted probability significantly differs from actual positive rate")

    return DimensionResult(
        name="calibration",
        score=round(score, 2),
        weight=DIMENSION_WEIGHTS["calibration"],
        details=details,
        flags=flags,
    ), reliability_bins


def _classify_regimes(dates_dt, sp500_data, vix_data, sma_period, vix_threshold) -> np.ndarray:
    """Return a regime id per date: 0=bear/low, 1=bear/high, 2=bull/low, 3=bull/high.

    Each date is matched to the latest market observation at or before it.
    Samples with no match, a missing series, or a failed lookup keep the
    defaults: bull market and low VIX.
    """
    n = len(dates_dt)
    mkt = np.ones(n, dtype=int)
    vix_r = np.zeros(n, dtype=int)

    if sp500_data is not None:
        try:
            # Compute the SMA once instead of once per sample.
            sma = sp500_data.rolling(sma_period).mean()
            pos = sp500_data.index.get_indexer(dates_dt, method="ffill")
            found = pos >= 0
            price = np.asarray(sp500_data, dtype=float)
            trend = np.asarray(sma, dtype=float)
            if price.ndim != 1:
                raise ValueError("sp500_data must be one-dimensional")
            # A NaN SMA (inside the warm-up window) compares False → bear.
            mkt[found] = (price[pos[found]] > trend[pos[found]]).astype(int)
        except Exception as e:
            logger.warning("S&P 500 regime lookup failed; assuming bull market: %s", e)

    if vix_data is not None:
        try:
            pos = vix_data.index.get_indexer(dates_dt, method="ffill")
            found = pos >= 0
            level = np.asarray(vix_data, dtype=float)
            if level.ndim != 1:
                raise ValueError("vix_data must be one-dimensional")
            vix_r[found] = (level[pos[found]] > vix_threshold).astype(int)
        except Exception as e:
            logger.warning("VIX regime lookup failed; assuming low VIX: %s", e)

    return mkt * 2 + vix_r


def _score_regime_robustness(
    probas: np.ndarray,
    labels: np.ndarray,
    dates: np.ndarray,
    sp500_data,
    vix_data,
    sma_period: int = 200,
    vix_threshold: float = 20.0,
) -> tuple:
    """
    AUC in each of the 4 market regimes (bull/bear × VIX low/high).
    Penalizes high variance across regimes.

    Returns (DimensionResult, regime_scores). A regime needs at least 30
    samples and 5 positives to be scored; with no scorable regime the
    dimension gets a fixed score of 40.
    """
    from sklearn.metrics import roc_auc_score

    flags = []
    regime_scores = {}
    aucs = []

    regimes = _classify_regimes(
        pd.to_datetime(dates), sp500_data, vix_data, sma_period, vix_threshold
    )

    regime_labels = {
        0: "Bear / Low VIX",
        1: "Bear / High VIX",
        2: "Bull / Low VIX",
        3: "Bull / High VIX",
    }

    for reg_id, reg_name in regime_labels.items():
        mask = regimes == reg_id
        n_in_regime = int(mask.sum())
        if n_in_regime < 30:
            regime_scores[reg_name] = {"auc": None, "n": n_in_regime, "note": "insufficient data"}
            continue
        if labels[mask].sum() < 5:
            regime_scores[reg_name] = {"auc": None, "n": n_in_regime, "note": "too few positives"}
            continue
        try:
            auc = float(roc_auc_score(labels[mask], probas[mask]))
        except Exception:
            regime_scores[reg_name] = {"auc": None, "n": n_in_regime, "note": "error"}
            continue
        regime_scores[reg_name] = {
            "auc": round(auc, 4),
            "n": n_in_regime,
            "positive_rate": round(float(labels[mask].mean()), 4),
        }
        aucs.append(auc)

    spread = max(aucs) - min(aucs) if len(aucs) >= 2 else None

    if spread is not None:
        mean_auc = np.mean(aucs)
        # Score: high mean AUC + low spread = good
        mean_score = max(0, (mean_auc - 0.5) / 0.5) * 100
        spread_penalty = min(50, spread * 200)
        score = max(0, mean_score - spread_penalty)
        if spread > 0.15:
            flags.append(
                f"High regime variance (spread={spread:.3f}) — model fragile across market conditions"
            )
    elif len(aucs) == 1:
        score = max(0, (aucs[0] - 0.5) / 0.5) * 100
    else:
        score = 40.0
        flags.append("Insufficient data to evaluate regime robustness")

    return DimensionResult(
        name="regime_robustness",
        score=round(score, 2),
        weight=DIMENSION_WEIGHTS["regime_robustness"],
        details={
            "regime_aucs": {k: v.get("auc") for k, v in regime_scores.items()},
            "auc_spread": round(spread, 4) if spread is not None else None,
        },
        flags=flags,
    ), regime_scores


def _score_asymmetry(
    probas: np.ndarray,
    labels: np.ndarray,
    pt_multiplier: float,
    sl_multiplier: float,
) -> DimensionResult:
    """
    Measures how well top-decile signals capture asymmetric payoffs.
    Theoretical max payoff ratio = pt_multiplier / sl_multiplier.

    Wins and losses are simulated as exactly ``pt_multiplier`` and
    ``sl_multiplier`` ATR units. The score averages a normalized
    expected-value term and a win-rate-above-breakeven term.
    """
    flags = []
    theoretical_ratio = pt_multiplier / max(sl_multiplier, 0.01)

    n_total = len(probas)
    # At least 10 signals, but never more than exist: the unclamped value
    # indexed past the start of the sorted array for fewer than 10 samples.
    top_k = min(n_total, max(10, int(n_total * 0.10)))
    if top_k > 0:
        top_thresh = np.sort(probas)[-top_k]
        top_mask = probas >= top_thresh
        n_top = int(top_mask.sum())
    else:
        n_top = 0

    if n_top == 0:
        return DimensionResult(
            name="asymmetry",
            score=30.0,
            weight=DIMENSION_WEIGHTS["asymmetry"],
            details={"note": "no top-decile signals"},
            flags=["No signals above top-decile threshold"],
        )

    top_win_rate = float(labels[top_mask].mean())
    top_loss_rate = 1.0 - top_win_rate

    # Simulate payoff ratio using PT/SL multipliers
    simulated_avg_win = pt_multiplier
    simulated_avg_loss = sl_multiplier
    payoff_ratio = simulated_avg_win / max(simulated_avg_loss, 0.01)

    # Expected value per trade (in ATR units)
    ev = top_win_rate * simulated_avg_win - top_loss_rate * simulated_avg_loss

    # Normalized EV: the ceiling is every trade hitting the profit target.
    max_ev = pt_multiplier
    ev_score = max(0, min(100, ev / max(max_ev, 0.01) * 100))

    # Capture score: how far the top-decile win rate sits above breakeven.
    breakeven_wr = sl_multiplier / (pt_multiplier + sl_multiplier)
    if top_win_rate > breakeven_wr:
        wr_score = min(100, (top_win_rate - breakeven_wr) / (1 - breakeven_wr) * 100)
    else:
        wr_score = 0.0

    score = 0.50 * ev_score + 0.50 * wr_score

    details = {
        "theoretical_payoff_ratio": round(theoretical_ratio, 2),
        "simulated_payoff_ratio": round(payoff_ratio, 2),
        "top_decile_win_rate": round(top_win_rate, 4),
        "breakeven_win_rate": round(breakeven_wr, 4),
        "expected_value_atr": round(ev, 4),
        "n_top_decile_signals": n_top,
    }

    if top_win_rate < breakeven_wr:
        flags.append(
            f"Top-decile win rate ({top_win_rate:.1%}) below breakeven ({breakeven_wr:.1%}) "
            f"for {pt_multiplier}×PT / {sl_multiplier}×SL"
        )
    if ev < 0:
        flags.append("Negative expected value in top decile — signals do not capture asymmetry")

    return DimensionResult(
        name="asymmetry",
        score=round(score, 2),
        weight=DIMENSION_WEIGHTS["asymmetry"],
        details=details,
        flags=flags,
    )