Spaces:
Running
Running
| """ | |
| Offline build script — builds data/player_identity_map.parquet. | |
| Sources: | |
| - pybaseball pitching_stats() → pitcher names, FanGraphs IDs | |
| - pybaseball batting_stats() → hitter names, FanGraphs IDs | |
| - pybaseball playerid_reverse_lookup() → MLBAM IDs from FanGraphs IDs | |
| - CockroachDB statcast_event_core → statcast_name enrichment (REQUIRED for hitters) | |
| Usage: | |
| python scripts/build_player_identity_map.py | |
| python scripts/build_player_identity_map.py --seasons 2021 2022 2023 2024 2025 | |
| Run from project root. Requires pybaseball + DB connection (database.remote_db). | |
| Output: data/player_identity_map.parquet | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import sys | |
| from collections import defaultdict | |
| from pathlib import Path | |
| # Allow running from project root | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| import pandas as pd | |
| import pybaseball | |
| from sqlalchemy import text | |
| from database.remote_db import get_connection | |
| from visualization.cards.player_identity import normalize_for_matching, to_canonical_name | |
| DEFAULT_SEASONS = [2021, 2022, 2023, 2024, 2025] | |
| OUTPUT_PATH = Path(__file__).parent.parent / "data" / "player_identity_map.parquet" | |
| # --------------------------------------------------------------------------- | |
| # Step 1 — Fetch pybaseball season summaries | |
| # --------------------------------------------------------------------------- | |
| def _fetch_pitcher_records(seasons: list[int]) -> pd.DataFrame: | |
| """Return DataFrame with Name, Season, IP, IDfg columns — all pitchers with IP > 0.""" | |
| frames = [] | |
| for yr in seasons: | |
| print(f" Fetching pitching_stats({yr})...") | |
| try: | |
| df = pybaseball.pitching_stats(yr, yr, league="all", qual=0, ind=1) | |
| if "Season" not in df.columns: | |
| df["Season"] = int(yr) | |
| df["Season"] = int(yr) | |
| keep = [c for c in ["Name", "Season", "IP", "IDfg"] if c in df.columns] | |
| frames.append(df[keep].copy()) | |
| except Exception as exc: | |
| print(f" WARNING: pitching_stats({yr}) failed: {exc}") | |
| if not frames: | |
| return pd.DataFrame(columns=["Name", "Season", "IP", "IDfg"]) | |
| combined = pd.concat(frames, ignore_index=True) | |
| combined = combined[combined["IP"] > 0].dropna(subset=["Name"]) | |
| return combined | |
| def _fetch_hitter_records(seasons: list[int]) -> pd.DataFrame: | |
| """Return DataFrame with Name, Season, AB, IDfg columns — all hitters with AB > 0.""" | |
| frames = [] | |
| for yr in seasons: | |
| print(f" Fetching batting_stats({yr})...") | |
| try: | |
| df = pybaseball.batting_stats(yr, yr, league="all", qual=0, ind=1) | |
| if "Season" not in df.columns: | |
| df["Season"] = int(yr) | |
| df["Season"] = int(yr) | |
| keep = [c for c in ["Name", "Season", "AB", "IDfg"] if c in df.columns] | |
| frames.append(df[keep].copy()) | |
| except Exception as exc: | |
| print(f" WARNING: batting_stats({yr}) failed: {exc}") | |
| if not frames: | |
| return pd.DataFrame(columns=["Name", "Season", "AB", "IDfg"]) | |
| combined = pd.concat(frames, ignore_index=True) | |
| combined = combined[combined["AB"] > 0].dropna(subset=["Name"]) | |
| return combined | |
| # --------------------------------------------------------------------------- | |
| # Step 2 — Get MLBAM IDs from FanGraphs IDs | |
| # --------------------------------------------------------------------------- | |
| def _build_fg_to_mlbam(fg_ids: list[int]) -> dict[int, int]: | |
| """Return {fg_id: mlbam_id} for all resolvable FG IDs.""" | |
| if not fg_ids: | |
| return {} | |
| print(f" Looking up MLBAM IDs for {len(fg_ids)} FanGraphs IDs...") | |
| try: | |
| lookup = pybaseball.playerid_reverse_lookup(fg_ids, key_type="fangraphs") | |
| lookup = lookup.dropna(subset=["key_mlbam"]) | |
| lookup["key_fangraphs"] = lookup["key_fangraphs"].astype(int) | |
| lookup["key_mlbam"] = lookup["key_mlbam"].astype(int) | |
| return dict(zip(lookup["key_fangraphs"], lookup["key_mlbam"])) | |
| except Exception as exc: | |
| print(f" WARNING: playerid_reverse_lookup failed: {exc}") | |
| return {} | |
| # --------------------------------------------------------------------------- | |
| # Step 3 — Build raw identity records | |
| # --------------------------------------------------------------------------- | |
| def _build_raw_records( | |
| pitcher_df: pd.DataFrame, | |
| hitter_df: pd.DataFrame, | |
| fg_to_mlbam: dict[int, int], | |
| ) -> pd.DataFrame: | |
| """ | |
| Combine pitcher + hitter records into raw identity records (pre-merge, pre-collision). | |
| """ | |
| records: dict[tuple, dict] = {} # (player_id or normalized_name) → record | |
| def _upsert(name: str, fg_id_raw, is_hitter: bool, is_pitcher: bool) -> None: | |
| canonical = to_canonical_name(str(name).strip()) | |
| norm_key = normalize_for_matching(canonical) | |
| fg_id: int | None = int(fg_id_raw) if pd.notna(fg_id_raw) else None | |
| mlbam: int | None = fg_to_mlbam.get(fg_id) if fg_id is not None else None | |
| # Primary merge key: MLBAM player_id | |
| # Fallback merge key: normalized name (only if no MLBAM ID) | |
| rec_key = ("mlbam", mlbam) if mlbam is not None else ("norm", norm_key) | |
| if rec_key in records: | |
| rec = records[rec_key] | |
| if is_hitter: | |
| rec["role_hitter"] = True | |
| if is_pitcher: | |
| rec["role_pitcher"] = True | |
| # Update IDs if we have new info | |
| if fg_id is not None and rec.get("fangraphs_id") is None: | |
| rec["fangraphs_id"] = fg_id | |
| if mlbam is not None and rec.get("player_id") is None: | |
| rec["player_id"] = mlbam | |
| else: | |
| source = "pybaseball+mlbam" if mlbam is not None else "pybaseball-only" | |
| records[rec_key] = { | |
| "player_id": mlbam, | |
| "fangraphs_id": fg_id, | |
| "canonical_name": canonical, | |
| "canonical_name_normalized": norm_key, | |
| "pybaseball_name": str(name).strip(), | |
| "statcast_name": None, | |
| "role_hitter": is_hitter, | |
| "role_pitcher": is_pitcher, | |
| "display_name": canonical, # set properly in collision step | |
| "source_note": source, | |
| } | |
| print(" Building raw identity records from pitcher data...") | |
| for _, row in pitcher_df.iterrows(): | |
| _upsert(row["Name"], row.get("IDfg"), is_hitter=False, is_pitcher=True) | |
| print(" Building raw identity records from hitter data...") | |
| for _, row in hitter_df.iterrows(): | |
| _upsert(row["Name"], row.get("IDfg"), is_hitter=True, is_pitcher=False) | |
| df = pd.DataFrame(list(records.values())) | |
| print(f" Raw records: {len(df)}") | |
| return df | |
| # --------------------------------------------------------------------------- | |
| # Step 4 — statcast_name enrichment (REQUIRED) | |
| # --------------------------------------------------------------------------- | |
| def _enrich_statcast_names(identity_df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Populate statcast_name for each identity record. | |
| DB context: statcast_event_core.player_name stores the PITCHER name in "Last, First" | |
| format. Pure hitters never appear there. Strategy: | |
| - role_pitcher records: match against ec.player_name (Last, First → canonical → norm). | |
| statcast_name = the DB's "Last, First" string (used in pitcher selector label only; | |
| pitcher window queries use ec.pitcher = :pitcher_id, not player_name). | |
| - role_hitter-only records: set statcast_name = canonical_name (First Last). | |
| Hitter window queries will use ec.batter = :batter_id via player_id, not player_name. | |
| - Two-way players (both roles): try pitcher match first; fall back to canonical_name. | |
| """ | |
| print(" Connecting to DB for statcast_name enrichment (pitchers only)...") | |
| conn = get_connection() | |
| try: | |
| rows = conn.execute( | |
| text("SELECT DISTINCT player_name FROM statcast_event_core WHERE player_name IS NOT NULL") | |
| ).fetchall() | |
| finally: | |
| conn.close() | |
| statcast_names: list[str] = [r[0] for r in rows if r[0]] | |
| print(f" Loaded {len(statcast_names)} distinct statcast player_names (pitcher names)") | |
| # Pre-build O(1) lookup dicts so the per-record loop is fast. | |
| # statcast names are "Last, First" — apply to_canonical_name before normalizing | |
| # so the key matches pybaseball "First Last" canonical_name_normalized. | |
| canonical_lower_to_statcast: dict[str, str] = {} # "first last" → "Last, First" | |
| norm_to_statcast: dict[str, list[str]] = defaultdict(list) | |
| for sc in statcast_names: | |
| canon = to_canonical_name(sc) # "Last, First" → "First Last" | |
| cl = canon.lower() | |
| if cl not in canonical_lower_to_statcast: | |
| canonical_lower_to_statcast[cl] = sc | |
| norm_to_statcast[normalize_for_matching(canon)].append(sc) | |
| resolved_pitcher = 0 | |
| resolved_hitter = 0 | |
| ambiguous = 0 | |
| unmatched_pitcher = 0 | |
| for idx, row in identity_df.iterrows(): | |
| canonical = row["canonical_name"] | |
| norm_key = row["canonical_name_normalized"] | |
| is_pitcher = bool(row.get("role_pitcher")) | |
| is_hitter = bool(row.get("role_hitter")) | |
| if is_pitcher: | |
| # Layer 1: exact canonical lowercase match (O(1)) | |
| sc = canonical_lower_to_statcast.get(canonical.lower()) | |
| if sc: | |
| identity_df.at[idx, "statcast_name"] = sc | |
| identity_df.at[idx, "source_note"] = str(row.get("source_note", "")) + "+statcast" | |
| resolved_pitcher += 1 | |
| continue | |
| # Layer 2: normalized key match (O(1)) | |
| candidates = norm_to_statcast.get(norm_key, []) | |
| if len(candidates) == 1: | |
| identity_df.at[idx, "statcast_name"] = candidates[0] | |
| identity_df.at[idx, "source_note"] = str(row.get("source_note", "")) + "+statcast-norm" | |
| resolved_pitcher += 1 | |
| continue | |
| elif len(candidates) > 1: | |
| print(f" AMBIGUOUS pitcher: {canonical!r} → {candidates}") | |
| ambiguous += 1 | |
| # Fall through: use canonical_name as statcast_name so row is not excluded | |
| else: | |
| print(f" UNMATCHED pitcher: {canonical!r}") | |
| unmatched_pitcher += 1 | |
| # Fall through to hitter branch if also a hitter; else canonical fallback | |
| if is_hitter and identity_df.at[idx, "statcast_name"] is None: | |
| # Hitter window queries use ec.batter = :batter_id (player_id), not player_name. | |
| # statcast_name must be non-null to pass build validation. | |
| # Use canonical_name (First Last) as a stable non-null placeholder. | |
| identity_df.at[idx, "statcast_name"] = canonical | |
| identity_df.at[idx, "source_note"] = str(row.get("source_note", "")) + "+hitter-canonical" | |
| resolved_hitter += 1 | |
| # Final fallback: any record still missing statcast_name (e.g. unmatched pure pitcher) | |
| if identity_df.at[idx, "statcast_name"] is None: | |
| identity_df.at[idx, "statcast_name"] = canonical | |
| identity_df.at[idx, "source_note"] = str(row.get("source_note", "")) + "+canonical-fallback" | |
| print( | |
| f" Enrichment: pitcher_matched={resolved_pitcher} " | |
| f"hitter_canonical={resolved_hitter} " | |
| f"ambiguous={ambiguous} unmatched_pitcher={unmatched_pitcher}" | |
| ) | |
| return identity_df | |
| # --------------------------------------------------------------------------- | |
| # Step 5 — Collision handling | |
| # --------------------------------------------------------------------------- | |
| def _resolve_collisions(identity_df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Assign collision-safe display_name values. | |
| For players sharing canonical_name_normalized but having different player_ids: | |
| - First (lowest player_id): display_name = canonical_name | |
| - Others: display_name = f"{canonical_name} ({player_id})" | |
| """ | |
| identity_df = identity_df.copy() | |
| identity_df["display_name"] = identity_df["canonical_name"] | |
| # Find collisions: same normalized name, multiple rows | |
| norm_groups = identity_df.groupby("canonical_name_normalized") | |
| collision_count = 0 | |
| for norm_key, group in norm_groups: | |
| if len(group) == 1: | |
| continue # no collision | |
| # Multiple records share the same normalized name — assign suffixes to all but the first. | |
| # Sort: non-null player_id ascending first, then null (deterministic). | |
| sorted_group = group.sort_values( | |
| "player_id", ascending=True, na_position="last" | |
| ) | |
| for rank, (idx, row) in enumerate(sorted_group.iterrows()): | |
| if rank == 0: | |
| # Primary: keep canonical_name as display_name | |
| pass | |
| else: | |
| pid = row.get("player_id") | |
| suffix = str(int(pid)) if pd.notna(pid) else "?" | |
| identity_df.at[idx, "display_name"] = f"{row['canonical_name']} ({suffix})" | |
| identity_df.at[idx, "source_note"] = ( | |
| str(row.get("source_note", "")) + "+collision-resolved" | |
| ) | |
| collision_count += 1 | |
| if collision_count: | |
| print(f" Resolved {collision_count} collision suffix(es)") | |
| return identity_df | |
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
| def build_identity_map(seasons: list[int]) -> None: | |
| print(f"\n=== Building player_identity_map.parquet (seasons={seasons}) ===") | |
| print("\n[1/5] Fetching pybaseball pitcher data...") | |
| pitcher_df = _fetch_pitcher_records(seasons) | |
| print(f" Pitcher rows: {len(pitcher_df)}") | |
| print("\n[2/5] Fetching pybaseball hitter data...") | |
| hitter_df = _fetch_hitter_records(seasons) | |
| print(f" Hitter rows: {len(hitter_df)}") | |
| print("\n[3/5] Building MLBAM ID lookup...") | |
| all_fg_ids = set() | |
| for col_df in [pitcher_df, hitter_df]: | |
| if "IDfg" in col_df.columns: | |
| all_fg_ids.update( | |
| int(v) for v in col_df["IDfg"].dropna() if str(v) != "nan" | |
| ) | |
| fg_to_mlbam = _build_fg_to_mlbam(list(all_fg_ids)) | |
| print(f" Resolved {len(fg_to_mlbam)} / {len(all_fg_ids)} FanGraphs IDs to MLBAM") | |
| print("\n[4/5] Building raw identity records...") | |
| identity_df = _build_raw_records(pitcher_df, hitter_df, fg_to_mlbam) | |
| print("\n[5/5] Enriching statcast_name from DB...") | |
| identity_df = _enrich_statcast_names(identity_df) | |
| print("\n[6/6] Resolving collisions and assigning display_name...") | |
| identity_df = _resolve_collisions(identity_df) | |
| OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True) | |
| identity_df.to_parquet(OUTPUT_PATH, index=False) | |
| print(f"\nWrote {len(identity_df)} rows -> {OUTPUT_PATH}") | |
| # Summary | |
| with_mlbam = identity_df["player_id"].notna().sum() | |
| with_statcast = identity_df["statcast_name"].notna().sum() | |
| missing_statcast = identity_df["statcast_name"].isna().sum() | |
| print(f"\nSummary:") | |
| print(f" Total records: {len(identity_df)}") | |
| print(f" With MLBAM player_id: {with_mlbam}") | |
| print(f" With statcast_name: {with_statcast}") | |
| print(f" Missing statcast_name: {missing_statcast}") | |
| if missing_statcast: | |
| missing = identity_df[identity_df["statcast_name"].isna()]["canonical_name"].head(20).tolist() | |
| print(f" First missing: {missing}") | |
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser(description="Build player identity map parquet") | |
| parser.add_argument("--seasons", nargs="+", type=int, default=DEFAULT_SEASONS) | |
| args = parser.parse_args() | |
| build_identity_map(args.seasons) | |