| """ | |
| database/db.py | |
| CockroachDB-backed persistence layer (cutover from DuckDB). | |
| All table management for the active runtime tables: | |
| recommendation_logs, recommendation_outcomes, upcoming_hr_props, | |
| game_outcomes, batter_prop_outcomes, bets, | |
| cached_schedule, cached_odds, cached_weather (schema artifacts, no active inserts) | |
| Public API is unchanged — all callers import from this module. | |
| Connection is a long-lived SQLAlchemy connection opened with AUTOCOMMIT isolation | |
| (matches DuckDB's default auto-commit-per-statement behavior). | |
| Bulk inserts are chunked at _INSERT_CHUNK_SIZE rows per execute call to prevent | |
| large single-payload issues in CockroachDB. SQLAlchemy passes list-of-dicts to | |
| execute() as executemany — no Python-level per-row loops. | |
| No primary keys or unique constraints are added in this batch. Schema is designed | |
| so they can be added later without structural changes. | |
| """ | |
from __future__ import annotations

import json
import threading
import time
from typing import Any, Mapping

import pandas as pd
from sqlalchemy import text
from sqlalchemy.exc import DBAPIError, OperationalError

from database import remote_db
from utils.helpers import utc_now_iso

# ---------------------------------------------------------------------------
# Chunk size for bulk inserts
# ---------------------------------------------------------------------------
_INSERT_CHUNK_SIZE = 500

_READ_RETRY_ATTEMPTS = 4
_READ_RETRY_BASE_DELAY_SECONDS = 0.20

# Guard so initialize_schema() runs exactly once per process, not once per connection.
_schema_initialized: bool = False
_schema_init_lock: threading.Lock = threading.Lock()


# ---------------------------------------------------------------------------
# Connection
# ---------------------------------------------------------------------------
def get_connection():
    """
    Returns a CockroachDB SQLAlchemy connection with AUTOCOMMIT isolation.

    Schema is initialized once per process lifetime (guarded by _schema_initialized).
    The connection is long-lived (module-level in app.py); the pool handles
    stale connection detection via pool_pre_ping=True and pool_recycle=300.
    """
    global _schema_initialized
    conn = remote_db.get_connection().execution_options(isolation_level="AUTOCOMMIT")
    if not _schema_initialized:
        with _schema_init_lock:
            if not _schema_initialized:
                initialize_schema(conn)
                _schema_initialized = True
    return conn
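
# Usage sketch (assumed caller pattern; the actual app.py wiring lives outside this
# module): the connection is created once and reused, and AUTOCOMMIT means each
# statement commits on its own with no explicit transaction management.
#
#   from database import db
#   conn = db.get_connection()          # module-level, lives for the process
#   bets = db.read_table(conn, "bets")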

# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------
def _bulk_insert(conn, table: str, df: pd.DataFrame) -> None:
    """
    Insert all rows from df into table using chunked executemany.

    Column names in df must match table column names exactly.
    NaN values are converted to None (SQL NULL) before insertion.
    """
    if df is None or df.empty:
        return
    cols = list(df.columns)
    col_list = ", ".join(cols)
    placeholders = ", ".join(f":{c}" for c in cols)
    sql = text(f"INSERT INTO {table} ({col_list}) VALUES ({placeholders})")
    records = df.where(df.notna(), other=None).to_dict("records")
    for i in range(0, len(records), _INSERT_CHUNK_SIZE):
        conn.execute(sql, records[i : i + _INSERT_CHUNK_SIZE])
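
# Chunking illustration: a 1,200-row frame becomes three executemany round trips
# (rows 0-499, 500-999, 1000-1199). SQLAlchemy expands the list of dicts into a
# single executemany per chunk, so each chunk is one driver call rather than 500
# individual INSERTs.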

def _filter_market_rows(df: pd.DataFrame | None, market_family: str) -> pd.DataFrame:
    if df is None or df.empty:
        return pd.DataFrame()
    market_series = (
        df.get("market_family", df.get("market", pd.Series(dtype="object", index=df.index)))
        .fillna("")
        .astype(str)
        .str.strip()
        .str.lower()
    )
    return df[market_series == str(market_family or "").strip().lower()].copy()


def _build_market_book_summary(df: pd.DataFrame | None, market_family: str) -> pd.DataFrame:
    market_df = _filter_market_rows(df, market_family)
    if market_df.empty:
        return pd.DataFrame(columns=["market_family", "sportsbook", "rows", "unique_events", "unique_players"])
    return (
        market_df.groupby(["market_family", "sportsbook"], dropna=False)
        .agg(
            rows=("player_name", "size"),
            unique_events=("event_id", pd.Series.nunique),
            unique_players=("player_name", pd.Series.nunique),
        )
        .reset_index()
        .sort_values(["market_family", "sportsbook"], na_position="last")
        .reset_index(drop=True)
    )


def _build_missing_hr_books_global(
    merged_df: pd.DataFrame | None,
    requested_books: list[str],
) -> pd.DataFrame:
    requested = sorted({str(book).strip().lower() for book in (requested_books or []) if str(book).strip()})
    hr_df = _filter_market_rows(merged_df, "hr")
    present = sorted(
        {
            str(book).strip().lower()
            for book in hr_df.get("sportsbook_key", pd.Series(dtype="object")).dropna().astype(str).tolist()
            if str(book).strip()
        }
    )
    missing = sorted(set(requested) - set(present))
    return pd.DataFrame(
        [
            {
                "market_family": "hr",
                "available_books": ", ".join(present),
                "missing_books": ", ".join(missing),
                "available_count": len(present),
                "missing_count": len(missing),
            }
        ]
    )


def _build_missing_hr_books_by_event(
    merged_df: pd.DataFrame | None,
    requested_books: list[str],
) -> pd.DataFrame:
    if merged_df is None or merged_df.empty:
        return pd.DataFrame(
            columns=[
                "event_id", "away_team", "home_team", "commence_time",
                "market_family", "available_books", "missing_books",
                "available_count", "missing_count",
            ]
        )
    requested = {str(book).strip().lower() for book in (requested_books or []) if str(book).strip()}
    group_cols = [c for c in ["event_id", "away_team", "home_team", "commence_time"] if c in merged_df.columns]
    if not group_cols:
        return pd.DataFrame()
    hr_df = _filter_market_rows(merged_df, "hr")
    rows: list[dict[str, Any]] = []
    # Group over the full merged frame (not just HR rows) so events with zero HR
    # coverage still produce a "missing" row; the grouped sub-frame itself is unused.
    for key, _ in merged_df.groupby(group_cols, dropna=False):
        if not isinstance(key, tuple):
            key = (key,)
        key_map = dict(zip(group_cols, key))
        event_hr = hr_df
        for col, value in key_map.items():
            event_hr = event_hr[event_hr[col] == value]
        available = sorted(
            {
                str(book).strip().lower()
                for book in event_hr.get("sportsbook_key", pd.Series(dtype="object")).dropna().astype(str).tolist()
                if str(book).strip()
            }
        )
        missing = sorted(requested - set(available))
        rows.append(
            {
                **key_map,
                "market_family": "hr",
                "available_books": ", ".join(available),
                "missing_books": ", ".join(missing),
                "available_count": len(available),
                "missing_count": len(missing),
            }
        )
    return pd.DataFrame(rows).sort_values(["event_id"], na_position="last").reset_index(drop=True)

def _build_hr_snapshot_state(
    *,
    current_hr_row_count: int,
    is_complete: bool,
    overwrite_prevented: bool,
) -> str:
    if current_hr_row_count <= 0:
        return "empty"
    if overwrite_prevented:
        return "stale_degraded"
    if is_complete:
        return "usable_complete"
    return "usable_partial"

def _preserve_last_known_good_hr_rows(
    incoming_df: pd.DataFrame | None,
    existing_df: pd.DataFrame | None,
) -> tuple[pd.DataFrame, bool]:
    incoming = incoming_df.copy() if isinstance(incoming_df, pd.DataFrame) else pd.DataFrame()
    existing = existing_df.copy() if isinstance(existing_df, pd.DataFrame) else pd.DataFrame()
    incoming_hr = _filter_market_rows(incoming, "hr")
    existing_hr = _filter_market_rows(existing, "hr")
    if incoming_hr.empty and not existing_hr.empty:
        non_hr = incoming[
            incoming.get("market_family", incoming.get("market", pd.Series(dtype="object", index=incoming.index)))
            .fillna("")
            .astype(str)
            .str.strip()
            .str.lower()
            != "hr"
        ].copy()
        return pd.concat([non_hr, existing_hr], ignore_index=True, sort=False), True
    return incoming, False
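
# Example: if a refresh yields 400 non-HR rows and zero HR rows while the previous
# snapshot holds 120 HR rows, the result is those 400 rows plus the 120 preserved HR
# rows, with the second return value True (overwrite prevented). Any non-empty
# incoming HR set replaces the preserved rows wholesale.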

def _is_retryable_read_error(exc: Exception) -> bool:
    text_value = str(exc or "").lower()
    retry_markers = (
        "serializationfailure",
        "restart transaction",
        "transactionretrywithprotorefresherror",
        "readwithinuncertaintyintervalerror",
        "retry txn",
        "retry_serializable",
    )
    return any(marker in text_value for marker in retry_markers)
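
# Matching is a loose substring search over the stringified exception; a typical
# CockroachDB retry error reads roughly like
#   "restart transaction: TransactionRetryWithProtoRefreshError: ... RETRY_SERIALIZABLE"
# (exact wording varies by version, hence the multiple markers).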

def safe_read_sql(
    sql,
    conn,
    params: Mapping[str, Any] | None = None,
    *,
    max_attempts: int = _READ_RETRY_ATTEMPTS,
    base_delay_seconds: float = _READ_RETRY_BASE_DELAY_SECONDS,
) -> pd.DataFrame:
    last_error: Exception | None = None
    for attempt in range(1, max(1, int(max_attempts)) + 1):
        try:
            return pd.read_sql(sql, conn, params=params)
        except (OperationalError, DBAPIError) as exc:
            last_error = exc
            if attempt >= max_attempts or not _is_retryable_read_error(exc):
                raise
            time.sleep(base_delay_seconds * attempt)
    if last_error is not None:
        raise last_error
    return pd.DataFrame()
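
# Backoff is linear: with the defaults (4 attempts, 0.20 s base) a persistently
# retryable error sleeps 0.2 s, 0.4 s, then 0.6 s between attempts and re-raises
# after the fourth failure. Non-retryable errors re-raise immediately.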

def read_table_retryable(
    conn,
    table_name: str,
    *,
    where_sql: str | None = None,
    params: Mapping[str, Any] | None = None,
    max_attempts: int = _READ_RETRY_ATTEMPTS,
) -> tuple[pd.DataFrame, dict[str, Any]]:
    sql_text = f"SELECT * FROM {table_name}"
    if where_sql:
        sql_text += f" WHERE {where_sql}"
    attempts = 0
    retry_used = False
    last_error = ""
    for attempt in range(1, max(1, int(max_attempts)) + 1):
        attempts = attempt
        try:
            df = safe_read_sql(
                text(sql_text),
                conn,
                params=params,
                max_attempts=1,
            )
            return df, {
                "table_name": table_name,
                "read_source": "db_retryable",
                "read_attempts": attempts,
                "retry_used": retry_used,
                "read_error": "",
            }
        except (OperationalError, DBAPIError) as exc:
            last_error = str(exc)
            if attempt >= max_attempts or not _is_retryable_read_error(exc):
                return pd.DataFrame(), {
                    "table_name": table_name,
                    "read_source": "db_retryable_failed",
                    "read_attempts": attempts,
                    "retry_used": retry_used,
                    "read_error": last_error,
                }
            retry_used = True
            time.sleep(_READ_RETRY_BASE_DELAY_SECONDS * attempt)
    return pd.DataFrame(), {
        "table_name": table_name,
        "read_source": "db_retryable_failed",
        "read_attempts": attempts,
        "retry_used": retry_used,
        "read_error": last_error,
    }

# ---------------------------------------------------------------------------
# Schema initialization
# ---------------------------------------------------------------------------
def ensure_statcast_core_tables(conn) -> None:
    """
    Create statcast tables and the pitcher_zone_events table if they do not exist.

    Also ALTER TABLE to add new columns (plate_x/plate_z, release_extension) to
    existing tables — CockroachDB ADD COLUMN IF NOT EXISTS is idempotent.
    """
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS statcast_event_core (
            event_key TEXT PRIMARY KEY,
            player_name TEXT,
            batter BIGINT,
            pitcher BIGINT,
            game_date TEXT,
            game_pk BIGINT,
            source_season INT,
            pitch_name TEXT,
            events TEXT,
            description TEXT,
            stand TEXT,
            p_throws TEXT,
            home_team TEXT,
            away_team TEXT,
            inning INT,
            inning_topbot TEXT,
            at_bat_number INT,
            pitch_number INT,
            plate_x DOUBLE PRECISION,
            plate_z DOUBLE PRECISION
        )
        """
    ))
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS statcast_batted_ball (
            event_key TEXT PRIMARY KEY,
            launch_speed DOUBLE PRECISION,
            launch_angle DOUBLE PRECISION,
            bb_type TEXT,
            estimated_woba_using_speedangle DOUBLE PRECISION
        )
        """
    ))
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS statcast_pitch_release (
            event_key TEXT PRIMARY KEY,
            release_speed DOUBLE PRECISION,
            release_spin_rate DOUBLE PRECISION,
            pfx_x DOUBLE PRECISION,
            pfx_z DOUBLE PRECISION,
            release_extension DOUBLE PRECISION
        )
        """
    ))
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS pitcher_zone_events (
            event_key TEXT PRIMARY KEY,
            pitcher_name TEXT,
            pitcher_id BIGINT,
            game_pk BIGINT,
            game_date TEXT,
            pitch_name TEXT,
            pitch_family TEXT,
            zone_bucket TEXT,
            plate_x DOUBLE PRECISION,
            plate_z DOUBLE PRECISION,
            pfx_x DOUBLE PRECISION,
            pfx_z DOUBLE PRECISION,
            release_speed DOUBLE PRECISION,
            release_spin_rate DOUBLE PRECISION,
            release_extension DOUBLE PRECISION,
            events TEXT,
            whiff_flag INTEGER,
            hit_flag INTEGER,
            hr_flag INTEGER,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
    ))
    # Add new columns to existing tables — silently no-op if already present
    for _stmt in [
        "ALTER TABLE statcast_event_core ADD COLUMN IF NOT EXISTS plate_x DOUBLE PRECISION",
        "ALTER TABLE statcast_event_core ADD COLUMN IF NOT EXISTS plate_z DOUBLE PRECISION",
        "ALTER TABLE statcast_pitch_release ADD COLUMN IF NOT EXISTS release_extension DOUBLE PRECISION",
    ]:
        try:
            conn.execute(text(_stmt))
        except Exception:
            pass

def ensure_live_pitch_tables(conn) -> None:
    """Create live 2026 pitch-level and PA-level tables. Idempotent."""
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS live_pitch_mix_2026 (
            event_key TEXT PRIMARY KEY,
            pa_key TEXT,
            game_pk BIGINT,
            game_date TEXT,
            source_season INT,
            batter BIGINT,
            pitcher BIGINT,
            player_name TEXT,
            stand TEXT,
            p_throws TEXT,
            home_team TEXT,
            away_team TEXT,
            inning INT,
            inning_topbot TEXT,
            at_bat_number INT,
            pitch_number INT,
            pitch_type TEXT,
            pitch_name TEXT,
            release_speed DOUBLE PRECISION,
            effective_speed DOUBLE PRECISION,
            release_spin_rate DOUBLE PRECISION,
            spin_axis DOUBLE PRECISION,
            pfx_x DOUBLE PRECISION,
            pfx_z DOUBLE PRECISION,
            release_pos_x DOUBLE PRECISION,
            release_pos_y DOUBLE PRECISION,
            release_pos_z DOUBLE PRECISION,
            release_extension DOUBLE PRECISION,
            plate_x DOUBLE PRECISION,
            plate_z DOUBLE PRECISION,
            zone INT,
            balls INT,
            strikes INT,
            outs_when_up INT,
            bat_score INT,
            fld_score INT,
            type TEXT,
            description TEXT,
            events TEXT,
            launch_speed DOUBLE PRECISION,
            launch_angle DOUBLE PRECISION,
            hit_distance_sc DOUBLE PRECISION,
            hc_x DOUBLE PRECISION,
            hc_y DOUBLE PRECISION,
            spray_angle DOUBLE PRECISION,
            bb_type TEXT,
            launch_speed_angle INT,
            barrel INT,
            estimated_ba_using_speedangle DOUBLE PRECISION,
            estimated_woba_using_speedangle DOUBLE PRECISION,
            woba_value DOUBLE PRECISION,
            woba_denom INT,
            delta_home_win_exp DOUBLE PRECISION,
            delta_run_exp DOUBLE PRECISION,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """))
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS live_batter_game_log_2026 (
            pa_key TEXT PRIMARY KEY,
            game_pk BIGINT,
            game_date TEXT,
            source_season INT,
            batter BIGINT,
            player_name TEXT,
            stand TEXT,
            p_throws TEXT,
            home_team TEXT,
            away_team TEXT,
            inning INT,
            inning_topbot TEXT,
            at_bat_number INT,
            pitches_seen INT,
            balls_final INT,
            strikes_final INT,
            outs_when_up INT,
            events TEXT,
            description TEXT,
            launch_speed DOUBLE PRECISION,
            launch_angle DOUBLE PRECISION,
            hit_distance_sc DOUBLE PRECISION,
            spray_angle DOUBLE PRECISION,
            bb_type TEXT,
            launch_speed_angle INT,
            barrel INT,
            estimated_ba_using_speedangle DOUBLE PRECISION,
            estimated_woba_using_speedangle DOUBLE PRECISION,
            woba_value DOUBLE PRECISION,
            woba_denom INT,
            hit_flag INT,
            hr_flag INT,
            tb2p_flag INT,
            delta_home_win_exp DOUBLE PRECISION,
            delta_run_exp DOUBLE PRECISION,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """))

def initialize_schema(conn) -> None:
    """
    Create base tables if they do not exist.

    All DDL is idempotent (CREATE TABLE IF NOT EXISTS).
    """
    ensure_statcast_core_tables(conn)
    ensure_live_pitch_tables(conn)
    ensure_shared_baseline_snapshot_tables(conn)
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS bets (
            bet_id BIGINT,
            created_at TEXT,
            sportsbook TEXT,
            market TEXT,
            selection TEXT,
            odds INTEGER,
            stake DOUBLE PRECISION,
            result TEXT,
            profit DOUBLE PRECISION,
            game_id TEXT,
            notes TEXT
        )
        """
    ))
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS cached_schedule (
            fetched_at TEXT,
            game_id TEXT,
            game_pk TEXT,
            game_date TEXT,
            status TEXT,
            away_team TEXT,
            home_team TEXT,
            away_score INTEGER,
            home_score INTEGER,
            away_hits INTEGER,
            home_hits INTEGER,
            away_errors INTEGER,
            home_errors INTEGER,
            venue TEXT,
            game_datetime_utc TEXT,
            tv TEXT,
            start_time_et TEXT,
            sport_id INTEGER
        )
        """
    ))
    for _stmt in [
        "ALTER TABLE cached_schedule ADD COLUMN IF NOT EXISTS game_pk TEXT",
        "ALTER TABLE cached_schedule ADD COLUMN IF NOT EXISTS away_hits INTEGER",
        "ALTER TABLE cached_schedule ADD COLUMN IF NOT EXISTS home_hits INTEGER",
        "ALTER TABLE cached_schedule ADD COLUMN IF NOT EXISTS away_errors INTEGER",
        "ALTER TABLE cached_schedule ADD COLUMN IF NOT EXISTS home_errors INTEGER",
        "ALTER TABLE cached_schedule ADD COLUMN IF NOT EXISTS game_datetime_utc TEXT",
        "ALTER TABLE cached_schedule ADD COLUMN IF NOT EXISTS tv TEXT",
        "ALTER TABLE cached_schedule ADD COLUMN IF NOT EXISTS start_time_et TEXT",
        "ALTER TABLE cached_schedule ADD COLUMN IF NOT EXISTS sport_id INTEGER",
    ]:
        try:
            conn.execute(text(_stmt))
        except Exception:
            pass
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS cached_odds (
            fetched_at TEXT,
            event_id TEXT,
            commence_time TEXT,
            home_team TEXT,
            away_team TEXT,
            sportsbook TEXT,
            market_key TEXT,
            outcome_name TEXT,
            price INTEGER,
            point DOUBLE PRECISION
        )
        """
    ))
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS cached_weather (
            fetched_at TEXT,
            venue_key TEXT,
            location_name TEXT,
            temperature_f DOUBLE PRECISION,
            humidity INTEGER,
            wind_speed_mph DOUBLE PRECISION,
            wind_deg INTEGER,
            description TEXT
        )
        """
    ))
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS cached_probable_starters (
            fetched_at TEXT,
            away_team_norm TEXT,
            home_team_norm TEXT,
            away_team_raw TEXT,
            home_team_raw TEXT,
            away_pitcher TEXT,
            home_pitcher TEXT,
            away_pitcher_source TEXT,
            home_pitcher_source TEXT,
            starter_cache_source TEXT,
            fallback_used BOOLEAN
        )
        """
    ))
    for _stmt in [
        "ALTER TABLE cached_probable_starters ADD COLUMN IF NOT EXISTS away_pitcher_source TEXT",
        "ALTER TABLE cached_probable_starters ADD COLUMN IF NOT EXISTS home_pitcher_source TEXT",
        "ALTER TABLE cached_probable_starters ADD COLUMN IF NOT EXISTS starter_cache_source TEXT",
        "ALTER TABLE cached_probable_starters ADD COLUMN IF NOT EXISTS fallback_used BOOLEAN",
    ]:
        try:
            conn.execute(text(_stmt))
        except Exception:
            pass
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS cached_upcoming_props_feed (
            fetched_at TEXT,
            cache_key TEXT,
            row_count INTEGER,
            payload_json TEXT
        )
        """
    ))
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS cached_upcoming_props_rows (
            fetched_at TEXT,
            cache_key TEXT,
            provider TEXT,
            row_source_type TEXT,
            coverage_completion_status TEXT,
            hr_books_requested TEXT,
            hr_books_present TEXT,
            hr_books_missing TEXT,
            event_id TEXT,
            commence_time TEXT,
            away_team TEXT,
            home_team TEXT,
            sportsbook TEXT,
            sportsbook_key TEXT,
            market_key TEXT,
            market TEXT,
            player_name_raw TEXT,
            player_name TEXT,
            odds_american INTEGER,
            line DOUBLE PRECISION,
            selection_label TEXT,
            selection_scope TEXT,
            selection_side TEXT,
            market_family TEXT,
            market_variant TEXT,
            threshold INTEGER,
            display_label TEXT,
            is_primary_line BOOLEAN,
            is_modeled BOOLEAN,
            player_event_market_key TEXT
        )
        """
    ))
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS cached_upcoming_props_bundle_meta (
            fetched_at TEXT,
            cache_key TEXT,
            merged_row_count INTEGER,
            odds_api_row_count INTEGER,
            scraper_row_count INTEGER,
            coverage_summary_json TEXT,
            coverage_summary_api_json TEXT,
            coverage_summary_scraper_added_json TEXT,
            coverage_summary_final_json TEXT,
            coverage_summary_hr_api_json TEXT,
            coverage_summary_hr_supplemental_json TEXT,
            coverage_summary_hr_final_json TEXT,
            missing_books_by_market_json TEXT,
            missing_event_books_by_market_json TEXT,
            missing_hr_books_global_json TEXT,
            missing_hr_books_by_event_json TEXT,
            hr_snapshot_completeness_json TEXT,
            adapter_status_by_book_json TEXT,
            adapter_error_by_book_json TEXT,
            adapter_rows_by_book_json TEXT,
            adapter_last_attempted_at_by_book_json TEXT,
            adapter_retry_after_by_book_json TEXT,
            hr_snapshot_state TEXT,
            current_hr_row_count INTEGER,
            current_hr_event_count INTEGER,
            last_known_good_hr_row_count INTEGER,
            last_known_good_hr_built_at TEXT,
            hr_refresh_overwrite_prevented BOOLEAN,
            scraper_candidate_count INTEGER,
            scraper_added_count INTEGER,
            scraper_duplicate_reject_count INTEGER
        )
        """
    ))
    for _stmt in [
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS coverage_summary_api_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS coverage_summary_scraper_added_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS coverage_summary_final_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS coverage_summary_hr_api_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS coverage_summary_hr_supplemental_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS coverage_summary_hr_final_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS missing_books_by_market_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS missing_event_books_by_market_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS missing_hr_books_global_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS missing_hr_books_by_event_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS hr_snapshot_completeness_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS adapter_status_by_book_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS adapter_error_by_book_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS adapter_rows_by_book_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS adapter_last_attempted_at_by_book_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS adapter_retry_after_by_book_json TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS hr_snapshot_state TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS current_hr_row_count INTEGER",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS current_hr_event_count INTEGER",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS last_known_good_hr_row_count INTEGER",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS last_known_good_hr_built_at TEXT",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS hr_refresh_overwrite_prevented BOOLEAN",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS scraper_candidate_count INTEGER",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS scraper_added_count INTEGER",
        "ALTER TABLE cached_upcoming_props_bundle_meta ADD COLUMN IF NOT EXISTS scraper_duplicate_reject_count INTEGER",
    ]:
        try:
            conn.execute(text(_stmt))
        except Exception:
            pass
    for _stmt in [
        "ALTER TABLE cached_upcoming_props_rows ADD COLUMN IF NOT EXISTS row_source_type TEXT",
        "ALTER TABLE cached_upcoming_props_rows ADD COLUMN IF NOT EXISTS coverage_completion_status TEXT",
        "ALTER TABLE cached_upcoming_props_rows ADD COLUMN IF NOT EXISTS hr_books_requested TEXT",
        "ALTER TABLE cached_upcoming_props_rows ADD COLUMN IF NOT EXISTS hr_books_present TEXT",
        "ALTER TABLE cached_upcoming_props_rows ADD COLUMN IF NOT EXISTS hr_books_missing TEXT",
    ]:
        try:
            conn.execute(text(_stmt))
        except Exception:
            pass
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_statcast_player_date "
        "ON statcast_event_core (player_name, source_season, game_date)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_statcast_pitcher_date "
        "ON statcast_event_core (pitcher, source_season, game_date)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_statcast_game_pk "
        "ON statcast_event_core (game_pk)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_lpm_game_pk "
        "ON live_pitch_mix_2026 (game_pk)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_lpm_batter_date "
        "ON live_pitch_mix_2026 (batter, game_date)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_lpm_pa_key "
        "ON live_pitch_mix_2026 (pa_key)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_lbgl_game_pk "
        "ON live_batter_game_log_2026 (game_pk)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_lbgl_player_date "
        "ON live_batter_game_log_2026 (player_name, game_date)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_cached_schedule_game_date "
        "ON cached_schedule (game_date)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_cached_odds_fetched_at "
        "ON cached_odds (fetched_at)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_cached_weather_venue "
        "ON cached_weather (venue_key, fetched_at)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_cached_probable_starters_matchup "
        "ON cached_probable_starters (away_team_norm, home_team_norm)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_cached_upcoming_props_feed_key "
        "ON cached_upcoming_props_feed (cache_key, fetched_at)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_cached_upcoming_props_rows_key "
        "ON cached_upcoming_props_rows (cache_key, fetched_at, event_id, player_name)"
    ))
    conn.execute(text(
        "CREATE INDEX IF NOT EXISTS idx_cached_upcoming_props_bundle_meta_key "
        "ON cached_upcoming_props_bundle_meta (cache_key, fetched_at)"
    ))

# ---------------------------------------------------------------------------
# Generic helpers
# ---------------------------------------------------------------------------
def upsert_dataframe(
    conn,
    table_name: str,
    df: pd.DataFrame,
    replace: bool = True,
) -> None:
    if df is None or df.empty:
        return
    if replace:
        conn.execute(text(f"DELETE FROM {table_name}"))
    _bulk_insert(conn, table_name, df)


def replace_table_contents(
    conn,
    table_name: str,
    df: pd.DataFrame | None,
) -> None:
    """
    Replace the full contents of a table, even when the replacement frame is empty.
    """
    conn.execute(text(f"DELETE FROM {table_name}"))
    if df is None or df.empty:
        return
    _bulk_insert(conn, table_name, df)
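
# The two differ on empty input: upsert_dataframe is a no-op (existing rows survive),
# while replace_table_contents always clears the table first. Sketch:
#
#   upsert_dataframe(conn, "bets", df)                 # wipe-and-replace (default)
#   upsert_dataframe(conn, "bets", df, replace=False)  # append
#   replace_table_contents(conn, "bets", None)         # truncate even with no new rows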

def read_table(conn, table_name: str) -> pd.DataFrame:
    return safe_read_sql(text(f"SELECT * FROM {table_name}"), conn)


def _safe_json_dump(value: Any) -> str:
    return json.dumps(value, default=str)


def _safe_json_load(value: Any, default: Any) -> Any:
    try:
        if value is None or str(value).strip() == "":
            return default
        return json.loads(str(value))
    except Exception:
        return default


def _latest_fetched_at(df: pd.DataFrame) -> str:
    if df is None or df.empty or "fetched_at" not in df.columns:
        return utc_now_iso()
    try:
        ts = pd.to_datetime(df["fetched_at"], errors="coerce").max()
        if pd.isna(ts):
            return utc_now_iso()
        return str(ts)
    except Exception:
        return utc_now_iso()


def replace_cached_schedule(conn, df: pd.DataFrame) -> None:
    if df is None:
        df = pd.DataFrame()
    cols = [
        "fetched_at",
        "game_id",
        "game_pk",
        "game_date",
        "status",
        "away_team",
        "home_team",
        "away_score",
        "home_score",
        "away_hits",
        "home_hits",
        "away_errors",
        "home_errors",
        "venue",
        "game_datetime_utc",
        "tv",
        "start_time_et",
        "sport_id",
    ]
    out = df.copy()
    for col in cols:
        if col not in out.columns:
            out[col] = None
    date_values = {
        str(value).strip()
        for value in out["game_date"].dropna().astype(str).tolist()
        if str(value).strip()
    }
    if date_values:
        clauses = []
        params: dict[str, Any] = {}
        for idx, value in enumerate(sorted(date_values)):
            key = f"date_{idx}"
            clauses.append(f":{key}")
            params[key] = value
        conn.execute(
            text(f"DELETE FROM cached_schedule WHERE game_date IN ({', '.join(clauses)})"),
            params,
        )
    _bulk_insert(conn, "cached_schedule", out[cols])

def read_cached_schedule_for_date(conn, date_str: str) -> pd.DataFrame:
    return pd.read_sql(
        text("SELECT * FROM cached_schedule WHERE game_date = :date ORDER BY game_id"),
        conn,
        params={"date": str(date_str)},
    )


def replace_cached_odds(conn, df: pd.DataFrame) -> None:
    if df is None:
        df = pd.DataFrame()
    cols = [
        "fetched_at",
        "event_id",
        "commence_time",
        "home_team",
        "away_team",
        "sportsbook",
        "market_key",
        "outcome_name",
        "price",
        "point",
    ]
    out = df.copy()
    for col in cols:
        if col not in out.columns:
            out[col] = None
    replace_table_contents(conn, "cached_odds", out[cols])


def read_cached_odds(conn) -> pd.DataFrame:
    return pd.read_sql(text("SELECT * FROM cached_odds ORDER BY fetched_at DESC"), conn)


def replace_cached_weather(conn, df: pd.DataFrame) -> None:
    if df is None:
        df = pd.DataFrame()
    cols = [
        "fetched_at",
        "venue_key",
        "location_name",
        "temperature_f",
        "humidity",
        "wind_speed_mph",
        "wind_deg",
        "description",
    ]
    out = df.copy()
    for col in cols:
        if col not in out.columns:
            out[col] = None
    venue_values = {
        str(value).strip()
        for value in out["venue_key"].dropna().astype(str).tolist()
        if str(value).strip()
    }
    if venue_values:
        clauses = []
        params: dict[str, Any] = {}
        for idx, value in enumerate(sorted(venue_values)):
            key = f"venue_{idx}"
            clauses.append(f":{key}")
            params[key] = value
        conn.execute(
            text(f"DELETE FROM cached_weather WHERE venue_key IN ({', '.join(clauses)})"),
            params,
        )
    _bulk_insert(conn, "cached_weather", out[cols])


def read_cached_weather_for_venue(conn, venue_key: str) -> pd.DataFrame:
    return pd.read_sql(
        text(
            """
            SELECT * FROM cached_weather
            WHERE venue_key = :venue
            ORDER BY fetched_at DESC
            """
        ),
        conn,
        params={"venue": str(venue_key)},
    )

def replace_cached_probable_starters(
    conn,
    starters_map: Mapping[tuple[str, str], Mapping[str, Any]] | None,
) -> None:
    rows: list[dict[str, Any]] = []
    fetched_at = utc_now_iso()
    for key, payload in (starters_map or {}).items():
        if not isinstance(key, tuple) or len(key) != 2:
            continue
        away_norm, home_norm = key
        payload = dict(payload or {})
        rows.append(
            {
                "fetched_at": fetched_at,
                "away_team_norm": str(away_norm or "").strip(),
                "home_team_norm": str(home_norm or "").strip(),
                "away_team_raw": str(payload.get("away_team_raw") or "").strip(),
                "home_team_raw": str(payload.get("home_team_raw") or "").strip(),
                "away_pitcher": str(payload.get("away_pitcher") or "").strip() or None,
                "home_pitcher": str(payload.get("home_pitcher") or "").strip() or None,
                "away_pitcher_source": str(payload.get("away_pitcher_source") or "").strip() or None,
                "home_pitcher_source": str(payload.get("home_pitcher_source") or "").strip() or None,
                "starter_cache_source": str(payload.get("starter_cache_source") or "").strip() or None,
                "fallback_used": bool(payload.get("fallback_used")),
            }
        )
    replace_table_contents(conn, "cached_probable_starters", pd.DataFrame(rows))


def read_cached_probable_starters(conn) -> dict[tuple[str, str], dict[str, Any]]:
    # Value dicts hold strings, optional strings, and the fallback_used bool,
    # so the values are typed Any rather than str | None.
    df = pd.read_sql(text("SELECT * FROM cached_probable_starters"), conn)
    if df.empty:
        return {}
    out: dict[tuple[str, str], dict[str, Any]] = {}
    for _, row in df.iterrows():
        key = (
            str(row.get("away_team_norm") or "").strip(),
            str(row.get("home_team_norm") or "").strip(),
        )
        if not key[0] or not key[1]:
            continue
        out[key] = {
            "away_team_raw": str(row.get("away_team_raw") or "").strip(),
            "home_team_raw": str(row.get("home_team_raw") or "").strip(),
            "away_pitcher": str(row.get("away_pitcher") or "").strip() or None,
            "home_pitcher": str(row.get("home_pitcher") or "").strip() or None,
            "away_pitcher_source": str(row.get("away_pitcher_source") or "").strip() or None,
            "home_pitcher_source": str(row.get("home_pitcher_source") or "").strip() or None,
            "starter_cache_source": str(row.get("starter_cache_source") or "").strip() or None,
            "fallback_used": bool(row.get("fallback_used")),
        }
    return out


def read_cached_probable_starters_meta(conn) -> pd.DataFrame:
    return pd.read_sql(
        text(
            """
            SELECT fetched_at, COUNT(*) AS matchup_count
            FROM cached_probable_starters
            GROUP BY fetched_at
            ORDER BY fetched_at DESC
            """
        ),
        conn,
    )

def replace_cached_upcoming_props_bundle(
    conn,
    bundle: Mapping[str, pd.DataFrame] | None,
    cache_key: str = "default",
) -> None:
    bundle = dict(bundle or {})
    merged = bundle.get("merged_props_feed", pd.DataFrame())
    coverage = bundle.get("coverage_summary", pd.DataFrame())
    coverage_api = bundle.get("coverage_summary_api", pd.DataFrame())
    coverage_scraper_added = bundle.get("coverage_summary_scraper_added", pd.DataFrame())
    coverage_final = bundle.get("coverage_summary_final", pd.DataFrame())
    coverage_hr_api = bundle.get("coverage_summary_hr_api", pd.DataFrame())
    coverage_hr_supplemental = bundle.get("coverage_summary_hr_supplemental", pd.DataFrame())
    coverage_hr_final = bundle.get("coverage_summary_hr_final", pd.DataFrame())
    missing_books_by_market = bundle.get("missing_books_by_market", pd.DataFrame())
    missing_event_books_by_market = bundle.get("missing_event_books_by_market", pd.DataFrame())
    missing_hr_books_global = bundle.get("missing_hr_books_global", pd.DataFrame())
    missing_hr_books_by_event = bundle.get("missing_hr_books_by_event", pd.DataFrame())
    hr_snapshot_completeness = bundle.get("hr_snapshot_completeness", {})
    adapter_status_by_book = bundle.get("adapter_status_by_book", {})
    adapter_error_by_book = bundle.get("adapter_error_by_book", {})
    adapter_rows_by_book = bundle.get("adapter_rows_by_book", {})
    adapter_last_attempted_at_by_book = bundle.get("adapter_last_attempted_at_by_book", {})
    adapter_retry_after_by_book = bundle.get("adapter_retry_after_by_book", {})
    odds_api_raw = bundle.get("odds_api_raw", pd.DataFrame())
    scraper_raw = bundle.get("scraper_raw", pd.DataFrame())

    existing_bundle = read_cached_upcoming_props_bundle(conn, cache_key=cache_key)
    existing_merged = existing_bundle.get("merged_props_feed", pd.DataFrame())
    existing_hr_rows = _filter_market_rows(existing_merged, "hr")
    merged, hr_refresh_overwrite_prevented = _preserve_last_known_good_hr_rows(merged, existing_merged)

    current_hr_rows = _filter_market_rows(merged, "hr")
    current_hr_row_count = int(len(current_hr_rows))
    current_hr_event_count = int(current_hr_rows["event_id"].nunique()) if not current_hr_rows.empty and "event_id" in current_hr_rows.columns else 0
    requested_hr_books = list(dict(hr_snapshot_completeness or {}).get("requested_books") or [])
    if not requested_hr_books:
        requested_hr_books = list(
            dict(existing_bundle.get("hr_snapshot_completeness") or {}).get("requested_books") or []
        )
    if not requested_hr_books:
        requested_hr_books = [
            str(book).strip().lower()
            for book in merged.get("sportsbook_key", pd.Series(dtype="object")).dropna().astype(str).tolist()
            if str(book).strip()
        ]
    present_hr_books = sorted(
        {
            str(book).strip().lower()
            for book in current_hr_rows.get("sportsbook_key", pd.Series(dtype="object")).dropna().astype(str).tolist()
            if str(book).strip()
        }
    )
    missing_hr_books = sorted(set(requested_hr_books) - set(present_hr_books))
    hr_snapshot_completeness = {
        "market_family": "hr",
        "requested_books": sorted(set(requested_hr_books)),
        "present_books": present_hr_books,
        "missing_books": missing_hr_books,
        "requested_count": len(set(requested_hr_books)),
        "present_count": len(present_hr_books),
        "missing_count": len(missing_hr_books),
        "is_complete": len(missing_hr_books) == 0 and current_hr_row_count > 0,
        "row_count": current_hr_row_count,
        "event_count": current_hr_event_count,
    }
    hr_snapshot_state = _build_hr_snapshot_state(
        current_hr_row_count=current_hr_row_count,
        is_complete=bool(hr_snapshot_completeness.get("is_complete")),
        overwrite_prevented=hr_refresh_overwrite_prevented,
    )
    if current_hr_row_count > 0:
        last_known_good_hr_row_count = current_hr_row_count
        last_known_good_hr_built_at = _latest_fetched_at(current_hr_rows)
    else:
        last_known_good_hr_row_count = int(existing_bundle.get("current_hr_row_count") or len(existing_hr_rows) or 0)
        last_known_good_hr_built_at = existing_bundle.get("last_known_good_hr_built_at") or ""
    coverage_hr_final = _build_market_book_summary(merged, "hr")
    missing_hr_books_global = _build_missing_hr_books_global(merged, requested_hr_books)
    missing_hr_books_by_event = _build_missing_hr_books_by_event(merged, requested_hr_books)

    fetched_at = _latest_fetched_at(merged if isinstance(merged, pd.DataFrame) else pd.DataFrame())
    feed_df = pd.DataFrame(
        [
            {
                "fetched_at": fetched_at,
                "cache_key": cache_key,
                "row_count": int(len(merged)) if isinstance(merged, pd.DataFrame) else 0,
                "payload_json": _safe_json_dump(
                    [] if merged is None or not isinstance(merged, pd.DataFrame)
                    else merged.where(merged.notna(), other=None).to_dict("records")
                ),
            }
        ]
    )
    merged_rows_df = pd.DataFrame()
    if isinstance(merged, pd.DataFrame):
        merged_rows_df = merged.copy()
    if merged_rows_df is None or merged_rows_df.empty:
        merged_rows_df = pd.DataFrame(columns=[
            "provider", "row_source_type", "coverage_completion_status", "hr_books_requested",
            "hr_books_present", "hr_books_missing", "event_id", "commence_time", "away_team", "home_team", "sportsbook",
            "sportsbook_key", "market_key", "market", "player_name_raw", "player_name",
            "odds_american", "line", "selection_label", "selection_scope", "selection_side",
            "market_family", "market_variant", "threshold", "display_label",
            "is_primary_line", "is_modeled", "player_event_market_key",
        ])
    merged_rows_df["fetched_at"] = fetched_at
    merged_rows_df["cache_key"] = cache_key
    ordered_row_cols = [
        "fetched_at", "cache_key", "provider", "event_id", "commence_time", "away_team", "home_team",
        "row_source_type", "coverage_completion_status", "hr_books_requested", "hr_books_present", "hr_books_missing",
        "sportsbook", "sportsbook_key", "market_key", "market", "player_name_raw", "player_name",
        "odds_american", "line", "selection_label", "selection_scope", "selection_side",
        "market_family", "market_variant", "threshold", "display_label", "is_primary_line",
        "is_modeled", "player_event_market_key",
    ]
    for col in ordered_row_cols:
        if col not in merged_rows_df.columns:
            merged_rows_df[col] = None
    meta_df = pd.DataFrame(
        [
            {
                "fetched_at": fetched_at,
                "cache_key": cache_key,
                "merged_row_count": int(len(merged)) if isinstance(merged, pd.DataFrame) else 0,
                "odds_api_row_count": int(len(odds_api_raw)) if isinstance(odds_api_raw, pd.DataFrame) else 0,
                "scraper_row_count": int(len(scraper_raw)) if isinstance(scraper_raw, pd.DataFrame) else 0,
                "coverage_summary_json": _safe_json_dump(
                    [] if coverage is None or not isinstance(coverage, pd.DataFrame)
                    else coverage.where(coverage.notna(), other=None).to_dict("records")
                ),
                "coverage_summary_api_json": _safe_json_dump(
                    [] if coverage_api is None or not isinstance(coverage_api, pd.DataFrame)
                    else coverage_api.where(coverage_api.notna(), other=None).to_dict("records")
                ),
                "coverage_summary_scraper_added_json": _safe_json_dump(
                    [] if coverage_scraper_added is None or not isinstance(coverage_scraper_added, pd.DataFrame)
                    else coverage_scraper_added.where(coverage_scraper_added.notna(), other=None).to_dict("records")
                ),
                "coverage_summary_final_json": _safe_json_dump(
                    [] if coverage_final is None or not isinstance(coverage_final, pd.DataFrame)
                    else coverage_final.where(coverage_final.notna(), other=None).to_dict("records")
                ),
                "coverage_summary_hr_api_json": _safe_json_dump(
                    [] if coverage_hr_api is None or not isinstance(coverage_hr_api, pd.DataFrame)
                    else coverage_hr_api.where(coverage_hr_api.notna(), other=None).to_dict("records")
                ),
                "coverage_summary_hr_supplemental_json": _safe_json_dump(
                    [] if coverage_hr_supplemental is None or not isinstance(coverage_hr_supplemental, pd.DataFrame)
                    else coverage_hr_supplemental.where(coverage_hr_supplemental.notna(), other=None).to_dict("records")
                ),
                "coverage_summary_hr_final_json": _safe_json_dump(
                    [] if coverage_hr_final is None or not isinstance(coverage_hr_final, pd.DataFrame)
                    else coverage_hr_final.where(coverage_hr_final.notna(), other=None).to_dict("records")
                ),
                "missing_books_by_market_json": _safe_json_dump(
                    [] if missing_books_by_market is None or not isinstance(missing_books_by_market, pd.DataFrame)
                    else missing_books_by_market.where(missing_books_by_market.notna(), other=None).to_dict("records")
                ),
                "missing_event_books_by_market_json": _safe_json_dump(
                    [] if missing_event_books_by_market is None or not isinstance(missing_event_books_by_market, pd.DataFrame)
                    else missing_event_books_by_market.where(missing_event_books_by_market.notna(), other=None).to_dict("records")
                ),
                "missing_hr_books_global_json": _safe_json_dump(
                    [] if missing_hr_books_global is None or not isinstance(missing_hr_books_global, pd.DataFrame)
                    else missing_hr_books_global.where(missing_hr_books_global.notna(), other=None).to_dict("records")
                ),
                "missing_hr_books_by_event_json": _safe_json_dump(
                    [] if missing_hr_books_by_event is None or not isinstance(missing_hr_books_by_event, pd.DataFrame)
                    else missing_hr_books_by_event.where(missing_hr_books_by_event.notna(), other=None).to_dict("records")
                ),
                "hr_snapshot_completeness_json": _safe_json_dump(dict(hr_snapshot_completeness or {})),
                "adapter_status_by_book_json": _safe_json_dump(dict(adapter_status_by_book or {})),
                "adapter_error_by_book_json": _safe_json_dump(dict(adapter_error_by_book or {})),
                "adapter_rows_by_book_json": _safe_json_dump(dict(adapter_rows_by_book or {})),
                "adapter_last_attempted_at_by_book_json": _safe_json_dump(dict(adapter_last_attempted_at_by_book or {})),
                "adapter_retry_after_by_book_json": _safe_json_dump(dict(adapter_retry_after_by_book or {})),
                "hr_snapshot_state": hr_snapshot_state,
                "current_hr_row_count": current_hr_row_count,
                "current_hr_event_count": current_hr_event_count,
                "last_known_good_hr_row_count": int(last_known_good_hr_row_count or 0),
                "last_known_good_hr_built_at": str(last_known_good_hr_built_at or ""),
                "hr_refresh_overwrite_prevented": bool(hr_refresh_overwrite_prevented),
                "scraper_candidate_count": int(bundle.get("scraper_candidate_count") or 0),
                "scraper_added_count": int(bundle.get("scraper_added_count") or 0),
                "scraper_duplicate_reject_count": int(bundle.get("scraper_duplicate_reject_count") or 0),
            }
        ]
    )
    replace_table_contents(conn, "cached_upcoming_props_feed", feed_df)
    replace_table_contents(conn, "cached_upcoming_props_rows", merged_rows_df[ordered_row_cols])
    replace_table_contents(conn, "cached_upcoming_props_bundle_meta", meta_df)

def read_cached_upcoming_props_bundle(
    conn,
    cache_key: str = "default",
) -> dict[str, Any]:
    # Returns a mix of DataFrames, dicts, scalars, and strings, so the values are
    # typed Any rather than pd.DataFrame.
    rows_df = pd.read_sql(
        text(
            """
            SELECT * FROM cached_upcoming_props_rows
            WHERE cache_key = :cache_key
            ORDER BY fetched_at DESC, event_id, player_name
            LIMIT 5000
            """
        ),
        conn,
        params={"cache_key": cache_key},
    )
    feed_df = pd.read_sql(
        text(
            """
            SELECT * FROM cached_upcoming_props_feed
            WHERE cache_key = :cache_key
            ORDER BY fetched_at DESC
            LIMIT 1
            """
        ),
        conn,
        params={"cache_key": cache_key},
    )
    meta_df = pd.read_sql(
        text(
            """
            SELECT * FROM cached_upcoming_props_bundle_meta
            WHERE cache_key = :cache_key
            ORDER BY fetched_at DESC
            LIMIT 1
            """
        ),
        conn,
        params={"cache_key": cache_key},
    )
    if not rows_df.empty:
        merged = rows_df.drop(columns=["cache_key"], errors="ignore").copy()
    else:
        merged = pd.DataFrame(_safe_json_load(feed_df.iloc[0]["payload_json"], [])) if not feed_df.empty else pd.DataFrame()
    coverage = pd.DataFrame(_safe_json_load(meta_df.iloc[0]["coverage_summary_json"], [])) if not meta_df.empty else pd.DataFrame()
    return {
        "merged_props_feed": merged,
        "coverage_summary": coverage,
        "coverage_summary_api": pd.DataFrame(_safe_json_load(meta_df.iloc[0].get("coverage_summary_api_json"), [])) if not meta_df.empty else pd.DataFrame(),
        "coverage_summary_scraper_added": pd.DataFrame(_safe_json_load(meta_df.iloc[0].get("coverage_summary_scraper_added_json"), [])) if not meta_df.empty else pd.DataFrame(),
        "coverage_summary_final": pd.DataFrame(_safe_json_load(meta_df.iloc[0].get("coverage_summary_final_json"), [])) if not meta_df.empty else pd.DataFrame(),
        "coverage_summary_hr_api": pd.DataFrame(_safe_json_load(meta_df.iloc[0].get("coverage_summary_hr_api_json"), [])) if not meta_df.empty else pd.DataFrame(),
        "coverage_summary_hr_supplemental": pd.DataFrame(_safe_json_load(meta_df.iloc[0].get("coverage_summary_hr_supplemental_json"), [])) if not meta_df.empty else pd.DataFrame(),
        "coverage_summary_hr_final": pd.DataFrame(_safe_json_load(meta_df.iloc[0].get("coverage_summary_hr_final_json"), [])) if not meta_df.empty else pd.DataFrame(),
        "missing_books_by_market": pd.DataFrame(_safe_json_load(meta_df.iloc[0].get("missing_books_by_market_json"), [])) if not meta_df.empty else pd.DataFrame(),
        "missing_event_books_by_market": pd.DataFrame(_safe_json_load(meta_df.iloc[0].get("missing_event_books_by_market_json"), [])) if not meta_df.empty else pd.DataFrame(),
        "missing_hr_books_global": pd.DataFrame(_safe_json_load(meta_df.iloc[0].get("missing_hr_books_global_json"), [])) if not meta_df.empty else pd.DataFrame(),
        "missing_hr_books_by_event": pd.DataFrame(_safe_json_load(meta_df.iloc[0].get("missing_hr_books_by_event_json"), [])) if not meta_df.empty else pd.DataFrame(),
        "hr_snapshot_completeness": _safe_json_load(meta_df.iloc[0].get("hr_snapshot_completeness_json"), {}) if not meta_df.empty else {},
        "adapter_status_by_book": _safe_json_load(meta_df.iloc[0].get("adapter_status_by_book_json"), {}) if not meta_df.empty else {},
        "adapter_error_by_book": _safe_json_load(meta_df.iloc[0].get("adapter_error_by_book_json"), {}) if not meta_df.empty else {},
        "adapter_rows_by_book": _safe_json_load(meta_df.iloc[0].get("adapter_rows_by_book_json"), {}) if not meta_df.empty else {},
        "adapter_last_attempted_at_by_book": _safe_json_load(meta_df.iloc[0].get("adapter_last_attempted_at_by_book_json"), {}) if not meta_df.empty else {},
        "adapter_retry_after_by_book": _safe_json_load(meta_df.iloc[0].get("adapter_retry_after_by_book_json"), {}) if not meta_df.empty else {},
        "hr_snapshot_state": str(meta_df.iloc[0].get("hr_snapshot_state") or "") if not meta_df.empty else "",
        "current_hr_row_count": int(meta_df.iloc[0].get("current_hr_row_count") or 0) if not meta_df.empty else 0,
        "current_hr_event_count": int(meta_df.iloc[0].get("current_hr_event_count") or 0) if not meta_df.empty else 0,
        "last_known_good_hr_row_count": int(meta_df.iloc[0].get("last_known_good_hr_row_count") or 0) if not meta_df.empty else 0,
        "last_known_good_hr_built_at": str(meta_df.iloc[0].get("last_known_good_hr_built_at") or "") if not meta_df.empty else "",
        "hr_refresh_overwrite_prevented": bool(meta_df.iloc[0].get("hr_refresh_overwrite_prevented")) if not meta_df.empty else False,
        "scraper_candidate_count": int(meta_df.iloc[0].get("scraper_candidate_count") or 0) if not meta_df.empty else 0,
        "scraper_added_count": int(meta_df.iloc[0].get("scraper_added_count") or 0) if not meta_df.empty else 0,
        "scraper_duplicate_reject_count": int(meta_df.iloc[0].get("scraper_duplicate_reject_count") or 0) if not meta_df.empty else 0,
        "cache_meta": meta_df,
    }

# ---------------------------------------------------------------------------
# Bets
# ---------------------------------------------------------------------------
def insert_bet(
    conn,
    bet_id: int,
    created_at: str,
    sportsbook: str,
    market: str,
    selection: str,
    odds: int,
    stake: float,
    result: str,
    profit: float,
    game_id: str,
    notes: str,
) -> None:
    conn.execute(
        text(
            """
            INSERT INTO bets (
                bet_id, created_at, sportsbook, market, selection, odds, stake,
                result, profit, game_id, notes
            ) VALUES (
                :bet_id, :created_at, :sportsbook, :market, :selection, :odds, :stake,
                :result, :profit, :game_id, :notes
            )
            """
        ),
        {
            "bet_id": bet_id,
            "created_at": created_at,
            "sportsbook": sportsbook,
            "market": market,
            "selection": selection,
            "odds": odds,
            "stake": stake,
            "result": result,
            "profit": profit,
            "game_id": game_id,
            "notes": notes,
        },
    )


def next_bet_id(conn) -> int:
    return int(
        conn.execute(text("SELECT COALESCE(MAX(bet_id), 0) + 1 FROM bets")).scalar()
    )
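
# MAX(bet_id) + 1 is simple but not concurrency-safe: two simultaneous writers can
# read the same MAX and mint duplicate ids. That is acceptable while bets carries no
# unique constraint (see module docstring); a SEQUENCE or UNIQUE index would harden it.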

def update_bet_result(conn, bet_id: int, result: str, profit: float) -> None:
    conn.execute(
        text(
            """
            UPDATE bets
            SET result = :result, profit = :profit
            WHERE bet_id = :bet_id
            """
        ),
        {"result": result, "profit": profit, "bet_id": bet_id},
    )


# ---------------------------------------------------------------------------
# Recommendation audit view
# ---------------------------------------------------------------------------
def read_recommendation_audit_view(conn) -> pd.DataFrame:
    ensure_recommendation_logs_table(conn)
    ensure_recommendation_outcomes_table(conn)
    # Both source tables accumulate repeated rows (written every 3 s during live games).
    # DISTINCT ON each side before joining prevents Cartesian multiplication.
    # latest_outcomes is scoped to market='hr' (only current market); if additional markets
    # are added to recommendation_outcomes this view must be revised.
    query = """
        WITH latest_logs AS (
            SELECT DISTINCT ON (game_pk, batter_name, slot) *
            FROM recommendation_logs
            ORDER BY game_pk, batter_name, slot, created_at DESC NULLS LAST
        ),
        latest_outcomes AS (
            SELECT DISTINCT ON (game_pk, batter_name, slot) *
            FROM recommendation_outcomes
            WHERE market = 'hr'
            ORDER BY game_pk, batter_name, slot, graded_at DESC NULLS LAST
        )
        SELECT
            l.created_at,
            l.game_pk,
            l.away_team,
            l.home_team,
            l.status,
            l.slot,
            l.batter_name,
            l.pitcher_name,
            l.ev90,
            l.hit_prob,
            l.hr_prob,
            l.tb2p_prob,
            l.fair_hit_odds,
            l.fair_hr_odds,
            l.fair_tb2p_odds,
            l.book_hit_odds,
            l.book_hr_odds,
            l.book_tb2p_odds,
            l.hit_edge,
            l.hr_edge,
            l.tb2p_edge,
            l.adjusted_edge,
            l.hit_bet_ev,
            l.hr_bet_ev,
            l.tb2p_bet_ev,
            l.confidence,
            l.confidence_bucket,
            l.recommendation_tier,
            l.priority_score,
            l.reason_tags,
            l.starter_stays_next_batter_prob,
            l.starter_stays_next_inning_prob,
            l.bullpen_entry_prob,
            o.realized_hit,
            o.realized_hr,
            o.realized_tb2p,
            o.graded_at,
            o.outcome_source
        FROM latest_logs l
        LEFT JOIN latest_outcomes o
            ON l.game_pk = o.game_pk
            AND l.batter_name = o.batter_name
            AND l.slot = o.slot
        ORDER BY l.created_at DESC
        LIMIT 2000
    """
    return pd.read_sql(text(query), conn)
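
# DISTINCT ON is the PostgreSQL/CockroachDB idiom for "latest row per key": it keeps
# the first row of each (game_pk, batter_name, slot) group under the query's ORDER BY,
# here the newest created_at / graded_at. Minimal form:
#   SELECT DISTINCT ON (k) * FROM t ORDER BY k, ts DESC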

def read_recommendation_logs_recent(conn, limit: int = 2000) -> pd.DataFrame:
    ensure_recommendation_logs_table(conn)
    return safe_read_sql(
        text("SELECT * FROM recommendation_logs ORDER BY created_at DESC LIMIT :lim"),
        conn,
        params={"lim": limit},
    )


# ---------------------------------------------------------------------------
# Recommendation outcomes
# ---------------------------------------------------------------------------
def ensure_recommendation_outcomes_table(conn) -> None:
    conn.execute(text(
        """
        CREATE TABLE IF NOT EXISTS recommendation_outcomes (
            created_at TEXT,
            game_pk TEXT,
            away_team TEXT,
            home_team TEXT,
            batter_name TEXT,
            slot TEXT,
            market TEXT,
            realized_hit INTEGER,
            realized_hr INTEGER,
            realized_tb2p INTEGER,
            graded_at TEXT,
            outcome_source TEXT,
            lineup_slot TEXT
        )
        """
    ))
| try: | |
| conn.execute(text("ALTER TABLE recommendation_outcomes ADD COLUMN IF NOT EXISTS lineup_slot TEXT")) | |
| except Exception: | |
| pass # Defensive: a failed column migration must never block startup | |
| def insert_recommendation_outcomes(conn, df: pd.DataFrame) -> None: | |
| if df is None or df.empty: | |
| return | |
| ensure_recommendation_outcomes_table(conn) | |
| _bulk_insert(conn, "recommendation_outcomes", df) | |
| # --------------------------------------------------------------------------- | |
| # Recommendation logs | |
| # --------------------------------------------------------------------------- | |
| def ensure_recommendation_logs_table(conn) -> None: | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS recommendation_logs ( | |
| created_at TEXT, | |
| game_pk TEXT, | |
| away_team TEXT, | |
| home_team TEXT, | |
| status TEXT, | |
| slot TEXT, | |
| batter_name TEXT, | |
| pitcher_name TEXT, | |
| ev90 DOUBLE PRECISION, | |
| hit_prob DOUBLE PRECISION, | |
| hr_prob DOUBLE PRECISION, | |
| tb2p_prob DOUBLE PRECISION, | |
| fair_hit_odds DOUBLE PRECISION, | |
| fair_hr_odds DOUBLE PRECISION, | |
| fair_tb2p_odds DOUBLE PRECISION, | |
| book_hit_odds DOUBLE PRECISION, | |
| book_hr_odds DOUBLE PRECISION, | |
| book_tb2p_odds DOUBLE PRECISION, | |
| hit_edge DOUBLE PRECISION, | |
| hr_edge DOUBLE PRECISION, | |
| tb2p_edge DOUBLE PRECISION, | |
| adjusted_edge DOUBLE PRECISION, | |
| hit_bet_ev DOUBLE PRECISION, | |
| hr_bet_ev DOUBLE PRECISION, | |
| tb2p_bet_ev DOUBLE PRECISION, | |
| confidence DOUBLE PRECISION, | |
| confidence_bucket TEXT, | |
| recommendation_tier TEXT, | |
| priority_score DOUBLE PRECISION, | |
| reason_tags TEXT, | |
| starter_stays_next_batter_prob DOUBLE PRECISION, | |
| starter_stays_next_inning_prob DOUBLE PRECISION, | |
| bullpen_entry_prob DOUBLE PRECISION, | |
| xgb_hr_delta DOUBLE PRECISION, | |
| xgb_hr_adjusted DOUBLE PRECISION, | |
| xgb_shadow_active BOOLEAN, | |
| lineup_slot TEXT | |
| ) | |
| """ | |
| )) | |
| # Safe migration — add columns missing from older schema | |
| for _col, _dtype in [ | |
| ("xgb_hr_delta", "DOUBLE PRECISION"), | |
| ("xgb_hr_adjusted", "DOUBLE PRECISION"), | |
| ("xgb_shadow_active", "BOOLEAN"), | |
| ("lineup_slot", "TEXT"), | |
| ]: | |
| try: | |
| conn.execute(text(f"ALTER TABLE recommendation_logs ADD COLUMN IF NOT EXISTS {_col} {_dtype}")) | |
| except Exception: | |
| pass # Defensive: a failed column migration must never block startup | |
| def insert_recommendation_logs(conn, df: pd.DataFrame) -> None: | |
| if df is None or df.empty: | |
| return | |
| ensure_recommendation_logs_table(conn) | |
| _bulk_insert(conn, "recommendation_logs", df) | |
| # --------------------------------------------------------------------------- | |
| # Game outcomes | |
| # --------------------------------------------------------------------------- | |
| def ensure_game_outcomes_table(conn) -> None: | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS game_outcomes ( | |
| graded_at TEXT, | |
| game_pk TEXT, | |
| away_team TEXT, | |
| home_team TEXT, | |
| away_score INTEGER, | |
| home_score INTEGER, | |
| status TEXT, | |
| outcome_source TEXT | |
| ) | |
| """ | |
| )) | |
| def insert_game_outcomes(conn, df: pd.DataFrame) -> None: | |
| if df is None or df.empty: | |
| return | |
| ensure_game_outcomes_table(conn) | |
| _bulk_insert(conn, "game_outcomes", df) | |
| def read_game_outcomes(conn) -> pd.DataFrame: | |
| ensure_game_outcomes_table(conn) | |
| return pd.read_sql( | |
| text("SELECT * FROM game_outcomes ORDER BY graded_at DESC"), | |
| conn, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Batter prop outcomes | |
| # --------------------------------------------------------------------------- | |
| def ensure_batter_prop_outcomes_table(conn) -> None: | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS batter_prop_outcomes ( | |
| created_at TEXT, | |
| graded_at TEXT, | |
| game_pk TEXT, | |
| away_team TEXT, | |
| home_team TEXT, | |
| slot TEXT, | |
| batter_name TEXT, | |
| pitcher_name TEXT, | |
| market TEXT, | |
| fair_hr_odds DOUBLE PRECISION, | |
| book_hr_odds DOUBLE PRECISION, | |
| adjusted_edge DOUBLE PRECISION, | |
| confidence DOUBLE PRECISION, | |
| recommendation_tier TEXT, | |
| realized_hit INTEGER, | |
| realized_hr INTEGER, | |
| realized_tb2p INTEGER, | |
| grade_status TEXT, | |
| outcome_source TEXT | |
| ) | |
| """ | |
| )) | |
| try: | |
| conn.execute(text(""" | |
| CREATE UNIQUE INDEX IF NOT EXISTS uq_batter_prop_outcomes_grain | |
| ON batter_prop_outcomes (game_pk, batter_name, market) | |
| """)) | |
| except Exception: | |
| pass # Pre-rebuild duplicate rows can block unique-index creation; retried on next ensure | |
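| # The (game_pk, batter_name, market) grain above would also support a per-row | |
| # upsert later, e.g. (sketch only; current writers still delete-then-insert): | |
| # INSERT INTO batter_prop_outcomes (game_pk, batter_name, market, realized_hr) | |
| # VALUES (:pk, :name, 'hr', :hr) | |
| # ON CONFLICT (game_pk, batter_name, market) DO UPDATE | |
| # SET realized_hr = excluded.realized_hr | |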
| def insert_batter_prop_outcomes(conn, df: pd.DataFrame) -> None: | |
| if df is None or df.empty: | |
| return | |
| ensure_batter_prop_outcomes_table(conn) | |
| _bulk_insert(conn, "batter_prop_outcomes", df) | |
| def read_batter_prop_outcomes(conn) -> pd.DataFrame: | |
| ensure_batter_prop_outcomes_table(conn) | |
| return pd.read_sql( | |
| text("SELECT * FROM batter_prop_outcomes ORDER BY graded_at DESC, created_at DESC"), | |
| conn, | |
| ) | |
| def replace_batter_prop_outcomes(conn, df: pd.DataFrame) -> None: | |
| """ | |
| Full-table rewrite: DELETE then re-insert. Both statements run under | |
| AUTOCOMMIT, so the swap is not atomic; an empty df is a no-op (it does | |
| not clear the table). | |
| """ | |
| if df is None or df.empty: | |
| return | |
| ensure_batter_prop_outcomes_table(conn) | |
| conn.execute(text("DELETE FROM batter_prop_outcomes")) | |
| _bulk_insert(conn, "batter_prop_outcomes", df) | |
| def delete_batter_prop_outcomes_for_game(conn, game_pk: str) -> None: | |
| """Delete all batter_prop_outcomes rows for a single game_pk (scoped, idempotent delete).""" | |
| conn.execute( | |
| text("DELETE FROM batter_prop_outcomes WHERE game_pk = :pk"), | |
| {"pk": str(game_pk)}, | |
| ) | |
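| # Regrade sketch: scoped delete plus insert rewrites one game idempotently | |
| # without touching other games (two AUTOCOMMIT statements, so not atomic): | |
| # | |
| # delete_batter_prop_outcomes_for_game(conn, "662253") | |
| # insert_batter_prop_outcomes(conn, regraded_df) # regraded_df built by caller | |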
| def read_batter_prop_outcomes_for_game(conn, game_pk: str) -> pd.DataFrame: | |
| """Return batter_prop_outcomes rows for a single game_pk only.""" | |
| ensure_batter_prop_outcomes_table(conn) | |
| return pd.read_sql( | |
| text("SELECT * FROM batter_prop_outcomes WHERE game_pk = :pk"), | |
| conn, | |
| params={"pk": str(game_pk)}, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Upcoming HR props | |
| # --------------------------------------------------------------------------- | |
| def ensure_upcoming_hr_props_table(conn) -> None: | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS upcoming_hr_props ( | |
| fetched_at TEXT, | |
| event_id TEXT, | |
| commence_time TEXT, | |
| away_team TEXT, | |
| home_team TEXT, | |
| sportsbook TEXT, | |
| market TEXT, | |
| market_variant TEXT, | |
| threshold INTEGER, | |
| display_label TEXT, | |
| is_primary_line BOOLEAN, | |
| is_modeled BOOLEAN, | |
| selection_scope TEXT, | |
| selection_side TEXT, | |
| player_name_raw TEXT, | |
| player_name TEXT, | |
| odds_american INTEGER, | |
| line DOUBLE PRECISION, | |
| implied_prob DOUBLE PRECISION, | |
| raw_hr_prob DOUBLE PRECISION, | |
| calibrated_hr_prob DOUBLE PRECISION, | |
| model_hr_prob DOUBLE PRECISION, | |
| fair_prob DOUBLE PRECISION, | |
| bet_ev DOUBLE PRECISION, | |
| confidence_score DOUBLE PRECISION, | |
| confidence_bucket TEXT, | |
| opportunity_hr_adjustment DOUBLE PRECISION, | |
| model_hr_prob_source TEXT, | |
| edge DOUBLE PRECISION, | |
| verdict TEXT, | |
| model_voice TEXT, | |
| model_voice_primary_reason TEXT, | |
| model_voice_caveat TEXT, | |
| model_voice_tags TEXT, | |
| model_voice_for TEXT, | |
| model_voice_against TEXT | |
| ) | |
| """ | |
| )) | |
| for _col, _dtype in [ | |
| ("model_hr_prob_source", "TEXT"), | |
| ("edge", "DOUBLE PRECISION"), | |
| ("market_variant", "TEXT"), | |
| ("threshold", "INTEGER"), | |
| ("display_label", "TEXT"), | |
| ("is_primary_line", "BOOLEAN"), | |
| ("is_modeled", "BOOLEAN"), | |
| ("selection_scope", "TEXT"), | |
| ("selection_side", "TEXT"), | |
| ("raw_hr_prob", "DOUBLE PRECISION"), | |
| ("calibrated_hr_prob", "DOUBLE PRECISION"), | |
| ("fair_prob", "DOUBLE PRECISION"), | |
| ("bet_ev", "DOUBLE PRECISION"), | |
| ("confidence_score", "DOUBLE PRECISION"), | |
| ("confidence_bucket", "TEXT"), | |
| ("opportunity_hr_adjustment", "DOUBLE PRECISION"), | |
| ("verdict", "TEXT"), | |
| ("model_voice", "TEXT"), | |
| ("model_voice_primary_reason", "TEXT"), | |
| ("model_voice_caveat", "TEXT"), | |
| ("model_voice_tags", "TEXT"), | |
| ("model_voice_for", "TEXT"), | |
| ("model_voice_against", "TEXT"), | |
| ]: | |
| try: | |
| conn.execute(text(f"ALTER TABLE upcoming_hr_props ADD COLUMN IF NOT EXISTS {_col} {_dtype}")) | |
| except Exception: | |
| pass # Defensive: a failed column migration must never block startup | |
| def insert_upcoming_hr_props(conn, df: pd.DataFrame) -> None: | |
| if df is None or df.empty: | |
| return | |
| ensure_upcoming_hr_props_table(conn) | |
| # Select only the expected columns in the correct order | |
| log_cols = [ | |
| "fetched_at", "event_id", "commence_time", "away_team", "home_team", | |
| "sportsbook", "market", "market_variant", "threshold", "display_label", | |
| "is_primary_line", "is_modeled", "selection_scope", "selection_side", | |
| "player_name_raw", "player_name", | |
| "odds_american", "line", "implied_prob", "raw_hr_prob", | |
| "calibrated_hr_prob", "model_hr_prob", "fair_prob", "bet_ev", "confidence_score", | |
| "confidence_bucket", "opportunity_hr_adjustment", | |
| "model_hr_prob_source", "edge", "verdict", "model_voice", "model_voice_primary_reason", "model_voice_caveat", "model_voice_tags", "model_voice_for", "model_voice_against", | |
| ] | |
| present = [c for c in log_cols if c in df.columns] | |
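| # Intersection is deliberate: extra engineering columns on df are dropped, and | |
| # expected columns missing from df are skipped (their rows insert as NULL). | |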
| _bulk_insert(conn, "upcoming_hr_props", df[present]) | |
| def read_upcoming_hr_props(conn) -> pd.DataFrame: | |
| ensure_upcoming_hr_props_table(conn) | |
| return pd.read_sql( | |
| text("SELECT * FROM upcoming_hr_props ORDER BY fetched_at DESC"), | |
| conn, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Shared baseline snapshots | |
| # --------------------------------------------------------------------------- | |
| def ensure_shared_baseline_snapshot_tables(conn) -> None: | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS shared_hitter_baseline_event_rows ( | |
| player_name TEXT, | |
| event_key TEXT, | |
| batter BIGINT, | |
| pitcher BIGINT, | |
| game_date TEXT, | |
| game_pk BIGINT, | |
| source_season INT, | |
| pitch_type TEXT, | |
| pitch_name TEXT, | |
| events TEXT, | |
| description TEXT, | |
| stand TEXT, | |
| p_throws TEXT, | |
| home_team TEXT, | |
| away_team TEXT, | |
| inning INT, | |
| inning_topbot TEXT, | |
| at_bat_number INT, | |
| pitch_number INT, | |
| plate_x DOUBLE PRECISION, | |
| plate_z DOUBLE PRECISION, | |
| release_speed DOUBLE PRECISION, | |
| release_spin_rate DOUBLE PRECISION, | |
| release_extension DOUBLE PRECISION, | |
| release_pos_x DOUBLE PRECISION, | |
| release_pos_z DOUBLE PRECISION, | |
| pfx_x DOUBLE PRECISION, | |
| pfx_z DOUBLE PRECISION, | |
| launch_speed DOUBLE PRECISION, | |
| launch_angle DOUBLE PRECISION, | |
| estimated_woba_using_speedangle DOUBLE PRECISION, | |
| spray_angle DOUBLE PRECISION, | |
| hc_x DOUBLE PRECISION, | |
| hc_y DOUBLE PRECISION, | |
| bb_type TEXT, | |
| balls DOUBLE PRECISION, | |
| strikes DOUBLE PRECISION, | |
| outs_when_up DOUBLE PRECISION, | |
| bat_score DOUBLE PRECISION, | |
| fld_score DOUBLE PRECISION, | |
| post_bat_score DOUBLE PRECISION, | |
| post_fld_score DOUBLE PRECISION, | |
| pitcher_hand TEXT, | |
| batter_stand TEXT, | |
| movement_magnitude DOUBLE PRECISION, | |
| spin_efficiency_proxy DOUBLE PRECISION, | |
| release_height_proxy DOUBLE PRECISION, | |
| release_side_proxy DOUBLE PRECISION, | |
| count_string TEXT, | |
| baseline_mode TEXT, | |
| prior_sample_size INTEGER, | |
| season_2026_sample_size INTEGER, | |
| prior_weight DOUBLE PRECISION, | |
| season_2026_weight DOUBLE PRECISION, | |
| baseline_driver TEXT, | |
| rolling_overlay_active BOOLEAN, | |
| baseline_role TEXT, | |
| baseline_source TEXT, | |
| snapshot_built_at TEXT, | |
| snapshot_version TEXT, | |
| source_status TEXT | |
| ) | |
| """ | |
| )) | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS shared_pitcher_baseline_event_rows ( | |
| player_name TEXT, | |
| event_key TEXT, | |
| batter BIGINT, | |
| pitcher BIGINT, | |
| game_date TEXT, | |
| game_pk BIGINT, | |
| source_season INT, | |
| pitch_type TEXT, | |
| pitch_name TEXT, | |
| events TEXT, | |
| description TEXT, | |
| stand TEXT, | |
| p_throws TEXT, | |
| home_team TEXT, | |
| away_team TEXT, | |
| inning INT, | |
| inning_topbot TEXT, | |
| at_bat_number INT, | |
| pitch_number INT, | |
| plate_x DOUBLE PRECISION, | |
| plate_z DOUBLE PRECISION, | |
| release_speed DOUBLE PRECISION, | |
| release_spin_rate DOUBLE PRECISION, | |
| release_extension DOUBLE PRECISION, | |
| release_pos_x DOUBLE PRECISION, | |
| release_pos_z DOUBLE PRECISION, | |
| pfx_x DOUBLE PRECISION, | |
| pfx_z DOUBLE PRECISION, | |
| launch_speed DOUBLE PRECISION, | |
| launch_angle DOUBLE PRECISION, | |
| estimated_woba_using_speedangle DOUBLE PRECISION, | |
| spray_angle DOUBLE PRECISION, | |
| hc_x DOUBLE PRECISION, | |
| hc_y DOUBLE PRECISION, | |
| bb_type TEXT, | |
| balls DOUBLE PRECISION, | |
| strikes DOUBLE PRECISION, | |
| outs_when_up DOUBLE PRECISION, | |
| bat_score DOUBLE PRECISION, | |
| fld_score DOUBLE PRECISION, | |
| post_bat_score DOUBLE PRECISION, | |
| post_fld_score DOUBLE PRECISION, | |
| pitcher_hand TEXT, | |
| batter_stand TEXT, | |
| movement_magnitude DOUBLE PRECISION, | |
| spin_efficiency_proxy DOUBLE PRECISION, | |
| release_height_proxy DOUBLE PRECISION, | |
| release_side_proxy DOUBLE PRECISION, | |
| count_string TEXT, | |
| baseline_mode TEXT, | |
| prior_sample_size INTEGER, | |
| season_2026_sample_size INTEGER, | |
| prior_weight DOUBLE PRECISION, | |
| season_2026_weight DOUBLE PRECISION, | |
| baseline_driver TEXT, | |
| rolling_overlay_active BOOLEAN, | |
| baseline_role TEXT, | |
| baseline_source TEXT, | |
| snapshot_built_at TEXT, | |
| snapshot_version TEXT, | |
| source_status TEXT | |
| ) | |
| """ | |
| )) | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS shared_hitter_baseline_snapshot ( | |
| player_name TEXT, | |
| source_row_count INTEGER, | |
| payload_json TEXT, | |
| snapshot_built_at TEXT, | |
| snapshot_version TEXT, | |
| source_status TEXT | |
| ) | |
| """ | |
| )) | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS shared_pitcher_baseline_snapshot ( | |
| player_name TEXT, | |
| source_row_count INTEGER, | |
| payload_json TEXT, | |
| snapshot_built_at TEXT, | |
| snapshot_version TEXT, | |
| source_status TEXT | |
| ) | |
| """ | |
| )) | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS shared_hitter_baseline_meta ( | |
| player_name TEXT, | |
| baseline_role TEXT, | |
| baseline_mode TEXT, | |
| prior_sample_size INTEGER, | |
| season_2026_sample_size INTEGER, | |
| prior_weight DOUBLE PRECISION, | |
| season_2026_weight DOUBLE PRECISION, | |
| baseline_driver TEXT, | |
| rolling_overlay_active BOOLEAN, | |
| snapshot_built_at TEXT, | |
| snapshot_version TEXT, | |
| source_status TEXT | |
| ) | |
| """ | |
| )) | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS shared_pitcher_baseline_meta ( | |
| player_name TEXT, | |
| baseline_role TEXT, | |
| baseline_mode TEXT, | |
| prior_sample_size INTEGER, | |
| season_2026_sample_size INTEGER, | |
| prior_weight DOUBLE PRECISION, | |
| season_2026_weight DOUBLE PRECISION, | |
| baseline_driver TEXT, | |
| rolling_overlay_active BOOLEAN, | |
| snapshot_built_at TEXT, | |
| snapshot_version TEXT, | |
| source_status TEXT | |
| ) | |
| """ | |
| )) | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS shared_hitter_rolling_summary ( | |
| player_name TEXT, | |
| batter_ev_5g DOUBLE PRECISION, | |
| batter_ev_10g DOUBLE PRECISION, | |
| batter_ev90_5g DOUBLE PRECISION, | |
| batter_ev90_10g DOUBLE PRECISION, | |
| batter_hard_hit_rate_5g DOUBLE PRECISION, | |
| batter_hard_hit_rate_10g DOUBLE PRECISION, | |
| batter_barrel_rate_5g DOUBLE PRECISION, | |
| batter_barrel_rate_10g DOUBLE PRECISION, | |
| batter_avg_launch_angle_5g DOUBLE PRECISION, | |
| batter_avg_launch_angle_10g DOUBLE PRECISION, | |
| batter_fb_rate_5g DOUBLE PRECISION, | |
| batter_fb_rate_10g DOUBLE PRECISION, | |
| batter_ld_rate_5g DOUBLE PRECISION, | |
| batter_gb_rate_5g DOUBLE PRECISION, | |
| batter_air_ball_rate_5g DOUBLE PRECISION, | |
| batter_hr_rate_5g DOUBLE PRECISION, | |
| batter_hr_rate_10g DOUBLE PRECISION, | |
| batter_pull_air_rate_5g DOUBLE PRECISION, | |
| batter_pulled_hard_air_rate_5g DOUBLE PRECISION, | |
| batter_pulled_barrel_rate_5g DOUBLE PRECISION, | |
| batter_games_in_window_5g INTEGER, | |
| batter_games_in_window_10g INTEGER, | |
| batter_recent_form_available INTEGER, | |
| source_row_count INTEGER, | |
| snapshot_built_at TEXT, | |
| snapshot_version TEXT, | |
| source_status TEXT | |
| ) | |
| """ | |
| )) | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS shared_pitcher_rolling_summary ( | |
| player_name TEXT, | |
| pitcher_avg_release_speed_5g DOUBLE PRECISION, | |
| pitcher_avg_release_speed_10g DOUBLE PRECISION, | |
| pitcher_avg_release_spin_rate_5g DOUBLE PRECISION, | |
| pitcher_ev_allowed_5g DOUBLE PRECISION, | |
| pitcher_ev_allowed_10g DOUBLE PRECISION, | |
| pitcher_hard_hit_rate_allowed_5g DOUBLE PRECISION, | |
| pitcher_hard_hit_rate_allowed_10g DOUBLE PRECISION, | |
| pitcher_barrel_rate_allowed_5g DOUBLE PRECISION, | |
| pitcher_barrel_rate_allowed_10g DOUBLE PRECISION, | |
| pitcher_avg_launch_angle_allowed_5g DOUBLE PRECISION, | |
| pitcher_fb_rate_allowed_5g DOUBLE PRECISION, | |
| pitcher_ld_rate_allowed_5g DOUBLE PRECISION, | |
| pitcher_gb_rate_allowed_5g DOUBLE PRECISION, | |
| pitcher_hr_allowed_rate_5g DOUBLE PRECISION, | |
| pitcher_hr_allowed_rate_10g DOUBLE PRECISION, | |
| pitcher_games_in_window_5g INTEGER, | |
| pitcher_games_in_window_10g INTEGER, | |
| pitcher_recent_form_available INTEGER, | |
| pitcher_rolling_confidence DOUBLE PRECISION, | |
| source_row_count INTEGER, | |
| snapshot_built_at TEXT, | |
| snapshot_version TEXT, | |
| source_status TEXT | |
| ) | |
| """ | |
| )) | |
| for _table in [ | |
| "shared_hitter_baseline_event_rows", | |
| "shared_pitcher_baseline_event_rows", | |
| "shared_hitter_baseline_snapshot", | |
| "shared_pitcher_baseline_snapshot", | |
| "shared_hitter_baseline_meta", | |
| "shared_pitcher_baseline_meta", | |
| "shared_hitter_rolling_summary", | |
| "shared_pitcher_rolling_summary", | |
| ]: | |
| try: | |
| conn.execute(text( | |
| f"CREATE INDEX IF NOT EXISTS idx_{_table}_player_name " | |
| f"ON {_table} (player_name)" | |
| )) | |
| except Exception: | |
| pass | |
| for _stmt in [ | |
| "CREATE INDEX IF NOT EXISTS idx_shared_hitter_baseline_event_rows_player_date ON shared_hitter_baseline_event_rows (player_name, game_date)", | |
| "CREATE INDEX IF NOT EXISTS idx_shared_pitcher_baseline_event_rows_player_date ON shared_pitcher_baseline_event_rows (player_name, game_date)", | |
| ]: | |
| try: | |
| conn.execute(text(_stmt)) | |
| except Exception: | |
| pass | |
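| # Point-lookup sketch (served by the per-table player_name indexes above; | |
| # illustrative player): | |
| # | |
| # pd.read_sql( | |
| # text("SELECT payload_json FROM shared_hitter_baseline_snapshot " | |
| # "WHERE player_name = :p"), | |
| # conn, params={"p": "Aaron Judge"}, | |
| # ) | |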
| # --------------------------------------------------------------------------- | |
| # Batter prop audit view | |
| # --------------------------------------------------------------------------- | |
| def read_batter_prop_audit_view(conn) -> pd.DataFrame: | |
| ensure_batter_prop_outcomes_table(conn) | |
| query = """ | |
| SELECT | |
| created_at, | |
| graded_at, | |
| game_pk, | |
| away_team, | |
| home_team, | |
| slot, | |
| batter_name, | |
| pitcher_name, | |
| market, | |
| fair_hr_odds, | |
| book_hr_odds, | |
| adjusted_edge, | |
| confidence, | |
| recommendation_tier, | |
| realized_hit, | |
| realized_hr, | |
| realized_tb2p, | |
| grade_status, | |
| outcome_source | |
| FROM batter_prop_outcomes | |
| ORDER BY graded_at DESC, created_at DESC | |
| LIMIT 2000 | |
| """ | |
| return pd.read_sql(text(query), conn) | |
| # --------------------------------------------------------------------------- | |
| # Pitcher resolution log | |
| # --------------------------------------------------------------------------- | |
| def ensure_pitcher_resolution_log_table(conn) -> None: | |
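| # Note: this diagnostic log table carries a synthetic UUID primary key, an | |
| # exception to the constraint-free runtime tables. | |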
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS pitcher_resolution_log ( | |
| id UUID DEFAULT gen_random_uuid() PRIMARY KEY, | |
| game_pk TEXT, | |
| game_date TEXT, | |
| source TEXT, | |
| input_name TEXT, | |
| normalized_name TEXT, | |
| matched_canonical TEXT, | |
| pitcher_id BIGINT, | |
| match_method TEXT, | |
| sample_size INTEGER, | |
| p_throws TEXT, | |
| created_at TIMESTAMPTZ DEFAULT now() | |
| ) | |
| """ | |
| )) | |
| conn.execute(text( | |
| "CREATE INDEX IF NOT EXISTS idx_prl_game_date " | |
| "ON pitcher_resolution_log (game_date)" | |
| )) | |
| conn.execute(text( | |
| "CREATE INDEX IF NOT EXISTS idx_prl_input_name " | |
| "ON pitcher_resolution_log (input_name)" | |
| )) | |
| def log_pitcher_resolution(conn, record: dict) -> None: | |
| """ | |
| Insert one row into pitcher_resolution_log. | |
| Expected keys (all optional except source + input_name): | |
| game_pk, game_date, source, input_name, normalized_name, | |
| matched_canonical, pitcher_id, match_method, sample_size, p_throws | |
| """ | |
| ensure_pitcher_resolution_log_table(conn) | |
| fields = [ | |
| "game_pk", "game_date", "source", "input_name", | |
| "normalized_name", "matched_canonical", "pitcher_id", | |
| "match_method", "sample_size", "p_throws", | |
| ] | |
| row = {f: record.get(f) for f in fields} | |
| col_list = ", ".join(fields) | |
| placeholders = ", ".join(f":{f}" for f in fields) | |
| try: | |
| conn.execute( | |
| text(f"INSERT INTO pitcher_resolution_log ({col_list}) VALUES ({placeholders})"), | |
| row, | |
| ) | |
| except Exception: | |
| pass # Never let logging break the main pipeline | |
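| # Example call (illustrative values; keys outside `fields` are simply ignored): | |
| # | |
| # log_pitcher_resolution(conn, { | |
| # "source": "odds_feed", "input_name": "G. Cole", | |
| # "normalized_name": "gerrit cole", "matched_canonical": "Gerrit Cole", | |
| # "match_method": "fuzzy", "sample_size": 1842, "p_throws": "R", | |
| # }) | |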
| def read_pitcher_resolution_log(conn, limit: int = 500) -> pd.DataFrame: | |
| ensure_pitcher_resolution_log_table(conn) | |
| return pd.read_sql( | |
| text( | |
| "SELECT * FROM pitcher_resolution_log " | |
| "ORDER BY created_at DESC " | |
| "LIMIT :lim" | |
| ), | |
| conn, | |
| params={"lim": int(limit)}, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Feedback submissions | |
| # --------------------------------------------------------------------------- | |
| def ensure_feedback_submissions_table(conn) -> None: | |
| conn.execute(text( | |
| """ | |
| CREATE TABLE IF NOT EXISTS feedback_submissions ( | |
| created_at TEXT NOT NULL, | |
| message TEXT NOT NULL | |
| ) | |
| """ | |
| )) | |
| def insert_feedback_submission(conn, message: str) -> None: | |
| ensure_feedback_submissions_table(conn) | |
| conn.execute( | |
| text( | |
| "INSERT INTO feedback_submissions (created_at, message) " | |
| "VALUES (:created_at, :message)" | |
| ), | |
| {"created_at": utc_now_iso(), "message": message}, | |
| ) | |
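| # e.g. insert_feedback_submission(get_connection(), "Audit view won't sort by edge") | |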
| def read_feedback_submissions(conn) -> pd.DataFrame: | |
| ensure_feedback_submissions_table(conn) | |
| return pd.read_sql( | |
| text("SELECT * FROM feedback_submissions ORDER BY created_at DESC"), | |
| conn, | |
| ) | |