""" Batch 10: Physics-aware trajectory modeling from raw Statcast kinematic fields. Uses: release_pos_x/y/z, vx0/vy0/vz0, ax/ay/az, plate_x/plate_z, pitch_type Provides: - Pitch trajectory reconstruction (early-flight x/z at t=0.167s) - Tunneling metric: pitch types look similar early, diverge at plate - Release consistency: variance of release_pos_x/z across pitch types - Arsenal deception score: weighted combination of tunneling + consistency """ from __future__ import annotations import logging import math from typing import Any import numpy as np import pandas as pd from data.odds_name_map import normalize_pitcher_name as _normalize_name_text from models.pitcher_adjustment import _to_last_first_variants logger = logging.getLogger(__name__) # ~5 ft of travel from release point, consistent with early batter read window _EARLY_FLIGHT_T = 0.167 # seconds _TRAJ_COLS = ["release_pos_x", "release_pos_z", "vx0", "vz0", "ax", "az"] _PLATE_COLS = ["plate_x", "plate_z"] _RELEASE_COLS = ["release_pos_x", "release_pos_y", "release_pos_z"] def _filter_pitcher_df( statcast_df: pd.DataFrame, pitcher_name: str, pitcher_id: int | None = None, ) -> pd.DataFrame: if statcast_df.empty or "player_name" not in statcast_df.columns: return pd.DataFrame() if pitcher_id is not None and "pitcher" in statcast_df.columns: try: ids = pd.to_numeric(statcast_df["pitcher"], errors="coerce") df = statcast_df[ids == int(pitcher_id)].copy() if not df.empty: return df except Exception: pass normalized_series = statcast_df["player_name"].astype(str).map(_normalize_name_text) variants = _to_last_first_variants(pitcher_name) df = statcast_df[normalized_series.isin(variants)].copy() if df.empty: parts = _normalize_name_text(pitcher_name).split() if len(parts) >= 2: first, last = parts[0], parts[-1] loose = normalized_series.apply( lambda n: isinstance(n, str) and first in n and last in n ) df = statcast_df[loose].copy() return df def _compute_release_consistency(df: pd.DataFrame) -> float | None: """ Release consistency score in [0, 1]. Measures 2-D spread (x + z) of release point across all pitches. Score 1.0 = perfect consistency; 0.0 = highly inconsistent (std >= 3.0 in). Threshold calibration: std < 0.5 in = elite mechanical repeatability std > 3.0 in = poor command """ if not all(c in df.columns for c in _RELEASE_COLS): return None valid = df[_RELEASE_COLS].apply(pd.to_numeric, errors="coerce").dropna() if len(valid) < 10: return None std_x = float(valid["release_pos_x"].std()) std_z = float(valid["release_pos_z"].std()) combined_std = math.sqrt(std_x ** 2 + std_z ** 2) return max(0.0, min(1.0, 1.0 - combined_std / 3.0)) def _compute_tunneling(df: pd.DataFrame) -> float | None: """ Tunneling score in [0, 1]. Algorithm: 1. Reconstruct each pitch's (x, z) at t=0.167s (early-flight window) using kinematic equations: x(t) = release_pos_x + vx0*t + 0.5*ax*t^2 z(t) = release_pos_z + vz0*t + 0.5*az*t^2 2. For each pair of pitch types, compute: early_dist = Euclidean dist of mean early-flight (x, z) plate_dist = Euclidean dist of mean plate (plate_x, plate_z) tunnel_ratio = plate_dist / early_dist 3. High ratio = pitches look similar early, diverge at plate = elite tunneling. 4. Map mean ratio to [0, 1]: ratio=1.0 → score=0.0, ratio>=4.0 → score=1.0. """ needed = _TRAJ_COLS + _PLATE_COLS if not all(c in df.columns for c in needed) or "pitch_type" not in df.columns: return None work = df[needed + ["pitch_type"]].copy() for col in needed: work[col] = pd.to_numeric(work[col], errors="coerce") work = work.dropna(subset=needed) if len(work) < 20: return None t = _EARLY_FLIGHT_T work["x_early"] = work["release_pos_x"] + work["vx0"] * t + 0.5 * work["ax"] * t ** 2 work["z_early"] = work["release_pos_z"] + work["vz0"] * t + 0.5 * work["az"] * t ** 2 grouped = work.groupby("pitch_type").agg( x_early_mean=("x_early", "mean"), z_early_mean=("z_early", "mean"), plate_x_mean=("plate_x", "mean"), plate_z_mean=("plate_z", "mean"), count=("plate_x", "count"), ) grouped = grouped[grouped["count"] >= 10] if len(grouped) < 2: return None types = list(grouped.index) ratios: list[float] = [] for i in range(len(types)): for j in range(i + 1, len(types)): a = grouped.loc[types[i]] b = grouped.loc[types[j]] early_dist = math.sqrt( (a["x_early_mean"] - b["x_early_mean"]) ** 2 + (a["z_early_mean"] - b["z_early_mean"]) ** 2 ) plate_dist = math.sqrt( (a["plate_x_mean"] - b["plate_x_mean"]) ** 2 + (a["plate_z_mean"] - b["plate_z_mean"]) ** 2 ) # Avoid div-by-zero: pitches that share the same early path get # full credit for any plate divergence. denom = max(early_dist, 0.01) ratios.append(min(5.0, plate_dist / denom)) if not ratios: return None mean_ratio = float(np.mean(ratios)) # ratio=1.0 → 0.0 (no separation gain), ratio=4.0 → 1.0 (elite) return max(0.0, min(1.0, (mean_ratio - 1.0) / 3.0)) def build_trajectory_features( statcast_df: pd.DataFrame, pitcher_name: str, pitcher_id: int | None = None, ) -> dict[str, Any]: """ Build physics-aware trajectory metrics from raw Statcast kinematic fields. Returns: release_consistency_score : float [0,1] or None tunnel_score : float [0,1] or None deception_score : float [0,1] or None (weighted combo) trajectory_sample_size : int """ _empty: dict[str, Any] = { "pitcher_name": pitcher_name, "release_consistency_score": None, "tunnel_score": None, "deception_score": None, "trajectory_sample_size": 0, } df = _filter_pitcher_df(statcast_df, pitcher_name, pitcher_id) if df.empty: return _empty release_consistency = _compute_release_consistency(df) tunnel_score = _compute_tunneling(df) # Deception: 40% release consistency, 60% tunneling. # Partial credit when only one metric is available. deception_score: float | None = None if release_consistency is not None and tunnel_score is not None: deception_score = 0.40 * release_consistency + 0.60 * tunnel_score elif release_consistency is not None: deception_score = release_consistency * 0.60 elif tunnel_score is not None: deception_score = tunnel_score * 0.70 return { "pitcher_name": pitcher_name, "release_consistency_score": release_consistency, "tunnel_score": tunnel_score, "deception_score": deception_score, "trajectory_sample_size": int(len(df)), } def compute_trajectory_adjustment(trajectory_row: dict[str, Any]) -> dict[str, Any]: """ Convert trajectory/deception metrics into batter-outcome adjustments. Direction: High deception (tunneling + consistent release) → pitcher advantage → negative batter hit/hr/tb2p adjustments Low deception (poor tunneling or wild release) → batter advantage → positive adjustments Scale design (per sub-signal): release_consistency: max ±0.008 on hit, ±0.006 on tb2p tunneling: max ±0.007 on hit, ±0.004 on hr, ±0.005 on tb2p Totals clamped: hit ±0.015, hr ±0.010, tb2p ±0.012. """ hit_adj = 0.0 hr_adj = 0.0 tb2p_adj = 0.0 reason_tags: list[str] = [] release_consistency = trajectory_row.get("release_consistency_score") tunnel_score = trajectory_row.get("tunnel_score") if release_consistency is not None: rc = float(release_consistency) # rc=1.0 → shift=-0.008 (elite consistency suppresses contact) # rc=0.0 → shift=+0.008 (erratic release helps batter) shift = (0.5 - rc) * 0.016 hit_adj += shift tb2p_adj += shift * 0.75 if rc >= 0.75: reason_tags.append("Consistent release point") elif rc <= 0.35: reason_tags.append("Inconsistent release point") if tunnel_score is not None: ts = float(tunnel_score) # ts=1.0 → shift=-0.007 (elite tunneling suppresses reads) # ts=0.0 → shift=+0.007 (poor tunneling, pitches easy to track) shift = (0.5 - ts) * 0.014 hit_adj += shift hr_adj += shift * 0.55 tb2p_adj += shift * 0.70 if ts >= 0.70: reason_tags.append("Strong pitch tunneling") elif ts <= 0.30: reason_tags.append("Poor pitch tunneling") hit_adj = max(-0.015, min(0.015, hit_adj)) hr_adj = max(-0.010, min(0.010, hr_adj)) tb2p_adj = max(-0.012, min(0.012, tb2p_adj)) return { "hit_adj": hit_adj, "hr_adj": hr_adj, "tb2p_adj": tb2p_adj, "release_consistency_score": release_consistency, "tunnel_score": tunnel_score, "deception_score": trajectory_row.get("deception_score"), "reason_tags": reason_tags, }