Spaces:
Sleeping
Sleeping
| """ | |
| evaluator.py — Multi-dimensional model scoring engine. | |
| Scores a model bundle across 6 dimensions and produces a letter grade. | |
| """ | |
| import logging | |
| from dataclasses import dataclass, field | |
| from typing import Callable, Optional | |
| import numpy as np | |
| import pandas as pd | |
| from src.features import build_features, construct_labels, compute_confluence | |
| from src.data_loader import extract_market_series | |
| from src.registry import ArtifactBundle, predict_proba | |
logger = logging.getLogger("SniperEval")

# Letter-grade cut-offs as (minimum score, grade) pairs.  The list is ordered
# from the highest threshold down, so the first pair whose threshold is met
# is the grade to award.
GRADE_THRESHOLDS = [
    (95, "A+"),
    (90, "A"),
    (85, "A-"),
    (80, "B+"),
    (75, "B"),
    (70, "B-"),
    (65, "C+"),
    (60, "C"),
    (55, "C-"),
    (50, "D+"),
    (45, "D"),
    (0, "F"),
]

# Default contribution of each scoring dimension to the overall score.
DIMENSION_WEIGHTS = dict(
    discrimination=0.20,
    feature_health=0.20,
    signal_stability=0.15,
    calibration=0.15,
    regime_robustness=0.15,
    asymmetry=0.15,
)
# ---------------------------------------------------------------------------
# Result containers
# ---------------------------------------------------------------------------
@dataclass
class DimensionResult:
    """Outcome of scoring a single evaluation dimension.

    The ``@dataclass`` decorator is required: the scorers in this module
    build instances with keyword arguments, and the ``field(default_factory=...)``
    defaults only take effect on a dataclass.
    """

    name: str  # dimension key, e.g. "discrimination"
    score: float  # 0–100
    weight: float  # scorer's built-in weight for this dimension
    details: dict = field(default_factory=dict)  # metric name -> value
    flags: list = field(default_factory=list)  # warning strings
@dataclass
class EvalResult:
    """Complete output of one evaluation run.

    The ``@dataclass`` decorator is required: ``run_evaluation`` constructs
    this class with keyword arguments and relies on ``warnings`` defaulting
    to a fresh empty list.
    """

    overall_score: float  # weighted mean of the dimension scores
    grade: str  # letter grade derived from overall_score
    dimensions: list  # list[DimensionResult]
    oof_proba: np.ndarray  # pooled predicted probabilities
    oof_labels: np.ndarray  # pooled labels, aligned with oof_proba
    feature_psi: pd.DataFrame  # per-feature table from the feature-health scorer
    reliability_bins: dict  # reliability-diagram data from the calibration scorer
    regime_scores: dict  # per-regime results from the regime scorer
    n_samples: int
    n_positives: int
    eval_date_range: tuple  # (first date, last date) as strings
    warnings: list = field(default_factory=list)

    def dimension_dict(self) -> dict:
        """Return the dimension results keyed by their ``name``."""
        return {d.name: d for d in self.dimensions}
def score_to_grade(score: float) -> str:
    """Translate a numeric score into its letter grade.

    Scans ``GRADE_THRESHOLDS`` in order and returns the grade of the first
    threshold that ``score`` meets.  A score that meets no threshold
    (a negative number or NaN) is graded "F".
    """
    matching = (grade for threshold, grade in GRADE_THRESHOLDS if score >= threshold)
    return next(matching, "F")
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def run_evaluation(
    ticker_data: dict[str, pd.DataFrame],
    bundle: ArtifactBundle,
    pt_multiplier: float = 3.0,
    sl_multiplier: float = 0.5,
    atr_period: int = 20,
    horizon: int = 15,
    dimension_weights: Optional[dict] = None,
    progress_cb: Optional[Callable] = None,
) -> EvalResult:
    """Evaluate a model bundle on historical ticker data and grade it.

    For every ticker whose symbol does not start with "^", the function
    builds features and labels, scores the usable rows with the bundle and
    pools the results.  The pooled predictions are then passed through the
    six dimension scorers and combined into a weighted overall score and
    letter grade.

    Args:
        ticker_data: Mapping of ticker symbol to its DataFrame.  It is also
            handed to ``extract_market_series`` to obtain the VIX and
            S&P 500 series.
        bundle: Model artifact used for prediction; its ``feature_list``
            (when non-empty) fixes the feature columns fed to the model.
        pt_multiplier: Forwarded to ``construct_labels`` and to the
            asymmetry scorer.
        sl_multiplier: Forwarded to ``construct_labels`` and to the
            asymmetry scorer.
        atr_period: Forwarded to ``construct_labels``.
        horizon: Forwarded to ``construct_labels``.
        dimension_weights: Optional ``{dimension name: weight}`` override.
            ``DIMENSION_WEIGHTS`` is used when this is ``None`` or empty; a
            dimension absent from the mapping keeps its scorer's own weight.
        progress_cb: Optional ``callable(message, fraction)`` invoked with
            progress updates.  Every message is also logged.

    Returns:
        An ``EvalResult`` holding the overall score, grade, per-dimension
        results and the pooled predictions/labels.

    Raises:
        RuntimeError: If no ticker produced any usable rows.
    """

    def _cb(msg, frac=None):
        if progress_cb:
            progress_cb(msg, frac)
        logger.info(msg)

    weights = dimension_weights or DIMENSION_WEIGHTS
    vix_data, sp500_data = extract_market_series(ticker_data)
    feature_list = bundle.feature_list
    process_tickers = [t for t in ticker_data if not t.startswith("^")]

    # -----------------------------------------------------------------------
    # 1. Build features + labels for all tickers
    # -----------------------------------------------------------------------
    _cb("Building features and labels for evaluation dataset...", 0.38)
    all_labels, all_probas, all_dates = [], [], []
    raw_feat_frames = []  # for PSI computation (unfiltered)
    n_tickers = len(process_tickers)

    for i, ticker in enumerate(process_tickers):
        if i % 50 == 0:
            _cb(f"Processing {ticker} ({i+1}/{n_tickers})...",
                0.38 + 0.25 * i / max(1, n_tickers))
        df = ticker_data[ticker]
        try:
            feat = build_features(df, vix_data=vix_data, sp500_data=sp500_data)
            labels, _ = construct_labels(
                df, pt_multiplier=pt_multiplier, sl_multiplier=sl_multiplier,
                atr_period=atr_period, horizon=horizon,
            )
        except Exception as e:
            logger.warning(f"Feature/label build failed for {ticker}: {e}")
            continue

        feat_cols = feat.columns.tolist()
        combined = pd.concat([feat, labels.rename("label")], axis=1)
        # Keep rows with a non-negative label and a complete feature vector.
        combined = combined[combined["label"] >= 0].dropna(subset=feat_cols, how="any")
        if len(combined) < 30:
            continue

        feat_part = combined[feat_cols]
        raw_feat_frames.append(feat_part)

        if feature_list:
            # Zero-fill any feature the bundle expects but this ticker lacks.
            # The fill must happen on the frame that is actually selected
            # from; adding the columns to `feat` after `combined` was built
            # left them absent from `combined` and raised KeyError.
            feat_aligned = feat_part.copy()
            for m in feature_list:
                if m not in feat_aligned.columns:
                    feat_aligned[m] = 0.0
            feat_aligned = feat_aligned[feature_list]
        else:
            feat_aligned = feat_part
        feat_clean = feat_aligned.fillna(0).replace([float("inf"), float("-inf")], 0)

        try:
            # NOTE(review): the regime inputs are constants here
            # (sp500_above_sma is True whenever S&P data exists, vix_high is
            # always False) rather than per-row values — confirm this is
            # what predict_proba expects.
            probas = predict_proba(
                bundle, feat_clean,
                use_regime=bundle.has_regime_models,
                sp500_above_sma=(sp500_data is not None),
                vix_high=False,
            )
        except Exception as e:
            logger.warning(f"Prediction failed for {ticker}: {e}")
            continue

        all_labels.append(combined["label"].values)
        all_probas.append(probas)
        all_dates.extend(feat_clean.index.tolist())

    if not all_labels:
        raise RuntimeError("No valid data produced for evaluation.")

    y_all = np.concatenate(all_labels)
    p_all = np.concatenate(all_probas)
    dates_all = np.array(all_dates)
    n_samples = len(y_all)
    n_positives = int(y_all.sum())
    _cb(f"Dataset ready: {n_samples:,} samples, {n_positives} positives ({n_positives/n_samples:.1%} rate)", 0.64)

    # -----------------------------------------------------------------------
    # 2. Score each dimension
    # -----------------------------------------------------------------------
    dimension_results = []

    # --- Dimension 1: Discrimination ---
    _cb("Scoring: Discrimination...", 0.65)
    dim_disc = _score_discrimination(p_all, y_all)
    dimension_results.append(dim_disc)

    # --- Dimension 2: Feature health ---
    _cb("Scoring: Feature health (PSI)...", 0.68)
    feat_df_all = pd.concat(raw_feat_frames, ignore_index=True) if raw_feat_frames else pd.DataFrame()
    feature_cols = feature_list if feature_list else (list(feat_df_all.columns) if not feat_df_all.empty else [])
    dim_feat, feat_psi_df = _score_feature_health(feat_df_all, feature_cols)
    dimension_results.append(dim_feat)

    # --- Dimension 3: Signal stability ---
    _cb("Scoring: Signal stability...", 0.72)
    dim_stab = _score_signal_stability(p_all, dates_all, y_all)
    dimension_results.append(dim_stab)

    # --- Dimension 4: Calibration ---
    _cb("Scoring: Calibration (ECE)...", 0.76)
    dim_cal, rel_bins = _score_calibration(p_all, y_all)
    dimension_results.append(dim_cal)

    # --- Dimension 5: Regime robustness ---
    _cb("Scoring: Regime robustness...", 0.80)
    dim_reg, regime_scores = _score_regime_robustness(
        p_all, y_all, dates_all, sp500_data, vix_data
    )
    dimension_results.append(dim_reg)

    # --- Dimension 6: Asymmetry capture ---
    _cb("Scoring: Asymmetry capture...", 0.85)
    dim_asym = _score_asymmetry(p_all, y_all, pt_multiplier, sl_multiplier)
    dimension_results.append(dim_asym)

    # -----------------------------------------------------------------------
    # 3. Weighted overall score
    # -----------------------------------------------------------------------
    # Dividing by the total weight keeps the result on the 0–100 scale even
    # when the supplied weights do not sum to 1.
    total_weight = sum(weights.get(d.name, d.weight) for d in dimension_results)
    overall = sum(
        d.score * weights.get(d.name, d.weight) for d in dimension_results
    ) / max(total_weight, 1e-9)
    grade = score_to_grade(overall)
    _cb(f"Evaluation complete. Score: {overall:.1f} ({grade})", 0.95)

    # Keep only the first 10 characters of each date's string form.
    date_range = (str(min(dates_all))[:10], str(max(dates_all))[:10]) if len(dates_all) > 0 else ("", "")
    return EvalResult(
        overall_score=round(overall, 2),
        grade=grade,
        dimensions=dimension_results,
        oof_proba=p_all,
        oof_labels=y_all,
        feature_psi=feat_psi_df,
        reliability_bins=rel_bins,
        regime_scores=regime_scores,
        n_samples=n_samples,
        n_positives=n_positives,
        eval_date_range=date_range,
    )
# ---------------------------------------------------------------------------
# Dimension scorers
# ---------------------------------------------------------------------------
def _score_discrimination(probas: np.ndarray, labels: np.ndarray) -> DimensionResult:
    """Score how well the probabilities rank positives above negatives.

    Blends three components: ROC AUC (40%), lift of the top 3% of
    predictions over the base positive rate (35%), and average precision
    measured above the base rate (25%).  Precision at the top 1/3/5/10% is
    reported in ``details``.
    """
    from sklearn.metrics import average_precision_score, roc_auc_score

    warnings_out = []

    # Fall back to the "no skill" value when a metric cannot be computed.
    try:
        auc = roc_auc_score(labels, probas)
    except Exception:
        auc = 0.5
    try:
        ap = average_precision_score(labels, probas)
    except Exception:
        ap = float(labels.mean())

    # Precision among the highest-scored K% of samples.  Ties at the cut-off
    # value are all included, so a pick set can exceed K%.
    ascending = np.sort(probas)
    n_total = len(probas)
    prec_at = {}
    for rate in (0.01, 0.03, 0.05, 0.10):
        top_n = max(1, int(n_total * rate))
        selected = probas >= ascending[-top_n]
        if selected.sum() > 0:
            hit_rate = float(labels[selected].mean())
        else:
            hit_rate = 0.0
        prec_at[f"prec_at_{int(rate*100)}pct"] = round(hit_rate, 4)

    details = {"auc": round(auc, 4), "ap": round(ap, 4), **prec_at}

    base_rate = float(labels.mean())
    lift_at3 = prec_at.get("prec_at_3pct", base_rate) / max(base_rate, 1e-6)

    # Component scores, each on a 0–100 scale.
    auc_component = max(0, (auc - 0.5) / 0.5) * 100  # 0.5 → 0, 1.0 → 100
    lift_component = min(100, max(0, (lift_at3 - 1.0) / 4.0 * 100))  # 1× → 0, 5× → 100
    ap_component = min(100, max(0, (ap - base_rate) / max(1 - base_rate, 0.01) * 100))
    blended = 0.40 * auc_component + 0.35 * lift_component + 0.25 * ap_component

    if auc < 0.55:
        warnings_out.append("AUC near random — model lacks discrimination power")
    if lift_at3 < 1.5:
        warnings_out.append("Lift at top 3% below 1.5× — precision advantage is weak")

    return DimensionResult(
        name="discrimination",
        score=round(blended, 2),
        weight=0.20,
        details=details,
        flags=warnings_out,
    )
def _score_feature_health(feat_df: pd.DataFrame, feature_cols: list) -> tuple:
    """Score feature quality from PSI and NaN/inf rates per feature.

    Each feature in ``feature_cols`` gets a status:
      * "🔴 Missing" – the column is absent from ``feat_df``;
      * "🔴 Drift"   – PSI > 0.2 or NaN rate > 15%;
      * "🟡 Watch"   – PSI > 0.1 or NaN rate > 5%;
      * "🟢 OK"      – otherwise.
    The score starts at 100 and loses up to 70 points in proportion to the
    share of red features (drifting or missing) and up to 20 in proportion
    to the share of yellow ones.

    Args:
        feat_df: Feature rows, one column per feature.
        feature_cols: Names of the features to assess.

    Returns:
        ``(DimensionResult, psi_df)`` where ``psi_df`` has the columns
        Feature, NaN Rate, Inf Rate, PSI and Status, sorted by PSI descending.
        With no data or no features, a neutral score of 50 and an empty
        table are returned.
    """
    if feat_df.empty or not feature_cols:
        empty_psi = pd.DataFrame(columns=["Feature", "NaN Rate", "Inf Rate", "PSI", "Status"])
        return DimensionResult(name="feature_health", score=50.0, weight=0.20,
                               details={"note": "no feature data"}, flags=[]), empty_psi

    # PSI: split first 70% vs last 30% of rows as a proxy for train vs eval drift.
    # NOTE(review): the split is positional — it follows whatever row order
    # the caller supplied, which may not be chronological; confirm.
    split = int(len(feat_df) * 0.7)

    rows = []
    for col in feature_cols:
        if col not in feat_df.columns:
            rows.append({"Feature": col, "NaN Rate": 1.0, "Inf Rate": 0.0, "PSI": 1.0, "Status": "🔴 Missing"})
            continue
        series = feat_df[col]
        nan_rate = float(series.isna().mean())
        inf_rate = float(np.isinf(series.replace([None], np.nan).fillna(0)).mean())
        psi = _compute_psi(series.iloc[:split], series.iloc[split:])
        if psi > 0.2 or nan_rate > 0.15:
            status = "🔴 Drift"
        elif psi > 0.1 or nan_rate > 0.05:
            status = "🟡 Watch"
        else:
            status = "🟢 OK"
        rows.append({
            "Feature": col, "NaN Rate": round(nan_rate, 4),
            "Inf Rate": round(inf_rate, 4), "PSI": round(psi, 4),
            "Status": status,
        })

    psi_df = pd.DataFrame(rows).sort_values("PSI", ascending=False).reset_index(drop=True)
    drift_count = int((psi_df["Status"] == "🔴 Drift").sum())
    missing_count = int((psi_df["Status"] == "🔴 Missing").sum())
    yellow_count = int((psi_df["Status"] == "🟡 Watch").sum())
    # A missing feature is at least as serious as a drifting one, so both
    # count as red.  Previously missing features were tallied in a counter
    # that was never read and therefore never lowered the score.
    red_count = drift_count + missing_count
    total_feats = len(feature_cols)
    score = 100 - (red_count / max(total_feats, 1)) * 70 - (yellow_count / max(total_feats, 1)) * 20
    score = max(0.0, min(100.0, score))

    flags = []
    if missing_count > 0:
        top_missing = psi_df[psi_df["Status"] == "🔴 Missing"]["Feature"].head(3).tolist()
        flags.append(f"{missing_count} expected feature(s) missing from evaluation data: {', '.join(top_missing)}")
    if drift_count > 0:
        top_drifters = psi_df[psi_df["Status"] == "🔴 Drift"]["Feature"].head(3).tolist()
        flags.append(f"{drift_count} feature(s) show significant drift: {', '.join(top_drifters)}")
    if yellow_count > 5:
        flags.append(f"{yellow_count} features showing moderate drift — monitor closely")

    return DimensionResult(
        name="feature_health", score=round(score, 2), weight=0.20,
        details={"red_features": red_count, "yellow_features": yellow_count,
                 "missing_features": missing_count, "total_features": total_feats},
        flags=flags
    ), psi_df
| def _compute_psi(expected: pd.Series, actual: pd.Series, n_bins: int = 10) -> float: | |
| """Population Stability Index between two distributions.""" | |
| try: | |
| combined = pd.concat([expected, actual]).dropna().replace([float("inf"), float("-inf")], np.nan).dropna() | |
| if len(combined) < 20: | |
| return 0.0 | |
| bins = np.percentile(combined, np.linspace(0, 100, n_bins + 1)) | |
| bins = np.unique(bins) | |
| if len(bins) < 3: | |
| return 0.0 | |
| exp_counts = np.histogram(expected.dropna(), bins=bins)[0] + 1e-6 | |
| act_counts = np.histogram(actual.dropna(), bins=bins)[0] + 1e-6 | |
| exp_pct = exp_counts / exp_counts.sum() | |
| act_pct = act_counts / act_counts.sum() | |
| psi = np.sum((act_pct - exp_pct) * np.log(act_pct / exp_pct)) | |
| return float(max(0.0, psi)) | |
| except Exception: | |
| return 0.0 | |
def _score_signal_stability(probas: np.ndarray, dates: np.ndarray, labels: np.ndarray) -> DimensionResult:
    """
    Measures day-over-day score variance and signal clustering.
    High variance = noisy / unstable signals.

    Probabilities are averaged per calendar day; the score combines the
    standard deviation of the absolute day-over-day change in that average
    (60%) with the fraction of days that each hold more than 10% of all
    samples (40%).

    Args:
        probas: Predicted probabilities.
        dates: Date of each prediction, aligned with ``probas``.
        labels: Unused; kept so all scorers share a similar signature.

    Returns:
        A ``DimensionResult`` named "signal_stability".  If anything fails,
        a neutral score of 50 is returned with the error text in ``details``.
    """
    details = {}
    flags = []
    try:
        date_series = pd.Series(probas, index=pd.to_datetime(dates))
        by_day = date_series.groupby(date_series.index.date)
        daily_mean = by_day.mean()
        daily_counts = by_day.count()

        day_over_day_changes = daily_mean.diff().abs().dropna()
        # A sample standard deviation needs at least two changes, i.e. three
        # active days.  With fewer, pandas returns NaN, which previously
        # collapsed the variance component to 0 and leaked NaN into details.
        has_variance = len(day_over_day_changes) >= 2
        dod_variance = float(day_over_day_changes.std()) if has_variance else 0.0
        dod_mean = float(day_over_day_changes.mean()) if len(day_over_day_changes) > 0 else 0.0

        # Signal clustering: what fraction of days have > 10% of all signals?
        total = daily_counts.sum()
        clustering = float((daily_counts / total > 0.10).mean()) if total > 0 else 0.0

        details = {
            "dod_score_std": round(dod_variance, 4),
            "dod_score_mean": round(dod_mean, 4),
            "signal_clustering": round(clustering, 4),
            "n_active_days": len(daily_mean),
        }

        # Score: penalize high variance and extreme clustering
        if has_variance:
            variance_score = max(0, 100 - dod_variance * 500)
        else:
            # Not measurable: use the same neutral value as the error path.
            variance_score = 50.0
            flags.append("Fewer than 3 active days — day-over-day variance could not be measured")
        cluster_score = max(0, 100 - clustering * 200)
        score = 0.6 * variance_score + 0.4 * cluster_score

        if dod_variance > 0.05:
            flags.append(f"High day-over-day score variance ({dod_variance:.3f}) — signals may be unstable")
        if clustering > 0.3:
            flags.append("Signals cluster on few days — may be picking up macro noise")
    except Exception as e:
        score = 50.0
        details = {"error": str(e)}

    return DimensionResult(
        name="signal_stability", score=round(score, 2), weight=0.15,
        details=details, flags=flags
    )
def _score_calibration(probas: np.ndarray, labels: np.ndarray, n_bins: int = 10) -> tuple:
    """
    Expected Calibration Error and reliability diagram data.
    Returns (DimensionResult, reliability_bins_dict).

    Probabilities are grouped into ``n_bins`` equal-width bins over [0, 1].
    For each bin the mean predicted probability is compared with the
    observed positive rate; ECE is the sample-weighted mean absolute gap.

    Args:
        probas: Predicted probabilities.
        labels: Binary labels aligned with ``probas``.
        n_bins: Number of equal-width probability bins.

    Returns:
        ``(DimensionResult, reliability_bins)`` where ``reliability_bins``
        holds the parallel lists "bin_centers", "actual_freqs" and
        "bin_counts".
    """
    flags = []
    bin_edges = np.linspace(0, 1, n_bins + 1)
    bin_centers = []
    actual_freqs = []
    bin_counts = []
    for i in range(n_bins):
        lo, hi = bin_edges[i], bin_edges[i + 1]
        if i == n_bins - 1:
            # The last bin is closed on the right; with a half-open bin,
            # predictions of exactly 1.0 would fall into no bin at all.
            mask = (probas >= lo) & (probas <= hi)
        else:
            mask = (probas >= lo) & (probas < hi)
        in_bin = int(mask.sum())
        if in_bin == 0:
            # Empty bin: place it on the diagonal so it plots as perfectly
            # calibrated; its zero count contributes nothing to ECE.
            midpoint = float((lo + hi) / 2)
            bin_centers.append(midpoint)
            actual_freqs.append(midpoint)
            bin_counts.append(0)
            continue
        bin_centers.append(float(probas[mask].mean()))
        actual_freqs.append(float(labels[mask].mean()))
        bin_counts.append(in_bin)

    # ECE
    n = len(labels)
    weighted_gap = sum(
        abs(freq - center) * count
        for freq, center, count in zip(actual_freqs, bin_centers, bin_counts)
    )
    ece = float(weighted_gap) / max(n, 1)  # guard against empty input

    reliability_bins = {
        "bin_centers": bin_centers,
        "actual_freqs": actual_freqs,
        "bin_counts": bin_counts,
    }

    # Score: ECE 0 → 100, ECE 0.1 → 50, ECE 0.2+ → 0
    score = max(0, 100 - ece * 500)
    details = {
        "ece": round(ece, 4),
        "mean_predicted": round(float(probas.mean()), 4),
        "actual_positive_rate": round(float(labels.mean()), 4),
    }
    if ece > 0.08:
        flags.append(f"High ECE ({ece:.3f}) — probabilities are poorly calibrated")
    if abs(probas.mean() - labels.mean()) > 0.05:
        flags.append("Mean predicted probability significantly differs from actual positive rate")

    return DimensionResult(
        name="calibration", score=round(score, 2), weight=0.15,
        details=details, flags=flags
    ), reliability_bins
def _score_regime_robustness(
    probas: np.ndarray, labels: np.ndarray, dates: np.ndarray,
    sp500_data, vix_data, sma_period: int = 200, vix_threshold: float = 20.0
) -> tuple:
    """
    AUC in each of the 4 market regimes (bull/bear × VIX low/high).
    Penalizes high variance across regimes.

    Each sample is assigned a regime from the most recent market observation
    on or before its date: "bull" when the S&P 500 value is above its
    ``sma_period`` rolling mean, "high VIX" when VIX exceeds
    ``vix_threshold``.  Samples with no usable market data default to
    bull / low VIX.

    Args:
        probas: Predicted probabilities.
        labels: Binary labels aligned with ``probas``.
        dates: Date of each sample, aligned with ``probas``.
        sp500_data: Date-indexed S&P 500 series, or ``None``.
        vix_data: Date-indexed VIX series, or ``None``.
        sma_period: Window of the rolling mean used for bull/bear.
        vix_threshold: VIX level above which a sample counts as high-VIX.

    Returns:
        ``(DimensionResult, regime_scores)`` where ``regime_scores`` maps
        each regime name to its AUC, sample count and positive rate, or to a
        note explaining why no AUC was computed.
    """
    from sklearn.metrics import roc_auc_score

    flags = []
    regime_scores = {}
    aucs = []
    dates_dt = pd.to_datetime(dates)
    n_obs = len(dates_dt)

    # Determine regime for each sample.  Defaults: bull market, low VIX.
    mkt = np.ones(n_obs, dtype=int)
    vix_r = np.zeros(n_obs, dtype=int)

    if sp500_data is not None:
        try:
            # The rolling mean is computed once for all samples.  While the
            # window is still filling the mean is NaN, the comparison is
            # False and those dates count as bear.
            sma = sp500_data.rolling(sma_period).mean()
            above_sma = (sp500_data > sma).to_numpy()
            pos = sp500_data.index.get_indexer(dates_dt, method="ffill")
            found = pos >= 0  # -1 means the date precedes the series
            mkt[found] = above_sma[pos[found]].astype(int)
        except Exception as exc:
            # Best effort: keep the bull default but make the failure visible.
            logger.warning("S&P 500 regime lookup failed; defaulting to bull: %s", exc)

    if vix_data is not None:
        try:
            high_vix = (vix_data > vix_threshold).to_numpy()
            pos = vix_data.index.get_indexer(dates_dt, method="ffill")
            found = pos >= 0
            vix_r[found] = high_vix[pos[found]].astype(int)
        except Exception as exc:
            logger.warning("VIX regime lookup failed; defaulting to low VIX: %s", exc)

    # 0=bear/low, 1=bear/high, 2=bull/low, 3=bull/high
    regimes = mkt * 2 + vix_r

    regime_labels = {
        0: "Bear / Low VIX",
        1: "Bear / High VIX",
        2: "Bull / Low VIX",
        3: "Bull / High VIX",
    }
    for reg_id, reg_name in regime_labels.items():
        mask = regimes == reg_id
        n_in_regime = int(mask.sum())
        if n_in_regime < 30:
            regime_scores[reg_name] = {"auc": None, "n": n_in_regime, "note": "insufficient data"}
            continue
        if labels[mask].sum() < 5:
            regime_scores[reg_name] = {"auc": None, "n": n_in_regime, "note": "too few positives"}
            continue
        try:
            auc = float(roc_auc_score(labels[mask], probas[mask]))
            regime_scores[reg_name] = {
                "auc": round(auc, 4),
                "n": n_in_regime,
                "positive_rate": round(float(labels[mask].mean()), 4),
            }
            aucs.append(auc)
        except Exception:
            regime_scores[reg_name] = {"auc": None, "n": n_in_regime, "note": "error"}

    spread = max(aucs) - min(aucs) if len(aucs) >= 2 else None
    if spread is not None:
        mean_auc = np.mean(aucs)
        # Score: high mean AUC + low spread = good
        mean_score = max(0, (mean_auc - 0.5) / 0.5) * 100
        spread_penalty = min(50, spread * 200)
        score = max(0, mean_score - spread_penalty)
        if spread > 0.15:
            flags.append(f"High regime variance (spread={spread:.3f}) — model fragile across market conditions")
    elif len(aucs) == 1:
        score = max(0, (aucs[0] - 0.5) / 0.5) * 100
    else:
        score = 40.0
        flags.append("Insufficient data to evaluate regime robustness")

    return DimensionResult(
        name="regime_robustness", score=round(score, 2), weight=0.15,
        details={"regime_aucs": {k: v.get("auc") for k, v in regime_scores.items()},
                 "auc_spread": round(spread, 4) if spread is not None else None},
        flags=flags
    ), regime_scores
def _score_asymmetry(
    probas: np.ndarray, labels: np.ndarray,
    pt_multiplier: float, sl_multiplier: float,
) -> DimensionResult:
    """
    Measures how well top-decile signals capture asymmetric payoffs.
    Theoretical max payoff ratio = pt_multiplier / sl_multiplier.

    The highest-scored 10% of samples (at least 10, or all of them when
    fewer exist) are treated as trades that win ``pt_multiplier`` or lose
    ``sl_multiplier``.  The score averages a normalised expected value and
    how far the win rate sits above breakeven.

    Args:
        probas: Predicted probabilities.
        labels: Binary labels aligned with ``probas`` (1 = win).
        pt_multiplier: Payoff of a winning trade.
        sl_multiplier: Loss of a losing trade.

    Returns:
        A ``DimensionResult`` named "asymmetry".  With no selectable signals
        a fixed score of 30 is returned with an explanatory flag.
    """
    flags = []
    theoretical_ratio = pt_multiplier / max(sl_multiplier, 0.01)

    n_obs = len(probas)
    # Clamp to the sample size: with fewer than 10 samples an unclamped
    # top_k would index past the start of the sorted array.
    top_k = min(n_obs, max(10, int(n_obs * 0.10)))
    if top_k > 0:
        top_thresh = np.sort(probas)[-top_k]
        top_mask = probas >= top_thresh
        n_top = int(top_mask.sum())
    else:
        n_top = 0
    if n_top == 0:
        return DimensionResult(
            name="asymmetry", score=30.0, weight=0.15,
            details={"note": "no top-decile signals"},
            flags=["No signals above top-decile threshold"]
        )

    top_win_rate = float(labels[top_mask].mean())
    top_loss_rate = 1.0 - top_win_rate

    # Simulate payoff ratio using PT/SL multipliers
    simulated_avg_win = pt_multiplier
    simulated_avg_loss = sl_multiplier
    payoff_ratio = simulated_avg_win / max(simulated_avg_loss, 0.01)

    # Expected value per trade (in ATR units)
    ev = top_win_rate * simulated_avg_win - top_loss_rate * simulated_avg_loss
    # Normalized EV: the best possible EV is pt_multiplier (every trade wins)
    max_ev = pt_multiplier
    ev_score = max(0, min(100, ev / max(max_ev, 0.01) * 100))

    # Capture score: how far the top-decile win rate sits above the rate
    # needed for positive EV.  The denominator guard only matters when both
    # multipliers are zero.
    breakeven_wr = sl_multiplier / max(pt_multiplier + sl_multiplier, 1e-9)
    if top_win_rate > breakeven_wr:
        wr_score = min(100, (top_win_rate - breakeven_wr) / (1 - breakeven_wr) * 100)
    else:
        wr_score = 0.0
    score = 0.50 * ev_score + 0.50 * wr_score

    details = {
        "theoretical_payoff_ratio": round(theoretical_ratio, 2),
        "simulated_payoff_ratio": round(payoff_ratio, 2),
        "top_decile_win_rate": round(top_win_rate, 4),
        "breakeven_win_rate": round(breakeven_wr, 4),
        "expected_value_atr": round(ev, 4),
        "n_top_decile_signals": n_top,
    }
    if top_win_rate < breakeven_wr:
        flags.append(
            f"Top-decile win rate ({top_win_rate:.1%}) below breakeven ({breakeven_wr:.1%}) "
            f"for {pt_multiplier}×PT / {sl_multiplier}×SL"
        )
    if ev < 0:
        flags.append("Negative expected value in top decile — signals do not capture asymmetry")

    return DimensionResult(
        name="asymmetry", score=round(score, 2), weight=0.15,
        details=details, flags=flags
    )