from __future__ import annotations import re import streamlit as st from PIL import Image from sqlalchemy import text from visualization.cards.card_data import ( build_hitter_card_data, build_pitcher_card_data, build_game_summary_card_data, ) from visualization.cards.poster_renderer import ( render_hitter_poster, render_pitcher_poster, render_game_summary_poster, ) from visualization.cards.card_queries import ( get_pitcher_selector_names, get_hitter_selector_names, get_pitcher_id_by_name, get_player_card_window_df, get_recent_completed_games_for_card_lab, get_game_summary_window_df, get_game_batter_log_df, get_game_players_for_card_lab, get_batter_mlbam_id, _PITCHER_SELECTOR_MIN_EVENTS, _HITTER_SELECTOR_MIN_EVENTS, ) from visualization.cards.player_assets import resolve_player_image from visualization.cards.player_identity import ( _HITTER_SEASONS_PATH, _PITCHER_SEASONS_PATH, _IDENTITY_MAP_PATH, ) from utils.logger import logger def normalize_name(name: str) -> str: """Convert 'Last, First' to 'First Last'. Pass-through for any other format.""" if not name: return "" if "," in name: last, first = [x.strip() for x in name.split(",", 1)] return f"{first} {last}" return str(name).strip() # --------------------------------------------------------------------------- # Card generation functions — button-click only, no caching # --------------------------------------------------------------------------- def _gen_hitter_bytes(conn, player_name, batter_id, mode, year, date, start_date, end_date, fmt, player_pil): windowed_df = get_player_card_window_df( conn, player_name, "Hitter", mode=mode, year=year, date=date, start_date=start_date, end_date=end_date, batter_id=batter_id, ) if windowed_df.empty: return None, "", "limited" payload = build_hitter_card_data( player_name, windowed_df, mode=mode, year=year, date=date, start_date=start_date, end_date=end_date, ) # Normalize player name for poster display (DB may store "Last, First") payload["player_name"] = normalize_name(payload.get("player_name", player_name)) img_bytes = render_hitter_poster(payload, player_img=player_pil, fmt=fmt) return img_bytes, payload.get("timeframe", ""), payload.get("data_quality", "") def _gen_pitcher_bytes(conn, player_name, pitcher_id, mode, year, date, start_date, end_date, fmt, player_pil): windowed_df = get_player_card_window_df( conn, player_name, "Pitcher", mode=mode, year=year, date=date, start_date=start_date, end_date=end_date, pitcher_id=pitcher_id, ) if windowed_df.empty: return None, "", "limited" payload = build_pitcher_card_data( player_name, windowed_df, mode=mode, year=year, date=date, start_date=start_date, end_date=end_date, ) # Normalize player name for poster display payload["player_name"] = normalize_name(payload.get("player_name", player_name)) img_bytes = render_pitcher_poster(payload, player_img=player_pil, fmt=fmt) return img_bytes, payload.get("timeframe", ""), payload.get("data_quality", "") def _gen_game_bytes( conn, game_pk, away_team, home_team, away_score, home_score, game_date, player_name, fmt, selected_hitters=None, selected_pitchers=None, ): windowed_df = get_game_summary_window_df(conn, game_pk, player_name=player_name) batter_log_df = get_game_batter_log_df(conn, int(game_pk)) game_row = { "game_pk": game_pk, "away_team": away_team, "home_team": home_team, "away_score": away_score, "home_score": home_score, "game_date": game_date, } payload = build_game_summary_card_data( game_pk, windowed_df, game_row, player_name, selected_hitters=selected_hitters, selected_pitchers=selected_pitchers, batter_log_df=batter_log_df, ) img_bytes = render_game_summary_poster(payload, fmt) return img_bytes, payload.get("timeframe", "") # --------------------------------------------------------------------------- # Main render function # --------------------------------------------------------------------------- def render_card_lab(conn) -> None: st.subheader("Kasper Card Lab") st.caption("Generate downloadable Kasper scouting report posters.") # ---- Card type ---- card_type = st.radio("Card Type", ["Hitter", "Pitcher", "Game Summary"], horizontal=True) # ---- Timeframe controls (Hitter / Pitcher only) ---- date = start_date = end_date = None year = None mode_key = "season" pitcher_id = None # set in pitcher selectbox block if card_type in ("Hitter", "Pitcher"): st.markdown("**Data Window**") tcol1, tcol2 = st.columns([1, 2]) with tcol1: mode = st.selectbox( "Mode", ["Season", "Date Range", "Single Date"], key="cl_mode" ) with tcol2: year = st.selectbox( "Year", [2026, 2025, 2024, 2023, 2022, 2021], key="cl_year" ) if mode == "Single Date": date = str(st.date_input("Date", key="cl_date")) elif mode == "Date Range": dcol1, dcol2 = st.columns(2) with dcol1: start_date = str(st.date_input("Start Date", key="cl_start")) with dcol2: end_date = str(st.date_input("End Date", key="cl_end")) mode_key = { "Season": "season", "Date Range": "date_range", "Single Date": "single_date", }[mode] # ---- Selector scope variables — initialized here, set in branches below ---- player_name: str | None = None player_name_display: str | None = None # display-only (hitter) selected_game_row: dict | None = None # Hitter pipeline variables hitter_display_names: list[str] = [] hitter_display_to_statcast: dict[str, str] = {} _hitter_batter_id: int | None = None # MLBAM batter id from Parquet # Pitcher pipeline variables pitcher_display_names: list[str] = [] pitcher_display_to_id: dict[str, int | None] = {} pitcher_pairs: list[tuple[str, int | None]] = [] # Game Summary pipeline variables game_players: dict = {"pitchers": [], "hitters": []} game_selected_hitters: list[dict] = [] game_selected_pitchers: list[dict] = [] selector_year = year if mode_key == "season" else None # ---- Player / game selection ---- if card_type == "Hitter": # Source: data/card_lab_hitter_seasons.parquet (pybaseball batting_stats, AB > 0 gate) hitter_display_names, hitter_display_to_statcast = get_hitter_selector_names( conn, year=selector_year ) if not hitter_display_names: st.info("No hitters found. Run scripts/build_card_lab_season_summaries.py first.") return selected_hitter_display = st.selectbox("Player", hitter_display_names, key="cl_player_hitter") # Read batter_id (MLBAM) from the Parquet — used for ec.batter = :batter_id query. # DB schema: ec.player_name = PITCHER, not batter. batter_id is required. _hitter_batter_id: int | None = None if _HITTER_SEASONS_PATH.exists(): import pandas as _pd_hi _hdf = _pd_hi.read_parquet(_HITTER_SEASONS_PATH) _match = _hdf[_hdf["display_name"] == selected_hitter_display] if selector_year is not None: _yr = _match[_match["Season"] == selector_year] if not _yr.empty: _match = _yr if not _match.empty: _pid = _match.iloc[0]["player_id"] if _pd_hi.notna(_pid): _hitter_batter_id = int(_pid) if _hitter_batter_id is None: st.warning( f"No MLBAM batter ID found for {selected_hitter_display!r}. " "Card data will be empty. Re-run build_player_identity_map.py." ) player_name = selected_hitter_display # display label (injected as literal in SQL) player_name_display = selected_hitter_display elif card_type == "Pitcher": # Source: data/card_lab_pitcher_seasons.parquet (pybaseball pitching_stats, IP > 0 gate) pitcher_pairs = get_pitcher_selector_names(conn, year=selector_year) if not pitcher_pairs: st.info("No pitchers found. Run scripts/build_card_lab_season_summaries.py first.") return pitcher_display_to_id = {name: pid for name, pid in pitcher_pairs} pitcher_display_names = [name for name, _ in pitcher_pairs] # already sorted selected_pitcher_display = st.selectbox("Player", pitcher_display_names, key="cl_player_pitcher") pitcher_id = pitcher_display_to_id[selected_pitcher_display] player_name = selected_pitcher_display # display name for poster (literal in pitcher SQL) else: # Game Summary gdf = get_recent_completed_games_for_card_lab(conn) if gdf.empty: st.info("No completed game data available for Game Summary cards.") return recent_games = gdf.to_dict("records") game_labels = [ f"{g.get('away_team', '?')} @ {g.get('home_team', '?')} — {str(g.get('game_date', ''))[:10]}" for g in recent_games ] sel_idx = st.selectbox( "Game", range(len(game_labels)), format_func=lambda i: game_labels[i], key="cl_game", ) selected_game_row = recent_games[sel_idx] game_pk = selected_game_row.get("game_pk") if game_pk: game_players = get_game_players_for_card_lab(conn, int(game_pk)) hitter_opts = [f"{p['team']} — {p['display_name']}" for p in game_players["hitters"]] pitcher_opts = [f"{p['team']} — {p['display_name']}" for p in game_players["pitchers"]] _hitter_label_map = {f"{p['team']} — {p['display_name']}": p for p in game_players["hitters"]} _pitcher_label_map = {f"{p['team']} — {p['display_name']}": p for p in game_players["pitchers"]} sel_hitter_labels = st.multiselect("Hitters in This Game", hitter_opts, key="cl_game_hitters") sel_pitcher_labels = st.multiselect("Pitchers in This Game", pitcher_opts, key="cl_game_pitchers") game_selected_hitters = [_hitter_label_map[lbl] for lbl in sel_hitter_labels] game_selected_pitchers = [_pitcher_label_map[lbl] for lbl in sel_pitcher_labels] player_name = None # full game windowed_df, no single-pitcher filter # ---- Auto-image: MLBAM ID lookup + 3-layer image resolve ---- mlbam_id: int | None = None player_pil = None if card_type == "Hitter" and player_name: # batter_id from Parquet — no DB call needed (ec.player_name = pitcher, not batter) mlbam_id = _hitter_batter_id player_pil = resolve_player_image(mlbam_id) if mlbam_id else None elif card_type == "Pitcher" and pitcher_id: mlbam_id = pitcher_id player_pil = resolve_player_image(mlbam_id) # ---- Debug expander ---- with st.expander("Debug", expanded=False): import pandas as _pd from collections import Counter as _Counter st.write(f"**Active card type:** {card_type}") st.write(f"**Identity map exists:** {_IDENTITY_MAP_PATH.exists()}") st.write(f"**Pitcher seasons exists:** {_PITCHER_SEASONS_PATH.exists()}") st.write(f"**Hitter seasons exists:** {_HITTER_SEASONS_PATH.exists()}") if card_type == "Pitcher": st.write("**Source:** data/card_lab_pitcher_seasons.parquet (pybaseball pitching_stats)") st.write(f"**Season:** {selector_year}") st.write("**Workload filter:** IP > 0") st.write(f"**Total names:** {len(pitcher_display_names)}") st.write("**First 20:**", pitcher_display_names[:20]) if _PITCHER_SEASONS_PATH.exists(): _pdf = _pd.read_parquet(_PITCHER_SEASONS_PATH) _yr_rows = _pdf[(_pdf["IP"] > 0) & (_pdf["Season"] == selector_year)] if selector_year else _pdf[_pdf["IP"] > 0] st.write(f"**Parquet rows for season:** {len(_yr_rows)}") null_id = sum(1 for _, pid in pitcher_pairs if pid is None) st.write(f"**Players with null pitcher_id:** {null_id}") dup_p = sum(1 for c in _Counter(pitcher_display_names).values() if c > 1) st.write(f"**Duplicate display_names:** {dup_p} (must be 0)") st.markdown("**Spot checks:**") for check in ["Tarik Skubal", "Sonny Gray", "Aaron Nola"]: found = [n for n, _ in pitcher_pairs if check.lower() in n.lower()] suffix = f" ({len(found)} match{'es' if len(found) != 1 else ''})" if found else "" st.write(f" {'✓' if found else '✗'} {check}{suffix}") elif card_type == "Hitter": st.write("**Source:** data/card_lab_hitter_seasons.parquet (pybaseball batting_stats)") st.write(f"**Season:** {selector_year}") st.write("**Workload filter:** AB > 0") st.write(f"**Total names:** {len(hitter_display_names)}") st.write("**First 20:**", hitter_display_names[:20]) if _HITTER_SEASONS_PATH.exists(): _hdf = _pd.read_parquet(_HITTER_SEASONS_PATH) _yr_rows = _hdf[(_hdf["AB"] > 0) & (_hdf["Season"] == selector_year)] if selector_year else _hdf[_hdf["AB"] > 0] st.write(f"**Parquet rows for season:** {len(_yr_rows)}") st.write(f"**statcast_name coverage:** all {len(hitter_display_names)} shown rows resolved (null rows excluded at build)") dup_h = sum(1 for c in _Counter(hitter_display_names).values() if c > 1) st.write(f"**Duplicate display_names:** {dup_h} (must be 0)") st.markdown("**Spot checks (hitters ✓, pitchers ✗):**") names_lc = {n.lower() for n in hitter_display_names} for check, expect_present in [ ("Lars Nootbaar", True), ("Alec Burleson", True), ("Freddy Peralta", False), ("Tarik Skubal", False), ]: found = any(check.lower() in n for n in names_lc) ok = found == expect_present st.write(f" {'✓' if ok else '✗'} {check} — {'present' if found else 'absent'}") if card_type in ("Hitter", "Pitcher"): st.write(f"**MLBAM ID:** {mlbam_id}") if card_type == "Hitter": st.write(f"**Batter ID (from Parquet):** {_hitter_batter_id}") st.write(f"**Image fetched:** {player_pil is not None}") elif card_type == "Game Summary": if selected_game_row: raw_gp = selected_game_row.get("game_pk") st.write(f"**game_pk raw value:** {raw_gp!r} (type: {type(raw_gp).__name__})") st.write(f"**Hitters found:** {len(game_players.get('hitters', []))}") st.write(f"**Pitchers found:** {len(game_players.get('pitchers', []))}") st.write("**First 10 hitters:**", [h['display_name'] for h in game_players.get('hitters', [])[:10]]) st.write("**First 10 pitchers:**", [p['display_name'] for p in game_players.get('pitchers', [])[:10]]) st.write(f"**Selected hitters:** {len(game_selected_hitters)}") st.write(f"**Selected pitchers:** {len(game_selected_pitchers)}") if game_selected_hitters: st.write("Selected hitters:", [h['display_name'] for h in game_selected_hitters]) if game_selected_pitchers: st.write("Selected pitchers:", [p['display_name'] for p in game_selected_pitchers]) if selected_game_row and selected_game_row.get("game_pk"): try: _gp = int(selected_game_row["game_pk"]) _cnt = conn.execute( text("SELECT COUNT(*) FROM statcast_event_core WHERE game_pk = :gp"), {"gp": _gp}, ).scalar() _p_cnt = conn.execute( text("SELECT COUNT(DISTINCT pitcher) FROM statcast_event_core WHERE game_pk = :gp"), {"gp": _gp}, ).scalar() _b_cnt = conn.execute( text("SELECT COUNT(DISTINCT batter) FROM statcast_event_core WHERE game_pk = :gp"), {"gp": _gp}, ).scalar() st.write(f"**statcast rows for game_pk {_gp}:** {_cnt}") st.write(f"**Distinct pitchers:** {_p_cnt} | **Distinct batters:** {_b_cnt}") except Exception as _e: st.write(f"**Statcast row count error:** {_e}") if game_players.get("_error"): st.write(f"**get_game_players error:** {game_players['_error']}") fmt = st.radio("Format", ["PNG", "JPG"], horizontal=True, key="cl_fmt") # ---- Generate button ---- generate = st.button("Generate Card", type="primary", key="cl_gen") # ---- Placeholder before first generation ---- if "card_bytes" not in st.session_state: st.info("Generate a card to preview.") if generate: status = st.empty() try: img_bytes = None tf = "" if card_type == "Hitter": status.info("Querying warehouse data...") status.info("Building poster...") img_bytes, tf, dq = _gen_hitter_bytes( conn, player_name, _hitter_batter_id, mode_key, year, date, start_date, end_date, fmt, player_pil ) st.session_state["card_player"] = normalize_name(player_name or "unknown") st.session_state["card_timeframe"] = tf elif card_type == "Pitcher": status.info("Querying warehouse data...") status.info("Building poster...") img_bytes, tf, dq = _gen_pitcher_bytes( conn, player_name, pitcher_id, mode_key, year, date, start_date, end_date, fmt, player_pil ) st.session_state["card_player"] = player_name or "unknown" st.session_state["card_timeframe"] = tf else: status.info("Querying warehouse data...") status.info("Building summary card...") img_bytes, tf = _gen_game_bytes( conn, game_pk=selected_game_row.get("game_pk"), away_team=selected_game_row.get("away_team", "—"), home_team=selected_game_row.get("home_team", "—"), away_score=selected_game_row.get("away_score"), home_score=selected_game_row.get("home_score"), game_date=str(selected_game_row.get("game_date", ""))[:10], player_name=player_name, fmt=fmt, selected_hitters=game_selected_hitters, selected_pitchers=game_selected_pitchers, ) st.session_state["card_player"] = player_name or "game" st.session_state["card_timeframe"] = tf if img_bytes is None: status.empty() st.warning( "No warehouse data found for this player and selected time window. " "Try a different season or date range." ) else: status.info("Finalizing download...") st.session_state["card_bytes"] = img_bytes st.session_state["card_type"] = card_type st.session_state["card_fmt"] = fmt status.empty() except Exception as exc: status.empty() logger.warning("[card_lab] generation failed: %s", exc) st.error(f"Card generation failed: {exc}") # ---- Preview + download ---- if st.session_state.get("card_bytes"): st.image(st.session_state["card_bytes"], width=700) p = (st.session_state.get("card_player") or "game").replace(" ", "_").lower() t = st.session_state.get("card_type", "card").replace(" ", "_").lower() tf = st.session_state.get("card_timeframe", "").replace(" ", "_").replace("/", "-").lower() f = st.session_state.get("card_fmt", "png").lower() p = re.sub(r"[^a-z0-9_\-]", "", p) tf = re.sub(r"[^a-z0-9_\-]", "", tf) fname = f"kasper_{t}_{p}_{tf}.{f}" mime = "image/jpeg" if f == "jpg" else "image/png" st.download_button( label=f"Download {st.session_state.get('card_fmt', 'PNG')}", data=st.session_state["card_bytes"], file_name=fname, mime=mime, key="cl_dl", )