2026_MLB_Model / scripts /build_player_identity_map.py
Syntrex's picture
Card Lab Pass 5: identity map + batter_id-based hitter query
84163a9
raw
history blame
16 kB
"""
Offline build script — builds data/player_identity_map.parquet.
Sources:
- pybaseball pitching_stats() → pitcher names, FanGraphs IDs
- pybaseball batting_stats() → hitter names, FanGraphs IDs
- pybaseball playerid_reverse_lookup() → MLBAM IDs from FanGraphs IDs
- CockroachDB statcast_event_core → statcast_name enrichment (REQUIRED for hitters)
Usage:
python scripts/build_player_identity_map.py
python scripts/build_player_identity_map.py --seasons 2021 2022 2023 2024 2025
Run from project root. Requires pybaseball + DB connection (database.remote_db).
Output: data/player_identity_map.parquet
"""
from __future__ import annotations
import argparse
import sys
from collections import defaultdict
from pathlib import Path
# Allow running from project root
sys.path.insert(0, str(Path(__file__).parent.parent))
import pandas as pd
import pybaseball
from sqlalchemy import text
from database.remote_db import get_connection
from visualization.cards.player_identity import normalize_for_matching, to_canonical_name
DEFAULT_SEASONS = [2021, 2022, 2023, 2024, 2025]
OUTPUT_PATH = Path(__file__).parent.parent / "data" / "player_identity_map.parquet"
# ---------------------------------------------------------------------------
# Step 1 — Fetch pybaseball season summaries
# ---------------------------------------------------------------------------
def _fetch_pitcher_records(seasons: list[int]) -> pd.DataFrame:
"""Return DataFrame with Name, Season, IP, IDfg columns — all pitchers with IP > 0."""
frames = []
for yr in seasons:
print(f" Fetching pitching_stats({yr})...")
try:
df = pybaseball.pitching_stats(yr, yr, league="all", qual=0, ind=1)
if "Season" not in df.columns:
df["Season"] = int(yr)
df["Season"] = int(yr)
keep = [c for c in ["Name", "Season", "IP", "IDfg"] if c in df.columns]
frames.append(df[keep].copy())
except Exception as exc:
print(f" WARNING: pitching_stats({yr}) failed: {exc}")
if not frames:
return pd.DataFrame(columns=["Name", "Season", "IP", "IDfg"])
combined = pd.concat(frames, ignore_index=True)
combined = combined[combined["IP"] > 0].dropna(subset=["Name"])
return combined
def _fetch_hitter_records(seasons: list[int]) -> pd.DataFrame:
"""Return DataFrame with Name, Season, AB, IDfg columns — all hitters with AB > 0."""
frames = []
for yr in seasons:
print(f" Fetching batting_stats({yr})...")
try:
df = pybaseball.batting_stats(yr, yr, league="all", qual=0, ind=1)
if "Season" not in df.columns:
df["Season"] = int(yr)
df["Season"] = int(yr)
keep = [c for c in ["Name", "Season", "AB", "IDfg"] if c in df.columns]
frames.append(df[keep].copy())
except Exception as exc:
print(f" WARNING: batting_stats({yr}) failed: {exc}")
if not frames:
return pd.DataFrame(columns=["Name", "Season", "AB", "IDfg"])
combined = pd.concat(frames, ignore_index=True)
combined = combined[combined["AB"] > 0].dropna(subset=["Name"])
return combined
# ---------------------------------------------------------------------------
# Step 2 — Get MLBAM IDs from FanGraphs IDs
# ---------------------------------------------------------------------------
def _build_fg_to_mlbam(fg_ids: list[int]) -> dict[int, int]:
"""Return {fg_id: mlbam_id} for all resolvable FG IDs."""
if not fg_ids:
return {}
print(f" Looking up MLBAM IDs for {len(fg_ids)} FanGraphs IDs...")
try:
lookup = pybaseball.playerid_reverse_lookup(fg_ids, key_type="fangraphs")
lookup = lookup.dropna(subset=["key_mlbam"])
lookup["key_fangraphs"] = lookup["key_fangraphs"].astype(int)
lookup["key_mlbam"] = lookup["key_mlbam"].astype(int)
return dict(zip(lookup["key_fangraphs"], lookup["key_mlbam"]))
except Exception as exc:
print(f" WARNING: playerid_reverse_lookup failed: {exc}")
return {}
# ---------------------------------------------------------------------------
# Step 3 — Build raw identity records
# ---------------------------------------------------------------------------
def _build_raw_records(
pitcher_df: pd.DataFrame,
hitter_df: pd.DataFrame,
fg_to_mlbam: dict[int, int],
) -> pd.DataFrame:
"""
Combine pitcher + hitter records into raw identity records (pre-merge, pre-collision).
"""
records: dict[tuple, dict] = {} # (player_id or normalized_name) → record
def _upsert(name: str, fg_id_raw, is_hitter: bool, is_pitcher: bool) -> None:
canonical = to_canonical_name(str(name).strip())
norm_key = normalize_for_matching(canonical)
fg_id: int | None = int(fg_id_raw) if pd.notna(fg_id_raw) else None
mlbam: int | None = fg_to_mlbam.get(fg_id) if fg_id is not None else None
# Primary merge key: MLBAM player_id
# Fallback merge key: normalized name (only if no MLBAM ID)
rec_key = ("mlbam", mlbam) if mlbam is not None else ("norm", norm_key)
if rec_key in records:
rec = records[rec_key]
if is_hitter:
rec["role_hitter"] = True
if is_pitcher:
rec["role_pitcher"] = True
# Update IDs if we have new info
if fg_id is not None and rec.get("fangraphs_id") is None:
rec["fangraphs_id"] = fg_id
if mlbam is not None and rec.get("player_id") is None:
rec["player_id"] = mlbam
else:
source = "pybaseball+mlbam" if mlbam is not None else "pybaseball-only"
records[rec_key] = {
"player_id": mlbam,
"fangraphs_id": fg_id,
"canonical_name": canonical,
"canonical_name_normalized": norm_key,
"pybaseball_name": str(name).strip(),
"statcast_name": None,
"role_hitter": is_hitter,
"role_pitcher": is_pitcher,
"display_name": canonical, # set properly in collision step
"source_note": source,
}
print(" Building raw identity records from pitcher data...")
for _, row in pitcher_df.iterrows():
_upsert(row["Name"], row.get("IDfg"), is_hitter=False, is_pitcher=True)
print(" Building raw identity records from hitter data...")
for _, row in hitter_df.iterrows():
_upsert(row["Name"], row.get("IDfg"), is_hitter=True, is_pitcher=False)
df = pd.DataFrame(list(records.values()))
print(f" Raw records: {len(df)}")
return df
# ---------------------------------------------------------------------------
# Step 4 — statcast_name enrichment (REQUIRED)
# ---------------------------------------------------------------------------
def _enrich_statcast_names(identity_df: pd.DataFrame) -> pd.DataFrame:
    """
    Populate statcast_name for each identity record.

    Mutates `identity_df` in place (via .at) and also returns it. Every row
    is guaranteed a non-null statcast_name on exit (canonical_name is the
    final fallback).

    DB context: statcast_event_core.player_name stores the PITCHER name in "Last, First"
    format. Pure hitters never appear there. Strategy:
    - role_pitcher records: match against ec.player_name (Last, First → canonical → norm).
      statcast_name = the DB's "Last, First" string (used in pitcher selector label only;
      pitcher window queries use ec.pitcher = :pitcher_id, not player_name).
    - role_hitter-only records: set statcast_name = canonical_name (First Last).
      Hitter window queries will use ec.batter = :batter_id via player_id, not player_name.
    - Two-way players (both roles): try pitcher match first; fall back to canonical_name.
    """
    print(" Connecting to DB for statcast_name enrichment (pitchers only)...")
    conn = get_connection()
    try:
        # Single read-only query; the connection is closed immediately after.
        rows = conn.execute(
            text("SELECT DISTINCT player_name FROM statcast_event_core WHERE player_name IS NOT NULL")
        ).fetchall()
    finally:
        conn.close()
    # Drop empty strings as well as NULLs (the SQL filter only removes NULLs).
    statcast_names: list[str] = [r[0] for r in rows if r[0]]
    print(f" Loaded {len(statcast_names)} distinct statcast player_names (pitcher names)")
    # Pre-build O(1) lookup dicts so the per-record loop is fast.
    # statcast names are "Last, First" — apply to_canonical_name before normalizing
    # so the key matches pybaseball "First Last" canonical_name_normalized.
    canonical_lower_to_statcast: dict[str, str] = {}  # "first last" → "Last, First"
    norm_to_statcast: dict[str, list[str]] = defaultdict(list)
    for sc in statcast_names:
        canon = to_canonical_name(sc)  # "Last, First" → "First Last"
        cl = canon.lower()
        # First occurrence wins for the exact-lowercase map; the normalized map
        # keeps every candidate so ambiguity can be detected below.
        if cl not in canonical_lower_to_statcast:
            canonical_lower_to_statcast[cl] = sc
        norm_to_statcast[normalize_for_matching(canon)].append(sc)
    # Counters for the enrichment summary printed at the end.
    resolved_pitcher = 0
    resolved_hitter = 0
    ambiguous = 0
    unmatched_pitcher = 0
    for idx, row in identity_df.iterrows():
        canonical = row["canonical_name"]
        norm_key = row["canonical_name_normalized"]
        is_pitcher = bool(row.get("role_pitcher"))
        is_hitter = bool(row.get("role_hitter"))
        if is_pitcher:
            # Layer 1: exact canonical lowercase match (O(1))
            sc = canonical_lower_to_statcast.get(canonical.lower())
            if sc:
                identity_df.at[idx, "statcast_name"] = sc
                identity_df.at[idx, "source_note"] = str(row.get("source_note", "")) + "+statcast"
                resolved_pitcher += 1
                continue
            # Layer 2: normalized key match (O(1))
            candidates = norm_to_statcast.get(norm_key, [])
            if len(candidates) == 1:
                identity_df.at[idx, "statcast_name"] = candidates[0]
                identity_df.at[idx, "source_note"] = str(row.get("source_note", "")) + "+statcast-norm"
                resolved_pitcher += 1
                continue
            elif len(candidates) > 1:
                print(f" AMBIGUOUS pitcher: {canonical!r}{candidates}")
                ambiguous += 1
                # Fall through: use canonical_name as statcast_name so row is not excluded
            else:
                print(f" UNMATCHED pitcher: {canonical!r}")
                unmatched_pitcher += 1
                # Fall through to hitter branch if also a hitter; else canonical fallback
        # NOTE: reading the live frame here (not the stale `row`) — on this path
        # no pitcher match was written, so statcast_name is still None.
        if is_hitter and identity_df.at[idx, "statcast_name"] is None:
            # Hitter window queries use ec.batter = :batter_id (player_id), not player_name.
            # statcast_name must be non-null to pass build validation.
            # Use canonical_name (First Last) as a stable non-null placeholder.
            identity_df.at[idx, "statcast_name"] = canonical
            identity_df.at[idx, "source_note"] = str(row.get("source_note", "")) + "+hitter-canonical"
            resolved_hitter += 1
        # Final fallback: any record still missing statcast_name (e.g. unmatched pure pitcher)
        if identity_df.at[idx, "statcast_name"] is None:
            identity_df.at[idx, "statcast_name"] = canonical
            identity_df.at[idx, "source_note"] = str(row.get("source_note", "")) + "+canonical-fallback"
    print(
        f" Enrichment: pitcher_matched={resolved_pitcher} "
        f"hitter_canonical={resolved_hitter} "
        f"ambiguous={ambiguous} unmatched_pitcher={unmatched_pitcher}"
    )
    return identity_df
# ---------------------------------------------------------------------------
# Step 5 — Collision handling
# ---------------------------------------------------------------------------
def _resolve_collisions(identity_df: pd.DataFrame) -> pd.DataFrame:
"""
Assign collision-safe display_name values.
For players sharing canonical_name_normalized but having different player_ids:
- First (lowest player_id): display_name = canonical_name
- Others: display_name = f"{canonical_name} ({player_id})"
"""
identity_df = identity_df.copy()
identity_df["display_name"] = identity_df["canonical_name"]
# Find collisions: same normalized name, multiple rows
norm_groups = identity_df.groupby("canonical_name_normalized")
collision_count = 0
for norm_key, group in norm_groups:
if len(group) == 1:
continue # no collision
# Multiple records share the same normalized name — assign suffixes to all but the first.
# Sort: non-null player_id ascending first, then null (deterministic).
sorted_group = group.sort_values(
"player_id", ascending=True, na_position="last"
)
for rank, (idx, row) in enumerate(sorted_group.iterrows()):
if rank == 0:
# Primary: keep canonical_name as display_name
pass
else:
pid = row.get("player_id")
suffix = str(int(pid)) if pd.notna(pid) else "?"
identity_df.at[idx, "display_name"] = f"{row['canonical_name']} ({suffix})"
identity_df.at[idx, "source_note"] = (
str(row.get("source_note", "")) + "+collision-resolved"
)
collision_count += 1
if collision_count:
print(f" Resolved {collision_count} collision suffix(es)")
return identity_df
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def build_identity_map(seasons: list[int]) -> None:
    """
    Build and write data/player_identity_map.parquet for the given seasons.

    Pipeline: fetch pybaseball pitcher/hitter season stats → resolve
    FanGraphs IDs to MLBAM IDs → merge into raw identity records → enrich
    statcast_name from the DB → resolve display_name collisions → write the
    parquet and print a coverage summary.
    """
    print(f"\n=== Building player_identity_map.parquet (seasons={seasons}) ===")
    # Step banners fixed to a consistent [n/6] — the original printed six
    # steps labeled [1/5]..[5/5] then [6/6].
    print("\n[1/6] Fetching pybaseball pitcher data...")
    pitcher_df = _fetch_pitcher_records(seasons)
    print(f" Pitcher rows: {len(pitcher_df)}")
    print("\n[2/6] Fetching pybaseball hitter data...")
    hitter_df = _fetch_hitter_records(seasons)
    print(f" Hitter rows: {len(hitter_df)}")
    print("\n[3/6] Building MLBAM ID lookup...")
    # Collect every FanGraphs ID seen in either source. The str(v) != "nan"
    # guard protects int() against literal "nan" strings that dropna() would
    # not remove.
    all_fg_ids: set[int] = set()
    for col_df in (pitcher_df, hitter_df):
        if "IDfg" in col_df.columns:
            all_fg_ids.update(
                int(v) for v in col_df["IDfg"].dropna() if str(v) != "nan"
            )
    fg_to_mlbam = _build_fg_to_mlbam(list(all_fg_ids))
    print(f" Resolved {len(fg_to_mlbam)} / {len(all_fg_ids)} FanGraphs IDs to MLBAM")
    print("\n[4/6] Building raw identity records...")
    identity_df = _build_raw_records(pitcher_df, hitter_df, fg_to_mlbam)
    print("\n[5/6] Enriching statcast_name from DB...")
    identity_df = _enrich_statcast_names(identity_df)
    print("\n[6/6] Resolving collisions and assigning display_name...")
    identity_df = _resolve_collisions(identity_df)
    OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    identity_df.to_parquet(OUTPUT_PATH, index=False)
    print(f"\nWrote {len(identity_df)} rows -> {OUTPUT_PATH}")
    # Summary. statcast_name should always be non-null after enrichment, so a
    # non-zero "missing" count indicates an enrichment regression.
    with_mlbam = identity_df["player_id"].notna().sum()
    with_statcast = identity_df["statcast_name"].notna().sum()
    missing_statcast = identity_df["statcast_name"].isna().sum()
    print("\nSummary:")
    print(f" Total records: {len(identity_df)}")
    print(f" With MLBAM player_id: {with_mlbam}")
    print(f" With statcast_name: {with_statcast}")
    print(f" Missing statcast_name: {missing_statcast}")
    if missing_statcast:
        missing = identity_df[identity_df["statcast_name"].isna()]["canonical_name"].head(20).tolist()
        print(f" First missing: {missing}")
if __name__ == "__main__":
    # CLI entry point: optional --seasons list, defaulting to DEFAULT_SEASONS.
    cli = argparse.ArgumentParser(description="Build player identity map parquet")
    cli.add_argument("--seasons", nargs="+", type=int, default=DEFAULT_SEASONS)
    parsed = cli.parse_args()
    build_identity_map(parsed.seasons)