2026_MLB_Model / models /pitcher_zone_model.py
Syntrex's picture
Update models/pitcher_zone_model.py
67a37ca verified
raw
history blame
2.77 kB
from __future__ import annotations
from typing import Any
import pandas as pd
from models.batter_zone_model import classify_zone_bucket, normalize_pitch_family
def build_pitcher_zone_feature_row(
    statcast_df: pd.DataFrame,
    pitcher_name: str,
) -> dict[str, Any]:
    """Build one flat feature row of pitch-family / zone-bucket rates for a pitcher.

    Rows of ``statcast_df`` whose ``player_name`` (compared as strings) equals
    ``pitcher_name`` are selected.  Each selected pitch is labelled with a
    pitch family via ``normalize_pitch_family`` (applied to ``pitch_name``,
    falling back to ``pitch_type``, falling back to the literal ``"unknown"``)
    and with a zone bucket via ``classify_zone_bucket(plate_x, plate_z)``.
    NOTE(review): the exact labels those helpers return are defined in
    ``models.batter_zone_model``; this function only counts the labels listed
    below and ignores any other label (such pitches still count toward the
    overall denominator).

    Args:
        statcast_df: Pitch-level frame. Must contain ``player_name``,
            ``plate_x`` and ``plate_z`` for any features to be produced.
        pitcher_name: Name to match against ``player_name``.

    Returns:
        A dict that always contains ``"pitcher_name"``.  When the frame is
        ``None``/empty, lacks a required column, or has no rows for the
        pitcher, that is the only key.  Otherwise it also contains:

        * ``zone_sample_size`` - number of pitches selected.
        * ``{family}_usage_rate`` / ``sample_size_{family}`` - share and count
          of pitches in each family (fastball, breaking, offspeed).
        * ``{family}_{zone}_rate`` / ``sample_size_{family}_{zone}`` - share of
          *all* selected pitches in that family and zone (heart, shadow,
          chase, waste), and the matching count.
        * ``{family}_{zone}_cond_rate`` / ``sample_size_{family}_{zone}_cond``
          - share of that *family's* pitches in the zone (``None`` when the
          family has no pitches), and the matching count.
    """
    if statcast_df is None or statcast_df.empty:
        return {"pitcher_name": pitcher_name}
    if "player_name" not in statcast_df.columns:
        return {"pitcher_name": pitcher_name}

    # Boolean indexing already yields a new object and nothing below writes
    # to it, so no defensive .copy() is required.
    is_pitcher = statcast_df["player_name"].astype(str) == str(pitcher_name)
    df = statcast_df[is_pitcher]
    if df.empty:
        return {"pitcher_name": pitcher_name}
    if "plate_x" not in df.columns or "plate_z" not in df.columns:
        return {"pitcher_name": pitcher_name}

    if "pitch_name" in df.columns:
        pitch_name_series = df["pitch_name"]
    elif "pitch_type" in df.columns:
        pitch_name_series = df["pitch_type"]
    else:
        pitch_name_series = pd.Series(["unknown"] * len(df), index=df.index)

    pitch_family = pitch_name_series.apply(normalize_pitch_family)
    # Walk the two coordinate columns in lockstep instead of a row-wise
    # DataFrame.apply(axis=1), which materialises a Series for every pitch.
    zone_bucket = pd.Series(
        [
            classify_zone_bucket(plate_x, plate_z)
            for plate_x, plate_z in zip(df["plate_x"], df["plate_z"])
        ],
        index=df.index,
        dtype=object,
    )

    # df is known to be non-empty here, so total_count is always >= 1.
    total_count = len(df)
    row: dict[str, Any] = {
        "pitcher_name": pitcher_name,
        "zone_sample_size": int(total_count),
    }

    pitch_families = ("fastball", "breaking", "offspeed")
    zones = ("heart", "shadow", "chase", "waste")
    # Zone masks do not depend on the family; compute each one once.
    zone_masks = {zone: zone_bucket == zone for zone in zones}

    for family in pitch_families:
        in_family = pitch_family == family
        family_count = int(in_family.sum())
        row[f"{family}_usage_rate"] = family_count / total_count
        row[f"sample_size_{family}"] = family_count

        for zone in zones:
            # The same count serves both the overall and the conditional
            # feature; only the denominator differs.
            cell_count = int((in_family & zone_masks[zone]).sum())
            row[f"{family}_{zone}_rate"] = cell_count / total_count
            row[f"sample_size_{family}_{zone}"] = cell_count
            row[f"{family}_{zone}_cond_rate"] = (
                cell_count / family_count if family_count > 0 else None
            )
            row[f"sample_size_{family}_{zone}_cond"] = cell_count

    return row