File size: 3,924 Bytes
cbe015e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# src/matchups.py
from __future__ import annotations
import numpy as np
import pandas as pd

# `description` values counted as a swing-and-miss.
OUTCOME_DESCS_WHIFF = {"swinging_strike", "swinging_strike_blocked"}

# `description` values counted as a swing: every whiff plus the contact outcomes.
OUTCOME_DESCS_SWING = OUTCOME_DESCS_WHIFF | {"foul", "hit_into_play"}

# `events` values counted as ground balls -- crude GB proxy on balls in play
EVENTS_GB = {"groundout", "field_error", "single", "double", "triple"}


# Name resolution for MLBAM batter IDs → "First Last"
def ensure_batter_names(df_raw: pd.DataFrame) -> pd.DataFrame:
    """Return a frame that carries a ``batter_name`` column.

    If ``df_raw`` already has ``batter_name`` it is returned as-is (the same
    object). Otherwise a copy is returned with the column added; ``df_raw``
    itself is not modified.

    Names are resolved from the ``batter`` ids through
    ``pybaseball.playerid_reverse_lookup`` and formatted as "First Last".
    Resolution is best-effort: an id that cannot be resolved -- because the
    lookup raised (e.g. pybaseball unavailable, no network) or because the
    lookup result did not contain that id -- gets the readable placeholder
    ``"ID <id>"``. Rows whose ``batter`` is missing get a missing name.

    Args:
        df_raw: Pitch-level frame, expected to hold MLBAM ids in ``batter``.

    Returns:
        ``df_raw`` itself when it already has ``batter_name``; otherwise a
        copy with ``batter_name`` added.
    """
    if "batter_name" in df_raw.columns:
        return df_raw

    df = df_raw.copy()
    if "batter" not in df.columns or df["batter"].dropna().empty:
        df["batter_name"] = None
        return df

    name_map: dict = {}
    try:
        # Imported lazily so the module still works without pybaseball.
        from pybaseball import playerid_reverse_lookup

        ids = df["batter"].dropna().astype(int).unique().tolist()
        lut = playerid_reverse_lookup(ids, key_type="mlbam")
        # Build the names as a standalone Series instead of adding a column
        # to a sliced frame (avoids pandas' SettingWithCopyWarning).
        full_names = (
            lut["name_first"].str.title() + " " + lut["name_last"].str.title()
        )
        name_map = dict(zip(lut["key_mlbam"].astype(int), full_names))
    except Exception:
        # Deliberate best-effort: the lookup can fail in many ways (missing
        # package, no internet, unexpected schema). Every id then falls back
        # to its placeholder below.
        name_map = {}

    def _label(batter_id):
        """Map one batter id to a resolved name or an ``ID <id>`` placeholder."""
        if pd.isna(batter_id):
            return None
        try:
            key = int(batter_id)
        except (TypeError, ValueError):
            # Non-numeric id: cannot be looked up, but still label the row.
            return f"ID {batter_id}"
        name = name_map.get(key)
        # A lookup row with a missing first/last name yields NaN, not a str.
        return name if isinstance(name, str) else f"ID {key}"

    df["batter_name"] = df["batter"].apply(_label)
    return df


def _safe_rate(num, den):
    """Element-wise ``num / den`` that is NaN wherever ``den`` is not positive.

    Both inputs are cast to float with ``.astype(float)``. The result is a
    NumPy array (the output of ``np.where``), not a pandas object.
    """
    numerator = num.astype(float)
    denominator = den.astype(float)
    usable = denominator > 0
    # Zero denominators are masked out below, so silence the warnings they
    # would otherwise trigger while dividing.
    with np.errstate(divide="ignore", invalid="ignore"):
        quotient = numerator / denominator
    return np.where(usable, quotient, np.nan)


def best_matchups_for_pitcher(
    df_raw: pd.DataFrame,
    pitcher_name: str,
    min_pitches: int = 10,
    top_n: int = 10,
    w_whiff: float = 0.6,
    w_gb: float = 0.4,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Rank batters faced by one pitcher with a 'pitcher-friendly' score.

    Pitches are grouped by ``batter`` and ``stand`` (plus ``batter_name`` when
    that column exists) and scored as::

        pm_score = w_whiff * whiff_rate + w_gb * gb_rate_on_contact

    where ``whiff_rate`` is whiffs / swings and ``gb_rate_on_contact`` is
    ground-ball events / pitches hit into play.

    A group whose score is undefined (NaN, because it has no swings or no
    balls in play) cannot be ranked, so it is left out of both results rather
    than being reported as a "worst" matchup.

    Args:
        df_raw: Pitch-level frame. Pitchers are matched on ``player_name``;
            ``description``, ``events``, ``batter`` and ``stand`` are read for
            the selected pitcher.
        pitcher_name: Value of ``player_name`` to select.
        min_pitches: Minimum pitches a group needs to be ranked.
        top_n: Number of rows in each returned frame.
        w_whiff: Weight of ``whiff_rate`` in the score.
        w_gb: Weight of ``gb_rate_on_contact`` in the score.

    Returns:
        ``(best_df, worst_df)``: the ``top_n`` highest-scoring groups
        (descending) and the ``top_n`` lowest-scoring groups (ascending). Two
        empty frames are returned when ``df_raw`` has no ``player_name``
        column or no rows for ``pitcher_name``.

    Raises:
        KeyError: If rows are selected but one of ``description``, ``events``,
            ``batter`` or ``stand`` is missing.
    """
    # Without the pitcher column there is nothing to select; comparing a
    # missing column would otherwise index the frame with a bare ``False``.
    if "player_name" not in df_raw.columns:
        return pd.DataFrame(), pd.DataFrame()

    # Filter to the one pitcher
    dfp = df_raw[df_raw["player_name"] == pitcher_name].copy()
    if dfp.empty:
        return pd.DataFrame(), pd.DataFrame()

    # Derive per-pitch outcomes
    dfp["is_swing"] = dfp["description"].isin(OUTCOME_DESCS_SWING).astype(int)
    dfp["is_whiff"] = dfp["description"].isin(OUTCOME_DESCS_WHIFF).astype(int)
    dfp["is_in_play"] = (dfp["description"] == "hit_into_play").astype(int)
    dfp["is_gb_event"] = dfp["events"].isin(EVENTS_GB).astype(int)

    # Name columns vary across pybaseball versions; prefer 'batter_name' if present
    name_col = "batter_name" if "batter_name" in dfp.columns else None
    group_cols = ([name_col] if name_col else []) + ["batter", "stand"]

    agg = (
        dfp.groupby(group_cols, dropna=False)
        .agg(
            # "size" counts rows, so any always-present column works here;
            # using a derived one avoids requiring a 'pitch_type' column.
            pitches=("is_swing", "size"),
            swings=("is_swing", "sum"),
            whiffs=("is_whiff", "sum"),
            inplay=("is_in_play", "sum"),
            gb_events=("is_gb_event", "sum"),
        )
        .reset_index()
    )

    # Rates (NaN when the denominator is zero)
    agg["whiff_rate"] = _safe_rate(agg["whiffs"], agg["swings"])
    agg["gb_rate_on_contact"] = _safe_rate(agg["gb_events"], agg["inplay"])

    # Pitcher-friendly score
    agg["pm_score"] = w_whiff * agg["whiff_rate"] + w_gb * agg["gb_rate_on_contact"]

    # Keep only groups with enough pitches AND a defined score: NaN scores
    # sort last and would otherwise be picked up by tail() as "worst".
    rankable = (agg["pitches"] >= min_pitches) & agg["pm_score"].notna()
    # mergesort is stable, so tied scores keep a deterministic order.
    ranked = agg[rankable].sort_values(
        "pm_score", ascending=False, kind="mergesort"
    )

    # Nicely ordered columns
    display_cols = ([name_col] if name_col else []) + [
        "batter",
        "stand",
        "pitches",
        "whiff_rate",
        "gb_rate_on_contact",
        "pm_score",
    ]

    best = ranked.head(top_n)[display_cols].copy()
    worst = (
        ranked.tail(top_n)
        .sort_values("pm_score", ascending=True, kind="mergesort")[display_cols]
        .copy()
    )
    return best, worst