"""
analytics/execution_layer.py

Tier 5A — Execution Layer (Alpha Release)

Post-model enrichment pass operating exclusively on already-computed outputs
(model probs + book odds). No simulation logic, no probability calculations,
no model changes.

Entry point: enrich_with_execution_layer(df) → df with execution fields added.
"""

from __future__ import annotations

import statistics
from typing import Any

import pandas as pd

from analytics.no_vig_props import american_to_implied_prob

# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------

OUTLIER_THRESHOLD = 0.03     # 3pp deviation from median → outlier
STALE_THRESHOLD = 0.025      # 2.5pp worse than median → stale book
AGGRESSIVE_THRESHOLD = 0.02  # 2pp better than median → aggressive/timing flag

_TIMESTAMP_KEYS = ("last_update", "timestamp", "odds_timestamp", "updated_at")

# Spellings that str() produces for missing cells (None, NaN, NaT, pd.NA).
_MISSING_STRINGS = frozenset({"", "nan", "None", "NaT", "<NA>"})

# String spellings that must read as False instead of "non-empty → True".
_FALSE_STRINGS = frozenset({"false", "0", "no"})

# model_hr_prob_source → (confidence without context, confidence with context).
_CONFIDENCE_BY_SOURCE = {
    "internal_model_baseline": (0.7, 1.0),
    "shared_pregame_engine": (0.80, 0.95),
}
_UNKNOWN_SOURCE_CONFIDENCE = 0.3

# model_hr_prob_source → (base signal strength, bonus when context applied).
_SIGNAL_BY_SOURCE = {
    "internal_model_baseline": (0.7, 0.3),
    "shared_pregame_engine": (0.85, 0.15),
}
_UNKNOWN_SOURCE_SIGNAL = 0.1


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _is_missing(val: Any) -> bool:
    """Return True when *val* is None or a pandas/NumPy missing scalar."""
    if val is None:
        return True
    try:
        return bool(pd.isna(val))
    except (TypeError, ValueError):
        # pd.isna() on a container yields an array with no single truth
        # value; a container is not a missing scalar.
        return False


def _safe_float(val: Any, default: float | None = None) -> float | None:
    """Convert *val* to float, returning *default* when it is missing or invalid.

    NaN counts as missing. pandas stores None as NaN in float columns, and a
    NaN that slips through is silently turned into a real number by
    ``min``/``max`` clamps further down (``min(0.30, nan) == 0.30``).
    """
    if _is_missing(val):
        return default
    try:
        number = float(val)
    except (TypeError, ValueError):
        return default
    # Strings such as "nan" parse successfully but still carry no value.
    return default if number != number else number


def _safe_bool(val: Any) -> bool:
    """Interpret a cell as a flag; missing values and false-like strings are False."""
    if _is_missing(val):
        # bool(float("nan")) is True, so missing must be checked explicitly.
        return False
    if isinstance(val, str):
        text = val.strip()
        return text not in _MISSING_STRINGS and text.lower() not in _FALSE_STRINGS
    return bool(val)


def _clean_str(val: Any) -> str:
    """Return *val* as a stripped string, or "" when it is missing."""
    if _is_missing(val):
        return ""
    text = str(val).strip()
    return "" if text in _MISSING_STRINGS else text


def _safe_implied(odds: Any) -> float | None:
    """Return the implied probability for American *odds*, or None if unavailable."""
    if _is_missing(odds):
        return None
    try:
        implied = american_to_implied_prob(odds)
    except Exception:
        # Best-effort by design: any failure of the converter means
        # "no usable quote" for this row rather than a failed enrichment run.
        return None
    return _safe_float(implied)


def _make_player_game_key(row: pd.Series) -> str:
    """Build the key that scopes quotes to one player / game / market / threshold.

    Preference order: an explicit ``player_event_market_key``; otherwise
    ``event_id`` plus player, market and threshold; otherwise the two team
    names stand in for the event id.
    """
    explicit_key = _clean_str(row.get("player_event_market_key"))
    if explicit_key:
        return explicit_key

    player_name = _clean_str(row.get("player_name"))
    # Fall back to "market" when "market_family" is absent *or* missing (NaN).
    market_family = _clean_str(row.get("market_family")) or _clean_str(row.get("market"))
    threshold = _clean_str(row.get("threshold"))

    event_id = _clean_str(row.get("event_id"))
    if event_id:
        return f"{event_id}|{player_name}|{market_family}|{threshold}"

    away = _clean_str(row.get("away_team"))
    home = _clean_str(row.get("home_team"))
    return f"{away}|{home}|{player_name}|{market_family}|{threshold}"


def _make_game_key(row: pd.Series) -> str:
    """Build the per-game key: ``event_id`` when present, else ``away_home``."""
    event_id = _clean_str(row.get("event_id"))
    if event_id:
        return event_id
    away = _clean_str(row.get("away_team"))
    home = _clean_str(row.get("home_team"))
    return f"{away}_{home}"


# ---------------------------------------------------------------------------
# Task 1 — Market Disagreement
# ---------------------------------------------------------------------------

def _compute_market_fields(df: pd.DataFrame) -> pd.DataFrame:
    """Add best_price, median_price, market_width, market_outlier_flag, stale_book_flag.

    Prices are implied probabilities derived from ``odds_american`` and are
    summarised per player-game key. Rows whose odds cannot be converted are
    left out of the summary and are never flagged. Returns a copy; *df* is
    not modified.

    Raises:
        KeyError: if *df* has no ``odds_american`` column.
    """
    out = df.copy()

    keys = [_make_player_game_key(row) for _, row in out.iterrows()]
    implied_probs = [_safe_implied(odds) for odds in out["odds_american"].tolist()]

    # Collect the usable quotes of every scoped player-game key.
    quotes_by_key: dict[str, list[float]] = {}
    for key, implied in zip(keys, implied_probs):
        quotes = quotes_by_key.setdefault(key, [])
        if implied is not None:
            quotes.append(implied)

    # (best, median, width) per key; None when the key has no usable quote.
    summary_by_key: dict[str, tuple[float, float, float] | None] = {}
    for key, quotes in quotes_by_key.items():
        if not quotes:
            summary_by_key[key] = None
            continue
        best = min(quotes)  # lowest implied = best for bettor
        worst = max(quotes)
        summary_by_key[key] = (best, statistics.median(quotes), abs(worst - best))

    best_prices: list[float | None] = []
    median_prices: list[float | None] = []
    market_widths: list[float | None] = []
    outlier_flags: list[bool] = []
    stale_flags: list[bool] = []

    for key, implied in zip(keys, implied_probs):
        summary = summary_by_key[key]
        best, median, width = summary if summary is not None else (None, None, None)
        best_prices.append(best)
        median_prices.append(median)
        market_widths.append(width)

        if implied is None or median is None:
            outlier_flags.append(False)
            stale_flags.append(False)
            continue
        deviation = implied - median
        outlier_flags.append(abs(deviation) > OUTLIER_THRESHOLD)
        # Positive deviation = higher implied probability = worse price.
        stale_flags.append(deviation > STALE_THRESHOLD)

    out["best_price"] = best_prices
    out["median_price"] = median_prices
    out["market_width"] = market_widths
    out["market_outlier_flag"] = outlier_flags
    out["stale_book_flag"] = stale_flags
    return out


# ---------------------------------------------------------------------------
# Task 2 — Edge Quality Filters
# ---------------------------------------------------------------------------

def _apply_edge_filters(
    edge_raw: float | None, conf: float, vol: float, sig: float
) -> tuple[float | None, str]:
    """Return (edge_filtered, edge_filter_flags) for one row.

    Filters run in a fixed order: confidence scaling, volatility subtraction,
    weak-signal halving. The flags string lists the filters that fired,
    comma-separated, or "clean" when none did (also when there is no edge).
    """
    if edge_raw is None:
        return None, "clean"

    edge = edge_raw
    applied: list[str] = []

    # Confidence penalty: scale linearly towards zero below 0.5 confidence.
    if conf < 0.5:
        edge = edge * (conf / 0.5)
        applied.append("conf_penalty")

    # Volatility penalty: subtract up to 2pp.
    vol_pen = min(0.02, vol * 0.02)
    if vol_pen > 0:
        edge = edge - vol_pen
        applied.append("vol_penalty")

    # Weak signal suppression
    if sig < 0.3:
        edge = edge * 0.5
        applied.append("weak_signal")

    return edge, (",".join(applied) if applied else "clean")


def _compute_edge_quality(df: pd.DataFrame) -> pd.DataFrame:
    """Add execution_confidence_score, execution_volatility_score,
    execution_signal_strength_score, edge_raw, edge_filtered, edge_filter_flags.

    Missing inputs (absent column, None or NaN) read as: unknown source,
    context not applied, zero market width, zero context adjustment, and no
    edge (``edge_raw``/``edge_filtered`` stay empty). Returns a copy.
    """
    out = df.copy()

    conf_scores: list[float] = []
    vol_scores: list[float] = []
    sig_scores: list[float] = []
    edge_raws: list[float | None] = []
    edge_filtered_vals: list[float | None] = []
    edge_flag_strs: list[str] = []

    for _, row in out.iterrows():
        source = _clean_str(row.get("model_hr_prob_source")) or "unavailable"
        context_applied = _safe_bool(row.get("pregame_context_applied"))
        edge_raw = _safe_float(row.get("edge"))
        market_width = _safe_float(row.get("market_width"), default=0.0) or 0.0

        # Context adj magnitude
        pitcher_adj = _safe_float(row.get("pregame_pitcher_context_adj"), default=0.0) or 0.0
        park_adj = _safe_float(row.get("pregame_park_context_adj"), default=0.0) or 0.0
        context_mag = abs(pitcher_adj) + abs(park_adj)

        # Confidence score
        conf_pair = _CONFIDENCE_BY_SOURCE.get(source)
        if conf_pair is None:
            conf = _UNKNOWN_SOURCE_CONFIDENCE
        else:
            conf = conf_pair[1] if context_applied else conf_pair[0]

        # Volatility score (weighted blend, range [0, 1])
        width_component = min(1.0, market_width / 0.10)
        ctx_component = min(1.0, context_mag / 0.02) if context_mag > 0 else 0.0
        vol = 0.7 * width_component + 0.3 * ctx_component

        # Signal strength score
        sig_pair = _SIGNAL_BY_SOURCE.get(source)
        if sig_pair is None:
            sig = _UNKNOWN_SOURCE_SIGNAL
        else:
            sig = sig_pair[0] + (sig_pair[1] if context_applied else 0.0)
        sig = min(1.0, sig)

        # Edge filtered + flags
        edge_filt, flags = _apply_edge_filters(edge_raw, conf, vol, sig)

        conf_scores.append(conf)
        vol_scores.append(vol)
        sig_scores.append(sig)
        edge_raws.append(edge_raw)
        edge_filtered_vals.append(edge_filt)
        edge_flag_strs.append(flags)

    out["execution_confidence_score"] = conf_scores
    out["execution_volatility_score"] = vol_scores
    out["execution_signal_strength_score"] = sig_scores
    out["edge_raw"] = edge_raws
    out["edge_filtered"] = edge_filtered_vals
    out["edge_filter_flags"] = edge_flag_strs
    return out


# ---------------------------------------------------------------------------
# Task 3 — Timing Heuristics
# ---------------------------------------------------------------------------

def _compute_timing_fields(df: pd.DataFrame) -> pd.DataFrame:
    """Add timing_flag, timing_reason.

    ``timing_reason`` is a comma-separated list drawn from "aggressive_price"
    and "has_timestamp", or "none" when neither applies; ``timing_flag`` is
    True whenever at least one reason applies. Returns a copy.
    """
    out = df.copy()

    timing_flags: list[bool] = []
    timing_reasons: list[str] = []

    for _, row in out.iterrows():
        reasons: list[str] = []

        # Aggressive price: this book > 2pp better than median (lower implied)
        this_implied = _safe_implied(row.get("odds_american"))
        median_price = _safe_float(row.get("median_price"))
        if (
            this_implied is not None
            and median_price is not None
            and (median_price - this_implied) > AGGRESSIVE_THRESHOLD
        ):
            reasons.append("aggressive_price")

        # Timestamp presence: any timestamp column holding a non-missing value.
        if any(_clean_str(row.get(ts_key)) for ts_key in _TIMESTAMP_KEYS):
            reasons.append("has_timestamp")

        timing_flags.append(bool(reasons))
        timing_reasons.append(",".join(reasons) if reasons else "none")

    out["timing_flag"] = timing_flags
    out["timing_reason"] = timing_reasons
    return out


# ---------------------------------------------------------------------------
# Task 4 — Correlation Awareness
# ---------------------------------------------------------------------------

def _compute_correlation_fields(df: pd.DataFrame) -> pd.DataFrame:
    """Add correlation_flag, correlation_direction.

    ``correlation_direction`` is "positive_stacked" when the row's game has
    more than two distinct (non-missing) player names, else "positive".
    Returns a copy.

    Raises:
        KeyError: if *df* has no ``player_name`` column.
    """
    out = df.copy()

    # Collect distinct players per game
    game_keys = [_make_game_key(row) for _, row in out.iterrows()]
    players_by_game: dict[str, set] = {}
    for key, player in zip(game_keys, out["player_name"].tolist()):
        roster = players_by_game.setdefault(key, set())
        if not _is_missing(player):
            roster.add(player)

    out["correlation_flag"] = True  # always True for HR props
    out["correlation_direction"] = [
        "positive_stacked" if len(players_by_game[key]) > 2 else "positive"
        for key in game_keys
    ]
    return out


# ---------------------------------------------------------------------------
# Task 5 — Final Execution Score
# ---------------------------------------------------------------------------

def _compute_execution_score(df: pd.DataFrame) -> pd.DataFrame:
    """Add final_recommendation_score.

    The score is the filtered edge weighted by confidence, minus a volatility
    penalty, plus bonuses for a tight market and a set timing flag, clamped
    to [-0.30, 0.30]. Rows without a usable ``edge_filtered`` get no score.
    Returns a copy.
    """
    out = df.copy()
    scores: list[float | None] = []

    for _, row in out.iterrows():
        # NaN is rejected here on purpose: the clamp below would otherwise
        # turn a NaN edge into the maximum score.
        edge_filtered = _safe_float(row.get("edge_filtered"))
        if edge_filtered is None:
            scores.append(None)
            continue

        confidence_score = _safe_float(row.get("execution_confidence_score"), default=0.3) or 0.0
        volatility_score = _safe_float(row.get("execution_volatility_score"), default=0.0) or 0.0
        market_width = _safe_float(row.get("market_width"), default=0.0) or 0.0
        timing_flag = _safe_bool(row.get("timing_flag"))

        base = edge_filtered * (0.4 + confidence_score * 0.6)
        vol_penalty = min(0.015, volatility_score * 0.015)
        market_bonus = min(0.01, max(0.0, 0.01 - market_width * 0.5))
        timing_bonus = 0.005 if timing_flag else 0.0

        score = base - vol_penalty + market_bonus + timing_bonus
        scores.append(max(-0.30, min(0.30, score)))

    out["final_recommendation_score"] = scores
    return out


# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------

def enrich_with_execution_layer(df: pd.DataFrame) -> pd.DataFrame:
    """
    Run all five execution-layer passes on the mapped props DataFrame.

    Passes (in order):
      1. Market Disagreement — best_price, median_price, market_width, flags
      2. Edge Quality        — execution confidence, volatility, signal, edge_filtered
      3. Timing Heuristics   — timing_flag, timing_reason
      4. Correlation         — correlation_flag, correlation_direction
      5. Execution Score     — final_recommendation_score

    Returns the enriched DataFrame. Does not modify simulation logic or model
    probabilities. An empty DataFrame is returned as-is, without the
    execution columns.
    """
    if df.empty:
        return df

    # Order matters: each pass reads columns written by the earlier ones.
    for enrichment_pass in (
        _compute_market_fields,
        _compute_edge_quality,
        _compute_timing_fields,
        _compute_correlation_fields,
        _compute_execution_score,
    ):
        df = enrichment_pass(df)
    return df