2026_MLB_Model / models /batter_baseline.py
Syntrex's picture
Accuracy overhaul: pitcher resolution logging, baseline recalibration, vig fix, XGBoost blend
21151ce
raw
history blame
12.5 kB
from __future__ import annotations
from typing import Any
import numpy as np
import pandas as pd
from config.settings import BASELINE_HIT_PROB, BASELINE_HR_PROB
def _safe_mean(series: pd.Series) -> float | None:
    """Average the numeric entries of ``series``.

    Entries that cannot be parsed as numbers are coerced to NaN and ignored.

    Returns:
        The arithmetic mean as a plain ``float``, or ``None`` when no numeric
        entry remains after coercion.
    """
    usable = pd.to_numeric(series, errors="coerce").dropna()
    return None if usable.empty else float(usable.mean())
def _safe_rate(series: pd.Series) -> float | None:
    """Return the mean of an indicator-like column, read as a rate.

    Values are coerced to numbers first; anything unparseable or missing is
    excluded from both the numerator and the denominator.

    Returns:
        The mean of the remaining values as a ``float``, or ``None`` if
        nothing numeric is left.
    """
    coerced = pd.to_numeric(series, errors="coerce")
    observed = coerced.dropna()
    if observed.empty:
        return None
    rate = observed.mean()
    return float(rate)
def _percentile(series: pd.Series, q: float) -> float | None:
    """Return the ``q`` quantile of the numeric entries in ``series``.

    Args:
        series: Values to summarise; non-numeric entries are coerced to NaN
            and dropped.
        q: Quantile passed straight to ``Series.quantile``.

    Returns:
        The quantile as a ``float``, or ``None`` when no numeric value exists.
    """
    cleaned = pd.to_numeric(series, errors="coerce").dropna()
    if not cleaned.empty:
        return float(cleaned.quantile(q))
    return None
def _classify_batted_ball_direction_from_spray_angle(
    spray_angle: Any,
    stand: str,
) -> str:
    """
    Label one batted ball as pulled, up the middle, or opposite field.

    Conventions used by this first-pass classifier:
    - a negative spray angle is the left-field side, positive the right-field side
    - angles within [-15, 15] degrees (inclusive) count as "center"
    - a right-handed batter ("R") pulls to the left-field side
    - a left-handed batter ("L") pulls to the right-field side

    Args:
        spray_angle: Anything ``float()`` accepts; values it rejects give "unknown".
        stand: Batter side; compared after stripping whitespace and upper-casing.

    Returns one of:
    - "pull"
    - "center"
    - "oppo"
    - "unknown"
    """
    try:
        degrees = float(spray_angle)
    except Exception:
        return "unknown"

    side = str(stand or "").strip().upper()
    if side not in ("R", "L"):
        return "unknown"

    if -15 <= degrees <= 15:
        return "center"

    if degrees < -15:
        toward_left_field = True
    elif degrees > 15:
        toward_left_field = False
    else:
        # Only NaN fails all three comparisons above.
        return "unknown"

    pulls_to_left_field = side == "R"
    return "pull" if toward_left_field == pulls_to_left_field else "oppo"
def _build_direction_series(df: pd.DataFrame) -> pd.Series:
    """
    Build a per-row batted-ball direction label aligned to ``df.index``.

    Priority:
    1. use the existing ``hit_direction`` column if present (trimmed,
       lower-cased, common aliases folded into "pull" / "oppo" / "center")
    2. infer from ``spray_angle`` + ``stand`` if both columns are present
    3. return "unknown" for every row

    Missing ``hit_direction`` values (None / NaN) are reported as "unknown"
    rather than as the stringified placeholder ("nan", "none") that a plain
    ``astype(str)`` would produce. Labels that are present but not a known
    alias are passed through unchanged.
    """
    if "hit_direction" in df.columns:
        raw = df["hit_direction"]
        aliases = {
            "pulled": "pull",
            "pull-side": "pull",
            "opposite": "oppo",
            "opposite field": "oppo",
            "center field": "center",
            "middle": "center",
        }
        labels = raw.astype(str).str.strip().str.lower().replace(aliases)
        # Overwrite rows that had no value to begin with.
        return labels.mask(raw.isna(), "unknown")

    if "spray_angle" in df.columns and "stand" in df.columns:
        # Walk the two columns in lockstep instead of a row-wise DataFrame.apply.
        inferred = [
            _classify_batted_ball_direction_from_spray_angle(angle, stand)
            for angle, stand in zip(df["spray_angle"], df["stand"])
        ]
        return pd.Series(inferred, index=df.index, dtype=object)

    return pd.Series(["unknown"] * len(df), index=df.index)
def _build_barrel_mask(
    launch_speed: pd.Series,
    launch_angle: pd.Series,
) -> pd.Series:
    """
    First-pass barrel approximation.

    A row is flagged when its exit velocity clears a band's minimum speed and
    its launch angle falls inside that band's inclusive angle window; the
    window widens as exit velocity rises.

    Args:
        launch_speed: Exit velocities; non-numeric entries are coerced to NaN.
        launch_angle: Launch angles; non-numeric entries are coerced to NaN.

    Returns:
        Boolean Series indexed like ``launch_speed``. Rows missing either
        measurement are False.
    """
    speed = pd.to_numeric(launch_speed, errors="coerce")
    angle = pd.to_numeric(launch_angle, errors="coerce")
    if not angle.index.equals(speed.index):
        # Keep label alignment to launch_speed when the inputs are indexed differently.
        angle = angle.reindex(speed.index)

    # (minimum exit velocity, lowest launch angle, highest launch angle)
    bands = (
        (98, 26, 30),
        (99, 25, 31),
        (100, 23, 33),
        (102, 20, 35),
    )

    # Comparisons against NaN are already False, so no dropna()/.loc round trip
    # is needed; that also keeps this working when index labels repeat.
    mask = pd.Series(False, index=speed.index)
    for min_speed, low_angle, high_angle in bands:
        mask = mask | (speed.ge(min_speed) & angle.between(low_angle, high_angle))
    return mask.fillna(False).astype(bool)
def _empty_feature_row(player_name: str) -> dict[str, Any]:
    """Return the feature row used when no rows exist for ``player_name``."""
    return {
        "player_name": player_name,
        "ev90": None,
        "avg_launch_angle": None,
        "barrel_rate": None,
        "hard_hit_rate": None,
        "xwoba": None,
        "pull_rate": None,
        "air_ball_rate": None,
        "pull_air_rate": None,
        "pulled_hard_air_rate": None,
        "pulled_barrel_rate": None,
        "plate_appearances": 0,
        "batter_stand": "R",
        "gb_rate": 0.0,
        "ld_rate": 0.0,
        "fb_rate": 0.0,
        "popup_rate": 0.0,
        "la_sweet_spot_rate": 0.0,
        "la_optimal_hr_rate": 0.0,
    }


def _numeric_column(df: pd.DataFrame, name: str) -> pd.Series:
    """Return ``df[name]`` coerced to numbers, or an all-NaN Series if the column is absent."""
    if name not in df.columns:
        return pd.Series(np.nan, index=df.index, dtype=float)
    return pd.to_numeric(df[name], errors="coerce")


def _spray_angle_from_hit_coords(hc_x: pd.Series, hc_y: pd.Series) -> pd.Series:
    """Convert hit coordinates to degrees off the center-field line (negative = left-field side)."""
    # Home-plate location in the hit-coordinate system. These are the commonly
    # used Statcast values -- NOTE(review): confirm against the data source.
    home_x, home_y = 125.42, 198.27
    x = pd.to_numeric(hc_x, errors="coerce")
    y = pd.to_numeric(hc_y, errors="coerce")
    # hc_y grows toward home plate, so distance "up the field" is home_y - y.
    sideways = (x - home_x).to_numpy(dtype=float)
    up_field = (home_y - y).to_numpy(dtype=float)
    return pd.Series(np.degrees(np.arctan2(sideways, up_field)), index=x.index)


def _resolve_stand(df: pd.DataFrame) -> tuple[str, pd.Series]:
    """Return the batter's most common side and a per-row side (rows without L/R get the most common)."""
    column = "batter_stand" if "batter_stand" in df.columns else "stand"
    if column not in df.columns:
        return "R", pd.Series("R", index=df.index)
    raw = df[column]
    cleaned = raw.astype(str).str.strip().str.upper().where(raw.notna())
    modes = cleaned.dropna().mode()
    primary = str(modes.iloc[0]) if not modes.empty else "R"
    per_row = cleaned.where(cleaned.isin(["L", "R"]), primary)
    return primary, per_row


def _pull_side_mask(spray_angle: pd.Series, stand: pd.Series) -> pd.Series:
    """Flag rows hit to the batter's pull side: beyond +15 deg for "L", beyond -15 deg otherwise."""
    to_right_field = spray_angle.gt(15).to_numpy()
    to_left_field = spray_angle.lt(-15).to_numpy()
    bats_left = stand.eq("L").to_numpy()
    pulled = np.where(bats_left, to_right_field, to_left_field)
    return pd.Series(pulled, index=spray_angle.index)


def build_batter_feature_row(statcast_df: pd.DataFrame, player_name: str) -> dict[str, Any]:
    """
    Summarise one batter's rows of ``statcast_df`` into a flat feature dict.

    Args:
        statcast_df: Frame with a ``player_name`` column. Optional columns read
            here: ``launch_speed``, ``launch_angle``, ``xwoba`` (or
            ``estimated_woba_using_speedangle``), ``spray_angle``, ``hc_x``,
            ``hc_y``, ``batter_stand`` (or ``stand``) and ``bb_type``. Any of
            them may be absent; the dependent features are then ``None``
            (or 0.0 for the batted-ball shape rates).
        player_name: Name matched against ``player_name`` after ``str()``.

    Returns:
        Dict with the contact-quality metrics, pull/air rates,
        ``plate_appearances`` (the number of rows matched for the player),
        ``batter_stand`` (upper-cased most common side, default "R") and the
        batted-ball shape rates. When the frame is empty, has no
        ``player_name`` column, or has no rows for the player, the "empty"
        row is returned.

    Notes:
        - Pull side follows the convention of the spray-angle classifier in
          this module: negative angle is the left-field side, so a
          right-handed batter pulls below -15 degrees and a left-handed
          batter above +15 degrees. Handedness is taken per row.
        - NOTE(review): except for ``pull_rate`` (rows with a known
          direction only), every rate is a mean over all of the player's
          rows, including rows with no batted-ball measurement. Confirm this
          is the intended denominator.
    """
    if statcast_df.empty or "player_name" not in statcast_df.columns:
        return _empty_feature_row(player_name)

    df = statcast_df[statcast_df["player_name"].astype(str) == str(player_name)].copy()
    if df.empty:
        return _empty_feature_row(player_name)

    # Missing columns become all-NaN Series so every step below sees a Series.
    launch_speed = _numeric_column(df, "launch_speed")
    launch_angle = _numeric_column(df, "launch_angle")
    xwoba_column = "xwoba" if "xwoba" in df.columns else "estimated_woba_using_speedangle"
    estimated_woba = _numeric_column(df, xwoba_column)

    ev90 = _percentile(launch_speed, 0.90)
    avg_launch_angle = _safe_mean(launch_angle)
    xwoba = _safe_mean(estimated_woba)

    has_speed = launch_speed.notna()
    has_angle = launch_angle.notna()

    # Comparisons against NaN are False, so these are all-False without data.
    hard_hit_mask = launch_speed >= 95
    air_ball_mask = launch_angle >= 10
    barrel_mask = _build_barrel_mask(launch_speed, launch_angle)

    # Report None, not 0.0, when the underlying measurement is absent.
    hard_hit_rate = float(hard_hit_mask.mean()) if has_speed.any() else None
    air_ball_rate = float(air_ball_mask.mean()) if has_angle.any() else None
    barrel_rate = float(barrel_mask.mean()) if (has_speed & has_angle).any() else None

    batter_stand, stand_by_row = _resolve_stand(df)

    # ----------------------------
    # Direction + HR-shape metrics
    # ----------------------------
    # Prefer the spray_angle column; fall back to hc_x/hc_y when it has no values.
    spray_angle = _numeric_column(df, "spray_angle")
    if not spray_angle.notna().any() and "hc_x" in df.columns and "hc_y" in df.columns:
        spray_angle = _spray_angle_from_hit_coords(df["hc_x"], df["hc_y"])
    has_direction = spray_angle.notna()

    # Direction-dependent rates stay None when no row has a usable direction.
    pull_rate = None
    pull_air_rate = None
    pulled_hard_air_rate = None
    pulled_barrel_rate = None
    if has_direction.any():
        pull_mask = _pull_side_mask(spray_angle, stand_by_row)
        pull_rate = float(pull_mask[has_direction].mean())
        pull_air_rate = float((pull_mask & air_ball_mask).mean())
        pulled_hard_air_rate = float((pull_mask & air_ball_mask & hard_hit_mask).mean())
        pulled_barrel_rate = float((pull_mask & barrel_mask).mean())

    # ----------------------------
    # 12A: Batted-ball shape taxonomy
    # ----------------------------
    gb_rate = 0.0
    ld_rate = 0.0
    fb_rate = 0.0
    popup_rate = 0.0
    if "bb_type" in df.columns:
        bb_raw = df["bb_type"]
        bb_raw = bb_raw[bb_raw.notna() & (bb_raw.astype(str).str.strip() != "")]
        # Fewer than 5 classified batted balls: leave the shape rates at 0.0.
        if len(bb_raw) >= 5:
            bb = bb_raw.astype(str).str.strip().str.lower()
            n = float(len(bb))
            gb_rate = float((bb == "ground_ball").sum()) / n
            ld_rate = float((bb == "line_drive").sum()) / n
            fb_rate = float((bb == "fly_ball").sum()) / n
            popup_rate = float((bb == "popup").sum()) / n

    la_sweet_spot_rate = 0.0
    la_optimal_hr_rate = 0.0
    la_valid = launch_angle.dropna()
    if len(la_valid) >= 5:
        la_sweet_spot_rate = float(((la_valid >= 8) & (la_valid <= 32)).mean())
        la_optimal_hr_rate = float(((la_valid >= 25) & (la_valid <= 35)).mean())

    return {
        "player_name": player_name,
        "ev90": ev90,
        "avg_launch_angle": avg_launch_angle,
        "barrel_rate": barrel_rate,
        "hard_hit_rate": hard_hit_rate,
        "xwoba": xwoba,
        "pull_rate": pull_rate,
        "air_ball_rate": air_ball_rate,
        "pull_air_rate": pull_air_rate,
        "pulled_hard_air_rate": pulled_hard_air_rate,
        "pulled_barrel_rate": pulled_barrel_rate,
        "plate_appearances": int(len(df)),
        "batter_stand": batter_stand,
        "gb_rate": gb_rate,
        "ld_rate": ld_rate,
        "fb_rate": fb_rate,
        "popup_rate": popup_rate,
        "la_sweet_spot_rate": la_sweet_spot_rate,
        "la_optimal_hr_rate": la_optimal_hr_rate,
    }
def compute_batter_baseline(feature_row: dict[str, Any]) -> dict[str, float]:
    """
    Turn a batter feature row into baseline per-PA probabilities.

    EV90 is the primary quality signal.
    Outputs baseline probabilities before pitcher/context adjustments.

    Each available metric nudges the league baselines by a clamped linear
    term. A metric that is absent, ``None`` or NaN contributes nothing: NaN
    must be filtered explicitly, because ``min(cap, nan)`` evaluates to
    ``cap`` and would otherwise award the maximum bonus.

    Args:
        feature_row: Mapping that may contain ``ev90``, ``avg_launch_angle``,
            ``barrel_rate``, ``hard_hit_rate``, ``xwoba`` and
            ``batter_stand``.

    Returns:
        Dict with ``hit_prob_base``, ``hr_prob_base`` and ``tb2p_prob_base``,
        each clamped to its own floor and ceiling.

    Raises:
        ValueError / TypeError: if a present metric cannot be converted by
            ``float()``.
    """

    def _metric(key: str) -> float | None:
        # Missing, None and NaN all mean "no information".
        value = feature_row.get(key)
        if value is None or pd.isna(value):
            return None
        return float(value)

    def _clamp(value: float, low: float, high: float) -> float:
        return max(low, min(high, value))

    ev90 = _metric("ev90")
    avg_launch_angle = _metric("avg_launch_angle")
    barrel_rate = _metric("barrel_rate")
    hard_hit_rate = _metric("hard_hit_rate")
    xwoba = _metric("xwoba")

    hit_prob = BASELINE_HIT_PROB  # 0.24 — empirical per-PA hit rate
    hr_prob = BASELINE_HR_PROB  # 0.036 — empirical MLB 2024 per-PA HR rate
    tb2p_prob = 0.17

    if ev90 is not None:
        hit_prob += _clamp((ev90 - 100.0) * 0.004, -0.04, 0.07)
        hr_prob += _clamp((ev90 - 101.0) * 0.0035, -0.02, 0.05)
        tb2p_prob += _clamp((ev90 - 100.0) * 0.003, -0.03, 0.06)

    if hard_hit_rate is not None:
        hit_prob += _clamp((hard_hit_rate - 0.40) * 0.15, -0.02, 0.04)
        hr_prob += _clamp((hard_hit_rate - 0.40) * 0.12, -0.015, 0.035)
        tb2p_prob += _clamp((hard_hit_rate - 0.40) * 0.13, -0.02, 0.04)

    if barrel_rate is not None:
        hr_prob += _clamp((barrel_rate - 0.07) * 0.60, -0.01, 0.05)
        tb2p_prob += _clamp((barrel_rate - 0.07) * 0.45, -0.015, 0.04)

    if xwoba is not None:
        hit_prob += _clamp((xwoba - 0.320) * 0.45, -0.03, 0.05)
        tb2p_prob += _clamp((xwoba - 0.320) * 0.30, -0.02, 0.04)

    if avg_launch_angle is not None:
        # Flat bonuses for average launch angles in the hit / home-run windows.
        if 10 <= avg_launch_angle <= 22:
            hit_prob += 0.01
        if 16 <= avg_launch_angle <= 28:
            hr_prob += 0.008

    # E2: Handedness-aware baseline (LHB general advantage).
    # Compared case- and whitespace-insensitively so "l" / " L " also count.
    stand = str(feature_row.get("batter_stand", "R") or "R").strip().upper()
    if stand == "L":
        hr_prob = min(hr_prob + 0.004, 0.22)
        hit_prob = min(hit_prob + 0.003, 0.50)

    return {
        "hit_prob_base": _clamp(hit_prob, 0.05, 0.50),
        "hr_prob_base": _clamp(hr_prob, 0.005, 0.22),
        "tb2p_prob_base": _clamp(tb2p_prob, 0.03, 0.42),
    }