2026_MLB_Model / models /trajectory_model.py
Syntrex's picture
Fix ImportError: replace _normalize_name_text import in trajectory_model.py
2d30550
raw
history blame
9.53 kB
"""
Batch 10: Physics-aware trajectory modeling from raw Statcast kinematic fields.
Uses: release_pos_x/y/z, vx0/vy0/vz0, ax/ay/az, plate_x/plate_z, pitch_type
Provides:
- Pitch trajectory reconstruction (early-flight x/z at t=0.167s)
- Tunneling metric: pitch types look similar early, diverge at plate
- Release consistency: spread (std) of release_pos_x/z across all of a pitcher's pitches
- Arsenal deception score: weighted combination of tunneling + consistency
"""
from __future__ import annotations
import logging
import math
from typing import Any
import numpy as np
import pandas as pd
from data.odds_name_map import normalize_pitcher_name as _normalize_name_text
from models.pitcher_adjustment import _to_last_first_variants
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Time after release at which _compute_tunneling evaluates the early-flight
# position; meant to approximate the batter's early read window.
# NOTE(review): the original note here read "~5 ft of travel from release
# point". At roughly 130 ft/s (about 90 mph) 0.167 s corresponds to about
# 20 ft of travel, so the distance quoted looks off -- confirm the intent.
_EARLY_FLIGHT_T = 0.167 # seconds
# Release position, initial velocity and acceleration columns (x and z only)
# that _compute_tunneling needs to reconstruct the early-flight position.
_TRAJ_COLS = ["release_pos_x", "release_pos_z", "vx0", "vz0", "ax", "az"]
# Plate location columns that _compute_tunneling compares per pitch type.
_PLATE_COLS = ["plate_x", "plate_z"]
# Release-point columns that _compute_release_consistency requires to be
# present and numeric.
_RELEASE_COLS = ["release_pos_x", "release_pos_y", "release_pos_z"]
def _filter_pitcher_df(
statcast_df: pd.DataFrame,
pitcher_name: str,
pitcher_id: int | None = None,
) -> pd.DataFrame:
if statcast_df.empty or "player_name" not in statcast_df.columns:
return pd.DataFrame()
if pitcher_id is not None and "pitcher" in statcast_df.columns:
try:
ids = pd.to_numeric(statcast_df["pitcher"], errors="coerce")
df = statcast_df[ids == int(pitcher_id)].copy()
if not df.empty:
return df
except Exception:
pass
normalized_series = statcast_df["player_name"].astype(str).map(_normalize_name_text)
variants = _to_last_first_variants(pitcher_name)
df = statcast_df[normalized_series.isin(variants)].copy()
if df.empty:
parts = _normalize_name_text(pitcher_name).split()
if len(parts) >= 2:
first, last = parts[0], parts[-1]
loose = normalized_series.apply(
lambda n: isinstance(n, str) and first in n and last in n
)
df = statcast_df[loose].copy()
return df
def _compute_release_consistency(df: pd.DataFrame) -> float | None:
"""
Release consistency score in [0, 1].
Measures 2-D spread (x + z) of release point across all pitches.
Score 1.0 = perfect consistency; 0.0 = highly inconsistent (std >= 3.0 in).
Threshold calibration:
std < 0.5 in = elite mechanical repeatability
std > 3.0 in = poor command
"""
if not all(c in df.columns for c in _RELEASE_COLS):
return None
valid = df[_RELEASE_COLS].apply(pd.to_numeric, errors="coerce").dropna()
if len(valid) < 10:
return None
std_x = float(valid["release_pos_x"].std())
std_z = float(valid["release_pos_z"].std())
combined_std = math.sqrt(std_x ** 2 + std_z ** 2)
return max(0.0, min(1.0, 1.0 - combined_std / 3.0))
def _compute_tunneling(df: pd.DataFrame) -> float | None:
"""
Tunneling score in [0, 1].
Algorithm:
1. Reconstruct each pitch's (x, z) at t=0.167s (early-flight window)
using kinematic equations:
x(t) = release_pos_x + vx0*t + 0.5*ax*t^2
z(t) = release_pos_z + vz0*t + 0.5*az*t^2
2. For each pair of pitch types, compute:
early_dist = Euclidean dist of mean early-flight (x, z)
plate_dist = Euclidean dist of mean plate (plate_x, plate_z)
tunnel_ratio = plate_dist / early_dist
3. High ratio = pitches look similar early, diverge at plate = elite tunneling.
4. Map mean ratio to [0, 1]: ratio=1.0 → score=0.0, ratio>=4.0 → score=1.0.
"""
needed = _TRAJ_COLS + _PLATE_COLS
if not all(c in df.columns for c in needed) or "pitch_type" not in df.columns:
return None
work = df[needed + ["pitch_type"]].copy()
for col in needed:
work[col] = pd.to_numeric(work[col], errors="coerce")
work = work.dropna(subset=needed)
if len(work) < 20:
return None
t = _EARLY_FLIGHT_T
work["x_early"] = work["release_pos_x"] + work["vx0"] * t + 0.5 * work["ax"] * t ** 2
work["z_early"] = work["release_pos_z"] + work["vz0"] * t + 0.5 * work["az"] * t ** 2
grouped = work.groupby("pitch_type").agg(
x_early_mean=("x_early", "mean"),
z_early_mean=("z_early", "mean"),
plate_x_mean=("plate_x", "mean"),
plate_z_mean=("plate_z", "mean"),
count=("plate_x", "count"),
)
grouped = grouped[grouped["count"] >= 10]
if len(grouped) < 2:
return None
types = list(grouped.index)
ratios: list[float] = []
for i in range(len(types)):
for j in range(i + 1, len(types)):
a = grouped.loc[types[i]]
b = grouped.loc[types[j]]
early_dist = math.sqrt(
(a["x_early_mean"] - b["x_early_mean"]) ** 2
+ (a["z_early_mean"] - b["z_early_mean"]) ** 2
)
plate_dist = math.sqrt(
(a["plate_x_mean"] - b["plate_x_mean"]) ** 2
+ (a["plate_z_mean"] - b["plate_z_mean"]) ** 2
)
# Avoid div-by-zero: pitches that share the same early path get
# full credit for any plate divergence.
denom = max(early_dist, 0.01)
ratios.append(min(5.0, plate_dist / denom))
if not ratios:
return None
mean_ratio = float(np.mean(ratios))
# ratio=1.0 → 0.0 (no separation gain), ratio=4.0 → 1.0 (elite)
return max(0.0, min(1.0, (mean_ratio - 1.0) / 3.0))
def build_trajectory_features(
    statcast_df: pd.DataFrame,
    pitcher_name: str,
    pitcher_id: int | None = None,
) -> dict[str, Any]:
    """
    Build physics-aware trajectory metrics for one pitcher.

    The pitcher's rows are selected with _filter_pitcher_df and then scored
    by _compute_release_consistency and _compute_tunneling.

    Returned keys:
        pitcher_name              : the name passed in
        release_consistency_score : float [0,1] or None
        tunnel_score              : float [0,1] or None
        deception_score           : float [0,1] or None (weighted combo)
        trajectory_sample_size    : int, number of rows found for the pitcher

    When no rows are found, every score is None and the sample size is 0.
    """
    features: dict[str, Any] = {
        "pitcher_name": pitcher_name,
        "release_consistency_score": None,
        "tunnel_score": None,
        "deception_score": None,
        "trajectory_sample_size": 0,
    }

    pitcher_rows = _filter_pitcher_df(statcast_df, pitcher_name, pitcher_id)
    if pitcher_rows.empty:
        return features

    consistency = _compute_release_consistency(pitcher_rows)
    tunneling = _compute_tunneling(pitcher_rows)

    # Deception blends 40% release consistency with 60% tunneling. With only
    # one metric available, that metric is discounted instead (x0.60 for
    # consistency alone, x0.70 for tunneling alone).
    blended: float | None
    if consistency is None and tunneling is None:
        blended = None
    elif tunneling is None:
        blended = consistency * 0.60
    elif consistency is None:
        blended = tunneling * 0.70
    else:
        blended = 0.40 * consistency + 0.60 * tunneling

    features["release_consistency_score"] = consistency
    features["tunnel_score"] = tunneling
    features["deception_score"] = blended
    features["trajectory_sample_size"] = int(len(pitcher_rows))
    return features
def compute_trajectory_adjustment(trajectory_row: dict[str, Any]) -> dict[str, Any]:
"""
Convert trajectory/deception metrics into batter-outcome adjustments.
Direction:
High deception (tunneling + consistent release) → pitcher advantage
→ negative batter hit/hr/tb2p adjustments
Low deception (poor tunneling or wild release) → batter advantage
→ positive adjustments
Scale design (per sub-signal):
release_consistency: max ±0.008 on hit, ±0.006 on tb2p
tunneling: max ±0.007 on hit, ±0.004 on hr, ±0.005 on tb2p
Totals clamped: hit ±0.015, hr ±0.010, tb2p ±0.012.
"""
hit_adj = 0.0
hr_adj = 0.0
tb2p_adj = 0.0
reason_tags: list[str] = []
release_consistency = trajectory_row.get("release_consistency_score")
tunnel_score = trajectory_row.get("tunnel_score")
if release_consistency is not None:
rc = float(release_consistency)
# rc=1.0 → shift=-0.008 (elite consistency suppresses contact)
# rc=0.0 → shift=+0.008 (erratic release helps batter)
shift = (0.5 - rc) * 0.016
hit_adj += shift
tb2p_adj += shift * 0.75
if rc >= 0.75:
reason_tags.append("Consistent release point")
elif rc <= 0.35:
reason_tags.append("Inconsistent release point")
if tunnel_score is not None:
ts = float(tunnel_score)
# ts=1.0 → shift=-0.007 (elite tunneling suppresses reads)
# ts=0.0 → shift=+0.007 (poor tunneling, pitches easy to track)
shift = (0.5 - ts) * 0.014
hit_adj += shift
hr_adj += shift * 0.55
tb2p_adj += shift * 0.70
if ts >= 0.70:
reason_tags.append("Strong pitch tunneling")
elif ts <= 0.30:
reason_tags.append("Poor pitch tunneling")
hit_adj = max(-0.015, min(0.015, hit_adj))
hr_adj = max(-0.010, min(0.010, hr_adj))
tb2p_adj = max(-0.012, min(0.012, tb2p_adj))
return {
"hit_adj": hit_adj,
"hr_adj": hr_adj,
"tb2p_adj": tb2p_adj,
"release_consistency_score": release_consistency,
"tunnel_score": tunnel_score,
"deception_score": trajectory_row.get("deception_score"),
"reason_tags": reason_tags,
}