# 2026_MLB_Model/engine/live_game_engine.py
# Last commit 21151ce (Syntrex): "Accuracy overhaul: pitcher resolution logging,
# baseline recalibration, vig fix, XGBoost blend"
from __future__ import annotations

import logging
from typing import Any

from data.live_statcast_feed import fetch_live_statcast_feed
from data.odds_name_map import normalize_pitcher_name
from database.db import log_pitcher_resolution
from models.win_probability import estimate_win_probability
def _safe_int(value: Any) -> int:
    """Coerce *value* to an int, yielding 0 whenever that is not possible.

    ``None``, blank text and the textual markers ``"nan"`` / ``"none"``
    (case-insensitive, surrounding whitespace ignored) map to 0, as does
    anything that fails conversion. Input is converted through ``float``
    first, so ``"3.7"`` becomes 3 (truncation toward zero).
    """
    missing_markers = ("", "nan", "none")
    try:
        if value is not None and str(value).strip().lower() not in missing_markers:
            return int(float(value))
    except Exception:
        # Unconvertible input (bad text, infinities, odd objects) counts as 0.
        pass
    return 0
def _safe_float(value: Any) -> float | None:
    """Coerce *value* to a float, yielding ``None`` when it has no usable value.

    ``None``, blank text and the textual markers ``"nan"`` / ``"none"``
    (case-insensitive, surrounding whitespace ignored) are treated as
    missing, and so is anything ``float()`` cannot convert.
    """
    missing_markers = ("", "nan", "none")
    try:
        if value is None or str(value).strip().lower() in missing_markers:
            return None
        return float(value)
    except Exception:
        # Conversion failures are reported as "no value" rather than raised.
        return None
def _first_non_null(*values):
    """Return the first argument that carries a real value, else ``None``.

    An argument is skipped when it is ``None`` or when its string form,
    stripped and lower-cased, is empty, ``"nan"`` or ``"none"``. Falsy but
    meaningful values such as ``0`` and ``False`` are NOT skipped.
    """
    blank_markers = {"", "nan", "none"}
    return next(
        (
            candidate
            for candidate in values
            if candidate is not None
            and str(candidate).strip().lower() not in blank_markers
        ),
        None,
    )
def _extract_latest_savant_pitch_metrics(savant_feed: dict[str, Any]) -> dict[str, Any]:
    """
    Defensive parser for unofficial Baseball Savant live feed payloads.

    Walks the payload recursively (dicts and lists only) and collects every
    dict that has at least one pitch-like key:
    - velocity
    - spin
    - extension
    - movement
    - pitch type / call / description

    The LAST such dict in traversal order is used as "the latest pitch".
    NOTE(review): this assumes the payload lists pitches oldest-to-newest —
    confirm against a captured feed.

    Key matching is case-insensitive. For each metric several alias keys are
    tried in order, and aliases whose value is null-like (``None``, blank,
    ``"nan"``, ``"none"``) are skipped so that a later alias can supply the
    value.

    Returns:
        ``{}`` when the feed is empty or contains no pitch-like dict.
        Otherwise a dict with ``last_pitch``, ``pitch_type`` (strings, possibly
        empty), ``pitch_velocity``, ``pitch_spin_rate``, ``pitch_extension``,
        ``pitch_pfx_x``, ``pitch_pfx_z``, ``pitch_break_angle``,
        ``pitch_break_length`` (rounded floats or ``None``) and
        ``savant_pitch_debug`` (the chosen dict's repr, capped at 2000 chars).
    """
    if not savant_feed:
        return {}

    # Presence of any of these (lower-cased) keys marks a dict as pitch-like.
    marker_keys = frozenset(
        {
            "startspeed",
            "endspeed",
            "pitchvelocity",
            "velocity",
            "spinrate",
            "spin_rate",
            "extension",
            "pfxx",
            "pfxz",
            "breakangle",
            "breaklength",
            "pitchtype",
            "pitch_type",
            "pitchname",
            "pitch_name",
            "description",
            "pitchresult",
            "pitch_result",
        }
    )
    null_markers = {"", "nan", "none"}
    candidates: list[dict[str, Any]] = []

    def walk(obj: Any) -> None:
        # Pre-order traversal: a dict is recorded before its children.
        if isinstance(obj, dict):
            if any(str(key).lower() in marker_keys for key in obj):
                candidates.append(obj)
            for value in obj.values():
                walk(value)
        elif isinstance(obj, list):
            for item in obj:
                walk(item)

    walk(savant_feed)
    if not candidates:
        return {}

    pitch = candidates[-1]
    lowered = {str(k).lower(): v for k, v in pitch.items()}

    def pick(*names: str) -> Any:
        # First alias that is present AND holds a usable value wins; a key
        # that exists but is null must not mask the remaining aliases.
        for name in names:
            value = lowered.get(name.lower())
            if value is None or str(value).strip().lower() in null_markers:
                continue
            return value
        return None

    def rounded(value: float | None, digits: int) -> float | None:
        return None if value is None else round(value, digits)

    pitch_velocity = _safe_float(
        pick("startSpeed", "pitchVelocity", "velocity", "release_speed", "velo", "endSpeed")
    )
    # Readings at or below 40 are discarded as implausible for a pitch speed.
    if pitch_velocity is not None and not pitch_velocity > 40:
        pitch_velocity = None

    pitch_type = str(
        pick("pitchType", "pitch_type", "pitchName", "pitch_name", "type", "pitch")
        or ""
    ).strip()
    last_pitch = str(
        pick("description", "pitchResult", "pitch_result", "call", "result", "outcome")
        or ""
    ).strip()

    return {
        "last_pitch": last_pitch,
        "pitch_type": pitch_type,
        "pitch_velocity": rounded(pitch_velocity, 1),
        "pitch_spin_rate": rounded(_safe_float(pick("spinRate", "spin_rate")), 0),
        "pitch_extension": rounded(_safe_float(pick("extension")), 1),
        "pitch_pfx_x": rounded(
            _safe_float(pick("pfxX", "pfx_x", "horzBreak", "horizontalBreak")), 2
        ),
        # "pfx_z" mirrors the "pfx_x" alias above (the previous "pfz_z" was a typo).
        "pitch_pfx_z": rounded(
            _safe_float(pick("pfxZ", "pfx_z", "vertBreak", "verticalBreak")), 2
        ),
        "pitch_break_angle": rounded(_safe_float(pick("breakAngle")), 1),
        "pitch_break_length": rounded(_safe_float(pick("breakLength")), 1),
        "savant_pitch_debug": str(pitch)[:2000],
    }
def format_status(inning_half: str, current_inning: int | None, fallback: str = "") -> str:
    """Build a compact inning label such as ``"TOP 3"`` or ``"BOT 9"``.

    Args:
        inning_half: Half-inning label. ``Top`` / ``Bottom`` / ``Middle`` /
            ``End`` are abbreviated to ``TOP`` / ``BOT`` / ``MID`` / ``END``,
            matched case-insensitively and ignoring surrounding whitespace.
            Any other label is passed through stripped and upper-cased.
        current_inning: Inning number; a falsy value (``None`` or 0) means
            there is no inning to report.
        fallback: Returned when either input is missing.

    Returns:
        ``"<PREFIX> <inning>"``, or *fallback* when no label can be built.
    """
    if not (inning_half and current_inning):
        return fallback

    half = str(inning_half).strip()
    if not half:
        # Whitespace-only label: nothing meaningful to show as a prefix.
        return fallback

    half_map = {
        "top": "TOP",
        "bottom": "BOT",
        "middle": "MID",
        "end": "END",
    }
    prefix = half_map.get(half.lower(), half.upper())
    return f"{prefix} {current_inning}"
def _extract_person_name(obj: Any) -> str:
    """Pull a display name out of a person record.

    Looks at ``fullName`` first and ``name`` second, taking the first one
    whose value is truthy, and returns it as a stripped string. Anything
    that is not a dict, or a dict with neither field set, yields ``""``.
    """
    if isinstance(obj, dict):
        for field in ("fullName", "name"):
            candidate = obj.get(field, "")
            if candidate:
                return str(candidate).strip()
    return ""
def _player_name_from_id(feed: dict[str, Any], player_id: Any) -> str:
    """Resolve a player id to a full name via the feed's player directory.

    The directory is read from ``feed["gameData"]["players"]`` and is keyed
    by ``"ID<player_id>"``. Returns the stripped ``fullName`` of the matching
    entry, or ``""`` when the id is missing/blank or no entry is found.
    """
    if player_id is None or player_id == "":
        return ""

    game_data = feed.get("gameData", {}) or {}
    directory = game_data.get("players", {}) or {}
    record = directory.get(f"ID{player_id}", {}) or {}
    full_name = record.get("fullName", "") or ""
    return str(full_name).strip()
def _extract_upcoming_hitters(
    feed: dict[str, Any],
    inning_half: str,
    current_batter_id: Any,
) -> tuple[str, str, str]:
    """Return the names of the next three hitters due up after the current one.

    The batting side is ``home`` when *inning_half* is "bottom"
    (case-insensitive) and ``away`` otherwise. Hitter ids come from
    ``liveData.boxscore.teams.<side>``; the list is rotated so it starts
    right after *current_batter_id* and wraps around, excluding the current
    batter. If the current batter is not in the list, the list is used as is.

    Args:
        feed: Live feed payload containing ``liveData`` and ``gameData``.
        inning_half: Half-inning label used to choose the batting side.
        current_batter_id: Id of the batter currently up (may be ``None``).

    Returns:
        ``(on_deck, in_hole, three_away)``; each entry is ``""`` when it
        cannot be determined.
    """
    live_data = feed.get("liveData", {}) or {}
    boxscore = live_data.get("boxscore", {}) or {}
    teams = boxscore.get("teams", {}) or {}

    batting_side = "home" if str(inning_half).strip().lower() == "bottom" else "away"
    team_box = teams.get(batting_side, {}) or {}

    # Prefer the active lineup and only fall back to the broader "batters" list.
    # NOTE(review): per the MLB Stats API boxscore layout, "battingOrder" holds
    # the current lineup while "batters" also lists players who have been
    # substituted out — confirm against a captured payload.
    raw_ids = team_box.get("battingOrder") or team_box.get("batters") or []
    batter_ids = [str(x) for x in raw_ids if x is not None and str(x).strip() != ""]
    if not batter_ids:
        return "", "", ""

    current_id = str(current_batter_id) if current_batter_id is not None else ""
    if current_id in batter_ids:
        idx = batter_ids.index(current_id)
        # Everyone after the current batter, then wrap to those before him.
        rotated = batter_ids[idx + 1 :] + batter_ids[:idx]
    else:
        rotated = batter_ids

    names = [_player_name_from_id(feed, player_id) for player_id in rotated[:3]]
    names.extend([""] * (3 - len(names)))  # pad when fewer than three remain
    on_deck, in_hole, three_away = names
    return on_deck, in_hole, three_away
def derive_feed_status(
    feed: dict[str, Any],
    inning_half: str,
    current_inning: int | None,
    fallback: str = "",
) -> str:
    """Translate the feed's game status into a short display string.

    Reads ``abstractGameState`` and ``detailedState`` from
    ``feed["gameData"]["status"]`` and resolves them in this order:

    1. A finished game (abstract state "final", or a detailed state of
       "final" / "game over" / "completed" / "ended") -> ``"Final"``.
    2. Abstract state "preview" -> *fallback*, or ``"Scheduled"`` if empty.
    3. Otherwise the inning label from ``format_status`` when one can be built.
    4. Failing that: for a "live" game the detailed state or ``"Live"``;
       for anything else the detailed state or *fallback*.
    """
    status_block = (feed.get("gameData", {}) or {}).get("status", {}) or {}
    abstract = str(status_block.get("abstractGameState", "") or "").strip().lower()
    detailed = str(status_block.get("detailedState", "") or "").strip()

    finished_labels = {"final", "game over", "completed", "ended"}
    if abstract == "final" or detailed.lower() in finished_labels:
        return "Final"

    if abstract == "preview":
        return fallback or "Scheduled"

    inning_label = format_status(inning_half, current_inning, "")
    if inning_label:
        return inning_label

    if abstract == "live":
        return detailed or "Live"
    return detailed or fallback
def enrich_game_from_live_feed(game: dict[str, Any], feed: dict[str, Any], conn: Any = None) -> dict[str, Any]:
    """Return a copy of *game* enriched with the current state of a live feed.

    The input dict is never mutated. With an empty *feed* the copy is returned
    unchanged. Otherwise the following keys are written:

    - ``batter_name`` / ``pitcher_name``: taken from the current play's
      matchup, then from the linescore offense/defense objects, then from the
      feed's player directory by id.
    - ``balls``, ``strikes``, ``outs`` (falling back to the linescore's outs).
    - ``runner_on_1b`` / ``runner_on_2b`` / ``runner_on_3b``.
    - ``last_play``: description of the current play's result.
    - ``last_pitch``, ``pitch_type``, ``pitch_velocity``, ``pitch_spin_rate``,
      ``pitch_extension``, ``pitch_pfx_x``, ``pitch_pfx_z``: from the most
      recent play event carrying ``pitchData``. Fields still empty afterwards
      are filled from the Baseball Savant live feed when it has a value.
      NOTE(review): the official feed is given priority over Savant here —
      confirm that this is the intended precedence.
    - ``away_win_prob`` / ``home_win_prob``: from ``estimate_win_probability``;
      both ``None`` if the estimate cannot be computed (for example when
      ``away_score`` / ``home_score`` are absent from *game*).

    Args:
        game: Base game record. ``game_pk`` is used to fetch the Savant feed;
            ``away_score`` / ``home_score`` feed the win-probability estimate.
        feed: Live feed payload; its ``liveData`` and ``gameData`` sections
            are read.
        conn: Optional handle passed to ``log_pitcher_resolution``. When given
            and a pitcher name was found, the raw name is logged. Logging is
            best-effort and never aborts enrichment.

    Returns:
        The enriched copy of *game*.
    """
    out = dict(game)
    if not feed:
        return out

    logger = logging.getLogger(__name__)
    live_data = feed.get("liveData", {}) or {}
    game_data = feed.get("gameData", {}) or {}
    game_pk = str(out.get("game_pk", "") or "").strip()

    # The Savant feed is unofficial, so failing to fetch it must not break
    # enrichment from the primary feed.
    savant_feed: Any = {}
    if game_pk:
        try:
            savant_feed = fetch_live_statcast_feed(game_pk) or {}
        except Exception:
            logger.debug("Savant live feed fetch failed for game_pk=%s", game_pk, exc_info=True)
            savant_feed = {}

    linescore = live_data.get("linescore", {}) or {}
    plays = live_data.get("plays", {}) or {}
    current_play = plays.get("currentPlay", {}) or {}
    count = current_play.get("count", {}) or {}
    matchup = current_play.get("matchup", {}) or {}
    batter = matchup.get("batter", {}) or {}
    pitcher = matchup.get("pitcher", {}) or {}
    offense = linescore.get("offense", {}) or {}
    defense = linescore.get("defense", {}) or {}

    # Primary source: current play matchup. Fallback 1: linescore objects.
    batter_name = _extract_person_name(batter) or _extract_person_name(offense.get("batter", {}))
    pitcher_name = _extract_person_name(pitcher) or _extract_person_name(defense.get("pitcher", {}))
    batter_id = batter.get("id") or (offense.get("batter", {}) or {}).get("id")
    pitcher_id = pitcher.get("id") or (defense.get("pitcher", {}) or {}).get("id")

    # Fallback 2: global players lookup by id.
    if not batter_name and batter_id:
        batter_name = _player_name_from_id(feed, batter_id)
    if not pitcher_name and pitcher_id:
        pitcher_name = _player_name_from_id(feed, pitcher_id)

    out["batter_name"] = batter_name
    out["pitcher_name"] = pitcher_name

    # Log the raw pitcher name from the live feed; statcast resolution happens
    # downstream.
    if conn is not None and pitcher_name:
        # "datetime" may be present but null, hence the extra `or {}`.
        official_date = str(
            (game_data.get("datetime", {}) or {}).get("officialDate", "") or ""
        ).strip()
        try:
            log_pitcher_resolution(conn, {
                "game_pk": game_pk,
                "game_date": official_date,
                "source": "mlb_live",
                "input_name": pitcher_name,
                "normalized_name": normalize_pitcher_name(pitcher_name),
                "matched_canonical": None,
                "pitcher_id": pitcher_id,
                "match_method": "live_feed",
                "sample_size": 0,
                "p_throws": None,
            })
        except Exception:
            # Best-effort bookkeeping: report the failure, keep enriching.
            logger.warning("Pitcher resolution logging failed for game_pk=%s", game_pk, exc_info=True)

    out["balls"] = count.get("balls")
    out["strikes"] = count.get("strikes")
    out["outs"] = count.get("outs", linescore.get("outs"))

    out["runner_on_1b"] = offense.get("first") is not None
    out["runner_on_2b"] = offense.get("second") is not None
    out["runner_on_3b"] = offense.get("third") is not None

    result = current_play.get("result", {}) or {}
    out["last_play"] = str(result.get("description", "") or "").strip()

    # Most recent play event that actually carries pitch data.
    play_events = current_play.get("playEvents", []) or []
    pitch_event = next(
        (
            event
            for event in reversed(play_events)
            if isinstance(event, dict) and event.get("pitchData")
        ),
        None,
    )
    if pitch_event:
        pitch_data = pitch_event.get("pitchData", {}) or {}
        pitch_breaks = pitch_data.get("breaks", {}) or {}
        coords = pitch_data.get("coordinates", {}) or {}
        details = pitch_event.get("details", {}) or {}

        out["last_pitch"] = str(details.get("description", "") or "").strip()
        out["pitch_type"] = str(((details.get("type", {}) or {}).get("description", "")) or "").strip()
        out["pitch_velocity"] = pitch_data.get("startSpeed")
        out["pitch_spin_rate"] = pitch_breaks.get("spinRate")
        out["pitch_extension"] = pitch_data.get("extension")
        out["pitch_pfx_x"] = coords.get("pfxX")
        out["pitch_pfx_z"] = coords.get("pfxZ")

    # Use the Savant feed only to fill pitch fields the official feed left
    # empty; values already present are never overwritten.
    try:
        savant_metrics = _extract_latest_savant_pitch_metrics(savant_feed) if savant_feed else {}
    except Exception:
        logger.debug("Savant pitch parsing failed for game_pk=%s", game_pk, exc_info=True)
        savant_metrics = {}
    for field in (
        "last_pitch",
        "pitch_type",
        "pitch_velocity",
        "pitch_spin_rate",
        "pitch_extension",
        "pitch_pfx_x",
        "pitch_pfx_z",
    ):
        if _first_non_null(out.get(field)) is not None:
            continue
        candidate = _first_non_null(savant_metrics.get(field))
        if candidate is not None:
            out[field] = candidate

    # Inning context for the win-probability estimate.
    # NOTE(review): "inningHalf", "inningState" and "currentInning" are the
    # linescore field names expected from the MLB Stats API live feed —
    # confirm against a captured payload.
    inning_half = str(
        _first_non_null(linescore.get("inningHalf"), linescore.get("inningState")) or ""
    ).strip()
    current_inning = _safe_int(linescore.get("currentInning")) or None

    try:
        score_diff = int(out["away_score"] or 0) - int(out["home_score"] or 0)
        batting_team_is_home = inning_half.lower() == "bottom"

        away_wp, home_wp = estimate_win_probability(
            score_diff=score_diff,
            inning=int(current_inning or 1),
            outs=int(out["outs"] or 0),
            runner_on_1b=bool(out["runner_on_1b"]),
            runner_on_2b=bool(out["runner_on_2b"]),
            runner_on_3b=bool(out["runner_on_3b"]),
            batting_team_is_home=batting_team_is_home,
        )
        out["away_win_prob"] = away_wp
        out["home_win_prob"] = home_wp
    except Exception:
        # Keep the record usable without an estimate, but make the failure visible.
        logger.warning("Win probability estimate failed for game_pk=%s", game_pk, exc_info=True)
        out["away_win_prob"] = None
        out["home_win_prob"] = None

    return out