from __future__ import annotations

import logging
from typing import Any

import pandas as pd

from data.odds_name_map import normalize_pitcher_name
from database.db import log_pitcher_resolution
from models.pitcher_live_state_v2 import build_pitcher_live_state_v2

logger = logging.getLogger(__name__)

# Keys copied verbatim from the live-state dict into the adjustment result.
_LIVE_STATE_PASSTHROUGH_KEYS: tuple[str, ...] = (
    "baseline_weight",
    "live_weight",
    "velo_delta",
    "spin_delta",
    "extension_delta",
    "pitch_count",
    "times_through_order",
    "rolling_pitch_sample_size",
    "rolling_pitch_velocity_sample_size",
    "rolling_pitch_spin_sample_size",
    "rolling_pitch_extension_sample_size",
    "rolling_pitch_pfx_x_sample_size",
    "rolling_pitch_pfx_z_sample_size",
)


def _to_last_first_variants(name: str) -> set[str]:
    """Return the normalized name plus its "last first [middle]" reorderings.

    An empty set is returned when the name normalizes to something falsy.
    """
    normalized = normalize_pitcher_name(name)
    if not normalized:
        return set()

    parts = normalized.split()
    variants = {normalized}
    if len(parts) >= 2:
        first = parts[0]
        last = parts[-1]
        middle = " ".join(parts[1:-1]).strip()
        if middle:
            variants.add(f"{last} {first} {middle}".strip())
        variants.add(f"{last} {first}".strip())
    return variants


def _safe_mean(series: pd.Series) -> float | None:
    """Return the mean of the numeric values in *series*, or None if none exist."""
    numeric = pd.to_numeric(series, errors="coerce").dropna()
    if numeric.empty:
        return None
    return float(numeric.mean())


def _numeric_column(df: pd.DataFrame, column: str) -> pd.Series:
    """Return *column* coerced to numeric, or an all-NaN Series if it is absent.

    Always returning a Series keeps downstream ``.dropna()`` / comparison calls
    valid even when the frame lacks the column.
    """
    if column not in df.columns:
        return pd.Series(float("nan"), index=df.index, dtype="float64")
    return pd.to_numeric(df[column], errors="coerce")


def _dominant_p_throws(df: pd.DataFrame) -> str | None:
    """Return the most frequent non-null ``p_throws`` value, or None."""
    if "p_throws" not in df.columns:
        return None
    mode_vals = df["p_throws"].dropna().mode()
    if mode_vals.empty:
        return None
    return str(mode_vals.iloc[0]).strip()


def _empty_pitcher_row(pitcher_name: str) -> dict[str, Any]:
    """Return the default feature row used when no rows match the pitcher."""
    return {
        "pitcher_name": pitcher_name,
        "ev_allowed": None,
        "hard_hit_rate_allowed": None,
        "barrel_rate_allowed": None,
        "avg_release_speed": None,
        "avg_release_spin_rate": None,
        "avg_release_extension": None,
        "avg_pfx_x": None,
        "avg_pfx_z": None,
        "sample_size": 0,
        "p_throws": "R",
        "gb_rate_allowed": 0.0,
        "ld_rate_allowed": 0.0,
        "fb_rate_allowed": 0.0,
        "popup_rate_allowed": 0.0,
        "la_sweet_spot_allowed_rate": 0.0,
        "la_optimal_hr_allowed_rate": 0.0,
        "avg_launch_angle_allowed": None,
        "swstr_rate": None,
        "csw_rate": None,
        "ball_rate": None,
    }


def _format_movement(value: Any) -> str:
    """Format a movement value to three decimals, or "N/A" if unusable."""
    if value is None:
        return "N/A"
    try:
        return f"{float(value):.3f}"
    except (TypeError, ValueError):
        return "N/A"


def build_pitcher_feature_row(
    statcast_df: pd.DataFrame,
    pitcher_name: str,
    pitcher_id: int | None = None,
    conn: Any = None,
    game_pk: str | None = None,
    game_date: str | None = None,
    source: str = "pitcher_adjustment",
) -> dict[str, Any]:
    """Aggregate one pitcher's rows of *statcast_df* into a feature dict.

    Rows are selected by the first strategy that yields a non-empty result:

    1. ``pitcher`` column equal to *pitcher_id* (when both are available);
    2. normalized ``player_name`` equal to the normalized input name or one
       of its "last first" reorderings;
    3. normalized ``player_name`` containing both the first and last token of
       the normalized input name as substrings.

    Args:
        statcast_df: Frame with at least a ``player_name`` column. All other
            columns are optional; metrics for missing columns stay at their
            defaults.
        pitcher_name: Name to resolve; echoed back in the result.
        pitcher_id: Optional numeric id matched against the ``pitcher`` column.
        conn: Optional handle passed to ``log_pitcher_resolution``. When
            ``None`` no resolution log is written.
        game_pk: Recorded in the resolution log only.
        game_date: Recorded in the resolution log only.
        source: Recorded in the resolution log only.

    Returns:
        A dict with the keys of ``_empty_pitcher_row``. When nothing matches,
        that default row is returned unchanged (``sample_size`` 0).
    """

    def _log(
        match_method: str,
        matched_canonical: str | None,
        sample_size: int,
        p_throws: str | None,
    ) -> None:
        # Resolution logging is best-effort: a failed write must never break
        # feature building, so any error is only reported at debug level.
        if conn is None:
            return
        try:
            log_pitcher_resolution(conn, {
                "game_pk": game_pk,
                "game_date": game_date,
                "source": source,
                "input_name": pitcher_name,
                "normalized_name": normalize_pitcher_name(pitcher_name),
                "matched_canonical": matched_canonical,
                "pitcher_id": pitcher_id,
                "match_method": match_method,
                "sample_size": sample_size,
                "p_throws": p_throws,
            })
        except Exception as exc:
            logger.debug("[pitcher_adjustment] resolution log write failed: %s", exc)

    def _log_match(match_method: str, matched: pd.DataFrame) -> None:
        canonical = str(matched["player_name"].iloc[0])
        _log(match_method, canonical, int(len(matched)), _dominant_p_throws(matched))

    if (
        statcast_df is None
        or statcast_df.empty
        or "player_name" not in statcast_df.columns
    ):
        _log("failed", None, 0, None)
        return _empty_pitcher_row(pitcher_name)

    # Fill missing names BEFORE casting: astype(str) would turn NaN into the
    # literal "nan", which could then be matched like a real name.
    player_names = statcast_df["player_name"].fillna("").astype(str)
    normalized_series = player_names.map(normalize_pitcher_name)

    # Priority 1: pitcher ID match
    df = pd.DataFrame()
    if pitcher_id is not None and "pitcher" in statcast_df.columns:
        try:
            numeric_pitcher_ids = pd.to_numeric(statcast_df["pitcher"], errors="coerce")
            id_df = statcast_df[numeric_pitcher_ids == int(pitcher_id)].copy()
            if not id_df.empty:
                df = id_df
                _log_match("id", df)
        except Exception as exc:
            # Deliberately broad: an unusable id falls through to name matching.
            logger.debug("[pitcher_adjustment] pitcher ID match failed: %s", exc)

    # Priority 2: exact normalized name / last-first variant match
    if df.empty:
        normalized_target_variants = _to_last_first_variants(pitcher_name)
        df = statcast_df[normalized_series.isin(normalized_target_variants)].copy()
        if not df.empty:
            _log_match("exact", df)

    # Priority 3: loose contains-style fallback on first + last token
    # NOTE(review): substring matching can select rows of more than one
    # pitcher when names overlap — confirm this is acceptable upstream.
    if df.empty:
        name_parts = normalize_pitcher_name(pitcher_name).split()
        if len(name_parts) >= 2:
            first = name_parts[0]
            last = name_parts[-1]
            loose_mask = normalized_series.apply(
                lambda n: isinstance(n, str) and first in n and last in n
            )
            df = statcast_df[loose_mask].copy()
            if not df.empty:
                _log_match("loose", df)

    if df.empty:
        _log("failed", None, 0, None)
        return _empty_pitcher_row(pitcher_name)

    launch_speed = _numeric_column(df, "launch_speed")
    launch_angle = _numeric_column(df, "launch_angle")

    ev_allowed = _safe_mean(launch_speed)
    avg_release_speed = _safe_mean(_numeric_column(df, "release_speed"))
    avg_release_spin_rate = _safe_mean(_numeric_column(df, "release_spin_rate"))
    avg_release_extension = _safe_mean(_numeric_column(df, "release_extension"))
    avg_pfx_x = _safe_mean(_numeric_column(df, "pfx_x"))
    avg_pfx_z = _safe_mean(_numeric_column(df, "pfx_z"))

    # Hard-hit rate is taken over rows that have a measured launch speed, so
    # rows without one do not count as "not hard hit" in the denominator.
    hard_hit_rate_allowed = None
    measured_speed = launch_speed.dropna()
    if not measured_speed.empty:
        hard_hit_rate_allowed = float((measured_speed >= 95).mean())

    # Barrel rate needs both launch speed and launch angle on the same row.
    barrel_rate_allowed = None
    valid = pd.DataFrame(
        {"launch_speed": launch_speed, "launch_angle": launch_angle}
    ).dropna()
    if not valid.empty:
        barrel_mask = (
            (valid["launch_speed"] >= 98)
            & (valid["launch_angle"] >= 26)
            & (valid["launch_angle"] <= 30)
        ) | (
            (valid["launch_speed"] >= 100)
            & (valid["launch_angle"] >= 23)
            & (valid["launch_angle"] <= 33)
        )
        barrel_rate_allowed = float(barrel_mask.mean())

    # E1b: Extract pitcher throwing hand (defaults to "R" when unknown)
    dominant_hand = _dominant_p_throws(df)
    p_throws = "R" if dominant_hand is None else dominant_hand

    # ----------------------------
    # 12G: Pitcher contact-shape taxonomy (allowed)
    # ----------------------------
    gb_rate_allowed = 0.0
    ld_rate_allowed = 0.0
    fb_rate_allowed = 0.0
    popup_rate_allowed = 0.0
    la_sweet_spot_allowed_rate = 0.0
    la_optimal_hr_allowed_rate = 0.0
    avg_launch_angle_allowed = None

    if "bb_type" in df.columns:
        contact_df = df[
            df["bb_type"].notna() & (df["bb_type"].astype(str).str.strip() != "")
        ]
        # Rates stay at 0.0 below five classified batted balls.
        if len(contact_df) >= 5:
            bb = contact_df["bb_type"].astype(str).str.strip().str.lower()
            n = float(len(bb))
            gb_rate_allowed = float((bb == "ground_ball").sum()) / n
            ld_rate_allowed = float((bb == "line_drive").sum()) / n
            fb_rate_allowed = float((bb == "fly_ball").sum()) / n
            popup_rate_allowed = float((bb == "popup").sum()) / n

    la_valid = launch_angle.dropna()
    if len(la_valid) >= 5:
        la_sweet_spot_allowed_rate = float(((la_valid >= 8) & (la_valid <= 32)).mean())
        la_optimal_hr_allowed_rate = float(((la_valid >= 25) & (la_valid <= 35)).mean())
        avg_launch_angle_allowed = _safe_mean(la_valid)

    # Batch 13: Pitch-level command rates from description column
    swstr_rate = None
    csw_rate = None
    ball_rate = None
    if "description" in df.columns and len(df) >= 10:
        desc = df["description"].astype(str).str.strip().str.lower()
        total = len(desc)
        swstr_mask = desc.isin({"swinging_strike", "swinging_strike_blocked"})
        cs_mask = desc == "called_strike"
        ball_mask = desc.isin({"ball", "blocked_ball", "intent_ball", "pitchout"})
        swstr_rate = float(swstr_mask.sum() / total)
        csw_rate = float((swstr_mask | cs_mask).sum() / total)
        ball_rate = float(ball_mask.sum() / total)

    return {
        "pitcher_name": pitcher_name,
        "ev_allowed": ev_allowed,
        "hard_hit_rate_allowed": hard_hit_rate_allowed,
        "barrel_rate_allowed": barrel_rate_allowed,
        "avg_release_speed": avg_release_speed,
        "avg_release_spin_rate": avg_release_spin_rate,
        "avg_release_extension": avg_release_extension,
        "avg_pfx_x": avg_pfx_x,
        "avg_pfx_z": avg_pfx_z,
        "sample_size": int(len(df)),
        "p_throws": p_throws,
        "gb_rate_allowed": gb_rate_allowed,
        "ld_rate_allowed": ld_rate_allowed,
        "fb_rate_allowed": fb_rate_allowed,
        "popup_rate_allowed": popup_rate_allowed,
        "la_sweet_spot_allowed_rate": la_sweet_spot_allowed_rate,
        "la_optimal_hr_allowed_rate": la_optimal_hr_allowed_rate,
        "avg_launch_angle_allowed": avg_launch_angle_allowed,
        "swstr_rate": swstr_rate,
        "csw_rate": csw_rate,
        "ball_rate": ball_rate,
    }


def compute_pitcher_adjustment(
    batter_row: dict[str, Any],
    pitcher_row: dict[str, Any],
    context: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Turn a pitcher feature row into additive hit / HR / TB2+ adjustments.

    Each available pitcher metric contributes a small clamped shift; a
    live-state overlay from ``build_pitcher_live_state_v2`` is added on top,
    and the three totals are clamped at the end.

    Args:
        batter_row: Accepted for interface compatibility; not read here.
        pitcher_row: Feature dict as produced by ``build_pitcher_feature_row``.
            Metrics that are ``None`` or missing are skipped.
        context: Optional dict; ``context["game_row"]`` is forwarded to the
            live-state builder along with the context itself.

    Returns:
        Dict with the clamped ``hit_adj`` / ``hr_adj`` / ``tb2p_adj``, up to
        eight ``reason_tags``, the live-state scores and passthrough fields,
        a movement debug string and the pre-clamp totals.
    """
    hit_adj = 0.0
    hr_adj = 0.0
    tb2p_adj = 0.0
    reason_tags: list[str] = []
    context = context or {}

    ev_allowed = pitcher_row.get("ev_allowed")
    hard_hit_rate_allowed = pitcher_row.get("hard_hit_rate_allowed")
    barrel_rate_allowed = pitcher_row.get("barrel_rate_allowed")
    avg_release_speed = pitcher_row.get("avg_release_speed")
    avg_release_spin_rate = pitcher_row.get("avg_release_spin_rate")
    avg_release_extension = pitcher_row.get("avg_release_extension")
    avg_pfx_x = pitcher_row.get("avg_pfx_x")
    avg_pfx_z = pitcher_row.get("avg_pfx_z")

    # Shared core profile effects
    if ev_allowed is not None:
        shift = max(-0.03, min(0.04, (float(ev_allowed) - 89.0) * 0.004))
        hit_adj += shift
        tb2p_adj += shift * 0.9
        hr_adj += shift * 0.7
        if shift > 0.01:
            reason_tags.append("Pitcher allows loud contact")

    if hard_hit_rate_allowed is not None:
        shift = max(-0.02, min(0.03, (float(hard_hit_rate_allowed) - 0.38) * 0.12))
        hit_adj += shift
        tb2p_adj += shift
        if shift > 0.01:
            reason_tags.append("Elevated hard-hit allowed")

    if barrel_rate_allowed is not None:
        shift = max(-0.015, min(0.04, (float(barrel_rate_allowed) - 0.07) * 0.55))
        hr_adj += shift
        tb2p_adj += shift * 0.75
        if shift > 0.01:
            reason_tags.append("Barrel-prone pitcher")

    if avg_release_spin_rate is not None:
        spin = float(avg_release_spin_rate)
        if spin >= 2400:
            hit_adj -= 0.003
            hr_adj -= 0.003
            tb2p_adj -= 0.002
            reason_tags.append("Strong pitch spin")
        elif spin <= 2050:
            hit_adj += 0.004
            hr_adj += 0.005
            tb2p_adj += 0.003
            reason_tags.append("Below-average spin")

    if avg_release_extension is not None:
        ext = float(avg_release_extension)
        if ext >= 6.4:
            hit_adj -= 0.003
            hr_adj -= 0.002
            tb2p_adj -= 0.002
            reason_tags.append("Long release extension")
        elif ext <= 5.8:
            hit_adj += 0.003
            hr_adj += 0.003
            tb2p_adj += 0.002
            reason_tags.append("Short release extension")

    # Movement signals: each is best-effort, a non-numeric value is skipped.
    movement_fired = False
    try:
        if avg_pfx_x is not None and abs(float(avg_pfx_x)) >= 0.75:  # ~9 inches in feet
            hit_adj -= 0.003
            tb2p_adj -= 0.003
            reason_tags.append("strong_horizontal_break")
            movement_fired = True
    except (TypeError, ValueError) as exc:
        logger.debug("[pitcher_adjustment] pfx_x movement block skipped: %s", exc)

    try:
        if avg_pfx_z is not None and abs(float(avg_pfx_z)) >= 1.17:  # ~14 inches in feet
            hit_adj -= 0.003
            hr_adj -= 0.003
            tb2p_adj -= 0.003
            reason_tags.append("strong_vertical_break")
            movement_fired = True
    except (TypeError, ValueError) as exc:
        logger.debug("[pitcher_adjustment] pfx_z movement block skipped: %s", exc)

    movement_signal_debug = (
        f"pfx_x={_format_movement(avg_pfx_x)} "
        f"pfx_z={_format_movement(avg_pfx_z)} "
        f"fired={'Y' if movement_fired else 'N'}"
    )

    # G1: Velocity-band precision segmentation
    if avg_release_speed is not None:
        avg_velo = float(avg_release_speed)
        if avg_velo >= 97:
            hit_adj -= 0.008
            hr_adj -= 0.007
            reason_tags.append("Elite velo (97+)")
        elif avg_velo >= 95:
            hit_adj -= 0.004
            hr_adj -= 0.004
            reason_tags.append("Plus velo (95-96)")
        elif avg_velo <= 90:
            hit_adj += 0.009
            hr_adj += 0.008
            reason_tags.append("Soft velo (≤90)")
        elif avg_velo <= 92:
            hit_adj += 0.005
            hr_adj += 0.006
            reason_tags.append("Below-avg velo (91-92)")

    # Strong Phase 6 adaptive live-state overlay
    game_row = context.get("game_row", {}) or {}
    live_state = build_pitcher_live_state_v2(
        pitcher_row=pitcher_row,
        game_row=game_row,
        context=context,
    ) or {}

    fatigue_score = float(live_state.get("fatigue_score", 0.0) or 0.0)
    degradation_score = float(live_state.get("degradation_score", 0.0) or 0.0)
    trust_live_score = float(live_state.get("trust_live_score", 0.0) or 0.0)

    # Live degradation affects contact / HR more than hit generally
    hit_adj += fatigue_score * 0.010
    hr_adj += fatigue_score * 0.015
    tb2p_adj += fatigue_score * 0.012

    hit_adj += degradation_score * 0.008
    hr_adj += degradation_score * 0.014
    tb2p_adj += degradation_score * 0.011

    # Trust-live score increases the magnitude slightly when evidence is strong
    hit_adj += trust_live_score * 0.003
    hr_adj += trust_live_score * 0.004
    tb2p_adj += trust_live_score * 0.003

    # ``or []`` covers a live state that carries an explicit None for the tags.
    reason_tags.extend(live_state.get("reason_tags") or [])

    # Capture pre-clamp values for debug
    _hit_adj_pre = hit_adj
    _hr_adj_pre = hr_adj
    _tb2p_adj_pre = tb2p_adj

    # Final net clamp — prevents extreme multi-signal stacking
    hit_adj = max(-0.030, min(0.030, hit_adj))
    hr_adj = max(-0.025, min(0.025, hr_adj))
    tb2p_adj = max(-0.025, min(0.025, tb2p_adj))

    return {
        "hit_adj": hit_adj,
        "hr_adj": hr_adj,
        "tb2p_adj": tb2p_adj,
        "reason_tags": reason_tags[:8],
        "fatigue_score": fatigue_score,
        "degradation_score": degradation_score,
        "trust_live_score": trust_live_score,
        **{key: live_state.get(key) for key in _LIVE_STATE_PASSTHROUGH_KEYS},
        # Task 5 — movement signal debug
        "movement_signal_debug": movement_signal_debug,
        # Task 6 — pre-clamp values for transparency
        "pitcher_net_adj_pre_clamp_hit": _hit_adj_pre,
        "pitcher_net_adj_pre_clamp_hr": _hr_adj_pre,
        "pitcher_net_adj_pre_clamp_tb2p": _tb2p_adj_pre,
    }