2026_MLB_Model / app.py
Syntrex's picture
Rework Betting tab → Odds: live market-by-market sportsbook comparison
77c7ee3
raw
history blame
160 kB
from __future__ import annotations
from io import StringIO
import threading
from datetime import date, timedelta
from typing import Any
import pandas as pd
import re
import streamlit as st
import streamlit.components.v1 as components
from data.live_prop_odds import (
best_book_by_player_market,
fetch_live_prop_odds,
fetch_all_upcoming_hr_props_bundle,
normalize_prop_odds,
)
from analytics.no_vig_props import (
american_to_implied_prob,
compute_bet_ev,
compute_edge,
)
from analytics.recommendation_logger import (
build_recommendation_log_rows,
build_recommendation_outcome_rows,
)
from analytics.batter_audit_metrics import (
build_batter_hr_tier_table,
build_batter_hr_confidence_table,
build_batter_hr_edge_table,
)
from analytics.batter_realization import build_batter_realization_rows
from analytics.batter_prop_grader import build_batter_prop_outcome_rows_from_audit
from analytics.game_completion import on_game_complete, upsert_live_pitch_and_pa_rows, _safe_float
from utils.logger import logger
from analytics.outcome_grader import build_game_outcome_rows_from_scores
from analytics.bankroll import bankroll_curve, grade_profit, summary_metrics
from analytics.edge import (
calculate_edge,
kelly_fraction,
remove_vig_two_way,
)
from analytics.no_vig_props import (
american_to_implied_prob,
compute_bet_ev,
compute_edge,
)
from models.batter_zone_model import classify_zone_bucket, normalize_pitch_family
from models.batter_zone_store import (
insert_batter_zone_events,
load_batter_zone_store_metrics,
)
from models.pitcher_adjustment import build_pitcher_feature_row
from config.settings import (
APP_TITLE,
DEFAULT_EDGE_THRESHOLD,
ODDS_API_KEY,
OPENWEATHER_API_KEY,
REFRESH_TTL_SECONDS,
LIVE_FEED_TTL_SECONDS,
SCORES_TTL_SECONDS,
SCHEDULE_TTL_SECONDS,
STATCAST_TTL_SECONDS,
ENABLE_ENTERPRISE_PROVIDER,
LIVE_PROP_ODDS_TTL_SECONDS,
DEFAULT_PROP_BOOKS,
DEFAULT_PROP_MARKETS,
)
from models.pitcher_baseline_store import (
upsert_inning_first_seed_event,
load_pitcher_cross_game_baseline,
)
from analytics.evaluation_metrics import (
build_hr_calibration_table,
build_edge_bucket_table,
build_confidence_table,
build_tier_performance_table,
build_ere_table,
build_ere_by_edge_bucket_table,
build_ere_by_confidence_bucket_table,
build_ere_by_tier_table,
build_clv_table,
build_clv_by_tier_table,
)
from models.batter_zone_store import insert_batter_zone_events
from analytics.recommendation_engine import build_upcoming_hitter_recommendations
from models.live_fair_simulator_v3 import build_upcoming_simulated_rows
from visualization.recommendation_panels import render_recommendation_panels
from visualization.model_explainer import render_model_explainer
from data.live_game_feed import fetch_live_game_feed
from engine.live_game_engine import enrich_game_from_live_feed
from utils.dates import current_wbc_date_str
from data.scores import fetch_scores_for_date
from data.odds import fetch_featured_odds
from data.schedule import fetch_schedule_for_date
from data.shared_baseline import load_or_build_shared_baseline_bundle, load_shared_baseline_bundle_from_snapshots
from data.statcast import fetch_statcast_range, fetch_statcast_range_pitcher, normalize_statcast
from data.weather import fetch_weather_for_venue
from database.db import (
get_connection,
insert_bet,
next_bet_id,
read_table,
read_table_retryable,
read_cached_odds,
read_cached_probable_starters,
read_cached_probable_starters_meta,
read_cached_schedule_for_date,
read_cached_upcoming_props_bundle,
read_cached_weather_for_venue,
update_bet_result,
upsert_dataframe,
replace_cached_odds,
replace_cached_probable_starters,
replace_cached_schedule,
replace_cached_upcoming_props_bundle,
replace_cached_weather,
ensure_recommendation_logs_table,
insert_recommendation_logs,
ensure_recommendation_outcomes_table,
insert_recommendation_outcomes,
read_recommendation_audit_view,
ensure_game_outcomes_table,
insert_game_outcomes,
read_game_outcomes,
ensure_batter_prop_outcomes_table,
insert_batter_prop_outcomes,
read_batter_prop_outcomes,
replace_batter_prop_outcomes,
read_batter_prop_audit_view,
ensure_upcoming_hr_props_table,
insert_upcoming_hr_props,
)
from features.batter_features import batter_summary
from features.pitch_features import add_pitch_features
from models.matchup_model import calculate_matchup_score
from models.pitch_model import pitcher_baseline_from_events
from simulation.monte_carlo import simulate_batter_outcomes
from utils.helpers import utc_now_iso
from visualization.batter import create_exit_velocity_chart, create_launch_angle_chart
from visualization.betting import create_bankroll_chart, create_edge_chart
from visualization.matchup import create_hit_hr_chart, create_matchup_score_chart
from visualization.pitcher import create_pitch_movement_chart
from visualization.props_page import render_props
from visualization.loading_shell import render_loading_shell
from visualization.simulation import create_hr_distribution, create_total_bases_distribution
from visualization.game_cards import render_game_card
from visualization.debug_page import render_debug
from visualization.feedback_page import render_feedback
from visualization.card_lab_page import render_card_lab
# Page-level Streamlit configuration: browser tab title taken from APP_TITLE,
# full-width ("wide") layout, and a baseball emoji as the page icon.
st.set_page_config(
    page_title=APP_TITLE,
    layout="wide",
    page_icon="⚾",
)
# Inject the dashboard-wide stylesheet. The CSS covers the app background and
# container width, metric tiles, section titles, the scoreboard cards
# (.score-card, status badges/dots, team lines, R/H/E grid, broadcast line),
# the win-probability bar (.wp-*) and the base-runner diamond (.diamond-*, .base-*).
# unsafe_allow_html=True is needed so the raw <style> tag is passed through.
st.markdown(
"""
<style>
.stApp {
background: linear-gradient(180deg, #08111f 0%, #0b1728 100%);
}
.block-container {
padding-top: 1rem;
padding-bottom: 2rem;
max-width: 1500px;
}
div[data-testid="stMetric"] {
background: rgba(255,255,255,0.04);
border: 1px solid rgba(255,255,255,0.08);
border-radius: 16px;
padding: 12px;
}
.score-filter-wrap {
margin: 0.5rem 0 1rem 0;
}
.section-title {
color: #cbd5e1;
letter-spacing: 0.15em;
font-size: 0.95rem;
font-weight: 700;
margin: 1rem 0 0.75rem 0;
}
.score-card {
background: linear-gradient(180deg, rgba(30,41,59,0.96) 0%, rgba(15,23,42,0.96) 100%);
border: 1px solid rgba(148,163,184,0.18);
border-radius: 22px;
padding: 16px 16px 14px 16px;
margin-bottom: 16px;
box-shadow: 0 10px 30px rgba(0,0,0,0.18);
}
.score-card-top {
display: flex;
justify-content: space-between;
align-items: flex-start;
margin-bottom: 10px;
}
.status-badge {
display: inline-flex;
align-items: center;
gap: 7px;
font-size: 0.95rem;
font-weight: 700;
padding: 4px 10px;
border-radius: 999px;
background: rgba(255,255,255,0.04);
}
.status-live {
color: #22c55e;
}
.status-final {
color: #fbbf24;
}
.status-scheduled {
color: #94a3b8;
}
.status-dot {
width: 8px;
height: 8px;
border-radius: 999px;
display: inline-block;
}
.dot-live {
background: #22c55e;
}
.dot-final {
background: #fbbf24;
}
.dot-scheduled {
background: #94a3b8;
}
.team-line {
display: flex;
justify-content: space-between;
align-items: center;
padding: 6px 0;
}
.team-meta {
display: flex;
align-items: baseline;
gap: 10px;
min-width: 0;
}
.team-name {
color: #e5e7eb;
font-size: 1.15rem;
font-weight: 800;
line-height: 1.2;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
max-width: 230px;
}
.team-record {
color: #94a3b8;
font-size: 0.9rem;
font-weight: 600;
}
.team-score {
color: #f8fafc;
font-size: 2rem;
font-weight: 900;
line-height: 1;
min-width: 24px;
text-align: right;
}
.rhe-header,
.rhe-row {
display: grid;
grid-template-columns: 1.8fr 0.5fr 0.5fr 0.5fr;
gap: 6px;
align-items: center;
}
.rhe-header {
color: #94a3b8;
font-size: 0.78rem;
font-weight: 800;
letter-spacing: 0.08em;
margin-top: 10px;
margin-bottom: 4px;
}
.rhe-row {
color: #cbd5e1;
font-size: 0.95rem;
font-weight: 700;
padding: 2px 0;
}
.score-card-bottom {
margin-top: 10px;
min-height: 20px;
}
.broadcast-line {
color: #94a3b8;
font-size: 0.9rem;
font-weight: 600;
}
.wp-wrap {
margin-top: 8px;
margin-bottom: 6px;
}
.wp-label-row {
display: flex;
justify-content: space-between;
color: #94a3b8;
font-size: 0.78rem;
font-weight: 700;
margin-bottom: 4px;
}
.wp-bar {
position: relative;
width: 100%;
height: 8px;
background: rgba(255,255,255,0.08);
border-radius: 999px;
overflow: hidden;
}
.wp-away {
position: absolute;
left: 0;
top: 0;
bottom: 0;
background: linear-gradient(90deg, #38bdf8 0%, #60a5fa 100%);
border-radius: 999px;
}
.diamond-wrap {
position: relative;
width: 42px;
height: 34px;
opacity: 0.95;
margin-right: 2px;
}
.diamond-base {
position: absolute;
width: 10px;
height: 10px;
transform: rotate(45deg);
border-radius: 2px;
border: 1px solid rgba(255,255,255,0.18);
}
.diamond-top {
top: 0;
left: 16px;
}
.diamond-left {
top: 12px;
left: 4px;
}
.diamond-right {
top: 12px;
left: 28px;
}
.diamond-home {
top: 24px;
left: 16px;
}
.base-on {
background: #fbbf24;
box-shadow: 0 0 10px rgba(251,191,36,0.25);
}
.base-off {
background: rgba(255,255,255,0.08);
}
.base-home {
background: rgba(255,255,255,0.14);
}
</style>
""",
unsafe_allow_html=True,
)
# Module-level DB connection shared by the loaders in this file. Background
# threads in this file open their own connection instead of reusing this one.
conn = get_connection()
# NOTE(review): presumably the cap on simultaneously fetched live game feeds;
# its use is outside this part of the file — confirm.
MAX_LIVE_FEEDS = 6
# Run the table bootstrap helpers on every script start.
# NOTE(review): by their names these create the tables when absent — confirm in database.db.
ensure_recommendation_logs_table(conn)
ensure_recommendation_outcomes_table(conn)
ensure_game_outcomes_table(conn)
ensure_batter_prop_outcomes_table(conn)
# Guards _ASYNC_REFRESH_KEYS. Held only around set updates, never while a refresh runs.
_ASYNC_REFRESH_LOCK = threading.Lock()
# Keys of the background refreshes that are currently in flight.
_ASYNC_REFRESH_KEYS: set[str] = set()


def _queue_async_refresh(key: str, fn) -> bool:
    """Run ``fn`` on a daemon thread unless a refresh with the same key is in flight.

    Args:
        key: Identifier used to de-duplicate concurrent refreshes.
        fn: Zero-argument callable executed on the background thread. Any
            exception it raises is logged as a warning and not re-raised.

    Returns:
        True when a new thread was started, False when ``key`` was already queued.

    Raises:
        Exception: Whatever ``Thread.start`` raises (e.g. ``RuntimeError`` when
            no thread can be created). The key is released first so a later
            call can retry instead of being blocked forever.
    """
    with _ASYNC_REFRESH_LOCK:
        if key in _ASYNC_REFRESH_KEYS:
            return False
        _ASYNC_REFRESH_KEYS.add(key)

    def _run() -> None:
        try:
            fn()
        except Exception as exc:
            logger.warning("[async_refresh] key=%s error=%s", key, exc)
        finally:
            # Always release the key so the next stale read can queue again.
            with _ASYNC_REFRESH_LOCK:
                _ASYNC_REFRESH_KEYS.discard(key)

    try:
        threading.Thread(target=_run, daemon=True).start()
    except Exception:
        # The worker never ran, so its ``finally`` will not release the key.
        with _ASYNC_REFRESH_LOCK:
            _ASYNC_REFRESH_KEYS.discard(key)
        raise
    return True
def _run_with_fresh_conn(write_fn) -> None:
    """Call ``write_fn`` with a newly opened DB connection and close it afterwards.

    Args:
        write_fn: Callable taking the connection as its only argument.

    Errors from ``get_connection`` or ``write_fn`` propagate to the caller;
    errors raised while closing the connection are ignored (best-effort close).
    """
    dedicated_conn = get_connection()
    try:
        write_fn(dedicated_conn)
    finally:
        try:
            dedicated_conn.close()
        except Exception:
            # Closing is best-effort; never mask the outcome of write_fn.
            pass
def _fire_completion(pk_str: str, game_date: str, scores_df: pd.DataFrame) -> None:
    """Kick off ``on_game_complete`` for one game on a daemon thread.

    Args:
        pk_str: Game primary key as a string; converted with ``int()`` in the worker.
        game_date: Date string handed through to ``on_game_complete``.
        scores_df: Scores frame; a copy is taken before the thread starts so the
            worker does not share the caller's object.

    The worker opens its own DB connection, logs any failure as a warning and
    always attempts to close the connection.
    """
    frozen_scores = scores_df.copy()

    def _run() -> None:
        worker_conn = None
        try:
            from database.db import get_connection as _get_conn

            worker_conn = _get_conn()
            on_game_complete(worker_conn, int(pk_str), game_date, frozen_scores)
        except Exception as exc:
            logger.warning("[_fire_completion] thread error game_pk=%s: %s", pk_str, exc)
        finally:
            if worker_conn is not None:
                try:
                    worker_conn.close()
                except Exception:
                    # Best-effort close; nothing useful to do on failure.
                    pass

    threading.Thread(target=_run, daemon=True).start()
def _fire_live_pitch_upsert(pitch_rows: list[dict], pa_rows: list[dict]) -> None:
    """Upsert live pitch and plate-appearance rows on a daemon thread.

    Both row lists are shallow-copied before the thread starts, so the worker
    iterates its own list objects. Failures inside the worker are logged as
    warnings and never reach the caller.
    """
    pitch_copy = [*pitch_rows]
    pa_copy = [*pa_rows]

    def _run() -> None:
        try:
            upsert_live_pitch_and_pa_rows(pitch_copy, pa_copy)
        except Exception as exc:
            logger.warning("[_fire_live_pitch_upsert] error: %s", exc)

    worker = threading.Thread(target=_run, daemon=True)
    worker.start()
@st.fragment(run_every=3)
def render_live_games_fragment(
    live_games: pd.DataFrame,
    statcast_df: pd.DataFrame,
    pitcher_statcast_df: pd.DataFrame | None = None,
    odds_df: pd.DataFrame | None = None,
) -> None:
    """Detect live -> final transitions, fire the completion pipeline, render live cards.

    On each fragment run the function:
      1. loads today's scores and derives the set of game_pks whose status text
         looks live;
      2. for every game that was tracked as live on an earlier run, is no longer
         live and has not been completed, fires ``_fire_completion`` once its
         status contains "final";
      3. stores the tracking sets in ``st.session_state`` ("_live_pks",
         "_completed_pks");
      4. delegates rendering to ``render_live_games_with_edge_strips``.

    Games that dropped out of the live set without being final yet (a delay, or
    a run where the scores frame came back empty) stay tracked. Previously they
    were forgotten and their completion never fired.

    Args:
        live_games: Live game rows handed through to the renderer.
        statcast_df: Batter-side frame handed through to the renderer.
        pitcher_statcast_df: Optional pitcher-side frame handed through.
        odds_df: Optional odds frame handed through.
    """
    import datetime as _dt

    _today = _dt.date.today().strftime("%Y-%m-%d")
    _scores = load_scores_for_dashboard_date(_today)
    _has_game_pk = not _scores.empty and "game_pk" in _scores.columns

    _curr_live: set[str] = set()
    if _has_game_pk and "status" in _scores.columns:
        _live_keywords = ("live", "top", "bot", "middle", "mid", "inning", "in progress")
        _curr_live = {
            str(r["game_pk"])
            for _, r in _scores.iterrows()
            if any(kw in str(r.get("status", "")).lower() for kw in _live_keywords)
        }

    _prev_live: set[str] = st.session_state.get("_live_pks", set())
    _done: set[str] = st.session_state.get("_completed_pks", set())
    # Previously-live games that are neither live nor final on this run.
    _still_pending: set[str] = set()
    # Convert once instead of once per candidate game.
    _pk_series = _scores["game_pk"].astype(str) if _has_game_pk else None

    for _pk_str in (_prev_live - _curr_live - _done):
        if _pk_series is None:
            # No usable scores this run; keep watching the game.
            _still_pending.add(_pk_str)
            continue
        _row = _scores[_pk_series == _pk_str]
        if not _row.empty and "final" in str(_row.iloc[0].get("status", "")).lower():
            _game_date = str(_row.iloc[0].get("game_date", _today))[:10]
            _fire_completion(_pk_str, _game_date, _scores)
            _done = _done | {_pk_str}
            logger.info("[live_fragment] fired completion pipeline for game_pk=%s", _pk_str)
        else:
            _still_pending.add(_pk_str)

    st.session_state["_live_pks"] = _curr_live | _still_pending
    st.session_state["_completed_pks"] = _done

    render_live_games_with_edge_strips(
        live_games=live_games,
        statcast_df=statcast_df,
        pitcher_statcast_df=pitcher_statcast_df,
        odds_df=odds_df,
    )
@st.fragment(run_every=300)
def render_statcast_retry_fragment() -> None:
    """Periodically re-fire the completion pipeline for games still missing pitch data.

    The query selects up to 10 of the most recently graded ``game_outcomes`` rows
    whose status contains "final" and that have no rows in either
    ``statcast_event_core`` or ``live_pitch_mix_2026``. Each such game that is not
    already in the session's ``_completed_pks`` set gets ``_fire_completion``
    with today's date and today's scores. (The original note attributes the need
    for retries to a 30-60 minute Savant publishing lag.)

    Produces no UI output. Every failure is logged as a warning and swallowed.
    """
    try:
        from sqlalchemy import text as _text

        pending = pd.read_sql(
            _text("""
SELECT DISTINCT g.game_pk, g.graded_at
FROM game_outcomes g
WHERE g.status ILIKE '%final%'
AND NOT EXISTS (
SELECT 1 FROM statcast_event_core s
WHERE s.game_pk = g.game_pk::BIGINT
)
AND NOT EXISTS (
SELECT 1 FROM live_pitch_mix_2026 lpm
WHERE lpm.game_pk = g.game_pk::BIGINT
)
ORDER BY g.graded_at DESC
LIMIT 10
"""),
            conn,
        )
        if pending.empty:
            return

        import datetime as _dt

        # These are the same for every row, so compute them once per run.
        _today = _dt.date.today().strftime("%Y-%m-%d")
        _done: set[str] = st.session_state.get("_completed_pks", set())
        _scores = None  # loaded lazily, only if at least one game needs a retry

        for _, row in pending.iterrows():
            _pk_str = str(row.get("game_pk", "")).strip()
            if not _pk_str or _pk_str in _done:
                continue
            if _scores is None:
                _scores = load_scores_for_dashboard_date(_today)
            _fire_completion(_pk_str, _today, _scores)
            logger.info("[retry_fragment] re-firing completion for game_pk=%s", _pk_str)
    except Exception as exc:
        logger.warning("[retry_fragment] error: %s", exc)
@st.cache_data(ttl=3, show_spinner=False)
def build_prepared_live_games_df_cached(live_games_json: str) -> pd.DataFrame:
    """Parse a JSON-serialised live-games frame and run ``build_prepared_live_games_df`` on it.

    The result is cached for 3 seconds, keyed on the JSON text.

    Args:
        live_games_json: JSON text in ``pd.read_json``'s default orientation.

    Returns:
        Whatever ``build_prepared_live_games_df`` returns for the parsed frame.
    """
    # pandas deprecated passing literal JSON text to read_json; wrap it in a
    # buffer, as the other read_json call in this file already does.
    live_games = pd.read_json(StringIO(live_games_json))
    return build_prepared_live_games_df(live_games)
@st.cache_data(ttl=SCHEDULE_TTL_SECONDS)
def load_wbc_schedule() -> pd.DataFrame:
    """Fetch the schedule for the date returned by ``current_wbc_date_str()`` (cached)."""
    schedule_date = current_wbc_date_str()
    return fetch_schedule_for_date(schedule_date)
@st.cache_data(ttl=SCHEDULE_TTL_SECONDS)
def load_wbc_schedule_for_date(date_str: str) -> pd.DataFrame:
    """Fetch the schedule for ``date_str``; cached per date for SCHEDULE_TTL_SECONDS."""
    schedule_df = fetch_schedule_for_date(date_str)
    return schedule_df
@st.cache_data(ttl=LIVE_PROP_ODDS_TTL_SECONDS)
def load_live_prop_odds_for_game(game_context: dict) -> pd.DataFrame:
    """Fetch, normalise and reduce live prop odds for one game.

    The raw odds are requested for DEFAULT_PROP_BOOKS / DEFAULT_PROP_MARKETS,
    passed through ``normalize_prop_odds`` and finally through
    ``best_book_by_player_market``, whose result is returned.

    Args:
        game_context: Game descriptor forwarded unchanged to ``fetch_live_prop_odds``.
    """
    return best_book_by_player_market(
        normalize_prop_odds(
            fetch_live_prop_odds(
                game_context=game_context,
                sportsbooks=DEFAULT_PROP_BOOKS,
                markets=DEFAULT_PROP_MARKETS,
            )
        )
    )
@st.cache_data(ttl=60 * 60 * 6, show_spinner=False)
def load_statcast_current_season_full() -> pd.DataFrame:
    """Load Statcast rows from January 1 of the current UTC year through today (UTC).

    The fetched range is passed through ``normalize_statcast`` and
    ``add_pitch_features``. Cached for six hours.
    """
    # Timestamp.utcnow() is deprecated in pandas; now(tz="UTC") yields the same
    # UTC calendar date and matches the other timestamp code in this file.
    today = pd.Timestamp.now(tz="UTC").date()
    start_date = pd.Timestamp(year=today.year, month=1, day=1).date()
    raw = fetch_statcast_range(start_date.isoformat(), today.isoformat())
    normalized = normalize_statcast(raw)
    return add_pitch_features(normalized)
@st.cache_data(ttl=60 * 60 * 12, show_spinner=False)
def load_statcast_previous_season_full() -> pd.DataFrame:
    """Load Statcast rows for the whole previous calendar year (relative to UTC today).

    The range January 1 - December 31 is fetched with ``fetch_statcast_range``,
    normalised, summarised in a log line, and passed through
    ``add_pitch_features``. Cached for twelve hours.
    """
    # Timestamp.utcnow() is deprecated in pandas; now(tz="UTC") is the equivalent.
    today = pd.Timestamp.now(tz="UTC").date()
    previous_year = today.year - 1
    start_date = pd.Timestamp(year=previous_year, month=1, day=1).date()
    end_date = pd.Timestamp(year=previous_year, month=12, day=31).date()
    raw = fetch_statcast_range(start_date.isoformat(), end_date.isoformat())
    normalized = normalize_statcast(raw)

    # Load diagnostics: row count, distinct player names and a small name sample.
    # Emitted at WARNING level on this module's own stdlib logger, as before.
    import logging as _logging

    _logging.getLogger(__name__).warning(
        "[statcast_load] rows=%d unique_players=%d sample_names=%s",
        len(normalized),
        normalized["player_name"].nunique() if not normalized.empty else 0,
        normalized["player_name"].head(3).tolist() if not normalized.empty else [],
    )
    enriched = add_pitch_features(normalized)
    return enriched
@st.cache_data(ttl=60 * 60 * 12, show_spinner=False)
def load_statcast_previous_season_full_pitcher() -> pd.DataFrame:
    """Load previous-calendar-year Statcast rows via the pitcher-perspective fetcher.

    The year is computed as (current UTC year - 1), not hard-coded. Per the
    original note, ``player_name`` in this frame is the pitcher's name. The data
    is passed through ``normalize_statcast`` and ``add_pitch_features`` and
    cached for twelve hours.
    """
    # Timestamp.utcnow() is deprecated in pandas; now(tz="UTC") is the equivalent.
    today = pd.Timestamp.now(tz="UTC").date()
    previous_year = today.year - 1
    start_date = pd.Timestamp(year=previous_year, month=1, day=1).date()
    end_date = pd.Timestamp(year=previous_year, month=12, day=31).date()
    raw = fetch_statcast_range_pitcher(start_date.isoformat(), end_date.isoformat())
    normalized = normalize_statcast(raw)
    return add_pitch_features(normalized)
@st.cache_data(ttl=60 * 60 * 1, show_spinner=False)
def load_probable_starters() -> dict:
    """Return probable starters, preferring the DB cache over a network fetch.

    (The original note describes the data as probable starting pitchers for the
    next 7 days from the MLB Stats API.)

    Resolution order:
      1. DB cache younger than one hour -> returned as is ("cache_fresh").
      2. DB cache older than that -> returned immediately while a background
         refresh is queued ("stale_cache_served_async_refresh").
      3. No usable cache, or the cache read failed -> fetched synchronously and
         written back to the cache ("fresh_network_load").

    The chosen mode and the cache age in seconds are recorded in
    ``st.session_state`` under "probable_starters_refresh_mode" and
    "probable_starters_cache_age_seconds".

    Returns:
        The cached or freshly fetched probable-starters mapping.
    """
    from data.mlb_starters import fetch_probable_starters_for_props

    try:
        cached_meta = read_cached_probable_starters_meta(conn)
        if not cached_meta.empty:
            fetched_at = cached_meta.iloc[0]["fetched_at"]
            fetched_ts = pd.to_datetime(fetched_at, errors="coerce", utc=True)
            cache_age_seconds = None
            if pd.notna(fetched_ts):
                cache_age_seconds = max(
                    0,
                    int((pd.Timestamp.now(tz="UTC") - fetched_ts).total_seconds()),
                )
            cached = read_cached_probable_starters(conn)
            if cached:
                if _is_fetched_at_fresh(fetched_at, 60 * 60):
                    st.session_state["probable_starters_refresh_mode"] = "cache_fresh"
                    st.session_state["probable_starters_cache_age_seconds"] = cache_age_seconds
                    return cached
                # Stale: serve it now and refresh in the background, using a
                # connection owned by the worker thread.
                _queue_async_refresh(
                    "probable_starters",
                    lambda: _run_with_fresh_conn(
                        lambda fresh_conn: replace_cached_probable_starters(
                            fresh_conn,
                            fetch_probable_starters_for_props(),
                        )
                    ),
                )
                st.session_state["probable_starters_refresh_mode"] = "stale_cache_served_async_refresh"
                st.session_state["probable_starters_cache_age_seconds"] = cache_age_seconds
                return cached
    except Exception as exc:
        # Still fall through to the network fetch, but leave a trace instead of
        # hiding the cache problem entirely.
        logger.warning("[load_probable_starters] cache read failure: %s", exc)

    fresh = fetch_probable_starters_for_props()
    try:
        replace_cached_probable_starters(conn, fresh)
    except Exception as exc:
        logger.warning("[load_probable_starters] cache persist failure: %s", exc)
    st.session_state["probable_starters_refresh_mode"] = "fresh_network_load"
    st.session_state["probable_starters_cache_age_seconds"] = 0
    return fresh
@st.cache_data(ttl=STATCAST_TTL_SECONDS)
def load_statcast_recent() -> pd.DataFrame:
    """Load the 14 days of Statcast rows ending on the current dashboard date.

    The fetched range is passed through ``normalize_statcast`` and
    ``add_pitch_features`` before being returned.
    """
    window_end = pd.to_datetime(current_dashboard_date_str()).date()
    window_start = window_end - timedelta(days=14)
    return add_pitch_features(
        normalize_statcast(
            fetch_statcast_range(window_start.isoformat(), window_end.isoformat())
        )
    )
def _coerce_name_tuple(values: list[str] | tuple[str, ...] | set[str] | None) -> tuple[str, ...]:
    """Return the distinct, stripped, non-blank names from ``values`` as a sorted tuple.

    Every item is converted with ``str()`` first; ``None`` or an empty
    collection yields an empty tuple.
    """
    if not values:
        return ()
    distinct: set[str] = set()
    for raw in values:
        cleaned = str(raw).strip()
        if cleaned:
            distinct.add(cleaned)
    return tuple(sorted(distinct))
def _extract_prop_player_names(raw_props: pd.DataFrame | None) -> tuple[str, ...]:
    """Return the sorted distinct ``player_name`` values found in ``raw_props``.

    An absent or empty frame, or one without a ``player_name`` column, yields
    an empty tuple. Null names are dropped before normalisation.
    """
    has_names = (
        raw_props is not None
        and not raw_props.empty
        and "player_name" in raw_props.columns
    )
    if not has_names:
        return ()
    names = raw_props["player_name"].dropna().astype(str).tolist()
    return _coerce_name_tuple(names)
def _extract_prop_pitcher_names(raw_props: pd.DataFrame | None) -> tuple[str, ...]:
    """Return the sorted distinct player names on rows whose market is "k".

    The market comparison is case-insensitive. An absent or empty frame, a frame
    lacking ``player_name`` or ``market``, or one with no matching rows yields
    an empty tuple.
    """
    if raw_props is None or raw_props.empty:
        return ()
    if not {"player_name", "market"}.issubset(raw_props.columns):
        return ()
    is_k_market = raw_props["market"].astype(str).str.lower() == "k"
    k_rows = raw_props[is_k_market]
    if k_rows.empty:
        return ()
    names = k_rows["player_name"].dropna().astype(str).tolist()
    return _coerce_name_tuple(names)
def _extract_probable_starter_names(probable_starters: dict | None) -> tuple[str, ...]:
    """Collect pitcher names from a probable-starters mapping.

    For each dict-valued entry, the "away_pitcher", "home_pitcher" and
    "pitcher_name" fields are read; blank or missing values are skipped and
    non-dict entries are ignored. The result is a sorted tuple of distinct names.
    """
    if not probable_starters:
        return ()
    pitcher_keys = ("away_pitcher", "home_pitcher", "pitcher_name")
    collected = [
        cleaned
        for payload in probable_starters.values()
        if isinstance(payload, dict)
        for cleaned in (str(payload.get(field) or "").strip() for field in pitcher_keys)
        if cleaned
    ]
    return _coerce_name_tuple(collected)
def _extract_live_dashboard_participants(live_games: pd.DataFrame) -> tuple[tuple[str, ...], tuple[str, ...]]:
    """Return ``(batter_names, pitcher_names)`` referenced by the live games frame.

    Batter names come from the "on_deck_name", "in_hole_name", "three_away_name"
    and "batter_name" columns; pitcher names from "pitcher_name". Columns that
    are absent are skipped, null and blank values are dropped, and each result
    is a sorted tuple of distinct names. ``None`` or an empty frame yields two
    empty tuples.
    """
    if live_games is None or live_games.empty:
        return (), ()

    def _names_from(columns: tuple[str, ...]) -> list[str]:
        found: list[str] = []
        for column in columns:
            if column not in live_games.columns:
                continue
            for value in live_games[column].dropna().astype(str).tolist():
                text = str(value).strip()
                if text:
                    found.append(text)
        return found

    batters = _names_from(("on_deck_name", "in_hole_name", "three_away_name", "batter_name"))
    pitchers = _names_from(("pitcher_name",))
    return _coerce_name_tuple(batters), _coerce_name_tuple(pitchers)
def _is_fetched_at_fresh(value: object, max_age_seconds: int) -> bool:
    """Tell whether the timestamp ``value`` is at most ``max_age_seconds`` old.

    ``value`` is parsed as a UTC timestamp. Unparseable or missing values, and
    any error during the check, count as not fresh. Timestamps in the future
    are treated as age zero.
    """
    try:
        fetched_ts = pd.to_datetime(value, errors="coerce", utc=True)
        if pd.isna(fetched_ts):
            return False
        elapsed = pd.Timestamp.now(tz="UTC") - fetched_ts
        # Clamp negative ages (clock skew / future timestamps) to zero.
        age_seconds = max(0.0, float(elapsed.total_seconds()))
        return age_seconds <= float(max_age_seconds)
    except Exception:
        return False
def _latest_fetched_at_from_df(df: pd.DataFrame) -> object:
    """Return the newest ``fetched_at`` value in ``df`` as a UTC timestamp.

    Returns ``None`` when the frame is absent, empty, lacks the column, or when
    parsing raises. Unparseable cells are coerced to NaT before taking the max.
    """
    usable = df is not None and not df.empty and "fetched_at" in df.columns
    if not usable:
        return None
    try:
        parsed = pd.to_datetime(df["fetched_at"], errors="coerce", utc=True)
        return parsed.max()
    except Exception:
        return None
def _hr_bundle_is_complete(bundle: dict | None) -> bool:
    """Report whether the bundle's HR snapshot is flagged complete.

    A missing bundle, or a missing/empty "hr_snapshot_completeness" entry, is
    treated as complete; otherwise the entry's "is_complete" flag decides
    (defaulting to True when the flag is absent).
    """
    snapshot_info = (bundle or {}).get("hr_snapshot_completeness") or {}
    completeness = dict(snapshot_info)
    if completeness:
        return bool(completeness.get("is_complete", True))
    return True
def _hr_bundle_is_usable(bundle: dict | None) -> bool:
    """Report whether the bundle carries any usable HR snapshot data.

    The bundle is usable when any of these holds, checked in order:
      * "hr_snapshot_state" (trimmed, case-insensitive) is "usable_complete",
        "usable_partial" or "stale_degraded";
      * "current_hr_row_count" is positive;
      * "hr_snapshot_completeness"["row_count"] is positive.
    """
    payload = bundle or {}
    state = str(payload.get("hr_snapshot_state") or "").strip().lower()
    if state in ("usable_complete", "usable_partial", "stale_degraded"):
        return True
    if int(payload.get("current_hr_row_count") or 0) > 0:
        return True
    completeness = dict(payload.get("hr_snapshot_completeness") or {})
    snapshot_rows = int(completeness.get("row_count") or 0)
    return snapshot_rows > 0
def _parse_iso_utc(value: object) -> pd.Timestamp | None:
    """Parse ``value`` into a UTC timestamp, or return ``None`` if that is not possible."""
    try:
        parsed = pd.to_datetime(value, errors="coerce", utc=True)
        if pd.isna(parsed):
            return None
        return parsed
    except Exception:
        return None
def _hr_bundle_needs_draftkings_refresh(bundle: dict | None) -> bool:
    """Decide whether a DraftKings refresh should be attempted for this bundle.

    Returns True only when "draftkings" is listed (case-insensitively) in
    "hr_snapshot_completeness"["missing_books"] and there is no retry-after
    timestamp for "draftkings" in "adapter_retry_after_by_book" that is still
    in the future.
    """
    payload = bundle or {}
    completeness = dict(payload.get("hr_snapshot_completeness") or {})

    missing_books: set[str] = set()
    for book in completeness.get("missing_books", []) or []:
        label = str(book).strip()
        if label:
            missing_books.add(label.lower())
    if "draftkings" not in missing_books:
        return False

    retry_after_map = dict(payload.get("adapter_retry_after_by_book") or {})
    retry_after_ts = _parse_iso_utc(retry_after_map.get("draftkings"))
    # Respect the retry-after window while it has not elapsed yet.
    still_backing_off = retry_after_ts is not None and retry_after_ts > pd.Timestamp.now(tz="UTC")
    return not still_backing_off
@st.cache_data(ttl=STATCAST_TTL_SECONDS, show_spinner=False)
def load_shared_baseline_bundle_cached(
    batter_names: tuple[str, ...] = (),
    pitcher_names: tuple[str, ...] = (),
) -> dict:
    """Cached wrapper around ``load_or_build_shared_baseline_bundle``.

    Args:
        batter_names: Batter names forwarded to the loader (part of the cache key).
        pitcher_names: Pitcher names forwarded to the loader (part of the cache key).

    The loader is called with ``persist_runtime_refresh=True`` and a maximum
    age of at least one hour (``STATCAST_TTL_SECONDS`` when that is larger).
    """
    # Never ask for a bundle fresher than one hour, even with a short TTL setting.
    max_age = max(STATCAST_TTL_SECONDS, 60 * 60)
    return load_or_build_shared_baseline_bundle(
        batter_names=batter_names,
        pitcher_names=pitcher_names,
        max_age_seconds=max_age,
        persist_runtime_refresh=True,
    )
@st.cache_data(ttl=STATCAST_TTL_SECONDS, show_spinner=False)
def load_shared_baseline_page_slice_cached(
    batter_names: tuple[str, ...] = (),
    pitcher_names: tuple[str, ...] = (),
) -> dict:
    """Return the subset of the shared baseline bundle that the pages consume.

    The bundle comes from ``load_shared_baseline_bundle_cached``. Frame-valued
    keys default to an empty DataFrame, the two ``missing_*_names`` keys default
    to an empty list, and every other key defaults to ``None``.
    """
    bundle = load_shared_baseline_bundle_cached(
        batter_names=batter_names,
        pitcher_names=pitcher_names,
    )

    frame_keys = (
        "blended_batter_df",
        "blended_pitcher_df",
        "batter_baseline_meta",
        "pitcher_baseline_meta",
        "hitter_rolling_snapshot",
        "pitcher_rolling_snapshot",
        "snapshot_status",
    )
    status_keys = (
        "snapshot_source_status",
        "runtime_fallback_used",
        "requested_hitter_count",
        "requested_pitcher_count",
        "resolved_hitter_count",
        "resolved_pitcher_count",
    )

    page_slice: dict = {key: bundle.get(key, pd.DataFrame()) for key in frame_keys}
    for key in status_keys:
        page_slice[key] = bundle.get(key)
    page_slice["missing_hitter_names"] = bundle.get("missing_hitter_names", [])
    page_slice["missing_pitcher_names"] = bundle.get("missing_pitcher_names", [])
    page_slice["snapshot_coverage_mode"] = bundle.get("snapshot_coverage_mode")
    page_slice["background_refresh_queued"] = bundle.get("background_refresh_queued")
    return page_slice
@st.cache_data(ttl=SCORES_TTL_SECONDS, show_spinner=False)
def _build_dashboard_ready_payload(
    dashboard_date_str: str,
    schedule_date_str: str,
    scores_json: str,
) -> dict[str, Any]:
    """Assemble everything the dashboard page needs for one date (cached).

    Steps:
      1. Parse ``scores_json`` (orient="split"); a parse failure yields an empty frame.
      2. Load the schedule for ``schedule_date_str`` and split games into
         live / final / scheduled.
      3. If nothing is live or final but a schedule exists, rebuild scores via
         ``build_scores_from_schedule_via_live_feeds`` and split again.
      4. If still nothing is live or final but the parsed scores have a status
         column, classify rows directly from the status text.
      5. For live games, load the shared baseline slice for the participating
         batters and pitchers, falling back to ``load_statcast_recent`` for the
         batter frame and to the batter frame for the pitcher frame.
      6. Normalise and sort the three game groups.

    Args:
        dashboard_date_str: Echoed back in the payload (and part of the cache key).
        schedule_date_str: Date used to load the schedule.
        scores_json: Scores frame serialised with ``orient="split"``.

    Returns:
        Dict with keys dashboard_date_str, schedule_date_str, scores_df,
        schedule_df, live_games, final_games, scheduled_games, baseline_slice,
        statcast_df, pitcher_statcast_df and odds_df.
    """
    try:
        scores_df = pd.read_json(StringIO(scores_json), orient="split")
    except Exception:
        scores_df = pd.DataFrame()

    schedule_df = load_dashboard_schedule_for_date(schedule_date_str)
    live_games, final_games, scheduled_games = split_games_for_scoreboard(
        schedule_df=schedule_df,
        scores_df=scores_df,
    )

    # Fallback 1: derive scores from per-game live feeds when the scores frame
    # produced no live or final games.
    if live_games.empty and final_games.empty and not schedule_df.empty:
        try:
            fallback_scores_df = build_scores_from_schedule_via_live_feeds(schedule_df)
            if fallback_scores_df is not None and not fallback_scores_df.empty:
                live_games, final_games, scheduled_games = split_games_for_scoreboard(
                    schedule_df=schedule_df,
                    scores_df=fallback_scores_df,
                )
        except Exception as exc:
            # Keep the earlier split; the fallback is best-effort.
            logger.warning(
                "[_build_dashboard_ready_payload] live-feed score fallback failed: %s",
                exc,
            )

    # Fallback 2: classify straight from the status text of the parsed scores.
    if live_games.empty and final_games.empty and not scores_df.empty and "status" in scores_df.columns:
        recovery_scores = normalize_game_cards_df(scores_df.copy())
        recovery_scores = attach_game_pk_from_schedule(recovery_scores, schedule_df)
        recovery_status = recovery_scores["status"].fillna("").astype(str).str.strip().str.lower()
        recovery_live_mask = recovery_status.str.contains(
            r"live|top|bot|bottom|mid|middle|inning|in progress|delayed|suspended",
            regex=True,
            na=False,
        )
        recovery_final_mask = recovery_status.str.contains(
            r"final|game over|completed|ended",
            regex=True,
            na=False,
        )
        recovered_live_games = recovery_scores[recovery_live_mask].copy()
        recovered_final_games = recovery_scores[recovery_final_mask].copy()
        if not recovered_live_games.empty or not recovered_final_games.empty:
            live_games = recovered_live_games
            final_games = recovered_final_games

    dashboard_batter_names, dashboard_pitcher_names = _extract_live_dashboard_participants(live_games)
    # The baseline slice is only needed when there are live games to render.
    baseline_slice = (
        load_shared_baseline_page_slice_cached(
            batter_names=dashboard_batter_names,
            pitcher_names=dashboard_pitcher_names,
        )
        if not live_games.empty
        else {}
    )
    statcast_df = baseline_slice.get("blended_batter_df", pd.DataFrame()) if isinstance(baseline_slice, dict) else pd.DataFrame()
    pitcher_statcast_df = baseline_slice.get("blended_pitcher_df", pd.DataFrame()) if isinstance(baseline_slice, dict) else pd.DataFrame()
    if statcast_df.empty and not live_games.empty:
        statcast_df = load_statcast_recent()
    if pitcher_statcast_df.empty and not live_games.empty:
        pitcher_statcast_df = statcast_df

    live_games = sort_scoreboard_games(normalize_game_cards_df(live_games))
    final_games = sort_scoreboard_games(normalize_game_cards_df(final_games))
    scheduled_games = sort_scoreboard_games(normalize_game_cards_df(scheduled_games))

    return {
        "dashboard_date_str": dashboard_date_str,
        "schedule_date_str": schedule_date_str,
        "scores_df": scores_df,
        "schedule_df": schedule_df,
        "live_games": live_games,
        "final_games": final_games,
        "scheduled_games": scheduled_games,
        "baseline_slice": baseline_slice,
        "statcast_df": statcast_df,
        "pitcher_statcast_df": pitcher_statcast_df,
        "odds_df": load_odds(),
    }
@st.cache_data(ttl=max(REFRESH_TTL_SECONDS, STATCAST_TTL_SECONDS), show_spinner=False)
def _build_betting_ready_payload() -> dict[str, Any]:
    """Assemble the data used by the betting page (cached).

    Returns:
        Dict with keys schedule_df, baseline_slice, statcast_df, odds_df,
        edges_df and top_edges. ``top_edges`` holds the 30 rows of ``edges_df``
        with the highest "no_vig_prob", or an empty frame when there are no edges.
    """
    baseline_slice = load_shared_baseline_page_slice_cached()

    if isinstance(baseline_slice, dict):
        statcast_df = baseline_slice.get("blended_batter_df", pd.DataFrame())
    else:
        statcast_df = pd.DataFrame()
    if statcast_df.empty:
        # No blended baseline available: fall back to the recent Statcast window.
        statcast_df = load_statcast_recent()

    odds_df = load_odds()
    edges_df = compute_market_edges(odds_df)
    if edges_df.empty:
        top_edges = pd.DataFrame()
    else:
        top_edges = edges_df.sort_values("no_vig_prob", ascending=False).head(30)

    schedule_df = load_wbc_schedule()
    return {
        "schedule_df": schedule_df,
        "baseline_slice": baseline_slice,
        "statcast_df": statcast_df,
        "odds_df": odds_df,
        "edges_df": edges_df,
        "top_edges": top_edges,
    }
@st.cache_data(ttl=60, show_spinner=False)
def load_hr_prop_odds_for_game(away_team: str, home_team: str) -> pd.DataFrame:
    """Fetch live "batter_home_runs" prop odds for one matchup.

    Args:
        away_team: Away team identifier placed in the game context.
        home_team: Home team identifier placed in the game context.

    Returns:
        The frame from ``fetch_live_prop_odds``; an empty frame when that
        returns ``None`` or when anything raises (the error is logged).
    """
    try:
        from data.live_prop_odds import fetch_live_prop_odds

        props = fetch_live_prop_odds(
            game_context={"away_team": away_team, "home_team": home_team},
            markets=["batter_home_runs"],
        )
        if props is None:
            return pd.DataFrame()
        return props
    except Exception as exc:
        logger.warning("[load_hr_prop_odds_for_game] failure: %s", exc)
        return pd.DataFrame()
@st.cache_data(ttl=300, show_spinner=False)
def load_upcoming_hr_props() -> pd.DataFrame:
    """Return the "merged_props_feed" frame of the upcoming HR props bundle.

    Cached for five minutes (the original note gives limiting API credit use as
    the reason). Any failure is logged and produces an empty frame.
    """
    try:
        bundle = load_upcoming_hr_props_bundle()
        return bundle["merged_props_feed"]
    except Exception as exc:
        logger.warning("[load_upcoming_hr_props] failure: %s", exc)
        return pd.DataFrame()
def _empty_upcoming_props_bundle() -> dict:
    """Return the placeholder bundle served when no snapshot or live fetch is available."""
    return {
        "odds_api_raw": pd.DataFrame(),
        "scraper_raw": pd.DataFrame(),
        "merged_props_feed": pd.DataFrame(),
        "coverage_summary": pd.DataFrame(),
        "coverage_summary_api": pd.DataFrame(),
        "coverage_summary_scraper_added": pd.DataFrame(),
        "coverage_summary_final": pd.DataFrame(),
        "coverage_summary_hr_api": pd.DataFrame(),
        "coverage_summary_hr_supplemental": pd.DataFrame(),
        "coverage_summary_hr_final": pd.DataFrame(),
        "missing_books_by_market": pd.DataFrame(),
        "missing_event_books_by_market": pd.DataFrame(),
        "missing_hr_books_global": pd.DataFrame(),
        "missing_hr_books_by_event": pd.DataFrame(),
        "hr_snapshot_completeness": {},
        "hr_snapshot_state": "empty",
        "current_hr_row_count": 0,
        "current_hr_event_count": 0,
        "last_known_good_hr_row_count": 0,
        "last_known_good_hr_built_at": "",
        "hr_refresh_overwrite_prevented": False,
        "adapter_status_by_book": {},
        "adapter_error_by_book": {},
        "adapter_rows_by_book": {},
        "adapter_last_attempted_at_by_book": {},
        "adapter_retry_after_by_book": {},
        "scraper_candidate_count": 0,
        "scraper_added_count": 0,
        "scraper_duplicate_reject_count": 0,
        "cache_meta": pd.DataFrame(),
        "cache_source": "unavailable",
    }


def _queue_upcoming_props_bundle_refresh() -> None:
    """Queue a background rebuild of the ``"default"`` props snapshot on a fresh connection."""
    _queue_async_refresh(
        "upcoming_props_bundle",
        lambda: _run_with_fresh_conn(
            lambda fresh_conn: replace_cached_upcoming_props_bundle(
                fresh_conn,
                fetch_all_upcoming_hr_props_bundle(sportsbooks=DEFAULT_PROP_BOOKS),
                cache_key="default",
            )
        ),
    )


def _snapshot_payload_from_cached_bundle(cached_bundle: dict) -> dict | None:
    """Normalise a cached snapshot into the bundle shape, or ``None`` when it is unusable.

    A snapshot is usable only when it carries non-empty ``cache_meta`` and a
    non-empty ``merged_props_feed`` frame. Frame-valued entries that are not
    DataFrames are replaced by empty frames; mapping, count and text entries
    are coerced with ``dict`` / ``int`` / ``str`` (missing values become
    ``{}`` / ``0`` / ``""``).
    """
    cache_meta = cached_bundle.get("cache_meta", pd.DataFrame())
    merged = cached_bundle.get("merged_props_feed", pd.DataFrame())
    if cache_meta.empty or not isinstance(merged, pd.DataFrame) or merged.empty:
        return None

    def frame(key: str) -> pd.DataFrame:
        value = cached_bundle.get(key)
        return value if isinstance(value, pd.DataFrame) else pd.DataFrame()

    def mapping(key: str) -> dict:
        return dict(cached_bundle.get(key) or {})

    def count(key: str) -> int:
        return int(cached_bundle.get(key) or 0)

    def text(key: str) -> str:
        return str(cached_bundle.get(key) or "")

    return {
        # Raw inputs are not part of the snapshot, so they are always empty here.
        "odds_api_raw": pd.DataFrame(),
        "scraper_raw": pd.DataFrame(),
        "merged_props_feed": merged,
        "coverage_summary": frame("coverage_summary"),
        "coverage_summary_api": frame("coverage_summary_api"),
        "coverage_summary_scraper_added": frame("coverage_summary_scraper_added"),
        "coverage_summary_final": frame("coverage_summary_final"),
        "coverage_summary_hr_api": frame("coverage_summary_hr_api"),
        "coverage_summary_hr_supplemental": frame("coverage_summary_hr_supplemental"),
        "coverage_summary_hr_final": frame("coverage_summary_hr_final"),
        "missing_books_by_market": frame("missing_books_by_market"),
        "missing_event_books_by_market": frame("missing_event_books_by_market"),
        "missing_hr_books_global": frame("missing_hr_books_global"),
        "missing_hr_books_by_event": frame("missing_hr_books_by_event"),
        "hr_snapshot_completeness": mapping("hr_snapshot_completeness"),
        "hr_snapshot_state": text("hr_snapshot_state"),
        "current_hr_row_count": count("current_hr_row_count"),
        "current_hr_event_count": count("current_hr_event_count"),
        "last_known_good_hr_row_count": count("last_known_good_hr_row_count"),
        "last_known_good_hr_built_at": text("last_known_good_hr_built_at"),
        "hr_refresh_overwrite_prevented": bool(cached_bundle.get("hr_refresh_overwrite_prevented")),
        "adapter_status_by_book": mapping("adapter_status_by_book"),
        "adapter_error_by_book": mapping("adapter_error_by_book"),
        "adapter_rows_by_book": mapping("adapter_rows_by_book"),
        "adapter_last_attempted_at_by_book": mapping("adapter_last_attempted_at_by_book"),
        "adapter_retry_after_by_book": mapping("adapter_retry_after_by_book"),
        "scraper_candidate_count": count("scraper_candidate_count"),
        "scraper_added_count": count("scraper_added_count"),
        "scraper_duplicate_reject_count": count("scraper_duplicate_reject_count"),
        "cache_meta": cache_meta,
        "cache_source": "db_snapshot",
    }


@st.cache_data(ttl=300, show_spinner=False)
def load_upcoming_hr_props_bundle() -> dict:
    """Load the upcoming HR props bundle: DB snapshot first, live fetch second.

    Resolution order:
      1. Read the ``"default"`` snapshot from the DB on a helper thread,
         waiting at most 10 seconds. A usable snapshot is always returned;
         its ``cache_source`` is ``"db_snapshot"`` when served as-is,
         ``"stale_snapshot"`` when older than 300 seconds, or
         ``"incomplete_snapshot"`` when fresh but flagged as needing a
         DraftKings refresh while not usable. A background refresh is queued
         whenever the snapshot is stale or needs that refresh.
      2. Otherwise fetch the bundle live, try to persist it (15 second
         budget, failures only logged) and return it with ``cache_source``
         set to ``"live_fetch"``.
      3. If the live fetch fails too, return an empty placeholder bundle with
         ``cache_source`` set to ``"unavailable"``.

    Returns:
        The bundle dict; this function never raises for data-source failures.
    """
    try:
        snapshot_box: list[dict | None] = [None]

        def _read_db_cache() -> None:
            try:
                snapshot_box[0] = read_cached_upcoming_props_bundle(conn, cache_key="default")
            except Exception as read_exc:
                logger.warning("[load_upcoming_hr_props_bundle] cache read failure: %s", read_exc)

        # The read runs on a daemon thread so a hung DB call cannot block the
        # page for longer than the join timeout.
        reader = threading.Thread(target=_read_db_cache, daemon=True)
        reader.start()
        reader.join(timeout=10)

        cached_bundle = snapshot_box[0]
        if cached_bundle is None:
            if reader.is_alive():
                logger.warning(
                    "[load_upcoming_hr_props_bundle] cache read timed out after 10s — falling through to live fetch"
                )
        else:
            payload = _snapshot_payload_from_cached_bundle(cached_bundle)
            if payload is not None:
                cache_is_fresh = _is_fetched_at_fresh(payload["cache_meta"].iloc[0]["fetched_at"], 300)
                # NOTE(review): the completeness result was never used by the
                # original code; the call is kept because the helper lives
                # outside this block and a failure in it used to force the
                # live-fetch path. Confirm whether it can be dropped.
                _hr_bundle_is_complete(payload)
                hr_usable = _hr_bundle_is_usable(payload)
                dk_refresh_needed = _hr_bundle_needs_draftkings_refresh(payload)

                serve_as_is = cache_is_fresh and (hr_usable or not dk_refresh_needed)
                if dk_refresh_needed or not serve_as_is:
                    _queue_upcoming_props_bundle_refresh()
                if not serve_as_is:
                    payload["cache_source"] = (
                        "stale_snapshot" if not cache_is_fresh else "incomplete_snapshot"
                    )
                return payload
    except Exception as exc:
        # Any problem with the snapshot path degrades to a live fetch.
        logger.warning(
            "[load_upcoming_hr_props_bundle] snapshot unusable, falling through to live fetch: %s",
            exc,
        )

    try:
        bundle = fetch_all_upcoming_hr_props_bundle(sportsbooks=DEFAULT_PROP_BOOKS)
        try:
            persist_error: list[Exception | None] = [None]

            def _do_persist() -> None:
                try:
                    replace_cached_upcoming_props_bundle(conn, bundle, cache_key="default")
                except Exception as _exc:
                    persist_error[0] = _exc

            writer = threading.Thread(target=_do_persist, daemon=True)
            writer.start()
            writer.join(timeout=15)
            if writer.is_alive():
                logger.warning("[load_upcoming_hr_props_bundle] cache persist timed out after 15s — skipping")
            elif persist_error[0] is not None:
                logger.warning("[load_upcoming_hr_props_bundle] cache persist failure: %s", persist_error[0])
        except Exception as exc:
            logger.warning("[load_upcoming_hr_props_bundle] cache persist failure: %s", exc)
        bundle["cache_source"] = "live_fetch"
        return bundle
    except Exception as exc:
        logger.warning("[load_upcoming_hr_props_bundle] failure: %s", exc)
        return _empty_upcoming_props_bundle()
@st.cache_data(ttl=REFRESH_TTL_SECONDS)
def load_odds() -> pd.DataFrame:
    """Return featured odds, preferring the DB cache over a live fetch.

    A non-empty cached frame is always returned immediately; when its first
    row's ``fetched_at`` is older than ``REFRESH_TTL_SECONDS`` a background
    refresh is queued first. With no usable cache the odds are fetched live
    and persisted on a best-effort basis.

    Returns:
        The cached or freshly fetched odds frame.
    """
    try:
        cached = read_cached_odds(conn)
        if not cached.empty:
            if not _is_fetched_at_fresh(cached.iloc[0]["fetched_at"], REFRESH_TTL_SECONDS):
                _queue_async_refresh(
                    "featured_odds",
                    lambda: _run_with_fresh_conn(
                        lambda fresh_conn: replace_cached_odds(fresh_conn, fetch_featured_odds())
                    ),
                )
            return cached
    except Exception as exc:
        # Cache problems must not block the page; log and fall back to a live fetch.
        logger.warning("[load_odds] cache read failure: %s", exc)

    fresh = fetch_featured_odds()
    try:
        replace_cached_odds(conn, fresh)
    except Exception as exc:
        logger.warning("[load_odds] cache persist failure: %s", exc)
    return fresh
@st.cache_data(ttl=SCHEDULE_TTL_SECONDS)
def load_dashboard_schedule_for_date(date_str: str) -> pd.DataFrame:
    """Return the schedule for ``date_str``, preferring the DB cache.

    A non-empty cached schedule is returned immediately; when its latest
    ``fetched_at`` is older than ``SCHEDULE_TTL_SECONDS`` a background refresh
    is queued first. With no usable cache the schedule is fetched live and
    persisted on a best-effort basis.

    Args:
        date_str: Date key passed to the cache reader and the schedule fetcher.

    Returns:
        The cached or freshly fetched schedule frame.
    """
    try:
        cached = read_cached_schedule_for_date(conn, date_str)
        latest_cached_at = _latest_fetched_at_from_df(cached)
        if not cached.empty:
            if not _is_fetched_at_fresh(latest_cached_at, SCHEDULE_TTL_SECONDS):
                _queue_async_refresh(
                    f"schedule:{date_str}",
                    lambda: _run_with_fresh_conn(
                        lambda fresh_conn: replace_cached_schedule(
                            fresh_conn,
                            fetch_schedule_for_date(date_str),
                        )
                    ),
                )
            return cached
    except Exception as exc:
        # Cache problems must not block the page; log and fall back to a live fetch.
        logger.warning("[load_dashboard_schedule_for_date] cache read failure: %s", exc)

    fresh = fetch_schedule_for_date(date_str)
    try:
        replace_cached_schedule(conn, fresh)
    except Exception as exc:
        logger.warning("[load_dashboard_schedule_for_date] cache persist failure: %s", exc)
    return fresh
@st.cache_data(ttl=SCORES_TTL_SECONDS)
def load_scores_for_dashboard_date(date_str: str) -> pd.DataFrame:
    """Fetch scores for ``date_str`` and tag them with their source date.

    Returns:
        A copy of the fetched frame with a ``scores_source_date`` column set
        to ``date_str``, or an empty frame when the fetch returns nothing or
        raises.
    """
    try:
        fetched = fetch_scores_for_date(date_str)
        if fetched is not None and not fetched.empty:
            tagged = fetched.copy()
            tagged["scores_source_date"] = date_str
            return tagged
    except Exception:
        pass
    return pd.DataFrame()
def get_stable_scores_for_dashboard_date(date_str: str) -> pd.DataFrame:
    """Return scores for ``date_str``, falling back to the last good frame for that date.

    Fresh scores with live/final content are remembered in
    ``st.session_state["last_good_scores_df"]`` and returned. Otherwise the
    remembered frame is served only when its ``scores_source_date`` column
    shows it was built for the same ``date_str``; the session key is shared
    and not date-specific, so without this check another date's scores could
    be shown for the requested date.

    Args:
        date_str: Dashboard date to load scores for.

    Returns:
        The fresh scores, a copy of the matching last-good frame, or the
        (possibly empty) fresh frame when no matching fallback exists.
    """
    fresh_scores = load_scores_for_dashboard_date(date_str)
    if _scores_df_has_live_or_final_content(fresh_scores):
        st.session_state["last_good_scores_df"] = fresh_scores.copy()
        return fresh_scores

    if "last_good_scores_df" in st.session_state:
        last_good = st.session_state["last_good_scores_df"]
        if (
            isinstance(last_good, pd.DataFrame)
            and not last_good.empty
            and "scores_source_date" in last_good.columns
            and last_good["scores_source_date"].astype(str).eq(str(date_str)).all()
        ):
            return last_good.copy()
    return fresh_scores
def load_weather(venue_name: str) -> pd.DataFrame:
    """Return weather for ``venue_name``, preferring the DB cache.

    A non-empty cached frame yields its first row immediately; when its
    latest ``fetched_at`` is older than ``REFRESH_TTL_SECONDS`` a background
    refresh is queued first. With no usable cache the weather is fetched live
    and, when non-empty, persisted on a best-effort basis.

    Args:
        venue_name: Venue key passed to the cache reader and weather fetcher.

    Returns:
        A one-row frame from the cache, or whatever the live fetch returned.
        NOTE(review): the live fetch result is returned unchanged, so it may
        be ``None`` if the fetcher can return that (the persist guard below
        suggests it can) — confirm callers handle it.
    """
    try:
        cached = read_cached_weather_for_venue(conn, venue_name)
        latest_cached_at = _latest_fetched_at_from_df(cached)
        if not cached.empty:
            if not _is_fetched_at_fresh(latest_cached_at, REFRESH_TTL_SECONDS):
                _queue_async_refresh(
                    f"weather:{venue_name}",
                    lambda: _run_with_fresh_conn(
                        lambda fresh_conn: replace_cached_weather(
                            fresh_conn,
                            fetch_weather_for_venue(venue_name),
                        )
                    ),
                )
            return cached.head(1).reset_index(drop=True)
    except Exception as exc:
        # Cache problems must not block the page; log and fall back to a live fetch.
        logger.warning("[load_weather] cache read failure: %s", exc)

    fresh = fetch_weather_for_venue(venue_name)
    try:
        if fresh is not None and not fresh.empty:
            replace_cached_weather(conn, fresh)
    except Exception as exc:
        logger.warning("[load_weather] cache persist failure: %s", exc)
    return fresh
def render_header() -> None:
    """Render the app title, tagline and the odds API key status line."""
    st.title("\n\n\n⚾ Kasper")

    tagline = (
        "All-in-One Baseball Assistant. Excellent for finding Home Run True +EV. "
        "Full pitch telemetry with XGBoost Machine Learning model trained on a 3.8M pitch-event "
        "data set + live data with custom anchors."
    )
    st.caption(tagline)

    # One entry per configured secret; entries are joined with the emoji separator.
    secret_status = ["ODDS_API_KEY ✓" if ODDS_API_KEY else "ODDS_API_KEY missing"]
    st.caption("💰".join(secret_status))
def render_source_diagnostics(
    schedule_df: pd.DataFrame,
    statcast_df: pd.DataFrame,
    odds_df: pd.DataFrame | None = None,
    scores_df: pd.DataFrame | None = None,
) -> None:
    """Show row counts for each data source as four side-by-side metrics.

    Args:
        schedule_df: Schedule frame (required).
        statcast_df: Statcast frame (required).
        odds_df: Optional odds frame; counted as 0 rows when ``None``.
        scores_df: Optional scores frame; counted as 0 rows when ``None``.
    """
    schedule_col, scores_col, statcast_col, odds_col = st.columns(4)

    def row_count(frame: pd.DataFrame | None) -> int:
        return 0 if frame is None else int(len(frame))

    schedule_col.metric("Schedule rows", int(len(schedule_df)))
    scores_col.metric("Scores rows", row_count(scores_df))
    statcast_col.metric("Statcast rows", int(len(statcast_df)))
    odds_col.metric("Odds rows", row_count(odds_df))
def _scores_df_has_live_or_final_content(df: pd.DataFrame) -> bool:
    """Report whether any row carries a live or final status string.

    Only the ``status`` column is inspected. Raw score values are deliberately
    ignored: partially parsed scheduled-style rows can carry score numbers and
    would otherwise block the schedule -> live-feed fallback.

    Returns:
        ``True`` when at least one normalised status matches a live/final
        keyword; ``False`` for ``None``, an empty frame, or a frame without a
        ``status`` column.
    """
    if df is None or df.empty or "status" not in df.columns:
        return False

    live_or_final_pattern = (
        r"live|top|bot|bottom|mid|middle|inning|in progress|delayed|suspended|final|game over|completed|ended"
    )
    normalised = df["status"].fillna("").astype(str).str.strip().str.lower()
    matches = normalised.str.contains(live_or_final_pattern, regex=True, na=False)
    return bool(matches.any())
@st.cache_data(ttl=SCORES_TTL_SECONDS)
def load_scores_for_today() -> pd.DataFrame:
    """Load scores for the current WBC date, looking back up to three days.

    Each of the four candidate dates (today and the three days before it) is
    fetched; non-empty results are tagged with a ``scores_source_date``
    column. Fetch failures are logged and skipped.

    Returns:
        The most recent candidate that has live/final content, else the most
        recent non-empty candidate, else an empty frame.
    """
    anchor_date = pd.to_datetime(current_wbc_date_str()).date()
    tagged_frames: list[pd.DataFrame] = []

    for days_back in range(0, 4):
        candidate_date = (anchor_date - timedelta(days=days_back)).isoformat()
        try:
            fetched = fetch_scores_for_date(candidate_date)
            if fetched is None or fetched.empty:
                continue
            tagged = fetched.copy()
            tagged["scores_source_date"] = candidate_date
            tagged_frames.append(tagged)
        except Exception as e:
            logger.warning(f"[scores_source_date_enrich] failure: {e}", exc_info=True)

    if not tagged_frames:
        return pd.DataFrame()
    # Candidates are ordered newest first, so the first hit is the most recent.
    for frame in tagged_frames:
        if _scores_df_has_live_or_final_content(frame):
            return frame
    return tagged_frames[0]
def current_dashboard_date_str() -> str:
    """Return today's date in the ``America/New_York`` time zone as ``YYYY-MM-DD``."""
    now_eastern = pd.Timestamp.now(tz="America/New_York")
    today = now_eastern.date()
    return today.isoformat()
def get_stable_scores_for_dashboard() -> pd.DataFrame:
    """Return today's scores, falling back to the last good frame in session state.

    Fresh scores with live/final content are stored under
    ``st.session_state["last_good_scores_df"]`` and returned. Otherwise a copy
    of the stored frame is returned when one exists, else the fresh frame.
    """
    latest = load_scores_for_today()
    has_content = _scores_df_has_live_or_final_content(latest)
    if has_content:
        st.session_state["last_good_scores_df"] = latest.copy()
    elif "last_good_scores_df" in st.session_state:
        return st.session_state["last_good_scores_df"].copy()
    return latest
# Lower-cased national team names used to recognise World Baseball Classic
# games when the feed carries no competition metadata. Lookups are done on the
# whitespace-normalised, lower-cased team name, so common alternate spellings
# ("czechia", "taiwan", "south korea") are listed alongside the formal names;
# these and "brazil" are the names the scoreboard's team-name canonicaliser in
# this file already recognises.
WBC_COUNTRY_NAMES = {
    "australia",
    "brazil",
    "canada",
    "china",
    "chinese taipei",
    "colombia",
    "cuba",
    "czech republic",
    "czechia",
    "dominican republic",
    "great britain",
    "israel",
    "italy",
    "japan",
    "korea",
    "mexico",
    "netherlands",
    "nicaragua",
    "panama",
    "puerto rico",
    "south korea",
    "taiwan",
    "united states",
    "usa",
    "venezuela",
}
# Lower-cased MLB club nicknames (no city prefix), grouped by division for
# readability. Used to recognise MLB games by team name.
MLB_TEAM_NAMES = {
    # AL East
    "blue jays", "orioles", "rays", "red sox", "yankees",
    # AL Central
    "guardians", "royals", "tigers", "twins", "white sox",
    # AL West
    "angels", "astros", "athletics", "mariners", "rangers",
    # NL East
    "braves", "marlins", "mets", "nationals", "phillies",
    # NL Central
    "brewers", "cardinals", "cubs", "pirates", "reds",
    # NL West
    "diamondbacks", "dodgers", "giants", "padres", "rockies",
}
def _normalize_team_bucket_name(name: str) -> str:
    """Lower-case a team name and collapse all whitespace runs to single spaces.

    Falsy input (``None``, ``""``) yields an empty string.
    """
    words = str(name or "").lower().split()
    return " ".join(words)
def _infer_competition_bucket(
    away_team: str,
    home_team: str,
    raw_game: dict | None = None,
) -> str:
    """Classify a game as ``"WBC"``, ``"MLB"`` or ``"OTHER"``.

    Feed metadata wins when present: the series description, game type,
    competition name and tournament name fields of ``raw_game`` are searched
    for WBC markers first, then MLB markers. Without a metadata hit the team
    names decide: both teams in ``WBC_COUNTRY_NAMES`` means WBC; both teams
    equal to, or ending in, an ``MLB_TEAM_NAMES`` nickname means MLB.

    Args:
        away_team: Away team display name.
        home_team: Home team display name.
        raw_game: Optional raw feed row used for metadata lookups.

    Returns:
        One of ``"WBC"``, ``"MLB"``, ``"OTHER"``.
    """
    meta = raw_game or {}
    metadata_fields = (
        "series_description",
        "seriesDescription",
        "game_type",
        "gameType",
        "competition_name",
        "tournament_name",
    )
    possible_text = " ".join(str(meta.get(field, "") or "") for field in metadata_fields).lower()

    if any(marker in possible_text for marker in ("world baseball classic", "wbc")):
        return "WBC"
    if any(marker in possible_text for marker in ("mlb", "major league")):
        return "MLB"

    away_norm = _normalize_team_bucket_name(away_team)
    home_norm = _normalize_team_bucket_name(home_team)
    if away_norm in WBC_COUNTRY_NAMES and home_norm in WBC_COUNTRY_NAMES:
        return "WBC"

    # A club matches on its bare nickname or on "<city> <nickname>".
    nickname_suffixes = tuple(f" {team}" for team in MLB_TEAM_NAMES)

    def is_mlb_club(team_name: str) -> bool:
        return team_name in MLB_TEAM_NAMES or team_name.endswith(nickname_suffixes)

    away_is_mlb = is_mlb_club(away_norm)
    home_is_mlb = is_mlb_club(home_norm)
    return "MLB" if away_is_mlb and home_is_mlb else "OTHER"
def merge_schedule_and_scores(schedule_df: pd.DataFrame, scores_df: pd.DataFrame) -> pd.DataFrame:
    """Left-join scores onto the schedule and reconcile overlapping columns.

    Rows are matched on ``game_date``, ``away_team`` and ``home_team``. For
    ``away_score``, ``home_score`` and ``status`` the scores feed wins and the
    schedule fills gaps; for ``start_time_et`` and ``tv`` the schedule wins
    and the scores feed fills gaps.

    Returns:
        The merged frame; a copy of the non-empty input when the other one is
        empty; an empty frame when both are empty.
    """
    if schedule_df.empty:
        return pd.DataFrame() if scores_df.empty else scores_df.copy()
    if scores_df.empty:
        return schedule_df.copy()

    combined = schedule_df.merge(
        scores_df,
        on=["game_date", "away_team", "home_team"],
        how="left",
        suffixes=("", "_score"),
    )

    # Columns where the scores feed is authoritative.
    for column in ("away_score", "home_score", "status"):
        feed_column = f"{column}_score"
        if feed_column in combined.columns:
            combined[column] = combined[feed_column].combine_first(combined.get(column))
            combined = combined.drop(columns=[feed_column])

    # Columns where the schedule is authoritative.
    for column in ("start_time_et", "tv"):
        feed_column = f"{column}_score"
        if feed_column not in combined.columns:
            continue
        if column in combined.columns:
            combined[column] = combined[column].combine_first(combined[feed_column])
        else:
            combined[column] = combined[feed_column]
        combined = combined.drop(columns=[feed_column])

    return combined
def _normalize_pitch_type_key(pitch_type: str) -> str:
    """Map a free-form pitch description to a stable snake_case key.

    Known pitch families are detected by keyword, checked in a fixed order
    (so e.g. a description containing both "knuckle" and "curve" resolves to
    ``"curveball"``). Anything else is slugified; blank, ``nan`` and ``none``
    inputs give ``"unknown"``.
    """
    label = str(pitch_type or "").strip().lower()
    if label in ("", "nan", "none"):
        return "unknown"

    # (keywords, key) pairs in priority order: first keyword hit wins.
    keyword_table = (
        (("4-seam", "four-seam", "four seam"), "four_seam"),
        (("sinker",), "sinker"),
        (("cutter",), "cutter"),
        (("slider",), "slider"),
        (("sweeper",), "sweeper"),
        (("curve",), "curveball"),
        (("change",), "changeup"),
        (("split", "fork"), "splitter"),
        (("knuckle",), "knuckleball"),
    )
    for keywords, key in keyword_table:
        if any(keyword in label for keyword in keywords):
            return key

    slug = re.sub(r"[^a-z0-9]+", "_", label).strip("_")
    return slug if slug else "unknown"
def normalize_game_cards_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with every column the game cards read present.

    Missing columns are appended with a type-appropriate default (empty
    string, ``None`` or ``False``); existing columns are left untouched.
    ``None`` or an empty frame yields an empty frame.
    """
    if df is None or df.empty:
        return pd.DataFrame()

    # (column names, default value) groups, applied in this order.
    column_defaults = (
        (("away_team", "home_team", "status", "tv", "start_time_et", "away_record", "home_record", "game_pk"), ""),
        (("away_score", "home_score", "away_hits", "home_hits", "away_errors", "home_errors"), None),
        (("runner_on_1b", "runner_on_2b", "runner_on_3b"), False),
        (("away_win_prob", "home_win_prob", "outs", "balls", "strikes"), None),
        (("batter_name", "pitcher_name", "last_play", "last_pitch", "pitch_type"), ""),
        (("pitch_velocity", "pitch_spin_rate", "pitch_extension", "pitch_pfx_x", "pitch_pfx_z"), None),
    )

    cards = df.copy()
    for names, default in column_defaults:
        for name in names:
            if name not in cards.columns:
                cards[name] = default
    return cards
def filter_games_for_display(df: pd.DataFrame, selected_filter: str) -> pd.DataFrame:
    """Filter games by the status filter chosen in the UI.

    Args:
        df: Games frame; needs a ``status`` column (and ``start_time_et`` for
            the ``"Scheduled"`` filter).
        selected_filter: ``"All"``, ``"Live"``, ``"Final"`` or ``"Scheduled"``.

    Returns:
        ``df`` itself for ``"All"`` or an empty frame; the matching rows for a
        known filter; a full copy for any other filter value. ``"Scheduled"``
        keeps rows whose status mentions scheduled/preview, is blank, or that
        have any start time.
    """
    if df.empty or selected_filter == "All":
        return df

    games = df.copy()
    status = games["status"].fillna("").astype(str).str.lower()

    if selected_filter == "Live":
        keep = status.str.contains("live|top|bot|bottom|mid|inning")
    elif selected_filter == "Final":
        keep = status.str.contains("final")
    elif selected_filter == "Scheduled":
        has_start_time = games["start_time_et"].fillna("").astype(str).str.len().gt(0)
        keep = status.str.contains("scheduled|preview") | status.eq("") | has_start_time
    else:
        return games
    return games[keep]
def filter_games_for_competition(df: pd.DataFrame, competition_filter: str) -> pd.DataFrame:
    """Keep only games whose competition bucket matches ``competition_filter``.

    When the frame has no ``competition_bucket`` column the bucket is inferred
    per row from the team names and the row's own fields. Matching is
    case-insensitive.

    Returns:
        ``df`` itself for ``"All"`` or an empty frame, otherwise the matching
        rows of a copy.
    """
    if df.empty or competition_filter == "All":
        return df

    games = df.copy()
    if "competition_bucket" not in games.columns:

        def bucket_for(row: pd.Series) -> str:
            return _infer_competition_bucket(
                away_team=row.get("away_team", ""),
                home_team=row.get("home_team", ""),
                raw_game=row.to_dict(),
            )

        games["competition_bucket"] = games.apply(bucket_for, axis=1)

    buckets = games["competition_bucket"].fillna("").astype(str).str.upper()
    wanted = competition_filter.upper()
    return games[buckets == wanted]
def render_scoreboard_section(title: str, games_df: pd.DataFrame) -> None:
    """Render a titled two-column grid of game cards; does nothing for an empty frame.

    Args:
        title: Section heading, inserted into the section-title HTML as-is.
        games_df: One row per game; each row is passed to ``render_game_card``
            as a dict.
    """
    if games_df.empty:
        return

    st.markdown(f'<div class="section-title">{title}</div>', unsafe_allow_html=True)
    game_records = games_df.to_dict("records")
    left_right = st.columns(2)
    # Alternate cards between the left and right column.
    for position, record in enumerate(game_records):
        target_column = left_right[position % 2]
        with target_column:
            render_game_card(record)
def normalize_game_pk(value: object) -> str:
    """Normalise a game identifier to a plain integer string.

    Numeric-looking input is converted through ``float`` then ``int`` (so
    ``746123.0`` becomes ``"746123"``). ``None``, blank, ``nan`` and ``none``
    give ``""``; input that cannot be parsed as a number gives ``""`` unless
    the text consists solely of digit characters, in which case the text is
    returned as-is.
    """
    if value is None:
        return ""
    cleaned = str(value).strip()
    if cleaned.lower() in ("", "nan", "none"):
        return ""
    try:
        return str(int(float(cleaned)))
    except Exception:
        return cleaned if cleaned.isdigit() else ""
def split_games_for_scoreboard(
    schedule_df: pd.DataFrame,
    scores_df: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split schedule and scores rows into live, final and scheduled game frames.

    Steps, in order:
      1. Ensure both inputs carry the columns the scoreboard reads.
      2. If the scores feed is empty or has no live/final statuses, try to
         rebuild scores from the schedule via the live feeds.
      3. Attach ``game_pk`` from the schedule to scores by normalised team
         names (direct match first, then with away/home swapped).
      4. Enrich scores from the live feeds (failures are logged and ignored).
      5. Classify score rows as live or final by status text and score
         presence; derive scheduled rows from the schedule.
      6. Tag each resulting frame with a ``competition_bucket`` column.

    Args:
        schedule_df: Schedule rows for the dashboard date.
        scores_df: Scores rows for the dashboard date (may be empty).

    Returns:
        ``(live_games, final_games, scheduled_games)``; each may be empty.
    """
    def ensure_cols(df: pd.DataFrame) -> pd.DataFrame:
        # Return a copy of df with every scoreboard column present:
        # team/score/hit/error columns default to None, text columns to "".
        out = df.copy()
        for col in [
            "away_team",
            "home_team",
            "away_score",
            "home_score",
            "away_hits",
            "home_hits",
            "away_errors",
            "home_errors",
        ]:
            if col not in out.columns:
                out[col] = None
        for col in [
            "tv",
            "start_time_et",
            "away_record",
            "home_record",
            "status",
            "game_pk",
        ]:
            if col not in out.columns:
                out[col] = ""
        return out
    def canon(name: str) -> str:
        # Canonicalise a team name for cross-feed matching: lower-case, drop
        # periods and commas, collapse whitespace, then map known aliases to a
        # single spelling. Unknown names pass through in normalised form.
        if not name:
            return ""
        n = str(name).strip().lower()
        n = n.replace(".", "").replace(",", "")
        n = " ".join(n.split())
        # NOTE(review): keys with a trailing space or with periods ("usa ",
        # "u s a ", "u.s.a", "u.s.a.") can never match, because n has already
        # been stripped of periods and surrounding whitespace above.
        mapping = {
            "usa": "united states",
            "usa ": "united states",
            "u s a": "united states",
            "u s a ": "united states",
            "u.s.a": "united states",
            "u.s.a.": "united states",
            "united states": "united states",
            "puerto rico": "puerto rico",
            "dominican republic": "dominican republic",
            "dominicana": "dominican republic",
            "dominic republic": "dominican republic",
            "dom rep": "dominican republic",
            "great britain": "great britain",
            "chinese taipei": "chinese taipei",
            "taiwan": "chinese taipei",
            "czech republic": "czechia",
            "czechia": "czechia",
            "korea": "korea",
            "south korea": "korea",
            "republic of korea": "korea",
            "rep of korea": "korea",
            "japan": "japan",
            "brazil": "brazil",
            "brasil": "brazil",
            "italy": "italy",
            "nicaragua": "nicaragua",
            "mexico": "mexico",
            "venezuela": "venezuela",
            "colombia": "colombia",
            "columbia": "colombia",
            "panama": "panama",
            "netherlands": "netherlands",
            "cuba": "cuba",
            "israel": "israel",
            "canada": "canada",
            "australia": "australia",
            "china": "china",
        }
        return mapping.get(n, n)
    live_games = pd.DataFrame()
    final_games = pd.DataFrame()
    scheduled_games = pd.DataFrame()
    scores = ensure_cols(scores_df) if not scores_df.empty else pd.DataFrame()
    schedule = ensure_cols(schedule_df) if not schedule_df.empty else pd.DataFrame()
    # Fallback: if scores feed is empty OR contains no live/final content,
    # derive rows directly from schedule game_pk + live feed.
    if (scores.empty or not _scores_df_has_live_or_final_content(scores)) and not schedule.empty:
        try:
            fallback_scores = build_scores_from_schedule_via_live_feeds(schedule)
            if fallback_scores is not None and not fallback_scores.empty:
                scores = ensure_cols(fallback_scores)
            elif scores.empty:
                scores = pd.DataFrame()
        except Exception:
            # Best effort: keep whatever scores we already had.
            if scores.empty:
                scores = pd.DataFrame()
    # Normalised team keys used for all schedule <-> scores matching below.
    if not scores.empty:
        scores["away_key"] = scores["away_team"].fillna("").apply(canon)
        scores["home_key"] = scores["home_team"].fillna("").apply(canon)
    if not schedule.empty:
        schedule["away_key"] = schedule["away_team"].fillna("").apply(canon)
        schedule["home_key"] = schedule["home_team"].fillna("").apply(canon)
    # Attach game_pk from schedule to scores using normalized team keys
    # First try direct away/home match, then fallback to reversed team-pair match.
    if not scores.empty and not schedule.empty and "game_pk" in schedule.columns:
        # One game_pk per (away, home) pair; the first occurrence wins.
        schedule_keys = (
            schedule[["away_key", "home_key", "game_pk"]]
            .dropna(subset=["away_key", "home_key"])
            .drop_duplicates(subset=["away_key", "home_key"])
            .copy()
        )
        # direct match
        scores = scores.merge(
            schedule_keys,
            on=["away_key", "home_key"],
            how="left",
            suffixes=("", "_sched"),
        )
        if "game_pk_sched" in scores.columns:
            # Keep the scores feed's own game_pk when it has one; otherwise
            # take the schedule's.
            scores["game_pk"] = (
                scores["game_pk"]
                .astype(str)
                .replace({"": pd.NA, "nan": pd.NA, "None": pd.NA})
                .combine_first(scores["game_pk_sched"])
            )
            scores = scores.drop(columns=["game_pk_sched"])
        if "game_pk" in scores.columns:
            scores["game_pk"] = scores["game_pk"].apply(normalize_game_pk)
        # reverse-order fallback for feeds that swap away/home labels
        missing_game_pk_mask = (
            scores["game_pk"].astype(str).str.strip().replace({"nan": "", "None": ""}).eq("")
        )
        if missing_game_pk_mask.any():
            reverse_schedule_keys = schedule_keys.rename(
                columns={
                    "away_key": "home_key",
                    "home_key": "away_key",
                    "game_pk": "game_pk_rev",
                }
            )
            reverse_matches = scores.loc[missing_game_pk_mask, ["away_key", "home_key"]].merge(
                reverse_schedule_keys,
                on=["away_key", "home_key"],
                how="left",
            )
            # Values are assigned positionally (.values), relying on the left
            # merge preserving the order and count of the masked rows.
            # NOTE(review): these values are not passed through
            # normalize_game_pk, unlike the direct-match values above.
            scores.loc[missing_game_pk_mask, "game_pk"] = (
                reverse_matches["game_pk_rev"]
                .astype(str)
                .replace({"": pd.NA, "nan": pd.NA, "None": pd.NA})
                .values
            )
        # NOTE(review): this repeats the direct-match merge step, but
        # "game_pk_sched" was already dropped above whenever it existed and
        # nothing re-creates it, so this branch looks unreachable.
        if "game_pk_sched" in scores.columns:
            scores["game_pk"] = (
                scores["game_pk"]
                .astype(str)
                .replace({"": pd.NA, "nan": pd.NA, "None": pd.NA})
                .combine_first(scores["game_pk_sched"])
            )
            scores = scores.drop(columns=["game_pk_sched"])
    if not scores.empty:
        try:
            scores = enrich_live_games_from_feeds(scores)
        except Exception as e:
            logger.warning(f"[live_feed_enrich] failure: {e}", exc_info=True)
    # Guard against the enrichment step handing back None.
    if scores is None or scores.empty:
        scores = pd.DataFrame()
    if not scores.empty:
        scores = ensure_cols(scores)
        if "status" not in scores.columns:
            scores["status"] = ""
        status_series = scores["status"].fillna("").astype(str).str.strip().str.lower()
        def has_score_value(series: pd.Series) -> pd.Series:
            # True where the cell holds anything other than a blank/nan/none marker.
            text = series.fillna("").astype(str).str.strip().str.lower()
            return ~text.isin(["", "nan", "none"])
        away_score_present = (
            has_score_value(scores["away_score"])
            if "away_score" in scores.columns
            else pd.Series(False, index=scores.index)
        )
        home_score_present = (
            has_score_value(scores["home_score"])
            if "home_score" in scores.columns
            else pd.Series(False, index=scores.index)
        )
        score_present_mask = away_score_present | home_score_present
        final_mask = status_series.str.contains(
            r"final|game over|completed|ended",
            regex=True,
            na=False,
        )
        # NOTE(review): the bare "end" alternative also matches "ended", so a
        # status containing "ended" satisfies both this mask and final_mask
        # and the row lands in both the live and the final frame.
        detailed_live_mask = status_series.str.contains(
            r"top|bot|bottom|mid|middle|end|inning|in progress|delayed|suspended",
            regex=True,
            na=False,
        )
        # Treat plain "Live" as truly live only if the row has score context.
        plain_live_mask = status_series.eq("live") & score_present_mask
        live_status_mask = detailed_live_mask | plain_live_mask
        live_games = scores[live_status_mask].copy()
        # Final = explicit final status, or any scored row that is not live.
        final_games = scores[
            final_mask | (score_present_mask & ~live_status_mask)
        ].copy()
    # fallback only if both are still empty
    if live_games.empty and final_games.empty and not scores.empty:
        score_rows = scores[score_present_mask].copy()
        if not score_rows.empty:
            score_status = score_rows["status"].fillna("").astype(str).str.lower()
            live_games = score_rows[
                score_status.str.contains(
                    r"live|top|bot|bottom|mid|inning|in progress|delayed|suspended",
                    regex=True,
                    na=False,
                )
            ].copy()
            final_games = score_rows[
                score_status.str.contains(
                    r"final|game over|completed|ended",
                    regex=True,
                    na=False,
                ) | ~score_status.str.contains(
                    r"live|top|bot|bottom|mid|inning|in progress|delayed|suspended",
                    regex=True,
                    na=False,
                )
            ].copy()
    if not schedule.empty:
        status_series = schedule["status"].fillna("").astype(str).str.strip().str.lower()
        # A schedule row counts as scheduled when its status says so, when it
        # has any start time, or when it has no status at all.
        scheduled_games = schedule[
            status_series.str.contains(r"\bscheduled\b|\bpreview\b", regex=True, na=False)
            | schedule["start_time_et"].fillna("").astype(str).str.len().gt(0)
            | status_series.eq("")
        ].copy()
    # ---------------------------------------
    # Tag games with competition bucket
    # ---------------------------------------
    if not live_games.empty:
        live_games["competition_bucket"] = live_games.apply(
            lambda r: _infer_competition_bucket(
                away_team=r.get("away_team", ""),
                home_team=r.get("home_team", ""),
                raw_game=r.to_dict(),
            ),
            axis=1,
        )
    if not final_games.empty:
        final_games["competition_bucket"] = final_games.apply(
            lambda r: _infer_competition_bucket(
                away_team=r.get("away_team", ""),
                home_team=r.get("home_team", ""),
                raw_game=r.to_dict(),
            ),
            axis=1,
        )
    if not scheduled_games.empty:
        scheduled_games["competition_bucket"] = scheduled_games.apply(
            lambda r: _infer_competition_bucket(
                away_team=r.get("away_team", ""),
                home_team=r.get("home_team", ""),
                raw_game=r.to_dict(),
            ),
            axis=1,
        )
    return live_games, final_games, scheduled_games
@st.cache_data(ttl=LIVE_FEED_TTL_SECONDS, show_spinner=False)
def load_live_game_feed_cached(game_pk: str) -> dict:
    """Fetch the live feed for a game, cached for ``LIVE_FEED_TTL_SECONDS``.

    Returns:
        The feed returned by ``fetch_live_game_feed``, or ``{}`` when the
        game id normalises to an empty string or the fetch raises.
    """
    normalized_pk = normalize_game_pk(game_pk)
    if normalized_pk:
        try:
            return fetch_live_game_feed(normalized_pk)
        except Exception:
            pass
    return {}
def merge_live_game_row(base_game: dict, enriched_game: dict | None) -> dict:
    """Overlay the non-missing values of ``enriched_game`` onto a copy of ``base_game``.

    NOTE: an identical function with this name is defined again further down
    in this file; that later definition is the one in effect at runtime.

    Returns:
        A new dict; ``base_game`` is never mutated. When ``enriched_game`` is
        not a dict the copy is returned unchanged.
    """
    result = dict(base_game)
    if isinstance(enriched_game, dict):
        for field, new_value in enriched_game.items():
            if not _is_missing_like(new_value):
                result[field] = new_value
    return result
def build_prepared_live_games_df(live_games: pd.DataFrame) -> pd.DataFrame:
    """Run every live game row through ``prepare_live_game_for_ui``.

    Returns:
        A frame built from the prepared row dicts, or an empty frame when
        ``live_games`` is ``None`` or empty.
    """
    if live_games is None or live_games.empty:
        return pd.DataFrame()
    prepared_rows = [
        prepare_live_game_for_ui(game_row.to_dict())
        for _, game_row in live_games.iterrows()
    ]
    return pd.DataFrame(prepared_rows)
def _is_missing_like(value: object) -> bool:
    """Return ``True`` for ``None`` and for values whose text is blank, ``nan`` or ``none``.

    The text comparison is done on the stripped, lower-cased ``str(value)``.
    """
    if value is None:
        return True
    return str(value).strip().lower() in ("", "nan", "none")
def merge_live_game_row(base_game: dict, enriched_game: dict | None) -> dict:
    """Return ``base_game`` updated with the usable values from ``enriched_game``.

    Values that look missing (``None``, blank, ``nan``, ``none``) never
    overwrite what the base row already has. ``base_game`` itself is left
    untouched; a non-dict ``enriched_game`` is ignored.
    """
    if not isinstance(enriched_game, dict):
        return dict(base_game)
    usable_updates = {
        key: value
        for key, value in enriched_game.items()
        if not _is_missing_like(value)
    }
    combined = dict(base_game)
    combined.update(usable_updates)
    return combined
def _first_pitch_metric_float(candidates: list[object]) -> float | None:
    """Return the first candidate that converts to ``float``, or ``None``.

    Candidates that are ``None``, render as blank/``nan``/``none`` text, or
    fail the ``float`` conversion are skipped.
    """
    for value in candidates:
        try:
            if value is None:
                continue
            if str(value).strip().lower() in {"", "nan", "none"}:
                continue
            return float(value)
        except Exception:
            continue
    return None


def _pitch_metric_section(pitch_data: dict, key: str) -> dict:
    """Return the nested dict stored under ``key``, or ``{}`` when it is absent or not a dict."""
    section = pitch_data.get(key)
    return section if isinstance(section, dict) else {}


def _extract_pitch_velocity_value(pitch_data: dict) -> float | None:
    """Read pitch velocity from ``startSpeed``, ``releaseSpeed`` or ``speed`` (first usable wins).

    Returns ``None`` when ``pitch_data`` is not a dict or no key holds a
    usable number.
    """
    if not isinstance(pitch_data, dict):
        return None
    return _first_pitch_metric_float(
        [
            pitch_data.get("startSpeed"),
            pitch_data.get("releaseSpeed"),
            pitch_data.get("speed"),
        ]
    )


def _extract_pitch_spin_rate(pitch_data: dict) -> float | None:
    """Read spin rate from the top-level keys, then ``breaks``, then ``coordinates``.

    Returns ``None`` when ``pitch_data`` is not a dict or no key holds a
    usable number. Non-dict ``breaks`` / ``coordinates`` values are ignored.
    """
    if not isinstance(pitch_data, dict):
        return None
    pitch_breaks = _pitch_metric_section(pitch_data, "breaks")
    coordinates = _pitch_metric_section(pitch_data, "coordinates")
    return _first_pitch_metric_float(
        [
            pitch_data.get("release_spin_rate"),
            pitch_data.get("spinRate"),
            pitch_data.get("spin_rate"),
            pitch_breaks.get("spinRate"),
            pitch_breaks.get("spin_rate"),
            coordinates.get("spinRate"),
        ]
    )


def _extract_pitch_extension_value(pitch_data: dict) -> float | None:
    """Read release extension from ``release_extension``, ``extension`` or ``releaseExtension``.

    Returns ``None`` when ``pitch_data`` is not a dict or no key holds a
    usable number.
    """
    if not isinstance(pitch_data, dict):
        return None
    return _first_pitch_metric_float(
        [
            pitch_data.get("release_extension"),
            pitch_data.get("extension"),
            pitch_data.get("releaseExtension"),
        ]
    )


def _extract_pitch_movement_x_value(pitch_data: dict) -> float | None:
    """Read the ``pfxX`` movement value from ``coordinates`` first, then the top level.

    Returns ``None`` when ``pitch_data`` is not a dict or no key holds a
    usable number. A non-dict ``coordinates`` value is ignored.
    """
    if not isinstance(pitch_data, dict):
        return None
    coords = _pitch_metric_section(pitch_data, "coordinates")
    return _first_pitch_metric_float(
        [
            coords.get("pfxX"),
            pitch_data.get("pfxX"),
            pitch_data.get("pfx_x"),
        ]
    )


def _extract_pitch_movement_z_value(pitch_data: dict) -> float | None:
    """Read the ``pfxZ`` movement value from ``coordinates`` first, then the top level.

    Returns ``None`` when ``pitch_data`` is not a dict or no key holds a
    usable number. A non-dict ``coordinates`` value is ignored.
    """
    if not isinstance(pitch_data, dict):
        return None
    coords = _pitch_metric_section(pitch_data, "coordinates")
    return _first_pitch_metric_float(
        [
            coords.get("pfxZ"),
            pitch_data.get("pfxZ"),
            pitch_data.get("pfx_z"),
        ]
    )
def _extract_person_name(obj: object) -> str:
    """Return the person's ``fullName`` (or ``name`` as fallback), stripped.

    Anything that is not a dict, or a dict with neither key set to a truthy
    value, yields an empty string.
    """
    if not isinstance(obj, dict):
        return ""
    chosen = obj.get("fullName", "") or obj.get("name", "") or ""
    return str(chosen).strip()
def _safe_mean_numeric(values: list[object]) -> float | None:
    """Average the entries of ``values`` that can be read as numbers.

    Entries that are ``None``, render as blank/``nan``/``none`` text, or fail
    the ``float`` conversion are ignored.

    Returns:
        The arithmetic mean of the usable entries, or ``None`` when there are
        none.
    """
    numbers: list[float] = []
    for entry in values:
        try:
            if entry is None:
                continue
            if str(entry).strip().lower() in ("", "nan", "none"):
                continue
            numbers.append(float(entry))
        except Exception:
            continue
    if numbers:
        return float(sum(numbers) / len(numbers))
    return None
def prepare_live_game_for_ui(game: dict) -> dict:
    """Return a copy of ``game`` enriched with live-feed fields for the UI.

    The work happens in stages, each guarded so that one failing stage does
    not prevent the row from being returned:

    1. Merge the output of ``enrich_game_from_live_feed`` into the row.
    2. Read batter / pitcher / count / base state directly from the feed.
    3. Resolve the on-deck and in-hole batting-order slots.
    4. Attach the selected pitch's telemetry and rolling means over up to
       the 8 most recent pitch events of the current play.
    5. Record a first-observed-pitch-per-inning baseline seed in
       ``st.session_state`` and persist newly seen innings through
       ``upsert_inning_first_seed_event``.
    6. Store batter zone events through ``insert_batter_zone_events``.
    7. Hand pitch-level and PA-level rows to ``_fire_live_pitch_upsert``.

    Args:
        game: Live game row. It needs a ``game_pk`` that
            ``normalize_game_pk`` accepts, otherwise nothing is enriched.

    Returns:
        The enriched row, or a plain shallow copy of ``game`` when the
        game_pk is unusable or the cached feed is empty / not a dict.
    """
    prepared = dict(game)
    try:
        game_pk = normalize_game_pk(prepared.get("game_pk"))
        if not game_pk:
            return prepared
        feed = load_live_game_feed_cached(game_pk)
        if not isinstance(feed, dict) or not feed:
            return prepared

        # First: use existing enrichment if it works
        try:
            enriched = enrich_game_from_live_feed(prepared, feed)
            prepared = merge_live_game_row(prepared, enriched)
        except Exception as e:
            logger.warning(f"[live_feed_merge] failure: {e}", exc_info=True)

        # Second: direct fallback extraction from feed so UI fields are guaranteed
        live_data = feed.get("liveData", {}) or {}
        linescore = live_data.get("linescore", {}) or {}
        plays = live_data.get("plays", {}) or {}
        current_play = plays.get("currentPlay", {}) or {}
        matchup = current_play.get("matchup", {}) or {}
        count = current_play.get("count", {}) or {}
        offense = linescore.get("offense", {}) or {}
        defense = linescore.get("defense", {}) or {}

        batter_name = _extract_person_name(matchup.get("batter", {}))
        if not batter_name:
            batter_name = _extract_person_name(offense.get("batter", {}))
        pitcher_name = _extract_person_name(matchup.get("pitcher", {}))
        if not pitcher_name:
            pitcher_name = _extract_person_name(defense.get("pitcher", {}))

        three_away_name = None
        try:
            lineup = offense.get("battingOrder", []) or []
            if isinstance(lineup, list) and len(lineup) >= 3:
                three_away_name = _extract_person_name(lineup[2])
        except Exception as e:
            logger.warning(f"[lineup_slot_extract] failure: {e}", exc_info=True)

        prepared = merge_live_game_row(
            prepared,
            {
                "batter_name": batter_name,
                "pitcher_name": pitcher_name,
                "pitcher_id": (
                    (matchup.get("pitcher", {}) or {}).get("id")
                    or (defense.get("pitcher", {}) or {}).get("id")
                ),
                "balls": count.get("balls"),
                "strikes": count.get("strikes"),
                "outs": count.get("outs", linescore.get("outs")),
                "runner_on_1b": offense.get("first") is not None,
                "runner_on_2b": offense.get("second") is not None,
                "runner_on_3b": offense.get("third") is not None,
                "last_play": str((current_play.get("result", {}) or {}).get("description", "") or "").strip(),
                "on_deck_name": _extract_person_name(offense.get("onDeck", {})),
                "in_hole_name": _extract_person_name(offense.get("inHole", {})),
                "three_away_name": three_away_name,
            },
        )

        # Task 3: Extract batting order lineup slots (fully fallback-safe)
        try:
            batting_order = offense.get("battingOrder") or []

            def _find_slot(player_id: object, bo_list: list) -> int | None:
                # Entries may be person dicts or bare ids; compare as text.
                if not player_id or not bo_list:
                    return None
                for i, p in enumerate(bo_list):
                    pid = p.get("id") if isinstance(p, dict) else p
                    if str(pid) == str(player_id):
                        return i + 1  # 1-based slot
                return None

            # `or {}` guards an explicit null in the feed, which previously
            # raised and wiped both slot values.
            on_deck_id = (offense.get("onDeck") or {}).get("id")
            in_hole_id = (offense.get("inHole") or {}).get("id")
            prepared["on_deck_lineup_slot"] = _find_slot(on_deck_id, batting_order)
            prepared["in_hole_lineup_slot"] = _find_slot(in_hole_id, batting_order)
            prepared["three_away_lineup_slot"] = None
        except Exception:
            prepared["on_deck_lineup_slot"] = None
            prepared["in_hole_lineup_slot"] = None
            prepared["three_away_lineup_slot"] = None

        # Prefer the most recent pitch event that actually has RPM/EXT.
        # If none exists, fall back to the most recent event with any pitchData.
        play_events = current_play.get("playEvents", []) or []
        pitch_event = None
        fallback_pitch_event = None
        for event in reversed(play_events):
            pitch_data = event.get("pitchData") or {}
            if not pitch_data:
                continue
            if fallback_pitch_event is None:
                fallback_pitch_event = event
            pitch_breaks = pitch_data.get("breaks", {}) or {}
            has_spin_or_ext = any(
                value is not None
                for value in [
                    pitch_data.get("release_spin_rate"),
                    pitch_breaks.get("spinRate"),
                    pitch_data.get("spinRate"),
                    pitch_data.get("release_extension"),
                    pitch_data.get("extension"),
                ]
            )
            if has_spin_or_ext:
                pitch_event = event
                break
        if pitch_event is None:
            pitch_event = fallback_pitch_event

        if pitch_event:
            pitch_data = pitch_event.get("pitchData", {}) or {}
            details = pitch_event.get("details", {}) or {}
            current_pitch_type = str(((details.get("type", {}) or {}).get("description", "")) or "").strip()
            prepared = merge_live_game_row(
                prepared,
                {
                    "last_pitch": str(details.get("description", "") or "").strip(),
                    "pitch_type": current_pitch_type,
                    "pitch_type_key": _normalize_pitch_type_key(current_pitch_type),
                    "pitch_velocity": _extract_pitch_velocity_value(pitch_data),
                    "pitch_spin_rate": _extract_pitch_spin_rate(pitch_data),
                    "pitch_extension": _extract_pitch_extension_value(pitch_data),
                    "pitch_pfx_x": _extract_pitch_movement_x_value(pitch_data),
                    "pitch_pfx_z": _extract_pitch_movement_z_value(pitch_data),
                },
            )

        # Phase 6.5: rolling telemetry from recent pitch events
        recent_pitch_events: list[dict] = []
        for event in reversed(play_events):
            pitch_data = event.get("pitchData") or {}
            if not pitch_data:
                continue
            recent_pitch_events.append(event)
            if len(recent_pitch_events) >= 8:
                break

        if recent_pitch_events:
            velocity_values: list[object] = []
            spin_values: list[object] = []
            extension_values: list[object] = []
            pfx_x_values: list[object] = []
            pfx_z_values: list[object] = []
            current_pitch_type_key = str(prepared.get("pitch_type_key", "") or "").strip()
            pitch_type_velocity_values: list[object] = []
            pitch_type_spin_values: list[object] = []
            pitch_type_extension_values: list[object] = []
            pitch_type_pfx_x_values: list[object] = []
            pitch_type_pfx_z_values: list[object] = []

            for event in recent_pitch_events:
                pitch_data = event.get("pitchData", {}) or {}
                # Derive the pitch-type key from THIS event. The previous
                # code compared against a variable left over from the
                # collection loop above, so every event was matched against
                # the key of whichever event that loop visited last.
                event_details = event.get("details", {}) or {}
                event_pitch_type = str(((event_details.get("type", {}) or {}).get("description", "")) or "").strip()
                event_pitch_type_key = _normalize_pitch_type_key(event_pitch_type)

                velocity_values.append(_extract_pitch_velocity_value(pitch_data))
                spin_values.append(_extract_pitch_spin_rate(pitch_data))
                extension_values.append(_extract_pitch_extension_value(pitch_data))
                pfx_x_values.append(_extract_pitch_movement_x_value(pitch_data))
                pfx_z_values.append(_extract_pitch_movement_z_value(pitch_data))

                if current_pitch_type_key and current_pitch_type_key != "unknown":
                    if event_pitch_type_key == current_pitch_type_key:
                        pitch_type_velocity_values.append(_extract_pitch_velocity_value(pitch_data))
                        pitch_type_spin_values.append(_extract_pitch_spin_rate(pitch_data))
                        pitch_type_extension_values.append(_extract_pitch_extension_value(pitch_data))
                        pitch_type_pfx_x_values.append(_extract_pitch_movement_x_value(pitch_data))
                        pitch_type_pfx_z_values.append(_extract_pitch_movement_z_value(pitch_data))

            valid_velocity_values = [v for v in velocity_values if v is not None]
            valid_spin_values = [v for v in spin_values if v is not None]
            valid_extension_values = [v for v in extension_values if v is not None]
            valid_pfx_x_values = [v for v in pfx_x_values if v is not None]
            valid_pfx_z_values = [v for v in pfx_z_values if v is not None]
            valid_pitch_type_velocity_values = [v for v in pitch_type_velocity_values if v is not None]
            valid_pitch_type_spin_values = [v for v in pitch_type_spin_values if v is not None]
            valid_pitch_type_extension_values = [v for v in pitch_type_extension_values if v is not None]
            valid_pitch_type_pfx_x_values = [v for v in pitch_type_pfx_x_values if v is not None]
            valid_pitch_type_pfx_z_values = [v for v in pitch_type_pfx_z_values if v is not None]

            prepared = merge_live_game_row(
                prepared,
                {
                    "rolling_pitch_velocity": _safe_mean_numeric(valid_velocity_values),
                    "rolling_pitch_spin_rate": _safe_mean_numeric(valid_spin_values),
                    "rolling_pitch_extension": _safe_mean_numeric(valid_extension_values),
                    "rolling_pitch_pfx_x": _safe_mean_numeric(valid_pfx_x_values),
                    "rolling_pitch_pfx_z": _safe_mean_numeric(valid_pfx_z_values),
                    "rolling_pitch_sample_size": len(recent_pitch_events),
                    "rolling_pitch_velocity_sample_size": len(valid_velocity_values),
                    "rolling_pitch_spin_sample_size": len(valid_spin_values),
                    "rolling_pitch_extension_sample_size": len(valid_extension_values),
                    "rolling_pitch_pfx_x_sample_size": len(valid_pfx_x_values),
                    "rolling_pitch_pfx_z_sample_size": len(valid_pfx_z_values),
                    "rolling_pitch_type_key": current_pitch_type_key,
                    "rolling_pitch_type_velocity": _safe_mean_numeric(valid_pitch_type_velocity_values),
                    "rolling_pitch_type_spin_rate": _safe_mean_numeric(valid_pitch_type_spin_values),
                    "rolling_pitch_type_extension": _safe_mean_numeric(valid_pitch_type_extension_values),
                    "rolling_pitch_type_pfx_x": _safe_mean_numeric(valid_pitch_type_pfx_x_values),
                    "rolling_pitch_type_pfx_z": _safe_mean_numeric(valid_pitch_type_pfx_z_values),
                    "rolling_pitch_type_velocity_sample_size": len(valid_pitch_type_velocity_values),
                    "rolling_pitch_type_spin_sample_size": len(valid_pitch_type_spin_values),
                    "rolling_pitch_type_extension_sample_size": len(valid_pitch_type_extension_values),
                    "rolling_pitch_type_pfx_x_sample_size": len(valid_pitch_type_pfx_x_values),
                    "rolling_pitch_type_pfx_z_sample_size": len(valid_pitch_type_pfx_z_values),
                },
            )

        # Seed a per-game live baseline using the FIRST observed pitch of each inning,
        # and persist it across games.
        try:
            game_pk_seed = normalize_game_pk(prepared.get("game_pk"))
            pitcher_id_seed = prepared.get("pitcher_id")
            pitcher_name_seed = str(prepared.get("pitcher_name", "") or "").strip()
            game_date_seed = str(prepared.get("game_date", "") or prepared.get("official_date", "") or "").strip()
            inning_seed = current_play.get("about", {}) or {}
            current_inning_seed = (
                inning_seed.get("inning")
                or linescore.get("currentInning")
                or prepared.get("inning")
            )
            current_pitch_type_key = str(prepared.get("pitch_type_key", "") or "").strip()

            if "live_pitcher_baseline_seeds" not in st.session_state:
                st.session_state["live_pitcher_baseline_seeds"] = {}
            baseline_seed_map = st.session_state["live_pitcher_baseline_seeds"]

            # Key preference: game + pitcher id, then game + name, then name only.
            baseline_seed_key = None
            if game_pk_seed and pitcher_id_seed:
                baseline_seed_key = f"{game_pk_seed}:{pitcher_id_seed}"
            elif game_pk_seed and pitcher_name_seed:
                baseline_seed_key = f"{game_pk_seed}:{pitcher_name_seed.lower()}"
            elif pitcher_name_seed:
                baseline_seed_key = f"name_only:{pitcher_name_seed.lower()}"

            if baseline_seed_key:
                baseline_seed = baseline_seed_map.get(
                    baseline_seed_key,
                    {
                        "innings": {},
                        "pitch_type_innings": {},
                    },
                )
                innings_map = baseline_seed.get("innings", {})
                pitch_type_innings_map = baseline_seed.get("pitch_type_innings", {})
                persisted_new_inning_seed = False

                # Snapshot of the currently selected pitch's metrics; reused
                # for the inning seed, the pitch-type seed and the upsert.
                seed_metrics = {
                    "velocity": prepared.get("pitch_velocity"),
                    "spin_rate": prepared.get("pitch_spin_rate"),
                    "extension": prepared.get("pitch_extension"),
                    "pfx_x": prepared.get("pitch_pfx_x"),
                    "pfx_z": prepared.get("pitch_pfx_z"),
                }

                if current_inning_seed is not None:
                    inning_key = str(current_inning_seed)
                    if inning_key not in innings_map:
                        innings_map[inning_key] = dict(seed_metrics)
                        persisted_new_inning_seed = True
                    if current_pitch_type_key and current_pitch_type_key != "unknown":
                        if current_pitch_type_key not in pitch_type_innings_map:
                            pitch_type_innings_map[current_pitch_type_key] = {}
                        if inning_key not in pitch_type_innings_map[current_pitch_type_key]:
                            pitch_type_innings_map[current_pitch_type_key][inning_key] = dict(seed_metrics)

                baseline_seed["innings"] = innings_map
                baseline_seed["pitch_type_innings"] = pitch_type_innings_map
                baseline_seed_map[baseline_seed_key] = baseline_seed

                if persisted_new_inning_seed and current_inning_seed is not None:
                    upsert_inning_first_seed_event(
                        {
                            "pitcher_id": pitcher_id_seed,
                            "pitcher_name": pitcher_name_seed,
                            "game_pk": game_pk_seed,
                            "game_date": game_date_seed,
                            "inning": current_inning_seed,
                            "pitch_type_key": current_pitch_type_key or "unknown",
                            **seed_metrics,
                        }
                    )

                def _mean_from_metric_rows(rows_map: dict, metric_name: str) -> float | None:
                    # Mean of one metric across the stored per-inning rows.
                    values: list[object] = []
                    for row in rows_map.values():
                        if not isinstance(row, dict):
                            continue
                        values.append(row.get(metric_name))
                    return _safe_mean_numeric(values)

                def _count_from_metric_rows(rows_map: dict, metric_name: str) -> int:
                    # Number of stored rows carrying a non-None value for the metric.
                    return sum(
                        1 for row in rows_map.values()
                        if isinstance(row, dict) and row.get(metric_name) is not None
                    )

                velocity_seed = _mean_from_metric_rows(innings_map, "velocity")
                spin_seed = _mean_from_metric_rows(innings_map, "spin_rate")
                extension_seed = _mean_from_metric_rows(innings_map, "extension")
                pfx_x_seed = _mean_from_metric_rows(innings_map, "pfx_x")
                pfx_z_seed = _mean_from_metric_rows(innings_map, "pfx_z")
                velocity_seed_count = _count_from_metric_rows(innings_map, "velocity")
                spin_seed_count = _count_from_metric_rows(innings_map, "spin_rate")
                extension_seed_count = _count_from_metric_rows(innings_map, "extension")

                active_pitch_type_rows = {}
                if current_pitch_type_key and current_pitch_type_key != "unknown":
                    active_pitch_type_rows = pitch_type_innings_map.get(current_pitch_type_key, {}) or {}

                pitch_type_velocity_seed = _mean_from_metric_rows(active_pitch_type_rows, "velocity")
                pitch_type_spin_seed = _mean_from_metric_rows(active_pitch_type_rows, "spin_rate")
                pitch_type_extension_seed = _mean_from_metric_rows(active_pitch_type_rows, "extension")
                pitch_type_pfx_x_seed = _mean_from_metric_rows(active_pitch_type_rows, "pfx_x")
                pitch_type_pfx_z_seed = _mean_from_metric_rows(active_pitch_type_rows, "pfx_z")
                pitch_type_velocity_seed_count = _count_from_metric_rows(active_pitch_type_rows, "velocity")
                pitch_type_spin_seed_count = _count_from_metric_rows(active_pitch_type_rows, "spin_rate")
                pitch_type_extension_seed_count = _count_from_metric_rows(active_pitch_type_rows, "extension")

                stored_baseline = load_pitcher_cross_game_baseline(
                    pitcher_id=pitcher_id_seed,
                    pitcher_name=pitcher_name_seed,
                    pitch_type_key=current_pitch_type_key,
                )

                prepared = merge_live_game_row(
                    prepared,
                    {
                        "seed_baseline_velocity": velocity_seed,
                        "seed_baseline_spin_rate": spin_seed,
                        "seed_baseline_extension": extension_seed,
                        "seed_baseline_pfx_x": pfx_x_seed,
                        "seed_baseline_pfx_z": pfx_z_seed,
                        "seed_baseline_velocity_sample_size": velocity_seed_count,
                        "seed_baseline_spin_sample_size": spin_seed_count,
                        "seed_baseline_extension_sample_size": extension_seed_count,
                        "seed_pitch_type_key": current_pitch_type_key,
                        "seed_pitch_type_baseline_velocity": pitch_type_velocity_seed,
                        "seed_pitch_type_baseline_spin_rate": pitch_type_spin_seed,
                        "seed_pitch_type_baseline_extension": pitch_type_extension_seed,
                        "seed_pitch_type_baseline_pfx_x": pitch_type_pfx_x_seed,
                        "seed_pitch_type_baseline_pfx_z": pitch_type_pfx_z_seed,
                        "seed_pitch_type_baseline_velocity_sample_size": pitch_type_velocity_seed_count,
                        "seed_pitch_type_baseline_spin_sample_size": pitch_type_spin_seed_count,
                        "seed_pitch_type_baseline_extension_sample_size": pitch_type_extension_seed_count,
                        **stored_baseline,
                    },
                )

            # Reaching this point means the stage succeeded: clear any stale error.
            if "pitcher_store_error" in st.session_state:
                del st.session_state["pitcher_store_error"]
        except Exception as e:
            st.session_state["pitcher_store_error"] = str(e)

        # Persist batter zone events from live playEvents
        try:
            batter_name_store = str(prepared.get("batter_name", "") or "").strip()
            game_pk_store = normalize_game_pk(prepared.get("game_pk"))
            game_date_store = str(prepared.get("game_date", "") or prepared.get("official_date", "") or "").strip()

            if batter_name_store and game_pk_store:
                if "batter_zone_event_keys" not in st.session_state:
                    st.session_state["batter_zone_event_keys"] = set()
                seen_keys = st.session_state["batter_zone_event_keys"]
                new_rows: list[dict[str, Any]] = []

                for event in play_events:
                    pitch_data = event.get("pitchData") or {}
                    if not pitch_data:
                        continue
                    details = event.get("details", {}) or {}
                    result = event.get("result", {}) or {}
                    pitch_name = str(((details.get("type", {}) or {}).get("description", "")) or "").strip()
                    pitch_family = normalize_pitch_family(pitch_name)

                    coords = pitch_data.get("coordinates", {}) or {}
                    plate_x = coords.get("pX")
                    plate_z = coords.get("pZ")
                    if plate_x is None:
                        plate_x = pitch_data.get("plate_x")
                    if plate_z is None:
                        plate_z = pitch_data.get("plate_z")

                    zone_bucket = classify_zone_bucket(plate_x, plate_z)
                    if zone_bucket == "unknown":
                        continue

                    # Events without an identifier cannot be stored; skip them
                    # before they are registered in the session's seen-key set.
                    raw_play_id = str(event.get("playId", "") or event.get("index", "") or "")
                    play_id_value = raw_play_id.strip()
                    if not play_id_value:
                        continue

                    seen_key = (
                        str(game_pk_store),
                        batter_name_store,
                        raw_play_id,
                        pitch_family,
                        zone_bucket,
                    )
                    if seen_key in seen_keys:
                        continue
                    seen_keys.add(seen_key)

                    description = str(details.get("description", "") or "").strip().lower()
                    event_type = str(result.get("event", "") or "").strip().lower()
                    event_desc = str(result.get("eventType", "") or "").strip().lower()

                    hit_flag = int(
                        event_type in {"single", "double", "triple", "home_run"}
                        or event_desc in {"single", "double", "triple", "home_run"}
                    )
                    hr_flag = int(event_type == "home_run" or event_desc == "home_run")
                    tb2p_flag = int(
                        event_type in {"double", "triple", "home_run"}
                        or event_desc in {"double", "triple", "home_run"}
                    )
                    whiff_flag = int(
                        description in {"swinging strike", "swinging strike blocked"}
                        or event_desc in {"swinging_strike", "swinging_strike_blocked"}
                    )

                    # Damage: launch speed of at least 95, or any home run.
                    launch_speed = result.get("launchSpeed")
                    damage_flag = 0
                    try:
                        if launch_speed is not None and float(launch_speed) >= 95:
                            damage_flag = 1
                    except (TypeError, ValueError):
                        # Non-numeric launch speed: leave the flag unset.
                        pass
                    if hr_flag:
                        damage_flag = 1

                    event_key = ":".join(
                        [
                            str(game_pk_store),
                            play_id_value,
                            batter_name_store,
                            pitch_family,
                            zone_bucket,
                        ]
                    )

                    pfx_x_value = _extract_pitch_movement_x_value(pitch_data)
                    pfx_z_value = _extract_pitch_movement_z_value(pitch_data)
                    ax_value = coords.get("aX")
                    ay_value = coords.get("aY")
                    az_value = coords.get("aZ")
                    if ax_value is None:
                        ax_value = pitch_data.get("ax")
                    if ay_value is None:
                        ay_value = pitch_data.get("ay")
                    if az_value is None:
                        az_value = pitch_data.get("az")

                    new_rows.append(
                        {
                            "event_key": event_key,
                            "batter_name": batter_name_store,
                            "game_pk": game_pk_store,
                            "game_date": game_date_store,
                            "pitch_family": pitch_family,
                            "zone_bucket": zone_bucket,
                            "plate_x": plate_x,
                            "plate_z": plate_z,
                            "pfx_x": pfx_x_value,
                            "pfx_z": pfx_z_value,
                            "ax": ax_value,
                            "ay": ay_value,
                            "az": az_value,
                            "hit_flag": hit_flag,
                            "hr_flag": hr_flag,
                            "tb2p_flag": tb2p_flag,
                            "whiff_flag": whiff_flag,
                            "damage_flag": damage_flag,
                        }
                    )

                if new_rows:
                    insert_batter_zone_events(new_rows)

            if "batter_zone_store_error" in st.session_state:
                del st.session_state["batter_zone_store_error"]
        except Exception as e:
            st.session_state["batter_zone_store_error"] = str(e)

        # Live-populate live_pitch_mix_2026 and live_batter_game_log_2026 from allPlays
        try:
            if "live_pitch_mix_seen_keys" not in st.session_state:
                st.session_state["live_pitch_mix_seen_keys"] = {}
            # One shared set per game holds both pitch keys and PA keys.
            seen_pitch_keys: set = st.session_state["live_pitch_mix_seen_keys"].setdefault(
                str(game_pk), set()
            )
            home_team = (feed.get("gameData", {}) or {}).get("teams", {}).get("home", {}).get("abbreviation") or prepared.get("home_team")
            away_team = (feed.get("gameData", {}) or {}).get("teams", {}).get("away", {}).get("abbreviation") or prepared.get("away_team")
            game_date_live = str(prepared.get("game_date") or prepared.get("official_date") or "")[:10]
            # The current play is appended as well; the seen-key set prevents
            # any pitch or PA from being emitted twice.
            all_plays = (plays.get("allPlays") or []) + ([current_play] if current_play else [])
            new_pitch_rows: list[dict] = []
            new_pa_rows: list[dict] = []

            for play in all_plays:
                matchup = play.get("matchup", {}) or {}
                about = play.get("about", {}) or {}
                result = play.get("result", {}) or {}
                pitcher_id = (matchup.get("pitcher", {}) or {}).get("id")
                batter_id = (matchup.get("batter", {}) or {}).get("id")
                pitcher_name = _extract_person_name(matchup.get("pitcher", {}))
                stand = ((matchup.get("batSide", {}) or {}).get("code")) or None
                p_throws = ((matchup.get("pitchHand", {}) or {}).get("code")) or None
                ab_num = (about.get("atBatIndex") or 0) + 1  # convert 0-based to 1-based
                inning = about.get("inning")
                inning_topbot = str(about.get("halfInning") or "").capitalize() or None
                play_events_all = play.get("playEvents", []) or []
                pa_complete = bool(result.get("eventType"))

                for event in play_events_all:
                    if not event.get("isPitch"):
                        continue
                    pitch_num = event.get("pitchNumber")
                    if pitch_num is None:
                        continue
                    ek = f"{game_pk}_{ab_num}_{pitch_num}"
                    if ek in seen_pitch_keys:
                        continue
                    pitch_data = event.get("pitchData", {}) or {}
                    pitch_breaks = pitch_data.get("breaks", {}) or {}
                    coords = pitch_data.get("coordinates", {}) or {}
                    details = event.get("details", {}) or {}
                    count = event.get("count", {}) or {}
                    plate_x = coords.get("pX")
                    plate_z = coords.get("pZ")
                    new_pitch_rows.append({
                        "event_key": ek,
                        "pa_key": f"{game_pk}_{ab_num}",
                        "game_pk": int(game_pk),
                        "game_date": game_date_live,
                        "source_season": int(game_date_live[:4]) if game_date_live else None,
                        "batter": int(batter_id) if batter_id else None,
                        "pitcher": int(pitcher_id) if pitcher_id else None,
                        "player_name": pitcher_name or None,
                        "stand": stand,
                        "p_throws": p_throws,
                        "home_team": home_team,
                        "away_team": away_team,
                        "inning": inning,
                        "inning_topbot": inning_topbot,
                        "at_bat_number": ab_num,
                        "pitch_number": pitch_num,
                        "pitch_type": ((details.get("type", {}) or {}).get("code")) or None,
                        "pitch_name": ((details.get("type", {}) or {}).get("description")) or None,
                        "release_speed": _safe_float(pitch_data.get("startSpeed")),
                        "effective_speed": _safe_float(pitch_data.get("endSpeed")),
                        "release_spin_rate": _safe_float(pitch_breaks.get("spinRate")),
                        "spin_axis": _safe_float(pitch_breaks.get("spinDirection")),
                        "pfx_x": _safe_float(pitch_breaks.get("pfxX") or coords.get("pfxX")),
                        "pfx_z": _safe_float(pitch_breaks.get("pfxZ") or coords.get("pfxZ")),
                        "release_pos_x": _safe_float(coords.get("x0")),
                        "release_pos_y": _safe_float(coords.get("y0")),
                        "release_pos_z": _safe_float(coords.get("z0")),
                        "release_extension": _safe_float(pitch_data.get("extension")),
                        "plate_x": _safe_float(plate_x),
                        "plate_z": _safe_float(plate_z),
                        "zone": None,  # TODO: integer zone classifier not yet implemented; backfilled by post-game Savant ingest
                        "balls": count.get("balls"),
                        "strikes": count.get("strikes"),
                        "outs_when_up": count.get("outs"),
                        "bat_score": None,
                        "fld_score": None,
                        "type": (details.get("code")) or None,
                        "description": (details.get("description")) or None,
                        # Only the last event of a completed PA carries the PA outcome.
                        "events": (result.get("event")) if pa_complete and event == play_events_all[-1] else None,
                    })
                    seen_pitch_keys.add(ek)

                # PA-level row when the play is complete
                if pa_complete:
                    pa_key = f"{game_pk}_{ab_num}"
                    if pa_key not in seen_pitch_keys:
                        events_val = result.get("event") or None
                        events_str = str(events_val or "").lower().replace(" ", "_")
                        hit_flag = int(events_str in {"single", "double", "triple", "home_run"})
                        hr_flag = int(events_str == "home_run")
                        tb2p_flag = int(events_str in {"double", "triple", "home_run"})
                        pitch_events_only = [e for e in play_events_all if e.get("isPitch")]
                        terminal = pitch_events_only[-1] if pitch_events_only else {}
                        tc = terminal.get("count", {}) or {}
                        new_pa_rows.append({
                            "pa_key": pa_key,
                            "game_pk": int(game_pk),
                            "game_date": game_date_live,
                            "source_season": int(game_date_live[:4]) if game_date_live else None,
                            "batter": int(batter_id) if batter_id else None,
                            "player_name": _extract_person_name(matchup.get("batter", {})),
                            "stand": stand,
                            "p_throws": p_throws,
                            "home_team": home_team,
                            "away_team": away_team,
                            "inning": inning,
                            "inning_topbot": inning_topbot,
                            "at_bat_number": ab_num,
                            "pitches_seen": len(pitch_events_only),
                            "balls_final": tc.get("balls"),
                            "strikes_final": tc.get("strikes"),
                            "outs_when_up": tc.get("outs"),
                            "events": events_val,
                            "description": result.get("description"),
                            "hit_flag": hit_flag,
                            "hr_flag": hr_flag,
                            "tb2p_flag": tb2p_flag,
                        })
                        seen_pitch_keys.add(pa_key)

            if new_pitch_rows or new_pa_rows:
                _fire_live_pitch_upsert(new_pitch_rows, new_pa_rows)
        except Exception as e:
            logger.warning("[live_pitch_mix_ingest] failure game_pk=%s: %s", game_pk, e)
    except Exception as e:
        # NOTE(review): this handler covers the whole function, not only the
        # batter-zone store; the log label is kept unchanged for continuity.
        logger.warning(f"[batter_zone_store_init] failure: {e}", exc_info=True)
    return prepared
def build_prepared_live_games_df(live_games: pd.DataFrame) -> pd.DataFrame:
    """Run ``prepare_live_game_for_ui`` over every row of ``live_games``.

    Returns an empty DataFrame when ``live_games`` is ``None`` or empty;
    otherwise a new DataFrame built from the prepared row dicts.
    """
    if live_games is None or live_games.empty:
        return pd.DataFrame()
    prepared_rows = [
        prepare_live_game_for_ui(record.to_dict())
        for _, record in live_games.iterrows()
    ]
    return pd.DataFrame(prepared_rows)
def render_live_games_with_edge_strips(
    live_games: pd.DataFrame,
    statcast_df: pd.DataFrame,
    pitcher_statcast_df: pd.DataFrame | None = None,
    odds_df: pd.DataFrame | None = None,
) -> None:
    """Render the LIVE GAMES section and log hitter recommendations per game.

    Games are laid out in two columns. For each game the card is rendered,
    HR prop odds are loaded, recommendations are built, and recommendation
    log / outcome rows are inserted through the module-level ``conn``.
    Logging failures are reported via ``logger`` and never interrupt
    rendering.

    Args:
        live_games: Live game rows; nothing is rendered when ``None`` or empty.
        statcast_df: Passed through to ``build_upcoming_hitter_recommendations``.
        pitcher_statcast_df: Passed through to the recommendation builder.
        odds_df: Passed through to the recommendation builder.
    """
    # Guard None as well as empty, matching the sibling DataFrame helpers.
    if live_games is None or live_games.empty:
        return
    st.markdown('<div class="section-title">LIVE GAMES</div>', unsafe_allow_html=True)

    # The cached builder is keyed on the JSON form of the frame.
    live_games_json = live_games.to_json(orient="records")
    prepared_live_games = build_prepared_live_games_df_cached(live_games_json)
    games = prepared_live_games.to_dict("records")

    cols = st.columns(2)
    for i, game in enumerate(games):
        with cols[i % 2]:
            render_game_card(game)
            prop_odds_df = load_hr_prop_odds_for_game(
                away_team=str(game.get("away_team", "")),
                home_team=str(game.get("home_team", "")),
            )
            recommendations = build_upcoming_hitter_recommendations(
                game_row=game,
                statcast_df=statcast_df,
                pitcher_statcast_df=pitcher_statcast_df,
                odds_df=odds_df,
                prop_odds_df=prop_odds_df,
                weather_row=None,
            )
            try:
                # One timestamp is shared by the log rows and the outcome rows.
                timestamp = utc_now_iso()
                log_df = build_recommendation_log_rows(
                    recommendations=recommendations,
                    game_row=game,
                    created_at=timestamp,
                )
                insert_recommendation_logs(conn, log_df)
                outcome_df = build_recommendation_outcome_rows(
                    game_row=game,
                    graded_at=timestamp,
                )
                insert_recommendation_outcomes(conn, outcome_df)
            except Exception as e:
                logger.warning(f"[recommendation_outcome_insert] failure: {e}", exc_info=True)
def normalize_game_pk(value: object) -> str:
    """Normalise a game identifier to its integer text form.

    ``None`` and blank / "nan" / "none" text yield ``""``. Otherwise the
    value is parsed as a float and truncated to an integer string (so
    ``"745123.0"`` becomes ``"745123"``). When that parse fails, the
    stripped text is returned only if it consists solely of digits.
    """
    if value is None:
        return ""
    try:
        cleaned = str(value).strip()
        if cleaned.lower() in {"", "nan", "none"}:
            return ""
        return str(int(float(cleaned)))
    except Exception:
        fallback = str(value).strip()
        if fallback.isdigit():
            return fallback
        return ""
def build_live_pitch_metrics_debug_df(live_games: pd.DataFrame) -> pd.DataFrame:
    """Build a one-row-per-game debug table of raw pitch metrics from live feeds.

    For each game the current play's events are scanned newest-first. The
    first event carrying any usable metric (speed, extension, spin rate,
    pfx or release coordinates) is reported; if none qualifies, the most
    recent event that has any ``pitchData`` is reported instead. Games with
    no usable game_pk, no feed, or no pitch event get a row of ``None``
    metrics with ``found_pitch_event`` set to ``False``.

    Args:
        live_games: Live game rows; ``None`` or empty yields an empty frame.

    Returns:
        DataFrame of debug rows. Rows with a pitch event additionally carry
        truncated ``raw_pitch_data`` / ``raw_pitch_breaks`` text.
    """
    if live_games is None or live_games.empty:
        return pd.DataFrame()

    def _empty_row(game: dict, game_pk: str) -> dict:
        # Placeholder row for a game without a reportable pitch event.
        return {
            "away_team": game.get("away_team"),
            "home_team": game.get("home_team"),
            "game_pk": game_pk,
            "status": game.get("status"),
            "pitch_velocity": None,
            "pitch_extension": None,
            "pitch_spin_rate": None,
            "pitch_type": None,
            "last_pitch": None,
            "pfx_x": None,
            "pfx_z": None,
            "ax": None,
            "ay": None,
            "az": None,
            "release_x": None,
            "release_y": None,
            "release_z": None,
            "found_pitch_event": False,
        }

    rows: list[dict] = []
    for _, row in live_games.iterrows():
        game = row.to_dict()
        game_pk = normalize_game_pk(game.get("game_pk"))
        if not game_pk:
            rows.append(_empty_row(game, ""))
            continue

        try:
            feed = load_live_game_feed_cached(game_pk)
        except Exception:
            # Debug view is best-effort: a failed feed load becomes an empty row.
            feed = {}

        live_data = (feed.get("liveData", {}) or {}) if isinstance(feed, dict) else {}
        plays = live_data.get("plays", {}) or {}
        current_play = plays.get("currentPlay", {}) or {}
        play_events = current_play.get("playEvents", []) or []

        pitch_event = None
        best_pitch_event = None
        for event in reversed(play_events):
            pitch_data = event.get("pitchData") or {}
            if not pitch_data:
                continue
            # Keep only the FIRST hit of the reversed scan, i.e. the most
            # recent pitch event. Overwriting on every iteration left the
            # oldest event as the fallback.
            if best_pitch_event is None:
                best_pitch_event = event

            # Prefer an event that actually has usable numeric pitch metrics
            start_speed = pitch_data.get("startSpeed")
            extension = pitch_data.get("extension")
            spin_rate = (pitch_data.get("breaks", {}) or {}).get("spinRate")
            coords = pitch_data.get("coordinates", {}) or {}
            has_useful_metric = any(
                value is not None
                for value in [
                    start_speed,
                    extension,
                    spin_rate,
                    coords.get("pfxX"),
                    coords.get("pfxZ"),
                    coords.get("x0"),
                    coords.get("y0"),
                    coords.get("z0"),
                ]
            )
            if has_useful_metric:
                pitch_event = event
                break

        if pitch_event is None:
            pitch_event = best_pitch_event

        if not pitch_event:
            rows.append(_empty_row(game, game_pk))
            continue

        pitch_data = pitch_event.get("pitchData", {}) or {}
        pitch_breaks = pitch_data.get("breaks", {}) or {}
        coords = pitch_data.get("coordinates", {}) or {}
        details = pitch_event.get("details", {}) or {}
        rows.append(
            {
                "away_team": game.get("away_team"),
                "home_team": game.get("home_team"),
                "game_pk": game_pk,
                "status": game.get("status"),
                "pitch_velocity": pitch_data.get("startSpeed"),
                "pitch_extension": (
                    pitch_data.get("release_extension")
                    or pitch_data.get("extension")
                ),
                "pitch_spin_rate": (
                    pitch_data.get("release_spin_rate")
                    or pitch_breaks.get("spinRate")
                    or pitch_data.get("spinRate")
                ),
                "pitch_type": (details.get("type", {}) or {}).get("description"),
                "last_pitch": details.get("description"),
                "pfx_x": coords.get("pfxX"),
                "pfx_z": coords.get("pfxZ"),
                "ax": coords.get("aX"),
                "ay": coords.get("aY"),
                "az": coords.get("aZ"),
                "release_x": coords.get("x0"),
                "release_y": coords.get("y0"),
                "release_z": coords.get("z0"),
                "found_pitch_event": True,
                # Truncated raw payloads keep the debug table readable.
                "raw_pitch_data": str(pitch_data)[:1500],
                "raw_pitch_breaks": str(pitch_breaks)[:1000],
            }
        )
    return pd.DataFrame(rows)
def build_scores_from_schedule_via_live_feeds(schedule_df: pd.DataFrame) -> pd.DataFrame:
    """
    Derive live/final score rows from schedule rows via each game's live feed.

    This is the fallback used when the scores parser/feed is empty or
    unreliable. Rows without a usable game_pk, and rows whose feed cannot
    be loaded or enriched, are passed through as they are. An empty
    DataFrame is returned when the schedule is None, empty, or has no
    "game_pk" column.
    """
    if schedule_df is None or schedule_df.empty or "game_pk" not in schedule_df.columns:
        return pd.DataFrame()

    collected: list[dict] = []
    for _, record in schedule_df.iterrows():
        entry = record.to_dict()
        pk = normalize_game_pk(entry.get("game_pk"))
        if pk:
            try:
                feed = load_live_game_feed_cached(pk)
                if isinstance(feed, dict) and feed:
                    # Store the normalised id before handing the row to enrichment.
                    entry["game_pk"] = pk
                    entry = enrich_game_from_live_feed(entry, feed)
            except Exception as e:
                logger.warning(f"[feed_cache_load] failure: {e}", exc_info=True)
        collected.append(entry)
    return pd.DataFrame(collected)
def enrich_live_games_from_feeds(scores_df: pd.DataFrame) -> pd.DataFrame:
    """Enrich live and final score rows with data from their live feeds.

    A row is a live candidate when its lower-cased status contains one of
    "live", "top", "bot", "bottom", "mid" or "inning", and a final
    candidate when it contains "final", "game over", "completed" or
    "ended". Live rows are enriched only until ``MAX_LIVE_FEEDS`` feeds
    have been used; final rows are always enriched when the game_pk is
    numeric. Final rows keep their original status text after enrichment.

    Args:
        scores_df: Score rows. ``None`` yields an empty DataFrame; an empty
            frame is returned as-is.

    Returns:
        A new DataFrame of (possibly enriched) rows in the original order.
    """
    # Guard None as well as empty, matching the sibling DataFrame helpers.
    if scores_df is None:
        return pd.DataFrame()
    if scores_df.empty:
        return scores_df

    live_tokens = ("live", "top", "bot", "bottom", "mid", "inning")
    final_tokens = ("final", "game over", "completed", "ended")

    rows = []
    live_feed_calls = 0
    for _, row in scores_df.iterrows():
        game = row.to_dict()
        original_status = str(game.get("status", "")).strip()
        status = original_status.lower()
        game_pk = normalize_game_pk(game.get("game_pk"))

        is_live_candidate = any(token in status for token in live_tokens)
        is_final_candidate = any(token in status for token in final_tokens)

        # For finals, enrich aggressively if we have a usable game_pk.
        # For live games, still respect the live feed cap.
        should_enrich_live = (
            is_live_candidate
            and game_pk.isdigit()
            and live_feed_calls < MAX_LIVE_FEEDS
        )
        should_enrich_final = (
            is_final_candidate
            and game_pk.isdigit()
        )

        if should_enrich_live or should_enrich_final:
            try:
                feed = load_live_game_feed_cached(game_pk)
                if isinstance(feed, dict) and feed:
                    game = enrich_game_from_live_feed(game, feed)
                    if should_enrich_live:
                        live_feed_calls += 1
                    # Preserve original completed-game status text
                    if is_final_candidate:
                        game["status"] = original_status if original_status else "Final"
            except Exception as e:
                logger.warning(f"[game_status_preserve] failure: {e}", exc_info=True)
        rows.append(game)
    return pd.DataFrame(rows)
def _extract_status_order(status: str) -> tuple[int, int]:
s = str(status or "").strip().lower()
if any(token in s for token in ["top", "bot", "bottom", "mid", "live"]):
inning_num = 0
for part in reversed(s.split()):
try:
inning_num = int(part)
break
except Exception:
continue
return (0, -inning_num)
if "final" in s:
return (1, 0)
return (2, 0)
# Lower-case team nickname -> slug used in the ESPN CDN logo file name.
_ESPN_MLB_LOGO_SLUGS = {
    "angels": "ana",
    "astros": "hou",
    "athletics": "oak",
    "blue jays": "tor",
    "braves": "atl",
    "brewers": "mil",
    "cardinals": "stl",
    "cubs": "chc",
    "diamondbacks": "ari",
    "dodgers": "lad",
    "giants": "sf",
    "guardians": "cle",
    "mariners": "sea",
    "marlins": "mia",
    "mets": "nym",
    "nationals": "wsh",
    "orioles": "bal",
    "padres": "sd",
    "phillies": "phi",
    "pirates": "pit",
    "rangers": "tex",
    "rays": "tb",
    "red sox": "bos",
    "reds": "cin",
    "rockies": "col",
    "royals": "kc",
    "tigers": "det",
    "twins": "min",
    "white sox": "cws",
    "yankees": "nyy",
}

# Lower-case team nickname -> full logo URL, in the same order as the slugs.
MLB_TEAM_LOGOS = {
    nickname: f"https://a.espncdn.com/i/teamlogos/mlb/500/{slug}.png"
    for nickname, slug in _ESPN_MLB_LOGO_SLUGS.items()
}
def get_team_logo_url(team_name: str, competition_bucket: str = "") -> str | None:
if str(competition_bucket or "").upper() != "MLB":
return None
return MLB_TEAM_LOGOS.get(str(team_name or "").strip().lower())
def sort_scoreboard_games(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* ordered for scoreboard display.

    Rows are sorted by the status key from ``_extract_status_order``, then by
    ``start_time_et``, ``away_team`` and ``home_team``. Any of those columns
    missing from the input is added as an empty-string column. ``None`` and
    empty frames are returned unchanged.
    """
    if df is None or df.empty:
        return df

    ordered = df.copy()
    for required in ("status", "start_time_et", "away_team", "home_team"):
        if required not in ordered.columns:
            ordered[required] = ""

    order_keys = [
        _extract_status_order(text)
        for text in ordered["status"].fillna("").astype(str)
    ]
    # Temporary sort columns; dropped again before returning.
    ordered["status_group"] = [group for group, _ in order_keys]
    ordered["status_rank"] = [rank for _, rank in order_keys]

    sort_columns = ["status_group", "status_rank", "start_time_et", "away_team", "home_team"]
    ordered = ordered.sort_values(
        by=sort_columns,
        ascending=[True] * len(sort_columns),
    )
    return ordered.drop(columns=["status_group", "status_rank"], errors="ignore")
def render_live_prop_odds_debug_panel(live_games: pd.DataFrame) -> None:
    """Show a debug expander with live prop odds for the first live game.

    Does nothing when *live_games* is empty. Otherwise it writes the game's
    teams and status, loads odds via ``load_live_prop_odds_for_game``, and
    displays whichever of the preferred columns the odds frame contains.
    """
    if live_games.empty:
        return

    preferred_columns = ("sportsbook", "market", "player_name", "odds_american", "line")
    with st.expander("Debug: live prop odds for first live game"):
        game = live_games.iloc[0].to_dict()
        st.write("Game context")
        st.write({key: game.get(key) for key in ("away_team", "home_team", "status")})

        props_df = load_live_prop_odds_for_game(game)
        if props_df.empty:
            st.info("No live prop odds returned for this game.")
            return

        visible = [name for name in preferred_columns if name in props_df.columns]
        st.dataframe(props_df[visible], use_container_width=True, hide_index=True)
def grade_final_game_outcomes_from_scores(scores_df: pd.DataFrame) -> None:
    """Build game-outcome rows from *scores_df* and insert them via ``conn``.

    Best-effort: any failure is logged and swallowed so the caller keeps
    running.
    """
    try:
        outcome_df = build_game_outcome_rows_from_scores(
            scores_df=scores_df,
            graded_at=utc_now_iso(),
        )
        insert_game_outcomes(conn, outcome_df)
    except Exception as e:
        # Still non-fatal, but no longer silent: a failed grading pass is
        # now visible in the logs.
        logger.warning(f"[grade_final_game_outcomes] failure: {e}", exc_info=True)
def grade_batter_prop_outcomes_from_audit() -> None:
    """Rebuild batter prop outcome rows from the recommendation audit view.

    Reads the audit view through ``conn``, builds the outcome rows, and
    replaces the stored batter prop outcomes with them. Best-effort: any
    failure is logged and swallowed.
    """
    try:
        audit_df = read_recommendation_audit_view(conn)
        outcome_df = build_batter_prop_outcome_rows_from_audit(
            audit_df=audit_df,
            graded_at=utc_now_iso(),
        )
        replace_batter_prop_outcomes(conn, outcome_df)
    except Exception as e:
        # Still non-fatal, but no longer silent.
        logger.warning(f"[grade_batter_prop_outcomes] failure: {e}", exc_info=True)
def fill_batter_prop_realized_outcomes(statcast_df: pd.DataFrame) -> None:
    """Fill realized results into stored batter prop outcomes from Statcast rows.

    Returns early when no batter prop outcomes are stored. The stored outcomes
    are replaced only when the realization step produces rows. Best-effort:
    any failure is logged and swallowed.
    """
    try:
        batter_prop_outcomes_df = read_batter_prop_outcomes(conn)
        if batter_prop_outcomes_df.empty:
            return
        graded_df = build_batter_realization_rows(
            batter_prop_outcomes_df=batter_prop_outcomes_df,
            statcast_df=statcast_df,
            graded_at=utc_now_iso(),
        )
        if not graded_df.empty:
            replace_batter_prop_outcomes(conn, graded_df)
    except Exception as e:
        # Still non-fatal, but no longer silent.
        logger.warning(f"[fill_batter_prop_realized_outcomes] failure: {e}", exc_info=True)
# Team-name spellings that must collapse to one canonical key. Names not
# listed here are used as-is (stripped and lower-cased).
_TEAM_NAME_ALIASES = {
    "usa": "united states",
    "u.s.a.": "united states",
    "taiwan": "chinese taipei",
    "czech republic": "czechia",
    "south korea": "korea",
}

# String forms of a missing game_pk after ``astype(str)``.
_MISSING_GAME_PK_TOKENS = {"": pd.NA, "nan": pd.NA, "None": pd.NA, "<NA>": pd.NA}


def _canonical_team_key(name: object) -> str:
    """Return the lower-cased, alias-resolved merge key for a team name."""
    if not name:
        return ""
    key = str(name).strip().lower()
    return _TEAM_NAME_ALIASES.get(key, key)


def attach_game_pk_from_schedule(
    scores_like_df: pd.DataFrame,
    schedule_df: pd.DataFrame,
) -> pd.DataFrame:
    """Fill ``game_pk`` on score rows by matching teams against the schedule.

    Rows are matched on canonicalised (away, home) team names. An existing
    non-blank ``game_pk`` on a score row wins; blank ones are filled from the
    schedule. When several schedule rows share the same team pair, the first
    one is used.

    Returns an empty frame when *scores_like_df* is ``None``, the input itself
    when it is empty, and an unmodified copy when either frame lacks the
    columns needed for matching. On the matched path the helper columns
    ``away_key`` / ``home_key`` remain in the result, as before.
    """
    if scores_like_df is None:
        return pd.DataFrame()
    if scores_like_df.empty:
        return scores_like_df

    team_columns = ("away_team", "home_team")
    schedule_usable = (
        schedule_df is not None
        and not schedule_df.empty
        and "game_pk" in schedule_df.columns
        # Without team columns there is nothing to match on; previously this
        # case raised KeyError further down.
        and all(col in schedule_df.columns for col in team_columns)
    )
    if not schedule_usable:
        return scores_like_df.copy()

    scores = scores_like_df.copy()
    if any(col not in scores.columns for col in team_columns):
        return scores

    schedule = schedule_df.copy()
    for frame in (scores, schedule):
        frame["away_key"] = frame["away_team"].apply(_canonical_team_key)
        frame["home_key"] = frame["home_team"].apply(_canonical_team_key)

    schedule_keys = (
        schedule[["away_key", "home_key", "game_pk"]]
        .dropna(subset=["away_key", "home_key"])
        .drop_duplicates(subset=["away_key", "home_key"])
        .copy()
    )
    scores = scores.merge(
        schedule_keys,
        on=["away_key", "home_key"],
        how="left",
        suffixes=("", "_sched"),
    )

    # "game_pk_sched" only exists when the scores frame already had its own
    # game_pk column; otherwise the merge adds "game_pk" directly.
    if "game_pk_sched" in scores.columns:
        if "game_pk" in scores.columns:
            scores["game_pk"] = (
                scores["game_pk"]
                .astype(str)
                .replace(_MISSING_GAME_PK_TOKENS)
                .combine_first(scores["game_pk_sched"])
            )
        else:
            scores["game_pk"] = scores["game_pk_sched"]
        scores = scores.drop(columns=["game_pk_sched"])
    return scores
def inject_live_auto_refresh(interval_ms: int = 5000) -> None:
    """Embed a zero-height script that reloads the parent page after *interval_ms*.

    The script sets a flag on ``window.parent`` so that only one reload timer
    is scheduled at a time, and resets the flag just before reloading.
    """
    reload_script = f"""
<script>
const intervalMs = {interval_ms};
if (!window.parent.__liveAutoRefreshScheduled) {{
window.parent.__liveAutoRefreshScheduled = true;
setTimeout(() => {{
window.parent.__liveAutoRefreshScheduled = false;
window.parent.location.reload();
}}, intervalMs);
}}
</script>
"""
    components.html(reload_script, height=0)
def render_dashboard() -> None:
    """Render the Live Dashboard page.

    Loads the stable scores for the current dashboard date, builds the
    dashboard payload, and renders the sections chosen by the "Game Status"
    filter: live games, finals, and upcoming games. Every filter option now
    has a matching section.
    """
    st.subheader("Live Dashboard")
    st.caption("Professional scoreboard view")
    dashboard_date_str = current_dashboard_date_str()
    st.caption(f"Dashboard date: {dashboard_date_str} (America/New_York)")
    st.caption(
        f"Live dashboard refresh cadence: {LIVE_FEED_TTL_SECONDS}s cache. "
        f"Use browser refresh or swap tabs for immediate update."
    )

    scores_df = get_stable_scores_for_dashboard_date(dashboard_date_str)

    # If the scores frame records which date it was sourced from, use that
    # date for the schedule lookup.
    schedule_date_str = dashboard_date_str
    if not scores_df.empty and "scores_source_date" in scores_df.columns:
        try:
            source_date = str(scores_df["scores_source_date"].iloc[0]).strip()
            if source_date:
                schedule_date_str = source_date
        except Exception as e:
            logger.warning(f"[dashboard_source_date] failure: {e}", exc_info=True)

    dashboard_payload = _build_dashboard_ready_payload(
        dashboard_date_str=dashboard_date_str,
        schedule_date_str=schedule_date_str,
        scores_json=scores_df.to_json(orient="split", date_format="iso"),
    )
    live_games = dashboard_payload["live_games"]
    final_games = dashboard_payload["final_games"]
    scheduled_games = dashboard_payload["scheduled_games"]
    statcast_df = dashboard_payload["statcast_df"]
    pitcher_statcast_df = dashboard_payload["pitcher_statcast_df"]
    odds_df = dashboard_payload["odds_df"]

    filter_option = st.radio(
        "Game Status",
        ["All", "Live", "Final", "Scheduled"],
        horizontal=True,
        key="dashboard_filter",
    )

    live_games = sort_scoreboard_games(normalize_game_cards_df(live_games))
    final_games = sort_scoreboard_games(normalize_game_cards_df(final_games))
    scheduled_games = sort_scoreboard_games(normalize_game_cards_df(scheduled_games))

    auto_refresh_live = st.sidebar.checkbox(
        "Full Page Auto Refresh Toggle",
        value=False,
        key="auto_refresh_live_dashboard",
    )
    if auto_refresh_live and not live_games.empty:
        inject_live_auto_refresh(interval_ms=5000)

    # "Final" and "Scheduled" previously had no branch and rendered nothing.
    show_live = filter_option in ("All", "Live")
    show_final = filter_option in ("All", "Final")
    show_scheduled = filter_option in ("All", "Scheduled")

    if show_live and not live_games.empty:
        render_live_games_fragment(
            live_games=live_games,
            statcast_df=statcast_df,
            pitcher_statcast_df=pitcher_statcast_df,
            odds_df=odds_df,
        )
    if show_final:
        render_scoreboard_section("FINAL", final_games)
    if show_scheduled:
        render_scoreboard_section("UPCOMING", scheduled_games)

    if live_games.empty and final_games.empty and scheduled_games.empty:
        st.warning("No games available from either schedule or scores feed.")

    render_live_prop_odds_debug_panel(live_games)
    render_statcast_retry_fragment()
    if statcast_df.empty:
        st.info("No WBC Statcast rows returned for the selected window.")
def render_players() -> None:
    """Render the player analytics page with exit-velocity and launch-angle charts.

    Hitter event rows come from the ``shared_hitter_baseline_event_rows``
    table with the snapshot bookkeeping columns removed; when that is empty,
    ``load_statcast_recent`` is used instead.
    """
    st.subheader("WBC Player Analytics")

    snapshot_columns = ["snapshot_built_at", "snapshot_version", "source_status"]
    event_rows = read_table(conn, "shared_hitter_baseline_event_rows")
    statcast_df = event_rows.drop(columns=snapshot_columns, errors="ignore")

    if statcast_df.empty:
        statcast_df = load_statcast_recent()
        if statcast_df.empty:
            st.info("No recent WBC Statcast data available.")
            return

    st.caption(f"Loaded {len(statcast_df)} WBC Statcast rows")

    chart_builders = (create_exit_velocity_chart, create_launch_angle_chart)
    for column, build_chart in zip(st.columns(2), chart_builders):
        with column:
            st.plotly_chart(build_chart(statcast_df), use_container_width=True)
def compute_market_edges(odds_df: pd.DataFrame) -> pd.DataFrame:
    """Add implied and no-vig probabilities to each odds row.

    Rows are grouped by ``(event_id, sportsbook, market_key)``. Two-row groups
    are de-vigged with ``remove_vig_two_way``; other groups are normalised by
    the sum of their implied probabilities. Each output row also carries
    ``model_prob``, ``edge`` and ``kelly`` set to ``None``, plus a
    ``has_model`` flag that is true for the HR prop market keys. An empty
    input is returned unchanged.
    """
    if odds_df.empty:
        return odds_df

    hr_market_keys = ("player_props_hr", "batter_home_runs", "hr")

    priced = odds_df.copy()
    priced["implied_prob"] = priced["price"].apply(american_to_implied_prob)

    records: list[dict] = []
    for group_key, market_rows in priced.groupby(["event_id", "sportsbook", "market_key"]):
        market_key = group_key[2]
        book_market = market_rows.copy().reset_index(drop=True)

        if len(book_market) == 2:
            first_nv, second_nv = remove_vig_two_way(
                book_market.loc[0, "implied_prob"],
                book_market.loc[1, "implied_prob"],
            )
            book_market.loc[0, "no_vig_prob"] = first_nv
            book_market.loc[1, "no_vig_prob"] = second_nv
        else:
            implied_total = book_market["implied_prob"].sum()
            if implied_total:
                book_market["no_vig_prob"] = book_market["implied_prob"] / implied_total
            else:
                # A zero total cannot be normalised; keep the raw values.
                book_market["no_vig_prob"] = book_market["implied_prob"]

        has_model = str(market_key).lower() in hr_market_keys
        for _, priced_row in book_market.iterrows():
            record = priced_row.to_dict()
            record.update(model_prob=None, edge=None, kelly=None, has_model=has_model)
            records.append(record)

    return pd.DataFrame(records)
def render_betting() -> None:
    """Render the odds page from the upcoming HR props bundle.

    The bundle loader is also handed to the tab as ``load_fn``.
    """
    # Imported here rather than at module top, as in the original.
    from visualization.betting_page import render_betting_tab

    bundle_loader = load_upcoming_hr_props_bundle
    render_betting_tab(bundle_loader(), load_fn=bundle_loader)
def _sorted_player_names(meta_df: pd.DataFrame) -> list[str]:
    """Return the sorted, non-null ``player_name`` values of *meta_df* as strings."""
    if meta_df.empty:
        return []
    names = meta_df.get("player_name", pd.Series(dtype="object"))
    return sorted(names.dropna().astype(str).tolist())


def render_matchups() -> None:
    """Render the matchup analyzer: batter-vs-pitcher scores plus a simulation.

    Player lists come from the shared baseline meta tables, falling back to
    the cached baseline bundle. The page stops with an info message whenever a
    required input is empty, including the case where none of the selected
    batters has a summary row.
    """
    st.subheader("WBC Matchup Analyzer")

    hitter_meta = read_table(conn, "shared_hitter_baseline_meta")
    pitcher_meta = read_table(conn, "shared_pitcher_baseline_meta")
    available_players = _sorted_player_names(hitter_meta)
    available_pitchers = _sorted_player_names(pitcher_meta)

    if not available_players or not available_pitchers:
        # Meta tables are incomplete: fall back to the cached baseline bundle.
        baseline_bundle = load_shared_baseline_bundle_cached()
        hitter_meta = baseline_bundle.get("batter_baseline_meta", pd.DataFrame())
        pitcher_meta = baseline_bundle.get("pitcher_baseline_meta", pd.DataFrame())
        available_players = _sorted_player_names(hitter_meta)
        available_pitchers = _sorted_player_names(pitcher_meta)

    if not available_players or not available_pitchers:
        st.info("No WBC Statcast available yet.")
        return

    selected_pitcher_player = st.selectbox(
        "Pitcher reference sample",
        options=available_pitchers,
    )
    selected_venue = st.text_input("Venue", value="loanDepot park")
    selected_batters = st.multiselect("Batters", options=available_players, default=available_players[:8])

    baseline_bundle = load_shared_baseline_bundle_cached(
        batter_names=_coerce_name_tuple(selected_batters),
        pitcher_names=_coerce_name_tuple([selected_pitcher_player]),
    )
    statcast_df = baseline_bundle.get("blended_batter_df", pd.DataFrame())
    pitcher_statcast_df = baseline_bundle.get("blended_pitcher_df", pd.DataFrame())
    if statcast_df.empty:
        statcast_df = load_statcast_recent()
    if pitcher_statcast_df.empty:
        pitcher_statcast_df = load_statcast_recent()
    if statcast_df.empty or pitcher_statcast_df.empty:
        st.info("No WBC Statcast available yet.")
        return

    batter_df = batter_summary(statcast_df)
    if batter_df.empty:
        st.info("No batter summary available.")
        return

    pitcher_events = pitcher_statcast_df[pitcher_statcast_df["player_name"] == selected_pitcher_player].copy()
    pitcher_profile = pitcher_baseline_from_events(pitcher_events)

    weather_df = load_weather(selected_venue)
    temperature_f = None
    wind_speed_mph = None
    if not weather_df.empty:
        weather_row = weather_df.iloc[0]
        temperature_f = float(weather_row["temperature_f"]) if pd.notna(weather_row["temperature_f"]) else None
        wind_speed_mph = float(weather_row["wind_speed_mph"]) if pd.notna(weather_row["wind_speed_mph"]) else None

    rows = []
    filtered_batters = batter_df[batter_df["player_name"].isin(selected_batters)].copy()
    for _, batter_row in filtered_batters.iterrows():
        result = calculate_matchup_score(
            batter_row=batter_row,
            pitcher_profile=pitcher_profile,
            venue_name=selected_venue,
            temperature_f=temperature_f,
            wind_speed_mph=wind_speed_mph,
        )
        rows.append(
            {
                "player_name": batter_row["player_name"],
                **result,
            }
        )

    # With no rows, DataFrame([]) has no "matchup_score" column and
    # sort_values would raise KeyError (e.g. every batter deselected).
    if not rows:
        st.info("No matchup rows available for the selected batters.")
        return

    matchup_df = pd.DataFrame(rows).sort_values("matchup_score", ascending=False)
    st.dataframe(matchup_df, use_container_width=True, hide_index=True)

    col1, col2 = st.columns(2)
    with col1:
        st.plotly_chart(create_matchup_score_chart(matchup_df), use_container_width=True)
    with col2:
        st.plotly_chart(create_hit_hr_chart(matchup_df), use_container_width=True)

    selected_player = st.selectbox("Simulate batter", options=matchup_df["player_name"].tolist())
    sim_row = matchup_df[matchup_df["player_name"] == selected_player].iloc[0]
    sim_df = simulate_batter_outcomes(
        hit_prob=float(sim_row["hit_prob"]),
        hr_prob=float(sim_row["hr_prob"]),
        n_sims=10000,
    )

    c1, c2, c3 = st.columns(3)
    c1.metric("Sim Hit Rate", f"{sim_df['hit'].mean():.2%}")
    c2.metric("Sim HR Rate", f"{sim_df['hr'].mean():.2%}")
    c3.metric("Avg Total Bases", f"{sim_df['total_bases'].mean():.2f}")

    col3, col4 = st.columns(2)
    with col3:
        st.plotly_chart(create_total_bases_distribution(sim_df, selected_player), use_container_width=True)
    with col4:
        st.plotly_chart(create_hr_distribution(sim_df, selected_player), use_container_width=True)
def render_bet_tracker() -> None:
# Bet Tracker page: a form to log a bet, a table of all bets, an expander to
# grade one bet as win/loss, and summary metrics with a bankroll chart.
st.subheader("Bet Tracker")
# --- Log-a-bet form; its fields are cleared after a submit. ---
with st.form("bet_form", clear_on_submit=True):
c1, c2, c3 = st.columns(3)
sportsbook = c1.text_input("Sportsbook", value="DraftKings")
market = c2.text_input("Market", value="h2h")
selection = c3.text_input("Selection", value="Example Team")
c4, c5, c6 = st.columns(3)
# Odds input is limited to the -1000..1000 range.
odds = c4.number_input("Odds", min_value=-1000, max_value=1000, value=120, step=1)
stake = c5.number_input("Stake", min_value=0.0, value=10.0, step=1.0)
game_id = c6.text_input("Game ID", value="")
notes = st.text_input("Notes", value="")
submitted = st.form_submit_button("Log Bet")
# On submit, store the bet as "open" with zero profit under the next bet id.
if submitted:
bet_id = next_bet_id(conn)
insert_bet(
conn=conn,
bet_id=bet_id,
created_at=utc_now_iso(),
sportsbook=sportsbook,
market=market,
selection=selection,
odds=int(odds),
stake=float(stake),
result="open",
profit=0.0,
game_id=game_id,
notes=notes,
)
st.success(f"Logged bet #{bet_id}")
# --- Bets table; the rest of the page is skipped when no bets exist. ---
bets_df = read_table(conn, "bets")
if bets_df.empty:
st.info("No bets logged yet.")
return
st.dataframe(bets_df, use_container_width=True, hide_index=True)
# --- Grade one bet by id; only "win" and "loss" are offered. ---
with st.expander("Grade a bet"):
bet_id_to_grade = st.number_input("Bet ID", min_value=1, step=1, value=1)
result = st.selectbox("Result", options=["win", "loss"])
if st.button("Apply Grade"):
row = bets_df[bets_df["bet_id"] == bet_id_to_grade]
if row.empty:
st.error("Bet ID not found.")
else:
# Profit is computed from the stored stake and odds, not from the form inputs.
stake = float(row.iloc[0]["stake"])
odds = int(row.iloc[0]["odds"])
profit = grade_profit(stake, odds, result)
update_bet_result(conn, int(bet_id_to_grade), result, profit)
st.success(f"Updated bet #{bet_id_to_grade} to {result}")
# Re-read the bets table before computing the metrics below.
bets_df = read_table(conn, "bets")
# --- Summary metrics and bankroll curve. ---
metrics = summary_metrics(bets_df)
c1, c2, c3, c4 = st.columns(4)
c1.metric("Graded Bets", metrics["bets"])
c2.metric("Profit", f"${metrics['profit']:.2f}")
c3.metric("ROI", f"{metrics['roi']:.2%}")
c4.metric("Win Rate", f"{metrics['win_rate']:.2%}")
curve_df = bankroll_curve(bets_df)
st.plotly_chart(create_bankroll_chart(curve_df), use_container_width=True)
def render_alpha_release() -> None:
# Static "Alpha Release" explainer page: a disclaimer banner, an intro
# paragraph, and a series of collapsed expanders of markdown text.
# Everything shown is a hard-coded literal; nothing is read from config or
# data at render time.
st.subheader("Alpha Release")
# Disclaimer banner.
st.info(
"**Kasper is in alpha.** Model probabilities are statistical estimates, not guarantees. "
"Edge values reflect model output vs. market implied probability — they do not predict outcomes. "
"All outputs are for informational and research purposes only."
)
# Intro paragraph.
st.markdown(
"""
**Kasper** is a pre-game and live-game baseball analytics engine built for the 2026 MLB season.
It ingests Statcast data, live game feeds, and sportsbook odds to compute batter HR probabilities,
compare them against the market, and surface edges in real time.
This is an **alpha release** — the model stack is functional and actively processing live data,
but outputs are under ongoing validation. Calibration data is accumulating each game day.
"""
)
# Section: supported features and data sources.
with st.expander("System Overview", expanded=False):
st.markdown(
"""
**What Kasper currently supports:**
- Live game recommendations (Dashboard) — HR, Hit, Total Bases props for batters On Deck / In Hole / 3 Away
- Pre-game HR prop analysis (Props tab) — edge vs. retail books (DraftKings, FanDuel, BetMGM, Caesars)
- Execution layer (Alpha) — cross-book market comparison, edge quality filtering, final recommendation score
- Full debug visibility — adjustment ladders, signal attribution, execution layer diagnostics
**Data sources:**
- Statcast (Baseball Savant) — batter and pitcher features, 14-day rolling window
- MLB Schedule API — live game state, lineup, score
- Sportsbook odds API — HR prop odds from retail books
"""
)
# Section: signal flow from Statcast features to a recommendation.
with st.expander("How It Works", expanded=False):
st.markdown(
"""
**Signal flow:**
```
Statcast features
→ Batter baseline (EV90, barrel rate, hard-hit rate, xwOBA, launch angle)
→ Pitcher adjustment (velo, EV allowed, barrel rate allowed)
→ Context adjustments (park, weather, bullpen state)
→ Zone / arsenal / family-zone matchup overlays
→ Trend & rolling form (5/10-game windows)
→ Opportunity adjustment (expected PA given game state)
→ Fair probability → American odds
→ Compare vs. sportsbook implied probability
→ Edge = model prob − book implied prob
→ Execution layer (market disagreement, confidence, timing, final score)
→ Recommendation: BET / WATCH / PASS
```
"""
)
# Section: formulas and thresholds (raw string literal).
with st.expander("Core Math", expanded=False):
st.markdown(
r"""
**Baseline probability** (per batter, pre-game):
- EV90, barrel rate, hard-hit rate, xwOBA, launch angle → weighted sum → bounded probability
- Bounds: HR [0.5%, 22%], Hit [5%, 50%], TB2P [3%, 42%]
**Edge:**
```
edge = model_prob − implied_prob(book_odds)
```
Positive edge = model believes event is more likely than the market does.
**Adjusted edge** (live Dashboard):
```
adjusted_edge = hr_edge + slot_boost
slot_boost: On Deck +1.2pp, In Hole +0.6pp, 3 Away +0.0pp
```
**Execution score** (Execution Layer):
```
base = edge_filtered × (0.4 + confidence × 0.6)
score = base − vol_penalty + market_bonus + timing_bonus
score clamped to [−0.30, +0.30]
```
**Recommendation tiers:**
- BET: adjusted_edge ≥ 6% AND confidence ≥ 78
- WATCH: adjusted_edge ≥ 2.5% AND confidence ≥ 62
- PASS: all others
"""
)
# Section: table of model signals.
with st.expander("Signal Library", expanded=False):
st.markdown(
"""
| Signal | Source | Type |
|--------|--------|------|
| EV90 | Statcast (90th pct exit velo) | Batter power |
| Barrel rate | Statcast | Batter quality contact |
| Hard-hit rate | Statcast | Batter contact strength |
| xwOBA | Statcast | Batter overall quality |
| Launch angle | Statcast | HR trajectory profile |
| Pitcher velo | Statcast | Pitcher difficulty |
| EV allowed | Statcast | Pitcher weakness |
| Zone matchup | Statcast pitch zones | Pitch-to-zone alignment |
| Arsenal matchup | Statcast pitch types | Batter vs. pitch family |
| Rolling form | 5/10-game window | Recent batter/pitcher trend |
| Bullpen state | Live game feed | Leverage / transition risk |
| Park factor | Venue lookup | HR environment |
| Platoon | Batter/pitcher handedness | Splits adjustment |
| Opportunity | Game state (outs, slot) | Expected PA probability |
"""
)
# Section: description of the post-model execution layer.
with st.expander("Execution Layer (Alpha)", expanded=False):
st.markdown(
"""
The Execution Layer is a post-model pass that does **not** modify probabilities.
It operates on already-computed outputs (model probs + book odds) to improve edge selection.
**Five passes:**
1. **Market Disagreement** — best/median/worst implied prob across books; flags outlier and stale books
2. **Edge Quality** — confidence score (source quality), volatility score (market width), signal strength; filters edge_raw → edge_filtered
3. **Timing Heuristics** — detects aggressive prices (>2pp better than median) and timestamp presence
4. **Correlation** — flags all HR props as positively correlated; detects stacked games (>2 players per game)
5. **Final Score** — blends edge_filtered, confidence, volatility, market width, and timing into a [−0.30, +0.30] score
Visible in: Props tab → "Execution Layer" expander | Debug tab → "Execution Layer (Props)" expander
"""
)
# Section: feed refresh table.
# NOTE(review): the TTL values below are literal text, not read from the
# *_TTL_SECONDS settings — confirm they match the configured values.
with st.expander("System Health", expanded=False):
st.markdown(
"""
| Feed | Refresh | Notes |
|------|---------|-------|
| Live game feed | 5s TTL | Live only when games in progress |
| Scores | 8s TTL | |
| Schedule | 300s TTL | |
| Statcast | 600s TTL | 14-day rolling window |
| Odds (moneyline) | 30s TTL | Used for Betting tab |
| HR props (live, per game) | 60s TTL | Wired into Dashboard recommendations |
| HR props (pre-game) | On demand | Via Props tab |
Data is stored in CockroachDB. Tables: `recommendation_logs`, `upcoming_hr_props`,
`batter_prop_outcomes`, `game_outcomes`, `feedback_submissions`.
"""
)
# Section: why HR props are the alpha focus.
with st.expander("Alpha Scope", expanded=False):
st.markdown(
"""
**Primary focus:** HR props (home run probability)
HR is the primary market because:
- It has the clearest Statcast signal (EV90, barrel rate, launch angle)
- It's a binary outcome — clean to evaluate
- Books offer consistent retail HR prop lines (DK, FD, BetMGM, Caesars)
Hit and Total Bases props are computed and displayed but receive less model focus in alpha.
"""
)
# Section: known limitations.
with st.expander("Known Limitations", expanded=False):
st.markdown(
"""
- **Pre-game baseline only** (Props tab): No live lineup, park, or weather context. Model uses season Statcast features.
- **Live book odds**: When live HR prop odds are unavailable for a game, the Dashboard uses market-neutral reference odds (~+425). These are labeled with `~` in the BOOK column.
- **Calibration**: Model has not yet accumulated a full-season outcome dataset. Probability estimates are structurally reasonable but not empirically calibrated to 2026 data.
- **Name mapping**: Sportsbook player names sometimes differ from Statcast names. Some players may show "unavailable" source until mapping is added.
- **No closing line value (CLV)**: CLV tracking requires final closing odds — not yet wired.
- **No account for lineup scratches**: If a player is scratched post-lineup release, the model doesn't know.
"""
)
# Section: feedback pointer and near-term roadmap.
with st.expander("Feedback & Roadmap", expanded=False):
st.markdown(
"""
Use the **Feedback** tab to submit observations, bugs, or suggestions.
**Near-term roadmap:**
- Post-game outcome grading and calibration reports
- Closing line value (CLV) tracking
- Hit and Total Bases model calibration
- XGBoost model integration (currently shadow mode only)
"""
)
# Bookkeeping columns stripped from snapshot event frames before display.
_SNAPSHOT_META_COLUMNS = ["snapshot_built_at", "snapshot_version", "source_status"]

# (snapshot table name, key in the debug read-status dict, bundle key)
_DEBUG_EVENT_TABLES = (
    ("shared_hitter_baseline_event_rows", "hitter_event_rows", "blended_batter_df"),
    ("shared_pitcher_baseline_event_rows", "pitcher_event_rows", "blended_pitcher_df"),
)


def _debug_frame_from_bundle(bundle: dict, key: str) -> pd.DataFrame:
    """Return a copy of ``bundle[key]`` without snapshot columns, or an empty frame."""
    frame = bundle.get(key)
    # Check the type before copying: the previous code called .copy() first,
    # so its isinstance guard could never protect against a missing value.
    if not isinstance(frame, pd.DataFrame):
        return pd.DataFrame()
    return frame.copy().drop(columns=_SNAPSHOT_META_COLUMNS, errors="ignore")


def _render_debug_page(loader: dict) -> None:
    """Collect the Debug page inputs and hand them to ``render_debug``."""
    loader["update"]("Loading debug diagnostics and provider visibility...", 0.30)
    debug_scores = get_stable_scores_for_dashboard_date(current_wbc_date_str())
    baseline_bundle = load_shared_baseline_bundle_from_snapshots(
        max_age_seconds=max(STATCAST_TTL_SECONDS, 60 * 60)
    )

    # Start by assuming both event frames come from the baseline bundle.
    read_status = {
        status_key: {
            "table_name": table_name,
            "read_source": "baseline_bundle",
            "read_attempts": 1,
            "retry_used": False,
            "read_error": "",
            "snapshot_built_at": None,
            "source_status": str(baseline_bundle.get("snapshot_source_status") or ""),
        }
        for table_name, status_key, _ in _DEBUG_EVENT_TABLES
    }

    # Overlay per-table snapshot metadata when the bundle provides it.
    baseline_status_df = baseline_bundle.get("snapshot_status", pd.DataFrame())
    if isinstance(baseline_status_df, pd.DataFrame) and not baseline_status_df.empty:
        for table_name, status_key, _ in _DEBUG_EVENT_TABLES:
            status_rows = baseline_status_df[baseline_status_df["table_name"] == table_name]
            if not status_rows.empty:
                read_status[status_key]["snapshot_built_at"] = status_rows.iloc[0].get("snapshot_built_at")
                read_status[status_key]["source_status"] = status_rows.iloc[0].get("source_status")

    # Prefer the bundle; fall back to a retryable table read when it is empty.
    frames = {}
    for table_name, status_key, bundle_key in _DEBUG_EVENT_TABLES:
        frame = _debug_frame_from_bundle(baseline_bundle, bundle_key)
        if frame.empty:
            frame, read_status[status_key] = read_table_retryable(conn, table_name)
            frame = frame.drop(columns=_SNAPSHOT_META_COLUMNS, errors="ignore")
        frames[status_key] = frame

    debug_hitter_df = frames["hitter_event_rows"]
    debug_pitcher_df = frames["pitcher_event_rows"]

    # Last resort: recent Statcast rows for hitters, and the hitter frame for
    # pitchers.
    if debug_hitter_df.empty:
        read_status["hitter_event_rows"]["read_source"] = "load_statcast_recent_fallback"
        debug_hitter_df = load_statcast_recent()
    if debug_pitcher_df.empty:
        read_status["pitcher_event_rows"]["read_source"] = "debug_hitter_fallback"
        debug_pitcher_df = debug_hitter_df

    loader["update"]("Rendering debug tables...", 0.75)
    render_debug(
        statcast_df=debug_hitter_df,
        pitcher_statcast_df=debug_pitcher_df,
        baseline_bundle=baseline_bundle,
        odds_df=load_odds(),
        conn=conn,
        live_games=pd.DataFrame(),
        scores_df=debug_scores,
        upcoming_props_debug=load_upcoming_hr_props_bundle(),
        grade_outcomes_fn=grade_final_game_outcomes_from_scores,
        grade_props_fn=grade_batter_prop_outcomes_from_audit,
        fill_realized_fn=fill_batter_prop_realized_outcomes,
        debug_event_row_status=read_status,
    )


def main() -> None:
    """Render the header and sidebar navigation, then dispatch to the chosen page.

    A loading shell is updated before each page renders and cleared once the
    page has finished rendering.
    """
    render_header()
    page = st.sidebar.radio(
        "Navigation",
        options=[
            "Dashboard",
            "Props",
            "Card Lab",
            "Odds",
            "Bet Tracker",
            "Alpha Release",
            "Feedback",
            "Debug",
        ],
    )
    st.sidebar.caption(f"Live: {LIVE_FEED_TTL_SECONDS}s | Scores: {SCORES_TTL_SECONDS}s | "
                       f"Schedule: {SCHEDULE_TTL_SECONDS}s | Statcast: {STATCAST_TTL_SECONDS}s")
    loader = render_loading_shell()

    if page == "Dashboard":
        loader["update"]("Loading dashboard data and live game context...", 0.35)
        render_dashboard()
    elif page == "Props":
        loader["update"]("Loading pregame props, statcast context, and projected lineups...", 0.25)
        loader["update"]("Loading sportsbook props and probable starters...", 0.45)
        _upcoming_props_bundle = load_upcoming_hr_props_bundle()
        # Stash the bundle in session state under its debug key.
        st.session_state["upcoming_props_bundle_debug"] = _upcoming_props_bundle
        _probable_starters = load_probable_starters()
        loader["update"]("Preparing market-scoped baseline context...", 0.70)
        loader["update"]("Building Props Command Center...", 0.85)
        render_props(
            None,
            conn=conn,
            raw_props=_upcoming_props_bundle["merged_props_feed"],
            pitcher_statcast_df=None,
            probable_starters=_probable_starters,
        )
    elif page == "Card Lab":
        loader["update"]("Loading Card Lab assets...", 0.45)
        render_card_lab(conn=conn)
    elif page == "Odds":
        loader["update"]("Loading live odds...", 0.45)
        render_betting()
    elif page == "Bet Tracker":
        loader["update"]("Loading bet tracker...", 0.45)
        render_bet_tracker()
    elif page == "Alpha Release":
        loader["update"]("Loading release notes and explainer content...", 0.45)
        render_alpha_release()
    elif page == "Feedback":
        loader["update"]("Loading feedback tools...", 0.45)
        render_feedback(conn)
    elif page == "Debug":
        _render_debug_page(loader)

    # Every navigation option ends by clearing the loading shell.
    loader["clear"]()
# Script entry point: run the app's main() only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
main()