# 2026_MLB_Model/data/shared_baseline.py
# Last commit bd7830d: "Perf: parallelize TheOddsAPI event loop, fix schema init overhead"
from __future__ import annotations
from collections import defaultdict
import json
import logging
import threading
from typing import Any
_log = logging.getLogger(__name__)
import pandas as pd
from sqlalchemy import text
from database.db import get_connection, replace_table_contents, safe_read_sql, upsert_dataframe
from features.pitch_features import add_pitch_features
from models.rolling_form_model import (
build_batter_rolling_form_row,
build_pitcher_rolling_form_row,
)
from utils.helpers import utc_now_iso
from visualization.cards.player_identity import load_identity_map, normalize_for_matching, to_canonical_name
# Historical seasons that feed the prior baseline; recency weights below.
PRIOR_SEASONS = (2021, 2022, 2023, 2024, 2025)
CURRENT_SEASON = 2026
# Shrinkage constants k in the n/(n+k) weight (_current_weight): the sample
# size at which current-season data earns half the blend weight.
_HITTER_BLEND_K = 260.0
_PITCHER_BLEND_K = 320.0
# Hard cap on blended event rows kept per player.
_MAX_ROWS_PER_PLAYER = 420
# When any current-season rows exist, keep at least this many of them.
_MIN_CURRENT_ROWS_WHEN_AVAILABLE = 20
_SNAPSHOT_VERSION = "shared_baseline_v1"
# Snapshots older than this (30 minutes) count as stale.
_DEFAULT_SNAPSHOT_MAX_AGE_SECONDS = 60 * 30
_FALLBACK_BUILD_TIMEOUT_SECONDS: int = 5
_SNAPSHOT_READ_TIMEOUT_SECONDS: int = 20  # wall-clock cap for the 8 DB reads in load_shared_baseline_bundle_from_snapshots
# Relative weight of each prior season's available rows when apportioning
# sampled rows (_allocate_prior_counts); unknown seasons default to 0.35.
_PRIOR_SEASON_RECENCY_WEIGHTS = {
    2025: 1.00,
    2024: 0.85,
    2023: 0.70,
    2022: 0.55,
    2021: 0.45,
}
# Locks + in-flight key sets that dedupe concurrent refresh/persist work.
_REFRESH_LOCK = threading.Lock()
_REFRESH_INFLIGHT: set[tuple[tuple[str, ...], tuple[str, ...]]] = set()
_PERSIST_LOCK = threading.Lock()
_PERSIST_INFLIGHT: set[tuple[str, tuple[str, ...], tuple[str, ...]]] = set()
# Columns coerced with pd.to_numeric in _prepare_frame (non-numeric -> NaN).
_NUMERIC_COLS = [
    "batter",
    "pitcher",
    "game_pk",
    "source_season",
    "inning",
    "at_bat_number",
    "pitch_number",
    "plate_x",
    "plate_z",
    "release_speed",
    "release_spin_rate",
    "release_extension",
    "release_pos_x",
    "release_pos_z",
    "pfx_x",
    "pfx_z",
    "launch_speed",
    "launch_angle",
    "estimated_woba_using_speedangle",
    "spray_angle",
    "hc_x",
    "hc_y",
    "balls",
    "strikes",
    "outs_when_up",
    "bat_score",
    "fld_score",
    "post_bat_score",
    "post_fld_score",
]
# Persisted column order for per-event snapshot rows. `_baseline_source` is
# renamed to `baseline_source` on write (_build_event_snapshot_rows).
_EVENT_SNAPSHOT_COLS = [
    "player_name",
    "event_key",
    "batter",
    "pitcher",
    "game_date",
    "game_pk",
    "source_season",
    "pitch_type",
    "pitch_name",
    "events",
    "description",
    "stand",
    "p_throws",
    "home_team",
    "away_team",
    "inning",
    "inning_topbot",
    "at_bat_number",
    "pitch_number",
    "plate_x",
    "plate_z",
    "release_speed",
    "release_spin_rate",
    "release_extension",
    "release_pos_x",
    "release_pos_z",
    "pfx_x",
    "pfx_z",
    "launch_speed",
    "launch_angle",
    "estimated_woba_using_speedangle",
    "spray_angle",
    "hc_x",
    "hc_y",
    "bb_type",
    "balls",
    "strikes",
    "outs_when_up",
    "bat_score",
    "fld_score",
    "post_bat_score",
    "post_fld_score",
    "pitcher_hand",
    "batter_stand",
    "movement_magnitude",
    "spin_efficiency_proxy",
    "release_height_proxy",
    "release_side_proxy",
    "count_string",
    "baseline_mode",
    "prior_sample_size",
    "season_2026_sample_size",
    "prior_weight",
    "season_2026_weight",
    "baseline_driver",
    "rolling_overlay_active",
    "baseline_role",
    "_baseline_source",
    "snapshot_built_at",
    "snapshot_version",
    "source_status",
]
# Expected column order for the batter rolling-form snapshot table
# (_build_rolling_snapshot_rows fills any missing column with None).
_BATTER_ROLLING_COLS = [
    "player_name",
    "batter_ev_5g",
    "batter_ev_10g",
    "batter_ev90_5g",
    "batter_ev90_10g",
    "batter_hard_hit_rate_5g",
    "batter_hard_hit_rate_10g",
    "batter_barrel_rate_5g",
    "batter_barrel_rate_10g",
    "batter_avg_launch_angle_5g",
    "batter_avg_launch_angle_10g",
    "batter_fb_rate_5g",
    "batter_fb_rate_10g",
    "batter_ld_rate_5g",
    "batter_gb_rate_5g",
    "batter_air_ball_rate_5g",
    "batter_hr_rate_5g",
    "batter_hr_rate_10g",
    "batter_pull_air_rate_5g",
    "batter_pulled_hard_air_rate_5g",
    "batter_pulled_barrel_rate_5g",
    "batter_games_in_window_5g",
    "batter_games_in_window_10g",
    "batter_recent_form_available",
    "source_row_count",
    "snapshot_built_at",
    "snapshot_version",
    "source_status",
]
# Expected column order for the pitcher rolling-form snapshot table.
_PITCHER_ROLLING_COLS = [
    "player_name",
    "pitcher_avg_release_speed_5g",
    "pitcher_avg_release_speed_10g",
    "pitcher_avg_release_spin_rate_5g",
    "pitcher_ev_allowed_5g",
    "pitcher_ev_allowed_10g",
    "pitcher_hard_hit_rate_allowed_5g",
    "pitcher_hard_hit_rate_allowed_10g",
    "pitcher_barrel_rate_allowed_5g",
    "pitcher_barrel_rate_allowed_10g",
    "pitcher_avg_launch_angle_allowed_5g",
    "pitcher_fb_rate_allowed_5g",
    "pitcher_ld_rate_allowed_5g",
    "pitcher_gb_rate_allowed_5g",
    "pitcher_hr_allowed_rate_5g",
    "pitcher_hr_allowed_rate_10g",
    "pitcher_games_in_window_5g",
    "pitcher_games_in_window_10g",
    "pitcher_recent_form_available",
    "pitcher_rolling_confidence",
    "source_row_count",
    "snapshot_built_at",
    "snapshot_version",
    "source_status",
]
def _safe_int(value: Any) -> int | None:
try:
if value is None or str(value).strip() in {"", "nan", "None"}:
return None
return int(float(value))
except Exception:
return None
def _coerce_bool(value: Any) -> bool:
text = str(value or "").strip().lower()
return text in {"1", "true", "yes"}
def _normalize_name(value: Any) -> str:
    """Canonicalize a raw player name, then normalize it for matching."""
    cleaned = str(value or "").strip()
    return normalize_for_matching(to_canonical_name(cleaned))
def _normalized_name_set(values: list[Any] | tuple[Any, ...] | set[Any] | pd.Series | None) -> set[str]:
    """Normalize every value and return the non-empty results as a set.

    Returns an empty set for None input.
    """
    if values is None:
        return set()
    # Normalize once per value; the original ran _normalize_name twice
    # (once in the filter, once to emit).
    normalized = (_normalize_name(value) for value in values)
    return {name for name in normalized if name}
def _compute_missing_requested_names(
    requested_names: tuple[str, ...] | list[str] | None,
    available_names: set[str],
) -> list[str]:
    """Return the requested names whose normalized form is not in *available_names*.

    Blank entries are ignored; returned spellings are the cleaned originals.
    """
    missing: list[str] = []
    for raw_name in requested_names or ():
        cleaned = str(raw_name or "").strip()
        if cleaned and _normalize_name(cleaned) not in available_names:
            missing.append(cleaned)
    return missing
def _annotate_request_coverage(
    bundle: dict[str, pd.DataFrame],
    *,
    requested_hitter_names: tuple[str, ...],
    requested_pitcher_names: tuple[str, ...],
    coverage_mode: str,
    background_refresh_queued: bool,
) -> dict[str, pd.DataFrame]:
    """Attach request-coverage bookkeeping (counts, misses, mode) to *bundle* in place."""

    def _meta_names(meta_key: str) -> set[str]:
        # Normalized player names present in one of the baseline-meta frames.
        meta = bundle.get(meta_key, pd.DataFrame())
        if not isinstance(meta, pd.DataFrame):
            return set()
        return _normalized_name_set(
            meta.get("player_name", pd.Series(dtype="object")).dropna().astype(str).tolist()
        )

    missing_hitter_names = _compute_missing_requested_names(
        requested_hitter_names, _meta_names("batter_baseline_meta")
    )
    missing_pitcher_names = _compute_missing_requested_names(
        requested_pitcher_names, _meta_names("pitcher_baseline_meta")
    )
    bundle["requested_hitter_count"] = int(len(requested_hitter_names))
    bundle["requested_pitcher_count"] = int(len(requested_pitcher_names))
    bundle["resolved_hitter_count"] = int(len(requested_hitter_names) - len(missing_hitter_names))
    bundle["resolved_pitcher_count"] = int(len(requested_pitcher_names) - len(missing_pitcher_names))
    bundle["missing_hitter_names"] = missing_hitter_names
    bundle["missing_pitcher_names"] = missing_pitcher_names
    bundle["snapshot_coverage_mode"] = coverage_mode
    bundle["background_refresh_queued"] = background_refresh_queued
    return bundle
def _merge_bundle_frames(
left: pd.DataFrame | None,
right: pd.DataFrame | None,
*,
subset_candidates: list[str],
) -> pd.DataFrame:
left_df = left if isinstance(left, pd.DataFrame) else pd.DataFrame()
right_df = right if isinstance(right, pd.DataFrame) else pd.DataFrame()
if left_df.empty:
return right_df.copy()
if right_df.empty:
return left_df.copy()
merged = pd.concat([left_df, right_df], ignore_index=True, sort=False)
subset = [col for col in subset_candidates if col in merged.columns]
if subset:
merged = merged.drop_duplicates(subset=subset, keep="last")
else:
merged = merged.drop_duplicates(keep="last")
return merged.reset_index(drop=True)
def _merge_shared_baseline_bundles(
    snapshot_bundle: dict[str, pd.DataFrame],
    patch_bundle: dict[str, pd.DataFrame],
) -> dict[str, pd.DataFrame]:
    """Overlay *patch_bundle* frames onto *snapshot_bundle* and mark it patched.

    Patch rows win on duplicate keys (keep="last" inside _merge_bundle_frames).
    The seven copy-pasted merge calls are collapsed into one merge plan so the
    frame list and its dedupe keys live in a single place.
    """
    event_subset = ["player_name", "event_key", "game_pk", "at_bat_number", "pitch_number"]
    # frame key in the bundle -> dedupe subset for that frame
    merge_plan: dict[str, list[str]] = {
        "blended_batter_df": event_subset,
        "blended_pitcher_df": event_subset,
        "batter_baseline_meta": ["player_name"],
        "pitcher_baseline_meta": ["player_name"],
        "hitter_rolling_snapshot": ["player_name"],
        "pitcher_rolling_snapshot": ["player_name"],
        "snapshot_status": ["table_name"],
    }
    merged = dict(snapshot_bundle)
    for frame_key, subset in merge_plan.items():
        merged[frame_key] = _merge_bundle_frames(
            snapshot_bundle.get(frame_key),
            patch_bundle.get(frame_key),
            subset_candidates=subset,
        )
    merged["snapshot_source_status"] = "snapshot_request_patched"
    merged["runtime_fallback_used"] = False
    merged["request_patch_used"] = True
    return merged
def _queue_shared_baseline_bundle_persist(
    bundle: dict[str, pd.DataFrame],
    *,
    source_status: str,
) -> bool:
    """Persist *bundle* snapshots on a daemon thread, deduping identical requests.

    Returns True when a persist thread was started, False when an identical
    persist (same status + same hitter/pitcher name scope) is already in flight.
    """

    def _meta_names(meta_key: str) -> tuple[str, ...]:
        # Sorted, normalized, de-duplicated player names from a meta frame.
        # (The original duplicated this extraction for batters and pitchers.)
        meta = bundle.get(meta_key, pd.DataFrame())
        raw_names = (
            meta.get("player_name", pd.Series(dtype="object")).dropna().astype(str).tolist()
            if isinstance(meta, pd.DataFrame)
            else []
        )
        return tuple(sorted({_normalize_name(name) for name in raw_names if _normalize_name(name)}))

    batter_names = _meta_names("batter_baseline_meta")
    pitcher_names = _meta_names("pitcher_baseline_meta")
    key = (str(source_status or "").strip().lower(), batter_names, pitcher_names)
    with _PERSIST_LOCK:
        if key in _PERSIST_INFLIGHT:
            return False
        _PERSIST_INFLIGHT.add(key)
    # Copy frames so the worker thread is isolated from caller-side mutation.
    persist_bundle = {
        k: (v.copy() if isinstance(v, pd.DataFrame) else v)
        for k, v in dict(bundle).items()
    }

    def _run() -> None:
        try:
            persist_shared_baseline_snapshots(
                persist_bundle,
                source_status=source_status,
            )
        except Exception:
            # Best-effort persist: never crash the worker thread, but leave a
            # trace instead of the original silent `pass`.
            _log.exception("shared baseline snapshot persist failed")
        finally:
            with _PERSIST_LOCK:
                _PERSIST_INFLIGHT.discard(key)

    threading.Thread(target=_run, daemon=True).start()
    return True
def _clamp(value: float, lo: float, hi: float) -> float:
return max(lo, min(hi, value))
def _current_weight(current_sample_size: int, k: float) -> float:
if current_sample_size <= 0:
return 0.0
raw = float(current_sample_size) / float(current_sample_size + k)
return _clamp(raw, 0.05, 0.85)
def _load_identity_maps() -> dict[str, Any]:
    """Build id<->name lookups for hitters and pitchers from the identity table.

    Returns a dict with:
      - "batter_id_to_name" / "pitcher_id_to_name": player_id -> display name
      - "batter_name_to_ids" / "pitcher_name_to_ids": normalized name -> ids
    All four maps are empty when the identity frame is missing or empty.
    """
    identity_df = load_identity_map()
    batter_id_to_name: dict[int, str] = {}
    pitcher_id_to_name: dict[int, str] = {}
    batter_name_to_ids: dict[str, set[int]] = defaultdict(set)
    pitcher_name_to_ids: dict[str, set[int]] = defaultdict(set)
    maps: dict[str, Any] = {
        "batter_id_to_name": batter_id_to_name,
        "pitcher_id_to_name": pitcher_id_to_name,
        "batter_name_to_ids": batter_name_to_ids,
        "pitcher_name_to_ids": pitcher_name_to_ids,
    }
    if identity_df is None or identity_df.empty:
        return maps
    for _, row in identity_df.dropna(subset=["player_id"]).iterrows():
        player_id = _safe_int(row.get("player_id"))
        if player_id is None:
            continue
        # Hoisted: the original re-coerced these flags up to six times per row.
        is_hitter = _coerce_bool(row.get("role_hitter"))
        is_pitcher = _coerce_bool(row.get("role_pitcher"))
        if not (is_hitter or is_pitcher):
            # Row contributes to no map; skip the name plumbing entirely.
            continue
        # Hitters prefer the display name; pitchers prefer the statcast name.
        canonical_name = str(
            row.get("display_name")
            or row.get("canonical_name")
            or row.get("statcast_name")
            or ""
        ).strip()
        pitcher_name = str(
            row.get("statcast_name")
            or row.get("canonical_name")
            or row.get("display_name")
            or ""
        ).strip()
        canonical_name = canonical_name or pitcher_name
        pitcher_name = pitcher_name or canonical_name
        if is_hitter:
            batter_id_to_name[player_id] = canonical_name
        if is_pitcher:
            pitcher_id_to_name[player_id] = pitcher_name
        # Every known spelling maps back to the id for fuzzy name -> id lookup.
        for raw_name in (
            row.get("display_name"),
            row.get("canonical_name"),
            row.get("statcast_name"),
            row.get("pybaseball_name"),
        ):
            normalized = _normalize_name(raw_name)
            if not normalized:
                continue
            if is_hitter:
                batter_name_to_ids[normalized].add(player_id)
            if is_pitcher:
                pitcher_name_to_ids[normalized].add(player_id)
    return maps
def _build_requested_ids(
    names: tuple[str, ...] | None,
    lookup: dict[str, set[int]],
) -> set[int]:
    """Union of player ids mapped from each requested name (normalized first)."""
    if not names:
        return set()
    matched: set[int] = set()
    for name in names:
        matched |= lookup.get(_normalize_name(name), set())
    return matched
def _format_id_list(ids: set[int]) -> str:
return ", ".join(str(int(v)) for v in sorted(ids))
def _prepare_frame(df: pd.DataFrame) -> pd.DataFrame:
    """Coerce numeric/date columns, alias hand/stand names, add pitch features.

    Empty input is returned as an empty copy without feature engineering.
    """
    if df.empty:
        return df.copy()
    out = df.copy()
    for col in (c for c in _NUMERIC_COLS if c in out.columns):
        out[col] = pd.to_numeric(out[col], errors="coerce")
    if "game_date" in out.columns:
        out["game_date"] = pd.to_datetime(out["game_date"], errors="coerce")
    # Mirror raw statcast columns into the names downstream features expect,
    # without clobbering them when already present.
    for src, dst in (("p_throws", "pitcher_hand"), ("stand", "batter_stand")):
        if src in out.columns and dst not in out.columns:
            out[dst] = out[src]
    return add_pitch_features(out)
def _json_default(value: Any) -> Any:
if isinstance(value, (pd.Timestamp, pd.Timedelta)):
return str(value)
return str(value)
def _serialize_payload_frame(df: pd.DataFrame) -> str:
    """Serialize a frame to a JSON records string; NaN -> null, dates -> strings."""
    if df is None or df.empty:
        return "[]"
    payload = df.copy()
    if "game_date" in payload.columns:
        payload["game_date"] = payload["game_date"].astype(str)
    records = payload.where(payload.notna(), other=None).to_dict("records")
    return json.dumps(records, default=_json_default)
def _deserialize_payload_frame(payload_json: str) -> pd.DataFrame:
    """Inverse of _serialize_payload_frame: JSON records -> prepared DataFrame.

    Malformed or empty payloads deserialize to an empty frame.
    """
    try:
        records = json.loads(str(payload_json or "[]"))
    except Exception:
        records = []
    if not records:
        return pd.DataFrame()
    return _prepare_frame(pd.DataFrame(records))
def _normalize_names_tuple(values: tuple[str, ...] | None) -> tuple[str, ...]:
    """Strip, de-duplicate (by normalized form), and sort a tuple of names.

    The returned entries keep their original cleaned spelling; only the
    duplicate check uses the normalized form.
    """
    if not values:
        return tuple()
    seen_normalized: set[str] = set()
    kept: list[str] = []
    for raw in values:
        cleaned = str(raw or "").strip()
        if not cleaned:
            continue
        normalized = _normalize_name(cleaned)
        if normalized in seen_normalized:
            continue
        seen_normalized.add(normalized)
        kept.append(cleaned)
    return tuple(sorted(kept))
def _resolve_snapshot_player_names(
    names: tuple[str, ...] | None,
    *,
    role: str,
) -> tuple[str, ...]:
    """Map requested names to the identity table's canonical spellings for *role*.

    Names with no identity match pass through unchanged. Output is sorted and
    de-duplicated case-insensitively.
    """
    normalized_names = _normalize_names_tuple(names)
    if not normalized_names:
        return tuple()
    identity_maps = _load_identity_maps()
    role_key = "pitcher" if role == "pitcher" else "batter"
    name_to_ids = identity_maps.get(f"{role_key}_name_to_ids", {})
    id_to_name = identity_maps.get(f"{role_key}_id_to_name", {})
    resolved: list[str] = []
    seen_lowered: set[str] = set()
    for raw_name in normalized_names:
        mapped_ids = name_to_ids.get(_normalize_name(raw_name), set())
        mapped_names = [
            str(id_to_name.get(player_id) or "").strip()
            for player_id in mapped_ids
            if str(id_to_name.get(player_id) or "").strip()
        ]
        # Fall back to the raw spelling when the identity map has no entry.
        for candidate in (mapped_names or [str(raw_name).strip()]):
            lowered = candidate.lower()
            if not candidate or lowered in seen_lowered:
                continue
            seen_lowered.add(lowered)
            resolved.append(candidate)
    return tuple(sorted(resolved))
def _is_snapshot_stale(built_at: Any, max_age_seconds: int) -> bool:
if not built_at:
return True
try:
built_ts = pd.to_datetime(built_at, errors="coerce", utc=True)
if pd.isna(built_ts):
return True
now_ts = pd.Timestamp.now(tz="UTC")
age_seconds = max(0.0, float((now_ts - built_ts).total_seconds()))
return age_seconds > float(max_age_seconds)
except Exception:
return True
def _load_current_events(
    conn,
    current_season: int,
    batter_ids: set[int] | None = None,
    pitcher_ids: set[int] | None = None,
) -> pd.DataFrame:
    """Load current-season pitch events from live_pitch_mix_2026.

    Optional batter/pitcher id sets narrow the scan. Ids are rendered inline
    via _format_id_list, which emits integers only, so the f-string
    interpolation cannot inject SQL; the season goes through a bound param.
    post_bat_score/post_fld_score are selected as typed NULLs so the frame
    lines up column-for-column with the prior-season loaders.
    """
    filters = ["lpm.source_season = :season"]
    if batter_ids:
        filters.append(f"lpm.batter IN ({_format_id_list(batter_ids)})")
    if pitcher_ids:
        filters.append(f"lpm.pitcher IN ({_format_id_list(pitcher_ids)})")
    where_clause = " AND ".join(filters)
    query = text(
        f"""
        SELECT
            lpm.event_key,
            lpm.player_name AS pitcher_player_name,
            lpm.batter,
            lpm.pitcher,
            lpm.game_date,
            lpm.game_pk,
            lpm.source_season,
            lpm.pitch_type,
            lpm.pitch_name,
            lpm.events,
            lpm.description,
            lpm.stand,
            lpm.p_throws,
            lpm.home_team,
            lpm.away_team,
            lpm.inning,
            lpm.inning_topbot,
            lpm.at_bat_number,
            lpm.pitch_number,
            lpm.plate_x,
            lpm.plate_z,
            lpm.release_speed,
            lpm.release_spin_rate,
            lpm.release_extension,
            lpm.release_pos_x,
            lpm.release_pos_z,
            lpm.pfx_x,
            lpm.pfx_z,
            lpm.launch_speed,
            lpm.launch_angle,
            lpm.estimated_woba_using_speedangle,
            lpm.spray_angle,
            lpm.hc_x,
            lpm.hc_y,
            lpm.bb_type,
            lpm.balls,
            lpm.strikes,
            lpm.outs_when_up,
            lpm.bat_score,
            lpm.fld_score,
            NULL::DOUBLE PRECISION AS post_bat_score,
            NULL::DOUBLE PRECISION AS post_fld_score
        FROM live_pitch_mix_2026 lpm
        WHERE {where_clause}
        """
    )
    return pd.read_sql(query, conn, params={"season": int(current_season)})
def _load_prior_hitter_events(conn, seasons: tuple[int, ...], batter_ids: set[int]) -> pd.DataFrame:
    """Load prior-season events for the given batters from the statcast tables.

    Joins event core to batted-ball and pitch-release detail. Columns the
    historical schema lacks are selected as typed NULLs so the frame lines
    up with _load_current_events. Returns empty immediately for no ids.
    Seasons and ids are rendered inline as integers only (int()/
    _format_id_list), so the f-string interpolation cannot inject SQL.
    """
    if not batter_ids:
        return pd.DataFrame()
    query = text(
        f"""
        SELECT
            ec.event_key,
            ec.player_name AS pitcher_player_name,
            ec.batter,
            ec.pitcher,
            ec.game_date,
            ec.game_pk,
            ec.source_season,
            NULL::TEXT AS pitch_type,
            ec.pitch_name,
            ec.events,
            ec.description,
            ec.stand,
            ec.p_throws,
            ec.home_team,
            ec.away_team,
            ec.inning,
            ec.inning_topbot,
            ec.at_bat_number,
            ec.pitch_number,
            ec.plate_x,
            ec.plate_z,
            pr.release_speed,
            pr.release_spin_rate,
            pr.release_extension,
            NULL::DOUBLE PRECISION AS release_pos_x,
            NULL::DOUBLE PRECISION AS release_pos_z,
            pr.pfx_x,
            pr.pfx_z,
            bb.launch_speed,
            bb.launch_angle,
            bb.estimated_woba_using_speedangle,
            NULL::DOUBLE PRECISION AS spray_angle,
            NULL::DOUBLE PRECISION AS hc_x,
            NULL::DOUBLE PRECISION AS hc_y,
            bb.bb_type,
            NULL::DOUBLE PRECISION AS balls,
            NULL::DOUBLE PRECISION AS strikes,
            NULL::DOUBLE PRECISION AS outs_when_up,
            NULL::DOUBLE PRECISION AS bat_score,
            NULL::DOUBLE PRECISION AS fld_score,
            NULL::DOUBLE PRECISION AS post_bat_score,
            NULL::DOUBLE PRECISION AS post_fld_score
        FROM statcast_event_core ec
        LEFT JOIN statcast_batted_ball bb
            ON ec.event_key = bb.event_key
        LEFT JOIN statcast_pitch_release pr
            ON ec.event_key = pr.event_key
        WHERE ec.source_season IN ({", ".join(str(int(season)) for season in seasons)})
            AND ec.batter IN ({_format_id_list(batter_ids)})
        """
    )
    return pd.read_sql(query, conn)
def _load_prior_pitcher_events(conn, seasons: tuple[int, ...], pitcher_ids: set[int]) -> pd.DataFrame:
    """Load prior-season events for the given pitchers from the statcast tables.

    Mirror of _load_prior_hitter_events with the WHERE clause filtering on
    ec.pitcher instead of ec.batter; see that function for schema notes.
    Seasons and ids are rendered inline as integers only, so the f-string
    interpolation cannot inject SQL.
    """
    if not pitcher_ids:
        return pd.DataFrame()
    query = text(
        f"""
        SELECT
            ec.event_key,
            ec.player_name AS pitcher_player_name,
            ec.batter,
            ec.pitcher,
            ec.game_date,
            ec.game_pk,
            ec.source_season,
            NULL::TEXT AS pitch_type,
            ec.pitch_name,
            ec.events,
            ec.description,
            ec.stand,
            ec.p_throws,
            ec.home_team,
            ec.away_team,
            ec.inning,
            ec.inning_topbot,
            ec.at_bat_number,
            ec.pitch_number,
            ec.plate_x,
            ec.plate_z,
            pr.release_speed,
            pr.release_spin_rate,
            pr.release_extension,
            NULL::DOUBLE PRECISION AS release_pos_x,
            NULL::DOUBLE PRECISION AS release_pos_z,
            pr.pfx_x,
            pr.pfx_z,
            bb.launch_speed,
            bb.launch_angle,
            bb.estimated_woba_using_speedangle,
            NULL::DOUBLE PRECISION AS spray_angle,
            NULL::DOUBLE PRECISION AS hc_x,
            NULL::DOUBLE PRECISION AS hc_y,
            bb.bb_type,
            NULL::DOUBLE PRECISION AS balls,
            NULL::DOUBLE PRECISION AS strikes,
            NULL::DOUBLE PRECISION AS outs_when_up,
            NULL::DOUBLE PRECISION AS bat_score,
            NULL::DOUBLE PRECISION AS fld_score,
            NULL::DOUBLE PRECISION AS post_bat_score,
            NULL::DOUBLE PRECISION AS post_fld_score
        FROM statcast_event_core ec
        LEFT JOIN statcast_batted_ball bb
            ON ec.event_key = bb.event_key
        LEFT JOIN statcast_pitch_release pr
            ON ec.event_key = pr.event_key
        WHERE ec.source_season IN ({", ".join(str(int(season)) for season in seasons)})
            AND ec.pitcher IN ({_format_id_list(pitcher_ids)})
        """
    )
    return pd.read_sql(query, conn)
def _to_hitter_frame(events_df: pd.DataFrame, batter_id_to_name: dict[int, str]) -> pd.DataFrame:
    """Attach hitter player_name from batter ids and drop rows with no known hitter."""
    if events_df.empty:
        return pd.DataFrame()

    def _name_for(value: Any) -> str | None:
        batter_id = _safe_int(value)
        # Fix: the original used `_safe_int(value) or -1`, which also sent a
        # legitimate batter id of 0 to the -1 "missing" sentinel.
        return None if batter_id is None else batter_id_to_name.get(batter_id)

    out = events_df.copy()
    out["player_name"] = out["batter"].apply(_name_for)
    out = out.dropna(subset=["player_name"]).copy()
    return _prepare_frame(out)
def _to_pitcher_frame(events_df: pd.DataFrame) -> pd.DataFrame:
    """Use the statcast pitcher-name column as player_name; drop unnamed rows."""
    if events_df.empty:
        return pd.DataFrame()
    out = events_df.copy()
    stripped_names = out["pitcher_player_name"].astype(str).str.strip()
    out["player_name"] = stripped_names
    out = out.loc[stripped_names != ""].copy()
    return _prepare_frame(out)
def _allocate_prior_counts(prior_df: pd.DataFrame, target_count: int) -> dict[int, int]:
    """Split *target_count* rows across prior seasons, weighted by recency.

    Largest-remainder apportionment over recency-weighted availability,
    capped by how many rows each season actually has. Returns {} when there
    is nothing to allocate.
    """
    if target_count <= 0 or prior_df.empty or "source_season" not in prior_df.columns:
        return {}
    available_by_season = prior_df["source_season"].dropna().astype(int).value_counts().to_dict()
    weighted_caps = {
        season: float(available) * float(_PRIOR_SEASON_RECENCY_WEIGHTS.get(season, 0.35))
        for season, available in available_by_season.items()
    }
    total_weighted = sum(weighted_caps.values())
    if total_weighted <= 0:
        # Degenerate weights: just cap each season at the target.
        return {season: min(available, target_count) for season, available in available_by_season.items()}
    allocations: dict[int, int] = {}
    remainders: list[tuple[float, int]] = []
    assigned = 0
    for season, weighted_cap in weighted_caps.items():
        raw_share = (weighted_cap / total_weighted) * float(target_count)
        floor_share = int(raw_share)
        granted = min(floor_share, available_by_season.get(season, 0))
        allocations[season] = granted
        assigned += granted
        remainders.append((raw_share - floor_share, season))
    # Hand out leftover slots by largest fractional remainder, at most one
    # extra per season, skipping seasons already at their availability cap.
    leftover = max(0, target_count - assigned)
    for _, season in sorted(remainders, reverse=True):
        if leftover <= 0:
            break
        if allocations.get(season, 0) >= available_by_season.get(season, 0):
            continue
        allocations[season] = allocations.get(season, 0) + 1
        leftover -= 1
    return allocations
def _sample_prior_rows(prior_df: pd.DataFrame, target_count: int) -> pd.DataFrame:
    """Pick up to *target_count* prior rows: per-season recency allocation,
    newest-first within a season, topped up from newest unselected rows."""
    if prior_df.empty or target_count <= 0:
        return pd.DataFrame(columns=prior_df.columns)
    if len(prior_df) <= target_count:
        return prior_df.copy()
    picked: list[pd.DataFrame] = []
    for season, keep_count in _allocate_prior_counts(prior_df, target_count).items():
        if keep_count <= 0:
            continue
        season_rows = prior_df[prior_df["source_season"].astype(int) == int(season)].copy()
        if season_rows.empty:
            continue
        newest_first = season_rows.sort_values("game_date", ascending=False, na_position="last")
        picked.append(newest_first.head(keep_count))
    if not picked:
        # Allocation produced nothing usable: fall back to newest overall.
        return prior_df.sort_values("game_date", ascending=False, na_position="last").head(target_count).copy()
    combined = pd.concat(picked, ignore_index=True)
    if len(combined) < target_count:
        # Top up from the newest rows not already selected (by event_key).
        taken_keys = set(combined["event_key"].astype(str).tolist()) if "event_key" in combined.columns else set()
        leftovers = prior_df[
            ~prior_df["event_key"].astype(str).isin(taken_keys)
        ].sort_values("game_date", ascending=False, na_position="last")
        combined = pd.concat([combined, leftovers.head(target_count - len(combined))], ignore_index=True)
    return combined.head(target_count).copy()
def _sample_entity_rows(
    prior_df: pd.DataFrame,
    season_df: pd.DataFrame,
    player_name: str,
    blend_k: float,
    role_label: str,
) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Select one player's baseline event rows, blending prior seasons with 2026.

    Returns (sampled_rows, metadata). Sampled rows carry a `_baseline_source`
    column ("prior" or "season_2026"); metadata records sample sizes, blend
    weights, and which side drove the baseline. Four regimes: unavailable,
    current-only, prior-only, and blended.
    """
    prior_count = int(len(prior_df))
    season_count = int(len(season_df))
    # Any current-season rows at all activate the rolling overlay flag.
    rolling_overlay_active = season_count > 0
    if prior_count == 0 and season_count == 0:
        # No data on either side: empty frame + "unavailable" metadata.
        # (season_df is empty in this branch, so prior_df.columns is used.)
        return (
            pd.DataFrame(columns=(season_df.columns if not season_df.empty else prior_df.columns)),
            {
                "player_name": player_name,
                "baseline_mode": "unavailable",
                "prior_sample_size": 0,
                "season_2026_sample_size": 0,
                "prior_weight": 0.0,
                "season_2026_weight": 0.0,
                "baseline_driver": "unavailable",
                "rolling_overlay_active": False,
                "baseline_role": role_label,
            },
        )
    if prior_count == 0:
        # Current season only: newest rows up to the per-player cap.
        sampled = season_df.sort_values("game_date", ascending=False, na_position="last").head(_MAX_ROWS_PER_PLAYER).copy()
        sampled["_baseline_source"] = "season_2026"
        metadata = {
            "player_name": player_name,
            "baseline_mode": "current_only",
            "prior_sample_size": 0,
            "season_2026_sample_size": season_count,
            "prior_weight": 0.0,
            "season_2026_weight": 1.0,
            "baseline_driver": "current_season_led",
            "rolling_overlay_active": rolling_overlay_active,
            "baseline_role": role_label,
        }
        return sampled, metadata
    if season_count == 0:
        # Prior seasons only: recency-weighted sample across seasons.
        sampled = _sample_prior_rows(prior_df, min(prior_count, _MAX_ROWS_PER_PLAYER))
        sampled["_baseline_source"] = "prior"
        metadata = {
            "player_name": player_name,
            "baseline_mode": "prior_only",
            "prior_sample_size": prior_count,
            "season_2026_sample_size": 0,
            "prior_weight": 1.0,
            "season_2026_weight": 0.0,
            "baseline_driver": "prior_led",
            "rolling_overlay_active": False,
            "baseline_role": role_label,
        }
        return sampled, metadata
    # Blended: shrinkage weight for the current season, prior weight clamped
    # to [0.15, 0.95], then both renormalized so they sum to 1.
    season_weight = _current_weight(season_count, blend_k)
    prior_weight = _clamp(1.0 - season_weight, 0.15, 0.95)
    total_weight = prior_weight + season_weight
    prior_weight /= total_weight
    season_weight /= total_weight
    # Row budget: cap total at _MAX_ROWS_PER_PLAYER and guarantee a floor of
    # current-season rows whenever any exist.
    target_total = min(_MAX_ROWS_PER_PLAYER, prior_count + season_count)
    target_current = min(
        season_count,
        max(_MIN_CURRENT_ROWS_WHEN_AVAILABLE, int(round(target_total * season_weight))),
    )
    target_current = min(target_current, target_total)
    target_prior = min(prior_count, max(0, target_total - target_current))
    current_rows = season_df.sort_values("game_date", ascending=False, na_position="last").head(target_current).copy()
    prior_rows = _sample_prior_rows(prior_df, target_prior)
    current_rows["_baseline_source"] = "season_2026"
    prior_rows["_baseline_source"] = "prior"
    sampled = pd.concat([current_rows, prior_rows], ignore_index=True)
    if sampled.empty:
        # Defensive fallback; with both counts > 0 this should not trigger.
        sampled = season_df.sort_values("game_date", ascending=False, na_position="last").head(target_total).copy()
        sampled["_baseline_source"] = "season_2026"
    metadata = {
        "player_name": player_name,
        "baseline_mode": "blended",
        "prior_sample_size": prior_count,
        "season_2026_sample_size": season_count,
        "prior_weight": float(prior_weight),
        "season_2026_weight": float(season_weight),
        "baseline_driver": "current_season_led" if season_weight > prior_weight else "prior_led",
        "rolling_overlay_active": rolling_overlay_active,
        "baseline_role": role_label,
    }
    return sampled, metadata
def _blend_entity_frames(
    prior_df: pd.DataFrame,
    season_df: pd.DataFrame,
    blend_k: float,
    role_label: str,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Blend prior/current rows for every player; returns (blended, metadata).

    NOTE(review): indentation was reconstructed — metadata rows are recorded
    only for players that produced sampled rows; confirm against callers.
    """

    def _name_series(df: pd.DataFrame) -> pd.Series:
        return df["player_name"] if "player_name" in df.columns else pd.Series(dtype="object")

    all_names = pd.concat(
        [_name_series(prior_df), _name_series(season_df)], ignore_index=True
    ).dropna()
    player_names = sorted({str(n).strip() for n in all_names.tolist() if str(n).strip()})
    sampled_frames: list[pd.DataFrame] = []
    metadata_rows: list[dict[str, Any]] = []
    for player_name in player_names:
        player_prior = (
            prior_df[prior_df["player_name"].astype(str) == player_name].copy()
            if not prior_df.empty
            else pd.DataFrame(columns=season_df.columns)
        )
        player_season = (
            season_df[season_df["player_name"].astype(str) == player_name].copy()
            if not season_df.empty
            else pd.DataFrame(columns=prior_df.columns)
        )
        sampled, metadata = _sample_entity_rows(
            prior_df=player_prior,
            season_df=player_season,
            player_name=player_name,
            blend_k=blend_k,
            role_label=role_label,
        )
        if not sampled.empty:
            # Broadcast the per-player metadata onto every sampled row.
            for meta_key, meta_value in metadata.items():
                sampled[meta_key] = meta_value
            sampled_frames.append(sampled)
            metadata_rows.append(metadata)
    blended_df = (
        pd.concat(sampled_frames, ignore_index=True, sort=False)
        if sampled_frames
        else pd.DataFrame()
    )
    metadata_df = pd.DataFrame(metadata_rows)
    if not blended_df.empty and "game_date" in blended_df.columns:
        blended_df = blended_df.sort_values(
            ["player_name", "game_date", "source_season"],
            ascending=[True, False, False],
            na_position="last",
        ).reset_index(drop=True)
    return blended_df, metadata_df
def _build_snapshot_rows(
    frame: pd.DataFrame,
    built_at: str,
    snapshot_version: str,
    source_status: str,
) -> pd.DataFrame:
    """Serialize per-player event payloads into snapshot-table rows."""
    schema = [
        "player_name",
        "source_row_count",
        "payload_json",
        "snapshot_built_at",
        "snapshot_version",
        "source_status",
    ]
    if frame is None or frame.empty or "player_name" not in frame.columns:
        return pd.DataFrame(columns=schema)
    rows: list[dict[str, Any]] = []
    for player_name, player_df in frame.groupby("player_name", dropna=False):
        cleaned_name = str(player_name or "").strip()
        if not cleaned_name:
            continue
        rows.append(
            {
                "player_name": cleaned_name,
                "source_row_count": int(len(player_df)),
                "payload_json": _serialize_payload_frame(player_df.reset_index(drop=True)),
                "snapshot_built_at": built_at,
                "snapshot_version": snapshot_version,
                "source_status": source_status,
            }
        )
    return pd.DataFrame(rows)
def _build_event_snapshot_rows(
    frame: pd.DataFrame,
    built_at: str,
    snapshot_version: str,
    source_status: str,
) -> pd.DataFrame:
    """Stamp snapshot metadata onto event rows and project to the snapshot schema."""
    out = (frame if frame is not None else pd.DataFrame()).copy()
    if out.empty:
        return pd.DataFrame(columns=_EVENT_SNAPSHOT_COLS)
    out["snapshot_built_at"] = built_at
    out["snapshot_version"] = snapshot_version
    out["source_status"] = source_status
    for col in (c for c in _EVENT_SNAPSHOT_COLS if c not in out.columns):
        out[col] = None
    # The persisted column drops the leading underscore used in-memory.
    return out[_EVENT_SNAPSHOT_COLS].rename(columns={"_baseline_source": "baseline_source"}).copy()
def _build_meta_snapshot_rows(
meta_df: pd.DataFrame,
built_at: str,
snapshot_version: str,
source_status: str,
) -> pd.DataFrame:
if meta_df is None:
meta_df = pd.DataFrame()
out = meta_df.copy()
for col in [
"player_name",
"baseline_role",
"baseline_mode",
"prior_sample_size",
"season_2026_sample_size",
"prior_weight",
"season_2026_weight",
"baseline_driver",
"rolling_overlay_active",
]:
if col not in out.columns:
out[col] = None
out["snapshot_built_at"] = built_at
out["snapshot_version"] = snapshot_version
out["source_status"] = source_status
return out[
[
"player_name",
"baseline_role",
"baseline_mode",
"prior_sample_size",
"season_2026_sample_size",
"prior_weight",
"season_2026_weight",
"baseline_driver",
"rolling_overlay_active",
"snapshot_built_at",
"snapshot_version",
"source_status",
]
].copy()
def _build_rolling_snapshot_rows(
    frame: pd.DataFrame,
    role_label: str,
    built_at: str,
    snapshot_version: str,
    source_status: str,
) -> pd.DataFrame:
    """Compute per-player rolling-form rows (batter or pitcher) for snapshotting."""
    placeholder_cols = [
        "player_name",
        "source_row_count",
        "payload_json",
        "snapshot_built_at",
        "snapshot_version",
        "source_status",
    ]
    if frame is None or frame.empty or "player_name" not in frame.columns:
        return pd.DataFrame(columns=placeholder_cols)
    is_batter = role_label == "batter"
    rows: list[dict[str, Any]] = []
    for player_name, player_df in frame.groupby("player_name", dropna=False):
        cleaned_name = str(player_name or "").strip()
        if not cleaned_name:
            continue
        # Anchor the rolling windows at the player's latest parseable game date.
        reference_date = None
        if "game_date" in player_df.columns:
            try:
                reference_date = pd.to_datetime(player_df["game_date"], errors="coerce").max()
            except Exception:
                reference_date = None
        if is_batter:
            payload = build_batter_rolling_form_row(
                statcast_df=frame,
                player_name=cleaned_name,
                reference_date=reference_date,
            )
        else:
            payload = build_pitcher_rolling_form_row(
                statcast_df=frame,
                pitcher_name=cleaned_name,
                reference_date=reference_date,
            )
        rows.append(
            {
                "player_name": cleaned_name,
                **payload,
                "source_row_count": int(len(player_df)),
                "snapshot_built_at": built_at,
                "snapshot_version": snapshot_version,
                "source_status": source_status,
            }
        )
    out = pd.DataFrame(rows)
    expected_cols = _BATTER_ROLLING_COLS if is_batter else _PITCHER_ROLLING_COLS
    for col in (c for c in expected_cols if c not in out.columns):
        out[col] = None
    return out[expected_cols].copy()
def _read_snapshot_table(
    conn,
    table_name: str,
    player_names: tuple[str, ...] = (),
) -> pd.DataFrame:
    """Read a snapshot table, optionally scoped to specific player names.

    Player names are bound as query parameters; *table_name* is interpolated
    and must come from trusted internal callers only.
    """
    if not player_names:
        return safe_read_sql(text(f"SELECT * FROM {table_name} ORDER BY player_name"), conn)
    params: dict[str, Any] = {
        f"name_{idx}": str(player_name) for idx, player_name in enumerate(player_names)
    }
    placeholders = ", ".join(f":{key}" for key in params)
    query = text(
        f"SELECT * FROM {table_name} WHERE player_name IN ({placeholders}) ORDER BY player_name"
    )
    return safe_read_sql(query, conn, params=params)
def _replace_snapshot_scope(
    conn,
    table_name: str,
    df: pd.DataFrame,
    player_names: tuple[str, ...],
) -> None:
    """Replace snapshot rows for the named players, or the whole table.

    When ``player_names`` yields any usable (non-blank) names, only those
    players' rows are deleted and re-inserted; otherwise the entire table's
    contents are swapped for ``df``.
    """
    cleaned = {str(name or "").strip() for name in player_names}
    scoped_names = tuple(sorted(name for name in cleaned if name))
    if not scoped_names:
        # No scope requested: full-table replacement.
        replace_table_contents(conn, table_name, df)
        return
    # Delete only the in-scope rows, binding each name as a parameter.
    params: dict[str, Any] = {
        f"name_{idx}": player_name for idx, player_name in enumerate(scoped_names)
    }
    placeholders = ", ".join(f":{key}" for key in params)
    conn.execute(
        text(f"DELETE FROM {table_name} WHERE player_name IN ({placeholders})"),
        params,
    )
    if df is not None and not df.empty:
        upsert_dataframe(conn, table_name, df, replace=False)
def _hydrate_snapshot_frame(snapshot_df: pd.DataFrame) -> pd.DataFrame:
    """Expand serialized per-player payloads back into one event-level frame."""
    if snapshot_df is None or snapshot_df.empty:
        return pd.DataFrame()
    # Deserialize each row's payload_json and keep only non-empty frames.
    hydrated = [
        frame
        for frame in (
            _deserialize_payload_frame(row.get("payload_json"))
            for _, row in snapshot_df.iterrows()
        )
        if not frame.empty
    ]
    if not hydrated:
        return pd.DataFrame()
    return pd.concat(hydrated, ignore_index=True, sort=False)
def _hydrate_rolling_snapshot_frame(snapshot_df: pd.DataFrame) -> pd.DataFrame:
if snapshot_df is None or snapshot_df.empty:
return pd.DataFrame()
return snapshot_df.copy()
def persist_shared_baseline_snapshots(
    bundle: dict[str, pd.DataFrame],
    source_status: str = "runtime_refreshed",
) -> dict[str, pd.DataFrame]:
    """Persist a baseline bundle into the shared snapshot tables.

    Builds event/summary/meta/rolling snapshot frames from ``bundle``, writes
    each into its table (scoped to the players named in the bundle's meta
    frames), then stamps a per-table ``snapshot_status`` frame back onto the
    bundle.

    Returns the same ``bundle`` dict, annotated with ``snapshot_status``,
    ``snapshot_source_status`` and ``runtime_fallback_used``.
    """
    built_at = utc_now_iso()

    def _meta_names(key: str) -> tuple[str, ...]:
        # Player names present in the bundle's meta frame define the scope of
        # rows we may delete/replace in each snapshot table.
        series = bundle.get(key, pd.DataFrame()).get("player_name", pd.Series(dtype="object"))
        return _normalize_names_tuple(tuple(series.dropna().astype(str).tolist()))

    batter_names = _meta_names("batter_baseline_meta")
    pitcher_names = _meta_names("pitcher_baseline_meta")
    # Provenance metadata shared by every snapshot builder call.
    common = {
        "built_at": built_at,
        "snapshot_version": _SNAPSHOT_VERSION,
        "source_status": source_status,
    }
    hitter_event_rows = _build_event_snapshot_rows(bundle.get("blended_batter_df", pd.DataFrame()), **common)
    pitcher_event_rows = _build_event_snapshot_rows(bundle.get("blended_pitcher_df", pd.DataFrame()), **common)
    hitter_snapshot = _build_snapshot_rows(bundle.get("blended_batter_df", pd.DataFrame()), **common)
    pitcher_snapshot = _build_snapshot_rows(bundle.get("blended_pitcher_df", pd.DataFrame()), **common)
    hitter_meta = _build_meta_snapshot_rows(bundle.get("batter_baseline_meta", pd.DataFrame()), **common)
    pitcher_meta = _build_meta_snapshot_rows(bundle.get("pitcher_baseline_meta", pd.DataFrame()), **common)
    hitter_rolling = _build_rolling_snapshot_rows(
        bundle.get("season_2026_ytd_hitter_df", pd.DataFrame()), role_label="batter", **common
    )
    pitcher_rolling = _build_rolling_snapshot_rows(
        bundle.get("season_2026_ytd_pitcher_df", pd.DataFrame()), role_label="pitcher", **common
    )
    # (table_name, frame, scoped player names) — order preserved from the
    # original implementation: event rows, summaries, meta, rolling.
    table_specs: list[tuple[str, pd.DataFrame, tuple[str, ...]]] = [
        ("shared_hitter_baseline_event_rows", hitter_event_rows, batter_names),
        ("shared_pitcher_baseline_event_rows", pitcher_event_rows, pitcher_names),
        ("shared_hitter_baseline_snapshot", hitter_snapshot, batter_names),
        ("shared_pitcher_baseline_snapshot", pitcher_snapshot, pitcher_names),
        ("shared_hitter_baseline_meta", hitter_meta, batter_names),
        ("shared_pitcher_baseline_meta", pitcher_meta, pitcher_names),
        ("shared_hitter_rolling_summary", hitter_rolling, batter_names),
        ("shared_pitcher_rolling_summary", pitcher_rolling, pitcher_names),
    ]
    conn = get_connection()
    try:
        for table_name, frame, scope in table_specs:
            _replace_snapshot_scope(conn, table_name, frame, scope)
    finally:
        try:
            conn.close()
        except Exception:
            pass
    # One status row per table so callers can inspect freshness/coverage.
    bundle["snapshot_status"] = pd.DataFrame(
        [
            {
                "table_name": table_name,
                "row_count": int(len(frame)),
                "snapshot_built_at": built_at,
                "snapshot_version": _SNAPSHOT_VERSION,
                "source_status": source_status,
                "stale": False,
            }
            for table_name, frame, _ in table_specs
        ]
    )
    bundle["snapshot_source_status"] = source_status
    bundle["runtime_fallback_used"] = False
    return bundle
def load_shared_baseline_bundle_from_snapshots(
    batter_names: tuple[str, ...] = (),
    pitcher_names: tuple[str, ...] = (),
    max_age_seconds: int = _DEFAULT_SNAPSHOT_MAX_AGE_SECONDS,
) -> dict[str, pd.DataFrame]:
    """Load the shared baseline bundle from the persisted snapshot tables.

    The eight snapshot-table reads run on a daemon thread bounded by
    ``_SNAPSHOT_READ_TIMEOUT_SECONDS``; on timeout or read failure a
    degraded, empty bundle (``snapshot_source_status`` ==
    ``"snapshot_unavailable"``) is returned instead of blocking or raising.

    Parameters
    ----------
    batter_names / pitcher_names:
        Optional request scopes; resolved to canonical snapshot names.
    max_age_seconds:
        Age beyond which a table's snapshot is flagged ``stale`` in the
        returned ``snapshot_status`` frame.
    """
    batter_names = _resolve_snapshot_player_names(batter_names, role="batter")
    pitcher_names = _resolve_snapshot_player_names(pitcher_names, role="pitcher")
    _degraded_bundle: dict[str, Any] = {
        "multi_year_prior_hitter_df": pd.DataFrame(),
        "season_2026_ytd_hitter_df": pd.DataFrame(),
        "multi_year_prior_pitcher_df": pd.DataFrame(),
        "season_2026_ytd_pitcher_df": pd.DataFrame(),
        "blended_batter_df": pd.DataFrame(),
        "blended_pitcher_df": pd.DataFrame(),
        "batter_baseline_meta": pd.DataFrame(),
        "pitcher_baseline_meta": pd.DataFrame(),
        "hitter_rolling_snapshot": pd.DataFrame(),
        "pitcher_rolling_snapshot": pd.DataFrame(),
        "snapshot_status": pd.DataFrame(),
        "snapshot_source_status": "snapshot_unavailable",
        "runtime_fallback_used": False,
    }
    _read_result: list[tuple | None] = [None]
    _read_exc: list[Exception | None] = [None]

    def _do_reads() -> None:
        # Runs on a worker thread so the caller can enforce a wall-clock cap.
        conn = get_connection()
        try:
            _read_result[0] = (
                _read_snapshot_table(conn, "shared_hitter_baseline_event_rows", player_names=batter_names),
                _read_snapshot_table(conn, "shared_pitcher_baseline_event_rows", player_names=pitcher_names),
                _read_snapshot_table(conn, "shared_hitter_baseline_snapshot", player_names=batter_names),
                _read_snapshot_table(conn, "shared_pitcher_baseline_snapshot", player_names=pitcher_names),
                _read_snapshot_table(conn, "shared_hitter_baseline_meta", player_names=batter_names),
                _read_snapshot_table(conn, "shared_pitcher_baseline_meta", player_names=pitcher_names),
                _read_snapshot_table(conn, "shared_hitter_rolling_summary", player_names=batter_names),
                _read_snapshot_table(conn, "shared_pitcher_rolling_summary", player_names=pitcher_names),
            )
        except Exception as exc:
            _read_exc[0] = exc
        finally:
            try:
                conn.close()
            except Exception:
                pass

    _rt = threading.Thread(target=_do_reads, daemon=True)
    _rt.start()
    _rt.join(timeout=_SNAPSHOT_READ_TIMEOUT_SECONDS)
    if _rt.is_alive():
        _log.warning(
            "[shared_baseline] snapshot DB reads timed out after %ds — returning degraded bundle",
            _SNAPSHOT_READ_TIMEOUT_SECONDS,
        )
        return _degraded_bundle
    if _read_exc[0] is not None or _read_result[0] is None:
        # Fix: the read exception used to be swallowed silently, making a DB
        # failure indistinguishable from an empty snapshot in the logs.
        if _read_exc[0] is not None:
            _log.warning("[shared_baseline] snapshot DB reads failed: %s", _read_exc[0])
        return _degraded_bundle
    (
        hitter_event_rows,
        pitcher_event_rows,
        hitter_snapshot,
        pitcher_snapshot,
        hitter_meta,
        pitcher_meta,
        hitter_rolling,
        pitcher_rolling,
    ) = _read_result[0]
    # Summarize per-table freshness/provenance for the caller.
    snapshot_status_rows: list[dict[str, Any]] = []
    for table_name, frame in [
        ("shared_hitter_baseline_snapshot", hitter_snapshot),
        ("shared_pitcher_baseline_snapshot", pitcher_snapshot),
        ("shared_hitter_baseline_meta", hitter_meta),
        ("shared_pitcher_baseline_meta", pitcher_meta),
        ("shared_hitter_baseline_event_rows", hitter_event_rows),
        ("shared_pitcher_baseline_event_rows", pitcher_event_rows),
        ("shared_hitter_rolling_summary", hitter_rolling),
        ("shared_pitcher_rolling_summary", pitcher_rolling),
    ]:
        built_at = None
        version = None
        source_status = None
        if isinstance(frame, pd.DataFrame) and not frame.empty:
            # Every row in a table carries the same stamp; read the first row.
            built_at = frame.get("snapshot_built_at", pd.Series(dtype="object")).iloc[0]
            version = frame.get("snapshot_version", pd.Series(dtype="object")).iloc[0]
            source_status = frame.get("source_status", pd.Series(dtype="object")).iloc[0]
        snapshot_status_rows.append(
            {
                "table_name": table_name,
                "row_count": 0 if frame is None else int(len(frame)),
                "snapshot_built_at": built_at,
                "snapshot_version": version,
                "source_status": source_status,
                "stale": _is_snapshot_stale(built_at, max_age_seconds),
            }
        )
    return {
        "multi_year_prior_hitter_df": pd.DataFrame(),
        "season_2026_ytd_hitter_df": pd.DataFrame(),
        "multi_year_prior_pitcher_df": pd.DataFrame(),
        "season_2026_ytd_pitcher_df": pd.DataFrame(),
        # Prefer raw event rows (dropping provenance columns) when present;
        # otherwise hydrate from the serialized payload snapshot.
        "blended_batter_df": _prepare_frame(hitter_event_rows.drop(columns=["snapshot_built_at", "snapshot_version", "source_status"], errors="ignore"))
        if isinstance(hitter_event_rows, pd.DataFrame) and not hitter_event_rows.empty
        else _hydrate_snapshot_frame(hitter_snapshot),
        "blended_pitcher_df": _prepare_frame(pitcher_event_rows.drop(columns=["snapshot_built_at", "snapshot_version", "source_status"], errors="ignore"))
        if isinstance(pitcher_event_rows, pd.DataFrame) and not pitcher_event_rows.empty
        else _hydrate_snapshot_frame(pitcher_snapshot),
        "batter_baseline_meta": hitter_meta,
        "pitcher_baseline_meta": pitcher_meta,
        "hitter_rolling_snapshot": _hydrate_rolling_snapshot_frame(hitter_rolling),
        "pitcher_rolling_snapshot": _hydrate_rolling_snapshot_frame(pitcher_rolling),
        "snapshot_status": pd.DataFrame(snapshot_status_rows),
        "snapshot_source_status": "snapshot",
        "runtime_fallback_used": False,
    }
def queue_shared_baseline_refresh(
    batter_names: tuple[str, ...] = (),
    pitcher_names: tuple[str, ...] = (),
) -> bool:
    """Kick off a background rebuild + persist of the shared baseline.

    Concurrent requests for the same (batter, pitcher) name scope are
    de-duplicated via ``_REFRESH_INFLIGHT`` under ``_REFRESH_LOCK``.

    Returns
    -------
    True when a new refresh thread was started, False when an identical
    refresh is already in flight.
    """
    key = (_normalize_names_tuple(batter_names), _normalize_names_tuple(pitcher_names))
    with _REFRESH_LOCK:
        if key in _REFRESH_INFLIGHT:
            return False
        _REFRESH_INFLIGHT.add(key)

    def _run() -> None:
        try:
            refreshed = build_shared_baseline_bundle(
                batter_names=key[0],
                pitcher_names=key[1],
            )
            persist_shared_baseline_snapshots(
                refreshed,
                source_status="background_refreshed",
            )
        except Exception as exc:
            # Best-effort: a failed background refresh must never propagate,
            # but it should be visible in the logs (was a silent `pass`).
            _log.warning("[shared_baseline] background refresh failed: %s", exc)
        finally:
            with _REFRESH_LOCK:
                _REFRESH_INFLIGHT.discard(key)

    threading.Thread(target=_run, daemon=True).start()
    return True
def load_or_build_shared_baseline_bundle(
    batter_names: tuple[str, ...] = (),
    pitcher_names: tuple[str, ...] = (),
    max_age_seconds: int = _DEFAULT_SNAPSHOT_MAX_AGE_SECONDS,
    persist_runtime_refresh: bool = True,
) -> dict[str, pd.DataFrame]:
    """Serve the shared baseline bundle, preferring persisted snapshots.

    Resolution order:
      1. Fresh snapshot covering every requested player -> returned directly.
      2. Snapshot with data but stale and/or missing some requested players
         -> returned immediately while a background refresh is queued.
      3. No snapshot data -> synchronous runtime build, persisted back to the
         snapshot tables when ``persist_runtime_refresh`` is True.

    The returned bundle is always annotated with request-coverage metadata
    via ``_annotate_request_coverage``.
    """
    batter_names = _normalize_names_tuple(batter_names)
    pitcher_names = _normalize_names_tuple(pitcher_names)
    # Resolve requested names to the canonical names used as snapshot keys.
    snapshot_batter_names = _resolve_snapshot_player_names(batter_names, role="batter")
    snapshot_pitcher_names = _resolve_snapshot_player_names(pitcher_names, role="pitcher")
    snapshot_bundle = load_shared_baseline_bundle_from_snapshots(
        batter_names=snapshot_batter_names,
        pitcher_names=snapshot_pitcher_names,
        max_age_seconds=max_age_seconds,
    )
    snapshot_status = snapshot_bundle.get("snapshot_status", pd.DataFrame())
    # Normalized player names actually present in the snapshot meta frames.
    available_hitters = _normalized_name_set(
        snapshot_bundle.get("batter_baseline_meta", pd.DataFrame())
        .get("player_name", pd.Series(dtype="object"))
        .dropna()
        .astype(str)
        .tolist()
        if isinstance(snapshot_bundle.get("batter_baseline_meta", pd.DataFrame()), pd.DataFrame)
        else []
    )
    available_pitchers = _normalized_name_set(
        snapshot_bundle.get("pitcher_baseline_meta", pd.DataFrame())
        .get("player_name", pd.Series(dtype="object"))
        .dropna()
        .astype(str)
        .tolist()
        if isinstance(snapshot_bundle.get("pitcher_baseline_meta", pd.DataFrame()), pd.DataFrame)
        else []
    )
    missing_hitter_names = _compute_missing_requested_names(snapshot_batter_names, available_hitters)
    missing_pitcher_names = _compute_missing_requested_names(snapshot_pitcher_names, available_pitchers)
    # An empty request scope counts as covered by definition.
    requested_hitter_covered = True
    if snapshot_batter_names:
        requested_hitter_covered = not missing_hitter_names
    requested_pitcher_covered = True
    if snapshot_pitcher_names:
        requested_pitcher_covered = not missing_pitcher_names
    snapshot_has_data = not snapshot_bundle.get("blended_batter_df", pd.DataFrame()).empty or not snapshot_bundle.get("blended_pitcher_df", pd.DataFrame()).empty
    # Stale when ANY snapshot table's status row is flagged stale.
    snapshot_stale = bool(
        isinstance(snapshot_status, pd.DataFrame)
        and not snapshot_status.empty
        and snapshot_status["stale"].fillna(False).any()
    )
    coverage_mode = "empty"
    if snapshot_has_data and requested_hitter_covered and requested_pitcher_covered:
        coverage_mode = "full"
    elif snapshot_has_data:
        coverage_mode = "partial"
    background_refresh_queued = False
    # Fast path: fresh snapshot fully covering the request.
    if snapshot_has_data and requested_hitter_covered and requested_pitcher_covered and not snapshot_stale:
        return _annotate_request_coverage(
            snapshot_bundle,
            requested_hitter_names=snapshot_batter_names,
            requested_pitcher_names=snapshot_pitcher_names,
            coverage_mode=coverage_mode,
            background_refresh_queued=background_refresh_queued,
        )
    # Stale-but-complete or partially-covering snapshot: serve what we have
    # now and refresh asynchronously in the background.
    if snapshot_has_data and (
        (requested_hitter_covered and requested_pitcher_covered and snapshot_stale)
        or missing_hitter_names
        or missing_pitcher_names
    ):
        background_refresh_queued = queue_shared_baseline_refresh(
            batter_names=snapshot_batter_names,
            pitcher_names=snapshot_pitcher_names,
        )
        snapshot_bundle["snapshot_source_status"] = "snapshot_partial_served" if coverage_mode == "partial" else "snapshot_stale_served"
        snapshot_bundle["runtime_fallback_used"] = False
        return _annotate_request_coverage(
            snapshot_bundle,
            requested_hitter_names=snapshot_batter_names,
            requested_pitcher_names=snapshot_pitcher_names,
            coverage_mode=coverage_mode,
            background_refresh_queued=background_refresh_queued,
        )
    # No usable snapshot data: build synchronously (blocks the caller).
    runtime_bundle = build_shared_baseline_bundle(
        batter_names=snapshot_batter_names,
        pitcher_names=snapshot_pitcher_names,
    )
    runtime_bundle["snapshot_source_status"] = "runtime_fallback"
    runtime_bundle["runtime_fallback_used"] = True
    if persist_runtime_refresh:
        runtime_bundle = persist_shared_baseline_snapshots(
            runtime_bundle,
            source_status="runtime_refreshed",
        )
        # persist_shared_baseline_snapshots resets this flag to False;
        # restore it so callers can tell a fallback build was used.
        runtime_bundle["runtime_fallback_used"] = True
    if "snapshot_status" not in runtime_bundle:
        runtime_bundle["snapshot_status"] = snapshot_status
    return _annotate_request_coverage(
        runtime_bundle,
        requested_hitter_names=snapshot_batter_names,
        requested_pitcher_names=snapshot_pitcher_names,
        coverage_mode="runtime_fallback",
        background_refresh_queued=background_refresh_queued,
    )
def load_or_build_shared_baseline_bundle_complete_for_request(
    batter_names: tuple[str, ...] = (),
    pitcher_names: tuple[str, ...] = (),
    max_age_seconds: int = _DEFAULT_SNAPSHOT_MAX_AGE_SECONDS,
    persist_runtime_refresh: bool = True,
) -> dict[str, pd.DataFrame]:
    """Serve the shared baseline, completing coverage within THIS request.

    Unlike ``load_or_build_shared_baseline_bundle`` (which serves partial
    snapshots immediately), this variant runs a time-boxed synchronous
    "patch" build for any requested players missing from the snapshot and
    merges it into the snapshot bundle. Patch/fallback builds run on daemon
    threads capped at ``_FALLBACK_BUILD_TIMEOUT_SECONDS``; on timeout a
    partial/degraded bundle is returned and the finished build is persisted
    later by a background thread.
    """
    batter_names = _normalize_names_tuple(batter_names)
    pitcher_names = _normalize_names_tuple(pitcher_names)
    snapshot_batter_names = _resolve_snapshot_player_names(batter_names, role="batter")
    snapshot_pitcher_names = _resolve_snapshot_player_names(pitcher_names, role="pitcher")
    snapshot_bundle = load_shared_baseline_bundle_from_snapshots(
        batter_names=snapshot_batter_names,
        pitcher_names=snapshot_pitcher_names,
        max_age_seconds=max_age_seconds,
    )
    snapshot_status = snapshot_bundle.get("snapshot_status", pd.DataFrame())
    snapshot_has_data = not snapshot_bundle.get("blended_batter_df", pd.DataFrame()).empty or not snapshot_bundle.get("blended_pitcher_df", pd.DataFrame()).empty
    # Stale when ANY snapshot table's status row is flagged stale.
    snapshot_stale = bool(
        isinstance(snapshot_status, pd.DataFrame)
        and not snapshot_status.empty
        and snapshot_status["stale"].fillna(False).any()
    )
    # Normalized player names actually present in the snapshot meta frames.
    available_hitters = _normalized_name_set(
        snapshot_bundle.get("batter_baseline_meta", pd.DataFrame())
        .get("player_name", pd.Series(dtype="object"))
        .dropna()
        .astype(str)
        .tolist()
        if isinstance(snapshot_bundle.get("batter_baseline_meta", pd.DataFrame()), pd.DataFrame)
        else []
    )
    available_pitchers = _normalized_name_set(
        snapshot_bundle.get("pitcher_baseline_meta", pd.DataFrame())
        .get("player_name", pd.Series(dtype="object"))
        .dropna()
        .astype(str)
        .tolist()
        if isinstance(snapshot_bundle.get("pitcher_baseline_meta", pd.DataFrame()), pd.DataFrame)
        else []
    )
    missing_hitter_names = _compute_missing_requested_names(snapshot_batter_names, available_hitters)
    missing_pitcher_names = _compute_missing_requested_names(snapshot_pitcher_names, available_pitchers)
    # Case 1: snapshot fully covers the request (possibly stale).
    if snapshot_has_data and not missing_hitter_names and not missing_pitcher_names:
        coverage_mode = "full" if not snapshot_stale else "stale_full"
        background_refresh_queued = False
        if snapshot_stale:
            background_refresh_queued = queue_shared_baseline_refresh(
                batter_names=snapshot_batter_names,
                pitcher_names=snapshot_pitcher_names,
            )
            snapshot_bundle["snapshot_source_status"] = "snapshot_stale_served"
        return _annotate_request_coverage(
            snapshot_bundle,
            requested_hitter_names=snapshot_batter_names,
            requested_pitcher_names=snapshot_pitcher_names,
            coverage_mode=coverage_mode,
            background_refresh_queued=background_refresh_queued,
        )
    # Case 2: snapshot has data but some requested players are missing —
    # build just the missing players, bounded by a wall-clock timeout.
    if snapshot_has_data and (missing_hitter_names or missing_pitcher_names):
        patch_result: list[dict | None] = [None]
        def _run_patch() -> None:
            # Runs on a daemon thread; hands its result back via patch_result.
            try:
                patch_result[0] = build_shared_baseline_bundle(
                    batter_names=tuple(sorted(missing_hitter_names)),
                    pitcher_names=tuple(sorted(missing_pitcher_names)),
                )
            except Exception as exc:
                _log.warning("[shared_baseline] patch build error: %s", exc)
        _pt = threading.Thread(target=_run_patch, daemon=True)
        _pt.start()
        _pt.join(timeout=_FALLBACK_BUILD_TIMEOUT_SECONDS)
        if patch_result[0] is not None:
            # Patch build finished in time: merge it into the snapshot bundle.
            patch_bundle = patch_result[0]
            if persist_runtime_refresh:
                # Queue persistence so this request isn't blocked on DB writes.
                _queue_shared_baseline_bundle_persist(patch_bundle, source_status="runtime_request_patch")
            merged_bundle = _merge_shared_baseline_bundles(snapshot_bundle, patch_bundle)
            if snapshot_stale:
                merged_bundle["background_refresh_queued"] = queue_shared_baseline_refresh(
                    batter_names=snapshot_batter_names,
                    pitcher_names=snapshot_pitcher_names,
                )
            return _annotate_request_coverage(
                merged_bundle,
                requested_hitter_names=snapshot_batter_names,
                requested_pitcher_names=snapshot_pitcher_names,
                coverage_mode="request_completed_patch",
                background_refresh_queued=bool(merged_bundle.get("background_refresh_queued")),
            )
        else:
            _log.warning(
                "[shared_baseline] patch build timed out after %ds — serving partial snapshot",
                _FALLBACK_BUILD_TIMEOUT_SECONDS,
            )
            if persist_runtime_refresh:
                # Don't discard the in-flight build: persist it once it finishes.
                def _persist_patch_when_done() -> None:
                    _pt.join()
                    if patch_result[0] is not None:
                        _queue_shared_baseline_bundle_persist(patch_result[0], source_status="runtime_request_patch")
                threading.Thread(target=_persist_patch_when_done, daemon=True).start()
            snapshot_bundle["snapshot_source_status"] = "patch_build_timeout"
            return _annotate_request_coverage(
                snapshot_bundle,
                requested_hitter_names=snapshot_batter_names,
                requested_pitcher_names=snapshot_pitcher_names,
                coverage_mode="request_completed_patch_partial",
                background_refresh_queued=False,
            )
    # Case 3: no snapshot data at all — run a full, time-boxed fallback build.
    runtime_result: list[dict | None] = [None]
    def _run_fallback() -> None:
        try:
            runtime_result[0] = build_shared_baseline_bundle(
                batter_names=snapshot_batter_names,
                pitcher_names=snapshot_pitcher_names,
            )
        except Exception as exc:
            _log.warning("[shared_baseline] runtime fallback build error: %s", exc)
    _rt = threading.Thread(target=_run_fallback, daemon=True)
    _rt.start()
    _rt.join(timeout=_FALLBACK_BUILD_TIMEOUT_SECONDS)
    if runtime_result[0] is not None:
        runtime_bundle = runtime_result[0]
        runtime_bundle["snapshot_source_status"] = "runtime_fallback"
        runtime_bundle["runtime_fallback_used"] = True
        runtime_bundle["request_patch_used"] = False
        if persist_runtime_refresh:
            _queue_shared_baseline_bundle_persist(runtime_bundle, source_status="runtime_refreshed")
        if "snapshot_status" not in runtime_bundle:
            runtime_bundle["snapshot_status"] = snapshot_status
        return _annotate_request_coverage(
            runtime_bundle,
            requested_hitter_names=snapshot_batter_names,
            requested_pitcher_names=snapshot_pitcher_names,
            coverage_mode="runtime_fallback",
            background_refresh_queued=False,
        )
    else:
        _log.warning(
            "[shared_baseline] full fallback build timed out after %ds — returning empty bundle, "
            "will persist when complete",
            _FALLBACK_BUILD_TIMEOUT_SECONDS,
        )
        if persist_runtime_refresh:
            # Persist the in-flight build once it eventually completes.
            def _persist_runtime_when_done() -> None:
                _rt.join()
                if runtime_result[0] is not None:
                    _queue_shared_baseline_bundle_persist(runtime_result[0], source_status="runtime_refreshed")
            threading.Thread(target=_persist_runtime_when_done, daemon=True).start()
        # Degraded bundle: correct shape, no data.
        degraded: dict[str, Any] = {
            "blended_batter_df": pd.DataFrame(),
            "blended_pitcher_df": pd.DataFrame(),
            "batter_baseline_meta": pd.DataFrame(),
            "pitcher_baseline_meta": pd.DataFrame(),
            "snapshot_status": snapshot_status,
            "snapshot_source_status": "runtime_fallback_timeout",
            "runtime_fallback_used": True,
            "request_patch_used": False,
        }
        return _annotate_request_coverage(
            degraded,
            requested_hitter_names=snapshot_batter_names,
            requested_pitcher_names=snapshot_pitcher_names,
            coverage_mode="runtime_fallback_timeout",
            background_refresh_queued=False,
        )
def build_shared_baseline_bundle(
    batter_names: tuple[str, ...] | None = None,
    pitcher_names: tuple[str, ...] | None = None,
    prior_seasons: tuple[int, ...] = PRIOR_SEASONS,
    current_season: int = CURRENT_SEASON,
) -> dict[str, pd.DataFrame]:
    """Build the full shared-baseline bundle directly from the database.

    Loads current-season events (optionally scoped to the requested batter
    and pitcher names) plus prior-season events for the active player ids,
    converts them into hitter/pitcher frames, and blends priors with the
    current season via ``_blend_entity_frames``.
    """
    identity = _load_identity_maps()
    batter_id_to_name = identity["batter_id_to_name"]
    db_conn = get_connection()
    try:
        scoped_batters = bool(batter_names)
        scoped_pitchers = bool(pitcher_names)
        wanted_batter_ids = _build_requested_ids(batter_names, identity["batter_name_to_ids"])
        wanted_pitcher_ids = _build_requested_ids(pitcher_names, identity["pitcher_name_to_ids"])
        current_events = _load_current_events(
            db_conn,
            current_season=current_season,
            batter_ids=wanted_batter_ids if scoped_batters else None,
            pitcher_ids=wanted_pitcher_ids if scoped_pitchers else None,
        )

        def _ids_in_column(col: str) -> set:
            # Distinct, parseable player ids appearing in a current-events column.
            series = current_events.get(col, pd.Series(dtype="float")).dropna()
            return {v for v in series.apply(_safe_int).tolist() if v is not None}

        # Active scope: explicit request wins; otherwise every id seen this season.
        active_batter_ids = wanted_batter_ids if scoped_batters else _ids_in_column("batter")
        active_pitcher_ids = wanted_pitcher_ids if scoped_pitchers else _ids_in_column("pitcher")
        prior_hitter_events = _load_prior_hitter_events(
            db_conn,
            seasons=prior_seasons,
            batter_ids=active_batter_ids,
        )
        prior_pitcher_events = _load_prior_pitcher_events(
            db_conn,
            seasons=prior_seasons,
            pitcher_ids=active_pitcher_ids,
        )
    finally:
        try:
            db_conn.close()
        except Exception:
            pass
    # Restrict current-season events to the active ids for each role.
    current_hitter_events = current_events.copy()
    if active_batter_ids:
        hitter_mask = current_hitter_events["batter"].apply(_safe_int).isin(active_batter_ids)
        current_hitter_events = current_hitter_events[hitter_mask].copy()
    current_pitcher_events = current_events.copy()
    if active_pitcher_ids:
        pitcher_mask = current_pitcher_events["pitcher"].apply(_safe_int).isin(active_pitcher_ids)
        current_pitcher_events = current_pitcher_events[pitcher_mask].copy()
    multi_year_prior_hitter_df = _to_hitter_frame(prior_hitter_events, batter_id_to_name)
    season_2026_ytd_hitter_df = _to_hitter_frame(current_hitter_events, batter_id_to_name)
    multi_year_prior_pitcher_df = _to_pitcher_frame(prior_pitcher_events)
    season_2026_ytd_pitcher_df = _to_pitcher_frame(current_pitcher_events)
    blended_batter_df, batter_baseline_meta = _blend_entity_frames(
        prior_df=multi_year_prior_hitter_df,
        season_df=season_2026_ytd_hitter_df,
        blend_k=_HITTER_BLEND_K,
        role_label="batter",
    )
    blended_pitcher_df, pitcher_baseline_meta = _blend_entity_frames(
        prior_df=multi_year_prior_pitcher_df,
        season_df=season_2026_ytd_pitcher_df,
        blend_k=_PITCHER_BLEND_K,
        role_label="pitcher",
    )
    return {
        "multi_year_prior_hitter_df": multi_year_prior_hitter_df,
        "season_2026_ytd_hitter_df": season_2026_ytd_hitter_df,
        "multi_year_prior_pitcher_df": multi_year_prior_pitcher_df,
        "season_2026_ytd_pitcher_df": season_2026_ytd_pitcher_df,
        "blended_batter_df": blended_batter_df,
        "blended_pitcher_df": blended_pitcher_df,
        "batter_baseline_meta": batter_baseline_meta,
        "pitcher_baseline_meta": pitcher_baseline_meta,
    }