Spaces:
Running
Running
| import polars as pl | |
| import numpy as np | |
| import requests | |
| from io import StringIO | |
# Upper bound, in seconds, for each outbound HTTP request made below.
_HTTP_TIMEOUT_SECONDS = 30


def _as_date_text(value) -> str:
    """Return *value* unchanged if it is a string, else its ``YYYY-MM-DD`` prefix."""
    if isinstance(value, str):
        return value
    # ``str()`` of date/datetime objects starts with the ISO calendar date.
    return str(value)[:10]


def _parse_height_inches(height_text: str) -> int:
    """Convert a feet-and-inches string to total inches.

    NOTE(review): assumes the Stats API reports height like ``6' 2"`` -- confirm.
    A missing inches part is treated as zero.
    """
    feet, _, remainder = height_text.partition("'")
    inches = remainder.replace('"', "").strip()
    return int(feet) * 12 + (int(inches) if inches else 0)


def _fetch_arm_angle_data(url: str):
    """Download the arm-angle CSV at *url*.

    Returns a ``pl.DataFrame``, or ``None`` when the request fails, the status
    is not OK, the body is empty, or the body looks like an HTML page.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    try:
        response = requests.get(url, headers=headers, timeout=_HTTP_TIMEOUT_SECONDS)
    except requests.RequestException:
        # A network failure is treated like a missing table so the caller
        # can fall back to the saved CSVs.
        return None
    body = response.text
    if not response.ok or not body.strip() or "<html" in body.lower():
        return None
    return pl.read_csv(StringIO(body), truncate_ragged_lines=True)


def _load_saved_arm_angles():
    """Load the saved 2025 and 2024 arm-angle CSVs as one row per pitcher.

    The 2025 frame is concatenated first, so with ``keep="first"`` a pitcher
    present in both files keeps the 2025 row.

    Raises:
        RuntimeError: if either CSV cannot be read, or the 2024 frame cannot
            be cast to the 2025 schema.
    """
    try:
        df_arm_angle_2025 = pl.read_csv("stuff_model/pitcher_arm_angles_2025.csv", truncate_ragged_lines=True)
    except Exception as e:
        raise RuntimeError("Failed to load fallback 2025 arm angle CSV.") from e
    try:
        df_arm_angle_2024 = pl.read_csv("stuff_model/pitcher_arm_angles_2024.csv", truncate_ragged_lines=True)
        df_arm_angle_2024 = df_arm_angle_2024.cast(df_arm_angle_2025.schema)
    except Exception as e:
        raise RuntimeError("Failed to load or cast 2024 arm angle CSV.") from e
    return pl.concat([df_arm_angle_2025, df_arm_angle_2024]).unique(subset=["pitcher"], keep="first")


def calculate_arm_angles(df: pl.DataFrame, pitcher_id: int) -> pl.DataFrame:
    """Estimate a per-pitch arm angle, in degrees, for one pitcher.

    The date range is taken from the first and last ``game_date`` values of
    *df* and used to request the Baseball Savant arm-angle leaderboard CSV.
    If that request yields no table, or the pitcher is not in it, the saved
    2025/2024 CSVs are used instead.

    * Pitcher present in the table: the angle is ``arcsin(Opp / Hyp)``, where
      ``Opp`` is ``release_pos_z - shoulder_z`` and ``Hyp`` is the
      shoulder-to-release distance from the table row. It is then blended
      with the table's ``ball_angle``: 50/50 for the saved CSVs, 25/75 for a
      single-day live range, otherwise ``ball_angle`` alone.
    * Pitcher absent from the table: the pitcher's height is fetched from the
      MLB Stats API, shoulder height is taken as 70% of it, and the angle is
      ``arctan2(Opp, Adj)``. In this branch ``release_pos_x`` and
      ``release_pos_z`` are overwritten with their values multiplied by 12,
      and no blending is applied.

    NaN or null angles are replaced by the mean of the valid angles.

    Args:
        df: Pitch-level data with ``game_date``, ``pitcher_id``,
            ``release_pos_x`` and ``release_pos_z`` columns. Must be
            non-empty. NOTE(review): the first/last rows are assumed to hold
            the earliest/latest dates -- confirm the caller sorts by date.
        pitcher_id: Identifier matched against ``pitcher_id`` in *df* and
            ``pitcher`` in the arm-angle table.

    Returns:
        The pitcher's rows with non-null release positions, plus the helper
        columns (``Opp``, ``Adj`` or ``Hyp``, ``arm_angle_rad``) and the
        final ``arm_angle`` column.

    Raises:
        RuntimeError: if the saved fallback CSVs cannot be loaded.
    """
    date_start = _as_date_text(df['game_date'][0])
    date_end = _as_date_text(df['game_date'][-1])
    season = int(date_start[:4])
    daily_check = date_start == date_end

    # Try fetching current-season arm angle data.
    url = (
        f"https://baseballsavant.mlb.com/leaderboard/pitcher-arm-angles"
        f"?batSide=&dateStart={date_start}&dateEnd={date_end}&gameType=R&groupBy=&min=1"
        f"&minGroupPitches=1&perspective=back&pitchHand=&pitchType=&season={season}"
        f"&size=small&sort=ascending&team=&csv=true"
    )
    df_arm_angle = _fetch_arm_angle_data(url)

    # Fall back to the saved CSVs when the live table is unusable or does not
    # list this pitcher.
    old_data = (
        df_arm_angle is None
        or "pitcher" not in df_arm_angle.columns
        or pitcher_id not in df_arm_angle["pitcher"]
    )
    if old_data:
        df_arm_angle = _load_saved_arm_angles()

    # Keep only this pitcher's pitches that have a usable release point.
    df_filter = df.filter(pl.col("pitcher_id") == pitcher_id).drop_nulls(subset=["release_pos_x", "release_pos_z"])

    if pitcher_id not in df_arm_angle["pitcher"]:
        # No table row for this pitcher: approximate the shoulder from height.
        data = requests.get(
            f'https://statsapi.mlb.com/api/v1/people?personIds={pitcher_id}',
            timeout=_HTTP_TIMEOUT_SECONDS,
        ).json()
        height = _parse_height_inches(data['people'][0]['height'])
        df_filter = (
            df_filter.with_columns(
                # NOTE(review): the x12 presumably converts feet to inches to
                # match the height -- confirm the tracking data units.
                (pl.col("release_pos_x") * 12).alias("release_pos_x"),
                (pl.col("release_pos_z") * 12).alias("release_pos_z"),
                pl.lit(height * 0.70).alias("shoulder_pos"),
            )
            .with_columns(
                (pl.col("release_pos_z") - pl.col("shoulder_pos")).alias("Opp"),
                pl.col("release_pos_x").abs().alias("Adj"),
            )
            .with_columns(pl.arctan2("Opp", "Adj").alias("arm_angle_rad"))
            .with_columns(pl.col("arm_angle_rad").degrees().alias("arm_angle"))
        )
    else:
        shoulder_x, shoulder_z, rel_x, rel_z, ball_angle = (
            df_arm_angle.filter(pl.col("pitcher") == pitcher_id)
            .select([
                "relative_shoulder_x", "shoulder_z", "relative_release_ball_x", "release_ball_z", "ball_angle"
            ])
            .row(0)
        )
        # Shoulder-to-release distance taken from the table row.
        hyp = float(np.sqrt((rel_x - shoulder_x) ** 2 + (rel_z - shoulder_z) ** 2))
        df_filter = (
            df_filter.with_columns(
                (pl.col("release_pos_z") - shoulder_z).alias("Opp"),
                pl.lit(hyp).alias("Hyp"),
            )
            # |Opp / Hyp| > 1 produces NaN here; it is replaced by the mean below.
            .with_columns((pl.col("Opp") / pl.col("Hyp")).arcsin().alias("arm_angle_rad"))
            .with_columns(pl.col("arm_angle_rad").degrees().alias("arm_angle"))
        )

        # Blend with the table's ball_angle. This lives in this branch because
        # ball_angle only exists when the pitcher has a table row.
        if old_data:
            df_filter = df_filter.with_columns(((pl.col("arm_angle") * 0.5) + (ball_angle * 0.5)).alias("arm_angle"))
        elif daily_check:
            df_filter = df_filter.with_columns(((pl.col("arm_angle") * 0.25) + (ball_angle * 0.75)).alias("arm_angle"))
        else:
            df_filter = df_filter.with_columns(((pl.col("arm_angle") * 0.0) + (ball_angle * 1)).alias("arm_angle"))

    # Replace NaN/null angles with the mean of the valid ones. With no valid
    # value at all there is nothing to fill with, and fill_null(None) would
    # raise, so the nulls are left in place.
    arm_angle = df_filter["arm_angle"].fill_nan(None)
    valid_mean = arm_angle.mean()
    if valid_mean is not None:
        arm_angle = arm_angle.fill_null(valid_mean)
    return df_filter.with_columns(arm_angle)