# 2026_MLB_Model/analytics/execution_layer.py
# Provenance: commit 0010624 by Syntrex — "Upgrade props HR calibration and
# Fangraphs fallback" (header reconstructed from file-browser page residue).
"""
analytics/execution_layer.py
Tier 5A — Execution Layer (Alpha Release)
Post-model enrichment pass operating exclusively on already-computed outputs
(model probs + book odds). No simulation logic, no probability calculations,
no model changes.
Entry point: enrich_with_execution_layer(df) → df with execution fields added.
"""
from __future__ import annotations
import statistics
from typing import Any
import pandas as pd
from analytics.no_vig_props import american_to_implied_prob
# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------
OUTLIER_THRESHOLD = 0.03 # 3pp deviation from median → outlier
STALE_THRESHOLD = 0.025 # 2.5pp worse than median → stale book
AGGRESSIVE_THRESHOLD = 0.02 # 2pp better than median → aggressive/timing flag
_TIMESTAMP_KEYS = ("last_update", "timestamp", "odds_timestamp", "updated_at")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _safe_float(val: Any, default: float | None = None) -> float | None:
if val is None:
return default
try:
return float(val)
except (TypeError, ValueError):
return default
def _safe_implied(odds: Any) -> float | None:
    """Best-effort American-odds → implied-probability conversion (None on any failure)."""
    if odds is None:
        return None
    try:
        implied = american_to_implied_prob(odds)
    except Exception:
        # Conversion helper may raise on malformed odds; treat as "no price".
        implied = None
    return implied
def _make_player_game_key(row: pd.Series) -> str:
explicit_key = str(row.get("player_event_market_key") or "").strip()
if explicit_key and explicit_key not in ("nan", "None", ""):
return explicit_key
event_id = str(row.get("event_id") or "").strip()
player_name = str(row.get("player_name") or "").strip()
market_family = str(row.get("market_family") or row.get("market") or "").strip()
threshold = str(row.get("threshold") or "").strip()
if event_id and event_id not in ("nan", "None", ""):
return f"{event_id}|{player_name}|{market_family}|{threshold}"
away = str(row.get("away_team") or "").strip()
home = str(row.get("home_team") or "").strip()
return f"{away}|{home}|{player_name}|{market_family}|{threshold}"
def _make_game_key(row: pd.Series) -> str:
event_id = str(row.get("event_id") or "").strip()
if event_id and event_id not in ("nan", "None", ""):
return event_id
away = str(row.get("away_team") or "").strip()
home = str(row.get("home_team") or "").strip()
return f"{away}_{home}"
# ---------------------------------------------------------------------------
# Task 1 — Market Disagreement
# ---------------------------------------------------------------------------
def _compute_market_fields(df: pd.DataFrame) -> pd.DataFrame:
    """Attach cross-book pricing stats per scoped player-game key.

    Adds best_price, median_price, market_width, market_outlier_flag and
    stale_book_flag. Implied probabilities come from ``odds_american``;
    the "best" price for a bettor is the lowest implied probability.
    """
    out = df.copy()
    out["_pg_key"] = out.apply(_make_player_game_key, axis=1)
    out["_implied"] = out["odds_american"].apply(_safe_implied)

    # Summarise the implied-probability distribution across books per key.
    stats_by_key: dict[str, dict] = {}
    for pg_key, group in out.groupby("_pg_key"):
        implied_vals = [p for p in group["_implied"] if p is not None]
        if implied_vals:
            lo, hi = min(implied_vals), max(implied_vals)
            stats_by_key[pg_key] = {
                "best": lo,
                "worst": hi,
                "median": statistics.median(implied_vals),
                "width": abs(hi - lo),
            }
        else:
            stats_by_key[pg_key] = {
                "best": None, "worst": None, "median": None, "width": None,
            }

    best_col: list[float | None] = []
    median_col: list[float | None] = []
    width_col: list[float | None] = []
    outlier_col: list[bool] = []
    stale_col: list[bool] = []
    for _, row in out.iterrows():
        summary = stats_by_key.get(row["_pg_key"], {})
        med = summary.get("median")
        implied = row["_implied"]
        best_col.append(summary.get("best"))
        median_col.append(med)
        width_col.append(summary.get("width"))
        if implied is None or med is None:
            outlier_col.append(False)
            stale_col.append(False)
        else:
            # Outlier: far from median either way; stale: notably worse (higher).
            outlier_col.append(abs(implied - med) > OUTLIER_THRESHOLD)
            stale_col.append((implied - med) > STALE_THRESHOLD)

    out["best_price"] = best_col
    out["median_price"] = median_col
    out["market_width"] = width_col
    out["market_outlier_flag"] = outlier_col
    out["stale_book_flag"] = stale_col
    return out.drop(columns=["_pg_key", "_implied"])
# ---------------------------------------------------------------------------
# Task 2 — Edge Quality Filters
# ---------------------------------------------------------------------------
def _compute_edge_quality(df: pd.DataFrame) -> pd.DataFrame:
    """Score execution quality per row and derive a penalty-filtered edge.

    Adds execution_confidence_score, execution_volatility_score,
    execution_signal_strength_score, edge_raw, edge_filtered and
    edge_filter_flags (comma-joined penalty codes, or "clean").
    Scores depend on the model probability source and whether pregame
    context adjustments were applied.
    """
    out = df.copy()
    # (with-context, without-context) confidence per known source.
    confidence_by_source = {
        "internal_model_baseline": (1.0, 0.7),
        "shared_pregame_engine": (0.95, 0.80),
    }
    # (base signal, context bonus) per known source.
    signal_by_source = {
        "internal_model_baseline": (0.7, 0.3),
        "shared_pregame_engine": (0.85, 0.15),
    }
    records: list[tuple] = []
    for _, row in out.iterrows():
        source = str(row.get("model_hr_prob_source") or "unavailable")
        has_context = bool(row.get("pregame_context_applied") or False)
        raw_edge = _safe_float(row.get("edge"))
        width = _safe_float(row.get("market_width"), default=0.0) or 0.0

        # Combined magnitude of the pregame context adjustments.
        pitcher = _safe_float(row.get("pregame_pitcher_context_adj"), default=0.0) or 0.0
        park = _safe_float(row.get("pregame_park_context_adj"), default=0.0) or 0.0
        adj_magnitude = abs(pitcher) + abs(park)

        conf_pair = confidence_by_source.get(source)
        confidence = (conf_pair[0] if has_context else conf_pair[1]) if conf_pair else 0.3

        # Volatility blends market width (70%) with context-adjustment size (30%).
        width_part = min(1.0, width / 0.10)
        ctx_part = min(1.0, adj_magnitude / 0.02) if adj_magnitude > 0 else 0.0
        volatility = 0.7 * width_part + 0.3 * ctx_part

        sig_pair = signal_by_source.get(source)
        if sig_pair is None:
            signal = 0.1
        else:
            signal = min(1.0, sig_pair[0] + (sig_pair[1] if has_context else 0.0))

        if raw_edge is None:
            filtered = None
            flag_str = "clean"
        else:
            filtered = raw_edge
            penalties: list[str] = []
            if confidence < 0.5:  # low-confidence sources get proportionally shrunk
                filtered *= confidence / 0.5
                penalties.append("conf_penalty")
            vol_penalty = min(0.02, volatility * 0.02)
            if vol_penalty > 0:
                filtered -= vol_penalty
                penalties.append("vol_penalty")
            if signal < 0.3:  # weak signals are halved outright
                filtered *= 0.5
                penalties.append("weak_signal")
            flag_str = ",".join(penalties) if penalties else "clean"

        records.append((confidence, volatility, signal, raw_edge, filtered, flag_str))

    if records:
        conf_c, vol_c, sig_c, raw_c, filt_c, flag_c = (list(c) for c in zip(*records))
    else:
        conf_c = vol_c = sig_c = raw_c = filt_c = flag_c = []
    out["execution_confidence_score"] = conf_c
    out["execution_volatility_score"] = vol_c
    out["execution_signal_strength_score"] = sig_c
    out["edge_raw"] = raw_c
    out["edge_filtered"] = filt_c
    out["edge_filter_flags"] = flag_c
    return out
# ---------------------------------------------------------------------------
# Task 3 — Timing Heuristics
# ---------------------------------------------------------------------------
def _compute_timing_fields(df: pd.DataFrame) -> pd.DataFrame:
    """Flag rows with timing-relevant signals.

    Adds timing_flag (True when any reason fired) and timing_reason
    (comma-joined reason codes, or "none").
    """
    out = df.copy()
    flag_col: list[bool] = []
    reason_col: list[str] = []
    for _, row in out.iterrows():
        hits: list[str] = []
        implied = _safe_implied(row.get("odds_american"))
        median_implied = _safe_float(row.get("median_price"))
        # A book pricing more than 2pp below the group median (lower implied
        # probability) reads as an aggressive line.
        if implied is not None and median_implied is not None:
            if (median_implied - implied) > AGGRESSIVE_THRESHOLD:
                hits.append("aggressive_price")
        # Any populated timestamp column counts as a timing signal.
        for ts_key in _TIMESTAMP_KEYS:
            value = row.get(ts_key)
            if value is not None and str(value).strip() not in ("", "nan", "None"):
                hits.append("has_timestamp")
                break
        if hits:
            flag_col.append(True)
            reason_col.append(",".join(hits))
        else:
            flag_col.append(False)
            reason_col.append("none")
    out["timing_flag"] = flag_col
    out["timing_reason"] = reason_col
    return out
# ---------------------------------------------------------------------------
# Task 4 — Correlation Awareness
# ---------------------------------------------------------------------------
def _compute_correlation_fields(df: pd.DataFrame) -> pd.DataFrame:
    """Tag rows with same-game correlation metadata.

    correlation_flag is always True (HR props in one game are positively
    correlated); correlation_direction escalates to "positive_stacked" when
    more than two distinct players are priced in the same game.
    """
    out = df.copy()
    out["_game_key"] = out.apply(_make_game_key, axis=1)
    players_per_game = {
        key: grp["player_name"].nunique() for key, grp in out.groupby("_game_key")
    }
    out["correlation_flag"] = True  # always True for HR props
    out["correlation_direction"] = [
        "positive_stacked" if players_per_game.get(key, 1) > 2 else "positive"
        for key in out["_game_key"]
    ]
    return out.drop(columns=["_game_key"])
# ---------------------------------------------------------------------------
# Task 5 — Final Execution Score
# ---------------------------------------------------------------------------
def _compute_execution_score(df: pd.DataFrame) -> pd.DataFrame:
    """Blend filtered edge with execution metrics into final_recommendation_score.

    Score = edge_filtered scaled by confidence, minus a volatility penalty,
    plus small bonuses for tight markets and timing signals; clamped to
    [-0.30, 0.30]. Rows without a filtered edge get None.
    """
    out = df.copy()
    final_scores: list[float | None] = []
    for _, row in out.iterrows():
        edge = _safe_float(row.get("edge_filtered"))
        if edge is None:
            final_scores.append(None)
            continue
        confidence = _safe_float(row.get("execution_confidence_score"), default=0.3) or 0.0
        volatility = _safe_float(row.get("execution_volatility_score"), default=0.0) or 0.0
        width = _safe_float(row.get("market_width"), default=0.0) or 0.0
        has_timing = bool(row.get("timing_flag") or False)

        score = edge * (0.4 + confidence * 0.6)           # confidence-weighted base
        score -= min(0.015, volatility * 0.015)           # volatility penalty, capped
        score += min(0.01, max(0.0, 0.01 - width * 0.5))  # tight-market bonus
        if has_timing:
            score += 0.005                                # timing-signal bonus
        final_scores.append(max(-0.30, min(0.30, score)))
    out["final_recommendation_score"] = final_scores
    return out
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def enrich_with_execution_layer(df: pd.DataFrame) -> pd.DataFrame:
    """
    Run all five execution-layer passes on the mapped props DataFrame.
    Passes (in order):
    1. Market Disagreement — best_price, median_price, market_width, flags
    2. Edge Quality — execution confidence, volatility, signal, edge_filtered
    3. Timing Heuristics — timing_flag, timing_reason
    4. Correlation — correlation_flag, correlation_direction
    5. Execution Score — final_recommendation_score
    Returns the enriched DataFrame. Does not modify simulation logic or
    model probabilities.
    """
    if df.empty:
        return df
    # Each pass copies its input and returns an enriched frame; order matters
    # because later passes read columns produced by earlier ones.
    for enrich_pass in (
        _compute_market_fields,
        _compute_edge_quality,
        _compute_timing_fields,
        _compute_correlation_fields,
        _compute_execution_score,
    ):
        df = enrich_pass(df)
    return df