Spaces:
Running
Running
| from __future__ import annotations | |
| from typing import Any | |
| import pandas as pd | |
| from models.batter_zone_model import classify_zone_bucket, normalize_pitch_family | |
def build_pitcher_zone_feature_row(
    statcast_df: pd.DataFrame,
    pitcher_name: str,
) -> dict[str, Any]:
    """Summarize one pitcher's pitch-family usage and zone-location rates.

    Rows of ``statcast_df`` whose ``player_name`` (compared as a string)
    equals ``pitcher_name`` are labeled with a pitch family, via
    ``normalize_pitch_family``, and a zone bucket, via
    ``classify_zone_bucket(plate_x, plate_z)``. The pitch label is read from
    ``pitch_name`` when present, otherwise ``pitch_type``, otherwise the
    literal ``"unknown"``.

    Args:
        statcast_df: Pitch-level data. Must contain ``player_name``,
            ``plate_x`` and ``plate_z`` for any features to be produced.
        pitcher_name: Pitcher to select; matched exactly against
            ``player_name``.

    Returns:
        A flat feature dict. When the frame is ``None``/empty, a required
        column is missing, or no row matches the pitcher, only
        ``{"pitcher_name": pitcher_name}`` is returned. Otherwise it also
        holds ``zone_sample_size`` and, for each family in
        fastball/breaking/offspeed:

        * ``{family}_usage_rate`` / ``sample_size_{family}``
        * per zone in heart/shadow/chase/waste:
          ``{family}_{zone}_rate`` (share of all the pitcher's pitches),
          ``{family}_{zone}_cond_rate`` (share within the family, ``None``
          when the family was never thrown), and the matching
          ``sample_size_{family}_{zone}`` / ``sample_size_{family}_{zone}_cond``
          counts.

    Note:
        Pitches whose family or zone label is not one of the values listed
        above still count toward ``zone_sample_size`` and the rate
        denominators but are not reported on their own.
    """
    if statcast_df is None or statcast_df.empty:
        return {"pitcher_name": pitcher_name}
    if "player_name" not in statcast_df.columns:
        return {"pitcher_name": pitcher_name}

    # The filtered frame is only read from below, so no defensive copy is needed.
    df = statcast_df[statcast_df["player_name"].astype(str) == str(pitcher_name)]
    if df.empty:
        return {"pitcher_name": pitcher_name}
    if "plate_x" not in df.columns or "plate_z" not in df.columns:
        return {"pitcher_name": pitcher_name}

    if "pitch_name" in df.columns:
        pitch_labels = df["pitch_name"]
    elif "pitch_type" in df.columns:
        pitch_labels = df["pitch_type"]
    else:
        pitch_labels = pd.Series(["unknown"] * len(df), index=df.index)

    family_labels = pitch_labels.apply(normalize_pitch_family)
    # Walk only the two coordinate columns instead of DataFrame.apply(axis=1),
    # which materializes a full Series object for every pitch.
    zone_labels = pd.Series(
        [
            classify_zone_bucket(plate_x, plate_z)
            for plate_x, plate_z in zip(df["plate_x"], df["plate_z"])
        ],
        index=df.index,
    )

    pitch_families = ("fastball", "breaking", "offspeed")
    zones = ("heart", "shadow", "chase", "waste")

    # df is guaranteed non-empty here, so total_count is always > 0.
    total_count = float(len(df))
    features: dict[str, Any] = {
        "pitcher_name": pitcher_name,
        "zone_sample_size": int(len(df)),
    }

    # Zone masks do not depend on the family; build each one once.
    zone_masks = {zone: zone_labels == zone for zone in zones}

    for family in pitch_families:
        family_mask = family_labels == family
        family_count = int(family_mask.sum())
        features[f"{family}_usage_rate"] = family_count / total_count
        features[f"sample_size_{family}"] = family_count

        for zone in zones:
            # The same cell count feeds both the overall and the conditional
            # rate; only the denominator differs.
            cell_count = int((family_mask & zone_masks[zone]).sum())
            features[f"{family}_{zone}_rate"] = cell_count / total_count
            features[f"sample_size_{family}_{zone}"] = cell_count
            features[f"{family}_{zone}_cond_rate"] = (
                cell_count / family_count if family_count > 0 else None
            )
            features[f"sample_size_{family}_{zone}_cond"] = cell_count

    return features