Spaces:
Running
Running
| from __future__ import annotations | |
| """ | |
| analytics/game_completion.py | |
| Automatic game-completion pipeline. | |
| When a game transitions LIVE → FINAL, `on_game_complete` runs four steps: | |
| 1. Grade game_outcomes from scores feed | |
| 2. Ingest Baseball Savant pitch-level data into statcast tables | |
| 3. Grade batter_prop_outcomes from the recommendation audit view | |
| 4. Fill realized outcomes using the freshly ingested statcast rows | |
| """ | |
| from typing import Optional | |
| import pandas as pd | |
| from sqlalchemy import text | |
| from data.statcast import fetch_statcast_range | |
| from models.batter_zone_model import classify_zone_bucket, normalize_pitch_family | |
| from utils.helpers import utc_now_iso | |
| from utils.logger import logger | |
| # --------------------------------------------------------------------------- | |
| # Column lists | |
| # --------------------------------------------------------------------------- | |
| _CORE_COLS = [ | |
| "event_key", "player_name", "batter", "pitcher", | |
| "game_date", "game_pk", "source_season", | |
| "pitch_name", "events", "description", | |
| "stand", "p_throws", "home_team", "away_team", | |
| "inning", "inning_topbot", "at_bat_number", "pitch_number", | |
| "plate_x", "plate_z", | |
| ] | |
| _BATTED_COLS = [ | |
| "event_key", "launch_speed", "launch_angle", | |
| "bb_type", "estimated_woba_using_speedangle", | |
| ] | |
| _RELEASE_COLS = [ | |
| "event_key", "release_speed", "release_spin_rate", | |
| "pfx_x", "pfx_z", "release_extension", | |
| ] | |
| _PITCH_MIX_COLS = [ | |
| "event_key", "pa_key", | |
| "game_pk", "game_date", "source_season", | |
| "batter", "pitcher", "player_name", | |
| "stand", "p_throws", "home_team", "away_team", | |
| "inning", "inning_topbot", "at_bat_number", "pitch_number", | |
| "pitch_type", "pitch_name", | |
| "release_speed", "effective_speed", "release_spin_rate", "spin_axis", | |
| "pfx_x", "pfx_z", | |
| "release_pos_x", "release_pos_y", "release_pos_z", "release_extension", | |
| "plate_x", "plate_z", "zone", | |
| "balls", "strikes", "outs_when_up", "bat_score", "fld_score", | |
| "type", "description", "events", | |
| "launch_speed", "launch_angle", "hit_distance_sc", | |
| "hc_x", "hc_y", "spray_angle", | |
| "bb_type", "launch_speed_angle", "barrel", | |
| "estimated_ba_using_speedangle", "estimated_woba_using_speedangle", | |
| "woba_value", "woba_denom", | |
| "delta_home_win_exp", "delta_run_exp", | |
| ] | |
| _BATTER_GAME_LOG_COLS = [ | |
| "pa_key", "game_pk", "game_date", "source_season", | |
| "batter", "player_name", "stand", "p_throws", | |
| "home_team", "away_team", "inning", "inning_topbot", "at_bat_number", | |
| "pitches_seen", "balls_final", "strikes_final", "outs_when_up", | |
| "events", "description", | |
| "launch_speed", "launch_angle", "hit_distance_sc", "spray_angle", | |
| "bb_type", "launch_speed_angle", "barrel", | |
| "estimated_ba_using_speedangle", "estimated_woba_using_speedangle", | |
| "woba_value", "woba_denom", | |
| "hit_flag", "hr_flag", "tb2p_flag", | |
| "delta_home_win_exp", "delta_run_exp", | |
| ] | |
| _INSERT_CHUNK = 500 | |
| # --------------------------------------------------------------------------- | |
| # Private helpers | |
| # --------------------------------------------------------------------------- | |
| def _safe_float(val) -> Optional[float]: | |
| try: | |
| if val is None or str(val).strip() in {"", "nan", "None"}: | |
| return None | |
| return float(val) | |
| except Exception: | |
| return None | |
| def _safe_int(val) -> Optional[int]: | |
| try: | |
| if val is None or str(val).strip() in {"", "nan", "None"}: | |
| return None | |
| return int(float(val)) | |
| except Exception: | |
| return None | |
def _upsert_df(conn, table: str, df: pd.DataFrame, pk: str = "event_key") -> int:
    """INSERT … ON CONFLICT (pk) DO NOTHING. Returns rows attempted.

    Every column of ``df`` is inserted under its own name.  Rows are sent to
    ``conn.execute`` in chunks of ``_INSERT_CHUNK`` records.  The return value
    counts the records submitted, not the rows the database actually stored
    (conflicting rows are skipped by the database but still counted here).

    ``table``, ``pk`` and the column names are interpolated into the SQL
    text, so they must come from trusted code, never from user input.
    """
    if df is None or df.empty:
        return 0

    cols = list(df.columns)
    col_list = ", ".join(cols)
    placeholders = ", ".join(f":{c}" for c in cols)
    sql = text(
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({pk}) DO NOTHING"
    )

    # Cast to object before masking: on numeric dtypes recent pandas versions
    # coerce the None replacement straight back to NaN, which would then be
    # bound as a NaN value instead of SQL NULL.
    records = df.astype(object).where(df.notna(), None).to_dict("records")

    for start in range(0, len(records), _INSERT_CHUNK):
        conn.execute(sql, records[start : start + _INSERT_CHUNK])
    return len(records)
| _SAVANT_BACKFILL_COLS = [ | |
| "launch_speed", "launch_angle", "hit_distance_sc", "hc_x", "hc_y", | |
| "spray_angle", "bb_type", "launch_speed_angle", "barrel", | |
| "estimated_ba_using_speedangle", "estimated_woba_using_speedangle", | |
| "woba_value", "woba_denom", "delta_home_win_exp", "delta_run_exp", | |
| "effective_speed", "bat_score", "fld_score", | |
| ] | |
def _upsert_df_backfill(conn, table: str, df: pd.DataFrame, pk: str = "event_key") -> int:
    """INSERT … ON CONFLICT DO UPDATE SET col = COALESCE(EXCLUDED.col, table.col).

    Used post-game to backfill Savant-only columns into live-ingested rows.
    Only columns listed in ``_SAVANT_BACKFILL_COLS`` that are present in
    ``df`` are updated on conflict, and an incoming NULL never overwrites a
    stored value.  When ``df`` has none of those columns this falls back to
    the plain DO NOTHING upsert.

    Returns the number of records submitted (not the rows actually changed).
    ``table``, ``pk`` and the column names are interpolated into the SQL
    text and must come from trusted code.
    """
    if df is None or df.empty:
        return 0

    cols = list(df.columns)
    backfill = [c for c in _SAVANT_BACKFILL_COLS if c in cols]
    if not backfill:
        return _upsert_df(conn, table, df, pk)

    col_list = ", ".join(cols)
    placeholders = ", ".join(f":{c}" for c in cols)
    updates = ", ".join(f"{c} = COALESCE(EXCLUDED.{c}, {table}.{c})" for c in backfill)
    sql = text(
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({pk}) DO UPDATE SET {updates}"
    )

    # Missing values must reach the database as NULL: COALESCE only skips
    # NULL, so a NaN would overwrite a good stored value.  Cast to object
    # first because recent pandas versions turn a None replacement back into
    # NaN on numeric-dtype columns.
    records = df.astype(object).where(df.notna(), None).to_dict("records")

    for start in range(0, len(records), _INSERT_CHUNK):
        conn.execute(sql, records[start : start + _INSERT_CHUNK])
    return len(records)
def _build_pitcher_zone_rows(game_df: pd.DataFrame, game_pk: int, game_date: str) -> pd.DataFrame:
    """Build one pitcher_zone_events row per row of ``game_df``.

    Each output row carries the pitch's event_key, the pitcher id, the pitch
    family (``normalize_pitch_family``), the zone bucket
    (``classify_zone_bucket`` on plate_x/plate_z), numeric release/movement
    fields, and three 0/1 flags:

    * whiff_flag – "swinging_strike" appears in the lower-cased description
    * hit_flag   – events is single/double/triple/home_run
    * hr_flag    – events is home_run

    Missing text values (None or NaN) are stored as None, never as the
    literal string "nan".  ``game_date`` is truncated to its first 10
    characters.
    """
    hit_events = {"single", "double", "triple", "home_run"}
    game_pk_int = int(game_pk)
    game_day = str(game_date)[:10]

    def _text(val) -> str:
        # NaN is truthy, so `str(val or "")` would produce the text "nan";
        # collapse every missing/empty value to "" instead.
        if val is None or pd.isna(val) or not val:
            return ""
        return str(val)

    rows = []
    for _, r in game_df.iterrows():
        pitch_name = _text(r.get("pitch_name"))
        events_val = _text(r.get("events"))
        description = _text(r.get("description")).lower()
        event_lower = events_val.lower()
        plate_x = r.get("plate_x")
        plate_z = r.get("plate_z")

        rows.append({
            "event_key": r.get("event_key"),
            "pitcher_name": None,  # Savant CSV rows don't carry pitcher name directly
            "pitcher_id": _safe_int(r.get("pitcher")),
            "game_pk": game_pk_int,
            "game_date": game_day,
            "pitch_name": pitch_name or None,
            "pitch_family": normalize_pitch_family(pitch_name),
            "zone_bucket": classify_zone_bucket(plate_x, plate_z),
            "plate_x": _safe_float(plate_x),
            "plate_z": _safe_float(plate_z),
            "pfx_x": _safe_float(r.get("pfx_x")),
            "pfx_z": _safe_float(r.get("pfx_z")),
            "release_speed": _safe_float(r.get("release_speed")),
            "release_spin_rate": _safe_float(r.get("release_spin_rate")),
            "release_extension": _safe_float(r.get("release_extension")),
            "events": events_val or None,
            "whiff_flag": int("swinging_strike" in description),
            "hit_flag": int(event_lower in hit_events),
            "hr_flag": int(event_lower == "home_run"),
        })
    return pd.DataFrame(rows)
def _build_pitch_mix_rows(game_df: pd.DataFrame, game_pk: int, game_date: str) -> pd.DataFrame:
    """Build one live_pitch_mix_2026 row per row of ``game_df``.

    ``pa_key`` is "<game_pk>_<at_bat_number>" (None when at_bat_number is
    missing or not numeric).  ``game_date`` is truncated to 10 characters and
    ``source_season`` is taken from its first four.  Numeric fields go through
    ``_safe_int`` / ``_safe_float``; text fields are passed through with
    falsy values replaced by None.
    """
    def _text(val):
        return val or None

    # (column, converter) in the exact order the output columns must appear.
    field_spec = (
        ("batter", _safe_int),
        ("pitcher", _safe_int),
        ("player_name", _text),
        ("stand", _text),
        ("p_throws", _text),
        ("home_team", _text),
        ("away_team", _text),
        ("inning", _safe_int),
        ("inning_topbot", _text),
        ("at_bat_number", _safe_int),
        ("pitch_number", _safe_int),
        ("pitch_type", _text),
        ("pitch_name", _text),
        ("release_speed", _safe_float),
        ("effective_speed", _safe_float),
        ("release_spin_rate", _safe_float),
        ("spin_axis", _safe_float),
        ("pfx_x", _safe_float),
        ("pfx_z", _safe_float),
        ("release_pos_x", _safe_float),
        ("release_pos_y", _safe_float),
        ("release_pos_z", _safe_float),
        ("release_extension", _safe_float),
        ("plate_x", _safe_float),
        ("plate_z", _safe_float),
        ("zone", _safe_int),
        ("balls", _safe_int),
        ("strikes", _safe_int),
        ("outs_when_up", _safe_int),
        ("bat_score", _safe_int),
        ("fld_score", _safe_int),
        ("type", _text),
        ("description", _text),
        ("events", _text),
        ("launch_speed", _safe_float),
        ("launch_angle", _safe_float),
        ("hit_distance_sc", _safe_float),
        ("hc_x", _safe_float),
        ("hc_y", _safe_float),
        ("spray_angle", _safe_float),
        ("bb_type", _text),
        ("launch_speed_angle", _safe_int),
        ("barrel", _safe_int),
        ("estimated_ba_using_speedangle", _safe_float),
        ("estimated_woba_using_speedangle", _safe_float),
        ("woba_value", _safe_float),
        ("woba_denom", _safe_int),
        ("delta_home_win_exp", _safe_float),
        ("delta_run_exp", _safe_float),
    )

    # Per-game constants, computed once rather than per pitch.
    game_pk_int = int(game_pk)
    game_day = str(game_date)[:10]
    season = int(str(game_date)[:4])

    rows = []
    for _, r in game_df.iterrows():
        # _safe_int returns None for unparseable values, so a single bad
        # at_bat_number no longer raises and aborts the whole build.
        ab_num = _safe_int(r.get("at_bat_number"))
        row = {
            "event_key": r.get("event_key"),
            "pa_key": f"{game_pk}_{ab_num}" if ab_num is not None else None,
            "game_pk": game_pk_int,
            "game_date": game_day,
            "source_season": season,
        }
        for column, convert in field_spec:
            row[column] = convert(r.get(column))
        rows.append(row)
    return pd.DataFrame(rows)
def _build_batter_game_log_rows(game_df: pd.DataFrame, game_pk: int, game_date: str) -> pd.DataFrame:
    """Collapse pitch-level rows into one row per plate appearance.

    Pitches are grouped by numeric ``at_bat_number`` and ordered by numeric
    ``pitch_number``.  Outcome fields come from the last pitch of the group,
    ``outs_when_up`` from the first, ``pitches_seen`` is the group size, and
    the two expectancy deltas are summed over the group.  Returns an empty
    frame (after logging a warning) when ``at_bat_number`` is absent.
    """
    if "at_bat_number" not in game_df.columns:
        logger.warning("[game_completion] at_bat_number missing from game_df for game_pk=%s", game_pk)
        return pd.DataFrame()

    # Total bases per hit type; its keys double as the set of hit events.
    total_bases = {"single": 1, "double": 2, "triple": 3, "home_run": 4}

    def _summed(pitches: pd.DataFrame, col: str):
        """Sum the numeric values of ``col``; None if absent or all missing."""
        if col not in pitches.columns:
            return None
        numeric = pd.to_numeric(pitches[col], errors="coerce").dropna()
        if numeric.empty:
            return None
        return float(numeric.sum())

    frame = game_df.copy()
    for name in ("at_bat_number", "pitch_number"):
        frame[name] = pd.to_numeric(frame[name], errors="coerce")

    out = []
    for ab_num, pitches in frame.groupby("at_bat_number", sort=True):
        if pitches.empty:
            continue
        ordered = pitches.sort_values("pitch_number")
        first = ordered.iloc[0]
        last = ordered.iloc[-1]

        events_val = last.get("events") or None
        outcome = str(events_val or "").strip().lower()

        out.append({
            "pa_key": f"{game_pk}_{int(ab_num)}",
            "game_pk": int(game_pk),
            "game_date": str(game_date)[:10],
            "source_season": int(str(game_date)[:4]),
            "batter": _safe_int(last.get("batter")),
            "player_name": last.get("player_name") or None,
            "stand": last.get("stand") or None,
            "p_throws": last.get("p_throws") or None,
            "home_team": last.get("home_team") or None,
            "away_team": last.get("away_team") or None,
            "inning": _safe_int(last.get("inning")),
            "inning_topbot": last.get("inning_topbot") or None,
            "at_bat_number": int(ab_num),
            "pitches_seen": len(pitches),
            "balls_final": _safe_int(last.get("balls")),
            "strikes_final": _safe_int(last.get("strikes")),
            "outs_when_up": _safe_int(first.get("outs_when_up")),
            "events": events_val,
            "description": last.get("description") or None,
            "launch_speed": _safe_float(last.get("launch_speed")),
            "launch_angle": _safe_float(last.get("launch_angle")),
            "hit_distance_sc": _safe_float(last.get("hit_distance_sc")),
            "spray_angle": _safe_float(last.get("spray_angle")),
            "bb_type": last.get("bb_type") or None,
            "launch_speed_angle": _safe_int(last.get("launch_speed_angle")),
            "barrel": _safe_int(last.get("barrel")),
            "estimated_ba_using_speedangle": _safe_float(last.get("estimated_ba_using_speedangle")),
            "estimated_woba_using_speedangle": _safe_float(last.get("estimated_woba_using_speedangle")),
            "woba_value": _safe_float(last.get("woba_value")),
            "woba_denom": _safe_int(last.get("woba_denom")),
            "hit_flag": int(outcome in total_bases),
            "hr_flag": int(outcome == "home_run"),
            "tb2p_flag": int(total_bases.get(outcome, 0) >= 2),
            "delta_home_win_exp": _summed(pitches, "delta_home_win_exp"),
            "delta_run_exp": _summed(pitches, "delta_run_exp"),
        })
    return pd.DataFrame(out)
| # --------------------------------------------------------------------------- | |
| # Public: ingest statcast for a single game | |
| # --------------------------------------------------------------------------- | |
def _upsert_notna_subset(conn, game_df: pd.DataFrame, table: str, gate_col: str,
                         wanted_cols, numeric_cols) -> None:
    """Upsert the rows of ``game_df`` whose ``gate_col`` is not null into ``table``.

    Only the ``wanted_cols`` that exist in the frame are written, and the
    listed ``numeric_cols`` are coerced with ``pd.to_numeric`` first.  Does
    nothing when ``gate_col`` is absent from the frame.
    """
    if gate_col not in game_df.columns:
        return
    subset = game_df[game_df[gate_col].notna()].copy()
    keep = [c for c in wanted_cols if c in subset.columns]
    if not keep:
        return
    out = subset[keep].copy()
    for name in numeric_cols:
        if name in out.columns:
            out[name] = pd.to_numeric(out[name], errors="coerce")
    _upsert_df(conn, table, out)


def ingest_statcast_for_game(conn, game_pk: int, game_date: str) -> int:
    """
    Fetch statcast rows for game_date, keep the rows belonging to game_pk and
    write them to six tables:

    * statcast_event_core, statcast_batted_ball, statcast_pitch_release and
      pitcher_zone_events via ON CONFLICT DO NOTHING;
    * live_pitch_mix_2026 and live_batter_game_log_2026 via the backfill
      upsert (existing rows get their Savant-only columns filled in).

    Failures while building or writing the last three tables are logged and
    do not stop the function.  A failed fetch, an empty fetch, a missing
    game_pk column, or no rows for this game all return 0.

    Returns the number of core rows submitted to the upsert; rows skipped by
    the database on conflict are still counted.
    """
    try:
        fetched = fetch_statcast_range(game_date, game_date)
    except Exception as err:
        logger.warning("[game_completion] statcast fetch failed game_pk=%s: %s", game_pk, err)
        return 0

    if fetched is None or fetched.empty:
        logger.info("[game_completion] no statcast rows for date %s (game_pk=%s)", game_date, game_pk)
        return 0
    if "game_pk" not in fetched.columns:
        logger.warning("[game_completion] game_pk column missing from statcast fetch")
        return 0

    is_this_game = fetched["game_pk"].astype(str) == str(game_pk)
    game_df = fetched[is_this_game].copy()
    if game_df.empty:
        logger.info("[game_completion] game_pk %s not in statcast rows for %s", game_pk, game_date)
        return 0

    # Synthetic primary key "<game_pk>_<at_bat_number>_<pitch_number>", built
    # from the raw string forms before game_pk is coerced to numeric.
    # NOTE(review): if the fetch ever delivers these columns as floats the key
    # would contain ".0" suffixes — confirm the upstream dtypes.
    pk_part = game_df["game_pk"].astype(str)
    ab_part = game_df["at_bat_number"].astype(str)
    pitch_part = game_df["pitch_number"].astype(str)
    game_df["event_key"] = pk_part + "_" + ab_part + "_" + pitch_part
    game_df["game_pk"] = pd.to_numeric(game_df["game_pk"], errors="coerce")
    game_df["source_season"] = int(str(game_date)[:4])

    # ── statcast_event_core ──────────────────────────────────────────────
    core_df = game_df[[c for c in _CORE_COLS if c in game_df.columns]].copy()
    core_numeric = (
        "inning", "at_bat_number", "pitch_number", "batter", "pitcher",
        "plate_x", "plate_z",
    )
    for name in core_numeric:
        if name in core_df.columns:
            core_df[name] = pd.to_numeric(core_df[name], errors="coerce")
    n_core = _upsert_df(conn, "statcast_event_core", core_df)

    # ── statcast_batted_ball ─────────────────────────────────────────────
    _upsert_notna_subset(
        conn, game_df, "statcast_batted_ball", "launch_speed", _BATTED_COLS,
        ["launch_speed", "launch_angle", "estimated_woba_using_speedangle"],
    )

    # ── statcast_pitch_release ───────────────────────────────────────────
    _upsert_notna_subset(
        conn, game_df, "statcast_pitch_release", "release_speed", _RELEASE_COLS,
        ["release_speed", "release_spin_rate", "pfx_x", "pfx_z", "release_extension"],
    )

    # ── pitcher_zone_events ──────────────────────────────────────────────
    try:
        zone_rows = _build_pitcher_zone_rows(game_df, game_pk, game_date)
        if not zone_rows.empty:
            _upsert_df(conn, "pitcher_zone_events", zone_rows)
    except Exception as err:
        logger.warning("[game_completion] pitcher_zone_events ingest failed: %s", err)

    # ── live_pitch_mix_2026 ──────────────────────────────────────────────
    try:
        mix_rows = _build_pitch_mix_rows(game_df, game_pk, game_date)
        if not mix_rows.empty:
            mix_count = _upsert_df_backfill(conn, "live_pitch_mix_2026", mix_rows)
            logger.info("[game_completion] ingested %d pitch_mix rows for game_pk=%s", mix_count, game_pk)
    except Exception as err:
        logger.warning("[game_completion] live_pitch_mix_2026 ingest failed: %s", err)

    # ── live_batter_game_log_2026 ────────────────────────────────────────
    try:
        pa_rows = _build_batter_game_log_rows(game_df, game_pk, game_date)
        if not pa_rows.empty:
            pa_count = _upsert_df_backfill(conn, "live_batter_game_log_2026", pa_rows, pk="pa_key")
            logger.info("[game_completion] ingested %d batter_game_log rows for game_pk=%s", pa_count, game_pk)
    except Exception as err:
        logger.warning("[game_completion] live_batter_game_log_2026 ingest failed: %s", err)

    logger.info("[game_completion] ingested %d core rows for game_pk=%s", n_core, game_pk)
    return n_core
| # --------------------------------------------------------------------------- | |
| # Public: full completion pipeline | |
| # --------------------------------------------------------------------------- | |
def _completion_step_game_outcomes(conn, scores_df: pd.DataFrame) -> None:
    """Step 1: build game_outcomes rows from the scores feed and insert them."""
    try:
        from analytics.outcome_grader import build_game_outcome_rows_from_scores
        from database.db import insert_game_outcomes

        outcomes = build_game_outcome_rows_from_scores(
            scores_df=scores_df,
            graded_at=utc_now_iso(),
        )
        insert_game_outcomes(conn, outcomes)
        logger.info("[game_completion] step 1 done: %d game_outcomes rows", len(outcomes))
    except Exception as err:
        logger.warning("[game_completion] step 1 (game_outcomes) failed: %s", err)


def _completion_step_statcast(conn, game_pk: int, game_date: str) -> pd.DataFrame:
    """Step 2: ingest statcast and return this game's statcast_event_core rows.

    Returns an empty frame when nothing was ingested or when anything fails.
    """
    try:
        ingested = ingest_statcast_for_game(conn, game_pk, game_date)
        logger.info("[game_completion] step 2 done: %d statcast rows", ingested)
        if ingested > 0:
            return pd.read_sql(
                text("SELECT * FROM statcast_event_core WHERE game_pk = :pk"),
                conn,
                params={"pk": int(game_pk)},
            )
    except Exception as err:
        logger.warning("[game_completion] step 2 (statcast ingest) failed: %s", err)
    return pd.DataFrame()


def _completion_step_batter_props(conn, game_pk: int) -> None:
    """Step 3: regrade batter_prop_outcomes for the game from the audit view.

    Existing rows for the game are deleted before the new ones are inserted.
    """
    try:
        from analytics.batter_prop_grader import build_batter_prop_outcome_rows_from_audit
        from database.db import (
            delete_batter_prop_outcomes_for_game,
            insert_batter_prop_outcomes,
            read_recommendation_audit_view,
        )

        audit = read_recommendation_audit_view(conn)
        same_game = audit["game_pk"].astype(str).str.strip() == str(game_pk)
        graded = build_batter_prop_outcome_rows_from_audit(
            audit_df=audit[same_game],
            graded_at=utc_now_iso(),
        )
        # NOTE(review): the delete runs even when `graded` is empty, unlike
        # step 4 — confirm that wiping without replacement is intended.
        delete_batter_prop_outcomes_for_game(conn, str(game_pk))
        insert_batter_prop_outcomes(conn, graded)
        logger.info("[game_completion] step 3 done: %d batter_prop_outcomes rows", len(graded))
    except Exception as err:
        logger.warning("[game_completion] step 3 (batter prop grade) failed: %s", err)


def _completion_step_realized(conn, game_pk: int, statcast_df: pd.DataFrame) -> None:
    """Step 4: replace the game's prop outcomes with realized rows.

    Skipped when no statcast rows are available, when the game has no prop
    outcomes, or when the realization build yields nothing.
    """
    if statcast_df.empty:
        return
    try:
        from analytics.batter_realization import build_batter_realization_rows
        from database.db import (
            delete_batter_prop_outcomes_for_game,
            insert_batter_prop_outcomes,
            read_batter_prop_outcomes_for_game,
        )

        current = read_batter_prop_outcomes_for_game(conn, str(game_pk))
        if current.empty:
            return
        realized = build_batter_realization_rows(
            batter_prop_outcomes_df=current,
            statcast_df=statcast_df,
            graded_at=utc_now_iso(),
        )
        if realized.empty:
            return
        delete_batter_prop_outcomes_for_game(conn, str(game_pk))
        insert_batter_prop_outcomes(conn, realized)
        logger.info("[game_completion] step 4 done: %d realized rows", len(realized))
    except Exception as err:
        logger.warning("[game_completion] step 4 (fill realized) failed: %s", err)


def on_game_complete(conn, game_pk: int, game_date: str, scores_df: pd.DataFrame) -> None:
    """
    Run the four post-game steps in order.  Every step catches and logs its
    own exceptions, so a failing step never raises out of this function.

    Step 1: grade game_outcomes from scores feed
    Step 2: ingest Baseball Savant data (may be 0 rows if Savant hasn't posted)
    Step 3: grade batter_prop_outcomes from recommendation audit view
    Step 4: fill realized batter outcomes using freshly ingested statcast rows

    NOTE(review): nothing here commits or rolls back; presumably the caller
    owns the transaction on ``conn`` — confirm.
    """
    logger.info("[game_completion] pipeline start game_pk=%s date=%s", game_pk, game_date)

    _completion_step_game_outcomes(conn, scores_df)
    statcast_df = _completion_step_statcast(conn, game_pk, game_date)
    _completion_step_batter_props(conn, game_pk)
    _completion_step_realized(conn, game_pk, statcast_df)

    logger.info("[game_completion] pipeline complete game_pk=%s", game_pk)
| # --------------------------------------------------------------------------- | |
| # Public: live-ingest helpers (called pitch-by-pitch from app.py) | |
| # --------------------------------------------------------------------------- | |
def upsert_live_pitch_rows(rows: list[dict]) -> int:
    """
    Write partial live pitch rows to live_pitch_mix_2026 on a fresh connection.

    Rows are inserted with ON CONFLICT DO NOTHING, so rows already stored
    (including full post-game rows) are left untouched.  Savant-only columns
    that are absent from ``rows`` stay NULL until the post-game
    ingest_statcast_for_game() backfills them.

    Returns the number of rows submitted, or 0 when ``rows`` is empty or the
    write fails (the failure is logged, not raised).
    """
    if not rows:
        return 0

    from database.remote_db import get_connection

    connection = get_connection()
    try:
        submitted = _upsert_df(connection, "live_pitch_mix_2026", pd.DataFrame(rows))
        connection.commit()
    except Exception as err:
        logger.warning("[live_pitch] upsert failed: %s", err)
        return 0
    else:
        return submitted
    finally:
        connection.close()
def upsert_live_pa_log_rows(rows: list[dict]) -> int:
    """
    Write partial live plate-appearance rows to live_batter_game_log_2026 on
    a fresh connection.

    Rows are keyed on pa_key and inserted with ON CONFLICT DO NOTHING, so
    rows already stored are left untouched.  Savant-only columns stay NULL
    until the post-game backfill.

    Returns the number of rows submitted, or 0 when ``rows`` is empty or the
    write fails (the failure is logged, not raised).
    """
    if not rows:
        return 0

    from database.remote_db import get_connection

    connection = get_connection()
    try:
        submitted = _upsert_df(
            connection, "live_batter_game_log_2026", pd.DataFrame(rows), pk="pa_key"
        )
        connection.commit()
    except Exception as err:
        logger.warning("[live_pa_log] upsert failed: %s", err)
        return 0
    else:
        return submitted
    finally:
        connection.close()
def upsert_live_pitch_and_pa_rows(pitch_rows: list[dict], pa_rows: list[dict]) -> tuple[int, int]:
    """
    Write live pitch rows and live PA rows over ONE connection with a single
    commit.  Called from a background thread in app.py to avoid pool
    contention.

    Returns ``(pitch_rows_submitted, pa_rows_submitted)``.  Returns ``(0, 0)``
    when both inputs are empty or when any part of the write fails (the
    failure is logged, not raised).
    """
    if not (pitch_rows or pa_rows):
        return 0, 0

    from database.remote_db import get_connection

    connection = get_connection()
    try:
        pitch_count = 0
        if pitch_rows:
            pitch_frame = pd.DataFrame(pitch_rows)
            pitch_count = _upsert_df(connection, "live_pitch_mix_2026", pitch_frame)

        pa_count = 0
        if pa_rows:
            pa_frame = pd.DataFrame(pa_rows)
            pa_count = _upsert_df(connection, "live_batter_game_log_2026", pa_frame, pk="pa_key")

        connection.commit()
    except Exception as err:
        logger.warning("[live_pitch] combined upsert failed: %s", err)
        return 0, 0
    else:
        return pitch_count, pa_count
    finally:
        connection.close()