Spaces:
Running
Running
| from __future__ import annotations | |
| from typing import Any | |
| from data.odds_name_map import normalize_pitcher_name | |
| from database.db import log_pitcher_resolution | |
| from models.win_probability import estimate_win_probability | |
| from data.live_statcast_feed import fetch_live_statcast_feed | |
| def _safe_int(value: Any) -> int: | |
| try: | |
| if value is None: | |
| return 0 | |
| text = str(value).strip().lower() | |
| if text in {"", "nan", "none"}: | |
| return 0 | |
| return int(float(value)) | |
| except Exception: | |
| return 0 | |
| def _safe_float(value: Any) -> float | None: | |
| try: | |
| if value is None: | |
| return None | |
| text = str(value).strip().lower() | |
| if text in {"", "nan", "none"}: | |
| return None | |
| return float(value) | |
| except Exception: | |
| return None | |
| def _first_non_null(*values): | |
| for value in values: | |
| if value is None: | |
| continue | |
| text = str(value).strip().lower() | |
| if text in {"", "nan", "none"}: | |
| continue | |
| return value | |
| return None | |
| def _extract_latest_savant_pitch_metrics(savant_feed: dict[str, Any]) -> dict[str, Any]: | |
| """ | |
| Defensive parser for unofficial Baseball Savant live feed payloads. | |
| Searches recursively for pitch-like dicts containing any of: | |
| - velocity | |
| - spin | |
| - extension | |
| - movement | |
| - pitch type / call / description | |
| """ | |
| if not savant_feed: | |
| return {} | |
| candidates: list[dict[str, Any]] = [] | |
| def walk(obj: Any) -> None: | |
| if isinstance(obj, dict): | |
| lowered = {str(k).lower(): v for k, v in obj.items()} | |
| keys = set(lowered.keys()) | |
| interesting = any( | |
| key in keys | |
| for key in [ | |
| "startspeed", | |
| "endspeed", | |
| "pitchvelocity", | |
| "velocity", | |
| "spinrate", | |
| "spin_rate", | |
| "extension", | |
| "pfxx", | |
| "pfxz", | |
| "breakangle", | |
| "breaklength", | |
| "pitchtype", | |
| "pitch_type", | |
| "pitchname", | |
| "pitch_name", | |
| "description", | |
| "pitchresult", | |
| "pitch_result", | |
| ] | |
| ) | |
| if interesting: | |
| candidates.append(obj) | |
| for value in obj.values(): | |
| walk(value) | |
| elif isinstance(obj, list): | |
| for item in obj: | |
| walk(item) | |
| walk(savant_feed) | |
| if not candidates: | |
| return {} | |
| pitch = candidates[-1] | |
| lowered = {str(k).lower(): v for k, v in pitch.items()} | |
| def pick(*names: str) -> Any: | |
| for name in names: | |
| if name.lower() in lowered: | |
| return lowered[name.lower()] | |
| return None | |
| pitch_velocity = _safe_float( | |
| pick("startSpeed", "pitchVelocity", "velocity", "release_speed", "velo", "endSpeed") | |
| ) | |
| pitch_spin_rate = _safe_float(pick("spinRate", "spin_rate")) | |
| pitch_extension = _safe_float(pick("extension")) | |
| pitch_pfx_x = _safe_float(pick("pfxX", "pfx_x", "horzBreak", "horizontalBreak")) | |
| pitch_pfx_z = _safe_float(pick("pfxZ", "pfz_z", "vertBreak", "verticalBreak")) | |
| pitch_break_angle = _safe_float(pick("breakAngle")) | |
| pitch_break_length = _safe_float(pick("breakLength")) | |
| pitch_type = str( | |
| pick("pitchType", "pitch_type", "pitchName", "pitch_name", "type", "pitch") | |
| or "" | |
| ).strip() | |
| last_pitch = str( | |
| pick("description", "pitchResult", "pitch_result", "call", "result", "outcome") | |
| or "" | |
| ).strip() | |
| return { | |
| "last_pitch": last_pitch, | |
| "pitch_type": pitch_type, | |
| "pitch_velocity": round(pitch_velocity, 1) if pitch_velocity is not None and pitch_velocity > 40 else None, | |
| "pitch_spin_rate": round(pitch_spin_rate, 0) if pitch_spin_rate is not None else None, | |
| "pitch_extension": round(pitch_extension, 1) if pitch_extension is not None else None, | |
| "pitch_pfx_x": round(pitch_pfx_x, 2) if pitch_pfx_x is not None else None, | |
| "pitch_pfx_z": round(pitch_pfx_z, 2) if pitch_pfx_z is not None else None, | |
| "pitch_break_angle": round(pitch_break_angle, 1) if pitch_break_angle is not None else None, | |
| "pitch_break_length": round(pitch_break_length, 1) if pitch_break_length is not None else None, | |
| "savant_pitch_debug": str(pitch)[:2000], | |
| } | |
| def format_status(inning_half: str, current_inning: int | None, fallback: str = "") -> str: | |
| if inning_half and current_inning: | |
| half_map = { | |
| "Top": "TOP", | |
| "Bottom": "BOT", | |
| "Middle": "MID", | |
| "End": "END", | |
| } | |
| prefix = half_map.get(str(inning_half).strip(), str(inning_half).upper()) | |
| return f"{prefix} {current_inning}" | |
| return fallback | |
| def _extract_person_name(obj: Any) -> str: | |
| if not isinstance(obj, dict): | |
| return "" | |
| return str(obj.get("fullName", "") or obj.get("name", "") or "").strip() | |
| def _player_name_from_id(feed: dict[str, Any], player_id: Any) -> str: | |
| if player_id is None or player_id == "": | |
| return "" | |
| players = (feed.get("gameData", {}) or {}).get("players", {}) or {} | |
| key = f"ID{player_id}" | |
| player = players.get(key, {}) or {} | |
| return str(player.get("fullName", "") or "").strip() | |
| def _extract_upcoming_hitters( | |
| feed: dict[str, Any], | |
| inning_half: str, | |
| current_batter_id: Any, | |
| ) -> tuple[str, str, str]: | |
| live_data = feed.get("liveData", {}) or {} | |
| boxscore = live_data.get("boxscore", {}) or {} | |
| teams = boxscore.get("teams", {}) or {} | |
| batting_side = "home" if str(inning_half).strip().lower() == "bottom" else "away" | |
| team_box = teams.get(batting_side, {}) or {} | |
| batter_ids = team_box.get("batters", []) or [] | |
| batter_ids = [str(x) for x in batter_ids if x is not None and str(x).strip() != ""] | |
| current_id = str(current_batter_id) if current_batter_id is not None else "" | |
| if not batter_ids: | |
| return "", "", "" | |
| if current_id in batter_ids: | |
| idx = batter_ids.index(current_id) | |
| rotated = batter_ids[idx + 1 :] + batter_ids[:idx] | |
| else: | |
| rotated = batter_ids[:] | |
| on_deck = _player_name_from_id(feed, rotated[0]) if len(rotated) >= 1 else "" | |
| in_hole = _player_name_from_id(feed, rotated[1]) if len(rotated) >= 2 else "" | |
| three_away = _player_name_from_id(feed, rotated[2]) if len(rotated) >= 3 else "" | |
| return on_deck, in_hole, three_away | |
| def derive_feed_status( | |
| feed: dict[str, Any], | |
| inning_half: str, | |
| current_inning: int | None, | |
| fallback: str = "", | |
| ) -> str: | |
| game_data = feed.get("gameData", {}) or {} | |
| status_info = game_data.get("status", {}) or {} | |
| abstract_state = str(status_info.get("abstractGameState", "") or "").strip().lower() | |
| detailed_state = str(status_info.get("detailedState", "") or "").strip() | |
| if abstract_state == "final": | |
| return "Final" | |
| if detailed_state.lower() in {"final", "game over", "completed", "ended"}: | |
| return "Final" | |
| if abstract_state == "live": | |
| formatted = format_status(inning_half, current_inning, "") | |
| return formatted if formatted else (detailed_state or "Live") | |
| if abstract_state == "preview": | |
| return fallback if fallback else "Scheduled" | |
| formatted = format_status(inning_half, current_inning, "") | |
| if formatted: | |
| return formatted | |
| if detailed_state: | |
| return detailed_state | |
| return fallback | |
| def enrich_game_from_live_feed(game: dict[str, Any], feed: dict[str, Any], conn: Any = None) -> dict[str, Any]: | |
| out = dict(game) | |
| if not feed: | |
| return out | |
| live_data = feed.get("liveData", {}) or {} | |
| savant_feed = {} | |
| try: | |
| game_pk = str(out.get("game_pk", "") or "").strip() | |
| if game_pk: | |
| savant_feed = fetch_live_statcast_feed(game_pk) | |
| except Exception: | |
| savant_feed = {} | |
| linescore = live_data.get("linescore", {}) or {} | |
| plays = live_data.get("plays", {}) or {} | |
| current_play = plays.get("currentPlay", {}) or {} | |
| count = current_play.get("count", {}) or {} | |
| #offense = linescore.get("offense", {}) or {} | |
| #defense = linescore.get("defense", {}) or {} | |
| matchup = current_play.get("matchup", {}) or {} | |
| batter = matchup.get("batter", {}) or {} | |
| pitcher = matchup.get("pitcher", {}) or {} | |
| # Primary source: current play matchup | |
| batter_name = _extract_person_name(batter) | |
| pitcher_name = _extract_person_name(pitcher) | |
| batter_id = batter.get("id") | |
| pitcher_id = pitcher.get("id") | |
| # Fallback 1: linescore offense / defense objects | |
| offense = linescore.get("offense", {}) or {} | |
| defense = linescore.get("defense", {}) or {} | |
| if not batter_name: | |
| batter_name = _extract_person_name(offense.get("batter", {})) | |
| if not pitcher_name: | |
| pitcher_name = _extract_person_name(defense.get("pitcher", {})) | |
| if not batter_id: | |
| batter_id = (offense.get("batter", {}) or {}).get("id") | |
| if not pitcher_id: | |
| pitcher_id = (defense.get("pitcher", {}) or {}).get("id") | |
| # Fallback 2: global players lookup by id | |
| if not batter_name and batter_id: | |
| batter_name = _player_name_from_id(feed, batter_id) | |
| if not pitcher_name and pitcher_id: | |
| pitcher_name = _player_name_from_id(feed, pitcher_id) | |
| out["batter_name"] = batter_name | |
| out["pitcher_name"] = pitcher_name | |
| # Log the raw pitcher name from the live feed — statcast resolution happens downstream | |
| if conn is not None and pitcher_name: | |
| _feed_game_pk = str(out.get("game_pk", "") or "").strip() | |
| _feed_date = str((feed.get("gameData", {}) or {}).get("datetime", {}).get("officialDate", "") or "").strip() | |
| try: | |
| log_pitcher_resolution(conn, { | |
| "game_pk": _feed_game_pk, | |
| "game_date": _feed_date, | |
| "source": "mlb_live", | |
| "input_name": pitcher_name, | |
| "normalized_name": normalize_pitcher_name(pitcher_name), | |
| "matched_canonical": None, | |
| "pitcher_id": pitcher_id, | |
| "match_method": "live_feed", | |
| "sample_size": 0, | |
| "p_throws": None, | |
| }) | |
| except Exception: | |
| pass | |
| out["balls"] = count.get("balls") | |
| out["strikes"] = count.get("strikes") | |
| out["outs"] = count.get("outs", linescore.get("outs")) | |
| out["runner_on_1b"] = offense.get("first") is not None | |
| out["runner_on_2b"] = offense.get("second") is not None | |
| out["runner_on_3b"] = offense.get("third") is not None | |
| result = current_play.get("result", {}) or {} | |
| out["last_play"] = str(result.get("description", "") or "").strip() | |
| play_events = current_play.get("playEvents", []) or [] | |
| pitch_event = None | |
| for event in reversed(play_events): | |
| pitch_data = event.get("pitchData") or {} | |
| if pitch_data: | |
| pitch_event = event | |
| break | |
| if pitch_event: | |
| pitch_data = pitch_event.get("pitchData", {}) or {} | |
| pitch_breaks = pitch_data.get("breaks", {}) or {} | |
| coords = pitch_data.get("coordinates", {}) or {} | |
| details = pitch_event.get("details", {}) or {} | |
| out["last_pitch"] = str(details.get("description", "") or "").strip() | |
| out["pitch_type"] = str(((details.get("type", {}) or {}).get("description", "")) or "").strip() | |
| out["pitch_velocity"] = pitch_data.get("startSpeed") | |
| out["pitch_spin_rate"] = pitch_breaks.get("spinRate") | |
| out["pitch_extension"] = pitch_data.get("extension") | |
| out["pitch_pfx_x"] = coords.get("pfxX") | |
| out["pitch_pfx_z"] = coords.get("pfxZ") | |
| def _safe_float(value: Any) -> float | None: | |
| try: | |
| if value is None: | |
| return None | |
| text = str(value).strip().lower() | |
| if text in {"", "nan", "none"}: | |
| return None | |
| return float(value) | |
| except Exception: | |
| return None | |
| def _extract_latest_savant_pitch_metrics(savant_feed: dict[str, Any]) -> dict[str, Any]: | |
| """ | |
| Defensive parser for unofficial Baseball Savant live feed payloads. | |
| Searches recursively for pitch-like dicts containing any of: | |
| - velocity | |
| - spin | |
| - extension | |
| - movement | |
| - pitch type / call / description | |
| """ | |
| if not savant_feed: | |
| return {} | |
| candidates: list[dict[str, Any]] = [] | |
| def walk(obj: Any) -> None: | |
| if isinstance(obj, dict): | |
| lowered = {str(k).lower(): v for k, v in obj.items()} | |
| keys = set(lowered.keys()) | |
| interesting = any( | |
| key in keys | |
| for key in [ | |
| "startspeed", | |
| "endspeed", | |
| "pitchvelocity", | |
| "velocity", | |
| "spinrate", | |
| "spin_rate", | |
| "extension", | |
| "pfxx", | |
| "pfxz", | |
| "breakangle", | |
| "breaklength", | |
| "pitchtype", | |
| "pitch_type", | |
| "pitchname", | |
| "pitch_name", | |
| "description", | |
| "pitchresult", | |
| "pitch_result", | |
| ] | |
| ) | |
| if interesting: | |
| candidates.append(obj) | |
| for value in obj.values(): | |
| walk(value) | |
| elif isinstance(obj, list): | |
| for item in obj: | |
| walk(item) | |
| walk(savant_feed) | |
| if not candidates: | |
| return {} | |
| pitch = candidates[-1] | |
| lowered = {str(k).lower(): v for k, v in pitch.items()} | |
| def pick(*names: str) -> Any: | |
| for name in names: | |
| if name.lower() in lowered: | |
| return lowered[name.lower()] | |
| return None | |
| pitch_velocity = _safe_float( | |
| pick("startSpeed", "pitchVelocity", "velocity", "release_speed", "velo", "endSpeed") | |
| ) | |
| pitch_spin_rate = _safe_float(pick("spinRate", "spin_rate")) | |
| pitch_extension = _safe_float(pick("extension")) | |
| pitch_pfx_x = _safe_float(pick("pfxX", "pfx_x", "horzBreak", "horizontalBreak")) | |
| pitch_pfx_z = _safe_float(pick("pfxZ", "pfz_z", "vertBreak", "verticalBreak")) | |
| pitch_break_angle = _safe_float(pick("breakAngle")) | |
| pitch_break_length = _safe_float(pick("breakLength")) | |
| pitch_type = str( | |
| pick("pitchType", "pitch_type", "pitchName", "pitch_name", "type", "pitch") | |
| or "" | |
| ).strip() | |
| last_pitch = str( | |
| pick("description", "pitchResult", "pitch_result", "call", "result", "outcome") | |
| or "" | |
| ).strip() | |
| return { | |
| "last_pitch": last_pitch, | |
| "pitch_type": pitch_type, | |
| "pitch_velocity": round(pitch_velocity, 1) if pitch_velocity is not None and pitch_velocity > 40 else None, | |
| "pitch_spin_rate": round(pitch_spin_rate, 0) if pitch_spin_rate is not None else None, | |
| "pitch_extension": round(pitch_extension, 1) if pitch_extension is not None else None, | |
| "pitch_pfx_x": round(pitch_pfx_x, 2) if pitch_pfx_x is not None else None, | |
| "pitch_pfx_z": round(pitch_pfx_z, 2) if pitch_pfx_z is not None else None, | |
| "pitch_break_angle": round(pitch_break_angle, 1) if pitch_break_angle is not None else None, | |
| "pitch_break_length": round(pitch_break_length, 1) if pitch_break_length is not None else None, | |
| "savant_pitch_debug": str(pitch)[:2000], | |
| } | |
| try: | |
| score_diff = int(out["away_score"] or 0) - int(out["home_score"] or 0) | |
| batting_team_is_home = inning_half.lower() == "bottom" | |
| away_wp, home_wp = estimate_win_probability( | |
| score_diff=score_diff, | |
| inning=int(current_inning or 1), | |
| outs=int(out["outs"] or 0), | |
| runner_on_1b=bool(out["runner_on_1b"]), | |
| runner_on_2b=bool(out["runner_on_2b"]), | |
| runner_on_3b=bool(out["runner_on_3b"]), | |
| batting_team_is_home=batting_team_is_home, | |
| ) | |
| out["away_win_prob"] = away_wp | |
| out["home_win_prob"] = home_wp | |
| except Exception: | |
| out["away_win_prob"] = None | |
| out["home_win_prob"] = None | |
| return out |