from __future__ import annotations from io import StringIO import threading from datetime import date, timedelta from typing import Any import pandas as pd import re import streamlit as st import streamlit.components.v1 as components from data.live_prop_odds import ( best_book_by_player_market, fetch_live_prop_odds, fetch_all_upcoming_hr_props_bundle, normalize_prop_odds, ) from analytics.no_vig_props import ( american_to_implied_prob, compute_bet_ev, compute_edge, ) from analytics.recommendation_logger import ( build_recommendation_log_rows, build_recommendation_outcome_rows, ) from analytics.batter_audit_metrics import ( build_batter_hr_tier_table, build_batter_hr_confidence_table, build_batter_hr_edge_table, ) from analytics.batter_realization import build_batter_realization_rows from analytics.batter_prop_grader import build_batter_prop_outcome_rows_from_audit from analytics.game_completion import on_game_complete, upsert_live_pitch_and_pa_rows, _safe_float from utils.logger import logger from analytics.outcome_grader import build_game_outcome_rows_from_scores from analytics.bankroll import bankroll_curve, grade_profit, summary_metrics from analytics.edge import ( calculate_edge, kelly_fraction, remove_vig_two_way, ) from analytics.no_vig_props import ( american_to_implied_prob, compute_bet_ev, compute_edge, ) from models.batter_zone_model import classify_zone_bucket, normalize_pitch_family from models.batter_zone_store import ( insert_batter_zone_events, load_batter_zone_store_metrics, ) from models.pitcher_adjustment import build_pitcher_feature_row from config.settings import ( APP_TITLE, DEFAULT_EDGE_THRESHOLD, ODDS_API_KEY, OPENWEATHER_API_KEY, REFRESH_TTL_SECONDS, LIVE_FEED_TTL_SECONDS, SCORES_TTL_SECONDS, SCHEDULE_TTL_SECONDS, STATCAST_TTL_SECONDS, ENABLE_ENTERPRISE_PROVIDER, LIVE_PROP_ODDS_TTL_SECONDS, DEFAULT_PROP_BOOKS, DEFAULT_PROP_MARKETS, ) from models.pitcher_baseline_store import ( upsert_inning_first_seed_event, load_pitcher_cross_game_baseline, ) from analytics.evaluation_metrics import ( build_hr_calibration_table, build_edge_bucket_table, build_confidence_table, build_tier_performance_table, build_ere_table, build_ere_by_edge_bucket_table, build_ere_by_confidence_bucket_table, build_ere_by_tier_table, build_clv_table, build_clv_by_tier_table, ) from models.batter_zone_store import insert_batter_zone_events from analytics.recommendation_engine import build_upcoming_hitter_recommendations from models.live_fair_simulator_v3 import build_upcoming_simulated_rows from visualization.recommendation_panels import render_recommendation_panels from visualization.model_explainer import render_model_explainer from data.live_game_feed import fetch_live_game_feed from engine.live_game_engine import enrich_game_from_live_feed from utils.dates import current_wbc_date_str from data.scores import fetch_scores_for_date from data.odds import fetch_featured_odds from data.schedule import fetch_schedule_for_date from data.shared_baseline import load_or_build_shared_baseline_bundle, load_shared_baseline_bundle_from_snapshots from data.statcast import fetch_statcast_range, fetch_statcast_range_pitcher, normalize_statcast from data.weather import fetch_weather_for_venue from database.db import ( get_connection, insert_bet, next_bet_id, read_table, read_table_retryable, read_cached_odds, read_cached_probable_starters, read_cached_probable_starters_meta, read_cached_schedule_for_date, read_cached_upcoming_props_bundle, read_cached_weather_for_venue, update_bet_result, upsert_dataframe, replace_cached_odds, replace_cached_probable_starters, replace_cached_schedule, replace_cached_upcoming_props_bundle, replace_cached_weather, ensure_recommendation_logs_table, insert_recommendation_logs, ensure_recommendation_outcomes_table, insert_recommendation_outcomes, read_recommendation_audit_view, ensure_game_outcomes_table, insert_game_outcomes, read_game_outcomes, ensure_batter_prop_outcomes_table, insert_batter_prop_outcomes, read_batter_prop_outcomes, replace_batter_prop_outcomes, read_batter_prop_audit_view, ensure_upcoming_hr_props_table, insert_upcoming_hr_props, ) from features.batter_features import batter_summary from features.pitch_features import add_pitch_features from models.matchup_model import calculate_matchup_score from models.pitch_model import pitcher_baseline_from_events from simulation.monte_carlo import simulate_batter_outcomes from utils.helpers import utc_now_iso from visualization.batter import create_exit_velocity_chart, create_launch_angle_chart from visualization.betting import create_bankroll_chart, create_edge_chart from visualization.matchup import create_hit_hr_chart, create_matchup_score_chart from visualization.pitcher import create_pitch_movement_chart from visualization.props_page import render_props from visualization.loading_shell import render_loading_shell from visualization.simulation import create_hr_distribution, create_total_bases_distribution from visualization.game_cards import render_game_card from visualization.debug_page import render_debug from visualization.feedback_page import render_feedback from visualization.card_lab_page import render_card_lab st.set_page_config( page_title=APP_TITLE, layout="wide", page_icon="⚾", ) st.markdown( """ """, unsafe_allow_html=True, ) conn = get_connection() MAX_LIVE_FEEDS = 6 ensure_recommendation_logs_table(conn) ensure_recommendation_outcomes_table(conn) ensure_game_outcomes_table(conn) ensure_batter_prop_outcomes_table(conn) _ASYNC_REFRESH_LOCK = threading.Lock() _ASYNC_REFRESH_KEYS: set[str] = set() def _queue_async_refresh(key: str, fn) -> bool: with _ASYNC_REFRESH_LOCK: if key in _ASYNC_REFRESH_KEYS: return False _ASYNC_REFRESH_KEYS.add(key) def _run() -> None: try: fn() except Exception as exc: logger.warning("[async_refresh] key=%s error=%s", key, exc) finally: with _ASYNC_REFRESH_LOCK: _ASYNC_REFRESH_KEYS.discard(key) threading.Thread(target=_run, daemon=True).start() return True def _run_with_fresh_conn(write_fn) -> None: fresh_conn = None try: fresh_conn = get_connection() write_fn(fresh_conn) finally: if fresh_conn is not None: try: fresh_conn.close() except Exception: pass def _fire_completion(pk_str: str, game_date: str, scores_df: pd.DataFrame) -> None: """Start the game-completion pipeline in a daemon thread with a fresh DB connection.""" scores_snapshot = scores_df.copy() def _run() -> None: fresh_conn = None try: from database.db import get_connection as _get_conn fresh_conn = _get_conn() on_game_complete(fresh_conn, int(pk_str), game_date, scores_snapshot) except Exception as exc: logger.warning("[_fire_completion] thread error game_pk=%s: %s", pk_str, exc) finally: if fresh_conn is not None: try: fresh_conn.close() except Exception: pass t = threading.Thread(target=_run, daemon=True) t.start() def _fire_live_pitch_upsert(pitch_rows: list[dict], pa_rows: list[dict]) -> None: """Fire live pitch + PA upsert in a daemon thread to avoid pool contention.""" pitch_snapshot = list(pitch_rows) pa_snapshot = list(pa_rows) def _run() -> None: try: upsert_live_pitch_and_pa_rows(pitch_snapshot, pa_snapshot) except Exception as exc: logger.warning("[_fire_live_pitch_upsert] error: %s", exc) threading.Thread(target=_run, daemon=True).start() @st.fragment(run_every=3) def render_live_games_fragment( live_games: pd.DataFrame, statcast_df: pd.DataFrame, pitcher_statcast_df: pd.DataFrame | None = None, odds_df: pd.DataFrame | None = None, ) -> None: import datetime as _dt _today = _dt.date.today().strftime("%Y-%m-%d") _scores = load_scores_for_dashboard_date(_today) _curr_live: set[str] = set() if not _scores.empty and "game_pk" in _scores.columns and "status" in _scores.columns: _curr_live = { str(r["game_pk"]) for _, r in _scores.iterrows() if "live" in str(r.get("status", "")).lower() or any(kw in str(r.get("status", "")).lower() for kw in ("top", "bot", "middle", "mid", "inning", "in progress")) } _prev_live: set[str] = st.session_state.get("_live_pks", set()) _done: set[str] = st.session_state.get("_completed_pks", set()) for _pk_str in (_prev_live - _curr_live - _done): if _scores.empty: continue _row = _scores[_scores["game_pk"].astype(str) == _pk_str] if not _row.empty and "final" in str(_row.iloc[0].get("status", "")).lower(): _game_date = str(_row.iloc[0].get("game_date", _today))[:10] _fire_completion(_pk_str, _game_date, _scores) _done = _done | {_pk_str} logger.info("[live_fragment] fired completion pipeline for game_pk=%s", _pk_str) st.session_state["_live_pks"] = _curr_live st.session_state["_completed_pks"] = _done render_live_games_with_edge_strips( live_games=live_games, statcast_df=statcast_df, pitcher_statcast_df=pitcher_statcast_df, odds_df=odds_df, ) @st.fragment(run_every=300) def render_statcast_retry_fragment() -> None: """ Every 5 minutes: find completed games in game_outcomes that still have 0 rows in statcast_event_core and re-attempt ingestion (handles the 30–60 min Savant lag). Runs silently — no UI output. """ try: from sqlalchemy import text as _text pending = pd.read_sql( _text(""" SELECT DISTINCT g.game_pk, g.graded_at FROM game_outcomes g WHERE g.status ILIKE '%final%' AND NOT EXISTS ( SELECT 1 FROM statcast_event_core s WHERE s.game_pk = g.game_pk::BIGINT ) AND NOT EXISTS ( SELECT 1 FROM live_pitch_mix_2026 lpm WHERE lpm.game_pk = g.game_pk::BIGINT ) ORDER BY g.graded_at DESC LIMIT 10 """), conn, ) if pending.empty: return for _, row in pending.iterrows(): _pk_str = str(row.get("game_pk", "")).strip() if not _pk_str: continue _done: set[str] = st.session_state.get("_completed_pks", set()) if _pk_str in _done: continue import datetime as _dt _today = _dt.date.today().strftime("%Y-%m-%d") _scores = load_scores_for_dashboard_date(_today) _fire_completion(_pk_str, _today, _scores) logger.info("[retry_fragment] re-firing completion for game_pk=%s", _pk_str) except Exception as exc: logger.warning("[retry_fragment] error: %s", exc) @st.cache_data(ttl=3, show_spinner=False) def build_prepared_live_games_df_cached(live_games_json: str) -> pd.DataFrame: live_games = pd.read_json(live_games_json) return build_prepared_live_games_df(live_games) @st.cache_data(ttl=SCHEDULE_TTL_SECONDS) def load_wbc_schedule() -> pd.DataFrame: return fetch_schedule_for_date(current_wbc_date_str()) @st.cache_data(ttl=SCHEDULE_TTL_SECONDS) def load_wbc_schedule_for_date(date_str: str) -> pd.DataFrame: return fetch_schedule_for_date(date_str) @st.cache_data(ttl=LIVE_PROP_ODDS_TTL_SECONDS) def load_live_prop_odds_for_game(game_context: dict) -> pd.DataFrame: raw = fetch_live_prop_odds( game_context=game_context, sportsbooks=DEFAULT_PROP_BOOKS, markets=DEFAULT_PROP_MARKETS, ) normalized = normalize_prop_odds(raw) best = best_book_by_player_market(normalized) return best @st.cache_data(ttl=60 * 60 * 6, show_spinner=False) def load_statcast_current_season_full() -> pd.DataFrame: today = pd.Timestamp.utcnow().date() year = today.year start_date = pd.Timestamp(year=year, month=1, day=1).date() raw = fetch_statcast_range(start_date.isoformat(), today.isoformat()) normalized = normalize_statcast(raw) enriched = add_pitch_features(normalized) return enriched @st.cache_data(ttl=60 * 60 * 12, show_spinner=False) def load_statcast_previous_season_full() -> pd.DataFrame: today = pd.Timestamp.utcnow().date() previous_year = today.year - 1 start_date = pd.Timestamp(year=previous_year, month=1, day=1).date() end_date = pd.Timestamp(year=previous_year, month=12, day=31).date() raw = fetch_statcast_range(start_date.isoformat(), end_date.isoformat()) normalized = normalize_statcast(raw) import logging as _logging _logging.getLogger(__name__).warning( "[statcast_load] rows=%d unique_players=%d sample_names=%s", len(normalized), normalized["player_name"].nunique() if not normalized.empty else 0, normalized["player_name"].head(3).tolist() if not normalized.empty else [], ) enriched = add_pitch_features(normalized) return enriched @st.cache_data(ttl=60 * 60 * 12, show_spinner=False) def load_statcast_previous_season_full_pitcher() -> pd.DataFrame: """2025 season pitcher-perspective statcast. player_name = pitcher name.""" today = pd.Timestamp.utcnow().date() previous_year = today.year - 1 start_date = pd.Timestamp(year=previous_year, month=1, day=1).date() end_date = pd.Timestamp(year=previous_year, month=12, day=31).date() raw = fetch_statcast_range_pitcher(start_date.isoformat(), end_date.isoformat()) normalized = normalize_statcast(raw) return add_pitch_features(normalized) @st.cache_data(ttl=60 * 60 * 1, show_spinner=False) def load_probable_starters() -> dict: """Probable starting pitchers for next 7 days from MLB Stats API.""" from data.mlb_starters import fetch_probable_starters_for_props try: cached_meta = read_cached_probable_starters_meta(conn) if not cached_meta.empty: fetched_at = cached_meta.iloc[0]["fetched_at"] fetched_ts = pd.to_datetime(fetched_at, errors="coerce", utc=True) cache_age_seconds = None if pd.notna(fetched_ts): cache_age_seconds = max( 0, int((pd.Timestamp.now(tz="UTC") - fetched_ts).total_seconds()), ) cached = read_cached_probable_starters(conn) if cached: if _is_fetched_at_fresh(fetched_at, 60 * 60): st.session_state["probable_starters_refresh_mode"] = "cache_fresh" st.session_state["probable_starters_cache_age_seconds"] = cache_age_seconds return cached _queue_async_refresh( "probable_starters", lambda: _run_with_fresh_conn( lambda fresh_conn: replace_cached_probable_starters( fresh_conn, fetch_probable_starters_for_props(), ) ), ) st.session_state["probable_starters_refresh_mode"] = "stale_cache_served_async_refresh" st.session_state["probable_starters_cache_age_seconds"] = cache_age_seconds return cached except Exception: pass fresh = fetch_probable_starters_for_props() try: replace_cached_probable_starters(conn, fresh) except Exception as exc: logger.warning("[load_probable_starters] cache persist failure: %s", exc) st.session_state["probable_starters_refresh_mode"] = "fresh_network_load" st.session_state["probable_starters_cache_age_seconds"] = 0 return fresh @st.cache_data(ttl=STATCAST_TTL_SECONDS) def load_statcast_recent() -> pd.DataFrame: end_date_str = current_dashboard_date_str() end_date = pd.to_datetime(end_date_str).date() start_date = end_date - timedelta(days=14) raw = fetch_statcast_range(start_date.isoformat(), end_date.isoformat()) normalized = normalize_statcast(raw) enriched = add_pitch_features(normalized) return enriched def _coerce_name_tuple(values: list[str] | tuple[str, ...] | set[str] | None) -> tuple[str, ...]: if not values: return tuple() normalized = sorted({str(value).strip() for value in values if str(value).strip()}) return tuple(normalized) def _extract_prop_player_names(raw_props: pd.DataFrame | None) -> tuple[str, ...]: if raw_props is None or raw_props.empty or "player_name" not in raw_props.columns: return tuple() return _coerce_name_tuple(raw_props["player_name"].dropna().astype(str).tolist()) def _extract_prop_pitcher_names(raw_props: pd.DataFrame | None) -> tuple[str, ...]: if raw_props is None or raw_props.empty or "player_name" not in raw_props.columns: return tuple() if "market" not in raw_props.columns: return tuple() k_rows = raw_props[raw_props["market"].astype(str).str.lower() == "k"].copy() if k_rows.empty: return tuple() return _coerce_name_tuple(k_rows["player_name"].dropna().astype(str).tolist()) def _extract_probable_starter_names(probable_starters: dict | None) -> tuple[str, ...]: if not probable_starters: return tuple() names: list[str] = [] for payload in probable_starters.values(): if not isinstance(payload, dict): continue for key in ("away_pitcher", "home_pitcher", "pitcher_name"): value = str(payload.get(key) or "").strip() if value: names.append(value) return _coerce_name_tuple(names) def _extract_live_dashboard_participants(live_games: pd.DataFrame) -> tuple[tuple[str, ...], tuple[str, ...]]: if live_games is None or live_games.empty: return tuple(), tuple() batter_names: list[str] = [] pitcher_names: list[str] = [] for col in ["on_deck_name", "in_hole_name", "three_away_name", "batter_name"]: if col in live_games.columns: batter_names.extend( [ str(value).strip() for value in live_games[col].dropna().astype(str).tolist() if str(value).strip() ] ) for col in ["pitcher_name"]: if col in live_games.columns: pitcher_names.extend( [ str(value).strip() for value in live_games[col].dropna().astype(str).tolist() if str(value).strip() ] ) return _coerce_name_tuple(batter_names), _coerce_name_tuple(pitcher_names) def _is_fetched_at_fresh(value: object, max_age_seconds: int) -> bool: try: ts = pd.to_datetime(value, errors="coerce", utc=True) if pd.isna(ts): return False age_seconds = max(0.0, float((pd.Timestamp.now(tz="UTC") - ts).total_seconds())) return age_seconds <= float(max_age_seconds) except Exception: return False def _latest_fetched_at_from_df(df: pd.DataFrame) -> object: if df is None or df.empty or "fetched_at" not in df.columns: return None try: return pd.to_datetime(df["fetched_at"], errors="coerce", utc=True).max() except Exception: return None def _hr_bundle_is_complete(bundle: dict | None) -> bool: completeness = dict((bundle or {}).get("hr_snapshot_completeness") or {}) if not completeness: return True return bool(completeness.get("is_complete", True)) def _hr_bundle_is_usable(bundle: dict | None) -> bool: state = str((bundle or {}).get("hr_snapshot_state") or "").strip().lower() if state in {"usable_complete", "usable_partial", "stale_degraded"}: return True current_rows = int((bundle or {}).get("current_hr_row_count") or 0) if current_rows > 0: return True completeness = dict((bundle or {}).get("hr_snapshot_completeness") or {}) return int(completeness.get("row_count") or 0) > 0 def _parse_iso_utc(value: object) -> pd.Timestamp | None: try: ts = pd.to_datetime(value, errors="coerce", utc=True) return None if pd.isna(ts) else ts except Exception: return None def _hr_bundle_needs_draftkings_refresh(bundle: dict | None) -> bool: completeness = dict((bundle or {}).get("hr_snapshot_completeness") or {}) missing_books = { str(book).strip().lower() for book in completeness.get("missing_books", []) or [] if str(book).strip() } if "draftkings" not in missing_books: return False retry_after_map = dict((bundle or {}).get("adapter_retry_after_by_book") or {}) retry_after_ts = _parse_iso_utc(retry_after_map.get("draftkings")) if retry_after_ts is not None and retry_after_ts > pd.Timestamp.now(tz="UTC"): return False return True @st.cache_data(ttl=STATCAST_TTL_SECONDS, show_spinner=False) def load_shared_baseline_bundle_cached( batter_names: tuple[str, ...] = (), pitcher_names: tuple[str, ...] = (), ) -> dict: return load_or_build_shared_baseline_bundle( batter_names=batter_names, pitcher_names=pitcher_names, max_age_seconds=max(STATCAST_TTL_SECONDS, 60 * 60), persist_runtime_refresh=True, ) @st.cache_data(ttl=STATCAST_TTL_SECONDS, show_spinner=False) def load_shared_baseline_page_slice_cached( batter_names: tuple[str, ...] = (), pitcher_names: tuple[str, ...] = (), ) -> dict: bundle = load_shared_baseline_bundle_cached( batter_names=batter_names, pitcher_names=pitcher_names, ) return { "blended_batter_df": bundle.get("blended_batter_df", pd.DataFrame()), "blended_pitcher_df": bundle.get("blended_pitcher_df", pd.DataFrame()), "batter_baseline_meta": bundle.get("batter_baseline_meta", pd.DataFrame()), "pitcher_baseline_meta": bundle.get("pitcher_baseline_meta", pd.DataFrame()), "hitter_rolling_snapshot": bundle.get("hitter_rolling_snapshot", pd.DataFrame()), "pitcher_rolling_snapshot": bundle.get("pitcher_rolling_snapshot", pd.DataFrame()), "snapshot_status": bundle.get("snapshot_status", pd.DataFrame()), "snapshot_source_status": bundle.get("snapshot_source_status"), "runtime_fallback_used": bundle.get("runtime_fallback_used"), "requested_hitter_count": bundle.get("requested_hitter_count"), "requested_pitcher_count": bundle.get("requested_pitcher_count"), "resolved_hitter_count": bundle.get("resolved_hitter_count"), "resolved_pitcher_count": bundle.get("resolved_pitcher_count"), "missing_hitter_names": bundle.get("missing_hitter_names", []), "missing_pitcher_names": bundle.get("missing_pitcher_names", []), "snapshot_coverage_mode": bundle.get("snapshot_coverage_mode"), "background_refresh_queued": bundle.get("background_refresh_queued"), } @st.cache_data(ttl=SCORES_TTL_SECONDS, show_spinner=False) def _build_dashboard_ready_payload( dashboard_date_str: str, schedule_date_str: str, scores_json: str, ) -> dict[str, Any]: try: scores_df = pd.read_json(StringIO(scores_json), orient="split") except Exception: scores_df = pd.DataFrame() schedule_df = load_dashboard_schedule_for_date(schedule_date_str) live_games, final_games, scheduled_games = split_games_for_scoreboard( schedule_df=schedule_df, scores_df=scores_df, ) if live_games.empty and final_games.empty and not schedule_df.empty: try: fallback_scores_df = build_scores_from_schedule_via_live_feeds(schedule_df) if fallback_scores_df is not None and not fallback_scores_df.empty: live_games, final_games, scheduled_games = split_games_for_scoreboard( schedule_df=schedule_df, scores_df=fallback_scores_df, ) else: fallback_scores_df = pd.DataFrame() except Exception: fallback_scores_df = pd.DataFrame() else: fallback_scores_df = pd.DataFrame() if live_games.empty and final_games.empty and not scores_df.empty and "status" in scores_df.columns: recovery_scores = normalize_game_cards_df(scores_df.copy()) recovery_scores = attach_game_pk_from_schedule(recovery_scores, schedule_df) recovery_status = recovery_scores["status"].fillna("").astype(str).str.strip().str.lower() recovery_live_mask = recovery_status.str.contains( r"live|top|bot|bottom|mid|middle|inning|in progress|delayed|suspended", regex=True, na=False, ) recovery_final_mask = recovery_status.str.contains( r"final|game over|completed|ended", regex=True, na=False, ) recovered_live_games = recovery_scores[recovery_live_mask].copy() recovered_final_games = recovery_scores[recovery_final_mask].copy() if not recovered_live_games.empty or not recovered_final_games.empty: live_games = recovered_live_games final_games = recovered_final_games dashboard_batter_names, dashboard_pitcher_names = _extract_live_dashboard_participants(live_games) baseline_slice = ( load_shared_baseline_page_slice_cached( batter_names=dashboard_batter_names, pitcher_names=dashboard_pitcher_names, ) if not live_games.empty else {} ) statcast_df = baseline_slice.get("blended_batter_df", pd.DataFrame()) if isinstance(baseline_slice, dict) else pd.DataFrame() pitcher_statcast_df = baseline_slice.get("blended_pitcher_df", pd.DataFrame()) if isinstance(baseline_slice, dict) else pd.DataFrame() if statcast_df.empty and not live_games.empty: statcast_df = load_statcast_recent() if pitcher_statcast_df.empty and not live_games.empty: pitcher_statcast_df = statcast_df live_games = sort_scoreboard_games(normalize_game_cards_df(live_games)) final_games = sort_scoreboard_games(normalize_game_cards_df(final_games)) scheduled_games = sort_scoreboard_games(normalize_game_cards_df(scheduled_games)) return { "dashboard_date_str": dashboard_date_str, "schedule_date_str": schedule_date_str, "scores_df": scores_df, "schedule_df": schedule_df, "live_games": live_games, "final_games": final_games, "scheduled_games": scheduled_games, "baseline_slice": baseline_slice, "statcast_df": statcast_df, "pitcher_statcast_df": pitcher_statcast_df, "odds_df": load_odds(), } @st.cache_data(ttl=max(REFRESH_TTL_SECONDS, STATCAST_TTL_SECONDS), show_spinner=False) def _build_betting_ready_payload() -> dict[str, Any]: baseline_slice = load_shared_baseline_page_slice_cached() statcast_df = baseline_slice.get("blended_batter_df", pd.DataFrame()) if isinstance(baseline_slice, dict) else pd.DataFrame() if statcast_df.empty: statcast_df = load_statcast_recent() odds_df = load_odds() edges_df = compute_market_edges(odds_df) top_edges = edges_df.sort_values("no_vig_prob", ascending=False).head(30) if not edges_df.empty else pd.DataFrame() return { "schedule_df": load_wbc_schedule(), "baseline_slice": baseline_slice, "statcast_df": statcast_df, "odds_df": odds_df, "edges_df": edges_df, "top_edges": top_edges, } @st.cache_data(ttl=60, show_spinner=False) def load_hr_prop_odds_for_game(away_team: str, home_team: str) -> pd.DataFrame: """Fetch live HR prop odds for a specific game. Returns empty df on failure.""" try: from data.live_prop_odds import fetch_live_prop_odds game_context = {"away_team": away_team, "home_team": home_team} df = fetch_live_prop_odds( game_context=game_context, markets=["batter_home_runs"], ) return df if df is not None else pd.DataFrame() except Exception as exc: logger.warning("[load_hr_prop_odds_for_game] failure: %s", exc) return pd.DataFrame() @st.cache_data(ttl=300, show_spinner=False) def load_upcoming_hr_props() -> pd.DataFrame: """Fetch HR props for all upcoming games. Cached 5 min to limit API credit burn.""" try: return load_upcoming_hr_props_bundle()["merged_props_feed"] except Exception as exc: logger.warning("[load_upcoming_hr_props] failure: %s", exc) return pd.DataFrame() @st.cache_data(ttl=300, show_spinner=False) def load_upcoming_hr_props_bundle() -> dict: try: _cache_result: list[dict | None] = [None] def _read_db_cache() -> None: try: _cache_result[0] = read_cached_upcoming_props_bundle(conn, cache_key="default") except Exception: pass _dbt = threading.Thread(target=_read_db_cache, daemon=True) _dbt.start() _dbt.join(timeout=10) if _cache_result[0] is None: raise RuntimeError("DB cache read timed out — falling through to live fetch") cached_bundle = _cache_result[0] cache_meta = cached_bundle.get("cache_meta", pd.DataFrame()) merged = cached_bundle.get("merged_props_feed", pd.DataFrame()) coverage = cached_bundle.get("coverage_summary", pd.DataFrame()) coverage_api = cached_bundle.get("coverage_summary_api", pd.DataFrame()) coverage_scraper_added = cached_bundle.get("coverage_summary_scraper_added", pd.DataFrame()) coverage_final = cached_bundle.get("coverage_summary_final", pd.DataFrame()) coverage_hr_api = cached_bundle.get("coverage_summary_hr_api", pd.DataFrame()) coverage_hr_supplemental = cached_bundle.get("coverage_summary_hr_supplemental", pd.DataFrame()) coverage_hr_final = cached_bundle.get("coverage_summary_hr_final", pd.DataFrame()) missing_books_by_market = cached_bundle.get("missing_books_by_market", pd.DataFrame()) missing_event_books_by_market = cached_bundle.get("missing_event_books_by_market", pd.DataFrame()) missing_hr_books_global = cached_bundle.get("missing_hr_books_global", pd.DataFrame()) missing_hr_books_by_event = cached_bundle.get("missing_hr_books_by_event", pd.DataFrame()) hr_snapshot_completeness = cached_bundle.get("hr_snapshot_completeness", {}) hr_snapshot_state = str(cached_bundle.get("hr_snapshot_state") or "") current_hr_row_count = int(cached_bundle.get("current_hr_row_count") or 0) current_hr_event_count = int(cached_bundle.get("current_hr_event_count") or 0) last_known_good_hr_row_count = int(cached_bundle.get("last_known_good_hr_row_count") or 0) last_known_good_hr_built_at = str(cached_bundle.get("last_known_good_hr_built_at") or "") hr_refresh_overwrite_prevented = bool(cached_bundle.get("hr_refresh_overwrite_prevented")) adapter_status_by_book = cached_bundle.get("adapter_status_by_book", {}) adapter_error_by_book = cached_bundle.get("adapter_error_by_book", {}) adapter_rows_by_book = cached_bundle.get("adapter_rows_by_book", {}) adapter_last_attempted_at_by_book = cached_bundle.get("adapter_last_attempted_at_by_book", {}) adapter_retry_after_by_book = cached_bundle.get("adapter_retry_after_by_book", {}) if not cache_meta.empty and isinstance(merged, pd.DataFrame) and not merged.empty: bundle_payload = { "odds_api_raw": pd.DataFrame(), "scraper_raw": pd.DataFrame(), "merged_props_feed": merged if isinstance(merged, pd.DataFrame) else pd.DataFrame(), "coverage_summary": coverage if isinstance(coverage, pd.DataFrame) else pd.DataFrame(), "coverage_summary_api": coverage_api if isinstance(coverage_api, pd.DataFrame) else pd.DataFrame(), "coverage_summary_scraper_added": coverage_scraper_added if isinstance(coverage_scraper_added, pd.DataFrame) else pd.DataFrame(), "coverage_summary_final": coverage_final if isinstance(coverage_final, pd.DataFrame) else pd.DataFrame(), "coverage_summary_hr_api": coverage_hr_api if isinstance(coverage_hr_api, pd.DataFrame) else pd.DataFrame(), "coverage_summary_hr_supplemental": coverage_hr_supplemental if isinstance(coverage_hr_supplemental, pd.DataFrame) else pd.DataFrame(), "coverage_summary_hr_final": coverage_hr_final if isinstance(coverage_hr_final, pd.DataFrame) else pd.DataFrame(), "missing_books_by_market": missing_books_by_market if isinstance(missing_books_by_market, pd.DataFrame) else pd.DataFrame(), "missing_event_books_by_market": missing_event_books_by_market if isinstance(missing_event_books_by_market, pd.DataFrame) else pd.DataFrame(), "missing_hr_books_global": missing_hr_books_global if isinstance(missing_hr_books_global, pd.DataFrame) else pd.DataFrame(), "missing_hr_books_by_event": missing_hr_books_by_event if isinstance(missing_hr_books_by_event, pd.DataFrame) else pd.DataFrame(), "hr_snapshot_completeness": dict(hr_snapshot_completeness or {}), "hr_snapshot_state": hr_snapshot_state, "current_hr_row_count": current_hr_row_count, "current_hr_event_count": current_hr_event_count, "last_known_good_hr_row_count": last_known_good_hr_row_count, "last_known_good_hr_built_at": last_known_good_hr_built_at, "hr_refresh_overwrite_prevented": hr_refresh_overwrite_prevented, "adapter_status_by_book": dict(adapter_status_by_book or {}), "adapter_error_by_book": dict(adapter_error_by_book or {}), "adapter_rows_by_book": dict(adapter_rows_by_book or {}), "adapter_last_attempted_at_by_book": dict(adapter_last_attempted_at_by_book or {}), "adapter_retry_after_by_book": dict(adapter_retry_after_by_book or {}), "scraper_candidate_count": int(cached_bundle.get("scraper_candidate_count") or 0), "scraper_added_count": int(cached_bundle.get("scraper_added_count") or 0), "scraper_duplicate_reject_count": int(cached_bundle.get("scraper_duplicate_reject_count") or 0), "cache_meta": cache_meta, "cache_source": "db_snapshot", } cache_is_fresh = _is_fetched_at_fresh(cache_meta.iloc[0]["fetched_at"], 300) hr_complete = _hr_bundle_is_complete(bundle_payload) hr_usable = _hr_bundle_is_usable(bundle_payload) dk_refresh_needed = _hr_bundle_needs_draftkings_refresh(bundle_payload) if cache_is_fresh and (hr_usable or not dk_refresh_needed): if dk_refresh_needed: _queue_async_refresh( "upcoming_props_bundle", lambda: _run_with_fresh_conn( lambda fresh_conn: replace_cached_upcoming_props_bundle( fresh_conn, fetch_all_upcoming_hr_props_bundle(sportsbooks=DEFAULT_PROP_BOOKS), cache_key="default", ) ), ) return bundle_payload if (not cache_is_fresh) or (dk_refresh_needed and not hr_usable): _queue_async_refresh( "upcoming_props_bundle", lambda: _run_with_fresh_conn( lambda fresh_conn: replace_cached_upcoming_props_bundle( fresh_conn, fetch_all_upcoming_hr_props_bundle(sportsbooks=DEFAULT_PROP_BOOKS), cache_key="default", ) ), ) bundle_payload["cache_source"] = "stale_snapshot" if not cache_is_fresh else "incomplete_snapshot" return bundle_payload except Exception: pass try: bundle = fetch_all_upcoming_hr_props_bundle(sportsbooks=DEFAULT_PROP_BOOKS) try: _persist_result: list[Exception | None] = [None] def _do_persist() -> None: try: replace_cached_upcoming_props_bundle(conn, bundle, cache_key="default") except Exception as _exc: _persist_result[0] = _exc _pt = threading.Thread(target=_do_persist, daemon=True) _pt.start() _pt.join(timeout=15) if _pt.is_alive(): logger.warning("[load_upcoming_hr_props_bundle] cache persist timed out after 15s — skipping") elif _persist_result[0] is not None: logger.warning("[load_upcoming_hr_props_bundle] cache persist failure: %s", _persist_result[0]) except Exception as exc: logger.warning("[load_upcoming_hr_props_bundle] cache persist failure: %s", exc) bundle["cache_source"] = "live_fetch" return bundle except Exception as exc: logger.warning("[load_upcoming_hr_props_bundle] failure: %s", exc) return { "odds_api_raw": pd.DataFrame(), "scraper_raw": pd.DataFrame(), "merged_props_feed": pd.DataFrame(), "coverage_summary": pd.DataFrame(), "coverage_summary_api": pd.DataFrame(), "coverage_summary_scraper_added": pd.DataFrame(), "coverage_summary_final": pd.DataFrame(), "coverage_summary_hr_api": pd.DataFrame(), "coverage_summary_hr_supplemental": pd.DataFrame(), "coverage_summary_hr_final": pd.DataFrame(), "missing_books_by_market": pd.DataFrame(), "missing_event_books_by_market": pd.DataFrame(), "missing_hr_books_global": pd.DataFrame(), "missing_hr_books_by_event": pd.DataFrame(), "hr_snapshot_completeness": {}, "hr_snapshot_state": "empty", "current_hr_row_count": 0, "current_hr_event_count": 0, "last_known_good_hr_row_count": 0, "last_known_good_hr_built_at": "", "hr_refresh_overwrite_prevented": False, "adapter_status_by_book": {}, "adapter_error_by_book": {}, "adapter_rows_by_book": {}, "adapter_last_attempted_at_by_book": {}, "adapter_retry_after_by_book": {}, "scraper_candidate_count": 0, "scraper_added_count": 0, "scraper_duplicate_reject_count": 0, "cache_meta": pd.DataFrame(), "cache_source": "unavailable", } @st.cache_data(ttl=REFRESH_TTL_SECONDS) def load_odds() -> pd.DataFrame: try: cached = read_cached_odds(conn) if not cached.empty: if _is_fetched_at_fresh(cached.iloc[0]["fetched_at"], REFRESH_TTL_SECONDS): return cached _queue_async_refresh( "featured_odds", lambda: _run_with_fresh_conn( lambda fresh_conn: replace_cached_odds(fresh_conn, fetch_featured_odds()) ), ) return cached except Exception: pass fresh = fetch_featured_odds() try: replace_cached_odds(conn, fresh) except Exception as exc: logger.warning("[load_odds] cache persist failure: %s", exc) return fresh @st.cache_data(ttl=SCHEDULE_TTL_SECONDS) def load_dashboard_schedule_for_date(date_str: str) -> pd.DataFrame: try: cached = read_cached_schedule_for_date(conn, date_str) latest_cached_at = _latest_fetched_at_from_df(cached) if not cached.empty: if _is_fetched_at_fresh(latest_cached_at, SCHEDULE_TTL_SECONDS): return cached _queue_async_refresh( f"schedule:{date_str}", lambda: _run_with_fresh_conn( lambda fresh_conn: replace_cached_schedule( fresh_conn, fetch_schedule_for_date(date_str), ) ), ) return cached except Exception: pass fresh = fetch_schedule_for_date(date_str) try: replace_cached_schedule(conn, fresh) except Exception as exc: logger.warning("[load_dashboard_schedule_for_date] cache persist failure: %s", exc) return fresh @st.cache_data(ttl=SCORES_TTL_SECONDS) def load_scores_for_dashboard_date(date_str: str) -> pd.DataFrame: try: df = fetch_scores_for_date(date_str) if df is None or df.empty: return pd.DataFrame() out = df.copy() out["scores_source_date"] = date_str return out except Exception: return pd.DataFrame() def get_stable_scores_for_dashboard_date(date_str: str) -> pd.DataFrame: fresh_scores = load_scores_for_dashboard_date(date_str) if _scores_df_has_live_or_final_content(fresh_scores): st.session_state["last_good_scores_df"] = fresh_scores.copy() return fresh_scores if "last_good_scores_df" in st.session_state: return st.session_state["last_good_scores_df"].copy() return fresh_scores def load_weather(venue_name: str) -> pd.DataFrame: try: cached = read_cached_weather_for_venue(conn, venue_name) latest_cached_at = _latest_fetched_at_from_df(cached) if not cached.empty: if _is_fetched_at_fresh(latest_cached_at, REFRESH_TTL_SECONDS): return cached.head(1).reset_index(drop=True) _queue_async_refresh( f"weather:{venue_name}", lambda: _run_with_fresh_conn( lambda fresh_conn: replace_cached_weather( fresh_conn, fetch_weather_for_venue(venue_name), ) ), ) return cached.head(1).reset_index(drop=True) except Exception: pass fresh = fetch_weather_for_venue(venue_name) try: if fresh is not None and not fresh.empty: replace_cached_weather(conn, fresh) except Exception as exc: logger.warning("[load_weather] cache persist failure: %s", exc) return fresh def render_header() -> None: st.title("\n\n\n⚾ Kasper") st.caption( "All-in-One Baseball Assistant. Excellent for finding Home Run True +EV. " "Full pitch telemetry with XGBoost Machine Learning model trained on a 3.8M pitch-event " "data set + live data with custom anchors." ) secret_status = [] secret_status.append("ODDS_API_KEY ✓" if ODDS_API_KEY else "ODDS_API_KEY missing") st.caption("💰".join(secret_status)) def render_source_diagnostics( schedule_df: pd.DataFrame, statcast_df: pd.DataFrame, odds_df: pd.DataFrame | None = None, scores_df: pd.DataFrame | None = None, ) -> None: c1, c2, c3, c4 = st.columns(4) c1.metric("Schedule rows", int(len(schedule_df))) c2.metric("Scores rows", int(len(scores_df)) if scores_df is not None else 0) c3.metric("Statcast rows", int(len(statcast_df))) c4.metric("Odds rows", int(len(odds_df)) if odds_df is not None else 0) def _scores_df_has_live_or_final_content(df: pd.DataFrame) -> bool: """ This helper should answer only one question: does the scores dataframe contain usable LIVE/FINAL STATUSES? Do NOT count raw score values here, because partially parsed scheduled-style rows with score numbers can incorrectly block the schedule->live-feed fallback. """ if df is None or df.empty: return False temp = df.copy() if "status" not in temp.columns: return False status_series = temp["status"].fillna("").astype(str).str.strip().str.lower() has_live_or_final_status = status_series.str.contains( r"live|top|bot|bottom|mid|middle|inning|in progress|delayed|suspended|final|game over|completed|ended", regex=True, na=False, ).any() return bool(has_live_or_final_status) @st.cache_data(ttl=SCORES_TTL_SECONDS) def load_scores_for_today() -> pd.DataFrame: base_date = pd.to_datetime(current_wbc_date_str()).date() candidates: list[pd.DataFrame] = [] for offset in range(0, 4): candidate_date = (base_date - timedelta(days=offset)).isoformat() try: df = fetch_scores_for_date(candidate_date) if df is not None and not df.empty: out = df.copy() out["scores_source_date"] = candidate_date candidates.append(out) except Exception as e: logger.warning(f"[scores_source_date_enrich] failure: {e}", exc_info=True) for df in candidates: if _scores_df_has_live_or_final_content(df): return df if candidates: return candidates[0] return pd.DataFrame() def current_dashboard_date_str() -> str: return pd.Timestamp.now(tz="America/New_York").date().isoformat() def get_stable_scores_for_dashboard() -> pd.DataFrame: fresh_scores = load_scores_for_today() if _scores_df_has_live_or_final_content(fresh_scores): st.session_state["last_good_scores_df"] = fresh_scores.copy() return fresh_scores if "last_good_scores_df" in st.session_state: return st.session_state["last_good_scores_df"].copy() return fresh_scores WBC_COUNTRY_NAMES = { "australia", "canada", "china", "chinese taipei", "colombia", "cuba", "czech republic", "dominican republic", "great britain", "israel", "italy", "japan", "korea", "mexico", "netherlands", "nicaragua", "panama", "puerto rico", "united states", "usa", "venezuela", } MLB_TEAM_NAMES = { "angels", "astros", "athletics", "blue jays", "braves", "brewers", "cardinals", "cubs", "diamondbacks", "dodgers", "giants", "guardians", "mariners", "marlins", "mets", "nationals", "orioles", "padres", "phillies", "pirates", "rangers", "rays", "red sox", "reds", "rockies", "royals", "tigers", "twins", "white sox", "yankees", } def _normalize_team_bucket_name(name: str) -> str: text = str(name or "").strip().lower() text = " ".join(text.split()) return text def _infer_competition_bucket( away_team: str, home_team: str, raw_game: dict | None = None, ) -> str: raw_game = raw_game or {} # Preferred: use feed metadata when available possible_text = " ".join( [ str(raw_game.get("series_description", "") or ""), str(raw_game.get("seriesDescription", "") or ""), str(raw_game.get("game_type", "") or ""), str(raw_game.get("gameType", "") or ""), str(raw_game.get("competition_name", "") or ""), str(raw_game.get("tournament_name", "") or ""), ] ).lower() if "world baseball classic" in possible_text or "wbc" in possible_text: return "WBC" if "mlb" in possible_text or "major league" in possible_text: return "MLB" away_norm = _normalize_team_bucket_name(away_team) home_norm = _normalize_team_bucket_name(home_team) if away_norm in WBC_COUNTRY_NAMES and home_norm in WBC_COUNTRY_NAMES: return "WBC" away_is_mlb = any(away_norm == team or away_norm.endswith(f" {team}") for team in MLB_TEAM_NAMES) home_is_mlb = any(home_norm == team or home_norm.endswith(f" {team}") for team in MLB_TEAM_NAMES) if away_is_mlb and home_is_mlb: return "MLB" return "OTHER" def merge_schedule_and_scores(schedule_df: pd.DataFrame, scores_df: pd.DataFrame) -> pd.DataFrame: if schedule_df.empty and scores_df.empty: return pd.DataFrame() if schedule_df.empty: return scores_df.copy() if scores_df.empty: return schedule_df.copy() merged = schedule_df.merge( scores_df, on=["game_date", "away_team", "home_team"], how="left", suffixes=("", "_score"), ) if "away_score_score" in merged.columns: merged["away_score"] = merged["away_score_score"].combine_first(merged.get("away_score")) merged = merged.drop(columns=["away_score_score"]) if "home_score_score" in merged.columns: merged["home_score"] = merged["home_score_score"].combine_first(merged.get("home_score")) merged = merged.drop(columns=["home_score_score"]) if "status_score" in merged.columns: merged["status"] = merged["status_score"].combine_first(merged.get("status")) merged = merged.drop(columns=["status_score"]) if "start_time_et_score" in merged.columns: if "start_time_et" in merged.columns: merged["start_time_et"] = merged["start_time_et"].combine_first(merged["start_time_et_score"]) else: merged["start_time_et"] = merged["start_time_et_score"] merged = merged.drop(columns=["start_time_et_score"]) if "tv_score" in merged.columns: if "tv" in merged.columns: merged["tv"] = merged["tv"].combine_first(merged["tv_score"]) else: merged["tv"] = merged["tv_score"] merged = merged.drop(columns=["tv_score"]) return merged def _normalize_pitch_type_key(pitch_type: str) -> str: text = str(pitch_type or "").strip().lower() if text in {"", "nan", "none"}: return "unknown" if "4-seam" in text or "four-seam" in text or "four seam" in text: return "four_seam" if "sinker" in text: return "sinker" if "cutter" in text: return "cutter" if "slider" in text: return "slider" if "sweeper" in text: return "sweeper" if "curve" in text: return "curveball" if "change" in text: return "changeup" if "split" in text or "fork" in text: return "splitter" if "knuckle" in text: return "knuckleball" return re.sub(r"[^a-z0-9]+", "_", text).strip("_") or "unknown" def normalize_game_cards_df(df: pd.DataFrame) -> pd.DataFrame: if df is None or df.empty: return pd.DataFrame() out = df.copy() for col in ["away_team", "home_team", "status", "tv", "start_time_et", "away_record", "home_record", "game_pk"]: if col not in out.columns: out[col] = "" for col in ["away_score", "home_score", "away_hits", "home_hits", "away_errors", "home_errors"]: if col not in out.columns: out[col] = None for col in ["runner_on_1b", "runner_on_2b", "runner_on_3b"]: if col not in out.columns: out[col] = False for col in ["away_win_prob", "home_win_prob", "outs", "balls", "strikes"]: if col not in out.columns: out[col] = None for col in ["batter_name", "pitcher_name", "last_play", "last_pitch", "pitch_type"]: if col not in out.columns: out[col] = "" for col in ["pitch_velocity", "pitch_spin_rate", "pitch_extension", "pitch_pfx_x", "pitch_pfx_z"]: if col not in out.columns: out[col] = None return out def filter_games_for_display(df: pd.DataFrame, selected_filter: str) -> pd.DataFrame: if df.empty or selected_filter == "All": return df temp = df.copy() status_series = temp["status"].fillna("").astype(str).str.lower() if selected_filter == "Live": return temp[ status_series.str.contains("live") | status_series.str.contains("top") | status_series.str.contains("bot") | status_series.str.contains("bottom") | status_series.str.contains("mid") | status_series.str.contains("inning") ] if selected_filter == "Final": return temp[status_series.str.contains("final")] if selected_filter == "Scheduled": return temp[ status_series.str.contains("scheduled") | status_series.str.contains("preview") | status_series.eq("") | temp["start_time_et"].fillna("").astype(str).str.len().gt(0) ] return temp def filter_games_for_competition(df: pd.DataFrame, competition_filter: str) -> pd.DataFrame: if df.empty or competition_filter == "All": return df temp = df.copy() if "competition_bucket" not in temp.columns: temp["competition_bucket"] = temp.apply( lambda row: _infer_competition_bucket( away_team=row.get("away_team", ""), home_team=row.get("home_team", ""), raw_game=row.to_dict(), ), axis=1, ) return temp[ temp["competition_bucket"].fillna("").astype(str).str.upper() == competition_filter.upper() ] def render_scoreboard_section(title: str, games_df: pd.DataFrame) -> None: if games_df.empty: return st.markdown(f'
{title}
', unsafe_allow_html=True) games = games_df.to_dict("records") cols = st.columns(2) for i, game in enumerate(games): with cols[i % 2]: render_game_card(game) def normalize_game_pk(value: object) -> str: try: if value is None: return "" text = str(value).strip() if text.lower() in {"", "nan", "none"}: return "" return str(int(float(text))) except Exception: text = str(value).strip() return text if text.isdigit() else "" def split_games_for_scoreboard( schedule_df: pd.DataFrame, scores_df: pd.DataFrame, ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: def ensure_cols(df: pd.DataFrame) -> pd.DataFrame: out = df.copy() for col in [ "away_team", "home_team", "away_score", "home_score", "away_hits", "home_hits", "away_errors", "home_errors", ]: if col not in out.columns: out[col] = None for col in [ "tv", "start_time_et", "away_record", "home_record", "status", "game_pk", ]: if col not in out.columns: out[col] = "" return out def canon(name: str) -> str: if not name: return "" n = str(name).strip().lower() n = n.replace(".", "").replace(",", "") n = " ".join(n.split()) mapping = { "usa": "united states", "usa ": "united states", "u s a": "united states", "u s a ": "united states", "u.s.a": "united states", "u.s.a.": "united states", "united states": "united states", "puerto rico": "puerto rico", "dominican republic": "dominican republic", "dominicana": "dominican republic", "dominic republic": "dominican republic", "dom rep": "dominican republic", "great britain": "great britain", "chinese taipei": "chinese taipei", "taiwan": "chinese taipei", "czech republic": "czechia", "czechia": "czechia", "korea": "korea", "south korea": "korea", "republic of korea": "korea", "rep of korea": "korea", "japan": "japan", "brazil": "brazil", "brasil": "brazil", "italy": "italy", "nicaragua": "nicaragua", "mexico": "mexico", "venezuela": "venezuela", "colombia": "colombia", "columbia": "colombia", "panama": "panama", "netherlands": "netherlands", "cuba": "cuba", "israel": "israel", "canada": "canada", "australia": "australia", "china": "china", } return mapping.get(n, n) live_games = pd.DataFrame() final_games = pd.DataFrame() scheduled_games = pd.DataFrame() scores = ensure_cols(scores_df) if not scores_df.empty else pd.DataFrame() schedule = ensure_cols(schedule_df) if not schedule_df.empty else pd.DataFrame() # Fallback: if scores feed is empty OR contains no live/final content, # derive rows directly from schedule game_pk + live feed. if (scores.empty or not _scores_df_has_live_or_final_content(scores)) and not schedule.empty: try: fallback_scores = build_scores_from_schedule_via_live_feeds(schedule) if fallback_scores is not None and not fallback_scores.empty: scores = ensure_cols(fallback_scores) elif scores.empty: scores = pd.DataFrame() except Exception: if scores.empty: scores = pd.DataFrame() if not scores.empty: scores["away_key"] = scores["away_team"].fillna("").apply(canon) scores["home_key"] = scores["home_team"].fillna("").apply(canon) if not schedule.empty: schedule["away_key"] = schedule["away_team"].fillna("").apply(canon) schedule["home_key"] = schedule["home_team"].fillna("").apply(canon) # Attach game_pk from schedule to scores using normalized team keys # First try direct away/home match, then fallback to reversed team-pair match. if not scores.empty and not schedule.empty and "game_pk" in schedule.columns: schedule_keys = ( schedule[["away_key", "home_key", "game_pk"]] .dropna(subset=["away_key", "home_key"]) .drop_duplicates(subset=["away_key", "home_key"]) .copy() ) # direct match scores = scores.merge( schedule_keys, on=["away_key", "home_key"], how="left", suffixes=("", "_sched"), ) if "game_pk_sched" in scores.columns: scores["game_pk"] = ( scores["game_pk"] .astype(str) .replace({"": pd.NA, "nan": pd.NA, "None": pd.NA}) .combine_first(scores["game_pk_sched"]) ) scores = scores.drop(columns=["game_pk_sched"]) if "game_pk" in scores.columns: scores["game_pk"] = scores["game_pk"].apply(normalize_game_pk) # reverse-order fallback for feeds that swap away/home labels missing_game_pk_mask = ( scores["game_pk"].astype(str).str.strip().replace({"nan": "", "None": ""}).eq("") ) if missing_game_pk_mask.any(): reverse_schedule_keys = schedule_keys.rename( columns={ "away_key": "home_key", "home_key": "away_key", "game_pk": "game_pk_rev", } ) reverse_matches = scores.loc[missing_game_pk_mask, ["away_key", "home_key"]].merge( reverse_schedule_keys, on=["away_key", "home_key"], how="left", ) scores.loc[missing_game_pk_mask, "game_pk"] = ( reverse_matches["game_pk_rev"] .astype(str) .replace({"": pd.NA, "nan": pd.NA, "None": pd.NA}) .values ) if "game_pk_sched" in scores.columns: scores["game_pk"] = ( scores["game_pk"] .astype(str) .replace({"": pd.NA, "nan": pd.NA, "None": pd.NA}) .combine_first(scores["game_pk_sched"]) ) scores = scores.drop(columns=["game_pk_sched"]) if not scores.empty: try: scores = enrich_live_games_from_feeds(scores) except Exception as e: logger.warning(f"[live_feed_enrich] failure: {e}", exc_info=True) if scores is None or scores.empty: scores = pd.DataFrame() if not scores.empty: scores = ensure_cols(scores) if "status" not in scores.columns: scores["status"] = "" status_series = scores["status"].fillna("").astype(str).str.strip().str.lower() def has_score_value(series: pd.Series) -> pd.Series: text = series.fillna("").astype(str).str.strip().str.lower() return ~text.isin(["", "nan", "none"]) away_score_present = ( has_score_value(scores["away_score"]) if "away_score" in scores.columns else pd.Series(False, index=scores.index) ) home_score_present = ( has_score_value(scores["home_score"]) if "home_score" in scores.columns else pd.Series(False, index=scores.index) ) score_present_mask = away_score_present | home_score_present final_mask = status_series.str.contains( r"final|game over|completed|ended", regex=True, na=False, ) detailed_live_mask = status_series.str.contains( r"top|bot|bottom|mid|middle|end|inning|in progress|delayed|suspended", regex=True, na=False, ) # Treat plain "Live" as truly live only if the row has score context. plain_live_mask = status_series.eq("live") & score_present_mask live_status_mask = detailed_live_mask | plain_live_mask live_games = scores[live_status_mask].copy() final_games = scores[ final_mask | (score_present_mask & ~live_status_mask) ].copy() # fallback only if both are still empty if live_games.empty and final_games.empty and not scores.empty: score_rows = scores[score_present_mask].copy() if not score_rows.empty: score_status = score_rows["status"].fillna("").astype(str).str.lower() live_games = score_rows[ score_status.str.contains( r"live|top|bot|bottom|mid|inning|in progress|delayed|suspended", regex=True, na=False, ) ].copy() final_games = score_rows[ score_status.str.contains( r"final|game over|completed|ended", regex=True, na=False, ) | ~score_status.str.contains( r"live|top|bot|bottom|mid|inning|in progress|delayed|suspended", regex=True, na=False, ) ].copy() if not schedule.empty: status_series = schedule["status"].fillna("").astype(str).str.strip().str.lower() scheduled_games = schedule[ status_series.str.contains(r"\bscheduled\b|\bpreview\b", regex=True, na=False) | schedule["start_time_et"].fillna("").astype(str).str.len().gt(0) | status_series.eq("") ].copy() # --------------------------------------- # Tag games with competition bucket # --------------------------------------- if not live_games.empty: live_games["competition_bucket"] = live_games.apply( lambda r: _infer_competition_bucket( away_team=r.get("away_team", ""), home_team=r.get("home_team", ""), raw_game=r.to_dict(), ), axis=1, ) if not final_games.empty: final_games["competition_bucket"] = final_games.apply( lambda r: _infer_competition_bucket( away_team=r.get("away_team", ""), home_team=r.get("home_team", ""), raw_game=r.to_dict(), ), axis=1, ) if not scheduled_games.empty: scheduled_games["competition_bucket"] = scheduled_games.apply( lambda r: _infer_competition_bucket( away_team=r.get("away_team", ""), home_team=r.get("home_team", ""), raw_game=r.to_dict(), ), axis=1, ) return live_games, final_games, scheduled_games @st.cache_data(ttl=LIVE_FEED_TTL_SECONDS, show_spinner=False) def load_live_game_feed_cached(game_pk: str) -> dict: game_pk = normalize_game_pk(game_pk) if not game_pk: return {} try: return fetch_live_game_feed(game_pk) except Exception: return {} def merge_live_game_row(base_game: dict, enriched_game: dict | None) -> dict: merged = dict(base_game) if not isinstance(enriched_game, dict): return merged for key, value in enriched_game.items(): if _is_missing_like(value): continue merged[key] = value return merged def build_prepared_live_games_df(live_games: pd.DataFrame) -> pd.DataFrame: if live_games is None or live_games.empty: return pd.DataFrame() rows = [] for _, row in live_games.iterrows(): rows.append(prepare_live_game_for_ui(row.to_dict())) return pd.DataFrame(rows) def _is_missing_like(value: object) -> bool: if value is None: return True text = str(value).strip().lower() return text in {"", "nan", "none"} def merge_live_game_row(base_game: dict, enriched_game: dict | None) -> dict: merged = dict(base_game) if not isinstance(enriched_game, dict): return merged for key, value in enriched_game.items(): if _is_missing_like(value): continue merged[key] = value return merged def _extract_pitch_velocity_value(pitch_data: dict) -> float | None: if not isinstance(pitch_data, dict): return None candidates = [ pitch_data.get("startSpeed"), pitch_data.get("releaseSpeed"), pitch_data.get("speed"), ] for value in candidates: try: if value is None: continue text = str(value).strip().lower() if text in {"", "nan", "none"}: continue return float(value) except Exception: continue return None def _extract_pitch_spin_rate(pitch_data: dict) -> float | None: if not isinstance(pitch_data, dict): return None pitch_breaks = pitch_data.get("breaks", {}) or {} coordinates = pitch_data.get("coordinates", {}) or {} candidates = [ pitch_data.get("release_spin_rate"), pitch_data.get("spinRate"), pitch_data.get("spin_rate"), pitch_breaks.get("spinRate"), pitch_breaks.get("spin_rate"), coordinates.get("spinRate"), ] for value in candidates: try: if value is None: continue text = str(value).strip().lower() if text in {"", "nan", "none"}: continue return float(value) except Exception: continue return None def _extract_pitch_extension_value(pitch_data: dict) -> float | None: if not isinstance(pitch_data, dict): return None candidates = [ pitch_data.get("release_extension"), pitch_data.get("extension"), pitch_data.get("releaseExtension"), ] for value in candidates: try: if value is None: continue text = str(value).strip().lower() if text in {"", "nan", "none"}: continue return float(value) except Exception: continue return None def _extract_pitch_movement_x_value(pitch_data: dict) -> float | None: if not isinstance(pitch_data, dict): return None coords = pitch_data.get("coordinates", {}) or {} candidates = [ coords.get("pfxX"), pitch_data.get("pfxX"), pitch_data.get("pfx_x"), ] for value in candidates: try: if value is None: continue text = str(value).strip().lower() if text in {"", "nan", "none"}: continue return float(value) except Exception: continue return None def _extract_pitch_movement_z_value(pitch_data: dict) -> float | None: if not isinstance(pitch_data, dict): return None coords = pitch_data.get("coordinates", {}) or {} candidates = [ coords.get("pfxZ"), pitch_data.get("pfxZ"), pitch_data.get("pfx_z"), ] for value in candidates: try: if value is None: continue text = str(value).strip().lower() if text in {"", "nan", "none"}: continue return float(value) except Exception: continue return None def _extract_person_name(obj: object) -> str: if not isinstance(obj, dict): return "" return str(obj.get("fullName", "") or obj.get("name", "") or "").strip() def _safe_mean_numeric(values: list[object]) -> float | None: cleaned: list[float] = [] for value in values: try: if value is None: continue text = str(value).strip().lower() if text in {"", "nan", "none"}: continue cleaned.append(float(value)) except Exception: continue if not cleaned: return None return float(sum(cleaned) / len(cleaned)) def prepare_live_game_for_ui(game: dict) -> dict: prepared = dict(game) try: game_pk = normalize_game_pk(prepared.get("game_pk")) if not game_pk: return prepared feed = load_live_game_feed_cached(game_pk) if not isinstance(feed, dict) or not feed: return prepared # First: use existing enrichment if it works try: enriched = enrich_game_from_live_feed(prepared, feed) prepared = merge_live_game_row(prepared, enriched) except Exception as e: logger.warning(f"[live_feed_merge] failure: {e}", exc_info=True) # Second: direct fallback extraction from feed so UI fields are guaranteed live_data = feed.get("liveData", {}) or {} linescore = live_data.get("linescore", {}) or {} plays = live_data.get("plays", {}) or {} current_play = plays.get("currentPlay", {}) or {} matchup = current_play.get("matchup", {}) or {} count = current_play.get("count", {}) or {} offense = linescore.get("offense", {}) or {} defense = linescore.get("defense", {}) or {} batter_name = _extract_person_name(matchup.get("batter", {})) if not batter_name: batter_name = _extract_person_name(offense.get("batter", {})) pitcher_name = _extract_person_name(matchup.get("pitcher", {})) if not pitcher_name: pitcher_name = _extract_person_name(defense.get("pitcher", {})) three_away_name = None try: lineup = offense.get("battingOrder", []) or [] if isinstance(lineup, list) and len(lineup) >= 3: three_away_name = _extract_person_name(lineup[2]) except Exception as e: logger.warning(f"[lineup_slot_extract] failure: {e}", exc_info=True) prepared = merge_live_game_row( prepared, { "batter_name": batter_name, "pitcher_name": pitcher_name, "pitcher_id": ( (matchup.get("pitcher", {}) or {}).get("id") or (defense.get("pitcher", {}) or {}).get("id") ), "balls": count.get("balls"), "strikes": count.get("strikes"), "outs": count.get("outs", linescore.get("outs")), "runner_on_1b": offense.get("first") is not None, "runner_on_2b": offense.get("second") is not None, "runner_on_3b": offense.get("third") is not None, "last_play": str((current_play.get("result", {}) or {}).get("description", "") or "").strip(), "on_deck_name": _extract_person_name(offense.get("onDeck", {})), "in_hole_name": _extract_person_name(offense.get("inHole", {})), "three_away_name": three_away_name, }, ) # Task 3: Extract batting order lineup slots (fully fallback-safe) try: batting_order = offense.get("battingOrder") or [] def _find_slot(player_id: object, bo_list: list) -> int | None: if not player_id or not bo_list: return None for i, p in enumerate(bo_list): pid = p.get("id") if isinstance(p, dict) else p if str(pid) == str(player_id): return i + 1 # 1-based slot return None on_deck_id = offense.get("onDeck", {}).get("id") in_hole_id = offense.get("inHole", {}).get("id") prepared["on_deck_lineup_slot"] = _find_slot(on_deck_id, batting_order) prepared["in_hole_lineup_slot"] = _find_slot(in_hole_id, batting_order) prepared["three_away_lineup_slot"] = None except Exception: prepared["on_deck_lineup_slot"] = None prepared["in_hole_lineup_slot"] = None prepared["three_away_lineup_slot"] = None # Prefer the most recent pitch event that actually has RPM/EXT. # If none exists, fall back to the most recent event with any pitchData. play_events = current_play.get("playEvents", []) or [] pitch_event = None fallback_pitch_event = None for event in reversed(play_events): pitch_data = event.get("pitchData") or {} if not pitch_data: continue if fallback_pitch_event is None: fallback_pitch_event = event pitch_breaks = pitch_data.get("breaks", {}) or {} has_spin_or_ext = any( value is not None for value in [ pitch_data.get("release_spin_rate"), pitch_breaks.get("spinRate"), pitch_data.get("spinRate"), pitch_data.get("release_extension"), pitch_data.get("extension"), ] ) if has_spin_or_ext: pitch_event = event break if pitch_event is None: pitch_event = fallback_pitch_event if pitch_event: pitch_data = pitch_event.get("pitchData", {}) or {} pitch_breaks = pitch_data.get("breaks", {}) or {} coords = pitch_data.get("coordinates", {}) or {} details = pitch_event.get("details", {}) or {} current_pitch_type = str(((details.get("type", {}) or {}).get("description", "")) or "").strip() prepared = merge_live_game_row( prepared, { "last_pitch": str(details.get("description", "") or "").strip(), "pitch_type": current_pitch_type, "pitch_type_key": _normalize_pitch_type_key(current_pitch_type), "pitch_velocity": _extract_pitch_velocity_value(pitch_data), "pitch_spin_rate": _extract_pitch_spin_rate(pitch_data), "pitch_extension": _extract_pitch_extension_value(pitch_data), "pitch_pfx_x": _extract_pitch_movement_x_value(pitch_data), "pitch_pfx_z": _extract_pitch_movement_z_value(pitch_data), }, ) # Phase 6.5: rolling telemetry from recent pitch events recent_pitch_events: list[dict] = [] for event in reversed(play_events): pitch_data = event.get("pitchData") or {} event_details = event.get("details", {}) or {} event_pitch_type = str(((event_details.get("type", {}) or {}).get("description", "")) or "").strip() event_pitch_type_key = _normalize_pitch_type_key(event_pitch_type) if not pitch_data: continue recent_pitch_events.append(event) if len(recent_pitch_events) >= 8: break if recent_pitch_events: velocity_values: list[object] = [] spin_values: list[object] = [] extension_values: list[object] = [] pfx_x_values: list[object] = [] pfx_z_values: list[object] = [] current_pitch_type_key = str(prepared.get("pitch_type_key", "") or "").strip() pitch_type_velocity_values: list[object] = [] pitch_type_spin_values: list[object] = [] pitch_type_extension_values: list[object] = [] pitch_type_pfx_x_values: list[object] = [] pitch_type_pfx_z_values: list[object] = [] for event in recent_pitch_events: pitch_data = event.get("pitchData", {}) or {} velocity_values.append(_extract_pitch_velocity_value(pitch_data)) spin_values.append(_extract_pitch_spin_rate(pitch_data)) extension_values.append(_extract_pitch_extension_value(pitch_data)) pfx_x_values.append(_extract_pitch_movement_x_value(pitch_data)) pfx_z_values.append(_extract_pitch_movement_z_value(pitch_data)) if current_pitch_type_key and current_pitch_type_key != "unknown": if event_pitch_type_key == current_pitch_type_key: pitch_type_velocity_values.append(_extract_pitch_velocity_value(pitch_data)) pitch_type_spin_values.append(_extract_pitch_spin_rate(pitch_data)) pitch_type_extension_values.append(_extract_pitch_extension_value(pitch_data)) pitch_type_pfx_x_values.append(_extract_pitch_movement_x_value(pitch_data)) pitch_type_pfx_z_values.append(_extract_pitch_movement_z_value(pitch_data)) valid_velocity_values = [v for v in velocity_values if v is not None] valid_spin_values = [v for v in spin_values if v is not None] valid_extension_values = [v for v in extension_values if v is not None] valid_pfx_x_values = [v for v in pfx_x_values if v is not None] valid_pfx_z_values = [v for v in pfx_z_values if v is not None] valid_pitch_type_velocity_values = [v for v in pitch_type_velocity_values if v is not None] valid_pitch_type_spin_values = [v for v in pitch_type_spin_values if v is not None] valid_pitch_type_extension_values = [v for v in pitch_type_extension_values if v is not None] valid_pitch_type_pfx_x_values = [v for v in pitch_type_pfx_x_values if v is not None] valid_pitch_type_pfx_z_values = [v for v in pitch_type_pfx_z_values if v is not None] prepared = merge_live_game_row( prepared, { "rolling_pitch_velocity": _safe_mean_numeric(valid_velocity_values), "rolling_pitch_spin_rate": _safe_mean_numeric(valid_spin_values), "rolling_pitch_extension": _safe_mean_numeric(valid_extension_values), "rolling_pitch_pfx_x": _safe_mean_numeric(valid_pfx_x_values), "rolling_pitch_pfx_z": _safe_mean_numeric(valid_pfx_z_values), "rolling_pitch_sample_size": len(recent_pitch_events), "rolling_pitch_velocity_sample_size": len(valid_velocity_values), "rolling_pitch_spin_sample_size": len(valid_spin_values), "rolling_pitch_extension_sample_size": len(valid_extension_values), "rolling_pitch_pfx_x_sample_size": len(valid_pfx_x_values), "rolling_pitch_pfx_z_sample_size": len(valid_pfx_z_values), "rolling_pitch_type_key": current_pitch_type_key, "rolling_pitch_type_velocity": _safe_mean_numeric(valid_pitch_type_velocity_values), "rolling_pitch_type_spin_rate": _safe_mean_numeric(valid_pitch_type_spin_values), "rolling_pitch_type_extension": _safe_mean_numeric(valid_pitch_type_extension_values), "rolling_pitch_type_pfx_x": _safe_mean_numeric(valid_pitch_type_pfx_x_values), "rolling_pitch_type_pfx_z": _safe_mean_numeric(valid_pitch_type_pfx_z_values), "rolling_pitch_type_velocity_sample_size": len(valid_pitch_type_velocity_values), "rolling_pitch_type_spin_sample_size": len(valid_pitch_type_spin_values), "rolling_pitch_type_extension_sample_size": len(valid_pitch_type_extension_values), "rolling_pitch_type_pfx_x_sample_size": len(valid_pitch_type_pfx_x_values), "rolling_pitch_type_pfx_z_sample_size": len(valid_pitch_type_pfx_z_values), }, ) # Seed a per-game live baseline using the FIRST observed pitch of each inning, # and persist it across games. try: game_pk_seed = normalize_game_pk(prepared.get("game_pk")) pitcher_id_seed = prepared.get("pitcher_id") pitcher_name_seed = str(prepared.get("pitcher_name", "") or "").strip() game_date_seed = str(prepared.get("game_date", "") or prepared.get("official_date", "") or "").strip() inning_seed = current_play.get("about", {}) or {} current_inning_seed = ( inning_seed.get("inning") or linescore.get("currentInning") or prepared.get("inning") ) current_pitch_type_key = str(prepared.get("pitch_type_key", "") or "").strip() if "live_pitcher_baseline_seeds" not in st.session_state: st.session_state["live_pitcher_baseline_seeds"] = {} baseline_seed_map = st.session_state["live_pitcher_baseline_seeds"] baseline_seed_key = None if game_pk_seed and pitcher_id_seed: baseline_seed_key = f"{game_pk_seed}:{pitcher_id_seed}" elif game_pk_seed and pitcher_name_seed: baseline_seed_key = f"{game_pk_seed}:{pitcher_name_seed.lower()}" elif pitcher_name_seed: baseline_seed_key = f"name_only:{pitcher_name_seed.lower()}" if baseline_seed_key: baseline_seed = baseline_seed_map.get( baseline_seed_key, { "innings": {}, "pitch_type_innings": {}, }, ) innings_map = baseline_seed.get("innings", {}) pitch_type_innings_map = baseline_seed.get("pitch_type_innings", {}) persisted_new_inning_seed = False if current_inning_seed is not None: inning_key = str(current_inning_seed) if inning_key not in innings_map: innings_map[inning_key] = { "velocity": prepared.get("pitch_velocity"), "spin_rate": prepared.get("pitch_spin_rate"), "extension": prepared.get("pitch_extension"), "pfx_x": prepared.get("pitch_pfx_x"), "pfx_z": prepared.get("pitch_pfx_z"), } persisted_new_inning_seed = True if current_pitch_type_key and current_pitch_type_key != "unknown": if current_pitch_type_key not in pitch_type_innings_map: pitch_type_innings_map[current_pitch_type_key] = {} if inning_key not in pitch_type_innings_map[current_pitch_type_key]: pitch_type_innings_map[current_pitch_type_key][inning_key] = { "velocity": prepared.get("pitch_velocity"), "spin_rate": prepared.get("pitch_spin_rate"), "extension": prepared.get("pitch_extension"), "pfx_x": prepared.get("pitch_pfx_x"), "pfx_z": prepared.get("pitch_pfx_z"), } baseline_seed["innings"] = innings_map baseline_seed["pitch_type_innings"] = pitch_type_innings_map baseline_seed_map[baseline_seed_key] = baseline_seed if persisted_new_inning_seed and current_inning_seed is not None: upsert_inning_first_seed_event( { "pitcher_id": pitcher_id_seed, "pitcher_name": pitcher_name_seed, "game_pk": game_pk_seed, "game_date": game_date_seed, "inning": current_inning_seed, "pitch_type_key": current_pitch_type_key or "unknown", "velocity": prepared.get("pitch_velocity"), "spin_rate": prepared.get("pitch_spin_rate"), "extension": prepared.get("pitch_extension"), "pfx_x": prepared.get("pitch_pfx_x"), "pfx_z": prepared.get("pitch_pfx_z"), } ) def _mean_from_metric_rows(rows_map: dict, metric_name: str) -> float | None: values: list[object] = [] for row in rows_map.values(): if not isinstance(row, dict): continue values.append(row.get(metric_name)) return _safe_mean_numeric(values) velocity_seed = _mean_from_metric_rows(innings_map, "velocity") spin_seed = _mean_from_metric_rows(innings_map, "spin_rate") extension_seed = _mean_from_metric_rows(innings_map, "extension") pfx_x_seed = _mean_from_metric_rows(innings_map, "pfx_x") pfx_z_seed = _mean_from_metric_rows(innings_map, "pfx_z") velocity_seed_count = sum( 1 for row in innings_map.values() if isinstance(row, dict) and row.get("velocity") is not None ) spin_seed_count = sum( 1 for row in innings_map.values() if isinstance(row, dict) and row.get("spin_rate") is not None ) extension_seed_count = sum( 1 for row in innings_map.values() if isinstance(row, dict) and row.get("extension") is not None ) active_pitch_type_rows = {} if current_pitch_type_key and current_pitch_type_key != "unknown": active_pitch_type_rows = pitch_type_innings_map.get(current_pitch_type_key, {}) or {} pitch_type_velocity_seed = _mean_from_metric_rows(active_pitch_type_rows, "velocity") pitch_type_spin_seed = _mean_from_metric_rows(active_pitch_type_rows, "spin_rate") pitch_type_extension_seed = _mean_from_metric_rows(active_pitch_type_rows, "extension") pitch_type_pfx_x_seed = _mean_from_metric_rows(active_pitch_type_rows, "pfx_x") pitch_type_pfx_z_seed = _mean_from_metric_rows(active_pitch_type_rows, "pfx_z") pitch_type_velocity_seed_count = sum( 1 for row in active_pitch_type_rows.values() if isinstance(row, dict) and row.get("velocity") is not None ) pitch_type_spin_seed_count = sum( 1 for row in active_pitch_type_rows.values() if isinstance(row, dict) and row.get("spin_rate") is not None ) pitch_type_extension_seed_count = sum( 1 for row in active_pitch_type_rows.values() if isinstance(row, dict) and row.get("extension") is not None ) stored_baseline = load_pitcher_cross_game_baseline( pitcher_id=pitcher_id_seed, pitcher_name=pitcher_name_seed, pitch_type_key=current_pitch_type_key, ) prepared = merge_live_game_row( prepared, { "seed_baseline_velocity": velocity_seed, "seed_baseline_spin_rate": spin_seed, "seed_baseline_extension": extension_seed, "seed_baseline_pfx_x": pfx_x_seed, "seed_baseline_pfx_z": pfx_z_seed, "seed_baseline_velocity_sample_size": velocity_seed_count, "seed_baseline_spin_sample_size": spin_seed_count, "seed_baseline_extension_sample_size": extension_seed_count, "seed_pitch_type_key": current_pitch_type_key, "seed_pitch_type_baseline_velocity": pitch_type_velocity_seed, "seed_pitch_type_baseline_spin_rate": pitch_type_spin_seed, "seed_pitch_type_baseline_extension": pitch_type_extension_seed, "seed_pitch_type_baseline_pfx_x": pitch_type_pfx_x_seed, "seed_pitch_type_baseline_pfx_z": pitch_type_pfx_z_seed, "seed_pitch_type_baseline_velocity_sample_size": pitch_type_velocity_seed_count, "seed_pitch_type_baseline_spin_sample_size": pitch_type_spin_seed_count, "seed_pitch_type_baseline_extension_sample_size": pitch_type_extension_seed_count, **stored_baseline, }, ) if "pitcher_store_error" in st.session_state: del st.session_state["pitcher_store_error"] except Exception as e: st.session_state["pitcher_store_error"] = str(e) # Persist batter zone events from live playEvents try: batter_name_store = str(prepared.get("batter_name", "") or "").strip() game_pk_store = normalize_game_pk(prepared.get("game_pk")) game_date_store = str(prepared.get("game_date", "") or prepared.get("official_date", "") or "").strip() if batter_name_store and game_pk_store: if "batter_zone_event_keys" not in st.session_state: st.session_state["batter_zone_event_keys"] = set() seen_keys = st.session_state["batter_zone_event_keys"] new_rows: list[dict[str, Any]] = [] for event in play_events: pitch_data = event.get("pitchData") or {} if not pitch_data: continue details = event.get("details", {}) or {} result = event.get("result", {}) or {} pitch_name = str(((details.get("type", {}) or {}).get("description", "")) or "").strip() pitch_family = normalize_pitch_family(pitch_name) coords = pitch_data.get("coordinates", {}) or {} plate_x = coords.get("pX") plate_z = coords.get("pZ") if plate_x is None: plate_x = pitch_data.get("plate_x") if plate_z is None: plate_z = pitch_data.get("plate_z") zone_bucket = classify_zone_bucket(plate_x, plate_z) if zone_bucket == "unknown": continue description = str(details.get("description", "") or "").strip().lower() event_type = str(result.get("event", "") or "").strip().lower() event_desc = str(result.get("eventType", "") or "").strip().lower() hit_flag = int( event_type in {"single", "double", "triple", "home_run"} or event_desc in {"single", "double", "triple", "home_run"} ) hr_flag = int(event_type == "home_run" or event_desc == "home_run") tb2p_flag = int( event_type in {"double", "triple", "home_run"} or event_desc in {"double", "triple", "home_run"} ) whiff_flag = int( description in {"swinging strike", "swinging strike blocked"} or event_desc in {"swinging_strike", "swinging_strike_blocked"} ) launch_speed = result.get("launchSpeed") damage_flag = 0 try: if launch_speed is not None and float(launch_speed) >= 95: damage_flag = 1 except Exception: pass if hr_flag: damage_flag = 1 event_key = ( str(game_pk_store), batter_name_store, str(event.get("playId", "") or event.get("index", "") or ""), pitch_family, zone_bucket, ) if event_key in seen_keys: continue seen_keys.add(event_key) play_id_value = str(event.get("playId", "") or event.get("index", "") or "").strip() if not play_id_value: continue event_key = ":".join( [ str(game_pk_store), play_id_value, batter_name_store, pitch_family, zone_bucket, ] ) pitch_breaks = pitch_data.get("breaks", {}) or {} coords = pitch_data.get("coordinates", {}) or {} pfx_x_value = _extract_pitch_movement_x_value(pitch_data) pfx_z_value = _extract_pitch_movement_z_value(pitch_data) ax_value = coords.get("aX") ay_value = coords.get("aY") az_value = coords.get("aZ") if ax_value is None: ax_value = pitch_data.get("ax") if ay_value is None: ay_value = pitch_data.get("ay") if az_value is None: az_value = pitch_data.get("az") new_rows.append( { "event_key": event_key, "batter_name": batter_name_store, "game_pk": game_pk_store, "game_date": game_date_store, "pitch_family": pitch_family, "zone_bucket": zone_bucket, "plate_x": plate_x, "plate_z": plate_z, "pfx_x": pfx_x_value, "pfx_z": pfx_z_value, "ax": ax_value, "ay": ay_value, "az": az_value, "hit_flag": hit_flag, "hr_flag": hr_flag, "tb2p_flag": tb2p_flag, "whiff_flag": whiff_flag, "damage_flag": damage_flag, } ) if new_rows: insert_batter_zone_events(new_rows) if "batter_zone_store_error" in st.session_state: del st.session_state["batter_zone_store_error"] except Exception as e: st.session_state["batter_zone_store_error"] = str(e) # Live-populate live_pitch_mix_2026 and live_batter_game_log_2026 from allPlays try: if "live_pitch_mix_seen_keys" not in st.session_state: st.session_state["live_pitch_mix_seen_keys"] = {} seen_pitch_keys: set = st.session_state["live_pitch_mix_seen_keys"].setdefault( str(game_pk), set() ) home_team = (feed.get("gameData", {}) or {}).get("teams", {}).get("home", {}).get("abbreviation") or prepared.get("home_team") away_team = (feed.get("gameData", {}) or {}).get("teams", {}).get("away", {}).get("abbreviation") or prepared.get("away_team") game_date_live = str(prepared.get("game_date") or prepared.get("official_date") or "")[:10] all_plays = (plays.get("allPlays") or []) + ([current_play] if current_play else []) new_pitch_rows: list[dict] = [] new_pa_rows: list[dict] = [] for play in all_plays: matchup = play.get("matchup", {}) or {} about = play.get("about", {}) or {} result = play.get("result", {}) or {} pitcher_id = (matchup.get("pitcher", {}) or {}).get("id") batter_id = (matchup.get("batter", {}) or {}).get("id") pitcher_name = _extract_person_name(matchup.get("pitcher", {})) stand = ((matchup.get("batSide", {}) or {}).get("code")) or None p_throws = ((matchup.get("pitchHand", {}) or {}).get("code")) or None ab_num = (about.get("atBatIndex") or 0) + 1 # convert 0-based to 1-based inning = about.get("inning") inning_topbot = str(about.get("halfInning") or "").capitalize() or None play_events_all = play.get("playEvents", []) or [] pa_complete = bool(result.get("eventType")) for event in play_events_all: if not event.get("isPitch"): continue pitch_num = event.get("pitchNumber") if pitch_num is None: continue ek = f"{game_pk}_{ab_num}_{pitch_num}" if ek in seen_pitch_keys: continue pitch_data = event.get("pitchData", {}) or {} pitch_breaks = pitch_data.get("breaks", {}) or {} coords = pitch_data.get("coordinates", {}) or {} details = event.get("details", {}) or {} count = event.get("count", {}) or {} plate_x = coords.get("pX") plate_z = coords.get("pZ") new_pitch_rows.append({ "event_key": ek, "pa_key": f"{game_pk}_{ab_num}", "game_pk": int(game_pk), "game_date": game_date_live, "source_season": int(game_date_live[:4]) if game_date_live else None, "batter": int(batter_id) if batter_id else None, "pitcher": int(pitcher_id) if pitcher_id else None, "player_name": pitcher_name or None, "stand": stand, "p_throws": p_throws, "home_team": home_team, "away_team": away_team, "inning": inning, "inning_topbot": inning_topbot, "at_bat_number": ab_num, "pitch_number": pitch_num, "pitch_type": ((details.get("type", {}) or {}).get("code")) or None, "pitch_name": ((details.get("type", {}) or {}).get("description")) or None, "release_speed": _safe_float(pitch_data.get("startSpeed")), "effective_speed": _safe_float(pitch_data.get("endSpeed")), "release_spin_rate": _safe_float(pitch_breaks.get("spinRate")), "spin_axis": _safe_float(pitch_breaks.get("spinDirection")), "pfx_x": _safe_float(pitch_breaks.get("pfxX") or coords.get("pfxX")), "pfx_z": _safe_float(pitch_breaks.get("pfxZ") or coords.get("pfxZ")), "release_pos_x": _safe_float(coords.get("x0")), "release_pos_y": _safe_float(coords.get("y0")), "release_pos_z": _safe_float(coords.get("z0")), "release_extension": _safe_float(pitch_data.get("extension")), "plate_x": _safe_float(plate_x), "plate_z": _safe_float(plate_z), "zone": None, # TODO: integer zone classifier not yet implemented; backfilled by post-game Savant ingest "balls": count.get("balls"), "strikes": count.get("strikes"), "outs_when_up": count.get("outs"), "bat_score": None, "fld_score": None, "type": (details.get("code")) or None, "description": (details.get("description")) or None, "events": (result.get("event")) if pa_complete and event == play_events_all[-1] else None, }) seen_pitch_keys.add(ek) # PA-level row when the play is complete if pa_complete: pa_key = f"{game_pk}_{ab_num}" if pa_key not in seen_pitch_keys: events_val = result.get("event") or None events_str = str(events_val or "").lower().replace(" ", "_") hit_flag = int(events_str in {"single", "double", "triple", "home_run"}) hr_flag = int(events_str == "home_run") tb2p_flag = int(events_str in {"double", "triple", "home_run"}) pitch_events_only = [e for e in play_events_all if e.get("isPitch")] terminal = pitch_events_only[-1] if pitch_events_only else {} tc = terminal.get("count", {}) or {} new_pa_rows.append({ "pa_key": pa_key, "game_pk": int(game_pk), "game_date": game_date_live, "source_season": int(game_date_live[:4]) if game_date_live else None, "batter": int(batter_id) if batter_id else None, "player_name": _extract_person_name(matchup.get("batter", {})), "stand": stand, "p_throws": p_throws, "home_team": home_team, "away_team": away_team, "inning": inning, "inning_topbot": inning_topbot, "at_bat_number": ab_num, "pitches_seen": len(pitch_events_only), "balls_final": tc.get("balls"), "strikes_final": tc.get("strikes"), "outs_when_up": tc.get("outs"), "events": events_val, "description": result.get("description"), "hit_flag": hit_flag, "hr_flag": hr_flag, "tb2p_flag": tb2p_flag, }) seen_pitch_keys.add(pa_key) if new_pitch_rows or new_pa_rows: _fire_live_pitch_upsert(new_pitch_rows, new_pa_rows) except Exception as e: logger.warning("[live_pitch_mix_ingest] failure game_pk=%s: %s", game_pk, e) except Exception as e: logger.warning(f"[batter_zone_store_init] failure: {e}", exc_info=True) return prepared def build_prepared_live_games_df(live_games: pd.DataFrame) -> pd.DataFrame: if live_games is None or live_games.empty: return pd.DataFrame() rows = [] for _, row in live_games.iterrows(): rows.append(prepare_live_game_for_ui(row.to_dict())) return pd.DataFrame(rows) def render_live_games_with_edge_strips( live_games: pd.DataFrame, statcast_df: pd.DataFrame, pitcher_statcast_df: pd.DataFrame | None = None, odds_df: pd.DataFrame | None = None, ) -> None: if live_games.empty: return st.markdown('
LIVE GAMES
', unsafe_allow_html=True) live_games_json = live_games.to_json(orient="records") prepared_live_games = build_prepared_live_games_df_cached(live_games_json) games = prepared_live_games.to_dict("records") cols = st.columns(2) for i, game in enumerate(games): with cols[i % 2]: render_game_card(game) prop_odds_df = load_hr_prop_odds_for_game( away_team=str(game.get("away_team", "")), home_team=str(game.get("home_team", "")), ) recommendations = build_upcoming_hitter_recommendations( game_row=game, statcast_df=statcast_df, pitcher_statcast_df=pitcher_statcast_df, odds_df=odds_df, prop_odds_df=prop_odds_df, weather_row=None, ) has_recommendations = False if recommendations is not None: if isinstance(recommendations, pd.DataFrame): has_recommendations = not recommendations.empty elif isinstance(recommendations, list): has_recommendations = len(recommendations) > 0 else: try: has_recommendations = len(recommendations) > 0 except Exception: has_recommendations = False try: timestamp = utc_now_iso() log_df = build_recommendation_log_rows( recommendations=recommendations, game_row=game, created_at=timestamp, ) insert_recommendation_logs(conn, log_df) outcome_df = build_recommendation_outcome_rows( game_row=game, graded_at=timestamp, ) insert_recommendation_outcomes(conn, outcome_df) except Exception as e: logger.warning(f"[recommendation_outcome_insert] failure: {e}", exc_info=True) def normalize_game_pk(value: object) -> str: try: if value is None: return "" text = str(value).strip() if text.lower() in {"", "nan", "none"}: return "" return str(int(float(text))) except Exception: text = str(value).strip() return text if text.isdigit() else "" def build_live_pitch_metrics_debug_df(live_games: pd.DataFrame) -> pd.DataFrame: rows: list[dict] = [] if live_games is None or live_games.empty: return pd.DataFrame() for _, row in live_games.iterrows(): game = row.to_dict() game_pk = normalize_game_pk(game.get("game_pk")) if not game_pk: rows.append( { "away_team": game.get("away_team"), "home_team": game.get("home_team"), "game_pk": "", "status": game.get("status"), "pitch_velocity": None, "pitch_extension": None, "pitch_spin_rate": None, "pitch_type": None, "last_pitch": None, "pfx_x": None, "pfx_z": None, "ax": None, "ay": None, "az": None, "release_x": None, "release_y": None, "release_z": None, "found_pitch_event": False, } ) continue try: feed = load_live_game_feed_cached(game_pk) except Exception: feed = {} live_data = (feed.get("liveData", {}) or {}) if isinstance(feed, dict) else {} plays = live_data.get("plays", {}) or {} current_play = plays.get("currentPlay", {}) or {} play_events = current_play.get("playEvents", []) or [] pitch_event = None best_pitch_event = None for event in reversed(play_events): pitch_data = event.get("pitchData") or {} if not pitch_data: continue best_pitch_event = event # Prefer an event that actually has usable numeric pitch metrics start_speed = pitch_data.get("startSpeed") extension = pitch_data.get("extension") spin_rate = (pitch_data.get("breaks", {}) or {}).get("spinRate") coords = pitch_data.get("coordinates", {}) or {} has_useful_metric = any( value is not None for value in [ start_speed, extension, spin_rate, coords.get("pfxX"), coords.get("pfxZ"), coords.get("x0"), coords.get("y0"), coords.get("z0"), ] ) if has_useful_metric: pitch_event = event break if pitch_event is None: pitch_event = best_pitch_event if pitch_event: pitch_data = pitch_event.get("pitchData", {}) or {} pitch_breaks = pitch_data.get("breaks", {}) or {} coords = pitch_data.get("coordinates", {}) or {} details = pitch_event.get("details", {}) or {} rows.append( { "away_team": game.get("away_team"), "home_team": game.get("home_team"), "game_pk": game_pk, "status": game.get("status"), "pitch_velocity": pitch_data.get("startSpeed"), "pitch_extension": ( pitch_data.get("release_extension") or pitch_data.get("extension") ), "pitch_spin_rate": ( pitch_data.get("release_spin_rate") or pitch_breaks.get("spinRate") or pitch_data.get("spinRate") ), "pitch_type": (details.get("type", {}) or {}).get("description"), "last_pitch": details.get("description"), "pfx_x": coords.get("pfxX"), "pfx_z": coords.get("pfxZ"), "ax": coords.get("aX"), "ay": coords.get("aY"), "az": coords.get("aZ"), "release_x": coords.get("x0"), "release_y": coords.get("y0"), "release_z": coords.get("z0"), "found_pitch_event": True, "raw_pitch_data": str(pitch_data)[:1500], "raw_pitch_breaks": str(pitch_breaks)[:1000], } ) else: rows.append( { "away_team": game.get("away_team"), "home_team": game.get("home_team"), "game_pk": game_pk, "status": game.get("status"), "pitch_velocity": None, "pitch_extension": None, "pitch_spin_rate": None, "pitch_type": None, "last_pitch": None, "pfx_x": None, "pfx_z": None, "ax": None, "ay": None, "az": None, "release_x": None, "release_y": None, "release_z": None, "found_pitch_event": False, } ) return pd.DataFrame(rows) def build_scores_from_schedule_via_live_feeds(schedule_df: pd.DataFrame) -> pd.DataFrame: """ Fallback path when the scores parser/feed is empty or unreliable. Uses schedule rows + game_pk + live feed to derive live/final score rows. """ if schedule_df is None or schedule_df.empty: return pd.DataFrame() if "game_pk" not in schedule_df.columns: return pd.DataFrame() rows: list[dict] = [] for _, row in schedule_df.iterrows(): game = row.to_dict() game_pk = normalize_game_pk(game.get("game_pk")) if not game_pk: rows.append(game) continue try: feed = load_live_game_feed_cached(game_pk) if isinstance(feed, dict) and feed: game["game_pk"] = game_pk game = enrich_game_from_live_feed(game, feed) except Exception as e: logger.warning(f"[feed_cache_load] failure: {e}", exc_info=True) rows.append(game) return pd.DataFrame(rows) def enrich_live_games_from_feeds(scores_df: pd.DataFrame) -> pd.DataFrame: if scores_df.empty: return scores_df rows = [] live_feed_calls = 0 for _, row in scores_df.iterrows(): game = row.to_dict() original_status = str(game.get("status", "")).strip() status = original_status.lower() game_pk = normalize_game_pk(game.get("game_pk")) is_live_candidate = any( token in status for token in ["live", "top", "bot", "bottom", "mid", "inning"] ) is_final_candidate = any( token in status for token in ["final", "game over", "completed", "ended"] ) # For finals, enrich aggressively if we have a usable game_pk. # For live games, still respect the live feed cap. should_enrich_live = ( is_live_candidate and game_pk.isdigit() and live_feed_calls < MAX_LIVE_FEEDS ) should_enrich_final = ( is_final_candidate and game_pk.isdigit() ) if should_enrich_live or should_enrich_final: try: feed = load_live_game_feed_cached(game_pk) if isinstance(feed, dict) and feed: game = enrich_game_from_live_feed(game, feed) if should_enrich_live: live_feed_calls += 1 # Preserve original completed-game status text if is_final_candidate: game["status"] = original_status if original_status else "Final" except Exception as e: logger.warning(f"[game_status_preserve] failure: {e}", exc_info=True) rows.append(game) return pd.DataFrame(rows) def _extract_status_order(status: str) -> tuple[int, int]: s = str(status or "").strip().lower() if any(token in s for token in ["top", "bot", "bottom", "mid", "live"]): inning_num = 0 for part in reversed(s.split()): try: inning_num = int(part) break except Exception: continue return (0, -inning_num) if "final" in s: return (1, 0) return (2, 0) MLB_TEAM_LOGOS = { "angels": "https://a.espncdn.com/i/teamlogos/mlb/500/ana.png", "astros": "https://a.espncdn.com/i/teamlogos/mlb/500/hou.png", "athletics": "https://a.espncdn.com/i/teamlogos/mlb/500/oak.png", "blue jays": "https://a.espncdn.com/i/teamlogos/mlb/500/tor.png", "braves": "https://a.espncdn.com/i/teamlogos/mlb/500/atl.png", "brewers": "https://a.espncdn.com/i/teamlogos/mlb/500/mil.png", "cardinals": "https://a.espncdn.com/i/teamlogos/mlb/500/stl.png", "cubs": "https://a.espncdn.com/i/teamlogos/mlb/500/chc.png", "diamondbacks": "https://a.espncdn.com/i/teamlogos/mlb/500/ari.png", "dodgers": "https://a.espncdn.com/i/teamlogos/mlb/500/lad.png", "giants": "https://a.espncdn.com/i/teamlogos/mlb/500/sf.png", "guardians": "https://a.espncdn.com/i/teamlogos/mlb/500/cle.png", "mariners": "https://a.espncdn.com/i/teamlogos/mlb/500/sea.png", "marlins": "https://a.espncdn.com/i/teamlogos/mlb/500/mia.png", "mets": "https://a.espncdn.com/i/teamlogos/mlb/500/nym.png", "nationals": "https://a.espncdn.com/i/teamlogos/mlb/500/wsh.png", "orioles": "https://a.espncdn.com/i/teamlogos/mlb/500/bal.png", "padres": "https://a.espncdn.com/i/teamlogos/mlb/500/sd.png", "phillies": "https://a.espncdn.com/i/teamlogos/mlb/500/phi.png", "pirates": "https://a.espncdn.com/i/teamlogos/mlb/500/pit.png", "rangers": "https://a.espncdn.com/i/teamlogos/mlb/500/tex.png", "rays": "https://a.espncdn.com/i/teamlogos/mlb/500/tb.png", "red sox": "https://a.espncdn.com/i/teamlogos/mlb/500/bos.png", "reds": "https://a.espncdn.com/i/teamlogos/mlb/500/cin.png", "rockies": "https://a.espncdn.com/i/teamlogos/mlb/500/col.png", "royals": "https://a.espncdn.com/i/teamlogos/mlb/500/kc.png", "tigers": "https://a.espncdn.com/i/teamlogos/mlb/500/det.png", "twins": "https://a.espncdn.com/i/teamlogos/mlb/500/min.png", "white sox": "https://a.espncdn.com/i/teamlogos/mlb/500/cws.png", "yankees": "https://a.espncdn.com/i/teamlogos/mlb/500/nyy.png", } def get_team_logo_url(team_name: str, competition_bucket: str = "") -> str | None: if str(competition_bucket or "").upper() != "MLB": return None return MLB_TEAM_LOGOS.get(str(team_name or "").strip().lower()) def sort_scoreboard_games(df: pd.DataFrame) -> pd.DataFrame: if df is None or df.empty: return df out = df.copy() if "status" not in out.columns: out["status"] = "" if "start_time_et" not in out.columns: out["start_time_et"] = "" if "away_team" not in out.columns: out["away_team"] = "" if "home_team" not in out.columns: out["home_team"] = "" status_keys = out["status"].fillna("").astype(str).apply(_extract_status_order) out["status_group"] = status_keys.apply(lambda x: x[0]) out["status_rank"] = status_keys.apply(lambda x: x[1]) out = out.sort_values( by=["status_group", "status_rank", "start_time_et", "away_team", "home_team"], ascending=[True, True, True, True, True], ) return out.drop(columns=["status_group", "status_rank"], errors="ignore") def render_live_prop_odds_debug_panel(live_games: pd.DataFrame) -> None: if live_games.empty: return with st.expander("Debug: live prop odds for first live game"): first_game = live_games.iloc[0].to_dict() st.write("Game context") st.write( { "away_team": first_game.get("away_team"), "home_team": first_game.get("home_team"), "status": first_game.get("status"), } ) odds_df = load_live_prop_odds_for_game(first_game) if odds_df.empty: st.info("No live prop odds returned for this game.") return display_cols = [ col for col in [ "sportsbook", "market", "player_name", "odds_american", "line", ] if col in odds_df.columns ] st.dataframe(odds_df[display_cols], use_container_width=True, hide_index=True) def grade_final_game_outcomes_from_scores(scores_df: pd.DataFrame) -> None: try: outcome_df = build_game_outcome_rows_from_scores( scores_df=scores_df, graded_at=utc_now_iso(), ) insert_game_outcomes(conn, outcome_df) except Exception: pass def grade_batter_prop_outcomes_from_audit() -> None: try: audit_df = read_recommendation_audit_view(conn) outcome_df = build_batter_prop_outcome_rows_from_audit( audit_df=audit_df, graded_at=utc_now_iso(), ) replace_batter_prop_outcomes(conn, outcome_df) except Exception: pass def fill_batter_prop_realized_outcomes(statcast_df: pd.DataFrame) -> None: try: batter_prop_outcomes_df = read_batter_prop_outcomes(conn) if batter_prop_outcomes_df.empty: return graded_df = build_batter_realization_rows( batter_prop_outcomes_df=batter_prop_outcomes_df, statcast_df=statcast_df, graded_at=utc_now_iso(), ) if not graded_df.empty: replace_batter_prop_outcomes(conn, graded_df) except Exception: pass def attach_game_pk_from_schedule( scores_like_df: pd.DataFrame, schedule_df: pd.DataFrame, ) -> pd.DataFrame: if scores_like_df is None or scores_like_df.empty: return pd.DataFrame() if scores_like_df is None else scores_like_df if schedule_df is None or schedule_df.empty or "game_pk" not in schedule_df.columns: return scores_like_df.copy() def canon(name: str) -> str: if not name: return "" n = str(name).strip().lower() mapping = { "usa": "united states", "u.s.a.": "united states", "united states": "united states", "puerto rico": "puerto rico", "dominican republic": "dominican republic", "great britain": "great britain", "chinese taipei": "chinese taipei", "taiwan": "chinese taipei", "czech republic": "czechia", "czechia": "czechia", "korea": "korea", "south korea": "korea", "japan": "japan", "brazil": "brazil", "italy": "italy", "nicaragua": "nicaragua", "mexico": "mexico", "venezuela": "venezuela", "colombia": "colombia", "panama": "panama", "netherlands": "netherlands", "cuba": "cuba", "israel": "israel", "canada": "canada", "australia": "australia", "china": "china", } return mapping.get(n, n) scores = scores_like_df.copy() schedule = schedule_df.copy() if "away_team" not in scores.columns or "home_team" not in scores.columns: return scores scores["away_key"] = scores["away_team"].apply(canon) scores["home_key"] = scores["home_team"].apply(canon) schedule["away_key"] = schedule["away_team"].apply(canon) schedule["home_key"] = schedule["home_team"].apply(canon) schedule_keys = ( schedule[["away_key", "home_key", "game_pk"]] .dropna(subset=["away_key", "home_key"]) .drop_duplicates(subset=["away_key", "home_key"]) .copy() ) scores = scores.merge( schedule_keys, on=["away_key", "home_key"], how="left", suffixes=("", "_sched"), ) if "game_pk_sched" in scores.columns: if "game_pk" in scores.columns: scores["game_pk"] = ( scores["game_pk"] .astype(str) .replace({"": pd.NA, "nan": pd.NA, "None": pd.NA}) .combine_first(scores["game_pk_sched"]) ) else: scores["game_pk"] = scores["game_pk_sched"] scores = scores.drop(columns=["game_pk_sched"]) return scores def inject_live_auto_refresh(interval_ms: int = 5000) -> None: components.html( f""" """, height=0, ) def render_dashboard() -> None: phase6_debug_rows = [] st.subheader("Live Dashboard") st.caption("Professional scoreboard view") st.caption(f"Dashboard date: {current_dashboard_date_str()} (America/New_York)") st.caption( f"Live dashboard refresh cadence: {LIVE_FEED_TTL_SECONDS}s cache. " f"Use browser refresh or swap tabs for immediate update." ) dashboard_date_str = current_dashboard_date_str() scores_df = get_stable_scores_for_dashboard_date(dashboard_date_str) schedule_date_str = dashboard_date_str if not scores_df.empty and "scores_source_date" in scores_df.columns: try: source_date = str(scores_df["scores_source_date"].iloc[0]).strip() if source_date: schedule_date_str = source_date except Exception: pass dashboard_payload = _build_dashboard_ready_payload( dashboard_date_str=dashboard_date_str, schedule_date_str=schedule_date_str, scores_json=scores_df.to_json(orient="split", date_format="iso"), ) schedule_df = dashboard_payload["schedule_df"] live_games = dashboard_payload["live_games"] final_games = dashboard_payload["final_games"] scheduled_games = dashboard_payload["scheduled_games"] baseline_bundle = dashboard_payload["baseline_slice"] if isinstance(dashboard_payload.get("baseline_slice"), dict) else {} statcast_df = dashboard_payload["statcast_df"] pitcher_statcast_df = dashboard_payload["pitcher_statcast_df"] odds_df = dashboard_payload["odds_df"] filter_option = st.radio( "Game Status", ["All", "Live", "Final", "Scheduled"], horizontal=True, key="dashboard_filter", ) live_games = sort_scoreboard_games(normalize_game_cards_df(live_games)) final_games = sort_scoreboard_games(normalize_game_cards_df(final_games)) scheduled_games = sort_scoreboard_games(normalize_game_cards_df(scheduled_games)) auto_refresh_live = st.sidebar.checkbox( "Full Page Auto Refresh Toggle", value=False, key="auto_refresh_live_dashboard", ) if auto_refresh_live and not live_games.empty: inject_live_auto_refresh(interval_ms=5000) if filter_option == "All": if not live_games.empty: render_live_games_fragment( live_games=live_games, statcast_df=statcast_df, pitcher_statcast_df=pitcher_statcast_df, odds_df=odds_df, ) render_scoreboard_section("FINAL", final_games) render_scoreboard_section("UPCOMING", scheduled_games) elif filter_option == "Live": if not live_games.empty: render_live_games_fragment( live_games=live_games, statcast_df=statcast_df, pitcher_statcast_df=pitcher_statcast_df, odds_df=odds_df, ) if live_games.empty and final_games.empty and scheduled_games.empty: st.warning("No games available from either schedule or scores feed.") render_live_prop_odds_debug_panel(live_games) render_statcast_retry_fragment() if statcast_df.empty: st.info("No WBC Statcast rows returned for the selected window.") def render_players() -> None: st.subheader("WBC Player Analytics") statcast_df = read_table(conn, "shared_hitter_baseline_event_rows").drop( columns=["snapshot_built_at", "snapshot_version", "source_status"], errors="ignore", ) if statcast_df.empty: statcast_df = load_statcast_recent() if statcast_df.empty: st.info("No recent WBC Statcast data available.") return st.caption(f"Loaded {len(statcast_df)} WBC Statcast rows") col1, col2 = st.columns(2) with col1: st.plotly_chart(create_exit_velocity_chart(statcast_df), use_container_width=True) with col2: st.plotly_chart(create_launch_angle_chart(statcast_df), use_container_width=True) def compute_market_edges(odds_df: pd.DataFrame) -> pd.DataFrame: if odds_df.empty: return odds_df out = odds_df.copy() out["implied_prob"] = out["price"].apply(american_to_implied_prob) grouped_rows: list[dict] = [] for (event_id, sportsbook, market_key), group in out.groupby(["event_id", "sportsbook", "market_key"]): temp = group.copy().reset_index(drop=True) if len(temp) == 2: p1, p2 = temp.loc[0, "implied_prob"], temp.loc[1, "implied_prob"] nv1, nv2 = remove_vig_two_way(p1, p2) temp.loc[0, "no_vig_prob"] = nv1 temp.loc[1, "no_vig_prob"] = nv2 else: total = temp["implied_prob"].sum() temp["no_vig_prob"] = temp["implied_prob"] / total if total else temp["implied_prob"] market_key_val = str(market_key).lower() has_model = market_key_val in ("player_props_hr", "batter_home_runs", "hr") for _, row in temp.iterrows(): grouped_rows.append( { **row.to_dict(), "model_prob": None, "edge": None, "kelly": None, "has_model": has_model, } ) return pd.DataFrame(grouped_rows) def render_betting() -> None: from visualization.betting_page import render_betting_tab bundle = load_upcoming_hr_props_bundle() render_betting_tab(bundle, load_fn=load_upcoming_hr_props_bundle) def render_matchups() -> None: st.subheader("WBC Matchup Analyzer") hitter_meta = read_table(conn, "shared_hitter_baseline_meta") pitcher_meta = read_table(conn, "shared_pitcher_baseline_meta") available_players = sorted( hitter_meta.get("player_name", pd.Series(dtype="object")).dropna().astype(str).tolist() ) if not hitter_meta.empty else [] available_pitchers = sorted( pitcher_meta.get("player_name", pd.Series(dtype="object")).dropna().astype(str).tolist() ) if not pitcher_meta.empty else [] if not available_players or not available_pitchers: baseline_bundle = load_shared_baseline_bundle_cached() hitter_meta = baseline_bundle.get("batter_baseline_meta", pd.DataFrame()) pitcher_meta = baseline_bundle.get("pitcher_baseline_meta", pd.DataFrame()) available_players = sorted( hitter_meta.get("player_name", pd.Series(dtype="object")).dropna().astype(str).tolist() ) if not hitter_meta.empty else [] available_pitchers = sorted( pitcher_meta.get("player_name", pd.Series(dtype="object")).dropna().astype(str).tolist() ) if not pitcher_meta.empty else [] if not available_players or not available_pitchers: st.info("No WBC Statcast available yet.") return selected_pitcher_player = st.selectbox( "Pitcher reference sample", options=available_pitchers, ) selected_venue = st.text_input("Venue", value="loanDepot park") selected_batters = st.multiselect("Batters", options=available_players, default=available_players[:8]) baseline_bundle = load_shared_baseline_bundle_cached( batter_names=_coerce_name_tuple(selected_batters), pitcher_names=_coerce_name_tuple([selected_pitcher_player]), ) statcast_df = baseline_bundle.get("blended_batter_df", pd.DataFrame()) pitcher_statcast_df = baseline_bundle.get("blended_pitcher_df", pd.DataFrame()) if statcast_df.empty: statcast_df = load_statcast_recent() if pitcher_statcast_df.empty: pitcher_statcast_df = load_statcast_recent() if statcast_df.empty or pitcher_statcast_df.empty: st.info("No WBC Statcast available yet.") return batter_df = batter_summary(statcast_df) if batter_df.empty: st.info("No batter summary available.") return pitcher_events = pitcher_statcast_df[pitcher_statcast_df["player_name"] == selected_pitcher_player].copy() pitcher_profile = pitcher_baseline_from_events(pitcher_events) weather_df = load_weather(selected_venue) temperature_f = None wind_speed_mph = None if not weather_df.empty: temperature_f = float(weather_df.iloc[0]["temperature_f"]) if pd.notna(weather_df.iloc[0]["temperature_f"]) else None wind_speed_mph = float(weather_df.iloc[0]["wind_speed_mph"]) if pd.notna(weather_df.iloc[0]["wind_speed_mph"]) else None rows = [] filtered_batters = batter_df[batter_df["player_name"].isin(selected_batters)].copy() for _, batter_row in filtered_batters.iterrows(): result = calculate_matchup_score( batter_row=batter_row, pitcher_profile=pitcher_profile, venue_name=selected_venue, temperature_f=temperature_f, wind_speed_mph=wind_speed_mph, ) rows.append( { "player_name": batter_row["player_name"], **result, } ) matchup_df = pd.DataFrame(rows).sort_values("matchup_score", ascending=False) st.dataframe(matchup_df, use_container_width=True, hide_index=True) col1, col2 = st.columns(2) with col1: st.plotly_chart(create_matchup_score_chart(matchup_df), use_container_width=True) with col2: st.plotly_chart(create_hit_hr_chart(matchup_df), use_container_width=True) if not matchup_df.empty: selected_player = st.selectbox("Simulate batter", options=matchup_df["player_name"].tolist()) sim_row = matchup_df[matchup_df["player_name"] == selected_player].iloc[0] sim_df = simulate_batter_outcomes( hit_prob=float(sim_row["hit_prob"]), hr_prob=float(sim_row["hr_prob"]), n_sims=10000, ) c1, c2, c3 = st.columns(3) c1.metric("Sim Hit Rate", f"{sim_df['hit'].mean():.2%}") c2.metric("Sim HR Rate", f"{sim_df['hr'].mean():.2%}") c3.metric("Avg Total Bases", f"{sim_df['total_bases'].mean():.2f}") col3, col4 = st.columns(2) with col3: st.plotly_chart(create_total_bases_distribution(sim_df, selected_player), use_container_width=True) with col4: st.plotly_chart(create_hr_distribution(sim_df, selected_player), use_container_width=True) def render_bet_tracker() -> None: st.subheader("Bet Tracker") with st.form("bet_form", clear_on_submit=True): c1, c2, c3 = st.columns(3) sportsbook = c1.text_input("Sportsbook", value="DraftKings") market = c2.text_input("Market", value="h2h") selection = c3.text_input("Selection", value="Example Team") c4, c5, c6 = st.columns(3) odds = c4.number_input("Odds", min_value=-1000, max_value=1000, value=120, step=1) stake = c5.number_input("Stake", min_value=0.0, value=10.0, step=1.0) game_id = c6.text_input("Game ID", value="") notes = st.text_input("Notes", value="") submitted = st.form_submit_button("Log Bet") if submitted: bet_id = next_bet_id(conn) insert_bet( conn=conn, bet_id=bet_id, created_at=utc_now_iso(), sportsbook=sportsbook, market=market, selection=selection, odds=int(odds), stake=float(stake), result="open", profit=0.0, game_id=game_id, notes=notes, ) st.success(f"Logged bet #{bet_id}") bets_df = read_table(conn, "bets") if bets_df.empty: st.info("No bets logged yet.") return st.dataframe(bets_df, use_container_width=True, hide_index=True) with st.expander("Grade a bet"): bet_id_to_grade = st.number_input("Bet ID", min_value=1, step=1, value=1) result = st.selectbox("Result", options=["win", "loss"]) if st.button("Apply Grade"): row = bets_df[bets_df["bet_id"] == bet_id_to_grade] if row.empty: st.error("Bet ID not found.") else: stake = float(row.iloc[0]["stake"]) odds = int(row.iloc[0]["odds"]) profit = grade_profit(stake, odds, result) update_bet_result(conn, int(bet_id_to_grade), result, profit) st.success(f"Updated bet #{bet_id_to_grade} to {result}") bets_df = read_table(conn, "bets") metrics = summary_metrics(bets_df) c1, c2, c3, c4 = st.columns(4) c1.metric("Graded Bets", metrics["bets"]) c2.metric("Profit", f"${metrics['profit']:.2f}") c3.metric("ROI", f"{metrics['roi']:.2%}") c4.metric("Win Rate", f"{metrics['win_rate']:.2%}") curve_df = bankroll_curve(bets_df) st.plotly_chart(create_bankroll_chart(curve_df), use_container_width=True) def render_alpha_release() -> None: st.subheader("Alpha Release") st.info( "**Kasper is in alpha.** Model probabilities are statistical estimates, not guarantees. " "Edge values reflect model output vs. market implied probability — they do not predict outcomes. " "All outputs are for informational and research purposes only." ) st.markdown( """ **Kasper** is a pre-game and live-game baseball analytics engine built for the 2026 MLB season. It ingests Statcast data, live game feeds, and sportsbook odds to compute batter HR probabilities, compare them against the market, and surface edges in real time. This is an **alpha release** — the model stack is functional and actively processing live data, but outputs are under ongoing validation. Calibration data is accumulating each game day. """ ) with st.expander("System Overview", expanded=False): st.markdown( """ **What Kasper currently supports:** - Live game recommendations (Dashboard) — HR, Hit, Total Bases props for batters On Deck / In Hole / 3 Away - Pre-game HR prop analysis (Props tab) — edge vs. retail books (DraftKings, FanDuel, BetMGM, Caesars) - Execution layer (Alpha) — cross-book market comparison, edge quality filtering, final recommendation score - Full debug visibility — adjustment ladders, signal attribution, execution layer diagnostics **Data sources:** - Statcast (Baseball Savant) — batter and pitcher features, 14-day rolling window - MLB Schedule API — live game state, lineup, score - Sportsbook odds API — HR prop odds from retail books """ ) with st.expander("How It Works", expanded=False): st.markdown( """ **Signal flow:** ``` Statcast features → Batter baseline (EV90, barrel rate, hard-hit rate, xwOBA, launch angle) → Pitcher adjustment (velo, EV allowed, barrel rate allowed) → Context adjustments (park, weather, bullpen state) → Zone / arsenal / family-zone matchup overlays → Trend & rolling form (5/10-game windows) → Opportunity adjustment (expected PA given game state) → Fair probability → American odds → Compare vs. sportsbook implied probability → Edge = model prob − book implied prob → Execution layer (market disagreement, confidence, timing, final score) → Recommendation: BET / WATCH / PASS ``` """ ) with st.expander("Core Math", expanded=False): st.markdown( r""" **Baseline probability** (per batter, pre-game): - EV90, barrel rate, hard-hit rate, xwOBA, launch angle → weighted sum → bounded probability - Bounds: HR [0.5%, 22%], Hit [5%, 50%], TB2P [3%, 42%] **Edge:** ``` edge = model_prob − implied_prob(book_odds) ``` Positive edge = model believes event is more likely than the market does. **Adjusted edge** (live Dashboard): ``` adjusted_edge = hr_edge + slot_boost slot_boost: On Deck +1.2pp, In Hole +0.6pp, 3 Away +0.0pp ``` **Execution score** (Execution Layer): ``` base = edge_filtered × (0.4 + confidence × 0.6) score = base − vol_penalty + market_bonus + timing_bonus score clamped to [−0.30, +0.30] ``` **Recommendation tiers:** - BET: adjusted_edge ≥ 6% AND confidence ≥ 78 - WATCH: adjusted_edge ≥ 2.5% AND confidence ≥ 62 - PASS: all others """ ) with st.expander("Signal Library", expanded=False): st.markdown( """ | Signal | Source | Type | |--------|--------|------| | EV90 | Statcast (90th pct exit velo) | Batter power | | Barrel rate | Statcast | Batter quality contact | | Hard-hit rate | Statcast | Batter contact strength | | xwOBA | Statcast | Batter overall quality | | Launch angle | Statcast | HR trajectory profile | | Pitcher velo | Statcast | Pitcher difficulty | | EV allowed | Statcast | Pitcher weakness | | Zone matchup | Statcast pitch zones | Pitch-to-zone alignment | | Arsenal matchup | Statcast pitch types | Batter vs. pitch family | | Rolling form | 5/10-game window | Recent batter/pitcher trend | | Bullpen state | Live game feed | Leverage / transition risk | | Park factor | Venue lookup | HR environment | | Platoon | Batter/pitcher handedness | Splits adjustment | | Opportunity | Game state (outs, slot) | Expected PA probability | """ ) with st.expander("Execution Layer (Alpha)", expanded=False): st.markdown( """ The Execution Layer is a post-model pass that does **not** modify probabilities. It operates on already-computed outputs (model probs + book odds) to improve edge selection. **Five passes:** 1. **Market Disagreement** — best/median/worst implied prob across books; flags outlier and stale books 2. **Edge Quality** — confidence score (source quality), volatility score (market width), signal strength; filters edge_raw → edge_filtered 3. **Timing Heuristics** — detects aggressive prices (>2pp better than median) and timestamp presence 4. **Correlation** — flags all HR props as positively correlated; detects stacked games (>2 players per game) 5. **Final Score** — blends edge_filtered, confidence, volatility, market width, and timing into a [−0.30, +0.30] score Visible in: Props tab → "Execution Layer" expander | Debug tab → "Execution Layer (Props)" expander """ ) with st.expander("System Health", expanded=False): st.markdown( """ | Feed | Refresh | Notes | |------|---------|-------| | Live game feed | 5s TTL | Live only when games in progress | | Scores | 8s TTL | | | Schedule | 300s TTL | | | Statcast | 600s TTL | 14-day rolling window | | Odds (moneyline) | 30s TTL | Used for Betting tab | | HR props (live, per game) | 60s TTL | Wired into Dashboard recommendations | | HR props (pre-game) | On demand | Via Props tab | Data is stored in CockroachDB. Tables: `recommendation_logs`, `upcoming_hr_props`, `batter_prop_outcomes`, `game_outcomes`, `feedback_submissions`. """ ) with st.expander("Alpha Scope", expanded=False): st.markdown( """ **Primary focus:** HR props (home run probability) HR is the primary market because: - It has the clearest Statcast signal (EV90, barrel rate, launch angle) - It's a binary outcome — clean to evaluate - Books offer consistent retail HR prop lines (DK, FD, BetMGM, Caesars) Hit and Total Bases props are computed and displayed but receive less model focus in alpha. """ ) with st.expander("Known Limitations", expanded=False): st.markdown( """ - **Pre-game baseline only** (Props tab): No live lineup, park, or weather context. Model uses season Statcast features. - **Live book odds**: When live HR prop odds are unavailable for a game, the Dashboard uses market-neutral reference odds (~+425). These are labeled with `~` in the BOOK column. - **Calibration**: Model has not yet accumulated a full-season outcome dataset. Probability estimates are structurally reasonable but not empirically calibrated to 2026 data. - **Name mapping**: Sportsbook player names sometimes differ from Statcast names. Some players may show "unavailable" source until mapping is added. - **No closing line value (CLV)**: CLV tracking requires final closing odds — not yet wired. - **No account for lineup scratches**: If a player is scratched post-lineup release, the model doesn't know. """ ) with st.expander("Feedback & Roadmap", expanded=False): st.markdown( """ Use the **Feedback** tab to submit observations, bugs, or suggestions. **Near-term roadmap:** - Post-game outcome grading and calibration reports - Closing line value (CLV) tracking - Hit and Total Bases model calibration - XGBoost model integration (currently shadow mode only) """ ) def main() -> None: render_header() page = st.sidebar.radio( "Navigation", options=[ "Dashboard", "Props", "Card Lab", "Odds", "Bet Tracker", "Alpha Release", "Feedback", "Debug", ], ) st.sidebar.caption(f"Live: {LIVE_FEED_TTL_SECONDS}s | Scores: {SCORES_TTL_SECONDS}s | " f"Schedule: {SCHEDULE_TTL_SECONDS}s | Statcast: {STATCAST_TTL_SECONDS}s") loader = render_loading_shell() if page == "Dashboard": loader["update"]("Loading dashboard data and live game context...", 0.35) render_dashboard() loader["clear"]() elif page == "Props": loader["update"]("Loading pregame props, statcast context, and projected lineups...", 0.25) loader["update"]("Loading sportsbook props and probable starters...", 0.45) _upcoming_props_bundle = load_upcoming_hr_props_bundle() st.session_state["upcoming_props_bundle_debug"] = _upcoming_props_bundle _probable_starters = load_probable_starters() loader["update"]("Preparing market-scoped baseline context...", 0.70) loader["update"]("Building Props Command Center...", 0.85) render_props( None, conn=conn, raw_props=_upcoming_props_bundle["merged_props_feed"], pitcher_statcast_df=None, probable_starters=_probable_starters, ) loader["clear"]() elif page == "Card Lab": loader["update"]("Loading Card Lab assets...", 0.45) render_card_lab(conn=conn) loader["clear"]() elif page == "Odds": loader["update"]("Loading live odds...", 0.45) render_betting() loader["clear"]() elif page == "Bet Tracker": loader["update"]("Loading bet tracker...", 0.45) render_bet_tracker() loader["clear"]() elif page == "Alpha Release": loader["update"]("Loading release notes and explainer content...", 0.45) render_alpha_release() loader["clear"]() elif page == "Feedback": loader["update"]("Loading feedback tools...", 0.45) render_feedback(conn) loader["clear"]() elif page == "Debug": loader["update"]("Loading debug diagnostics and provider visibility...", 0.30) _debug_scores = get_stable_scores_for_dashboard_date(current_wbc_date_str()) _debug_baseline_bundle = load_shared_baseline_bundle_from_snapshots( max_age_seconds=max(STATCAST_TTL_SECONDS, 60 * 60) ) _debug_read_status = { "hitter_event_rows": { "table_name": "shared_hitter_baseline_event_rows", "read_source": "baseline_bundle", "read_attempts": 1, "retry_used": False, "read_error": "", "snapshot_built_at": None, "source_status": str(_debug_baseline_bundle.get("snapshot_source_status") or ""), }, "pitcher_event_rows": { "table_name": "shared_pitcher_baseline_event_rows", "read_source": "baseline_bundle", "read_attempts": 1, "retry_used": False, "read_error": "", "snapshot_built_at": None, "source_status": str(_debug_baseline_bundle.get("snapshot_source_status") or ""), }, } _baseline_status_df = _debug_baseline_bundle.get("snapshot_status", pd.DataFrame()) if isinstance(_baseline_status_df, pd.DataFrame) and not _baseline_status_df.empty: for _table_key, _status_key in [ ("shared_hitter_baseline_event_rows", "hitter_event_rows"), ("shared_pitcher_baseline_event_rows", "pitcher_event_rows"), ]: _row = _baseline_status_df[_baseline_status_df["table_name"] == _table_key] if not _row.empty: _debug_read_status[_status_key]["snapshot_built_at"] = _row.iloc[0].get("snapshot_built_at") _debug_read_status[_status_key]["source_status"] = _row.iloc[0].get("source_status") _debug_hitter_df = _debug_baseline_bundle.get("blended_batter_df", pd.DataFrame()).copy() _debug_pitcher_df = _debug_baseline_bundle.get("blended_pitcher_df", pd.DataFrame()).copy() if not isinstance(_debug_hitter_df, pd.DataFrame): _debug_hitter_df = pd.DataFrame() if not isinstance(_debug_pitcher_df, pd.DataFrame): _debug_pitcher_df = pd.DataFrame() _debug_hitter_df = _debug_hitter_df.drop( columns=["snapshot_built_at", "snapshot_version", "source_status"], errors="ignore", ) _debug_pitcher_df = _debug_pitcher_df.drop( columns=["snapshot_built_at", "snapshot_version", "source_status"], errors="ignore", ) if _debug_hitter_df.empty: _debug_hitter_df, _debug_read_status["hitter_event_rows"] = read_table_retryable( conn, "shared_hitter_baseline_event_rows", ) _debug_hitter_df = _debug_hitter_df.drop( columns=["snapshot_built_at", "snapshot_version", "source_status"], errors="ignore", ) if _debug_pitcher_df.empty: _debug_pitcher_df, _debug_read_status["pitcher_event_rows"] = read_table_retryable( conn, "shared_pitcher_baseline_event_rows", ) _debug_pitcher_df = _debug_pitcher_df.drop( columns=["snapshot_built_at", "snapshot_version", "source_status"], errors="ignore", ) if _debug_hitter_df.empty: _debug_read_status["hitter_event_rows"]["read_source"] = "load_statcast_recent_fallback" _debug_hitter_df = load_statcast_recent() if _debug_pitcher_df.empty: _debug_read_status["pitcher_event_rows"]["read_source"] = "debug_hitter_fallback" _debug_pitcher_df = _debug_hitter_df loader["update"]("Rendering debug tables...", 0.75) render_debug( statcast_df=_debug_hitter_df, pitcher_statcast_df=_debug_pitcher_df, baseline_bundle=_debug_baseline_bundle, odds_df=load_odds(), conn=conn, live_games=pd.DataFrame(), scores_df=_debug_scores, upcoming_props_debug=load_upcoming_hr_props_bundle(), grade_outcomes_fn=grade_final_game_outcomes_from_scores, grade_props_fn=grade_batter_prop_outcomes_from_audit, fill_realized_fn=fill_batter_prop_realized_outcomes, debug_event_row_status=_debug_read_status, ) loader["clear"]() if __name__ == "__main__": main()