from __future__ import annotations

from typing import Any

import numpy as np
import pandas as pd

try:
    from config.settings import BASELINE_HIT_PROB, BASELINE_HR_PROB
except ModuleNotFoundError as exc:
    # Fall back only when the project config package itself is absent (e.g. the
    # module is used standalone in a notebook or an isolated test). Any other
    # import failure, such as a broken dependency inside config, still raises.
    if exc.name not in {"config", "config.settings"}:
        raise
    BASELINE_HIT_PROB = 0.24  # empirical per-PA hit rate
    BASELINE_HR_PROB = 0.036  # empirical MLB 2024 per-PA HR rate


# Spray angles within +/- this many degrees of dead centre count as "center".
_CENTER_BAND_DEG = 15
# Exit-velocity threshold (mph) for hard-hit contact.
_HARD_HIT_MPH = 95
# Launch-angle threshold (degrees) at or above which contact counts as in the air.
_AIR_BALL_MIN_LAUNCH_ANGLE = 10
# Minimum number of observations before shape rates are reported.
_MIN_SHAPE_SAMPLE = 5
# Home-plate location in hc_x / hc_y hit coordinates.
# NOTE(review): 125.42 / 198.27 is the commonly used community convention for
# Statcast hit coordinates; confirm against the data source in use.
_HC_HOME_X = 125.42
_HC_HOME_Y = 198.27
# (min exit velocity, min launch angle, max launch angle) bands of the
# first-pass barrel approximation; launch-angle bounds are inclusive.
_BARREL_BANDS = (
    (98, 26, 30),
    (99, 25, 31),
    (100, 23, 33),
    (102, 20, 35),
)
# Free-text direction labels mapped onto the canonical vocabulary.
_DIRECTION_ALIASES = {
    "pulled": "pull",
    "pull-side": "pull",
    "opposite": "oppo",
    "opposite field": "oppo",
    "center field": "center",
    "middle": "center",
}


def _safe_mean(series: pd.Series) -> float | None:
    """Return the mean of the numeric values in *series*, or None if there are none."""
    numeric = pd.to_numeric(series, errors="coerce").dropna()
    if numeric.empty:
        return None
    return float(numeric.mean())


def _safe_rate(series: pd.Series) -> float | None:
    """Return the mean of a 0/1 (or boolean) series, or None if it has no valid values."""
    return _safe_mean(series)


def _percentile(series: pd.Series, q: float) -> float | None:
    """Return the *q* quantile (0..1) of the numeric values in *series*, or None."""
    numeric = pd.to_numeric(series, errors="coerce").dropna()
    if numeric.empty:
        return None
    return float(numeric.quantile(q))


def _classify_batted_ball_direction_from_spray_angle(
    spray_angle: Any,
    stand: str,
) -> str:
    """
    First-pass direction classifier.

    Assumptions:
    - negative spray angle tends to be left-field side
    - positive spray angle tends to be right-field side
    - for RHB, pull is left-field side
    - for LHB, pull is right-field side

    Returns one of:
    - "pull"
    - "center"
    - "oppo"
    - "unknown" (non-numeric or NaN angle, or stand other than "R"/"L")
    """
    try:
        angle = float(spray_angle)
    except (TypeError, ValueError, OverflowError):
        return "unknown"

    stand = str(stand or "").strip().upper()
    if stand not in {"R", "L"} or np.isnan(angle):
        return "unknown"

    if -_CENTER_BAND_DEG <= angle <= _CENTER_BAND_DEG:
        return "center"

    # Outside the centre band the sign of the angle decides the field side.
    toward_left_field = angle < 0
    pulled = toward_left_field if stand == "R" else not toward_left_field
    return "pull" if pulled else "oppo"


def _build_direction_series(df: pd.DataFrame) -> pd.Series:
    """
    Build a per-row direction label series aligned to ``df.index``.

    Priority:
    1. use existing hit_direction if present (aliases normalised, missing
       values reported as "unknown")
    2. infer from spray_angle + stand if present
    3. return unknown
    """
    if "hit_direction" in df.columns:
        raw = df["hit_direction"]
        normalised = raw.astype(str).str.strip().str.lower().replace(_DIRECTION_ALIASES)
        # astype(str) turns missing values into the literal "nan"; report them
        # as "unknown" instead.
        return normalised.where(raw.notna(), "unknown")

    if "spray_angle" in df.columns and "stand" in df.columns:
        labels = [
            _classify_batted_ball_direction_from_spray_angle(angle, stand)
            for angle, stand in zip(df["spray_angle"], df["stand"])
        ]
        # Built explicitly (rather than df.apply(axis=1)) so an empty frame
        # still yields a Series.
        return pd.Series(labels, index=df.index, dtype=object)

    return pd.Series(["unknown"] * len(df), index=df.index)


def _build_barrel_mask(
    launch_speed: pd.Series,
    launch_angle: pd.Series,
) -> pd.Series:
    """
    First-pass barrel approximation.

    Returns a boolean Series on ``launch_speed.index`` that is True where the
    row has both a numeric exit velocity and launch angle falling in one of
    the ``_BARREL_BANDS``. Rows missing either value are False.
    """
    valid = pd.DataFrame(
        {
            "launch_speed": pd.to_numeric(launch_speed, errors="coerce"),
            "launch_angle": pd.to_numeric(launch_angle, errors="coerce"),
        }
    ).dropna()

    mask = pd.Series(False, index=launch_speed.index)
    if valid.empty:
        return mask

    is_barrel = pd.Series(False, index=valid.index)
    for min_speed, min_angle, max_angle in _BARREL_BANDS:
        is_barrel |= (valid["launch_speed"] >= min_speed) & valid["launch_angle"].between(
            min_angle, max_angle
        )

    mask.loc[valid.index] = is_barrel
    return mask


def _empty_feature_row(player_name: str) -> dict[str, Any]:
    """Return the feature row used when no rows exist for *player_name*."""
    return {
        "player_name": player_name,
        "ev90": None,
        "avg_launch_angle": None,
        "barrel_rate": None,
        "hard_hit_rate": None,
        "xwoba": None,
        "pull_rate": None,
        "air_ball_rate": None,
        "pull_air_rate": None,
        "pulled_hard_air_rate": None,
        "pulled_barrel_rate": None,
        "plate_appearances": 0,
        "batter_stand": "R",
        "gb_rate": 0.0,
        "ld_rate": 0.0,
        "fb_rate": 0.0,
        "popup_rate": 0.0,
        "la_sweet_spot_rate": 0.0,
        "la_optimal_hr_rate": 0.0,
    }


def _numeric_column(df: pd.DataFrame, *names: str) -> pd.Series:
    """Return the first of *names* present in *df* coerced to numeric, else all-NaN."""
    for name in names:
        if name in df.columns:
            return pd.to_numeric(df[name], errors="coerce")
    return pd.Series(np.nan, index=df.index, dtype=float)


def _stand_column(df: pd.DataFrame) -> str | None:
    """Return the name of the handedness column in *df*, preferring batter_stand."""
    for name in ("batter_stand", "stand"):
        if name in df.columns:
            return name
    return None


def _spray_angle_series(df: pd.DataFrame) -> pd.Series | None:
    """
    Return per-row spray angle in degrees, or None when no direction data exists.

    Prefers the ``spray_angle`` column; otherwise derives the angle from
    ``hc_x`` / ``hc_y`` so that 0 is straight up the middle, negative is the
    left-field side and positive is the right-field side.
    """
    spray = _numeric_column(df, "spray_angle")
    if not spray.notna().any() and "hc_x" in df.columns and "hc_y" in df.columns:
        hc_x = pd.to_numeric(df["hc_x"], errors="coerce")
        hc_y = pd.to_numeric(df["hc_y"], errors="coerce")
        # arctan2(horizontal offset, distance toward the outfield): hc_y
        # decreases away from home plate, hence home_y - hc_y.
        spray = pd.Series(
            np.degrees(np.arctan2(hc_x - _HC_HOME_X, _HC_HOME_Y - hc_y)),
            index=df.index,
        )
    return spray if spray.notna().any() else None


def build_batter_feature_row(statcast_df: pd.DataFrame, player_name: str) -> dict[str, Any]:
    """
    Aggregate one batter's rows of *statcast_df* into a flat feature dict.

    Rows are selected by string equality on the ``player_name`` column. When
    the frame is empty, lacks that column, or has no matching rows, a row of
    None / 0 defaults is returned with ``plate_appearances`` 0.

    Quality and direction metrics are None when the data needed to compute
    them is absent (never 0.0 for unknown). Missing optional columns
    (launch_speed, launch_angle, xwoba, spray data, bb_type) are tolerated.

    Denominators: ``pull_rate`` is taken over rows with a valid spray angle;
    every other rate is taken over all of the player's rows, including rows
    without batted-ball data. Shape rates stay 0.0 below ``_MIN_SHAPE_SAMPLE``
    observations.

    Pull is handedness-aware per row: left-field side for right-handed
    batters, right-field side for left-handed batters. Rows whose stand is not
    "L" use the right-handed convention.
    """
    if statcast_df.empty or "player_name" not in statcast_df.columns:
        return _empty_feature_row(player_name)

    df = statcast_df[statcast_df["player_name"].astype(str) == str(player_name)].copy()
    if df.empty:
        return _empty_feature_row(player_name)

    launch_speed = _numeric_column(df, "launch_speed")
    launch_angle = _numeric_column(df, "launch_angle")
    estimated_woba = _numeric_column(df, "xwoba", "estimated_woba_using_speedangle")

    has_speed = bool(launch_speed.notna().any())
    has_angle = bool(launch_angle.notna().any())

    hard_hit_mask = launch_speed >= _HARD_HIT_MPH  # NaN compares False
    air_ball_mask = launch_angle >= _AIR_BALL_MIN_LAUNCH_ANGLE
    barrel_mask = _build_barrel_mask(launch_speed, launch_angle)
    has_barrel_inputs = bool((launch_speed.notna() & launch_angle.notna()).any())

    hard_hit_rate = float(hard_hit_mask.mean()) if has_speed else None
    air_ball_rate = float(air_ball_mask.mean()) if has_angle else None
    barrel_rate = float(barrel_mask.mean()) if has_barrel_inputs else None

    # ----------------------------
    # Handedness
    # ----------------------------
    batter_stand = "R"
    stand_by_row = pd.Series("R", index=df.index)
    stand_col = _stand_column(df)
    if stand_col is not None:
        stand_by_row = df[stand_col].astype(str).str.strip().str.upper()
        modes = df[stand_col].dropna().mode()
        if not modes.empty:
            batter_stand = str(modes.iloc[0]).strip().upper() or "R"

    # ----------------------------
    # Direction + HR-shape metrics
    # ----------------------------
    pull_rate = None
    pull_air_rate = None
    pulled_hard_air_rate = None
    pulled_barrel_rate = None

    spray_angle = _spray_angle_series(df)
    if spray_angle is not None:
        is_lefty = stand_by_row == "L"
        pull_mask = (is_lefty & (spray_angle > _CENTER_BAND_DEG)) | (
            ~is_lefty & (spray_angle < -_CENTER_BAND_DEG)
        )
        pull_rate = float(pull_mask[spray_angle.notna()].mean())
        pull_air_rate = float((pull_mask & air_ball_mask).mean())
        pulled_hard_air_rate = float((pull_mask & air_ball_mask & hard_hit_mask).mean())
        pulled_barrel_rate = float((pull_mask & barrel_mask).mean())

    # ----------------------------
    # 12A: Batted-ball shape taxonomy
    # ----------------------------
    gb_rate = ld_rate = fb_rate = popup_rate = 0.0
    if "bb_type" in df.columns:
        bb_type = df["bb_type"]
        has_label = bb_type.notna() & (bb_type.astype(str).str.strip() != "")
        labels = bb_type[has_label].astype(str).str.strip().str.lower()
        if len(labels) >= _MIN_SHAPE_SAMPLE:
            total = float(len(labels))
            gb_rate = float((labels == "ground_ball").sum()) / total
            ld_rate = float((labels == "line_drive").sum()) / total
            fb_rate = float((labels == "fly_ball").sum()) / total
            popup_rate = float((labels == "popup").sum()) / total

    la_sweet_spot_rate = 0.0
    la_optimal_hr_rate = 0.0
    valid_angles = launch_angle.dropna()
    if len(valid_angles) >= _MIN_SHAPE_SAMPLE:
        la_sweet_spot_rate = float(((valid_angles >= 8) & (valid_angles <= 32)).mean())
        la_optimal_hr_rate = float(((valid_angles >= 25) & (valid_angles <= 35)).mean())

    return {
        "player_name": player_name,
        "ev90": _percentile(launch_speed, 0.90),
        "avg_launch_angle": _safe_mean(launch_angle),
        "barrel_rate": barrel_rate,
        "hard_hit_rate": hard_hit_rate,
        "xwoba": _safe_mean(estimated_woba),
        "pull_rate": pull_rate,
        "air_ball_rate": air_ball_rate,
        "pull_air_rate": pull_air_rate,
        "pulled_hard_air_rate": pulled_hard_air_rate,
        "pulled_barrel_rate": pulled_barrel_rate,
        "plate_appearances": int(len(df)),
        "batter_stand": batter_stand,
        "gb_rate": gb_rate,
        "ld_rate": ld_rate,
        "fb_rate": fb_rate,
        "popup_rate": popup_rate,
        "la_sweet_spot_rate": la_sweet_spot_rate,
        "la_optimal_hr_rate": la_optimal_hr_rate,
    }


def _clamp(value: float, low: float, high: float) -> float:
    """Limit *value* to the closed interval [low, high]."""
    return max(low, min(high, value))


def _optional_float(value: Any) -> float | None:
    """Return *value* as a float, treating None, pd.NA and NaN as missing."""
    if value is None or value is pd.NA:
        return None
    number = float(value)
    # NaN must not reach _clamp: min(high, nan) evaluates to high, which would
    # silently award the maximum bonus for a missing feature.
    return None if np.isnan(number) else number


def compute_batter_baseline(feature_row: dict[str, Any]) -> dict[str, float]:
    """
    EV90 is the primary quality signal.
    Outputs baseline probabilities before pitcher/context adjustments.

    Reads ``ev90``, ``hard_hit_rate``, ``barrel_rate``, ``xwoba``,
    ``avg_launch_angle`` and ``batter_stand`` from *feature_row*; any feature
    that is absent, None or NaN contributes no adjustment. Each adjustment is
    individually capped, and the final values are clamped to
    hit [0.05, 0.50], HR [0.005, 0.22] and 2+ total bases [0.03, 0.42].

    Returns a dict with ``hit_prob_base``, ``hr_prob_base`` and
    ``tb2p_prob_base``.
    """
    ev90 = _optional_float(feature_row.get("ev90"))
    avg_launch_angle = _optional_float(feature_row.get("avg_launch_angle"))
    barrel_rate = _optional_float(feature_row.get("barrel_rate"))
    hard_hit_rate = _optional_float(feature_row.get("hard_hit_rate"))
    xwoba = _optional_float(feature_row.get("xwoba"))

    hit_prob = BASELINE_HIT_PROB  # 0.24 — empirical per-PA hit rate
    hr_prob = BASELINE_HR_PROB  # 0.036 — empirical MLB 2024 per-PA HR rate
    tb2p_prob = 0.17

    if ev90 is not None:
        hit_prob += _clamp((ev90 - 100.0) * 0.004, -0.04, 0.07)
        hr_prob += _clamp((ev90 - 101.0) * 0.0035, -0.02, 0.05)
        tb2p_prob += _clamp((ev90 - 100.0) * 0.003, -0.03, 0.06)

    if hard_hit_rate is not None:
        hit_prob += _clamp((hard_hit_rate - 0.40) * 0.15, -0.02, 0.04)
        hr_prob += _clamp((hard_hit_rate - 0.40) * 0.12, -0.015, 0.035)
        tb2p_prob += _clamp((hard_hit_rate - 0.40) * 0.13, -0.02, 0.04)

    if barrel_rate is not None:
        hr_prob += _clamp((barrel_rate - 0.07) * 0.60, -0.01, 0.05)
        tb2p_prob += _clamp((barrel_rate - 0.07) * 0.45, -0.015, 0.04)

    if xwoba is not None:
        hit_prob += _clamp((xwoba - 0.320) * 0.45, -0.03, 0.05)
        tb2p_prob += _clamp((xwoba - 0.320) * 0.30, -0.02, 0.04)

    if avg_launch_angle is not None:
        if 10 <= avg_launch_angle <= 22:
            hit_prob += 0.01
        if 16 <= avg_launch_angle <= 28:
            hr_prob += 0.008

    # E2: Handedness-aware baseline (LHB general advantage). The final clamp
    # below enforces the same upper caps the bump must respect.
    stand = str(feature_row.get("batter_stand") or "R").strip().upper()
    if stand == "L":
        hr_prob += 0.004
        hit_prob += 0.003

    return {
        "hit_prob_base": _clamp(hit_prob, 0.05, 0.50),
        "hr_prob_base": _clamp(hr_prob, 0.005, 0.22),
        "tb2p_prob_base": _clamp(tb2p_prob, 0.03, 0.42),
    }