from __future__ import annotations from collections import defaultdict import json import logging import threading from typing import Any _log = logging.getLogger(__name__) import pandas as pd from sqlalchemy import text from database.db import get_connection, replace_table_contents, safe_read_sql, upsert_dataframe from features.pitch_features import add_pitch_features from models.rolling_form_model import ( build_batter_rolling_form_row, build_pitcher_rolling_form_row, ) from utils.helpers import utc_now_iso from visualization.cards.player_identity import load_identity_map, normalize_for_matching, to_canonical_name PRIOR_SEASONS = (2021, 2022, 2023, 2024, 2025) CURRENT_SEASON = 2026 _HITTER_BLEND_K = 260.0 _PITCHER_BLEND_K = 320.0 _MAX_ROWS_PER_PLAYER = 420 _MIN_CURRENT_ROWS_WHEN_AVAILABLE = 20 _SNAPSHOT_VERSION = "shared_baseline_v1" _DEFAULT_SNAPSHOT_MAX_AGE_SECONDS = 60 * 30 _FALLBACK_BUILD_TIMEOUT_SECONDS: int = 5 _SNAPSHOT_READ_TIMEOUT_SECONDS: int = 20 # wall-clock cap for the 8 DB reads in load_shared_baseline_bundle_from_snapshots _PRIOR_SEASON_RECENCY_WEIGHTS = { 2025: 1.00, 2024: 0.85, 2023: 0.70, 2022: 0.55, 2021: 0.45, } _REFRESH_LOCK = threading.Lock() _REFRESH_INFLIGHT: set[tuple[tuple[str, ...], tuple[str, ...]]] = set() _PERSIST_LOCK = threading.Lock() _PERSIST_INFLIGHT: set[tuple[str, tuple[str, ...], tuple[str, ...]]] = set() _NUMERIC_COLS = [ "batter", "pitcher", "game_pk", "source_season", "inning", "at_bat_number", "pitch_number", "plate_x", "plate_z", "release_speed", "release_spin_rate", "release_extension", "release_pos_x", "release_pos_z", "pfx_x", "pfx_z", "launch_speed", "launch_angle", "estimated_woba_using_speedangle", "spray_angle", "hc_x", "hc_y", "balls", "strikes", "outs_when_up", "bat_score", "fld_score", "post_bat_score", "post_fld_score", ] _EVENT_SNAPSHOT_COLS = [ "player_name", "event_key", "batter", "pitcher", "game_date", "game_pk", "source_season", "pitch_type", "pitch_name", "events", "description", "stand", "p_throws", 
"home_team", "away_team", "inning", "inning_topbot", "at_bat_number", "pitch_number", "plate_x", "plate_z", "release_speed", "release_spin_rate", "release_extension", "release_pos_x", "release_pos_z", "pfx_x", "pfx_z", "launch_speed", "launch_angle", "estimated_woba_using_speedangle", "spray_angle", "hc_x", "hc_y", "bb_type", "balls", "strikes", "outs_when_up", "bat_score", "fld_score", "post_bat_score", "post_fld_score", "pitcher_hand", "batter_stand", "movement_magnitude", "spin_efficiency_proxy", "release_height_proxy", "release_side_proxy", "count_string", "baseline_mode", "prior_sample_size", "season_2026_sample_size", "prior_weight", "season_2026_weight", "baseline_driver", "rolling_overlay_active", "baseline_role", "_baseline_source", "snapshot_built_at", "snapshot_version", "source_status", ] _BATTER_ROLLING_COLS = [ "player_name", "batter_ev_5g", "batter_ev_10g", "batter_ev90_5g", "batter_ev90_10g", "batter_hard_hit_rate_5g", "batter_hard_hit_rate_10g", "batter_barrel_rate_5g", "batter_barrel_rate_10g", "batter_avg_launch_angle_5g", "batter_avg_launch_angle_10g", "batter_fb_rate_5g", "batter_fb_rate_10g", "batter_ld_rate_5g", "batter_gb_rate_5g", "batter_air_ball_rate_5g", "batter_hr_rate_5g", "batter_hr_rate_10g", "batter_pull_air_rate_5g", "batter_pulled_hard_air_rate_5g", "batter_pulled_barrel_rate_5g", "batter_games_in_window_5g", "batter_games_in_window_10g", "batter_recent_form_available", "source_row_count", "snapshot_built_at", "snapshot_version", "source_status", ] _PITCHER_ROLLING_COLS = [ "player_name", "pitcher_avg_release_speed_5g", "pitcher_avg_release_speed_10g", "pitcher_avg_release_spin_rate_5g", "pitcher_ev_allowed_5g", "pitcher_ev_allowed_10g", "pitcher_hard_hit_rate_allowed_5g", "pitcher_hard_hit_rate_allowed_10g", "pitcher_barrel_rate_allowed_5g", "pitcher_barrel_rate_allowed_10g", "pitcher_avg_launch_angle_allowed_5g", "pitcher_fb_rate_allowed_5g", "pitcher_ld_rate_allowed_5g", "pitcher_gb_rate_allowed_5g", 
"pitcher_hr_allowed_rate_5g", "pitcher_hr_allowed_rate_10g", "pitcher_games_in_window_5g", "pitcher_games_in_window_10g", "pitcher_recent_form_available", "pitcher_rolling_confidence", "source_row_count", "snapshot_built_at", "snapshot_version", "source_status", ] def _safe_int(value: Any) -> int | None: try: if value is None or str(value).strip() in {"", "nan", "None"}: return None return int(float(value)) except Exception: return None def _coerce_bool(value: Any) -> bool: text = str(value or "").strip().lower() return text in {"1", "true", "yes"} def _normalize_name(value: Any) -> str: return normalize_for_matching(to_canonical_name(str(value or "").strip())) def _normalized_name_set(values: list[Any] | tuple[Any, ...] | set[Any] | pd.Series | None) -> set[str]: if values is None: return set() return { _normalize_name(value) for value in list(values) if _normalize_name(value) } def _compute_missing_requested_names( requested_names: tuple[str, ...] | list[str] | None, available_names: set[str], ) -> list[str]: if not requested_names: return [] out: list[str] = [] for raw_name in requested_names: cleaned = str(raw_name or "").strip() if not cleaned: continue if _normalize_name(cleaned) not in available_names: out.append(cleaned) return out def _annotate_request_coverage( bundle: dict[str, pd.DataFrame], *, requested_hitter_names: tuple[str, ...], requested_pitcher_names: tuple[str, ...], coverage_mode: str, background_refresh_queued: bool, ) -> dict[str, pd.DataFrame]: available_hitters = _normalized_name_set( bundle.get("batter_baseline_meta", pd.DataFrame()).get("player_name", pd.Series(dtype="object")).dropna().astype(str).tolist() if isinstance(bundle.get("batter_baseline_meta", pd.DataFrame()), pd.DataFrame) else [] ) available_pitchers = _normalized_name_set( bundle.get("pitcher_baseline_meta", pd.DataFrame()).get("player_name", pd.Series(dtype="object")).dropna().astype(str).tolist() if isinstance(bundle.get("pitcher_baseline_meta", pd.DataFrame()), 
pd.DataFrame) else [] ) missing_hitter_names = _compute_missing_requested_names(requested_hitter_names, available_hitters) missing_pitcher_names = _compute_missing_requested_names(requested_pitcher_names, available_pitchers) bundle["requested_hitter_count"] = int(len(requested_hitter_names)) bundle["requested_pitcher_count"] = int(len(requested_pitcher_names)) bundle["resolved_hitter_count"] = int(len(requested_hitter_names) - len(missing_hitter_names)) bundle["resolved_pitcher_count"] = int(len(requested_pitcher_names) - len(missing_pitcher_names)) bundle["missing_hitter_names"] = missing_hitter_names bundle["missing_pitcher_names"] = missing_pitcher_names bundle["snapshot_coverage_mode"] = coverage_mode bundle["background_refresh_queued"] = background_refresh_queued return bundle def _merge_bundle_frames( left: pd.DataFrame | None, right: pd.DataFrame | None, *, subset_candidates: list[str], ) -> pd.DataFrame: left_df = left if isinstance(left, pd.DataFrame) else pd.DataFrame() right_df = right if isinstance(right, pd.DataFrame) else pd.DataFrame() if left_df.empty: return right_df.copy() if right_df.empty: return left_df.copy() merged = pd.concat([left_df, right_df], ignore_index=True, sort=False) subset = [col for col in subset_candidates if col in merged.columns] if subset: merged = merged.drop_duplicates(subset=subset, keep="last") else: merged = merged.drop_duplicates(keep="last") return merged.reset_index(drop=True) def _merge_shared_baseline_bundles( snapshot_bundle: dict[str, pd.DataFrame], patch_bundle: dict[str, pd.DataFrame], ) -> dict[str, pd.DataFrame]: merged = dict(snapshot_bundle) merged["blended_batter_df"] = _merge_bundle_frames( snapshot_bundle.get("blended_batter_df"), patch_bundle.get("blended_batter_df"), subset_candidates=["player_name", "event_key", "game_pk", "at_bat_number", "pitch_number"], ) merged["blended_pitcher_df"] = _merge_bundle_frames( snapshot_bundle.get("blended_pitcher_df"), patch_bundle.get("blended_pitcher_df"), 
subset_candidates=["player_name", "event_key", "game_pk", "at_bat_number", "pitch_number"], ) merged["batter_baseline_meta"] = _merge_bundle_frames( snapshot_bundle.get("batter_baseline_meta"), patch_bundle.get("batter_baseline_meta"), subset_candidates=["player_name"], ) merged["pitcher_baseline_meta"] = _merge_bundle_frames( snapshot_bundle.get("pitcher_baseline_meta"), patch_bundle.get("pitcher_baseline_meta"), subset_candidates=["player_name"], ) merged["hitter_rolling_snapshot"] = _merge_bundle_frames( snapshot_bundle.get("hitter_rolling_snapshot"), patch_bundle.get("hitter_rolling_snapshot"), subset_candidates=["player_name"], ) merged["pitcher_rolling_snapshot"] = _merge_bundle_frames( snapshot_bundle.get("pitcher_rolling_snapshot"), patch_bundle.get("pitcher_rolling_snapshot"), subset_candidates=["player_name"], ) merged["snapshot_status"] = _merge_bundle_frames( snapshot_bundle.get("snapshot_status"), patch_bundle.get("snapshot_status"), subset_candidates=["table_name"], ) merged["snapshot_source_status"] = "snapshot_request_patched" merged["runtime_fallback_used"] = False merged["request_patch_used"] = True return merged def _queue_shared_baseline_bundle_persist( bundle: dict[str, pd.DataFrame], *, source_status: str, ) -> bool: batter_meta = bundle.get("batter_baseline_meta", pd.DataFrame()) pitcher_meta = bundle.get("pitcher_baseline_meta", pd.DataFrame()) batter_names = tuple( sorted( { _normalize_name(name) for name in ( batter_meta.get("player_name", pd.Series(dtype="object")).dropna().astype(str).tolist() if isinstance(batter_meta, pd.DataFrame) else [] ) if _normalize_name(name) } ) ) pitcher_names = tuple( sorted( { _normalize_name(name) for name in ( pitcher_meta.get("player_name", pd.Series(dtype="object")).dropna().astype(str).tolist() if isinstance(pitcher_meta, pd.DataFrame) else [] ) if _normalize_name(name) } ) ) key = (str(source_status or "").strip().lower(), batter_names, pitcher_names) with _PERSIST_LOCK: if key in _PERSIST_INFLIGHT: 
return False _PERSIST_INFLIGHT.add(key) persist_bundle = { k: (v.copy() if isinstance(v, pd.DataFrame) else v) for k, v in dict(bundle).items() } def _run() -> None: try: persist_shared_baseline_snapshots( persist_bundle, source_status=source_status, ) except Exception: pass finally: with _PERSIST_LOCK: _PERSIST_INFLIGHT.discard(key) threading.Thread(target=_run, daemon=True).start() return True def _clamp(value: float, lo: float, hi: float) -> float: return max(lo, min(hi, value)) def _current_weight(current_sample_size: int, k: float) -> float: if current_sample_size <= 0: return 0.0 raw = float(current_sample_size) / float(current_sample_size + k) return _clamp(raw, 0.05, 0.85) def _load_identity_maps() -> dict[str, Any]: identity_df = load_identity_map() batter_id_to_name: dict[int, str] = {} pitcher_id_to_name: dict[int, str] = {} batter_name_to_ids: dict[str, set[int]] = defaultdict(set) pitcher_name_to_ids: dict[str, set[int]] = defaultdict(set) if identity_df is None or identity_df.empty: return { "batter_id_to_name": batter_id_to_name, "pitcher_id_to_name": pitcher_id_to_name, "batter_name_to_ids": batter_name_to_ids, "pitcher_name_to_ids": pitcher_name_to_ids, } for _, row in identity_df.dropna(subset=["player_id"]).iterrows(): player_id = _safe_int(row.get("player_id")) if player_id is None: continue canonical_name = str( row.get("display_name") or row.get("canonical_name") or row.get("statcast_name") or "" ).strip() pitcher_name = str( row.get("statcast_name") or row.get("canonical_name") or row.get("display_name") or "" ).strip() if not canonical_name: canonical_name = pitcher_name if not pitcher_name: pitcher_name = canonical_name if _coerce_bool(row.get("role_hitter")): batter_id_to_name[player_id] = canonical_name if _coerce_bool(row.get("role_pitcher")): pitcher_id_to_name[player_id] = pitcher_name for raw_name in [ row.get("display_name"), row.get("canonical_name"), row.get("statcast_name"), row.get("pybaseball_name"), ]: normalized = 
_normalize_name(raw_name) if not normalized: continue if _coerce_bool(row.get("role_hitter")): batter_name_to_ids[normalized].add(player_id) if _coerce_bool(row.get("role_pitcher")): pitcher_name_to_ids[normalized].add(player_id) return { "batter_id_to_name": batter_id_to_name, "pitcher_id_to_name": pitcher_id_to_name, "batter_name_to_ids": batter_name_to_ids, "pitcher_name_to_ids": pitcher_name_to_ids, } def _build_requested_ids( names: tuple[str, ...] | None, lookup: dict[str, set[int]], ) -> set[int]: if not names: return set() ids: set[int] = set() for name in names: normalized = _normalize_name(name) ids.update(lookup.get(normalized, set())) return ids def _format_id_list(ids: set[int]) -> str: return ", ".join(str(int(v)) for v in sorted(ids)) def _prepare_frame(df: pd.DataFrame) -> pd.DataFrame: if df.empty: return df.copy() out = df.copy() for col in _NUMERIC_COLS: if col in out.columns: out[col] = pd.to_numeric(out[col], errors="coerce") if "game_date" in out.columns: out["game_date"] = pd.to_datetime(out["game_date"], errors="coerce") if "p_throws" in out.columns and "pitcher_hand" not in out.columns: out["pitcher_hand"] = out["p_throws"] if "stand" in out.columns and "batter_stand" not in out.columns: out["batter_stand"] = out["stand"] return add_pitch_features(out) def _json_default(value: Any) -> Any: if isinstance(value, (pd.Timestamp, pd.Timedelta)): return str(value) return str(value) def _serialize_payload_frame(df: pd.DataFrame) -> str: if df is None or df.empty: return "[]" out = df.copy() if "game_date" in out.columns: out["game_date"] = out["game_date"].astype(str) return json.dumps(out.where(out.notna(), other=None).to_dict("records"), default=_json_default) def _deserialize_payload_frame(payload_json: str) -> pd.DataFrame: try: payload = json.loads(str(payload_json or "[]")) except Exception: payload = [] if not payload: return pd.DataFrame() return _prepare_frame(pd.DataFrame(payload)) def _normalize_names_tuple(values: tuple[str, ...] 
| None) -> tuple[str, ...]: if not values: return tuple() out = [] seen = set() for value in values: cleaned = str(value or "").strip() if not cleaned: continue normalized = _normalize_name(cleaned) if normalized in seen: continue seen.add(normalized) out.append(cleaned) return tuple(sorted(out)) def _resolve_snapshot_player_names( names: tuple[str, ...] | None, *, role: str, ) -> tuple[str, ...]: normalized_names = _normalize_names_tuple(names) if not normalized_names: return tuple() identity_maps = _load_identity_maps() if role == "pitcher": name_to_ids = identity_maps.get("pitcher_name_to_ids", {}) id_to_name = identity_maps.get("pitcher_id_to_name", {}) else: name_to_ids = identity_maps.get("batter_name_to_ids", {}) id_to_name = identity_maps.get("batter_id_to_name", {}) resolved: list[str] = [] seen: set[str] = set() for raw_name in normalized_names: mapped_ids = name_to_ids.get(_normalize_name(raw_name), set()) mapped_names = [ str(id_to_name.get(player_id) or "").strip() for player_id in mapped_ids if str(id_to_name.get(player_id) or "").strip() ] candidates = mapped_names or [str(raw_name).strip()] for candidate in candidates: lowered = candidate.lower() if not candidate or lowered in seen: continue seen.add(lowered) resolved.append(candidate) return tuple(sorted(resolved)) def _is_snapshot_stale(built_at: Any, max_age_seconds: int) -> bool: if not built_at: return True try: built_ts = pd.to_datetime(built_at, errors="coerce", utc=True) if pd.isna(built_ts): return True now_ts = pd.Timestamp.now(tz="UTC") age_seconds = max(0.0, float((now_ts - built_ts).total_seconds())) return age_seconds > float(max_age_seconds) except Exception: return True def _load_current_events( conn, current_season: int, batter_ids: set[int] | None = None, pitcher_ids: set[int] | None = None, ) -> pd.DataFrame: filters = ["lpm.source_season = :season"] if batter_ids: filters.append(f"lpm.batter IN ({_format_id_list(batter_ids)})") if pitcher_ids: filters.append(f"lpm.pitcher IN 
({_format_id_list(pitcher_ids)})") where_clause = " AND ".join(filters) query = text( f""" SELECT lpm.event_key, lpm.player_name AS pitcher_player_name, lpm.batter, lpm.pitcher, lpm.game_date, lpm.game_pk, lpm.source_season, lpm.pitch_type, lpm.pitch_name, lpm.events, lpm.description, lpm.stand, lpm.p_throws, lpm.home_team, lpm.away_team, lpm.inning, lpm.inning_topbot, lpm.at_bat_number, lpm.pitch_number, lpm.plate_x, lpm.plate_z, lpm.release_speed, lpm.release_spin_rate, lpm.release_extension, lpm.release_pos_x, lpm.release_pos_z, lpm.pfx_x, lpm.pfx_z, lpm.launch_speed, lpm.launch_angle, lpm.estimated_woba_using_speedangle, lpm.spray_angle, lpm.hc_x, lpm.hc_y, lpm.bb_type, lpm.balls, lpm.strikes, lpm.outs_when_up, lpm.bat_score, lpm.fld_score, NULL::DOUBLE PRECISION AS post_bat_score, NULL::DOUBLE PRECISION AS post_fld_score FROM live_pitch_mix_2026 lpm WHERE {where_clause} """ ) return pd.read_sql(query, conn, params={"season": int(current_season)}) def _load_prior_hitter_events(conn, seasons: tuple[int, ...], batter_ids: set[int]) -> pd.DataFrame: if not batter_ids: return pd.DataFrame() query = text( f""" SELECT ec.event_key, ec.player_name AS pitcher_player_name, ec.batter, ec.pitcher, ec.game_date, ec.game_pk, ec.source_season, NULL::TEXT AS pitch_type, ec.pitch_name, ec.events, ec.description, ec.stand, ec.p_throws, ec.home_team, ec.away_team, ec.inning, ec.inning_topbot, ec.at_bat_number, ec.pitch_number, ec.plate_x, ec.plate_z, pr.release_speed, pr.release_spin_rate, pr.release_extension, NULL::DOUBLE PRECISION AS release_pos_x, NULL::DOUBLE PRECISION AS release_pos_z, pr.pfx_x, pr.pfx_z, bb.launch_speed, bb.launch_angle, bb.estimated_woba_using_speedangle, NULL::DOUBLE PRECISION AS spray_angle, NULL::DOUBLE PRECISION AS hc_x, NULL::DOUBLE PRECISION AS hc_y, bb.bb_type, NULL::DOUBLE PRECISION AS balls, NULL::DOUBLE PRECISION AS strikes, NULL::DOUBLE PRECISION AS outs_when_up, NULL::DOUBLE PRECISION AS bat_score, NULL::DOUBLE PRECISION AS fld_score, 
NULL::DOUBLE PRECISION AS post_bat_score, NULL::DOUBLE PRECISION AS post_fld_score FROM statcast_event_core ec LEFT JOIN statcast_batted_ball bb ON ec.event_key = bb.event_key LEFT JOIN statcast_pitch_release pr ON ec.event_key = pr.event_key WHERE ec.source_season IN ({", ".join(str(int(season)) for season in seasons)}) AND ec.batter IN ({_format_id_list(batter_ids)}) """ ) return pd.read_sql(query, conn) def _load_prior_pitcher_events(conn, seasons: tuple[int, ...], pitcher_ids: set[int]) -> pd.DataFrame: if not pitcher_ids: return pd.DataFrame() query = text( f""" SELECT ec.event_key, ec.player_name AS pitcher_player_name, ec.batter, ec.pitcher, ec.game_date, ec.game_pk, ec.source_season, NULL::TEXT AS pitch_type, ec.pitch_name, ec.events, ec.description, ec.stand, ec.p_throws, ec.home_team, ec.away_team, ec.inning, ec.inning_topbot, ec.at_bat_number, ec.pitch_number, ec.plate_x, ec.plate_z, pr.release_speed, pr.release_spin_rate, pr.release_extension, NULL::DOUBLE PRECISION AS release_pos_x, NULL::DOUBLE PRECISION AS release_pos_z, pr.pfx_x, pr.pfx_z, bb.launch_speed, bb.launch_angle, bb.estimated_woba_using_speedangle, NULL::DOUBLE PRECISION AS spray_angle, NULL::DOUBLE PRECISION AS hc_x, NULL::DOUBLE PRECISION AS hc_y, bb.bb_type, NULL::DOUBLE PRECISION AS balls, NULL::DOUBLE PRECISION AS strikes, NULL::DOUBLE PRECISION AS outs_when_up, NULL::DOUBLE PRECISION AS bat_score, NULL::DOUBLE PRECISION AS fld_score, NULL::DOUBLE PRECISION AS post_bat_score, NULL::DOUBLE PRECISION AS post_fld_score FROM statcast_event_core ec LEFT JOIN statcast_batted_ball bb ON ec.event_key = bb.event_key LEFT JOIN statcast_pitch_release pr ON ec.event_key = pr.event_key WHERE ec.source_season IN ({", ".join(str(int(season)) for season in seasons)}) AND ec.pitcher IN ({_format_id_list(pitcher_ids)}) """ ) return pd.read_sql(query, conn) def _to_hitter_frame(events_df: pd.DataFrame, batter_id_to_name: dict[int, str]) -> pd.DataFrame: if events_df.empty: return pd.DataFrame() out = 
events_df.copy() out["player_name"] = out["batter"].apply(lambda value: batter_id_to_name.get(_safe_int(value) or -1)) out = out.dropna(subset=["player_name"]).copy() return _prepare_frame(out) def _to_pitcher_frame(events_df: pd.DataFrame) -> pd.DataFrame: if events_df.empty: return pd.DataFrame() out = events_df.copy() out["player_name"] = out["pitcher_player_name"].astype(str).str.strip() out = out[out["player_name"] != ""].copy() return _prepare_frame(out) def _allocate_prior_counts(prior_df: pd.DataFrame, target_count: int) -> dict[int, int]: if target_count <= 0 or prior_df.empty or "source_season" not in prior_df.columns: return {} available_by_season = prior_df["source_season"].dropna().astype(int).value_counts().to_dict() weighted_caps: dict[int, float] = {} for season, available in available_by_season.items(): weighted_caps[season] = float(available) * float(_PRIOR_SEASON_RECENCY_WEIGHTS.get(season, 0.35)) total_weighted = sum(weighted_caps.values()) if total_weighted <= 0: return {season: min(available, target_count) for season, available in available_by_season.items()} allocations: dict[int, int] = {} remainders: list[tuple[float, int]] = [] assigned = 0 for season, weighted_cap in weighted_caps.items(): raw = (weighted_cap / total_weighted) * float(target_count) base = int(raw) capped = min(base, available_by_season.get(season, 0)) allocations[season] = capped assigned += capped remainders.append((raw - base, season)) remaining = max(0, target_count - assigned) for _, season in sorted(remainders, reverse=True): if remaining <= 0: break available = available_by_season.get(season, 0) if allocations.get(season, 0) >= available: continue allocations[season] = allocations.get(season, 0) + 1 remaining -= 1 return allocations def _sample_prior_rows(prior_df: pd.DataFrame, target_count: int) -> pd.DataFrame: if prior_df.empty or target_count <= 0: return pd.DataFrame(columns=prior_df.columns) if len(prior_df) <= target_count: return prior_df.copy() out_frames: 
list[pd.DataFrame] = [] allocations = _allocate_prior_counts(prior_df, target_count) for season, keep_count in allocations.items(): if keep_count <= 0: continue season_df = prior_df[prior_df["source_season"].astype(int) == int(season)].copy() if season_df.empty: continue season_df = season_df.sort_values("game_date", ascending=False, na_position="last") out_frames.append(season_df.head(keep_count)) if not out_frames: return prior_df.sort_values("game_date", ascending=False, na_position="last").head(target_count).copy() combined = pd.concat(out_frames, ignore_index=True) if len(combined) < target_count: existing_keys = set(combined["event_key"].astype(str).tolist()) if "event_key" in combined.columns else set() remainder = prior_df[ ~prior_df["event_key"].astype(str).isin(existing_keys) ].sort_values("game_date", ascending=False, na_position="last") combined = pd.concat([combined, remainder.head(target_count - len(combined))], ignore_index=True) return combined.head(target_count).copy() def _sample_entity_rows( prior_df: pd.DataFrame, season_df: pd.DataFrame, player_name: str, blend_k: float, role_label: str, ) -> tuple[pd.DataFrame, dict[str, Any]]: prior_count = int(len(prior_df)) season_count = int(len(season_df)) rolling_overlay_active = season_count > 0 if prior_count == 0 and season_count == 0: return ( pd.DataFrame(columns=(season_df.columns if not season_df.empty else prior_df.columns)), { "player_name": player_name, "baseline_mode": "unavailable", "prior_sample_size": 0, "season_2026_sample_size": 0, "prior_weight": 0.0, "season_2026_weight": 0.0, "baseline_driver": "unavailable", "rolling_overlay_active": False, "baseline_role": role_label, }, ) if prior_count == 0: sampled = season_df.sort_values("game_date", ascending=False, na_position="last").head(_MAX_ROWS_PER_PLAYER).copy() sampled["_baseline_source"] = "season_2026" metadata = { "player_name": player_name, "baseline_mode": "current_only", "prior_sample_size": 0, "season_2026_sample_size": 
season_count, "prior_weight": 0.0, "season_2026_weight": 1.0, "baseline_driver": "current_season_led", "rolling_overlay_active": rolling_overlay_active, "baseline_role": role_label, } return sampled, metadata if season_count == 0: sampled = _sample_prior_rows(prior_df, min(prior_count, _MAX_ROWS_PER_PLAYER)) sampled["_baseline_source"] = "prior" metadata = { "player_name": player_name, "baseline_mode": "prior_only", "prior_sample_size": prior_count, "season_2026_sample_size": 0, "prior_weight": 1.0, "season_2026_weight": 0.0, "baseline_driver": "prior_led", "rolling_overlay_active": False, "baseline_role": role_label, } return sampled, metadata season_weight = _current_weight(season_count, blend_k) prior_weight = _clamp(1.0 - season_weight, 0.15, 0.95) total_weight = prior_weight + season_weight prior_weight /= total_weight season_weight /= total_weight target_total = min(_MAX_ROWS_PER_PLAYER, prior_count + season_count) target_current = min( season_count, max(_MIN_CURRENT_ROWS_WHEN_AVAILABLE, int(round(target_total * season_weight))), ) target_current = min(target_current, target_total) target_prior = min(prior_count, max(0, target_total - target_current)) current_rows = season_df.sort_values("game_date", ascending=False, na_position="last").head(target_current).copy() prior_rows = _sample_prior_rows(prior_df, target_prior) current_rows["_baseline_source"] = "season_2026" prior_rows["_baseline_source"] = "prior" sampled = pd.concat([current_rows, prior_rows], ignore_index=True) if sampled.empty: sampled = season_df.sort_values("game_date", ascending=False, na_position="last").head(target_total).copy() sampled["_baseline_source"] = "season_2026" metadata = { "player_name": player_name, "baseline_mode": "blended", "prior_sample_size": prior_count, "season_2026_sample_size": season_count, "prior_weight": float(prior_weight), "season_2026_weight": float(season_weight), "baseline_driver": "current_season_led" if season_weight > prior_weight else "prior_led", 
"rolling_overlay_active": rolling_overlay_active, "baseline_role": role_label, } return sampled, metadata def _blend_entity_frames( prior_df: pd.DataFrame, season_df: pd.DataFrame, blend_k: float, role_label: str, ) -> tuple[pd.DataFrame, pd.DataFrame]: player_names = sorted( { str(name).strip() for name in pd.concat( [ prior_df["player_name"] if "player_name" in prior_df.columns else pd.Series(dtype="object"), season_df["player_name"] if "player_name" in season_df.columns else pd.Series(dtype="object"), ], ignore_index=True, ).dropna().tolist() if str(name).strip() } ) sampled_frames: list[pd.DataFrame] = [] metadata_rows: list[dict[str, Any]] = [] for player_name in player_names: player_prior = prior_df[prior_df["player_name"].astype(str) == player_name].copy() if not prior_df.empty else pd.DataFrame(columns=season_df.columns) player_season = season_df[season_df["player_name"].astype(str) == player_name].copy() if not season_df.empty else pd.DataFrame(columns=prior_df.columns) sampled, metadata = _sample_entity_rows( prior_df=player_prior, season_df=player_season, player_name=player_name, blend_k=blend_k, role_label=role_label, ) if not sampled.empty: for key, value in metadata.items(): sampled[key] = value sampled_frames.append(sampled) metadata_rows.append(metadata) blended_df = pd.concat(sampled_frames, ignore_index=True, sort=False) if sampled_frames else pd.DataFrame() metadata_df = pd.DataFrame(metadata_rows) if not blended_df.empty and "game_date" in blended_df.columns: blended_df = blended_df.sort_values( ["player_name", "game_date", "source_season"], ascending=[True, False, False], na_position="last", ).reset_index(drop=True) return blended_df, metadata_df def _build_snapshot_rows( frame: pd.DataFrame, built_at: str, snapshot_version: str, source_status: str, ) -> pd.DataFrame: rows: list[dict[str, Any]] = [] if frame is None or frame.empty or "player_name" not in frame.columns: return pd.DataFrame( columns=[ "player_name", "source_row_count", 
"payload_json", "snapshot_built_at", "snapshot_version", "source_status", ] ) for player_name, player_df in frame.groupby("player_name", dropna=False): player_name_str = str(player_name or "").strip() if not player_name_str: continue rows.append( { "player_name": player_name_str, "source_row_count": int(len(player_df)), "payload_json": _serialize_payload_frame(player_df.reset_index(drop=True)), "snapshot_built_at": built_at, "snapshot_version": snapshot_version, "source_status": source_status, } ) return pd.DataFrame(rows) def _build_event_snapshot_rows( frame: pd.DataFrame, built_at: str, snapshot_version: str, source_status: str, ) -> pd.DataFrame: if frame is None: frame = pd.DataFrame() out = frame.copy() if out.empty: return pd.DataFrame(columns=_EVENT_SNAPSHOT_COLS) out["snapshot_built_at"] = built_at out["snapshot_version"] = snapshot_version out["source_status"] = source_status for col in _EVENT_SNAPSHOT_COLS: if col not in out.columns: out[col] = None return out[_EVENT_SNAPSHOT_COLS].rename(columns={"_baseline_source": "baseline_source"}).copy() def _build_meta_snapshot_rows( meta_df: pd.DataFrame, built_at: str, snapshot_version: str, source_status: str, ) -> pd.DataFrame: if meta_df is None: meta_df = pd.DataFrame() out = meta_df.copy() for col in [ "player_name", "baseline_role", "baseline_mode", "prior_sample_size", "season_2026_sample_size", "prior_weight", "season_2026_weight", "baseline_driver", "rolling_overlay_active", ]: if col not in out.columns: out[col] = None out["snapshot_built_at"] = built_at out["snapshot_version"] = snapshot_version out["source_status"] = source_status return out[ [ "player_name", "baseline_role", "baseline_mode", "prior_sample_size", "season_2026_sample_size", "prior_weight", "season_2026_weight", "baseline_driver", "rolling_overlay_active", "snapshot_built_at", "snapshot_version", "source_status", ] ].copy() def _build_rolling_snapshot_rows( frame: pd.DataFrame, role_label: str, built_at: str, snapshot_version: str, 
source_status: str, ) -> pd.DataFrame: rows: list[dict[str, Any]] = [] if frame is None or frame.empty or "player_name" not in frame.columns: return pd.DataFrame( columns=[ "player_name", "source_row_count", "payload_json", "snapshot_built_at", "snapshot_version", "source_status", ] ) for player_name, player_df in frame.groupby("player_name", dropna=False): player_name_str = str(player_name or "").strip() if not player_name_str: continue reference_date = None if "game_date" in player_df.columns: try: reference_date = pd.to_datetime( player_df["game_date"], errors="coerce" ).max() except Exception: reference_date = None if role_label == "batter": payload = build_batter_rolling_form_row( statcast_df=frame, player_name=player_name_str, reference_date=reference_date, ) else: payload = build_pitcher_rolling_form_row( statcast_df=frame, pitcher_name=player_name_str, reference_date=reference_date, ) rows.append( { "player_name": player_name_str, **payload, "source_row_count": int(len(player_df)), "snapshot_built_at": built_at, "snapshot_version": snapshot_version, "source_status": source_status, } ) out = pd.DataFrame(rows) expected_cols = _BATTER_ROLLING_COLS if role_label == "batter" else _PITCHER_ROLLING_COLS for col in expected_cols: if col not in out.columns: out[col] = None return out[expected_cols].copy() def _read_snapshot_table( conn, table_name: str, player_names: tuple[str, ...] 
= (), ) -> pd.DataFrame: if player_names: clauses = [] params: dict[str, Any] = {} for idx, player_name in enumerate(player_names): key = f"name_{idx}" clauses.append(f":{key}") params[key] = str(player_name) query = text( f"SELECT * FROM {table_name} WHERE player_name IN ({', '.join(clauses)}) ORDER BY player_name" ) return safe_read_sql(query, conn, params=params) return safe_read_sql(text(f"SELECT * FROM {table_name} ORDER BY player_name"), conn) def _replace_snapshot_scope( conn, table_name: str, df: pd.DataFrame, player_names: tuple[str, ...], ) -> None: scoped_names = tuple(sorted({str(name or "").strip() for name in player_names if str(name or "").strip()})) if scoped_names: clauses = [] params: dict[str, Any] = {} for idx, player_name in enumerate(scoped_names): key = f"name_{idx}" clauses.append(f":{key}") params[key] = player_name conn.execute( text(f"DELETE FROM {table_name} WHERE player_name IN ({', '.join(clauses)})"), params, ) if df is not None and not df.empty: upsert_dataframe(conn, table_name, df, replace=False) return replace_table_contents(conn, table_name, df) def _hydrate_snapshot_frame(snapshot_df: pd.DataFrame) -> pd.DataFrame: if snapshot_df is None or snapshot_df.empty: return pd.DataFrame() frames: list[pd.DataFrame] = [] for _, row in snapshot_df.iterrows(): frame = _deserialize_payload_frame(row.get("payload_json")) if frame.empty: continue frames.append(frame) if not frames: return pd.DataFrame() return pd.concat(frames, ignore_index=True, sort=False) def _hydrate_rolling_snapshot_frame(snapshot_df: pd.DataFrame) -> pd.DataFrame: if snapshot_df is None or snapshot_df.empty: return pd.DataFrame() return snapshot_df.copy() def persist_shared_baseline_snapshots( bundle: dict[str, pd.DataFrame], source_status: str = "runtime_refreshed", ) -> dict[str, pd.DataFrame]: built_at = utc_now_iso() batter_names = _normalize_names_tuple( tuple(bundle.get("batter_baseline_meta", pd.DataFrame()).get("player_name", 
pd.Series(dtype="object")).dropna().astype(str).tolist())
    )
    pitcher_names = _normalize_names_tuple(
        tuple(bundle.get("pitcher_baseline_meta", pd.DataFrame()).get("player_name", pd.Series(dtype="object")).dropna().astype(str).tolist())
    )
    # Build all eight snapshot frames with one shared build timestamp so the
    # whole persist is attributed to a single snapshot generation.
    hitter_snapshot = _build_snapshot_rows(
        bundle.get("blended_batter_df", pd.DataFrame()),
        built_at=built_at,
        snapshot_version=_SNAPSHOT_VERSION,
        source_status=source_status,
    )
    hitter_event_rows = _build_event_snapshot_rows(
        bundle.get("blended_batter_df", pd.DataFrame()),
        built_at=built_at,
        snapshot_version=_SNAPSHOT_VERSION,
        source_status=source_status,
    )
    pitcher_snapshot = _build_snapshot_rows(
        bundle.get("blended_pitcher_df", pd.DataFrame()),
        built_at=built_at,
        snapshot_version=_SNAPSHOT_VERSION,
        source_status=source_status,
    )
    pitcher_event_rows = _build_event_snapshot_rows(
        bundle.get("blended_pitcher_df", pd.DataFrame()),
        built_at=built_at,
        snapshot_version=_SNAPSHOT_VERSION,
        source_status=source_status,
    )
    hitter_meta = _build_meta_snapshot_rows(
        bundle.get("batter_baseline_meta", pd.DataFrame()),
        built_at=built_at,
        snapshot_version=_SNAPSHOT_VERSION,
        source_status=source_status,
    )
    pitcher_meta = _build_meta_snapshot_rows(
        bundle.get("pitcher_baseline_meta", pd.DataFrame()),
        built_at=built_at,
        snapshot_version=_SNAPSHOT_VERSION,
        source_status=source_status,
    )
    hitter_rolling = _build_rolling_snapshot_rows(
        bundle.get("season_2026_ytd_hitter_df", pd.DataFrame()),
        role_label="batter",
        built_at=built_at,
        snapshot_version=_SNAPSHOT_VERSION,
        source_status=source_status,
    )
    pitcher_rolling = _build_rolling_snapshot_rows(
        bundle.get("season_2026_ytd_pitcher_df", pd.DataFrame()),
        role_label="pitcher",
        built_at=built_at,
        snapshot_version=_SNAPSHOT_VERSION,
        source_status=source_status,
    )
    # Write all eight tables over one connection; close it no matter what.
    conn = get_connection()
    try:
        _replace_snapshot_scope(conn, "shared_hitter_baseline_event_rows", hitter_event_rows, batter_names)
        _replace_snapshot_scope(conn, "shared_pitcher_baseline_event_rows", pitcher_event_rows, pitcher_names)
        _replace_snapshot_scope(conn, "shared_hitter_baseline_snapshot", hitter_snapshot, batter_names)
        _replace_snapshot_scope(conn, "shared_pitcher_baseline_snapshot", pitcher_snapshot, pitcher_names)
        _replace_snapshot_scope(conn, "shared_hitter_baseline_meta", hitter_meta, batter_names)
        _replace_snapshot_scope(conn, "shared_pitcher_baseline_meta", pitcher_meta, pitcher_names)
        _replace_snapshot_scope(conn, "shared_hitter_rolling_summary", hitter_rolling, batter_names)
        _replace_snapshot_scope(conn, "shared_pitcher_rolling_summary", pitcher_rolling, pitcher_names)
    finally:
        try:
            conn.close()
        except Exception:
            pass
    # Freshly written snapshots are by definition not stale.
    snapshot_status = pd.DataFrame(
        [
            {
                "table_name": "shared_hitter_baseline_event_rows",
                "row_count": int(len(hitter_event_rows)),
                "snapshot_built_at": built_at,
                "snapshot_version": _SNAPSHOT_VERSION,
                "source_status": source_status,
                "stale": False,
            },
            {
                "table_name": "shared_pitcher_baseline_event_rows",
                "row_count": int(len(pitcher_event_rows)),
                "snapshot_built_at": built_at,
                "snapshot_version": _SNAPSHOT_VERSION,
                "source_status": source_status,
                "stale": False,
            },
            {
                "table_name": "shared_hitter_baseline_snapshot",
                "row_count": int(len(hitter_snapshot)),
                "snapshot_built_at": built_at,
                "snapshot_version": _SNAPSHOT_VERSION,
                "source_status": source_status,
                "stale": False,
            },
            {
                "table_name": "shared_pitcher_baseline_snapshot",
                "row_count": int(len(pitcher_snapshot)),
                "snapshot_built_at": built_at,
                "snapshot_version": _SNAPSHOT_VERSION,
                "source_status": source_status,
                "stale": False,
            },
            {
                "table_name": "shared_hitter_baseline_meta",
                "row_count": int(len(hitter_meta)),
                "snapshot_built_at": built_at,
                "snapshot_version": _SNAPSHOT_VERSION,
                "source_status": source_status,
                "stale": False,
            },
            {
                "table_name": "shared_pitcher_baseline_meta",
                "row_count": int(len(pitcher_meta)),
                "snapshot_built_at": built_at,
                "snapshot_version": _SNAPSHOT_VERSION,
                "source_status": source_status,
                "stale": False,
            },
            {
                "table_name": "shared_hitter_rolling_summary",
                "row_count":
int(len(hitter_rolling)),
                "snapshot_built_at": built_at,
                "snapshot_version": _SNAPSHOT_VERSION,
                "source_status": source_status,
                "stale": False,
            },
            {
                "table_name": "shared_pitcher_rolling_summary",
                "row_count": int(len(pitcher_rolling)),
                "snapshot_built_at": built_at,
                "snapshot_version": _SNAPSHOT_VERSION,
                "source_status": source_status,
                "stale": False,
            },
        ]
    )
    # Annotate the bundle in place and hand it back to the caller.
    bundle["snapshot_status"] = snapshot_status
    bundle["snapshot_source_status"] = source_status
    bundle["runtime_fallback_used"] = False
    return bundle


def load_shared_baseline_bundle_from_snapshots(
    batter_names: tuple[str, ...] = (),
    pitcher_names: tuple[str, ...] = (),
    max_age_seconds: int = _DEFAULT_SNAPSHOT_MAX_AGE_SECONDS,
) -> dict[str, pd.DataFrame]:
    """Load the shared baseline bundle purely from the snapshot tables.

    All eight table reads run on a worker thread under a wall-clock cap
    (``_SNAPSHOT_READ_TIMEOUT_SECONDS``); on timeout or read error a degraded,
    all-empty bundle is returned instead of blocking the caller.
    """
    batter_names = _resolve_snapshot_player_names(batter_names, role="batter")
    pitcher_names = _resolve_snapshot_player_names(pitcher_names, role="pitcher")
    # Degraded shape returned on timeout/error: same keys, empty frames.
    _degraded_bundle: dict[str, Any] = {
        "multi_year_prior_hitter_df": pd.DataFrame(),
        "season_2026_ytd_hitter_df": pd.DataFrame(),
        "multi_year_prior_pitcher_df": pd.DataFrame(),
        "season_2026_ytd_pitcher_df": pd.DataFrame(),
        "blended_batter_df": pd.DataFrame(),
        "blended_pitcher_df": pd.DataFrame(),
        "batter_baseline_meta": pd.DataFrame(),
        "pitcher_baseline_meta": pd.DataFrame(),
        "hitter_rolling_snapshot": pd.DataFrame(),
        "pitcher_rolling_snapshot": pd.DataFrame(),
        "snapshot_status": pd.DataFrame(),
        "snapshot_source_status": "snapshot_unavailable",
        "runtime_fallback_used": False,
    }
    # Single-element lists let the worker thread hand back its result/error.
    _read_result: list[tuple | None] = [None]
    _read_exc: list[Exception | None] = [None]

    def _do_reads() -> None:
        # Runs on a daemon thread; owns (and closes) its own connection.
        conn = get_connection()
        try:
            _read_result[0] = (
                _read_snapshot_table(conn, "shared_hitter_baseline_event_rows", player_names=batter_names),
                _read_snapshot_table(conn, "shared_pitcher_baseline_event_rows", player_names=pitcher_names),
                _read_snapshot_table(conn, "shared_hitter_baseline_snapshot", player_names=batter_names),
                _read_snapshot_table(conn, "shared_pitcher_baseline_snapshot", player_names=pitcher_names),
                _read_snapshot_table(conn, "shared_hitter_baseline_meta", player_names=batter_names),
                _read_snapshot_table(conn, "shared_pitcher_baseline_meta", player_names=pitcher_names),
                _read_snapshot_table(conn, "shared_hitter_rolling_summary", player_names=batter_names),
                _read_snapshot_table(conn, "shared_pitcher_rolling_summary", player_names=pitcher_names),
            )
        except Exception as exc:
            _read_exc[0] = exc
        finally:
            try:
                conn.close()
            except Exception:
                pass

    _rt = threading.Thread(target=_do_reads, daemon=True)
    _rt.start()
    _rt.join(timeout=_SNAPSHOT_READ_TIMEOUT_SECONDS)
    if _rt.is_alive():
        # NOTE: the daemon thread may still be running; we abandon it here.
        _log.warning(
            "[shared_baseline] snapshot DB reads timed out after %ds — returning degraded bundle",
            _SNAPSHOT_READ_TIMEOUT_SECONDS,
        )
        return _degraded_bundle
    if _read_exc[0] is not None or _read_result[0] is None:
        return _degraded_bundle
    (
        hitter_event_rows,
        pitcher_event_rows,
        hitter_snapshot,
        pitcher_snapshot,
        hitter_meta,
        pitcher_meta,
        hitter_rolling,
        pitcher_rolling,
    ) = _read_result[0]
    # Derive per-table freshness metadata from the first row of each frame.
    snapshot_status_rows: list[dict[str, Any]] = []
    for table_name, frame in [
        ("shared_hitter_baseline_snapshot", hitter_snapshot),
        ("shared_pitcher_baseline_snapshot", pitcher_snapshot),
        ("shared_hitter_baseline_meta", hitter_meta),
        ("shared_pitcher_baseline_meta", pitcher_meta),
        ("shared_hitter_baseline_event_rows", hitter_event_rows),
        ("shared_pitcher_baseline_event_rows", pitcher_event_rows),
        ("shared_hitter_rolling_summary", hitter_rolling),
        ("shared_pitcher_rolling_summary", pitcher_rolling),
    ]:
        built_at = None
        version = None
        source_status = None
        if isinstance(frame, pd.DataFrame) and not frame.empty:
            built_at = frame.get("snapshot_built_at", pd.Series(dtype="object")).iloc[0]
            version = frame.get("snapshot_version", pd.Series(dtype="object")).iloc[0]
            source_status = frame.get("source_status", pd.Series(dtype="object")).iloc[0]
        snapshot_status_rows.append(
            {
                "table_name": table_name,
                "row_count": 0 if frame is None else int(len(frame)),
                "snapshot_built_at": built_at,
                "snapshot_version": version,
"source_status": source_status,
                "stale": _is_snapshot_stale(built_at, max_age_seconds),
            }
        )
    return {
        "multi_year_prior_hitter_df": pd.DataFrame(),
        "season_2026_ytd_hitter_df": pd.DataFrame(),
        "multi_year_prior_pitcher_df": pd.DataFrame(),
        "season_2026_ytd_pitcher_df": pd.DataFrame(),
        # Prefer the flat event-row tables (dropping the provenance columns);
        # fall back to hydrating the JSON-payload snapshot tables when empty.
        "blended_batter_df": _prepare_frame(hitter_event_rows.drop(columns=["snapshot_built_at", "snapshot_version", "source_status"], errors="ignore")) if isinstance(hitter_event_rows, pd.DataFrame) and not hitter_event_rows.empty else _hydrate_snapshot_frame(hitter_snapshot),
        "blended_pitcher_df": _prepare_frame(pitcher_event_rows.drop(columns=["snapshot_built_at", "snapshot_version", "source_status"], errors="ignore")) if isinstance(pitcher_event_rows, pd.DataFrame) and not pitcher_event_rows.empty else _hydrate_snapshot_frame(pitcher_snapshot),
        "batter_baseline_meta": hitter_meta,
        "pitcher_baseline_meta": pitcher_meta,
        "hitter_rolling_snapshot": _hydrate_rolling_snapshot_frame(hitter_rolling),
        "pitcher_rolling_snapshot": _hydrate_rolling_snapshot_frame(pitcher_rolling),
        "snapshot_status": pd.DataFrame(snapshot_status_rows),
        "snapshot_source_status": "snapshot",
        "runtime_fallback_used": False,
    }


def queue_shared_baseline_refresh(
    batter_names: tuple[str, ...] = (),
    pitcher_names: tuple[str, ...] = (),
) -> bool:
    """Kick off a background rebuild-and-persist for the given player scope.

    Returns True when a refresh thread was started, False when an identical
    scope is already in flight (deduplicated via ``_REFRESH_INFLIGHT``).
    """
    key = (_normalize_names_tuple(batter_names), _normalize_names_tuple(pitcher_names))
    with _REFRESH_LOCK:
        if key in _REFRESH_INFLIGHT:
            return False
        _REFRESH_INFLIGHT.add(key)

    def _run() -> None:
        try:
            refreshed = build_shared_baseline_bundle(
                batter_names=key[0],
                pitcher_names=key[1],
            )
            persist_shared_baseline_snapshots(
                refreshed,
                source_status="background_refreshed",
            )
        except Exception:
            # Best-effort background work: failures are intentionally dropped.
            pass
        finally:
            with _REFRESH_LOCK:
                _REFRESH_INFLIGHT.discard(key)

    threading.Thread(target=_run, daemon=True).start()
    return True


def load_or_build_shared_baseline_bundle(
    batter_names: tuple[str, ...] = (),
    pitcher_names: tuple[str, ...] = (),
    max_age_seconds: int = _DEFAULT_SNAPSHOT_MAX_AGE_SECONDS,
    persist_runtime_refresh: bool = True,
) -> dict[str, pd.DataFrame]:
    """Serve the baseline bundle from snapshots, else build it at runtime.

    Decision ladder:
      1. fresh snapshot covering all requested players -> serve as-is;
      2. snapshot with data but stale and/or missing players -> serve it and
         queue a background refresh;
      3. otherwise -> synchronous runtime build (optionally persisted).
    """
    batter_names = _normalize_names_tuple(batter_names)
    pitcher_names = _normalize_names_tuple(pitcher_names)
    snapshot_batter_names = _resolve_snapshot_player_names(batter_names, role="batter")
    snapshot_pitcher_names = _resolve_snapshot_player_names(pitcher_names, role="pitcher")
    snapshot_bundle = load_shared_baseline_bundle_from_snapshots(
        batter_names=snapshot_batter_names,
        pitcher_names=snapshot_pitcher_names,
        max_age_seconds=max_age_seconds,
    )
    snapshot_status = snapshot_bundle.get("snapshot_status", pd.DataFrame())
    # Names the snapshot actually covers, per role.
    available_hitters = _normalized_name_set(
        snapshot_bundle.get("batter_baseline_meta", pd.DataFrame())
        .get("player_name", pd.Series(dtype="object"))
        .dropna()
        .astype(str)
        .tolist()
        if isinstance(snapshot_bundle.get("batter_baseline_meta", pd.DataFrame()), pd.DataFrame)
        else []
    )
    available_pitchers = _normalized_name_set(
        snapshot_bundle.get("pitcher_baseline_meta", pd.DataFrame())
        .get("player_name", pd.Series(dtype="object"))
        .dropna()
        .astype(str)
        .tolist()
        if isinstance(snapshot_bundle.get("pitcher_baseline_meta", pd.DataFrame()), pd.DataFrame)
        else []
    )
    missing_hitter_names = _compute_missing_requested_names(snapshot_batter_names, available_hitters)
    missing_pitcher_names = _compute_missing_requested_names(snapshot_pitcher_names, available_pitchers)
    # An empty request tuple means "no scope requested" -> trivially covered.
    requested_hitter_covered = True
    if snapshot_batter_names:
        requested_hitter_covered = not missing_hitter_names
    requested_pitcher_covered = True
    if snapshot_pitcher_names:
        requested_pitcher_covered = not missing_pitcher_names
    snapshot_has_data = not snapshot_bundle.get("blended_batter_df", pd.DataFrame()).empty or not snapshot_bundle.get("blended_pitcher_df", pd.DataFrame()).empty
    snapshot_stale = bool(
        isinstance(snapshot_status, pd.DataFrame)
        and not snapshot_status.empty
        and snapshot_status["stale"].fillna(False).any()
    )
    coverage_mode = "empty"
    if snapshot_has_data and requested_hitter_covered and requested_pitcher_covered:
        coverage_mode = "full"
    elif snapshot_has_data:
        coverage_mode = "partial"
    background_refresh_queued = False
    # Case 1: fresh and fully covered — serve directly.
    if snapshot_has_data and requested_hitter_covered and requested_pitcher_covered and not snapshot_stale:
        return _annotate_request_coverage(
            snapshot_bundle,
            requested_hitter_names=snapshot_batter_names,
            requested_pitcher_names=snapshot_pitcher_names,
            coverage_mode=coverage_mode,
            background_refresh_queued=background_refresh_queued,
        )
    # Case 2: usable but stale and/or missing players — serve it now and
    # refresh asynchronously.
    if snapshot_has_data and (
        (requested_hitter_covered and requested_pitcher_covered and snapshot_stale)
        or missing_hitter_names
        or missing_pitcher_names
    ):
        background_refresh_queued = queue_shared_baseline_refresh(
            batter_names=snapshot_batter_names,
            pitcher_names=snapshot_pitcher_names,
        )
        snapshot_bundle["snapshot_source_status"] = "snapshot_partial_served" if coverage_mode == "partial" else "snapshot_stale_served"
        snapshot_bundle["runtime_fallback_used"] = False
        return _annotate_request_coverage(
            snapshot_bundle,
            requested_hitter_names=snapshot_batter_names,
            requested_pitcher_names=snapshot_pitcher_names,
            coverage_mode=coverage_mode,
            background_refresh_queued=background_refresh_queued,
        )
    # Case 3: nothing usable — build synchronously.
    runtime_bundle = build_shared_baseline_bundle(
        batter_names=snapshot_batter_names,
        pitcher_names=snapshot_pitcher_names,
    )
    runtime_bundle["snapshot_source_status"] = "runtime_fallback"
    runtime_bundle["runtime_fallback_used"] = True
    if persist_runtime_refresh:
        runtime_bundle = persist_shared_baseline_snapshots(
            runtime_bundle,
            source_status="runtime_refreshed",
        )
        # Re-assert after persisting: persist_shared_baseline_snapshots sets
        # runtime_fallback_used back to False.
        runtime_bundle["runtime_fallback_used"] = True
    if "snapshot_status" not in runtime_bundle:
        runtime_bundle["snapshot_status"] = snapshot_status
    return _annotate_request_coverage(
        runtime_bundle,
        requested_hitter_names=snapshot_batter_names,
        requested_pitcher_names=snapshot_pitcher_names,
        coverage_mode="runtime_fallback",
        background_refresh_queued=background_refresh_queued,
    )


def load_or_build_shared_baseline_bundle_complete_for_request(
batter_names: tuple[str, ...] = (),
    pitcher_names: tuple[str, ...] = (),
    max_age_seconds: int = _DEFAULT_SNAPSHOT_MAX_AGE_SECONDS,
    persist_runtime_refresh: bool = True,
) -> dict[str, pd.DataFrame]:
    # Body of load_or_build_shared_baseline_bundle_complete_for_request (the
    # def line sits just above this chunk). Like the plain loader, but blocks
    # briefly (up to _FALLBACK_BUILD_TIMEOUT_SECONDS) to patch-build any
    # requested players that are missing from the snapshot.
    batter_names = _normalize_names_tuple(batter_names)
    pitcher_names = _normalize_names_tuple(pitcher_names)
    snapshot_batter_names = _resolve_snapshot_player_names(batter_names, role="batter")
    snapshot_pitcher_names = _resolve_snapshot_player_names(pitcher_names, role="pitcher")
    snapshot_bundle = load_shared_baseline_bundle_from_snapshots(
        batter_names=snapshot_batter_names,
        pitcher_names=snapshot_pitcher_names,
        max_age_seconds=max_age_seconds,
    )
    snapshot_status = snapshot_bundle.get("snapshot_status", pd.DataFrame())
    snapshot_has_data = not snapshot_bundle.get("blended_batter_df", pd.DataFrame()).empty or not snapshot_bundle.get("blended_pitcher_df", pd.DataFrame()).empty
    snapshot_stale = bool(
        isinstance(snapshot_status, pd.DataFrame)
        and not snapshot_status.empty
        and snapshot_status["stale"].fillna(False).any()
    )
    # Names the snapshot actually covers, per role.
    available_hitters = _normalized_name_set(
        snapshot_bundle.get("batter_baseline_meta", pd.DataFrame())
        .get("player_name", pd.Series(dtype="object"))
        .dropna()
        .astype(str)
        .tolist()
        if isinstance(snapshot_bundle.get("batter_baseline_meta", pd.DataFrame()), pd.DataFrame)
        else []
    )
    available_pitchers = _normalized_name_set(
        snapshot_bundle.get("pitcher_baseline_meta", pd.DataFrame())
        .get("player_name", pd.Series(dtype="object"))
        .dropna()
        .astype(str)
        .tolist()
        if isinstance(snapshot_bundle.get("pitcher_baseline_meta", pd.DataFrame()), pd.DataFrame)
        else []
    )
    missing_hitter_names = _compute_missing_requested_names(snapshot_batter_names, available_hitters)
    missing_pitcher_names = _compute_missing_requested_names(snapshot_pitcher_names, available_pitchers)
    # Fully covered: serve now (and refresh in the background when stale).
    if snapshot_has_data and not missing_hitter_names and not missing_pitcher_names:
        coverage_mode = "full" if not snapshot_stale else "stale_full"
        background_refresh_queued = False
        if snapshot_stale:
            background_refresh_queued = queue_shared_baseline_refresh(
                batter_names=snapshot_batter_names,
                pitcher_names=snapshot_pitcher_names,
            )
            snapshot_bundle["snapshot_source_status"] = "snapshot_stale_served"
        return _annotate_request_coverage(
            snapshot_bundle,
            requested_hitter_names=snapshot_batter_names,
            requested_pitcher_names=snapshot_pitcher_names,
            coverage_mode=coverage_mode,
            background_refresh_queued=background_refresh_queued,
        )
    # Partially covered: build only the missing players, bounded by a timeout.
    if snapshot_has_data and (missing_hitter_names or missing_pitcher_names):
        patch_result: list[dict | None] = [None]

        def _run_patch() -> None:
            try:
                patch_result[0] = build_shared_baseline_bundle(
                    batter_names=tuple(sorted(missing_hitter_names)),
                    pitcher_names=tuple(sorted(missing_pitcher_names)),
                )
            except Exception as exc:
                _log.warning("[shared_baseline] patch build error: %s", exc)

        _pt = threading.Thread(target=_run_patch, daemon=True)
        _pt.start()
        _pt.join(timeout=_FALLBACK_BUILD_TIMEOUT_SECONDS)
        if patch_result[0] is not None:
            # Patch build finished in time: persist it async and merge.
            patch_bundle = patch_result[0]
            if persist_runtime_refresh:
                _queue_shared_baseline_bundle_persist(patch_bundle, source_status="runtime_request_patch")
            merged_bundle = _merge_shared_baseline_bundles(snapshot_bundle, patch_bundle)
            if snapshot_stale:
                merged_bundle["background_refresh_queued"] = queue_shared_baseline_refresh(
                    batter_names=snapshot_batter_names,
                    pitcher_names=snapshot_pitcher_names,
                )
            return _annotate_request_coverage(
                merged_bundle,
                requested_hitter_names=snapshot_batter_names,
                requested_pitcher_names=snapshot_pitcher_names,
                coverage_mode="request_completed_patch",
                background_refresh_queued=bool(merged_bundle.get("background_refresh_queued")),
            )
        else:
            _log.warning(
                "[shared_baseline] patch build timed out after %ds — serving partial snapshot",
                _FALLBACK_BUILD_TIMEOUT_SECONDS,
            )
            if persist_runtime_refresh:
                def _persist_patch_when_done() -> None:
                    # Wait out the abandoned patch build, then persist it.
                    _pt.join()
                    if patch_result[0] is not None:
                        _queue_shared_baseline_bundle_persist(patch_result[0], source_status="runtime_request_patch")
                threading.Thread(target=_persist_patch_when_done, daemon=True).start()
            snapshot_bundle["snapshot_source_status"] = "patch_build_timeout"
            return _annotate_request_coverage(
                snapshot_bundle,
                requested_hitter_names=snapshot_batter_names,
                requested_pitcher_names=snapshot_pitcher_names,
                coverage_mode="request_completed_patch_partial",
                background_refresh_queued=False,
            )
    # No usable snapshot at all: full runtime build, bounded by a timeout.
    runtime_result: list[dict | None] = [None]

    def _run_fallback() -> None:
        try:
            runtime_result[0] = build_shared_baseline_bundle(
                batter_names=snapshot_batter_names,
                pitcher_names=snapshot_pitcher_names,
            )
        except Exception as exc:
            _log.warning("[shared_baseline] runtime fallback build error: %s", exc)

    _rt = threading.Thread(target=_run_fallback, daemon=True)
    _rt.start()
    _rt.join(timeout=_FALLBACK_BUILD_TIMEOUT_SECONDS)
    if runtime_result[0] is not None:
        runtime_bundle = runtime_result[0]
        runtime_bundle["snapshot_source_status"] = "runtime_fallback"
        runtime_bundle["runtime_fallback_used"] = True
        runtime_bundle["request_patch_used"] = False
        if persist_runtime_refresh:
            _queue_shared_baseline_bundle_persist(runtime_bundle, source_status="runtime_refreshed")
        if "snapshot_status" not in runtime_bundle:
            runtime_bundle["snapshot_status"] = snapshot_status
        return _annotate_request_coverage(
            runtime_bundle,
            requested_hitter_names=snapshot_batter_names,
            requested_pitcher_names=snapshot_pitcher_names,
            coverage_mode="runtime_fallback",
            background_refresh_queued=False,
        )
    else:
        _log.warning(
            "[shared_baseline] full fallback build timed out after %ds — returning empty bundle, "
            "will persist when complete",
            _FALLBACK_BUILD_TIMEOUT_SECONDS,
        )
        if persist_runtime_refresh:
            def _persist_runtime_when_done() -> None:
                # Wait out the abandoned fallback build, then persist it.
                _rt.join()
                if runtime_result[0] is not None:
                    _queue_shared_baseline_bundle_persist(runtime_result[0], source_status="runtime_refreshed")
            threading.Thread(target=_persist_runtime_when_done, daemon=True).start()
    # Degraded response: empty frames, status carried over from the snapshot.
    degraded: dict[str, Any] = {
        "blended_batter_df": pd.DataFrame(),
        "blended_pitcher_df": pd.DataFrame(),
"batter_baseline_meta": pd.DataFrame(),
        "pitcher_baseline_meta": pd.DataFrame(),
        "snapshot_status": snapshot_status,
        "snapshot_source_status": "runtime_fallback_timeout",
        "runtime_fallback_used": True,
        "request_patch_used": False,
    }
    return _annotate_request_coverage(
        degraded,
        requested_hitter_names=snapshot_batter_names,
        requested_pitcher_names=snapshot_pitcher_names,
        coverage_mode="runtime_fallback_timeout",
        background_refresh_queued=False,
    )


def build_shared_baseline_bundle(
    batter_names: tuple[str, ...] | None = None,
    pitcher_names: tuple[str, ...] | None = None,
    prior_seasons: tuple[int, ...] = PRIOR_SEASONS,
    current_season: int = CURRENT_SEASON,
) -> dict[str, pd.DataFrame]:
    """Build the blended baseline bundle from the database at runtime.

    Loads current-season and prior-season events (scoped to the requested
    players when names are given), converts them to role-specific frames, and
    blends prior vs. current season per entity.
    """
    identity_maps = _load_identity_maps()
    batter_id_to_name = identity_maps["batter_id_to_name"]
    pitcher_id_to_name = identity_maps["pitcher_id_to_name"]
    batter_name_to_ids = identity_maps["batter_name_to_ids"]
    pitcher_name_to_ids = identity_maps["pitcher_name_to_ids"]
    conn = get_connection()
    try:
        requested_batter_ids = _build_requested_ids(batter_names, batter_name_to_ids)
        requested_pitcher_ids = _build_requested_ids(pitcher_names, pitcher_name_to_ids)
        requested_batter_scope = bool(batter_names)
        requested_pitcher_scope = bool(pitcher_names)
        current_events = _load_current_events(
            conn,
            current_season=current_season,
            batter_ids=requested_batter_ids if requested_batter_scope else None,
            pitcher_ids=requested_pitcher_ids if requested_pitcher_scope else None,
        )
        # With no explicit request, the prior-season loads are scoped to
        # whoever appears in the current season's events.
        current_batter_ids = {
            value
            for value in current_events.get("batter", pd.Series(dtype="float")).dropna().apply(_safe_int).tolist()
            if value is not None
        }
        current_pitcher_ids = {
            value
            for value in current_events.get("pitcher", pd.Series(dtype="float")).dropna().apply(_safe_int).tolist()
            if value is not None
        }
        if requested_batter_scope:
            active_batter_ids = requested_batter_ids
        else:
            active_batter_ids = current_batter_ids
        if requested_pitcher_scope:
            active_pitcher_ids = requested_pitcher_ids
        else:
            active_pitcher_ids = current_pitcher_ids
        prior_hitter_events = _load_prior_hitter_events(
            conn,
            seasons=prior_seasons,
            batter_ids=active_batter_ids,
        )
        prior_pitcher_events = _load_prior_pitcher_events(
            conn,
            seasons=prior_seasons,
            pitcher_ids=active_pitcher_ids,
        )
    finally:
        try:
            conn.close()
        except Exception:
            pass
    # Restrict the current-season frame to the active scope per role.
    current_hitter_events = current_events.copy()
    if active_batter_ids:
        current_hitter_events = current_hitter_events[
            current_hitter_events["batter"].apply(_safe_int).isin(active_batter_ids)
        ].copy()
    current_pitcher_events = current_events.copy()
    if active_pitcher_ids:
        current_pitcher_events = current_pitcher_events[
            current_pitcher_events["pitcher"].apply(_safe_int).isin(active_pitcher_ids)
        ].copy()
    multi_year_prior_hitter_df = _to_hitter_frame(prior_hitter_events, batter_id_to_name)
    season_2026_ytd_hitter_df = _to_hitter_frame(current_hitter_events, batter_id_to_name)
    multi_year_prior_pitcher_df = _to_pitcher_frame(prior_pitcher_events)
    season_2026_ytd_pitcher_df = _to_pitcher_frame(current_pitcher_events)
    # Blend priors against current season with role-specific shrinkage k.
    blended_batter_df, batter_baseline_meta = _blend_entity_frames(
        prior_df=multi_year_prior_hitter_df,
        season_df=season_2026_ytd_hitter_df,
        blend_k=_HITTER_BLEND_K,
        role_label="batter",
    )
    blended_pitcher_df, pitcher_baseline_meta = _blend_entity_frames(
        prior_df=multi_year_prior_pitcher_df,
        season_df=season_2026_ytd_pitcher_df,
        blend_k=_PITCHER_BLEND_K,
        role_label="pitcher",
    )
    return {
        "multi_year_prior_hitter_df": multi_year_prior_hitter_df,
        "season_2026_ytd_hitter_df": season_2026_ytd_hitter_df,
        "multi_year_prior_pitcher_df": multi_year_prior_pitcher_df,
        "season_2026_ytd_pitcher_df": season_2026_ytd_pitcher_df,
        "blended_batter_df": blended_batter_df,
        "blended_pitcher_df": blended_pitcher_df,
        "batter_baseline_meta": batter_baseline_meta,
        "pitcher_baseline_meta": pitcher_baseline_meta,
    }