""" Offline build script — builds data/player_identity_map.parquet. Sources: - pybaseball pitching_stats() → pitcher names, FanGraphs IDs - pybaseball batting_stats() → hitter names, FanGraphs IDs - pybaseball playerid_reverse_lookup() → MLBAM IDs from FanGraphs IDs - CockroachDB statcast_event_core → statcast_name enrichment (REQUIRED for hitters) Usage: python scripts/build_player_identity_map.py python scripts/build_player_identity_map.py --seasons 2021 2022 2023 2024 2025 Run from project root. Requires pybaseball + DB connection (database.remote_db). Output: data/player_identity_map.parquet """ from __future__ import annotations import argparse import sys from collections import defaultdict from pathlib import Path # Allow running from project root sys.path.insert(0, str(Path(__file__).parent.parent)) import pandas as pd import pybaseball from sqlalchemy import text from database.remote_db import get_connection from visualization.cards.player_identity import normalize_for_matching, to_canonical_name DEFAULT_SEASONS = [2021, 2022, 2023, 2024, 2025] OUTPUT_PATH = Path(__file__).parent.parent / "data" / "player_identity_map.parquet" # --------------------------------------------------------------------------- # Step 1 — Fetch pybaseball season summaries # --------------------------------------------------------------------------- def _fetch_pitcher_records(seasons: list[int]) -> pd.DataFrame: """Return DataFrame with Name, Season, IP, IDfg columns — all pitchers with IP > 0.""" frames = [] for yr in seasons: print(f" Fetching pitching_stats({yr})...") try: df = pybaseball.pitching_stats(yr, yr, league="all", qual=0, ind=1) if "Season" not in df.columns: df["Season"] = int(yr) df["Season"] = int(yr) keep = [c for c in ["Name", "Season", "IP", "IDfg"] if c in df.columns] frames.append(df[keep].copy()) except Exception as exc: print(f" WARNING: pitching_stats({yr}) failed: {exc}") if not frames: return pd.DataFrame(columns=["Name", "Season", "IP", "IDfg"]) combined = pd.concat(frames, ignore_index=True) combined = combined[combined["IP"] > 0].dropna(subset=["Name"]) return combined def _fetch_hitter_records(seasons: list[int]) -> pd.DataFrame: """Return DataFrame with Name, Season, AB, IDfg columns — all hitters with AB > 0.""" frames = [] for yr in seasons: print(f" Fetching batting_stats({yr})...") try: df = pybaseball.batting_stats(yr, yr, league="all", qual=0, ind=1) if "Season" not in df.columns: df["Season"] = int(yr) df["Season"] = int(yr) keep = [c for c in ["Name", "Season", "AB", "IDfg"] if c in df.columns] frames.append(df[keep].copy()) except Exception as exc: print(f" WARNING: batting_stats({yr}) failed: {exc}") if not frames: return pd.DataFrame(columns=["Name", "Season", "AB", "IDfg"]) combined = pd.concat(frames, ignore_index=True) combined = combined[combined["AB"] > 0].dropna(subset=["Name"]) return combined # --------------------------------------------------------------------------- # Step 2 — Get MLBAM IDs from FanGraphs IDs # --------------------------------------------------------------------------- def _build_fg_to_mlbam(fg_ids: list[int]) -> dict[int, int]: """Return {fg_id: mlbam_id} for all resolvable FG IDs.""" if not fg_ids: return {} print(f" Looking up MLBAM IDs for {len(fg_ids)} FanGraphs IDs...") try: lookup = pybaseball.playerid_reverse_lookup(fg_ids, key_type="fangraphs") lookup = lookup.dropna(subset=["key_mlbam"]) lookup["key_fangraphs"] = lookup["key_fangraphs"].astype(int) lookup["key_mlbam"] = lookup["key_mlbam"].astype(int) return dict(zip(lookup["key_fangraphs"], lookup["key_mlbam"])) except Exception as exc: print(f" WARNING: playerid_reverse_lookup failed: {exc}") return {} # --------------------------------------------------------------------------- # Step 3 — Build raw identity records # --------------------------------------------------------------------------- def _build_raw_records( pitcher_df: pd.DataFrame, hitter_df: pd.DataFrame, fg_to_mlbam: dict[int, int], ) -> pd.DataFrame: """ Combine pitcher + hitter records into raw identity records (pre-merge, pre-collision). """ records: dict[tuple, dict] = {} # (player_id or normalized_name) → record def _upsert(name: str, fg_id_raw, is_hitter: bool, is_pitcher: bool) -> None: canonical = to_canonical_name(str(name).strip()) norm_key = normalize_for_matching(canonical) fg_id: int | None = int(fg_id_raw) if pd.notna(fg_id_raw) else None mlbam: int | None = fg_to_mlbam.get(fg_id) if fg_id is not None else None # Primary merge key: MLBAM player_id # Fallback merge key: normalized name (only if no MLBAM ID) rec_key = ("mlbam", mlbam) if mlbam is not None else ("norm", norm_key) if rec_key in records: rec = records[rec_key] if is_hitter: rec["role_hitter"] = True if is_pitcher: rec["role_pitcher"] = True # Update IDs if we have new info if fg_id is not None and rec.get("fangraphs_id") is None: rec["fangraphs_id"] = fg_id if mlbam is not None and rec.get("player_id") is None: rec["player_id"] = mlbam else: source = "pybaseball+mlbam" if mlbam is not None else "pybaseball-only" records[rec_key] = { "player_id": mlbam, "fangraphs_id": fg_id, "canonical_name": canonical, "canonical_name_normalized": norm_key, "pybaseball_name": str(name).strip(), "statcast_name": None, "role_hitter": is_hitter, "role_pitcher": is_pitcher, "display_name": canonical, # set properly in collision step "source_note": source, } print(" Building raw identity records from pitcher data...") for _, row in pitcher_df.iterrows(): _upsert(row["Name"], row.get("IDfg"), is_hitter=False, is_pitcher=True) print(" Building raw identity records from hitter data...") for _, row in hitter_df.iterrows(): _upsert(row["Name"], row.get("IDfg"), is_hitter=True, is_pitcher=False) df = pd.DataFrame(list(records.values())) print(f" Raw records: {len(df)}") return df # --------------------------------------------------------------------------- # Step 4 — statcast_name enrichment (REQUIRED) # --------------------------------------------------------------------------- def _enrich_statcast_names(identity_df: pd.DataFrame) -> pd.DataFrame: """ Populate statcast_name for each identity record. DB context: statcast_event_core.player_name stores the PITCHER name in "Last, First" format. Pure hitters never appear there. Strategy: - role_pitcher records: match against ec.player_name (Last, First → canonical → norm). statcast_name = the DB's "Last, First" string (used in pitcher selector label only; pitcher window queries use ec.pitcher = :pitcher_id, not player_name). - role_hitter-only records: set statcast_name = canonical_name (First Last). Hitter window queries will use ec.batter = :batter_id via player_id, not player_name. - Two-way players (both roles): try pitcher match first; fall back to canonical_name. """ print(" Connecting to DB for statcast_name enrichment (pitchers only)...") conn = get_connection() try: rows = conn.execute( text("SELECT DISTINCT player_name FROM statcast_event_core WHERE player_name IS NOT NULL") ).fetchall() finally: conn.close() statcast_names: list[str] = [r[0] for r in rows if r[0]] print(f" Loaded {len(statcast_names)} distinct statcast player_names (pitcher names)") # Pre-build O(1) lookup dicts so the per-record loop is fast. # statcast names are "Last, First" — apply to_canonical_name before normalizing # so the key matches pybaseball "First Last" canonical_name_normalized. canonical_lower_to_statcast: dict[str, str] = {} # "first last" → "Last, First" norm_to_statcast: dict[str, list[str]] = defaultdict(list) for sc in statcast_names: canon = to_canonical_name(sc) # "Last, First" → "First Last" cl = canon.lower() if cl not in canonical_lower_to_statcast: canonical_lower_to_statcast[cl] = sc norm_to_statcast[normalize_for_matching(canon)].append(sc) resolved_pitcher = 0 resolved_hitter = 0 ambiguous = 0 unmatched_pitcher = 0 for idx, row in identity_df.iterrows(): canonical = row["canonical_name"] norm_key = row["canonical_name_normalized"] is_pitcher = bool(row.get("role_pitcher")) is_hitter = bool(row.get("role_hitter")) if is_pitcher: # Layer 1: exact canonical lowercase match (O(1)) sc = canonical_lower_to_statcast.get(canonical.lower()) if sc: identity_df.at[idx, "statcast_name"] = sc identity_df.at[idx, "source_note"] = str(row.get("source_note", "")) + "+statcast" resolved_pitcher += 1 continue # Layer 2: normalized key match (O(1)) candidates = norm_to_statcast.get(norm_key, []) if len(candidates) == 1: identity_df.at[idx, "statcast_name"] = candidates[0] identity_df.at[idx, "source_note"] = str(row.get("source_note", "")) + "+statcast-norm" resolved_pitcher += 1 continue elif len(candidates) > 1: print(f" AMBIGUOUS pitcher: {canonical!r} → {candidates}") ambiguous += 1 # Fall through: use canonical_name as statcast_name so row is not excluded else: print(f" UNMATCHED pitcher: {canonical!r}") unmatched_pitcher += 1 # Fall through to hitter branch if also a hitter; else canonical fallback if is_hitter and identity_df.at[idx, "statcast_name"] is None: # Hitter window queries use ec.batter = :batter_id (player_id), not player_name. # statcast_name must be non-null to pass build validation. # Use canonical_name (First Last) as a stable non-null placeholder. identity_df.at[idx, "statcast_name"] = canonical identity_df.at[idx, "source_note"] = str(row.get("source_note", "")) + "+hitter-canonical" resolved_hitter += 1 # Final fallback: any record still missing statcast_name (e.g. unmatched pure pitcher) if identity_df.at[idx, "statcast_name"] is None: identity_df.at[idx, "statcast_name"] = canonical identity_df.at[idx, "source_note"] = str(row.get("source_note", "")) + "+canonical-fallback" print( f" Enrichment: pitcher_matched={resolved_pitcher} " f"hitter_canonical={resolved_hitter} " f"ambiguous={ambiguous} unmatched_pitcher={unmatched_pitcher}" ) return identity_df # --------------------------------------------------------------------------- # Step 5 — Collision handling # --------------------------------------------------------------------------- def _resolve_collisions(identity_df: pd.DataFrame) -> pd.DataFrame: """ Assign collision-safe display_name values. For players sharing canonical_name_normalized but having different player_ids: - First (lowest player_id): display_name = canonical_name - Others: display_name = f"{canonical_name} ({player_id})" """ identity_df = identity_df.copy() identity_df["display_name"] = identity_df["canonical_name"] # Find collisions: same normalized name, multiple rows norm_groups = identity_df.groupby("canonical_name_normalized") collision_count = 0 for norm_key, group in norm_groups: if len(group) == 1: continue # no collision # Multiple records share the same normalized name — assign suffixes to all but the first. # Sort: non-null player_id ascending first, then null (deterministic). sorted_group = group.sort_values( "player_id", ascending=True, na_position="last" ) for rank, (idx, row) in enumerate(sorted_group.iterrows()): if rank == 0: # Primary: keep canonical_name as display_name pass else: pid = row.get("player_id") suffix = str(int(pid)) if pd.notna(pid) else "?" identity_df.at[idx, "display_name"] = f"{row['canonical_name']} ({suffix})" identity_df.at[idx, "source_note"] = ( str(row.get("source_note", "")) + "+collision-resolved" ) collision_count += 1 if collision_count: print(f" Resolved {collision_count} collision suffix(es)") return identity_df # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def build_identity_map(seasons: list[int]) -> None: print(f"\n=== Building player_identity_map.parquet (seasons={seasons}) ===") print("\n[1/5] Fetching pybaseball pitcher data...") pitcher_df = _fetch_pitcher_records(seasons) print(f" Pitcher rows: {len(pitcher_df)}") print("\n[2/5] Fetching pybaseball hitter data...") hitter_df = _fetch_hitter_records(seasons) print(f" Hitter rows: {len(hitter_df)}") print("\n[3/5] Building MLBAM ID lookup...") all_fg_ids = set() for col_df in [pitcher_df, hitter_df]: if "IDfg" in col_df.columns: all_fg_ids.update( int(v) for v in col_df["IDfg"].dropna() if str(v) != "nan" ) fg_to_mlbam = _build_fg_to_mlbam(list(all_fg_ids)) print(f" Resolved {len(fg_to_mlbam)} / {len(all_fg_ids)} FanGraphs IDs to MLBAM") print("\n[4/5] Building raw identity records...") identity_df = _build_raw_records(pitcher_df, hitter_df, fg_to_mlbam) print("\n[5/5] Enriching statcast_name from DB...") identity_df = _enrich_statcast_names(identity_df) print("\n[6/6] Resolving collisions and assigning display_name...") identity_df = _resolve_collisions(identity_df) OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True) identity_df.to_parquet(OUTPUT_PATH, index=False) print(f"\nWrote {len(identity_df)} rows -> {OUTPUT_PATH}") # Summary with_mlbam = identity_df["player_id"].notna().sum() with_statcast = identity_df["statcast_name"].notna().sum() missing_statcast = identity_df["statcast_name"].isna().sum() print(f"\nSummary:") print(f" Total records: {len(identity_df)}") print(f" With MLBAM player_id: {with_mlbam}") print(f" With statcast_name: {with_statcast}") print(f" Missing statcast_name: {missing_statcast}") if missing_statcast: missing = identity_df[identity_df["statcast_name"].isna()]["canonical_name"].head(20).tolist() print(f" First missing: {missing}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Build player identity map parquet") parser.add_argument("--seasons", nargs="+", type=int, default=DEFAULT_SEASONS) args = parser.parse_args() build_identity_map(args.seasons)