File size: 4,786 Bytes
bfe3996
 
 
 
 
f003aff
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bfe3996
 
f003aff
bfe3996
 
f003aff
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bfe3996
 
 
 
 
 
f003aff
 
 
bfe3996
 
 
 
 
 
 
 
 
 
f003aff
 
bfe3996
f003aff
 
bfe3996
f003aff
 
 
 
 
bfe3996
 
f003aff
 
bfe3996
f003aff
bfe3996
 
f003aff
bfe3996
 
 
 
f003aff
 
 
bfe3996
f003aff
bfe3996
f003aff
bfe3996
f003aff
bfe3996
f003aff
bfe3996
 
f003aff
bfe3996
 
f003aff
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import polars as pl
import numpy as np
import requests
from io import StringIO

def calculate_arm_angles(df: pl.DataFrame, pitcher_id: int) -> pl.DataFrame:
    """Estimate a per-pitch arm angle for one pitcher.

    The function first tries to download the Baseball Savant arm-angle
    leaderboard for the date window covered by ``df``.  If that download is
    unusable, or the pitcher is not listed in it, the saved 2025/2024 CSVs
    under ``stuff_model/`` are used instead.  If the pitcher is missing from
    those as well, the angle is derived from the pitcher's listed height
    (fetched from the MLB Stats API) and the release position.

    Args:
        df: Pitch-level data. Must contain the columns ``game_date``,
            ``pitcher_id``, ``release_pos_x`` and ``release_pos_z``.
        pitcher_id: Identifier matched against ``df["pitcher_id"]`` and the
            ``pitcher`` column of the arm-angle tables.

    Returns:
        The rows of ``df`` for ``pitcher_id`` with non-null release position,
        extended with ``arm_angle_rad`` and ``arm_angle`` (degrees) plus the
        intermediate columns used to compute them (``Opp`` and ``Hyp``, or
        ``shoulder_pos``, ``Opp`` and ``Adj`` on the height-based path).

    Raises:
        RuntimeError: If a saved fallback CSV cannot be loaded or cast.
        requests.RequestException: If the height lookup request fails.
    """
    # Without a timeout a stalled connection would block the caller forever.
    request_timeout = 30  # seconds

    def fetch_arm_angle_data(url: str):
        """Download the leaderboard CSV; return None when it is unusable."""
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
        }
        try:
            response = requests.get(url, headers=headers, timeout=request_timeout)
        except requests.RequestException:
            # A network failure must lead to the saved-CSV fallback, not a crash.
            return None
        if not response.ok or "<html" in response.text.lower():
            return None
        try:
            frame = pl.read_csv(StringIO(response.text), truncate_ragged_lines=True)
        except (pl.exceptions.NoDataError, pl.exceptions.ComputeError):
            # Empty or unparsable body: treat the same as "not fetched".
            return None
        # The membership checks below need this column to exist.
        if "pitcher" not in frame.columns:
            return None
        return frame

    # Use min/max rather than first/last row so the requested window is valid
    # regardless of how the caller sorted the frame. Both are None when the
    # frame is empty or has no dates, in which case the live fetch is skipped.
    game_dates = df["game_date"]
    date_start = game_dates.min()
    date_end = game_dates.max()
    daily_check = date_start is not None and date_start == date_end

    # Try fetching current-season arm angle data
    df_arm_angle = None
    if date_start is not None:
        # str() keeps this working when game_date is a date rather than text.
        season = int(str(date_start)[:4])
        url = (
            f"https://baseballsavant.mlb.com/leaderboard/pitcher-arm-angles"
            f"?batSide=&dateStart={date_start}&dateEnd={date_end}&gameType=R&groupBy=&min=1"
            f"&minGroupPitches=1&perspective=back&pitchHand=&pitchType=&season={season}"
            f"&size=small&sort=ascending&team=&csv=true"
        )
        df_arm_angle = fetch_arm_angle_data(url)

    old_data = df_arm_angle is None or pitcher_id not in df_arm_angle["pitcher"]
    if old_data:
        # Fallback to saved CSVs if live data isn't fetched or pitcher not found
        try:
            df_arm_angle_2025 = pl.read_csv("stuff_model/pitcher_arm_angles_2025.csv", truncate_ragged_lines=True)
        except Exception as e:
            raise RuntimeError("Failed to load fallback 2025 arm angle CSV.") from e

        try:
            df_arm_angle_2024 = pl.read_csv("stuff_model/pitcher_arm_angles_2024.csv", truncate_ragged_lines=True)
            df_arm_angle_2024 = df_arm_angle_2024.cast(df_arm_angle_2025.schema)
        except Exception as e:
            raise RuntimeError("Failed to load or cast 2024 arm angle CSV.") from e

        # 2025 rows come first so they win over 2024 rows for the same pitcher.
        df_arm_angle = pl.concat([df_arm_angle_2025, df_arm_angle_2024]).unique(subset=["pitcher"], keep="first")

    # Keep only this pitcher's pitches that have a usable release position.
    df_filter = df.filter(pl.col("pitcher_id") == pitcher_id).drop_nulls(subset=["release_pos_x", "release_pos_z"])

    # When the live table already listed the pitcher there is nothing to re-check.
    pitcher_known = (not old_data) or pitcher_id in df_arm_angle["pitcher"]

    if not pitcher_known:
        # No measured shoulder/release data: approximate from listed height.
        response = requests.get(
            f'https://statsapi.mlb.com/api/v1/people?personIds={pitcher_id}',
            timeout=request_timeout,
        )
        response.raise_for_status()
        height_in = response.json()['people'][0]['height']
        # Height text is split on ' and " into feet and inches -> total inches.
        feet_part, inches_part = height_in.split("'")[0], height_in.split("'")[1].split('"')[0]
        height = int(feet_part) * 12 + int(inches_part)

        # NOTE(review): this path overwrites release_pos_x / release_pos_z with
        # values multiplied by 12 (presumably feet -> inches to match height),
        # whereas the measured-data path leaves them untouched. Kept as-is
        # because callers may rely on it -- confirm this is intended.
        return (
            df_filter.with_columns(
                (pl.col("release_pos_x") * 12).alias("release_pos_x"),
                (pl.col("release_pos_z") * 12).alias("release_pos_z"),
                (pl.lit(height * 0.70)).alias("shoulder_pos"),
            )
            .with_columns(
                (pl.col("release_pos_z") - pl.col("shoulder_pos")).alias("Opp"),
                pl.col("release_pos_x").abs().alias("Adj"),
            )
            .with_columns(
                # Native expression instead of a per-row Python lambda.
                pl.arctan2(pl.col("Opp"), pl.col("Adj")).alias("arm_angle_rad")
            )
            .with_columns(
                pl.col("arm_angle_rad").degrees().alias("arm_angle")
            )
        )

    row = df_arm_angle.filter(pl.col("pitcher") == pitcher_id).select([
        "relative_shoulder_x", "shoulder_z", "relative_release_ball_x", "release_ball_z", "ball_angle"
    ]).row(0)
    shoulder_x, shoulder_z, rel_x, rel_z, ball_angle = row
    # Shoulder-to-release distance from the table, used as a fixed hypotenuse.
    hyp = np.sqrt((rel_x - shoulder_x)**2 + (rel_z - shoulder_z)**2)

    df_filter = (
        df_filter.with_columns(
            (pl.col("release_pos_z") - shoulder_z).alias("Opp"),
            pl.lit(hyp).alias("Hyp"),
        )
        .with_columns(
            # Ratios outside [-1, 1] (or a zero hypotenuse) yield NaN here and
            # are replaced by the mean further down.
            (pl.col("Opp") / pl.col("Hyp")).arcsin().alias("arm_angle_rad")
        )
        .with_columns(
            pl.col("arm_angle_rad").degrees().alias("arm_angle")
        )
    )

    # Adjust based on data source freshness: the weight is the share given to
    # the per-pitch estimate, the remainder goes to the table's ball_angle.
    if old_data:
        pitch_weight = 0.5
    elif daily_check:
        pitch_weight = 0.25
    else:
        pitch_weight = 0.0
    df_filter = df_filter.with_columns(
        ((pl.col("arm_angle") * pitch_weight) + (ball_angle * (1 - pitch_weight))).alias("arm_angle")
    )

    # Fill missing arm_angle values with mean (mean() skips nulls).
    arm_angle_clean = df_filter["arm_angle"].fill_nan(None)
    valid_mean = arm_angle_clean.mean()
    # With no valid value at all there is nothing to fill with; fill_null(None)
    # would raise, so the column is left unchanged in that case.
    if valid_mean is not None:
        df_filter = df_filter.with_columns(arm_angle_clean.fill_null(valid_mean))

    return df_filter