from __future__ import annotations from typing import Any from data.odds_name_map import normalize_pitcher_name from database.db import log_pitcher_resolution from models.win_probability import estimate_win_probability from data.live_statcast_feed import fetch_live_statcast_feed def _safe_int(value: Any) -> int: try: if value is None: return 0 text = str(value).strip().lower() if text in {"", "nan", "none"}: return 0 return int(float(value)) except Exception: return 0 def _safe_float(value: Any) -> float | None: try: if value is None: return None text = str(value).strip().lower() if text in {"", "nan", "none"}: return None return float(value) except Exception: return None def _first_non_null(*values): for value in values: if value is None: continue text = str(value).strip().lower() if text in {"", "nan", "none"}: continue return value return None def _extract_latest_savant_pitch_metrics(savant_feed: dict[str, Any]) -> dict[str, Any]: """ Defensive parser for unofficial Baseball Savant live feed payloads. Searches recursively for pitch-like dicts containing any of: - velocity - spin - extension - movement - pitch type / call / description """ if not savant_feed: return {} candidates: list[dict[str, Any]] = [] def walk(obj: Any) -> None: if isinstance(obj, dict): lowered = {str(k).lower(): v for k, v in obj.items()} keys = set(lowered.keys()) interesting = any( key in keys for key in [ "startspeed", "endspeed", "pitchvelocity", "velocity", "spinrate", "spin_rate", "extension", "pfxx", "pfxz", "breakangle", "breaklength", "pitchtype", "pitch_type", "pitchname", "pitch_name", "description", "pitchresult", "pitch_result", ] ) if interesting: candidates.append(obj) for value in obj.values(): walk(value) elif isinstance(obj, list): for item in obj: walk(item) walk(savant_feed) if not candidates: return {} pitch = candidates[-1] lowered = {str(k).lower(): v for k, v in pitch.items()} def pick(*names: str) -> Any: for name in names: if name.lower() in lowered: return lowered[name.lower()] return None pitch_velocity = _safe_float( pick("startSpeed", "pitchVelocity", "velocity", "release_speed", "velo", "endSpeed") ) pitch_spin_rate = _safe_float(pick("spinRate", "spin_rate")) pitch_extension = _safe_float(pick("extension")) pitch_pfx_x = _safe_float(pick("pfxX", "pfx_x", "horzBreak", "horizontalBreak")) pitch_pfx_z = _safe_float(pick("pfxZ", "pfz_z", "vertBreak", "verticalBreak")) pitch_break_angle = _safe_float(pick("breakAngle")) pitch_break_length = _safe_float(pick("breakLength")) pitch_type = str( pick("pitchType", "pitch_type", "pitchName", "pitch_name", "type", "pitch") or "" ).strip() last_pitch = str( pick("description", "pitchResult", "pitch_result", "call", "result", "outcome") or "" ).strip() return { "last_pitch": last_pitch, "pitch_type": pitch_type, "pitch_velocity": round(pitch_velocity, 1) if pitch_velocity is not None and pitch_velocity > 40 else None, "pitch_spin_rate": round(pitch_spin_rate, 0) if pitch_spin_rate is not None else None, "pitch_extension": round(pitch_extension, 1) if pitch_extension is not None else None, "pitch_pfx_x": round(pitch_pfx_x, 2) if pitch_pfx_x is not None else None, "pitch_pfx_z": round(pitch_pfx_z, 2) if pitch_pfx_z is not None else None, "pitch_break_angle": round(pitch_break_angle, 1) if pitch_break_angle is not None else None, "pitch_break_length": round(pitch_break_length, 1) if pitch_break_length is not None else None, "savant_pitch_debug": str(pitch)[:2000], } def format_status(inning_half: str, current_inning: int | None, fallback: str = "") -> str: if inning_half and current_inning: half_map = { "Top": "TOP", "Bottom": "BOT", "Middle": "MID", "End": "END", } prefix = half_map.get(str(inning_half).strip(), str(inning_half).upper()) return f"{prefix} {current_inning}" return fallback def _extract_person_name(obj: Any) -> str: if not isinstance(obj, dict): return "" return str(obj.get("fullName", "") or obj.get("name", "") or "").strip() def _player_name_from_id(feed: dict[str, Any], player_id: Any) -> str: if player_id is None or player_id == "": return "" players = (feed.get("gameData", {}) or {}).get("players", {}) or {} key = f"ID{player_id}" player = players.get(key, {}) or {} return str(player.get("fullName", "") or "").strip() def _extract_upcoming_hitters( feed: dict[str, Any], inning_half: str, current_batter_id: Any, ) -> tuple[str, str, str]: live_data = feed.get("liveData", {}) or {} boxscore = live_data.get("boxscore", {}) or {} teams = boxscore.get("teams", {}) or {} batting_side = "home" if str(inning_half).strip().lower() == "bottom" else "away" team_box = teams.get(batting_side, {}) or {} batter_ids = team_box.get("batters", []) or [] batter_ids = [str(x) for x in batter_ids if x is not None and str(x).strip() != ""] current_id = str(current_batter_id) if current_batter_id is not None else "" if not batter_ids: return "", "", "" if current_id in batter_ids: idx = batter_ids.index(current_id) rotated = batter_ids[idx + 1 :] + batter_ids[:idx] else: rotated = batter_ids[:] on_deck = _player_name_from_id(feed, rotated[0]) if len(rotated) >= 1 else "" in_hole = _player_name_from_id(feed, rotated[1]) if len(rotated) >= 2 else "" three_away = _player_name_from_id(feed, rotated[2]) if len(rotated) >= 3 else "" return on_deck, in_hole, three_away def derive_feed_status( feed: dict[str, Any], inning_half: str, current_inning: int | None, fallback: str = "", ) -> str: game_data = feed.get("gameData", {}) or {} status_info = game_data.get("status", {}) or {} abstract_state = str(status_info.get("abstractGameState", "") or "").strip().lower() detailed_state = str(status_info.get("detailedState", "") or "").strip() if abstract_state == "final": return "Final" if detailed_state.lower() in {"final", "game over", "completed", "ended"}: return "Final" if abstract_state == "live": formatted = format_status(inning_half, current_inning, "") return formatted if formatted else (detailed_state or "Live") if abstract_state == "preview": return fallback if fallback else "Scheduled" formatted = format_status(inning_half, current_inning, "") if formatted: return formatted if detailed_state: return detailed_state return fallback def enrich_game_from_live_feed(game: dict[str, Any], feed: dict[str, Any], conn: Any = None) -> dict[str, Any]: out = dict(game) if not feed: return out live_data = feed.get("liveData", {}) or {} savant_feed = {} try: game_pk = str(out.get("game_pk", "") or "").strip() if game_pk: savant_feed = fetch_live_statcast_feed(game_pk) except Exception: savant_feed = {} linescore = live_data.get("linescore", {}) or {} plays = live_data.get("plays", {}) or {} current_play = plays.get("currentPlay", {}) or {} count = current_play.get("count", {}) or {} #offense = linescore.get("offense", {}) or {} #defense = linescore.get("defense", {}) or {} matchup = current_play.get("matchup", {}) or {} batter = matchup.get("batter", {}) or {} pitcher = matchup.get("pitcher", {}) or {} # Primary source: current play matchup batter_name = _extract_person_name(batter) pitcher_name = _extract_person_name(pitcher) batter_id = batter.get("id") pitcher_id = pitcher.get("id") # Fallback 1: linescore offense / defense objects offense = linescore.get("offense", {}) or {} defense = linescore.get("defense", {}) or {} if not batter_name: batter_name = _extract_person_name(offense.get("batter", {})) if not pitcher_name: pitcher_name = _extract_person_name(defense.get("pitcher", {})) if not batter_id: batter_id = (offense.get("batter", {}) or {}).get("id") if not pitcher_id: pitcher_id = (defense.get("pitcher", {}) or {}).get("id") # Fallback 2: global players lookup by id if not batter_name and batter_id: batter_name = _player_name_from_id(feed, batter_id) if not pitcher_name and pitcher_id: pitcher_name = _player_name_from_id(feed, pitcher_id) out["batter_name"] = batter_name out["pitcher_name"] = pitcher_name # Log the raw pitcher name from the live feed — statcast resolution happens downstream if conn is not None and pitcher_name: _feed_game_pk = str(out.get("game_pk", "") or "").strip() _feed_date = str((feed.get("gameData", {}) or {}).get("datetime", {}).get("officialDate", "") or "").strip() try: log_pitcher_resolution(conn, { "game_pk": _feed_game_pk, "game_date": _feed_date, "source": "mlb_live", "input_name": pitcher_name, "normalized_name": normalize_pitcher_name(pitcher_name), "matched_canonical": None, "pitcher_id": pitcher_id, "match_method": "live_feed", "sample_size": 0, "p_throws": None, }) except Exception: pass out["balls"] = count.get("balls") out["strikes"] = count.get("strikes") out["outs"] = count.get("outs", linescore.get("outs")) out["runner_on_1b"] = offense.get("first") is not None out["runner_on_2b"] = offense.get("second") is not None out["runner_on_3b"] = offense.get("third") is not None result = current_play.get("result", {}) or {} out["last_play"] = str(result.get("description", "") or "").strip() play_events = current_play.get("playEvents", []) or [] pitch_event = None for event in reversed(play_events): pitch_data = event.get("pitchData") or {} if pitch_data: pitch_event = event break if pitch_event: pitch_data = pitch_event.get("pitchData", {}) or {} pitch_breaks = pitch_data.get("breaks", {}) or {} coords = pitch_data.get("coordinates", {}) or {} details = pitch_event.get("details", {}) or {} out["last_pitch"] = str(details.get("description", "") or "").strip() out["pitch_type"] = str(((details.get("type", {}) or {}).get("description", "")) or "").strip() out["pitch_velocity"] = pitch_data.get("startSpeed") out["pitch_spin_rate"] = pitch_breaks.get("spinRate") out["pitch_extension"] = pitch_data.get("extension") out["pitch_pfx_x"] = coords.get("pfxX") out["pitch_pfx_z"] = coords.get("pfxZ") def _safe_float(value: Any) -> float | None: try: if value is None: return None text = str(value).strip().lower() if text in {"", "nan", "none"}: return None return float(value) except Exception: return None def _extract_latest_savant_pitch_metrics(savant_feed: dict[str, Any]) -> dict[str, Any]: """ Defensive parser for unofficial Baseball Savant live feed payloads. Searches recursively for pitch-like dicts containing any of: - velocity - spin - extension - movement - pitch type / call / description """ if not savant_feed: return {} candidates: list[dict[str, Any]] = [] def walk(obj: Any) -> None: if isinstance(obj, dict): lowered = {str(k).lower(): v for k, v in obj.items()} keys = set(lowered.keys()) interesting = any( key in keys for key in [ "startspeed", "endspeed", "pitchvelocity", "velocity", "spinrate", "spin_rate", "extension", "pfxx", "pfxz", "breakangle", "breaklength", "pitchtype", "pitch_type", "pitchname", "pitch_name", "description", "pitchresult", "pitch_result", ] ) if interesting: candidates.append(obj) for value in obj.values(): walk(value) elif isinstance(obj, list): for item in obj: walk(item) walk(savant_feed) if not candidates: return {} pitch = candidates[-1] lowered = {str(k).lower(): v for k, v in pitch.items()} def pick(*names: str) -> Any: for name in names: if name.lower() in lowered: return lowered[name.lower()] return None pitch_velocity = _safe_float( pick("startSpeed", "pitchVelocity", "velocity", "release_speed", "velo", "endSpeed") ) pitch_spin_rate = _safe_float(pick("spinRate", "spin_rate")) pitch_extension = _safe_float(pick("extension")) pitch_pfx_x = _safe_float(pick("pfxX", "pfx_x", "horzBreak", "horizontalBreak")) pitch_pfx_z = _safe_float(pick("pfxZ", "pfz_z", "vertBreak", "verticalBreak")) pitch_break_angle = _safe_float(pick("breakAngle")) pitch_break_length = _safe_float(pick("breakLength")) pitch_type = str( pick("pitchType", "pitch_type", "pitchName", "pitch_name", "type", "pitch") or "" ).strip() last_pitch = str( pick("description", "pitchResult", "pitch_result", "call", "result", "outcome") or "" ).strip() return { "last_pitch": last_pitch, "pitch_type": pitch_type, "pitch_velocity": round(pitch_velocity, 1) if pitch_velocity is not None and pitch_velocity > 40 else None, "pitch_spin_rate": round(pitch_spin_rate, 0) if pitch_spin_rate is not None else None, "pitch_extension": round(pitch_extension, 1) if pitch_extension is not None else None, "pitch_pfx_x": round(pitch_pfx_x, 2) if pitch_pfx_x is not None else None, "pitch_pfx_z": round(pitch_pfx_z, 2) if pitch_pfx_z is not None else None, "pitch_break_angle": round(pitch_break_angle, 1) if pitch_break_angle is not None else None, "pitch_break_length": round(pitch_break_length, 1) if pitch_break_length is not None else None, "savant_pitch_debug": str(pitch)[:2000], } try: score_diff = int(out["away_score"] or 0) - int(out["home_score"] or 0) batting_team_is_home = inning_half.lower() == "bottom" away_wp, home_wp = estimate_win_probability( score_diff=score_diff, inning=int(current_inning or 1), outs=int(out["outs"] or 0), runner_on_1b=bool(out["runner_on_1b"]), runner_on_2b=bool(out["runner_on_2b"]), runner_on_3b=bool(out["runner_on_3b"]), batting_team_is_home=batting_team_is_home, ) out["away_win_prob"] = away_wp out["home_win_prob"] = home_wp except Exception: out["away_win_prob"] = None out["home_win_prob"] = None return out