# Source: 2026_MLB_Model / analytics / game_completion.py
# Last commit 22080c8 (Syntrex): "Fix batter_prop_outcomes explosion: dedup audit view + scoped grading"
# File size at capture: 28.9 kB
"""Automatic game-completion pipeline (analytics/game_completion.py).

When a game transitions LIVE → FINAL, `on_game_complete` runs four steps:

1. Grade game_outcomes from scores feed
2. Ingest Baseball Savant pitch-level data into statcast tables
3. Grade batter_prop_outcomes from the recommendation audit view
4. Fill realized outcomes using the freshly ingested statcast rows
"""
# The docstring must be the first statement to populate ``__doc__``;
# ``from __future__`` imports are allowed to follow it.
from __future__ import annotations
from typing import Optional
import pandas as pd
from sqlalchemy import text
from data.statcast import fetch_statcast_range
from models.batter_zone_model import classify_zone_bucket, normalize_pitch_family
from utils.helpers import utc_now_iso
from utils.logger import logger
# ---------------------------------------------------------------------------
# Column lists
# ---------------------------------------------------------------------------
# Columns copied from the per-game Savant frame into statcast_event_core
# (only those actually present in the frame are selected).
_CORE_COLS = [
"event_key", "player_name", "batter", "pitcher",
"game_date", "game_pk", "source_season",
"pitch_name", "events", "description",
"stand", "p_throws", "home_team", "away_team",
"inning", "inning_topbot", "at_bat_number", "pitch_number",
"plate_x", "plate_z",
]
# Columns written to statcast_batted_ball for rows that have a launch_speed.
_BATTED_COLS = [
"event_key", "launch_speed", "launch_angle",
"bb_type", "estimated_woba_using_speedangle",
]
# Columns written to statcast_pitch_release for rows that have a release_speed.
_RELEASE_COLS = [
"event_key", "release_speed", "release_spin_rate",
"pfx_x", "pfx_z", "release_extension",
]
# Mirrors, in order, the keys emitted by _build_pitch_mix_rows.
# NOTE(review): not referenced anywhere in the visible part of this file —
# confirm whether other modules import it.
_PITCH_MIX_COLS = [
"event_key", "pa_key",
"game_pk", "game_date", "source_season",
"batter", "pitcher", "player_name",
"stand", "p_throws", "home_team", "away_team",
"inning", "inning_topbot", "at_bat_number", "pitch_number",
"pitch_type", "pitch_name",
"release_speed", "effective_speed", "release_spin_rate", "spin_axis",
"pfx_x", "pfx_z",
"release_pos_x", "release_pos_y", "release_pos_z", "release_extension",
"plate_x", "plate_z", "zone",
"balls", "strikes", "outs_when_up", "bat_score", "fld_score",
"type", "description", "events",
"launch_speed", "launch_angle", "hit_distance_sc",
"hc_x", "hc_y", "spray_angle",
"bb_type", "launch_speed_angle", "barrel",
"estimated_ba_using_speedangle", "estimated_woba_using_speedangle",
"woba_value", "woba_denom",
"delta_home_win_exp", "delta_run_exp",
]
# Mirrors, in order, the keys emitted by _build_batter_game_log_rows.
# NOTE(review): not referenced anywhere in the visible part of this file —
# confirm whether other modules import it.
_BATTER_GAME_LOG_COLS = [
"pa_key", "game_pk", "game_date", "source_season",
"batter", "player_name", "stand", "p_throws",
"home_team", "away_team", "inning", "inning_topbot", "at_bat_number",
"pitches_seen", "balls_final", "strikes_final", "outs_when_up",
"events", "description",
"launch_speed", "launch_angle", "hit_distance_sc", "spray_angle",
"bb_type", "launch_speed_angle", "barrel",
"estimated_ba_using_speedangle", "estimated_woba_using_speedangle",
"woba_value", "woba_denom",
"hit_flag", "hr_flag", "tb2p_flag",
"delta_home_win_exp", "delta_run_exp",
]
# Maximum number of row dicts passed to a single conn.execute() call by the
# upsert helpers.
_INSERT_CHUNK = 500
# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
def _safe_float(val) -> Optional[float]:
try:
if val is None or str(val).strip() in {"", "nan", "None"}:
return None
return float(val)
except Exception:
return None
def _safe_int(val) -> Optional[int]:
try:
if val is None or str(val).strip() in {"", "nan", "None"}:
return None
return int(float(val))
except Exception:
return None
def _upsert_df(conn, table: str, df: pd.DataFrame, pk: str = "event_key") -> int:
    """Insert the rows of *df* into *table*, skipping rows whose key already exists.

    Issues ``INSERT … ON CONFLICT (pk) DO NOTHING`` in batches of
    ``_INSERT_CHUNK`` rows. Missing values (NaN / NaT / ``pd.NA``) are bound as
    ``None`` so they reach the database as NULL.

    Args:
        conn: Connection whose ``execute(statement, list_of_dicts)`` runs the
            statement once per row dict.
        table: Target table name. Interpolated directly into the SQL text, so it
            must be a trusted identifier.
        df: Rows to insert; its column names become the INSERT column list
            (also interpolated directly into the SQL text).
        pk: Conflict-target column.

    Returns:
        The number of rows *attempted* (not necessarily inserted); 0 when *df*
        is ``None`` or empty.
    """
    if df is None or df.empty:
        return 0
    cols = list(df.columns)
    col_list = ", ".join(cols)
    placeholders = ", ".join(f":{c}" for c in cols)
    sql = text(
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({pk}) DO NOTHING"
    )
    # Cast to object before masking: on numeric columns ``where(..., None)``
    # keeps NaN (a float column cannot hold None), which would be sent to the
    # database as NaN instead of NULL.
    records = df.astype(object).where(df.notna(), None).to_dict("records")
    total = 0
    for start in range(0, len(records), _INSERT_CHUNK):
        chunk = records[start : start + _INSERT_CHUNK]
        conn.execute(sql, chunk)
        total += len(chunk)
    return total
# Columns that _upsert_df_backfill overwrites on a key conflict, using
# COALESCE(EXCLUDED.col, table.col) so an incoming NULL never erases a stored
# value. Only the entries that are present in the incoming frame are used.
_SAVANT_BACKFILL_COLS = [
"launch_speed", "launch_angle", "hit_distance_sc", "hc_x", "hc_y",
"spray_angle", "bb_type", "launch_speed_angle", "barrel",
"estimated_ba_using_speedangle", "estimated_woba_using_speedangle",
"woba_value", "woba_denom", "delta_home_win_exp", "delta_run_exp",
"effective_speed", "bat_score", "fld_score",
]
def _upsert_df_backfill(conn, table: str, df: pd.DataFrame, pk: str = "event_key") -> int:
    """Insert rows of *df*, backfilling Savant-only columns on key conflicts.

    Issues ``INSERT … ON CONFLICT (pk) DO UPDATE SET col = COALESCE(EXCLUDED.col,
    table.col)`` for every column of *df* listed in ``_SAVANT_BACKFILL_COLS``.
    Used post-game to fill Savant-only columns of rows that were ingested live.
    Columns outside that list are left untouched on conflict. When *df* carries
    none of the backfill columns, this falls back to the plain
    ``DO NOTHING`` upsert.

    Missing values (NaN / NaT / ``pd.NA``) are bound as ``None`` so that
    ``COALESCE`` sees NULL and keeps the stored value.

    Args:
        conn: Connection whose ``execute(statement, list_of_dicts)`` runs the
            statement once per row dict.
        table: Target table name (interpolated into the SQL; must be trusted).
        df: Rows to upsert; column names become the INSERT column list.
        pk: Conflict-target column.

    Returns:
        The number of rows attempted; 0 when *df* is ``None`` or empty.
    """
    if df is None or df.empty:
        return 0
    cols = list(df.columns)
    backfill = [c for c in _SAVANT_BACKFILL_COLS if c in cols]
    if not backfill:
        return _upsert_df(conn, table, df, pk)
    col_list = ", ".join(cols)
    placeholders = ", ".join(f":{c}" for c in cols)
    updates = ", ".join(f"{c} = COALESCE(EXCLUDED.{c}, {table}.{c})" for c in backfill)
    sql = text(
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({pk}) DO UPDATE SET {updates}"
    )
    # Cast to object before masking: on numeric columns ``where(..., None)``
    # keeps NaN, and a NaN parameter is not NULL — COALESCE would then
    # overwrite a good stored value with NaN.
    records = df.astype(object).where(df.notna(), None).to_dict("records")
    total = 0
    for start in range(0, len(records), _INSERT_CHUNK):
        chunk = records[start : start + _INSERT_CHUNK]
        conn.execute(sql, chunk)
        total += len(chunk)
    return total
def _build_pitcher_zone_rows(game_df: pd.DataFrame, game_pk: int, game_date: str) -> pd.DataFrame:
    """Build one pitcher_zone_events row per pitch in *game_df*.

    Each row carries the pitch's family (``normalize_pitch_family``), its zone
    bucket (``classify_zone_bucket``), release/movement numbers, and three 0/1
    flags derived from the ``description`` and ``events`` columns.

    Missing text cells (``None`` / NaN) are treated as empty, so ``pitch_name``
    and ``events`` are emitted as ``None`` rather than the literal string
    ``"nan"``.

    Args:
        game_df: Pitch-level rows for one game; expected to carry ``event_key``.
        game_pk: Game identifier stamped on every row.
        game_date: Game date; only the first 10 characters are kept.

    Returns:
        DataFrame with one row per input row (empty when *game_df* is empty).
    """
    def _clean_text(value) -> str:
        """Return *value* as text, mapping None / NaN / falsy values to ''."""
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return ""
        return str(value or "")

    hit_events = {"single", "double", "triple", "home_run"}
    # Loop-invariant values, computed once.
    game_pk_int = int(game_pk)
    game_day = str(game_date)[:10]

    rows = []
    for _, r in game_df.iterrows():
        pitch_name = _clean_text(r.get("pitch_name"))
        plate_x = r.get("plate_x")
        plate_z = r.get("plate_z")
        events_val = _clean_text(r.get("events"))
        event_name = events_val.lower()
        description = _clean_text(r.get("description")).lower()
        rows.append({
            "event_key": r.get("event_key"),
            "pitcher_name": None,  # Savant CSV rows don't carry pitcher name directly
            "pitcher_id": _safe_int(r.get("pitcher")),
            "game_pk": game_pk_int,
            "game_date": game_day,
            "pitch_name": pitch_name or None,
            "pitch_family": normalize_pitch_family(pitch_name),
            "zone_bucket": classify_zone_bucket(plate_x, plate_z),
            "plate_x": _safe_float(plate_x),
            "plate_z": _safe_float(plate_z),
            "pfx_x": _safe_float(r.get("pfx_x")),
            "pfx_z": _safe_float(r.get("pfx_z")),
            "release_speed": _safe_float(r.get("release_speed")),
            "release_spin_rate": _safe_float(r.get("release_spin_rate")),
            "release_extension": _safe_float(r.get("release_extension")),
            "events": events_val or None,
            "whiff_flag": int("swinging_strike" in description),
            "hit_flag": int(event_name in hit_events),
            "hr_flag": int(event_name == "home_run"),
        })
    return pd.DataFrame(rows)
def _build_pitch_mix_rows(game_df: pd.DataFrame, game_pk: int, game_date: str) -> pd.DataFrame:
    """Build one live_pitch_mix_2026 row per pitch in *game_df*.

    Every output row starts with the identifying fields (``event_key``,
    ``pa_key``, ``game_pk``, ``game_date``, ``source_season``) followed by the
    pitch columns, each passed through the converter listed in the field plan
    below. ``pa_key`` is ``"<game_pk>_<at_bat_number>"``, or ``None`` when the
    at-bat number is missing.

    Args:
        game_df: Pitch-level rows for one game.
        game_pk: Game identifier stamped on every row.
        game_date: Game date; the first 10 characters become ``game_date`` and
            the first 4 become ``source_season``.

    Returns:
        DataFrame with one row per input row (empty when *game_df* is empty).
    """
    def _keep_or_none(value):
        """Pass truthy values through unchanged; map falsy ones to None."""
        return value or None

    # (column, converter) pairs, in output column order.
    field_plan = (
        ("batter", _safe_int),
        ("pitcher", _safe_int),
        ("player_name", _keep_or_none),
        ("stand", _keep_or_none),
        ("p_throws", _keep_or_none),
        ("home_team", _keep_or_none),
        ("away_team", _keep_or_none),
        ("inning", _safe_int),
        ("inning_topbot", _keep_or_none),
        ("at_bat_number", _safe_int),
        ("pitch_number", _safe_int),
        ("pitch_type", _keep_or_none),
        ("pitch_name", _keep_or_none),
        ("release_speed", _safe_float),
        ("effective_speed", _safe_float),
        ("release_spin_rate", _safe_float),
        ("spin_axis", _safe_float),
        ("pfx_x", _safe_float),
        ("pfx_z", _safe_float),
        ("release_pos_x", _safe_float),
        ("release_pos_y", _safe_float),
        ("release_pos_z", _safe_float),
        ("release_extension", _safe_float),
        ("plate_x", _safe_float),
        ("plate_z", _safe_float),
        ("zone", _safe_int),
        ("balls", _safe_int),
        ("strikes", _safe_int),
        ("outs_when_up", _safe_int),
        ("bat_score", _safe_int),
        ("fld_score", _safe_int),
        ("type", _keep_or_none),
        ("description", _keep_or_none),
        ("events", _keep_or_none),
        ("launch_speed", _safe_float),
        ("launch_angle", _safe_float),
        ("hit_distance_sc", _safe_float),
        ("hc_x", _safe_float),
        ("hc_y", _safe_float),
        ("spray_angle", _safe_float),
        ("bb_type", _keep_or_none),
        ("launch_speed_angle", _safe_int),
        ("barrel", _safe_int),
        ("estimated_ba_using_speedangle", _safe_float),
        ("estimated_woba_using_speedangle", _safe_float),
        ("woba_value", _safe_float),
        ("woba_denom", _safe_int),
        ("delta_home_win_exp", _safe_float),
        ("delta_run_exp", _safe_float),
    )

    records = []
    for _, pitch in game_df.iterrows():
        ab_raw = pitch.get("at_bat_number")
        if ab_raw is None or str(ab_raw).strip() in {"", "nan"}:
            pa_key = None
        else:
            pa_key = f"{game_pk}_{int(float(ab_raw))}"
        record = {
            "event_key": pitch.get("event_key"),
            "pa_key": pa_key,
            "game_pk": int(game_pk),
            "game_date": str(game_date)[:10],
            "source_season": int(str(game_date)[:4]),
        }
        for column, convert in field_plan:
            record[column] = convert(pitch.get(column))
        records.append(record)
    return pd.DataFrame(records)
def _build_batter_game_log_rows(game_df: pd.DataFrame, game_pk: int, game_date: str) -> pd.DataFrame:
    """Collapse pitch-level rows into one plate-appearance row per at-bat.

    Rows are grouped by ``at_bat_number``. Within a group the pitch with the
    highest ``pitch_number`` is taken as the terminal pitch and supplies the
    outcome fields; the lowest-numbered pitch supplies ``outs_when_up``; the
    ``delta_*`` columns are summed over the whole group.

    Args:
        game_df: Pitch-level rows for one game. Must contain ``at_bat_number``
            and ``pitch_number``.
        game_pk: Game identifier; also the prefix of each ``pa_key``.
        game_date: Game date; the first 10 characters become ``game_date`` and
            the first 4 become ``source_season``.

    Returns:
        DataFrame with one row per at-bat. An empty DataFrame (after logging a
        warning) when either required column is missing.
    """
    # Both columns are indexed below; guard them the same way so a missing
    # pitch_number yields a warning instead of a KeyError.
    for required in ("at_bat_number", "pitch_number"):
        if required not in game_df.columns:
            logger.warning("[game_completion] %s missing from game_df for game_pk=%s", required, game_pk)
            return pd.DataFrame()

    def _col_sum(frame: pd.DataFrame, col: str) -> Optional[float]:
        """Sum the numeric values of *col* in *frame*; None if absent or all missing."""
        if col not in frame.columns:
            return None
        vals = pd.to_numeric(frame[col], errors="coerce").dropna()
        return float(vals.sum()) if not vals.empty else None

    total_bases = {"single": 1, "double": 2, "triple": 3, "home_run": 4}

    work = game_df.copy()
    work["at_bat_number"] = pd.to_numeric(work["at_bat_number"], errors="coerce")
    work["pitch_number"] = pd.to_numeric(work["pitch_number"], errors="coerce")
    rows = []
    for ab_num, group in work.groupby("at_bat_number", sort=True):
        if group.empty:
            continue
        group_sorted = group.sort_values("pitch_number")
        first_row = group_sorted.iloc[0]
        terminal_row = group_sorted.iloc[-1]
        events_val = terminal_row.get("events") or None
        events_str = str(events_val or "").strip().lower()
        bases = total_bases.get(events_str, 0)
        rows.append({
            "pa_key": f"{game_pk}_{int(ab_num)}",
            "game_pk": int(game_pk),
            "game_date": str(game_date)[:10],
            "source_season": int(str(game_date)[:4]),
            "batter": _safe_int(terminal_row.get("batter")),
            "player_name": terminal_row.get("player_name") or None,
            "stand": terminal_row.get("stand") or None,
            "p_throws": terminal_row.get("p_throws") or None,
            "home_team": terminal_row.get("home_team") or None,
            "away_team": terminal_row.get("away_team") or None,
            "inning": _safe_int(terminal_row.get("inning")),
            "inning_topbot": terminal_row.get("inning_topbot") or None,
            "at_bat_number": int(ab_num),
            "pitches_seen": len(group),
            "balls_final": _safe_int(terminal_row.get("balls")),
            "strikes_final": _safe_int(terminal_row.get("strikes")),
            "outs_when_up": _safe_int(first_row.get("outs_when_up")),
            "events": events_val,
            "description": terminal_row.get("description") or None,
            "launch_speed": _safe_float(terminal_row.get("launch_speed")),
            "launch_angle": _safe_float(terminal_row.get("launch_angle")),
            "hit_distance_sc": _safe_float(terminal_row.get("hit_distance_sc")),
            "spray_angle": _safe_float(terminal_row.get("spray_angle")),
            "bb_type": terminal_row.get("bb_type") or None,
            "launch_speed_angle": _safe_int(terminal_row.get("launch_speed_angle")),
            "barrel": _safe_int(terminal_row.get("barrel")),
            "estimated_ba_using_speedangle": _safe_float(terminal_row.get("estimated_ba_using_speedangle")),
            "estimated_woba_using_speedangle": _safe_float(terminal_row.get("estimated_woba_using_speedangle")),
            "woba_value": _safe_float(terminal_row.get("woba_value")),
            "woba_denom": _safe_int(terminal_row.get("woba_denom")),
            "hit_flag": int(bases >= 1),
            "hr_flag": int(events_str == "home_run"),
            # "Two or more total bases": double, triple or home run.
            "tb2p_flag": int(bases >= 2),
            "delta_home_win_exp": _col_sum(group, "delta_home_win_exp"),
            "delta_run_exp": _col_sum(group, "delta_run_exp"),
        })
    return pd.DataFrame(rows)
# ---------------------------------------------------------------------------
# Public: ingest statcast for a single game
# ---------------------------------------------------------------------------
def ingest_statcast_for_game(conn, game_pk: int, game_date: str) -> int:
    """
    Fetch Baseball Savant data for ``game_date`` and persist the rows of ``game_pk``.

    The day's rows are filtered to ``game_pk``, given a synthetic ``event_key``
    (``<game_pk>_<at_bat_number>_<pitch_number>``) and written to:

    * statcast_event_core, statcast_batted_ball, statcast_pitch_release and
      pitcher_zone_events — ``ON CONFLICT DO NOTHING``;
    * live_pitch_mix_2026 and live_batter_game_log_2026 — backfill upsert, so
      Savant-only columns of rows ingested live get filled in.

    Failures while writing pitcher_zone_events and the two live_* tables are
    logged and skipped; failures while writing the three statcast_* tables
    propagate to the caller.

    Args:
        conn: Open database connection; this function does not commit.
        game_pk: Game identifier to ingest.
        game_date: Date passed to ``fetch_statcast_range`` as both start and end.

    Returns:
        The number of statcast_event_core rows *attempted*. 0 when nothing was
        written: fetch failed, no rows for the date or game (Savant hasn't
        posted yet — normal for <60 min post-game), or a column needed to
        filter or key the rows is missing.
    """
    def _coerce_numeric(frame: pd.DataFrame, columns) -> None:
        """Convert the listed columns of *frame* to numeric in place (bad values → NaN)."""
        for col in columns:
            if col in frame.columns:
                frame[col] = pd.to_numeric(frame[col], errors="coerce")

    try:
        raw = fetch_statcast_range(game_date, game_date)
    except Exception as exc:
        logger.warning("[game_completion] statcast fetch failed game_pk=%s: %s", game_pk, exc)
        return 0
    if raw is None or raw.empty:
        logger.info("[game_completion] no statcast rows for date %s (game_pk=%s)", game_date, game_pk)
        return 0
    if "game_pk" not in raw.columns:
        logger.warning("[game_completion] game_pk column missing from statcast fetch")
        return 0
    game_df = raw[raw["game_pk"].astype(str) == str(game_pk)].copy()
    if game_df.empty:
        logger.info("[game_completion] game_pk %s not in statcast rows for %s", game_pk, game_date)
        return 0

    # event_key cannot be built without these; bail out like the game_pk
    # guard above instead of raising KeyError.
    missing_key_cols = [c for c in ("at_bat_number", "pitch_number") if c not in game_df.columns]
    if missing_key_cols:
        logger.warning(
            "[game_completion] cannot build event_key for game_pk=%s; missing columns: %s",
            game_pk, ", ".join(missing_key_cols),
        )
        return 0

    # Build synthetic primary key
    # NOTE(review): the parts are rendered with astype(str), so float-typed
    # columns would produce keys like "12.0" — confirm this matches the keys
    # written by the live ingest path.
    game_df["event_key"] = (
        game_df["game_pk"].astype(str) + "_"
        + game_df["at_bat_number"].astype(str) + "_"
        + game_df["pitch_number"].astype(str)
    )
    game_df["game_pk"] = pd.to_numeric(game_df["game_pk"], errors="coerce")
    game_df["source_season"] = int(str(game_date)[:4])

    # ── statcast_event_core ──────────────────────────────────────────────
    core_present = [c for c in _CORE_COLS if c in game_df.columns]
    core_df = game_df[core_present].copy()
    _coerce_numeric(
        core_df,
        ("inning", "at_bat_number", "pitch_number", "batter", "pitcher", "plate_x", "plate_z"),
    )
    n_core = _upsert_df(conn, "statcast_event_core", core_df)

    # ── statcast_batted_ball ─────────────────────────────────────────────
    if "launch_speed" in game_df.columns:
        bb_df = game_df[game_df["launch_speed"].notna()].copy()
        bb_present = [c for c in _BATTED_COLS if c in bb_df.columns]
        if bb_present:
            bb_out = bb_df[bb_present].copy()
            _coerce_numeric(bb_out, ("launch_speed", "launch_angle", "estimated_woba_using_speedangle"))
            _upsert_df(conn, "statcast_batted_ball", bb_out)

    # ── statcast_pitch_release ───────────────────────────────────────────
    if "release_speed" in game_df.columns:
        rel_df = game_df[game_df["release_speed"].notna()].copy()
        rel_present = [c for c in _RELEASE_COLS if c in rel_df.columns]
        if rel_present:
            rel_out = rel_df[rel_present].copy()
            _coerce_numeric(
                rel_out,
                ("release_speed", "release_spin_rate", "pfx_x", "pfx_z", "release_extension"),
            )
            _upsert_df(conn, "statcast_pitch_release", rel_out)

    # ── pitcher_zone_events ──────────────────────────────────────────────
    try:
        zone_df = _build_pitcher_zone_rows(game_df, game_pk, game_date)
        if not zone_df.empty:
            _upsert_df(conn, "pitcher_zone_events", zone_df)
    except Exception as exc:
        logger.warning("[game_completion] pitcher_zone_events ingest failed: %s", exc)

    # ── live_pitch_mix_2026 ──────────────────────────────────────────────
    try:
        pitch_mix_df = _build_pitch_mix_rows(game_df, game_pk, game_date)
        if not pitch_mix_df.empty:
            n_pm = _upsert_df_backfill(conn, "live_pitch_mix_2026", pitch_mix_df)
            logger.info("[game_completion] ingested %d pitch_mix rows for game_pk=%s", n_pm, game_pk)
    except Exception as exc:
        logger.warning("[game_completion] live_pitch_mix_2026 ingest failed: %s", exc)

    # ── live_batter_game_log_2026 ────────────────────────────────────────
    try:
        batter_log_df = _build_batter_game_log_rows(game_df, game_pk, game_date)
        if not batter_log_df.empty:
            n_bgl = _upsert_df_backfill(conn, "live_batter_game_log_2026", batter_log_df, pk="pa_key")
            logger.info("[game_completion] ingested %d batter_game_log rows for game_pk=%s", n_bgl, game_pk)
    except Exception as exc:
        logger.warning("[game_completion] live_batter_game_log_2026 ingest failed: %s", exc)

    logger.info("[game_completion] ingested %d core rows for game_pk=%s", n_core, game_pk)
    return n_core
# ---------------------------------------------------------------------------
# Public: full completion pipeline
# ---------------------------------------------------------------------------
def on_game_complete(conn, game_pk: int, game_date: str, scores_df: pd.DataFrame) -> None:
    """
    Run the post-game pipeline for one game.

    Each step runs inside its own guard: any exception it raises is logged at
    warning level and the pipeline moves on to the next step.

    Step 1: grade game_outcomes from the scores feed
    Step 2: ingest Baseball Savant data (may be 0 rows if Savant hasn't posted)
            and, when rows were written, read the game's statcast_event_core rows
    Step 3: rebuild batter_prop_outcomes for the game from the recommendation
            audit view (existing rows for the game are deleted first)
    Step 4: fill realized batter outcomes from the statcast rows read in step 2;
            skipped when step 2 produced none

    NOTE(review): all steps share ``conn``; whether a database error in one
    step leaves the connection usable for the later steps depends on the
    driver / transaction mode — confirm.
    """
    def _attempt(step, failure_fmt):
        """Run *step*; on any exception log it with *failure_fmt* and return None."""
        try:
            return step()
        except Exception as err:
            logger.warning(failure_fmt, err)
            return None

    def _grade_game_outcomes():
        from analytics.outcome_grader import build_game_outcome_rows_from_scores
        from database.db import insert_game_outcomes

        game_rows = build_game_outcome_rows_from_scores(
            scores_df=scores_df,
            graded_at=utc_now_iso(),
        )
        insert_game_outcomes(conn, game_rows)
        logger.info("[game_completion] step 1 done: %d game_outcomes rows", len(game_rows))

    def _ingest_and_load_statcast():
        attempted = ingest_statcast_for_game(conn, game_pk, game_date)
        logger.info("[game_completion] step 2 done: %d statcast rows", attempted)
        if attempted > 0:
            return pd.read_sql(
                text("SELECT * FROM statcast_event_core WHERE game_pk = :pk"),
                conn,
                params={"pk": int(game_pk)},
            )
        return None

    def _grade_batter_props():
        from analytics.batter_prop_grader import build_batter_prop_outcome_rows_from_audit
        from database.db import (
            read_recommendation_audit_view,
            insert_batter_prop_outcomes,
            delete_batter_prop_outcomes_for_game,
        )

        audit_rows = read_recommendation_audit_view(conn)
        # Scope the audit view to this game before grading.
        this_game = audit_rows["game_pk"].astype(str).str.strip() == str(game_pk)
        audit_rows = audit_rows[this_game]
        prop_rows = build_batter_prop_outcome_rows_from_audit(
            audit_df=audit_rows,
            graded_at=utc_now_iso(),
        )
        delete_batter_prop_outcomes_for_game(conn, str(game_pk))
        insert_batter_prop_outcomes(conn, prop_rows)
        logger.info("[game_completion] step 3 done: %d batter_prop_outcomes rows", len(prop_rows))

    def _fill_realized_outcomes():
        from analytics.batter_realization import build_batter_realization_rows
        from database.db import (
            read_batter_prop_outcomes_for_game,
            delete_batter_prop_outcomes_for_game,
            insert_batter_prop_outcomes,
        )

        stored_props = read_batter_prop_outcomes_for_game(conn, str(game_pk))
        if stored_props.empty:
            return
        realized = build_batter_realization_rows(
            batter_prop_outcomes_df=stored_props,
            statcast_df=statcast_df,
            graded_at=utc_now_iso(),
        )
        if realized.empty:
            return
        delete_batter_prop_outcomes_for_game(conn, str(game_pk))
        insert_batter_prop_outcomes(conn, realized)
        logger.info("[game_completion] step 4 done: %d realized rows", len(realized))

    logger.info("[game_completion] pipeline start game_pk=%s date=%s", game_pk, game_date)

    # Step 1: grade game outcomes
    _attempt(_grade_game_outcomes, "[game_completion] step 1 (game_outcomes) failed: %s")

    # Step 2: ingest statcast
    statcast_df = pd.DataFrame()
    loaded = _attempt(_ingest_and_load_statcast, "[game_completion] step 2 (statcast ingest) failed: %s")
    if loaded is not None:
        statcast_df = loaded

    # Step 3: grade batter prop outcomes
    _attempt(_grade_batter_props, "[game_completion] step 3 (batter prop grade) failed: %s")

    # Step 4: fill realized outcomes (only if statcast was available)
    if not statcast_df.empty:
        _attempt(_fill_realized_outcomes, "[game_completion] step 4 (fill realized) failed: %s")

    logger.info("[game_completion] pipeline complete game_pk=%s", game_pk)
# ---------------------------------------------------------------------------
# Public: live-ingest helpers (called pitch-by-pitch from app.py)
# ---------------------------------------------------------------------------
def upsert_live_pitch_rows(rows: list[dict]) -> int:
    """
    Write partial live pitch rows to live_pitch_mix_2026.

    Rows are inserted with ON CONFLICT DO NOTHING, so a row that already exists
    (for example a full post-game row) is left untouched. Savant-only columns
    (launch_angle, barrel, etc.) stay NULL here and are backfilled later by
    ingest_statcast_for_game().

    Opens its own connection, commits on success and always closes it. Returns
    the number of rows attempted, or 0 when *rows* is empty or the write failed
    (the failure is logged as a warning).
    """
    if not rows:
        return 0
    from database.remote_db import get_connection

    conn = get_connection()
    try:
        attempted = _upsert_df(conn, "live_pitch_mix_2026", pd.DataFrame(rows))
        conn.commit()
    except Exception as err:
        logger.warning("[live_pitch] upsert failed: %s", err)
        attempted = 0
    finally:
        conn.close()
    return attempted
def upsert_live_pa_log_rows(rows: list[dict]) -> int:
    """
    Write partial live plate-appearance rows to live_batter_game_log_2026.

    Rows are keyed on ``pa_key`` and inserted with ON CONFLICT DO NOTHING, so an
    existing row (for example a full post-game row) is left untouched.
    Savant-only columns stay NULL here and are backfilled post-game.

    Opens its own connection, commits on success and always closes it. Returns
    the number of rows attempted, or 0 when *rows* is empty or the write failed
    (the failure is logged as a warning).
    """
    if not rows:
        return 0
    from database.remote_db import get_connection

    conn = get_connection()
    try:
        attempted = _upsert_df(conn, "live_batter_game_log_2026", pd.DataFrame(rows), pk="pa_key")
        conn.commit()
    except Exception as err:
        logger.warning("[live_pa_log] upsert failed: %s", err)
        attempted = 0
    finally:
        conn.close()
    return attempted
def upsert_live_pitch_and_pa_rows(pitch_rows: list[dict], pa_rows: list[dict]) -> tuple[int, int]:
    """
    Write live pitch rows and live PA rows over a SINGLE connection.

    Called from a background thread in app.py to avoid pool contention. Pitch
    rows go to live_pitch_mix_2026 (keyed on event_key), PA rows to
    live_batter_game_log_2026 (keyed on pa_key), both with ON CONFLICT DO
    NOTHING, followed by one commit.

    Returns ``(pitch_rows_attempted, pa_rows_attempted)``; ``(0, 0)`` when both
    inputs are empty or when anything fails (the failure is logged as a
    warning). The connection is always closed.
    """
    if not (pitch_rows or pa_rows):
        return 0, 0
    from database.remote_db import get_connection

    conn = get_connection()
    try:
        pitch_count = (
            _upsert_df(conn, "live_pitch_mix_2026", pd.DataFrame(pitch_rows))
            if pitch_rows
            else 0
        )
        pa_count = (
            _upsert_df(conn, "live_batter_game_log_2026", pd.DataFrame(pa_rows), pk="pa_key")
            if pa_rows
            else 0
        )
        conn.commit()
        counts = (pitch_count, pa_count)
    except Exception as err:
        logger.warning("[live_pitch] combined upsert failed: %s", err)
        counts = (0, 0)
    finally:
        conn.close()
    return counts