import streamlit as st
import pandas as pd
import numpy as np
import json
import math
import requests
import os
from scipy.stats import nbinom
from PIL import Image
import altair as alt
import base64
import sklearn  # noqa: F401
from sklearn.metrics import (
    brier_score_loss,
    mean_squared_error,
    mean_absolute_error,
    r2_score,
    log_loss,
)
import plotly.express as px
import plotly.graph_objects as go
import hashlib

# --- Configuration for Admin Access ---
# SECURITY: a credential committed to source control should be treated as
# compromised. Set the ADMIN_PASSWORD environment variable in deployment; the
# literal below is only a backward-compatible fallback and should be rotated.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "Monkeyrocks11$$")

# JSON file used to persist player groups between runs.
GROUP_FILE = "player_groups.json"

# Full club name -> numeric team id.
TEAMS_DICT = {
    "Arsenal": 1,
    "Aston Villa": 2,
    "Burnley": 3,
    "AFC Bournemouth": 4,
    "Brentford": 5,
    "Brighton and Hove Albion": 6,
    "Chelsea": 7,
    "Crystal Palace": 8,
    "Everton": 9,
    "Fulham": 10,
    "Leeds United": 11,
    "Liverpool": 12,
    "Manchester City": 13,
    "Manchester United": 14,
    "Newcastle United": 15,
    "Nottingham Forest": 16,
    "Sunderland": 17,
    "Tottenham Hotspur": 18,
    "West Ham United": 19,
    "Wolverhampton Wanderers": 20,
}

# Numeric team id -> short display name.
TEAMS_DICT_REVERSE = {
    1: "Arsenal",
    2: "Aston Villa",
    3: "Burnley",
    4: "Bournemouth",
    5: "Brentford",
    6: "Brighton",
    7: "Chelsea",
    8: "Crystal Palace",
    9: "Everton",
    10: "Fulham",
    11: "Leeds",
    12: "Liverpool",
    13: "Man City",
    14: "Man Utd",
    15: "Newcastle",
    16: "Nott'm Forest",
    17: "Sunderland",
    18: "Spurs",
    19: "West Ham",
    20: "Wolves",
}


@st.cache_data(ttl=3600)
def get_base64_of_bin_file(bin_file):
    """Return the contents of ``bin_file`` as a base64-encoded ASCII string."""
    with open(bin_file, "rb") as f:
        data = f.read()
    return base64.b64encode(data).decode()


def set_bg_hack(main_bg, opacity=0.15):
    """
    A function to unpack an image from root folder and set as bg.

    Parameters
    ----------
    main_bg : str
        The file path to the image.
    opacity : float
        The opacity of the overlay (0.0 to 1.0).
        0.0 = fully visible image, 1.0 = solid color cover.
    """
    main_bg_ext = "png"
    # Read and encode the image
    b64_encoded = get_base64_of_bin_file(main_bg)
    # CSS to inject
    # The linear-gradient overlays a semi-transparent black layer
    # NOTE(review): main_bg_ext, opacity and b64_encoded are not referenced by
    # the markup string below, which is blank here -- confirm the intended CSS.
    st.markdown(
        f""" """,
        unsafe_allow_html=True,
    )


def sidebar_bg(side_bg):
    """Encode the image at ``side_bg`` and inject the sidebar markup.

    Parameters
    ----------
    side_bg : str
        The file path to the sidebar image.
    """
    side_bg_ext = "png"
    # Reuse the shared (cached) reader instead of re-implementing it here.
    b64_encoded = get_base64_of_bin_file(side_bg)
    # NOTE(review): side_bg_ext and b64_encoded are not referenced by the
    # markup string below, which is blank here -- confirm the intended CSS.
    st.markdown(
        f""" """,
        unsafe_allow_html=True,
    )


side_bg = "luigiside.png"
sidebar_bg(side_bg)
set_bg_hack("luigismansion.jpg")
st.markdown(
    """ """,
    unsafe_allow_html=True,
)


def set_custom_font(font_path):
    """Encode the font file at ``font_path`` and inject the font markup.

    Parameters
    ----------
    font_path : str
        The file path to the font file.
    """
    # Reuse the shared (cached) reader instead of re-implementing it here.
    b64 = get_base64_of_bin_file(font_path)
    # NOTE(review): b64 is not referenced by the markup string below, which is
    # blank here -- confirm the intended @font-face CSS.
    font_css = f""" """
    st.markdown(font_css, unsafe_allow_html=True)


set_custom_font("luigifont.ttf")


def load_player_groups():
    """Load the saved player groups from ``GROUP_FILE``.

    Returns
    -------
    The parsed JSON content, or an empty dict when the file is missing,
    unreadable, or does not contain valid JSON.
    """
    try:
        with open(GROUP_FILE, "r") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers
        # json.JSONDecodeError and UnicodeDecodeError. Loading stays
        # best-effort, but unrelated errors are no longer swallowed.
        return {}


def save_player_groups(groups):
    """Write ``groups`` to ``GROUP_FILE`` as indented JSON."""
    with open(GROUP_FILE, "w") as f:
        json.dump(groups, f, indent=4)


if "player_groups" not in st.session_state:
    st.session_state.player_groups = load_player_groups()

# --- Initialize all session state variables at the top level ---
# This ensures they are always present, even on reruns.
st.session_state.setdefault("initialized", False)
st.session_state.setdefault("finalized_df", None)
st.session_state.setdefault("match_df", None)
# Player penalty shares: Initialized with a default dict, will be overridden by loaded data
st.session_state.setdefault("player_penalty_shares", {})
st.session_state.setdefault("admin_persistent_availability_multipliers", {})
st.session_state.setdefault("user_availability_multipliers", {})  # Session-only
st.session_state.setdefault("selected_availability_player", None)
st.session_state.setdefault("selected_availability_player_non_admin", None)
st.session_state.setdefault("MINS_THRESHOLD", 30)
# MINS_SCALING_BONUS is explicitly 0.0, as confirmed previously.
# Remaining per-session defaults, applied in declaration order. setdefault
# only fills keys that are absent, so values from an earlier rerun survive.
_SESSION_STATE_DEFAULTS = {
    "MINS_SCALING_BONUS": 0.0,
    "pos_map": {},
    "teams_dict_1": {},
    "teams_dict": {},
    # FIX: Adjusted points_config to remove CBIT/CBITR related multipliers,
    # as they are now tiered (cbit_per_10, cbitr_per_12 and
    # cbit_cbitr_multiplier are handled by the tiered logic instead).
    "points_config": {
        "goal": {1: 10, 2: 6, 3: 5, 4: 4},
        "assist": 3,
        "clean_sheet": {1: 4, 2: 4, 3: 1, 4: 0},
        "saves_per_3": 1,
        "penalty_points_per_position": {2: 0.8, 3: 0.6, 4: 0.5},
    },
    "user_xmins_overrides": {},  # Session-only overrides for ALL users
    "admin_persistent_xmins_overrides": {},  # Admin-specific, saved to file
    "user_baseline_overrides": {},  # Crucial: Initialized here
    "user_player_status_overrides": {},  # Crucial: Initialized here
    "output_df": None,
    "team_baselines": None,
    "admin_persistent_share_overrides": {},
}
for _state_key, _state_default in _SESSION_STATE_DEFAULTS.items():
    st.session_state.setdefault(_state_key, _state_default)
# Default decay and ramp-up rates (will be loaded/overridden from rates_config.json)
# These provide a fallback if rates_config.json doesn't exist or loading fails.
st.session_state.setdefault(
    "decay_rates",
    {
        "default": 0.99,
        "suspended": 0.99,
        "injured_decay": 0.99,
        "rotational_risk": 0.95,
    },
)
st.session_state.setdefault(
    "ramp_up_rates",
    {
        "default": 3,
        "injured": 9,
        "suspended": 3,
        "starter": 0,
        "rotational_risk": 2,
    },
)
st.session_state.setdefault("team_skepticism", {})
st.session_state.setdefault("RAMP_UP_PERIOD", 3)
# Initialize session state for selected players in dropdowns
st.session_state.setdefault("selected_xm_player", None)
st.session_state.setdefault("selected_status_player", None)
st.session_state.setdefault("selected_baseline_player", None)
st.session_state.setdefault("selected_penalty_player", None)
# Admin login status
st.session_state.setdefault("is_admin_logged_in", False)


@st.cache_data(ttl=300, show_spinner="Loading FPL data...")
def fetch_fpl_data_cached():
    """Fetch the FPL ``bootstrap-static`` payload, cached for 300 seconds.

    Returns
    -------
    The decoded JSON response, or ``None`` when the request fails (an error is
    shown via ``st.error`` in that case). Callers must handle ``None``.
    """
    try:
        url = "https://fantasy.premierleague.com/api/bootstrap-static/"
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        st.error(f"Failed to fetch FPL data: {e}")
        return None


@st.cache_data(ttl=3600, show_spinner=False)
def load_team_ratings_cached():
    """Cache team ratings CSV - loaded on Tab 3"""
    df = pd.read_csv("team_ratings_dual_speed.csv")
    return df


@st.cache_data(ttl=3600, show_spinner=False)
def load_fixture_projections_cached():
    """Cache fixture projections CSV - loaded on Tab 4"""
    df = pd.read_csv("ewmapois_model.csv")
    df.columns = df.columns.str.strip()
    return df


# --- Persistence Functions ---
def make_json_serializable(obj):
    """Recursively convert ``obj`` into something ``json.dump`` accepts.

    Dict keys are stringified and lists/tuples become lists. Objects exposing
    ``.item()`` (numpy scalars, single-element arrays) become Python scalars,
    and objects exposing ``.tolist()`` (multi-element arrays) become lists.
    Anything else that is not a plain JSON scalar is converted with ``str``.
    """
    if hasattr(obj, "item"):
        try:
            return obj.item()
        except ValueError:
            # .item() only works for single-element containers; a larger
            # array must be converted through tolist() below.
            pass
    if hasattr(obj, "tolist"):
        return make_json_serializable(obj.tolist())
    if isinstance(obj, dict):
        return {str(k): make_json_serializable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [make_json_serializable(item) for item in obj]
    if isinstance(obj, (int, float, str, bool)) or obj is None:
        return obj
    # For any other type, try to convert to string
    return str(obj)


def _write_json_durably(path, payload):
    """Write ``payload`` to ``path`` as JSON and force it to disk."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=4, ensure_ascii=False)
        f.flush()
        os.fsync(f.fileno())


def save_admin_overrides():
    """
    Save ONLY admin-controlled overrides (baselines, status, penalties, global
    rates, admin-persistent weekly xMins, availability multipliers and share
    overrides) to JSON files.

    Returns
    -------
    ``True`` when the main files were written, ``False`` if any of them failed
    (the error is reported with ``st.error``). A failure writing the share
    overrides file is reported but does not change the return value.
    """
    state = st.session_state
    try:
        _write_json_durably(
            "user_baseline_overrides.json",
            make_json_serializable(state.user_baseline_overrides),
        )
        _write_json_durably(
            "user_player_status_overrides.json",
            make_json_serializable(state.user_player_status_overrides),
        )
        _write_json_durably(
            "player_penalty_shares.json",
            make_json_serializable(state.player_penalty_shares),
        )
        rates_config = {
            "decay_rates": make_json_serializable(state.decay_rates),
            "ramp_up_rates": make_json_serializable(state.ramp_up_rates),
            "RAMP_UP_PERIOD": make_json_serializable(state.RAMP_UP_PERIOD),
            "MINS_THRESHOLD": make_json_serializable(state.MINS_THRESHOLD),
        }
        _write_json_durably("rates_config.json", rates_config)
        _write_json_durably(
            "admin_persistent_xmins_overrides.json",
            make_json_serializable(state.admin_persistent_xmins_overrides),
        )
        _write_json_durably(
            "admin_persistent_availability_multipliers.json",
            make_json_serializable(state.admin_persistent_availability_multipliers),
        )
        share_data = make_json_serializable(state.admin_persistent_share_overrides)
        try:
            # Now flushed and fsynced like every other file.
            _write_json_durably("admin_persistent_share_overrides.json", share_data)
        except Exception as e:
            st.error(f"Error saving share overrides: {e}")
        return True
    except Exception as e:
        st.error(f"Error saving admin overrides: {e}")
        return False


# Penalty shares used when player_penalty_shares.json is missing or empty.
_DEFAULT_PENALTY_SHARES = {
    16: 0.65,
    17: 0.15,
    30: 0.05,
    666: 0.3,
    48: 0.4,
    64: 0.7,
    81: 0.9,
    97: 0.25,
    136: 0.8,
    121: 0.09,
    178: 0.8,
    158: 0.05,
    202: 0.25,
    215: 0.6,
    216: 0.02,
    235: 0.9,
    249: 0.1,
    266: 0.6,
    267: 0.04,
    283: 0.4,
    299: 0.85,
    311: 0.1,
    310: 0.1,
    337: 0.6,
    327: 0.55,
    343: 0.4,
    362: 0.7,
    381: 0.95,
    382: 0.1,
    386: 0.05,
    430: 0.95,
    413: 0.15,
    449: 0.9,
    119: 0.1,
    450: 0.05,
    499: 0.85,
    485: 0.2,
    474: 0.02,
    525: 0.85,
    515: 0.25,
    596: 0.9,
    612: 0.8,
    624: 0.25,
    625: 0.04,
    647: 0.1,
    654: 0.85,
}


def _read_json_if_nonempty(path):
    """Return the parsed JSON in ``path``, or ``None`` if it is missing or empty."""
    if os.path.exists(path) and os.path.getsize(path) > 0:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    return None


def _int_keyed(mapping):
    """Return a copy of ``mapping`` with its (string) keys converted to int."""
    return {int(key): value for key, value in mapping.items()}


def _nested_int_keyed(mapping):
    """Convert the keys of ``mapping`` and of each inner dict to int."""
    return {int(key): _int_keyed(inner) for key, inner in mapping.items()}


def load_admin_overrides():
    """
    Load ONLY admin-controlled overrides from JSON files into session state.
    user_xmins_overrides (session-only) are NOT loaded from file.

    JSON object keys are strings, so player ids (and gameweek numbers for the
    per-gameweek files) are converted back to ``int``. A missing or empty file
    resets the corresponding entry to its default, except rates_config.json,
    which leaves the current rates untouched.

    Returns
    -------
    ``True`` on success, ``False`` if loading failed (reported via ``st.error``).
    """
    state = st.session_state
    try:
        # Load baseline overrides
        loaded = _read_json_if_nonempty("user_baseline_overrides.json")
        state.user_baseline_overrides = {} if loaded is None else _int_keyed(loaded)

        # Load player status overrides
        loaded = _read_json_if_nonempty("user_player_status_overrides.json")
        state.user_player_status_overrides = (
            {} if loaded is None else _int_keyed(loaded)
        )

        # Load player penalty shares (built-in defaults if file not found)
        loaded = _read_json_if_nonempty("player_penalty_shares.json")
        state.player_penalty_shares = (
            dict(_DEFAULT_PENALTY_SHARES) if loaded is None else _int_keyed(loaded)
        )

        # Load rates config; absent keys keep their current session values
        rates_config = _read_json_if_nonempty("rates_config.json")
        if rates_config is not None:
            state.decay_rates = rates_config.get("decay_rates", state.decay_rates)
            state.ramp_up_rates = rates_config.get(
                "ramp_up_rates", state.ramp_up_rates
            )
            state.RAMP_UP_PERIOD = rates_config.get(
                "RAMP_UP_PERIOD", state.RAMP_UP_PERIOD
            )
            state.MINS_THRESHOLD = rates_config.get(
                "MINS_THRESHOLD", state.MINS_THRESHOLD
            )

        # Load admin-persistent xMins overrides ({player_id: {gw: value}})
        loaded = _read_json_if_nonempty("admin_persistent_xmins_overrides.json")
        state.admin_persistent_xmins_overrides = (
            {} if loaded is None else _nested_int_keyed(loaded)
        )

        # Load admin-persistent availability multipliers ({player_id: {gw: value}})
        loaded = _read_json_if_nonempty(
            "admin_persistent_availability_multipliers.json"
        )
        state.admin_persistent_availability_multipliers = (
            {} if loaded is None else _nested_int_keyed(loaded)
        )

        # Load share overrides. Keys are kept exactly as stored, and a corrupt
        # file is reported without failing the whole load.
        try:
            if os.path.exists("admin_persistent_share_overrides.json"):
                with open(
                    "admin_persistent_share_overrides.json", "r", encoding="utf-8"
                ) as f:
                    state.admin_persistent_share_overrides = json.load(f)
            else:
                state.admin_persistent_share_overrides = {}
        except Exception as e:
            st.error(f"Error loading share overrides: {e}")
            state.admin_persistent_share_overrides = {}
        return True
    except Exception as e:
        st.error(f"Error loading admin overrides: {e}")
        return False


# --- Data Loading and Initial Processing Functions ---
@st.cache_data(ttl=3600)  # Cache for 1 hour
def load_baseline_stats_cached(gk_path, outfield_path):
    """Read the GK and outfield baseline CSVs, stripping stray whitespace.

    Both the column names and the ``player_name`` values are stripped so they
    can be joined reliably against the FPL player names.

    Returns
    -------
    ``(gk_stats_df, outfield_stats_df)``
    """
    gk_stats_df = pd.read_csv(gk_path)
    outfield_stats_df = pd.read_csv(outfield_path)
    gk_stats_df.columns = gk_stats_df.columns.str.strip()
    outfield_stats_df.columns = outfield_stats_df.columns.str.strip()
    gk_stats_df["player_name"] = gk_stats_df["player_name"].str.strip()
    outfield_stats_df["player_name"] = outfield_stats_df["player_name"].str.strip()
    return gk_stats_df, outfield_stats_df


def merge_player_baseline_stats(
    df, gk_stats_csv_path, outfield_stats_csv_path, gk_element_type=1
):
    """Left-join baseline statistics onto the player table by player name.

    Goalkeepers (``element_type == gk_element_type``) are joined against the GK
    baseline file and everyone else against the outfield file. ``baseline_*``
    columns that exist in only one file are added to the other group, and all
    missing baseline values end up as 0.

    Parameters
    ----------
    df : DataFrame with at least ``id``, ``name`` and ``element_type`` columns.
    gk_stats_csv_path, outfield_stats_csv_path : paths of the baseline CSVs.
    gk_element_type : ``element_type`` value that identifies goalkeepers.

    Returns
    -------
    The merged DataFrame sorted by ``id``, or ``None`` if a baseline file could
    not be loaded (the error is reported via ``st.error``).
    """
    main_df = df.copy()
    try:
        # Single cached, whitespace-normalised load. The outfield file used to
        # be re-read here uncached, which threw away the stripped version.
        gk_stats_df, outfield_stats_df = load_baseline_stats_cached(
            gk_stats_csv_path, outfield_stats_csv_path
        )
    except FileNotFoundError as e:
        # Name the file that is actually missing rather than always the GK one.
        missing_path = getattr(e, "filename", None) or (
            f"{gk_stats_csv_path} / {outfield_stats_csv_path}"
        )
        st.error(
            f"Error: {missing_path} not found. Please ensure it's in the same directory as the app."
        )
        return None  # Indicate failure
    except Exception as e:
        st.error(
            f"Error loading {gk_stats_csv_path} / {outfield_stats_csv_path}: {e}"
        )
        return None

    main_df.columns = main_df.columns.str.strip()
    main_df["name"] = main_df["name"].str.strip()

    gk_mask = main_df["element_type"] == gk_element_type
    gk_players = main_df[gk_mask].copy()
    outfield_players = main_df[~gk_mask].copy()

    gk_merged = gk_players.merge(
        gk_stats_df, left_on="name", right_on="player_name", how="left"
    )
    outfield_merged = outfield_players.merge(
        outfield_stats_df, left_on="name", right_on="player_name", how="left"
    )

    gk_baseline_cols = [
        col for col in gk_stats_df.columns if col.startswith("baseline_")
    ]
    outfield_baseline_cols = [
        col for col in outfield_stats_df.columns if col.startswith("baseline_")
    ]

    # Give both groups the same baseline columns before concatenating.
    for col in outfield_baseline_cols:
        if col not in gk_merged.columns:
            gk_merged[col] = np.nan
    for col in gk_baseline_cols:
        if col not in outfield_merged.columns:
            outfield_merged[col] = np.nan

    if "player_name" in gk_merged.columns:
        gk_merged = gk_merged.drop("player_name", axis=1)
    if "player_name" in outfield_merged.columns:
        outfield_merged = outfield_merged.drop("player_name", axis=1)

    final_df = pd.concat([gk_merged, outfield_merged], ignore_index=True)
    final_df = final_df.sort_values("id").reset_index(drop=True)

    # Fill NaNs specifically for baseline columns with 0.
    all_baseline_cols_combined = list(set(gk_baseline_cols + outfield_baseline_cols))
    for col in all_baseline_cols_combined:
        if col in final_df.columns:
            final_df[col] = final_df[col].fillna(0)
    return final_df


@st.cache_data(ttl=3600)
def load_data_and_setup_initial_df():
    """Build the base player table from the FPL API plus baseline statistics.

    Returns
    -------
    A DataFrame with one row per player (identity columns, baseline columns
    and ``Avg_BPS``), or ``None`` if the FPL data, ``rename.json`` or the
    baseline files could not be loaded.
    """
    r = fetch_fpl_data_cached()
    if r is None:
        # The fetch already reported the failure; avoid a TypeError below.
        return None

    players = pd.DataFrame(r["elements"])
    players["name"] = players["first_name"] + " " + players["second_name"]

    # Keep only the columns that are used downstream. Selecting them directly
    # replaces a long hard-coded drop list that had no effect on the result
    # but raised KeyError whenever the API stopped sending one of the fields.
    columns_order = [
        "id",
        "name",
        "web_name",
        "element_type",
        "now_cost",
        "team",
        "chance_of_playing_this_round",  # Keep this for info, but not directly for decay
        "news",
    ]
    players = players[columns_order]
    players["now_cost"] = players["now_cost"] / 10

    # Load the name-correction map used to align FPL names with the baselines
    try:
        with open("rename.json", "r", encoding="utf-8") as file:
            rename_dict = json.load(file)
    except FileNotFoundError:
        st.error(
            "Error: rename.json not found. Please ensure it's in the same directory as the app."
        )
        return None
    except Exception as e:
        st.error(f"Error loading rename.json: {e}")
        return None
    players["name"] = players["name"].replace(rename_dict)

    # Merge player baseline statistics
    finalized_df = merge_player_baseline_stats(
        players,
        "statistical_weighted_baselines_gk.csv",
        "statistical_weighted_baselines.csv",
        gk_element_type=1,
    )
    if finalized_df is None:  # Propagate error if merge_player_baseline_stats failed
        return None

    # Calculate Avg_BPS: GK uses its own column, outfield positions add their
    # position-specific BPS to the neutral BPS.
    finalized_df["Avg_BPS"] = 0.0
    finalized_df.loc[finalized_df["element_type"] == 1, "Avg_BPS"] = finalized_df[
        "baseline_gk_bps_p90"
    ].astype(float)
    finalized_df.loc[finalized_df["element_type"] == 2, "Avg_BPS"] = (
        finalized_df["baseline_Neutral_BPS_p90"] + finalized_df["baseline_Def_BPS_p90"]
    )
    finalized_df.loc[finalized_df["element_type"] == 3, "Avg_BPS"] = (
        finalized_df["baseline_Neutral_BPS_p90"] + finalized_df["baseline_Mid_BPS_p90"]
    )
    finalized_df.loc[finalized_df["element_type"] == 4, "Avg_BPS"] = (
        finalized_df["baseline_Neutral_BPS_p90"] + finalized_df["baseline_Fwd_BPS_p90"]
    )
    return finalized_df


@st.cache_data(ttl=3600)  # Cache for 1 hour
def load_team_baselines_cached():
    """Cache the team baselines Excel file"""
    team_baselines = pd.read_excel("team_totals.xlsx", sheet_name="Sheet2")
    return team_baselines


# Player-table column -> team_baselines column holding that team total.
_TEAM_TOTAL_COLUMNS = {
    "Team_xG": "xG",
    "Team_xA": "xA",
    "Team_xCBIT": "CBIT",
    "Team_xCBITR": "CBITR",
    "Team_YC": "YC",
    "Team_RC": "RC",
}

# Share column -> (player baseline column, team total column).
_SHARE_COLUMNS = {
    "xG_share": ("baseline_xG_p90", "Team_xG"),
    "xA_share": ("baseline_xA_p90", "Team_xA"),
    "xCBIT_share": ("baseline_CBIT_p90", "Team_xCBIT"),
    "xCBITR_share": ("baseline_CBITR_p90", "Team_xCBITR"),
    "YC_share": ("baseline_yc_p90", "Team_YC"),
    "RC_share": ("baseline_rc_p90", "Team_RC"),
}


def _map_team_totals(df, team_baselines):
    """Add the ``Team_*`` total columns to ``df`` in place, keyed by ``team`` id."""
    totals_by_team = team_baselines.set_index("Teams")
    for target_col, source_col in _TEAM_TOTAL_COLUMNS.items():
        df[target_col] = df["team"].map(totals_by_team[source_col].to_dict())


def _compute_player_shares(df):
    """Add the ``*_share`` columns to ``df`` in place (player / team total)."""
    for share_col, (baseline_col, total_col) in _SHARE_COLUMNS.items():
        # A zero team total becomes NaN so the division yields NaN, not inf.
        df[share_col] = df[baseline_col] / df[total_col].replace(0, np.nan)


@st.cache_data
def load_data_and_setup_initial_df_2(finalized_df):
    """Attach team totals, player shares and fixture data to the player table.

    Parameters
    ----------
    finalized_df : player table from ``load_data_and_setup_initial_df``. It is
        copied first, so the caller's frame is not modified.

    Returns
    -------
    ``(finalized_df, match_df, teams_dict, teams_dict_1)``, or four ``None``
    values if ``team_totals.xlsx`` or ``ewmapois_model.csv`` cannot be loaded.

    NOTE: the result is also stored in ``st.session_state.finalized_df``, but
    because this function is cached that only happens on a cache miss.
    """
    teams_dict = TEAMS_DICT
    teams_dict_1 = TEAMS_DICT_REVERSE
    # Work on a copy: mutating an argument of a cached function changes the
    # caller's frame on a cache miss but not on a cache hit.
    finalized_df = finalized_df.copy()

    # Load team_totals.xlsx from file
    try:
        team_baselines = load_team_baselines_cached()
        team_baselines["Teams"] = team_baselines["Teams"].replace(teams_dict)
    except FileNotFoundError:
        st.error(
            "Error: team_totals.xlsx not found. Please ensure it's in the same directory as the app."
        )
        return None, None, None, None  # Indicate failure
    except Exception as e:
        st.error(f"Error loading team_totals.xlsx: {e}")
        return None, None, None, None

    # Map team stats to players (these are for calculating shares later)
    _map_team_totals(finalized_df, team_baselines)

    # Load ewmapois_model.csv (match data) from file
    try:
        match_df = load_fixture_projections_cached()
    except FileNotFoundError:
        st.error(
            "Error: ewmapois_model.csv not found. Please ensure it's in the same directory as the app."
        )
        return None, None, None, None  # Indicate failure
    except Exception as e:
        st.error(f"Error loading ewmapois_model.csv: {e}")
        return None, None, None, None

    # Map team names to numbers for match_df
    match_df["home_team_num"] = match_df["home_team"].map(teams_dict)
    match_df["away_team_num"] = match_df["away_team"].map(teams_dict)

    # Ensure 'Team' column in finalized_df is string names, not IDs
    finalized_df["Team"] = finalized_df["team"].map(teams_dict_1)

    # Share percentages; NaNs from a zero/missing team total are filled with 0
    _compute_player_shares(finalized_df)
    finalized_df.fillna(0, inplace=True)

    # Recalculate shares through the public helper so the result matches what
    # later baseline edits produce. It works on a copy, so it is safe.
    finalized_df = recalculate_player_shares(finalized_df, team_baselines)
    st.session_state.finalized_df = finalized_df.copy()
    return finalized_df, match_df, teams_dict, teams_dict_1


def poisson_probability_of_conceding_2_or_more_goals(lambd):
    """Calculates the probability of conceding 2 or more goals using Poisson distribution."""
    p_0 = math.exp(-lambd)
    p_1 = lambd * math.exp(-lambd)
    return 1 - p_0 - p_1


def poisson_pmf(k, lambd):
    """Calculates the Poisson Probability Mass Function P(X=k)."""
    if k < 0:
        return 0.0
    if lambd < 1e-9:  # Treat very small lambda as zero for stability
        return 1.0 if k == 0 else 0.0
    return (lambd**k * math.exp(-lambd)) / math.factorial(k)


def neg_binom_probability_of_value(expected_mean, value, dispersion=1.0):
    """
    Calculates the exact probability (PMF) of getting exactly 'value' events.
    Used for: Saves, Goals, Assists.

    ``dispersion`` is the variance-to-mean ratio; values <= 1.0 fall back to
    the Poisson PMF.
    """
    if expected_mean <= 0:
        # With a zero mean the only possible outcome is zero events.
        return 1.0 if value == 0 else 0.0
    if dispersion <= 1.0:
        # Fallback to Poisson if no dispersion
        return poisson_pmf(value, expected_mean)
    # Convert Mean + Dispersion to n, p
    p = 1 / dispersion
    n = (expected_mean * p) / (1 - p)
    return nbinom.pmf(value, n, p)


def neg_binom_probability_at_least(expected_mean, threshold, dispersion=1.0):
    """
    Calculates probability of getting 'threshold' OR MORE events.
    Used for: DefCons (CBIT), Recoveries.

    ``dispersion`` is the variance-to-mean ratio; values <= 1.0 fall back to
    the Poisson distribution.
    """
    if expected_mean <= 0:
        # With a zero mean the count is always 0, which only satisfies a
        # non-positive threshold.
        return 1.0 if threshold <= 0 else 0.0
    if dispersion <= 1.0:
        # Use existing Poisson logic if dispersion is low
        return 1 - poisson_cdf(threshold - 1, expected_mean)
    p = 1 / dispersion
    n = (expected_mean * p) / (1 - p)
    # Probability of X >= threshold is (1 - CDF(threshold - 1))
    return 1 - nbinom.cdf(threshold - 1, n, p)


def calculate_expected_conceded_points(lambd):
    """
    Calculates the expected fantasy points from goals conceded based on a
    -1 point penalty for every 2 goals.
    """
    total_expected_points = 0
    # We check up to 10 goals, which is a safe upper limit for a single match.
    max_goals_to_check = 10
    for k in range(max_goals_to_check + 1):
        # Probability of conceding exactly k goals
        prob_k = poisson_pmf(k=k, lambd=lambd)
        # Integer division implements the rule: 0-1 goals -> 0, 2-3 -> -1, ...
        points_for_k_goals = -(k // 2)
        total_expected_points += prob_k * points_for_k_goals
    return total_expected_points


def poisson_cdf(k, lambd):
    """Calculates the Poisson Cumulative Distribution Function P(X<=k)."""
    if k < 0:
        return 0.0
    if lambd < 1e-9:  # Treat very small lambda as zero for stability
        return 1.0
    return sum(poisson_pmf(i, lambd) for i in range(math.floor(k) + 1))


def recalculate_player_shares(finalized_df_copy, team_baselines):
    """
    Recalculates player-specific shares of team statistics.
    This function should be called whenever baseline stats are updated.

    Neither argument is modified. ``team_baselines["Teams"]`` may hold team
    names or numeric ids; names are mapped to ids through ``TEAMS_DICT``.

    Returns
    -------
    A new DataFrame with refreshed ``Team_*`` and ``*_share`` columns and every
    NaN replaced by 0.
    """
    df = finalized_df_copy.copy()  # Work on a copy
    team_baselines_mapped = team_baselines.copy()
    team_baselines_mapped["Teams"] = team_baselines_mapped["Teams"].replace(TEAMS_DICT)

    # Map team stats to players, then derive each player's share of them
    _map_team_totals(df, team_baselines_mapped)
    _compute_player_shares(df)

    # Fill NaNs resulting from division by zero with 0
    df.fillna(0, inplace=True)
    return df


def apply_team_skepticism(df, skepticism_factors):
    """
    Applies a skepticism multiplier to a player's base points based on their team.

    ``df`` is modified in place (its ``base_pts`` column) and also returned.
    ``skepticism_factors`` maps a ``team`` value to its multiplier.
    """
    if not skepticism_factors:
        return df
    for team_id, multiplier in skepticism_factors.items():
        # Row labels of the players on the specified team
        players_on_team = df[df["team"] == team_id].index
        df.loc[players_on_team, "base_pts"] *= multiplier
    return df


# --- Main Calculation Logic (encapsulated) ---
def get_df_hash(df):
    """Create a stable hash for a DataFrame"""
    return hashlib.md5(pd.util.hash_pandas_object(df, index=True).values).hexdigest()


def calculate_single_match_points(
    player,
    match_row,
    xMins_in_match,
    points_config,
    is_gk=False,
    is_def=False,
    is_mid=False,
    is_fwd=False,
):
    """
    Calculates points for a single match given the xMins and match projections.
    Includes full logic for CBIT, CBITR, Penalty Saves, and dynamic BPS.

    Parameters
    ----------
    player : mapping/row providing ``id``, ``team``, ``element_type``, the
        ``*_share`` columns, ``baseline_bps_floor_p90`` and, for goalkeepers,
        ``baseline_xSaves_p90`` and ``baseline_pksave_p90``.
    match_row : mapping/row with ``home_team_num`` and the ``mc_home_*`` /
        ``mc_away_*`` projections and clean-sheet odds.
    xMins_in_match : expected minutes in this match; <= 0 yields 0.0 points.
    points_config : scoring table (``goal``, ``assist``, ``clean_sheet``,
        ``saves_per_3``, ``penalty_points_per_position``).
    is_gk, is_def, is_mid, is_fwd : position flags selecting the
        position-specific components.

    Returns
    -------
    Expected fantasy points for the match.

    Penalty-taker shares are read from ``st.session_state.player_penalty_shares``.
    """
    if xMins_in_match <= 0:
        return 0.0

    scaling_factor = xMins_in_match / 90.0
    played_sixty = xMins_in_match >= 60
    player_pos = player["element_type"]

    # 1. Identify Home/Away and read the matching projections
    if player["team"] == match_row["home_team_num"]:
        side, other_side = "home", "away"
    else:
        side, other_side = "away", "home"
    team_proj_goals = match_row[f"mc_{side}_goals_mean"]
    team_conc_goals = match_row[f"mc_{other_side}_goals_mean"]
    team_proj_assists = match_row[f"mc_{side}_assists_xa_mean"]
    team_proj_cbit = match_row[f"mc_{side}_CBIT_mean"]
    team_proj_cbitr = match_row[f"mc_{side}_CBITR_mean"]
    team_proj_saves = match_row[f"mc_{side}_keeper_saves_mean"]
    team_proj_yc = match_row[f"mc_{side}_yc_mean"]
    team_proj_rc = match_row[f"mc_{side}_rc_mean"]
    cs_odds = match_row[f"{side}_clean_sheet_odds"]

    # 2. Player Share Calculations
    proj_goals = player["xG_share"] * team_proj_goals
    proj_assists = player["xA_share"] * team_proj_assists
    proj_cbit = player["xCBIT_share"] * team_proj_cbit
    proj_cbitr = player["xCBITR_share"] * team_proj_cbitr

    # GK Specific Projections
    proj_saves = 0
    proj_pen_saves = 0
    if is_gk:
        proj_saves = (player["baseline_xSaves_p90"] + team_proj_saves) / 2
        proj_pen_saves = player["baseline_pksave_p90"]

    # --- GOALS (Poisson, 0 to 8 goals) ---
    pts_per_goal = points_config["goal"][player_pos]
    pts_goals = 0.0
    for k in range(9):
        pts_goals += poisson_pmf(k, proj_goals) * k * pts_per_goal
    pts_goals *= scaling_factor

    # --- ASSISTS (Poisson, 0 to 8 assists) ---
    pts_assists = 0.0
    for k in range(9):
        pts_assists += poisson_pmf(k, proj_assists) * k * points_config["assist"]
    pts_assists *= scaling_factor

    # --- CLEAN SHEET (full value from 60 minutes, scaled below that) ---
    pts_cs = cs_odds * points_config["clean_sheet"][player_pos]
    if not played_sixty:
        pts_cs = pts_cs * scaling_factor

    # --- CONCEDED ---
    pts_conc = 0.0
    if (is_gk or is_def) and team_conc_goals is not None:
        # Expected points usually negative, calculated via Poisson
        raw_conc = calculate_expected_conceded_points(team_conc_goals)
        pts_conc = raw_conc * scaling_factor

    # --- CARDS ---
    pts_yc = (player["YC_share"] * team_proj_yc * -1) * scaling_factor
    pts_rc = (player["RC_share"] * team_proj_rc * -3) * scaling_factor

    # --- SAVES (GK): 1 pt per 3 saves, negative binomial over 0..20 saves ---
    pts_saves = 0.0
    if is_gk:
        expected_saves_pts_unscaled = 0.0
        for k_saves in range(21):
            prob_k = neg_binom_probability_of_value(proj_saves, k_saves, dispersion=1.5)
            pts_k = (k_saves // 3) * points_config["saves_per_3"]
            expected_saves_pts_unscaled += prob_k * pts_k
        pts_saves = expected_saves_pts_unscaled * scaling_factor

    # --- PENALTY SAVES (GK): 5 pts each, Poisson over 0..2 ---
    pts_pen_save = 0.0
    if is_gk:
        expected_pen_saved_pts_unscaled = 0.0
        for k_pen in range(3):
            prob_k = poisson_pmf(k_pen, proj_pen_saves)
            pts_k = k_pen * 5
            expected_pen_saved_pts_unscaled += prob_k * pts_k
        pts_pen_save = expected_pen_saved_pts_unscaled * scaling_factor

    # --- CBIT (Defenders): 2 pts for reaching the threshold ---
    pts_cbit = 0.0
    if is_def:
        cbit_threshold = 10
        prob_hit = neg_binom_probability_at_least(
            proj_cbit, cbit_threshold, dispersion=3.2
        )
        pts_cbit = prob_hit * 2 * scaling_factor

    # --- CBITR (Mids/Fwds): 2 pts for reaching the threshold ---
    pts_cbitr = 0.0
    if is_mid:
        cbitr_threshold = 12
        prob_hit = neg_binom_probability_at_least(
            proj_cbitr, cbitr_threshold, dispersion=2.8
        )
        pts_cbitr = prob_hit * 2 * scaling_factor
    elif is_fwd:
        cbitr_threshold = 12
        prob_hit = neg_binom_probability_at_least(
            proj_cbitr, cbitr_threshold, dispersion=1.7
        )
        pts_cbitr = prob_hit * 2 * scaling_factor

    # --- PENALTY POINTS (Taker) ---
    pts_penalty = 0.0
    if player["id"] in st.session_state.player_penalty_shares:
        pen_share = st.session_state.player_penalty_shares[player["id"]]
        base_pen_pts = points_config["penalty_points_per_position"].get(player_pos, 0)
        pts_penalty = (base_pen_pts * pen_share) * scaling_factor

    # --- APPEARANCE (Per Match) ---
    # 60 minutes or more earns the second appearance point; this now uses the
    # same >= 60 boundary as the clean-sheet and BPS rules (it was "> 60").
    pts_app = 2 if played_sixty else 1

    # --- BONUS POINTS (Detailed Reconstruction) ---
    # 1. Floor
    bps_floor = player["baseline_bps_floor_p90"] * scaling_factor
    # 2. Mins Played BPS (xMins_in_match is known to be > 0 here)
    bps_mins = 6 if played_sixty else 3
    # 3. Events BPS (approximation using the scaled expected counts)
    scaled_goals = proj_goals * scaling_factor
    scaled_assists = proj_assists * scaling_factor
    scaled_saves = proj_saves * scaling_factor if is_gk else 0
    scaled_pen_saves = proj_pen_saves * scaling_factor if is_gk else 0
    scaled_yc = player["YC_share"] * team_proj_yc * scaling_factor
    scaled_rc = player["RC_share"] * team_proj_rc * scaling_factor

    if is_fwd:
        bps_goals = scaled_goals * 24
    elif is_mid:
        bps_goals = scaled_goals * 18
    else:
        bps_goals = scaled_goals * 12  # Def/GK
    bps_assists = scaled_assists * 9

    bps_cs = 0
    if (is_gk or is_def) and played_sixty:
        # Expected CS BPS = Probability of CS * 12 BPS
        bps_cs = cs_odds * 12

    bps_saves = scaled_saves * 2
    bps_pen_saves = scaled_pen_saves * 15
    bps_cards = (scaled_yc * -3) + (scaled_rc * -9)

    total_projected_bps = (
        bps_floor
        + bps_mins
        + bps_goals
        + bps_assists
        + bps_cs
        + bps_saves
        + bps_pen_saves
        + bps_cards
    )

    pts_bonus = 0.0
    if not is_gk:
        # Project-specific conversion: Total BPS / 29.4
        pts_bonus = total_projected_bps / 29.4

    # --- FINAL SUM ---
    total_match_pts = (
        pts_goals
        + pts_assists
        + pts_cs
        + pts_conc
        + pts_yc
        + pts_rc
        + pts_saves
        + pts_pen_save
        + pts_cbit
        + pts_cbitr
        + pts_penalty
        + pts_app
        + pts_bonus
    )
    return total_match_pts


#
@st.cache_data(show_spinner=False, hash_funcs={pd.DataFrame: get_df_hash})
def calculate_all_points(
    player_df_base,
    match_df,
    player_penalty_shares,
    MINS_SCALING_BONUS,
    pos_map,
    teams_dict_1,
    teams_dict,
    points_config,
    effective_xmins_overrides,
    MINS_THRESHOLD,
    RAMP_UP_PERIOD,
    decay_rates,
    ramp_up_rates,
    user_player_status_overrides,
    team_skepticism,
    effective_availability_multipliers,
):
    """Project expected minutes and points for every player in every gameweek.

    Parameters
    ----------
    player_df_base : pd.DataFrame
        One row per player. Columns read here: ``element_type``, ``id``,
        ``web_name``, ``name``, ``now_cost``, ``Team``, ``team``, ``xG_share``,
        ``xA_share``, ``baseline_xMins`` and ``baseline_bps_floor_p90``. The
        frame is copied, never mutated. ``attrs`` may carry
        ``has_baseline_xmins_override`` and ``all_baseline_overrides``.
    match_df : pd.DataFrame
        Fixtures with ``GW``, ``home_team_num`` and ``away_team_num`` columns.
    player_penalty_shares, MINS_SCALING_BONUS, teams_dict_1, teams_dict
        Not read by this function; they are still arguments of the
        ``st.cache_data``-wrapped call.
    pos_map : dict
        Maps ``element_type`` to the position label stored in ``Pos``.
    points_config : dict
        Forwarded unchanged to ``calculate_single_match_points``.
    effective_xmins_overrides : dict
        ``{player_id: {gw: minutes}}``; replaces the displayed minutes for
        that gameweek only.
    MINS_THRESHOLD : number
        Previous-week minutes at or above this decay; below it they ramp up.
    RAMP_UP_PERIOD : number
        Number of gameweeks over which an injured player climbs back to the
        baseline. Falls back to 3 when the value is missing or not positive.
    decay_rates, ramp_up_rates : dict
        Rates keyed by ``"default"``, ``"suspended"`` and ``"injured"``.
    user_player_status_overrides : dict
        ``{player_id: {"status": ..., "weeks_out": ...}}``.
    team_skepticism
        Forwarded unchanged to ``apply_team_skepticism``.
    effective_availability_multipliers : dict
        ``{player_id: {gw: multiplier}}`` applied to the displayed minutes.

    Returns
    -------
    pd.DataFrame
        ``Pos``, ``ID``, ``Name``, ``BV``, ``SV``, ``Team``, then per gameweek
        ``{gw}_xMins`` (rounded to 0 dp) and ``{gw}_Pts`` (rounded to 2 dp),
        followed by ``Total Points`` and ``Average Points``.
    """
    # Double-gameweek rotation model: a player projected for more than
    # `dgw_min_mins_for_fatigue` minutes gets `dgw_fatigue_factor` times those
    # minutes in EACH match (e.g. 90 -> 87.3 per match).
    dgw_fatigue_factor = 0.97
    dgw_min_mins_for_fatigue = 35

    # The previous body reset RAMP_UP_PERIOD to 3 unconditionally, discarding
    # whatever the caller configured. Honour the argument; keep 3 as a fallback
    # so a missing/zero value cannot cause a division by zero below.
    default_ramp_up_period = 3
    try:
        ramp_up_period = (
            RAMP_UP_PERIOD if RAMP_UP_PERIOD > 0 else default_ramp_up_period
        )
    except TypeError:
        ramp_up_period = default_ramp_up_period

    player_df = player_df_base.copy()

    final_df_output = pd.DataFrame(
        {
            "Pos": player_df["element_type"].map(pos_map),
            "ID": player_df["id"],
            "Name": player_df["web_name"],
            "BV": player_df["now_cost"],
            "SV": player_df["now_cost"],
            "Team": player_df["Team"],
        }
    )

    # Minutes carried from one gameweek into the next (before availability
    # multipliers and manual per-GW overrides are applied).
    continuous_xMins_progression = player_df["baseline_xMins"].copy()

    df_attrs = getattr(player_df, "attrs", {})
    has_baseline_xmins_override = df_attrs.get("has_baseline_xmins_override", False)
    all_baseline_overrides = df_attrs.get("all_baseline_overrides", {})

    unique_gws = sorted(match_df["GW"].unique())
    first_gw = unique_gws[0] if unique_gws else None

    # NOTE(review): this function is wrapped in st.cache_data, so this reset
    # presumably only runs on a cache miss - confirm that is intended.
    st.session_state.group_cache = {}

    # ------------------------------------------------------------------
    # Gameweek-independent inputs, computed once instead of once per GW.
    # ------------------------------------------------------------------
    player_ids_array = player_df["id"].values
    n_players = len(player_ids_array)

    status_array = np.array(
        [
            user_player_status_overrides.get(pid, {"status": "default"})["status"]
            for pid in player_ids_array
        ],
        dtype=object,
    )
    weeks_out_array = np.array(
        [
            user_player_status_overrides.get(pid, {}).get("weeks_out", 0)
            for pid in player_ids_array
        ]
    )

    is_not_starter = status_array == "not_a_starter"
    is_suspended = status_array == "suspended"
    is_injured = status_array == "injured"
    # Any other status value is treated as a normal, available player.
    is_default = ~(is_not_starter | is_suspended | is_injured)

    baseline_mins_array = player_df["baseline_xMins"].values
    is_gk = player_df["element_type"].values == 1

    decay_rate_susp = decay_rates.get("suspended", decay_rates.get("default", 0.99))
    ramp_rate_susp = ramp_up_rates.get("suspended", ramp_up_rates.get("default", 0))
    decay_rate_default = decay_rates.get("default", 0.99)
    ramp_rate_default = ramp_up_rates.get("default", ramp_up_rates.get("injured", 0))

    # (target column in the per-GW frame, source column in player_df)
    gw_frame_columns = (
        ("team", "team"),
        ("id", "id"),
        ("web_name", "web_name"),
        ("player_name", "name"),
        ("xG_share", "xG_share"),
        ("xA_share", "xA_share"),
        ("baseline_xMins", "baseline_xMins"),
        ("baseline_bps_floor_p90", "baseline_bps_floor_p90"),
    )

    # --- MAIN GAMEWEEK LOOP ---
    for gw in unique_gws:
        xmins_col = f"{gw}_xMins"
        is_first_gw = gw == first_gw

        # 1. Seed the progression with baseline overrides.
        # NOTE(review): keyed on gameweek number 1, not on the first gameweek
        # present in match_df - confirm this is intended for mid-season data.
        if has_baseline_xmins_override and gw == 1:
            for row_label, pid in zip(player_df.index, player_ids_array):
                if (
                    pid in all_baseline_overrides
                    and "baseline_xMins" in all_baseline_overrides[pid]
                ):
                    continuous_xMins_progression.loc[row_label] = (
                        all_baseline_overrides[pid]["baseline_xMins"]
                    )

        # 2. Per-gameweek working frame.
        gw_calc_df = pd.DataFrame(index=player_df.index)
        for target_col, source_col in gw_frame_columns:
            gw_calc_df[target_col] = player_df[source_col]
        gw_calc_df["base_pts"] = 0.0

        # ============================================================
        # VECTORIZED XMINS CALCULATION
        # ============================================================
        prev_continuous_xmins_array = continuous_xMins_progression.values
        # Computed separately (not as a negation) so NaN minutes match neither.
        prev_at_or_above_threshold = prev_continuous_xmins_array >= MINS_THRESHOLD
        prev_below_threshold = prev_continuous_xmins_array < MINS_THRESHOLD

        calculated_xmins_array = np.zeros(n_players, dtype=float)

        # CASE 1: first gameweek, available player -> baseline minutes.
        if is_first_gw:
            calculated_xmins_array[is_default] = baseline_mins_array[is_default]

        # CASE 2: not a starter -> zero minutes.
        calculated_xmins_array[is_not_starter] = 0

        # CASE 3: suspended. `weeks_out` is compared directly with the GW number.
        mask_suspended_during = is_suspended & (gw <= weeks_out_array)
        mask_suspended_return = is_suspended & (gw == weeks_out_array + 1)
        mask_suspended_after = is_suspended & (gw > weeks_out_array + 1)

        calculated_xmins_array[mask_suspended_during] = 0
        calculated_xmins_array[mask_suspended_return] = baseline_mins_array[
            mask_suspended_return
        ]

        mask_susp_decay = mask_suspended_after & prev_at_or_above_threshold
        mask_susp_ramp = mask_suspended_after & prev_below_threshold
        calculated_xmins_array[mask_susp_decay] = (
            prev_continuous_xmins_array[mask_susp_decay] * decay_rate_susp
        )
        calculated_xmins_array[mask_susp_ramp] = np.minimum(
            prev_continuous_xmins_array[mask_susp_ramp] + ramp_rate_susp, 90
        )

        # CASE 4: injured -> out, then linear ramp back to baseline, then the
        # normal decay / ramp-up rules.
        mask_injured_out = is_injured & (gw <= weeks_out_array)
        calculated_xmins_array[mask_injured_out] = 0

        mask_injured_recovering = is_injured & (gw > weeks_out_array)
        weeks_since_injury_array = np.maximum(0, gw - weeks_out_array)

        mask_ramp_phase = mask_injured_recovering & (
            weeks_since_injury_array <= ramp_up_period
        )
        calculated_xmins_array[mask_ramp_phase] = (
            baseline_mins_array[mask_ramp_phase] / ramp_up_period
        ) * weeks_since_injury_array[mask_ramp_phase]

        mask_post_ramp = mask_injured_recovering & (
            weeks_since_injury_array > ramp_up_period
        )
        mask_post_decay = mask_post_ramp & prev_at_or_above_threshold
        mask_post_ramp_up = mask_post_ramp & prev_below_threshold
        calculated_xmins_array[mask_post_decay] = (
            prev_continuous_xmins_array[mask_post_decay] * decay_rate_default
        )
        calculated_xmins_array[mask_post_ramp_up] = np.minimum(
            prev_continuous_xmins_array[mask_post_ramp_up] + ramp_rate_default, 90
        )

        # CASE 5: default/healthy players after their first gameweek.
        mask_default_calc = is_default & ~(is_first_gw & is_default)

        # Goalkeepers simply carry last week's minutes forward.
        mask_gk_default = mask_default_calc & is_gk
        calculated_xmins_array[mask_gk_default] = prev_continuous_xmins_array[
            mask_gk_default
        ]

        mask_outfield_default = mask_default_calc & (~is_gk)
        mask_outf_decay = mask_outfield_default & prev_at_or_above_threshold
        calculated_xmins_array[mask_outf_decay] = (
            prev_continuous_xmins_array[mask_outf_decay] * decay_rate_default
        )
        # Only players with a positive baseline ramp up from below threshold.
        mask_outf_ramp = (
            mask_outfield_default & prev_below_threshold & (baseline_mins_array > 0)
        )
        calculated_xmins_array[mask_outf_ramp] = np.minimum(
            prev_continuous_xmins_array[mask_outf_ramp] + ramp_rate_default, 90
        )

        calculated_xmins_array = np.clip(calculated_xmins_array, 0, 90)
        # The progression carried into next week ignores the per-GW
        # availability multipliers and manual overrides applied below.
        next_continuous_xmins_array = calculated_xmins_array.copy()

        # ==============================================
        # APPLY OVERRIDES AND AVAILABILITY
        # ==============================================
        displayed_xmins_array = calculated_xmins_array.copy()
        for idx, player_id in enumerate(player_ids_array):
            availability_mult = 1.0
            if player_id in effective_availability_multipliers:
                if gw in effective_availability_multipliers[player_id]:
                    availability_mult = effective_availability_multipliers[
                        player_id
                    ][gw]
            displayed_xmins_array[idx] *= availability_mult

            # A manual override wins over the availability-scaled value.
            if player_id in effective_xmins_overrides:
                if gw in effective_xmins_overrides[player_id]:
                    displayed_xmins_array[idx] = effective_xmins_overrides[
                        player_id
                    ][gw]

        gw_calc_df[xmins_col] = pd.Series(displayed_xmins_array, index=player_df.index)
        next_gw_continuous_xMins = pd.Series(
            next_continuous_xmins_array, index=player_df.index
        )

        # ============================================================
        # MATCH SCORING LOOP
        # ============================================================
        gw_matches = match_df[match_df["GW"] == gw]
        home_team_nums = gw_matches["home_team_num"]
        away_team_nums = gw_matches["away_team_num"]
        # Fixtures are identical for every player of a team, so filter once
        # per team instead of once per player.
        matches_by_team = {}

        for index, player in player_df.iterrows():
            player_team_num = player["team"]
            if player_team_num not in matches_by_team:
                matches_by_team[player_team_num] = gw_matches[
                    (home_team_nums == player_team_num)
                    | (away_team_nums == player_team_num)
                ]
            my_matches = matches_by_team[player_team_num]

            # Blank gameweek: no minutes, no points.
            if my_matches.empty:
                gw_calc_df.loc[index, "base_pts"] = 0
                gw_calc_df.loc[index, xmins_col] = 0
                continue

            # --- DGW FATIGUE LOGIC ---
            base_gw_mins = gw_calc_df.loc[index, xmins_col]
            mins_per_match = base_gw_mins
            if len(my_matches) > 1 and base_gw_mins > dgw_min_mins_for_fatigue:
                mins_per_match = base_gw_mins * dgw_fatigue_factor

            element_type = player["element_type"]
            total_gw_pts = 0.0
            for _, match_row in my_matches.iterrows():
                total_gw_pts += calculate_single_match_points(
                    player=player,
                    match_row=match_row,
                    xMins_in_match=mins_per_match,
                    points_config=points_config,
                    is_gk=(element_type == 1),
                    is_def=(element_type == 2),
                    is_mid=(element_type == 3),
                    is_fwd=(element_type == 4),
                )

            gw_calc_df.loc[index, "base_pts"] = total_gw_pts

        gw_calc_df = apply_team_skepticism(gw_calc_df, team_skepticism)
        gw_calc_df["total_pts"] = gw_calc_df["base_pts"]

        final_df_output[xmins_col] = gw_calc_df[xmins_col].round(0)
        final_df_output[f"{gw}_Pts"] = gw_calc_df["total_pts"].round(2)

        continuous_xMins_progression = next_gw_continuous_xMins.copy()

    # Totals and averages over every "<gw>_Pts" column.
    points_sum = final_df_output.filter(like="_Pts").sum(axis=1)
    final_df_output["Total Points"] = points_sum
    final_df_output["Average Points"] = round(points_sum / len(unique_gws), 2)
    return final_df_output


def get_modified_finalized_df():
    """Return a copy of ``finalized_df`` with the baseline overrides applied.

    Reads ``st.session_state.finalized_df`` and
    ``st.session_state.user_baseline_overrides`` (``{player_id: {stat: value}}``).

    * ``baseline_xMins``, ``baseline_pksave_p90`` and ``Avg_BPS`` overrides
      replace the stored value.
    * Any other overridden stat is treated as a multiplier on the pristine
      value from ``finalized_df``.
    * ``Avg_BPS`` is then recomputed from the per-position BPS columns for
      every player without a direct ``Avg_BPS`` override.

    The result's ``attrs`` carry ``has_baseline_xmins_override`` and
    ``all_baseline_overrides``.

    Returns
    -------
    pd.DataFrame or None
        ``None`` when ``finalized_df`` has not been loaded yet.
    """
    pristine_df = st.session_state.finalized_df
    if pristine_df is None:
        return None

    modified_df = pristine_df.copy()
    direct_override_stats = {"baseline_xMins", "baseline_pksave_p90", "Avg_BPS"}

    overrides_by_player = st.session_state.user_baseline_overrides
    all_overrides = overrides_by_player.copy()
    has_baseline_xmins_override = any(
        "baseline_xMins" in overrides for overrides in all_overrides.values()
    )

    for player_id, overrides in overrides_by_player.items():
        mask = modified_df["id"] == player_id
        if not mask.any():
            continue
        for stat_name, override_value in overrides.items():
            if stat_name not in modified_df.columns:
                continue
            if stat_name in direct_override_stats:
                modified_df.loc[mask, stat_name] = override_value
            else:
                # Multiply the pristine value so repeated calls never compound.
                original_value = pristine_df.loc[
                    pristine_df["id"] == player_id, stat_name
                ].iloc[0]
                modified_df.loc[mask, stat_name] = original_value * override_value

    # Recompute Avg_BPS from the (possibly adjusted) component columns, except
    # where the admin set Avg_BPS directly - that value was applied above.
    directly_set_ids = [
        player_id
        for player_id, overrides in overrides_by_player.items()
        if "Avg_BPS" in overrides
    ]
    needs_recalc = ~modified_df["id"].isin(directly_set_ids)

    position_specific_column = {
        2: "baseline_Def_BPS_p90",
        3: "baseline_Mid_BPS_p90",
        4: "baseline_Fwd_BPS_p90",
    }

    gk_rows = needs_recalc & (modified_df["element_type"] == 1)
    if gk_rows.any():
        modified_df.loc[gk_rows, "Avg_BPS"] = modified_df.loc[
            gk_rows, "baseline_gk_bps_p90"
        ]

    for element_type, position_col in position_specific_column.items():
        rows = needs_recalc & (modified_df["element_type"] == element_type)
        # Columns are only touched when a matching player exists, as before.
        if rows.any():
            modified_df.loc[rows, "Avg_BPS"] = (
                modified_df.loc[rows, "baseline_Neutral_BPS_p90"]
                + modified_df.loc[rows, position_col]
            )

    modified_df.attrs["has_baseline_xmins_override"] = has_baseline_xmins_override
    modified_df.attrs["all_baseline_overrides"] = all_overrides
    return modified_df


def _layer_gw_overrides(persistent, session_only):
    """Merge two ``{player_id: {gw: value}}`` dicts; session values win."""
    merged = {pid: gw_data.copy() for pid, gw_data in persistent.items()}
    for pid, gw_data in session_only.items():
        merged.setdefault(pid, {}).update(gw_data)
    return merged


# Function to recalculate with current overrides
def recalculate_projections():
    """Recompute ``st.session_state.output_df`` from the current overrides.

    Does nothing when ``finalized_df`` is not loaded. Otherwise it applies the
    baseline overrides, recalculates player shares against
    ``st.session_state.team_baselines``, layers the session-only xMins and
    availability overrides on top of the admin-persistent ones, and stores the
    result of ``calculate_all_points`` in ``st.session_state.output_df``.
    """
    if st.session_state.finalized_df is None:
        return

    state = st.session_state

    # Shares must be recomputed so non-xMins baseline changes take effect.
    modified_finalized_df = recalculate_player_shares(
        get_modified_finalized_df(), state.team_baselines
    )

    effective_xmins_overrides = _layer_gw_overrides(
        state.admin_persistent_xmins_overrides, state.user_xmins_overrides
    )
    effective_availability_multipliers = _layer_gw_overrides(
        state.admin_persistent_availability_multipliers,
        state.user_availability_multipliers,
    )

    state.output_df = calculate_all_points(
        modified_finalized_df,
        state.match_df,
        state.player_penalty_shares,
        state.MINS_SCALING_BONUS,
        state.pos_map,
        state.teams_dict_1,
        state.teams_dict,
        state.points_config,
        effective_xmins_overrides,
        state.MINS_THRESHOLD,
        state.RAMP_UP_PERIOD,
        state.decay_rates,
        state.ramp_up_rates,
        state.user_player_status_overrides,
        state.team_skepticism,
        effective_availability_multipliers,
    )


def apply_loaded_baseline_overrides_on_startup():
    """Initialise ``st.session_state.bps_std_devs`` from the base ``finalized_df``.

    Stores the standard deviation of ``Avg_BPS`` per ``element_type``. It does
    NOT apply baseline overrides to ``finalized_df``; that is handled by
    ``get_modified_finalized_df()``. Does nothing when ``finalized_df`` is
    ``None``.
    """
    pristine_df = st.session_state.finalized_df
    if pristine_df is None:
        return
    st.session_state.bps_std_devs = (
        pristine_df.groupby("element_type")["Avg_BPS"].std().to_dict()
    )


def update_sync_value_from_slider(sync_key, slider_key):
    """Copy the slider widget's value at ``slider_key`` into ``sync_key``."""
    st.session_state[sync_key] = st.session_state[slider_key]


def update_sync_value_from_number_input(sync_key, num_input_key):
    """Copy the number-input widget's value at ``num_input_key`` into ``sync_key``."""
    st.session_state[sync_key] = st.session_state[num_input_key]


# --- Streamlit Application Layout and Logic --- im = Image.open("image.png") st.set_page_config(layout="wide", page_title="Luigi's Mansion", page_icon=im) st.title("Luigi's Mansion") st.markdown( "Yes you can play around with xMins here (I made it more for my convenience rather than yours but nonetheless, enjoy!)" ) st.markdown("---") # Load any existing admin overrides (baselines, status, penalties, global rates) # user_xmins_overrides is NOT loaded here; it is session-only. load_admin_overrides() # Initial call to setup and calculate data (only runs once or on explicit rerun) if not st.session_state.initialized: # Use a spinner to indicate loading with st.spinner( "Preparing data and running initial projections... This may take a while..." ): initial_finalized_df = load_data_and_setup_initial_df() if initial_finalized_df is None: # Handle case where initial data load failed st.stop() # Stop execution if data isn't loaded correctly # Load team_baselines here once teams_dict = TEAMS_DICT # Correctly load team_baselines from file try: st.session_state.team_baselines = pd.read_excel( "team_totals.xlsx", sheet_name="Sheet2" ) st.session_state.team_baselines["Teams"] = st.session_state.team_baselines[ "Teams" ].replace(teams_dict) except FileNotFoundError: st.error( "Error: team_totals.xlsx not found. Please ensure it's in the same directory as the app."
) st.stop() except Exception as e: st.error(f"Error loading team_totals.xlsx: {e}") st.stop() finalized_df, match_df, teams_dict_updated, teams_dict_1_updated = ( load_data_and_setup_initial_df_2(initial_finalized_df) ) if ( finalized_df is None ): # match_df, teams_dict, teams_dict_1 would also be None if load_data_and_setup_initial_df_2 failed st.stop() # Stop execution if data isn't loaded correctly # MINS_SCALING_BONUS is explicitly 0.0 at initialization. st.session_state.MINS_SCALING_BONUS = 0.0 # Ensure bps_std_devs is calculated based on the loaded data st.session_state.pos_map = {1: "G", 2: "D", 3: "M", 4: "F"} st.session_state.teams_dict_1 = teams_dict_1_updated st.session_state.teams_dict = teams_dict_updated # FIX: Adjusted points_config to remove CBIT/CBITR related multipliers, as they are now tiered. st.session_state.points_config = { "goal": {1: 10, 2: 6, 3: 5, 4: 4}, "assist": 3, "clean_sheet": {1: 4, 2: 4, 3: 1, 4: 0}, "saves_per_3": 1, "penalty_points_per_position": {2: 0.9, 3: 0.7, 4: 0.5}, } # Store the base dataframes st.session_state.match_df = match_df st.session_state.finalized_df = finalized_df apply_loaded_baseline_overrides_on_startup() # Applies baseline overrides to st.session_state.finalized_df # Perform the initial full calculation by calling the recalculation function # This ensures all loaded overrides (baselines, status, penalty shares, rates, # and admin_persistent_xmins_overrides) are properly incorporated into the output_df. 
recalculate_projections() st.session_state.initialized = True # Initialize selected player state variables after data is loaded # We need to map the ID to the unique display name for initial selection if ( st.session_state.output_df is not None and not st.session_state.output_df.empty ): # Create a dictionary to map player ID to its unique display string player_id_to_display_name_map = { row["id"]: f"{row['web_name']} (ID: {row['id']})" for _, row in st.session_state.finalized_df[ ["id", "web_name"] ].iterrows() } player_display_names = list( player_id_to_display_name_map.values() ) # Moved here for consistency # Set initial selected players using their unique display names if st.session_state.selected_xm_player: # Find player ID by name in output_df (assuming Name column corresponds to web_name) # and then get the display name from the map to ensure consistency. player_id_from_name = st.session_state.finalized_df[ st.session_state.finalized_df["web_name"] == st.session_state.selected_xm_player.split(" (ID:")[ 0 ] # Extract web_name ]["id"].iloc[0] st.session_state.selected_xm_player = player_id_to_display_name_map.get( player_id_from_name ) elif player_display_names: # Check if list is not empty before accessing st.session_state.selected_xm_player = player_display_names[0] if st.session_state.selected_status_player: player_id_from_name = st.session_state.finalized_df[ st.session_state.finalized_df["web_name"] == st.session_state.selected_status_player.split(" (ID:")[0] ]["id"].iloc[0] st.session_state.selected_status_player = ( player_id_to_display_name_map.get(player_id_from_name) ) elif player_display_names: # Check if list is not empty before accessing st.session_state.selected_status_player = player_display_names[0] if st.session_state.selected_baseline_player: player_id_from_name = st.session_state.finalized_df[ st.session_state.finalized_df["web_name"] == st.session_state.selected_baseline_player.split(" (ID:")[0] ]["id"].iloc[0] 
st.session_state.selected_baseline_player = ( player_id_to_display_name_map.get(player_id_from_name) ) elif player_display_names: # Check if list is not empty before accessing st.session_state.selected_baseline_player = player_display_names[0] if st.session_state.selected_penalty_player: player_id_from_name = st.session_state.finalized_df[ st.session_state.finalized_df["web_name"] == st.session_state.selected_penalty_player.split(" (ID:")[0] ]["id"].iloc[0] st.session_state.selected_penalty_player = ( player_id_to_display_name_map.get(player_id_from_name) ) elif player_display_names: # Check if list is not empty before accessing st.session_state.selected_penalty_player = player_display_names[0] st.success( "Data loaded and initial calculations complete! Use the sidebar to make adjustments." ) # --- SIDEBAR EDITING FUNCTIONALITY --- with st.sidebar: st.header("Adjust Player Stats") # Admin Login Section st.subheader("Admin Login") admin_password_input = st.text_input( "Enter Admin Password", type="password", key="admin_password_input" ) if admin_password_input == ADMIN_PASSWORD: st.session_state.is_admin_logged_in = True st.success("Admin logged in!") elif admin_password_input != "" and admin_password_input != ADMIN_PASSWORD: st.session_state.is_admin_logged_in = False st.error("Incorrect password.") if st.session_state.is_admin_logged_in: if st.button("Log Out Admin", key="admin_logout_button"): st.session_state.is_admin_logged_in = False st.rerun() st.divider() # Create a mapping for player names and IDs for display # This ensures unique selection even for players with the same web_name player_id_to_display_name_map = {} player_display_name_to_id_map = {} # Initialize the reverse map player_display_names = [] # Ensure finalized_df exists before attempting to create the map and list if st.session_state.finalized_df is not None: player_id_to_display_name_map = { row["id"]: f"{row['web_name']} (ID: {row['id']})" for _, row in st.session_state.finalized_df[["id", 
"web_name"]].iterrows() } player_display_names = list(player_id_to_display_name_map.values()) player_display_name_to_id_map = { # Populate the reverse map v: k for k, v in player_id_to_display_name_map.items() } # Minutes Editor Section (Always visible for all users) st.subheader("Adjust Expected Minutes (Weekly Override)") # Ensure output_df is not None before trying to access its columns gw_cols = [] if st.session_state.output_df is not None: gw_cols = [ c for c in st.session_state.output_df.columns if c.endswith("_xMins") ] gw_choices = sorted( [int(col.split("_")[0]) for col in gw_cols] ) # Ensure sorted integers # Determine initial index for selected_xm_player initial_xm_index = 0 if ( st.session_state.selected_xm_player and st.session_state.selected_xm_player in player_display_names ): initial_xm_index = player_display_names.index( st.session_state.selected_xm_player ) elif player_display_names: # Check if list is not empty before accessing st.session_state.selected_xm_player = player_display_names[0] initial_xm_index = 0 player_selected_display = st.selectbox( "Select Player for xMins Override", player_display_names, key="xm_sidebar_player", index=initial_xm_index, on_change=lambda: setattr( st.session_state, "selected_xm_player", st.session_state.xm_sidebar_player ), ) # Get the actual player ID from the selected display name player_id = player_display_name_to_id_map.get(player_selected_display) gw_selected = st.selectbox( "Select Gameweek for xMins Override", gw_choices, key="xm_sidebar_gw" ) if ( player_id is not None and gw_selected and st.session_state.output_df is not None ): # Use player_id for lookup and check output_df gw = int(gw_selected) # Get the current projected xMins for this player and GW (which includes baseline and decay) current_projected_xmins_col = f"{gw}_xMins" # Use player_id for lookup in output_df current_projected_xmins = st.session_state.output_df[ st.session_state.output_df["ID"] == player_id 
][current_projected_xmins_col].iloc[0] if st.session_state.is_admin_logged_in: st.info( "Your changes here will be **saved** across sessions because you are logged in as admin." ) # Use the admin persistent override if it exists, otherwise use the current projected value as default default_val = st.session_state.admin_persistent_xmins_overrides.get( player_id, {} ).get(gw, current_projected_xmins) new_xmins = st.number_input( "Expected Minutes", 0.0, # min_value 90.0, # max_value float(default_val), # value step=1.0, # Changed step to 0.01 format="%.2f", key=f"xmins_sidebar_slider_{player_id}_{gw}", ) col1, col2 = st.columns([1, 1]) if col1.button( "Update xMins Override", key=f"update_xmins_sidebar_{player_id}_{gw}" ): st.session_state.admin_persistent_xmins_overrides.setdefault( player_id, {} )[gw] = float(new_xmins) with st.spinner("Recalculating..."): recalculate_projections() save_admin_overrides() # Auto-save for admin st.success("Minutes override updated and saved!") st.info( "Please wait 2-3 seconds for the data to fully update before making further changes." ) st.rerun() if col2.button( "Reset xMins Override", key=f"reset_xmins_sidebar_{player_id}_{gw}" ): if player_id in st.session_state.admin_persistent_xmins_overrides: st.session_state.admin_persistent_xmins_overrides[player_id].pop( gw, None ) if not st.session_state.admin_persistent_xmins_overrides[player_id]: del st.session_state.admin_persistent_xmins_overrides[player_id] with st.spinner("Recalculating..."): recalculate_projections() save_admin_overrides() # Auto-save for admin st.success("xMins override reset to default and saved!") st.info( "Please wait 2-3 seconds for the data to fully update before making further changes." ) st.rerun() st.markdown("---") st.subheader("🔗 Connected Groups (Redistribution)") if ( st.session_state.finalized_df is None or st.session_state.finalized_df.empty ): st.warning( "⚠️ Data not loaded. Please click 'Update Projections' first." 
) else: df_source = st.session_state.finalized_df.copy() # Work on a copy # 1. Find Name Column name_col = None for c in ["Name", "web_name", "player_name", "name", "Player"]: if c in df_source.columns: name_col = c break # 2. Find ID Column (Crucial for differentiation) id_col = "id" if "id" in df_source.columns else None if not name_col or not id_col: st.error( f"❌ Missing Name or ID column. Cols: {list(df_source.columns)}" ) else: # 3. Create Unique Display Names: "Name (ID)" # This handles duplicate names like 'Wilson' df_source["unique_display"] = ( df_source[name_col].astype(str) + " (" + df_source[id_col].astype(str) + ")" ) all_player_options = sorted( df_source["unique_display"].unique().tolist() ) # 4. Group Selector existing_groups = list(st.session_state.player_groups.keys()) group_action = st.radio( "Action", ["Edit Existing", "Create New"], horizontal=True, key="grp_action_unique", ) selected_group_name = None if group_action == "Create New": selected_group_name = st.text_input("Enter New Group Name") elif existing_groups: selected_group_name = st.selectbox( "Select Group to Edit", existing_groups ) # 5. Member Selector if selected_group_name: current_members = st.session_state.player_groups.get( selected_group_name, [] ) updated_members = st.multiselect( f"Members of '{selected_group_name}'", options=all_player_options, default=[ p for p in current_members if p in all_player_options ], ) c1, c2 = st.columns(2) if c1.button("💾 Save Group"): st.session_state.player_groups[selected_group_name] = ( updated_members ) save_player_groups(st.session_state.player_groups) st.success(f"Saved group '{selected_group_name}'") if c2.button("❌ Delete Group"): if selected_group_name in st.session_state.player_groups: del st.session_state.player_groups[selected_group_name] save_player_groups(st.session_state.player_groups) st.rerun() else: # Not admin logged in st.info( "Your changes here are only for your current session and will NOT be saved." 
) st.info( "Remember to press the 'Update Button' once you have entered the desired value." ) # Use the user session-only override if it exists, otherwise use the current projected value as default default_val = st.session_state.user_xmins_overrides.get(player_id, {}).get( gw, current_projected_xmins ) new_xmins = st.number_input( "Expected Minutes", 0.0, # min_value 90.0, # max_value float(default_val), # value step=1.0, # Changed step to 0.01 format="%.2f", key=f"xmins_sidebar_slider_{player_id}_{gw}", ) col1, col2 = st.columns([1, 1]) if col1.button( "Update xMins Override", key=f"update_xmins_sidebar_{player_id}_{gw}" ): st.session_state.user_xmins_overrides.setdefault(player_id, {})[gw] = ( float(new_xmins) ) with st.spinner("Recalculating..."): recalculate_projections() st.success("Minutes override updated for this session!") st.info( "Please wait 2-3 seconds for the data to fully update before making further changes." ) st.rerun() if col2.button( "Reset xMins Override", key=f"reset_xmins_sidebar_{player_id}_{gw}" ): if player_id in st.session_state.user_xmins_overrides: st.session_state.user_xmins_overrides[player_id].pop(gw, None) with st.spinner("Recalculating..."): recalculate_projections() st.success("xMins override reset to default for this session!") st.info( "Please wait 2-3 seconds for the data to fully update before making further changes." ) st.rerun() st.divider() # --- Non-Admin Baseline xMins Adjustment --- if not st.session_state.is_admin_logged_in: st.subheader("Adjust Baseline Expected Minutes (Temporary)") st.info( "Your changes here are only for your current session and will NOT be saved." ) st.info( "Remember to press the 'Update Button' once you have entered the desired value." 
) # Determine initial index for selected_baseline_player initial_baseline_index = 0 if ( st.session_state.selected_baseline_player and st.session_state.selected_baseline_player in player_display_names ): initial_baseline_index = player_display_names.index( st.session_state.selected_baseline_player ) elif player_display_names: st.session_state.selected_baseline_player = player_display_names[0] initial_baseline_index = 0 baseline_player_selected_display_non_admin = st.selectbox( "Select Player for Baseline xMins Editing", options=player_display_names, key="baseline_sidebar_player_non_admin", index=initial_baseline_index, on_change=lambda: setattr( st.session_state, "selected_baseline_player", st.session_state.baseline_sidebar_player_non_admin, ), ) if ( baseline_player_selected_display_non_admin and st.session_state.finalized_df is not None ): player_id_non_admin = player_display_name_to_id_map.get( baseline_player_selected_display_non_admin ) # Only allow editing of baseline_xMins for non-admins selected_stat_non_admin = "baseline_xMins" # Get current value (either overridden or original) if ( player_id_non_admin in st.session_state.user_baseline_overrides and selected_stat_non_admin in st.session_state.user_baseline_overrides[player_id_non_admin] ): current_val_non_admin = st.session_state.user_baseline_overrides[ player_id_non_admin ][selected_stat_non_admin] else: player_row_non_admin = st.session_state.finalized_df[ st.session_state.finalized_df["id"] == player_id_non_admin ].iloc[0] current_val_non_admin = player_row_non_admin[selected_stat_non_admin] new_stat_value_non_admin = st.number_input( "Expected Baseline Minutes", min_value=0.0, # min_value max_value=90.0, # max_value value=float(current_val_non_admin), # Ensure float step=1.0, # Changed step to 0.01, format="%.2f", key=f"baseline_slider_non_admin_{player_id_non_admin}", ) col1_non_admin, col2_non_admin = st.columns([1, 1]) if col1_non_admin.button( "Update Baseline xMins", 
key=f"update_baseline_non_admin_{player_id_non_admin}", ): if player_id_non_admin not in st.session_state.user_baseline_overrides: st.session_state.user_baseline_overrides[player_id_non_admin] = {} st.session_state.user_baseline_overrides[player_id_non_admin][ selected_stat_non_admin ] = new_stat_value_non_admin with st.spinner("Recalculating..."): recalculate_projections() st.success("Baseline xMins updated for this session!") st.rerun() if col2_non_admin.button( "Reset Baseline xMins", key=f"reset_baseline_non_admin_{player_id_non_admin}", ): if ( player_id_non_admin in st.session_state.user_baseline_overrides and selected_stat_non_admin in st.session_state.user_baseline_overrides[player_id_non_admin] ): del st.session_state.user_baseline_overrides[player_id_non_admin][ selected_stat_non_admin ] if not st.session_state.user_baseline_overrides[ player_id_non_admin ]: del st.session_state.user_baseline_overrides[ player_id_non_admin ] with st.spinner("Recalculating..."): recalculate_projections() st.success("Baseline xMins reset to default for this session!") st.rerun() st.divider() if not st.session_state.is_admin_logged_in: st.subheader("Adjust Availability (Temporary)") st.info( "Your changes here are only for your current session and will NOT be saved." ) st.info( "Remember to press the 'Update Button' once you have entered the desired value." 
) # Determine initial index for selected_availability_player_non_admin initial_availability_index_non_admin = 0 if ( st.session_state.selected_availability_player_non_admin and st.session_state.selected_availability_player_non_admin in player_display_names ): initial_availability_index_non_admin = player_display_names.index( st.session_state.selected_availability_player_non_admin ) elif player_display_names: st.session_state.selected_availability_player_non_admin = ( player_display_names[0] ) initial_availability_index_non_admin = 0 availability_player_selected_display_non_admin = st.selectbox( "Select Player to edit Availability for", options=player_display_names, key="availability_sidebar_player_non_admin", index=initial_availability_index_non_admin, on_change=lambda: setattr( st.session_state, "selected_availability_player_non_admin", st.session_state.availability_sidebar_player_non_admin, ), ) if availability_player_selected_display_non_admin: player_id = player_display_name_to_id_map.get( availability_player_selected_display_non_admin ) gw_selected = st.selectbox( "Select the gameweek to edit Availability for", gw_choices, key="availability_sidebar_gw_non_admin", ) if player_id is not None and gw_selected: gw = int(gw_selected) admin_multiplier = None if ( player_id in st.session_state.admin_persistent_availability_multipliers and gw in st.session_state.admin_persistent_availability_multipliers[ player_id ] ): admin_multiplier = ( st.session_state.admin_persistent_availability_multipliers[ player_id ][gw] ) # Get current multiplier or default to 1.0 if admin_multiplier is not None: current_multiplier = admin_multiplier else: current_multiplier = ( st.session_state.user_availability_multipliers.get( player_id, {} ).get(gw, 1.0) ) st.info("Availability is set as a multiplier, ranging from 0-1") new_multiplier = st.slider( "Availability", min_value=0.0, max_value=1.0, step=0.01, value=float(current_multiplier), 
key=f"availability_slider_non_admin_{player_id}_{gw}", ) st.info(f"Availability for GW{gw} set at: {(current_multiplier):.2f}") col1, col2 = st.columns([1, 1]) if col1.button( "Update Availability", key=f"update_availability_non_admin_{player_id}_{gw}", ): st.session_state.user_availability_multipliers.setdefault( player_id, {} )[gw] = new_multiplier with st.spinner("Recalculating..."): recalculate_projections() st.success("Availability multiplier updated for this session!") st.rerun() if col2.button( "Reset Availability", key=f"reset_availability_non_admin_{player_id}_{gw}", ): if player_id in st.session_state.user_availability_multipliers: st.session_state.user_availability_multipliers[player_id].pop( gw, None ) with st.spinner("Recalculating..."): recalculate_projections() st.success( "Availability multiplier reset to default for this session!" ) st.rerun() st.divider() # Admin-Only Sections (Conditional Rendering) if st.session_state.is_admin_logged_in: # Removed the top-level "Save Admin Changes" button. # Saves will now be automatic with each update/reset. 
# Adjust Player Status Section st.subheader("Adjust Player Status (Admin)") # Determine initial index for selected_status_player initial_status_index = 0 if ( st.session_state.selected_status_player and st.session_state.selected_status_player in player_display_names ): initial_status_index = player_display_names.index( st.session_state.selected_status_player ) elif ( player_display_names ): # Added check to ensure player_display_names is not empty st.session_state.selected_status_player = player_display_names[0] initial_status_index = 0 status_player_selected_display = st.selectbox( "Select Player to Adjust Status", options=player_display_names, key="status_sidebar_player", index=initial_status_index, on_change=lambda: setattr( st.session_state, "selected_status_player", st.session_state.status_sidebar_player, ), ) if status_player_selected_display: # Use player_id from map player_id = player_display_name_to_id_map.get( status_player_selected_display ) # Get current status info for the player current_status_info = st.session_state.user_player_status_overrides.get( player_id, {"status": "default"} ) current_status_type = current_status_info.get("status", "default") current_weeks_out = current_status_info.get("weeks_out", 0) status_options = [ "default", "not_a_starter", "suspended", "injured", "rotational_risk", ] new_status_type = st.selectbox( "Select Status", options=status_options, index=status_options.index(current_status_type), key=f"status_selector_{player_id}", ) new_weeks_out = current_weeks_out if new_status_type in ["suspended", "injured"]: new_weeks_out = st.number_input( "Weeks Out (Player returns after this GW)", min_value=0, value=int(current_weeks_out), step=1, key=f"weeks_out_input_{player_id}", ) col1, col2 = st.columns([1, 1]) if col1.button("Update Status", key=f"update_status_{player_id}"): updated_status_info = {"status": new_status_type} if new_status_type in ["suspended", "injured"]: updated_status_info["weeks_out"] = int(new_weeks_out) 
st.session_state.user_player_status_overrides[player_id] = ( updated_status_info ) with st.spinner("Recalculating..."): recalculate_projections() save_admin_overrides() # Automatic save st.success( f"Status for {status_player_selected_display} updated and saved!" ) st.rerun() if col2.button("Reset Status", key=f"reset_status_{player_id}"): if player_id in st.session_state.user_player_status_overrides: del st.session_state.user_player_status_overrides[player_id] with st.spinner("Recalculating..."): recalculate_projections() save_admin_overrides() # Automatic save st.success( f"Status for {status_player_selected_display} reset to default and saved!" ) st.rerun() st.divider() # Baseline Stats Editor Section (Existing) st.subheader("Adjust Baseline Stats (Admin)") initial_baseline_index = 0 if ( st.session_state.selected_baseline_player and st.session_state.selected_baseline_player in player_display_names ): initial_baseline_index = player_display_names.index( st.session_state.selected_baseline_player ) elif ( player_display_names ): # Added check to ensure player_display_names is not empty st.session_state.selected_baseline_player = player_display_names[0] initial_baseline_index = 0 baseline_player_selected_display = st.selectbox( "Select Player for Baseline Editing", options=player_display_names, key="baseline_sidebar_player", index=initial_baseline_index, on_change=lambda: setattr( st.session_state, "selected_baseline_player", st.session_state.baseline_sidebar_player, ), ) if ( baseline_player_selected_display and st.session_state.finalized_df is not None ): # Use player_id from map and check finalized_df player_id = player_display_name_to_id_map.get( baseline_player_selected_display ) # Define stats that are direct value overrides vs. 
multiplier overrides direct_override_stats = ["baseline_xMins", "baseline_pksave_p90", "Avg_BPS"] multiplier_override_stats = [ "baseline_xSaves_p90", "baseline_xA_p90", "baseline_yc_p90", "baseline_rc_p90", "baseline_xG_p90", "baseline_CBIT_p90", "baseline_CBITR_p90", ] # Filter to only stats that exist for this player player_row_current_display_values = st.session_state.output_df[ # Use output_df for *current displayed* values (incl. previous overrides) st.session_state.output_df["ID"] == player_id ].iloc[0] player_row_original_baselines = st.session_state.finalized_df[ # Use finalized_df for *original* baselines st.session_state.finalized_df["id"] == player_id ].iloc[0] all_editable_stats = direct_override_stats + multiplier_override_stats available_stats = [ stat for stat in all_editable_stats if stat in player_row_original_baselines.index ] selected_stat = st.selectbox( "Select Stat to Edit", available_stats, key="stat_selector" ) if selected_stat: # Define a unique session state key for this specific player's selected baseline stat sync_key_baseline_admin = ( f"sync_baseline_admin_{player_id}_{selected_stat}" ) # Determine the default value for the input widget based on whether it's a direct or multiplier stat if selected_stat in direct_override_stats: # For direct override stats, show the currently overridden value, or the original baseline default_input_value = st.session_state.user_baseline_overrides.get( player_id, {} ).get(selected_stat, player_row_original_baselines[selected_stat]) min_val, max_val, step_val = ( 0.0, 10.0, 0.1, ) # Default, will be overridden below if "xMins" in selected_stat: min_val, max_val, step_val = 0.00, 90.00, 1.0 elif "Avg_BPS" in selected_stat: min_val, max_val, step_val = 0.0, 100.0, 0.1 elif "pksave_p90" in selected_stat: min_val, max_val, step_val = 0.0, 5.0, 0.01 # Ensure value is float default_input_value = float(default_input_value) st.slider( f"Direct Value for {selected_stat} (Slider)", min_value=min_val, 
max_value=max_val, value=st.session_state.get( sync_key_baseline_admin, default_input_value ), # Reads from synchronized state step=step_val, format="%.2f" if step_val < 0.1 else "%g", key=f"slider_baseline_admin_{player_id}_{selected_stat}", on_change=update_sync_value_from_slider, args=( sync_key_baseline_admin, f"slider_baseline_admin_{player_id}_{selected_stat}", ), ) st.number_input( f"Direct Value for {selected_stat} (Input)", min_value=min_val, max_value=max_val, value=st.session_state.get( sync_key_baseline_admin, default_input_value ), # Reads from synchronized state step=step_val, format="%.2f" if step_val < 0.1 else "%g", key=f"num_input_baseline_admin_{player_id}_{selected_stat}", on_change=update_sync_value_from_number_input, args=( sync_key_baseline_admin, f"num_input_baseline_admin_{player_id}_{selected_stat}", ), ) else: # Multiplier override stats # For multiplier stats, show the currently set multiplier, or default to 1.0 default_input_value = st.session_state.user_baseline_overrides.get( player_id, {} ).get(selected_stat, 1.0) min_val, max_val, step_val = 0.0, 5.0, 0.01 # Ensure value is float default_input_value = float(default_input_value) st.write( f"Original value for {selected_stat}: {player_row_original_baselines[selected_stat]:.2f}" ) st.slider( f"Multiplier for {selected_stat} (Slider)", min_value=min_val, max_value=max_val, value=st.session_state.get( sync_key_baseline_admin, default_input_value ), # Reads from synchronized state step=step_val, format="%.2f", key=f"slider_baseline_admin_{player_id}_{selected_stat}", on_change=update_sync_value_from_slider, args=( sync_key_baseline_admin, f"slider_baseline_admin_{player_id}_{selected_stat}", ), ) st.number_input( f"Multiplier for {selected_stat} (Input)", min_value=min_val, max_value=max_val, value=st.session_state.get( sync_key_baseline_admin, default_input_value ), # Reads from synchronized state step=step_val, format="%.2f", key=f"num_input_baseline_admin_{player_id}_{selected_stat}", 
on_change=update_sync_value_from_number_input, args=( sync_key_baseline_admin, f"num_input_baseline_admin_{player_id}_{selected_stat}", ), ) # Show the effective value if a multiplier is applied current_multiplier = st.session_state.get( sync_key_baseline_admin, default_input_value ) effective_value = ( player_row_original_baselines[selected_stat] * current_multiplier ) st.info(f"Effective value: {effective_value:.2f}") col1, col2 = st.columns([1, 1]) if col1.button( "Update Stat", key=f"update_baseline_{player_id}_{selected_stat}" ): if player_id not in st.session_state.user_baseline_overrides: st.session_state.user_baseline_overrides[player_id] = {} st.session_state.user_baseline_overrides[player_id][ selected_stat ] = st.session_state[sync_key_baseline_admin] with st.spinner("Recalculating..."): recalculate_projections() save_admin_overrides() st.success("Baseline stat updated and saved!") # st.rerun() # REMOVED: Let Streamlit handle reruns naturally if col2.button( "Reset Stat", key=f"reset_baseline_{player_id}_{selected_stat}" ): if ( player_id in st.session_state.user_baseline_overrides and selected_stat in st.session_state.user_baseline_overrides[player_id] ): del st.session_state.user_baseline_overrides[player_id][ selected_stat ] if not st.session_state.user_baseline_overrides[player_id]: del st.session_state.user_baseline_overrides[player_id] with st.spinner("Recalculating..."): recalculate_projections() save_admin_overrides() st.success("Stat reset to default and saved!") # st.rerun() # REMOVED: Let Streamlit handle reruns naturally st.divider() # Adjust Player Penalty Share Section st.subheader("Adjust Player Penalty Share (Admin)") # Determine initial index for selected_penalty_player initial_penalty_index = 0 if ( st.session_state.selected_penalty_player and st.session_state.selected_penalty_player in player_display_names ): initial_penalty_index = player_display_names.index( st.session_state.selected_penalty_player ) elif ( player_display_names ): # 
Added check to ensure player_display_names is not empty st.session_state.selected_penalty_player = player_display_names[0] initial_penalty_index = 0 penalty_player_selected_display = st.selectbox( "Select Player for Penalty Share Editing", options=player_display_names, key="penalty_sidebar_player", index=initial_penalty_index, on_change=lambda: setattr( st.session_state, "selected_penalty_player", st.session_state.penalty_sidebar_player, ), ) if penalty_player_selected_display: # Use player_id from map player_id = player_display_name_to_id_map.get( penalty_player_selected_display ) # Get current penalty share or default to 0.0 current_penalty_share = st.session_state.player_penalty_shares.get( player_id, 0.0 ) new_penalty_share = st.slider( "Penalty Share (0.0 to 1.0)", min_value=0.0, max_value=1.0, value=float(current_penalty_share), step=0.01, key=f"penalty_slider_{player_id}", ) col1, col2 = st.columns([1, 1]) if col1.button("Update Penalty Share", key=f"update_penalty_{player_id}"): st.session_state.player_penalty_shares[player_id] = new_penalty_share with st.spinner("Recalculating..."): recalculate_projections() save_admin_overrides() # Automatic save st.success("Penalty share updated and saved!") st.rerun() if col2.button("Reset Penalty Share", key=f"reset_penalty_{player_id}"): if player_id in st.session_state.player_penalty_shares: del st.session_state.player_penalty_shares[player_id] with st.spinner("Recalculating..."): recalculate_projections() save_admin_overrides() # Automatic save st.success("Penalty share reset to default and saved!") st.rerun() st.divider() st.subheader("Adjust Availability Multiplier (Admin)") # Determine initial index for selected_availability_player initial_availability_index = 0 if ( st.session_state.selected_availability_player and st.session_state.selected_availability_player in player_display_names ): initial_availability_index = player_display_names.index( st.session_state.selected_availability_player ) elif 
player_display_names: st.session_state.selected_availability_player = player_display_names[0] initial_availability_index = 0 availability_player_selected_display = st.selectbox( "Select Player for Availability Multiplier", options=player_display_names, key="availability_sidebar_player", index=initial_availability_index, on_change=lambda: setattr( st.session_state, "selected_availability_player", st.session_state.availability_sidebar_player, ), ) if availability_player_selected_display: player_id = player_display_name_to_id_map.get( availability_player_selected_display ) gw_selected = st.selectbox( "Select Gameweek for Availability Multiplier", gw_choices, key="availability_sidebar_gw", ) if player_id is not None and gw_selected: gw = int(gw_selected) # Get current multiplier or default to 1.0 current_multiplier = ( st.session_state.admin_persistent_availability_multipliers.get( player_id, {} ).get(gw, 1.0) ) new_multiplier = st.slider( "Availability Multiplier", min_value=0.0, max_value=1.0, step=0.01, value=float(current_multiplier), key=f"availability_slider_{player_id}_{gw}", ) col1, col2 = st.columns([1, 1]) if col1.button( "Update Multiplier", key=f"update_availability_{player_id}_{gw}" ): st.session_state.admin_persistent_availability_multipliers.setdefault( player_id, {} )[gw] = new_multiplier with st.spinner("Recalculating..."): recalculate_projections() save_admin_overrides() st.success("Availability multiplier updated and saved!") st.rerun() if col2.button( "Reset Multiplier", key=f"reset_availability_{player_id}_{gw}" ): if ( player_id in st.session_state.admin_persistent_availability_multipliers ): st.session_state.admin_persistent_availability_multipliers[ player_id ].pop(gw, None) if not st.session_state.admin_persistent_availability_multipliers[ player_id ]: del st.session_state.admin_persistent_availability_multipliers[ player_id ] with st.spinner("Recalculating..."): recalculate_projections() save_admin_overrides() st.success("Availability 
multiplier reset to default and saved!") st.rerun() st.divider() # Adjust Global Rates Section with st.expander("Adjust Global Decay/Ramp-up Rates (Admin)"): st.write( "These rates apply if a player's status doesn't have a specific rate defined." ) # Decay rates st.subheader("Decay Rates (Multiplicative)") for status, rate in st.session_state.decay_rates.items(): new_rate = st.number_input( f"{status.replace('_', ' ').title()} Decay Rate (e.g., 0.99 for 1% decay)", min_value=0.0, max_value=1.0, value=float(rate), step=0.01, key=f"decay_rate_{status}", ) st.session_state.decay_rates[status] = new_rate # Ramp-up rates st.subheader("Ramp-up Rates (Additive)") for status, rate in st.session_state.ramp_up_rates.items(): new_rate = st.number_input( f"{status.replace('_', ' ').title()} Ramp-up Rate (minutes per GW)", min_value=0.0, max_value=90.0, value=float(rate), step=1.0, key=f"ramp_up_rate_{status}", ) st.session_state.ramp_up_rates[status] = new_rate # Ramp-up Period new_ramp_up_period = st.number_input( "Injured Player Ramp-up Period (Weeks)", min_value=1, max_value=10, value=st.session_state.RAMP_UP_PERIOD, step=1, key="ramp_up_period_input", ) st.session_state.RAMP_UP_PERIOD = new_ramp_up_period # Mins Threshold new_mins_threshold = st.number_input( "Minutes Threshold (for decay vs. 
ramp-up)", min_value=0, max_value=90, value=st.session_state.MINS_THRESHOLD, step=1, key="mins_threshold_input", ) st.session_state.MINS_THRESHOLD = new_mins_threshold if st.button("Apply Global Rate Changes", key="apply_global_rates"): with st.spinner("Recalculating with new rates..."): recalculate_projections() save_admin_overrides() # Automatic save st.success("Global rates updated and saved!") st.rerun() st.divider() # Global Reset Section st.subheader("Global Reset (Admin)") if st.button( "Reset ALL Admin Edits (Baselines, Status, Rates, Penalties, Weekly xMins)", key="global_reset", type="secondary", ): if st.session_state.is_admin_logged_in: # Use st.warning for confirmation instead of window.confirm st.warning( "Are you sure you want to reset ALL admin-controlled edits to their default values? This action cannot be undone and will affect all future sessions." ) # Add a separate button for actual confirmation if st.button("Confirm Global Reset", key="confirm_global_reset"): st.session_state.user_baseline_overrides = {} st.session_state.user_player_status_overrides = {} st.session_state.admin_persistent_xmins_overrides = {} # Reset admin persistent xMins st.session_state.player_penalty_shares = { # Reset to default hardcoded values 16: 0.65, 17: 0.15, 30: 0.05, 666: 0.3, 48: 0.4, 64: 0.7, 81: 0.9, 97: 0.25, 136: 0.8, 121: 0.09, 178: 0.8, 158: 0.05, 202: 0.25, 215: 0.6, 216: 0.02, 235: 0.9, 249: 0.1, 266: 0.6, 267: 0.04, 283: 0.4, 299: 0.85, 311: 0.1, 310: 0.1, 337: 0.6, 327: 0.55, 343: 0.4, 362: 0.7, 381: 0.95, 382: 0.1, 386: 0.05, 430: 0.95, 413: 0.15, 449: 0.9, 119: 0.1, 450: 0.05, 499: 0.85, 485: 0.2, 474: 0.02, 525: 0.85, 515: 0.25, 596: 0.9, 612: 0.8, 624: 0.25, 625: 0.04, 647: 0.1, 654: 0.85, } # Reset rates to their absolute initial defaults (hardcoded defaults) st.session_state.decay_rates = { "default": 0.99, "suspended": 0.95, "injured_decay": 0.95, "rotational_risk": 0.97, } st.session_state.ramp_up_rates = { "default": 5, "injured": 10, 
"suspended": 5, "starter": 0, "rotational_risk": 0, } st.session_state.RAMP_UP_PERIOD = 3 st.session_state.MINS_THRESHOLD = 45 with st.spinner("Resetting all admin-controlled edits..."): recalculate_projections() save_admin_overrides() # Save the reset state to files st.success("All admin-controlled edits reset to default and saved!") st.rerun() else: st.error("You must be logged in as an admin to perform a global reset.") st.divider() # Show current ADMIN-controlled overrides info st.subheader("Current Admin Overrides (Saved)") if st.session_state.user_baseline_overrides: st.write("**Baseline Overrides:**") for pid, stat_dict in st.session_state.user_baseline_overrides.items(): # Use the display name from the map if available player_name_display = player_id_to_display_name_map.get( pid, st.session_state.finalized_df[ st.session_state.finalized_df["id"] == pid ]["web_name"].iloc[0] if st.session_state.finalized_df is not None and not st.session_state.finalized_df[ st.session_state.finalized_df["id"] == pid ].empty else f"ID: {pid}", ) st.write(f"• {player_name_display}: {len(stat_dict)} stats") else: st.write("No saved baseline overrides.") if st.session_state.user_player_status_overrides: st.write("**Player Status Overrides:**") for ( pid, status_dict, ) in st.session_state.user_player_status_overrides.items(): # Use the display name from the map if available player_name_display = player_id_to_display_name_map.get( pid, st.session_state.finalized_df[ st.session_state.finalized_df["id"] == pid ]["web_name"].iloc[0] if st.session_state.finalized_df is not None and not st.session_state.finalized_df[ st.session_state.finalized_df["id"] == pid ].empty else f"ID: {pid}", ) status_str = status_dict["status"] if "weeks_out" in status_dict: status_str += f" (out until GW {status_dict['weeks_out']})" st.write(f"• {player_name_display}: {status_str}") else: st.write("No saved player status overrides.") if st.session_state.player_penalty_shares: st.write("**Penalty Share 
Overrides:**") for pid, share_value in st.session_state.player_penalty_shares.items(): # Use the display name from the map if available player_name_display = player_id_to_display_name_map.get( pid, st.session_state.finalized_df[ st.session_state.finalized_df["id"] == pid ]["web_name"].iloc[0] if st.session_state.finalized_df is not None and not st.session_state.finalized_df[ st.session_state.finalized_df["id"] == pid ].empty else f"ID: {pid}", ) if not player_name_display.startswith( "ID:" ): # Only show name if we found it st.write(f"• {player_name_display}: {share_value:.2f}") else: st.write(f"• Unknown Player (ID: {pid}): {share_value:.2f}") else: st.write("No saved penalty share overrides.") if st.session_state.admin_persistent_xmins_overrides: st.write("**Weekly xMins Overrides (Admin Saved):**") for ( pid, gw_dict, ) in st.session_state.admin_persistent_xmins_overrides.items(): player_name_display = player_id_to_display_name_map.get( pid, st.session_state.finalized_df[ st.session_state.finalized_df["id"] == pid ]["web_name"].iloc[0] if st.session_state.finalized_df is not None and not st.session_state.finalized_df[ st.session_state.finalized_df["id"] == pid ].empty else f"ID: {pid}", ) st.write(f"• {player_name_display}: {gw_dict}") else: st.write("No saved weekly xMins overrides by admin.") else: st.info("Log in as admin to see and adjust advanced settings.") # Show current LOCAL (session-only) xMins overrides info (visible to all users, but explicitly stated as temporary) st.divider() st.subheader("Current Local Overrides (Per Session)") if st.session_state.user_xmins_overrides: st.write("**Minutes Overrides (Local Session):**") for pid, gw_dict in st.session_state.user_xmins_overrides.items(): # Use the display name from the map if available, otherwise fallback to original web_name player_name_display = player_id_to_display_name_map.get( pid, st.session_state.output_df[st.session_state.output_df["ID"] == pid][ "Name" ].iloc[0] if st.session_state.output_df 
is not None and not st.session_state.output_df[ st.session_state.output_df["ID"] == pid ].empty else f"ID: {pid}", ) st.write(f"• {player_name_display}: {gw_dict}") else: st.write("No local xMins overrides for this session.") tab1, tab2, tab3, tab4 = st.tabs( ["Projections", "Accuracy Dashboard", "Team Ratings", "Upcoming Fixtures"] ) # --- MAIN CONTENT AREA --- with tab1: st.subheader("Projected Player Points") # Display the main table (read-only now since editing is in sidebar) st.dataframe( st.session_state.output_df, hide_index=True, use_container_width=True, height=600, ) # Download button csv_data = st.session_state.output_df.to_csv(index=False).encode("utf-8") st.download_button( label="Download Table as CSV", # Removed emoji data=csv_data, file_name="luigis_mansion.csv", mime="text/csv", key="download_csv_button", ) st.markdown("---") st.markdown( "Luigi attempts a thing! I have finally managed to make my very own player model, think it might be subpar to the great ones but it's a start at least and I am relatively happy with that. Hope you all have a fun time tinkering!" ) st.markdown("**Instructions:**") st.markdown( "- Use the **sidebar** to adjust player minutes, weekly or across the horizon." # Updated text ) st.markdown( "- Please wait **1-2 seconds** after pressing the Update button as your changes get processed and updated." ) st.markdown( "- Currently, the overrides **CANNOT** be applied simultaneously, so don't forget to press the **Update** button else your changes will not be processed." ) st.markdown( "- The Reset Override button **ONLY** resets the xMins of the player chosen. To revert back to original projections, simply **reload** the page." ) st.markdown( "- The CSV is compatible with **Sertalp + Moose's Solver**, you can either rename the file to 'fplreview' or set 'luigis_mansion' as the datasource in the settings json." 
) st.markdown( "- You can download your customized projections as a CSV file (will be downloaded as 'luigis_mansion.csv')." ) def local_css(): st.markdown( """ """, unsafe_allow_html=True, ) # Apply CSS local_css() with tab2: proj_file = pd.read_excel("projections_check.xlsx") pts_file = pd.read_excel("points_check.xlsx") def calculate_metrics(y_true, y_prob): brier = brier_score_loss(y_true, y_prob) ll = log_loss(y_true, y_prob) return brier, ll def multiclass_brier_score(y_true_onehot, y_prob): """ Calculates the Brier Score for multi-class classification. Formula: Mean of sum of squared differences between probabilities and actuals. """ return np.mean(np.sum((y_prob - y_true_onehot) ** 2, axis=1)) def get_player_stats(df, min_xmins=0, min_xpts=0): # ... [Keep your exact existing function code] ... all_xmins = [] all_actual_mins = [] all_xpts = [] all_actual_pts = [] gameweeks = range(3, 39) for gw in gameweeks: xmins_col = f"{gw}_xMins" actual_mins_col = f"{gw}_Mins" xpts_col = f"{gw}_Pts" actual_pts_col = f"{gw}_Actuals" if all( col in df.columns for col in [xmins_col, actual_mins_col, xpts_col, actual_pts_col] ): mask = (df[xmins_col] >= min_xmins) & (df[xpts_col] >= min_xpts) all_xmins.extend(df.loc[mask, xmins_col].tolist()) all_actual_mins.extend(df.loc[mask, actual_mins_col].tolist()) all_xpts.extend(df.loc[mask, xpts_col].tolist()) all_actual_pts.extend(df.loc[mask, actual_pts_col].tolist()) if not all_xmins: return None # Return a DataFrame friendly format for the new UI return { "xMins": pd.DataFrame( [ { "Metric": "Minutes", "MAE": mean_absolute_error(all_actual_mins, all_xmins), "RMSE": np.sqrt(mean_squared_error(all_actual_mins, all_xmins)), "R2": r2_score(all_actual_mins, all_xmins), } ] ), "xPts": pd.DataFrame( [ { "Metric": "Points", "MAE": mean_absolute_error(all_actual_pts, all_xpts), "RMSE": np.sqrt(mean_squared_error(all_actual_pts, all_xpts)), "R2": r2_score(all_actual_pts, all_xpts), } ] ), } # --- 3. 
Main Interface Tabs --- st.title("Model Performance Dashboard") main_tab1, main_tab2, main_tab3 = st.tabs( ["Outcome Accuracy", "Goals & xG", "xMins & xPts"] ) # ========================================== # TAB 1: OUTCOME ACCURACY # ========================================== with main_tab1: tab1, tab2 = st.tabs(["Model Performance", "Trend"]) with tab1: st.warning( "Note: Data excludes GW1 & GW2 projections (I forgot to save the projections :sweat_smile:)." ) # --- 1. PREPARE DATA FOR MULTI-CLASS METRICS --- # Create a matrix of probabilities probs_matrix = proj_file[ ["home_win_prob", "draw_prob", "away_win_prob"] ].values # Create a One-Hot encoded matrix of actual results # We need to map Home/Draw/Away cols to a single matrix # Let's stack them: Column 0=Home, 1=Draw, 2=Away actuals_matrix = proj_file[["home_win", "draw", "away_win"]].values # Create a single label array for sklearn log_loss (0, 1, 2) # argmax gives us the index of the column that has a '1' y_true_labels = actuals_matrix.argmax(axis=1) # --- 2. CALCULATE GLOBAL METRICS --- global_ll = log_loss(y_true_labels, probs_matrix) global_brier = multiclass_brier_score(actuals_matrix, probs_matrix) # --- NEW: CALCULATE GLOBAL RPS --- # We calculate it vector-style for the whole dataframe at once # 1. Prediction CDFs p_h = proj_file["home_win_prob"].values p_d = proj_file["draw_prob"].values cdf_pred_1 = p_h cdf_pred_2 = p_h + p_d # 2. Observation CDFs (Actuals) # Note: Ensure we use the actual result columns, not probabilities o_h = proj_file["home_win"].values o_d = proj_file["draw"].values cdf_obs_1 = o_h cdf_obs_2 = o_h + o_d # 3. Mean RPS rps_per_match = 0.5 * ( (cdf_pred_1 - cdf_obs_1) ** 2 + (cdf_pred_2 - cdf_obs_2) ** 2 ) global_rps = np.mean(rps_per_match) st.markdown("### Model Performance") st.caption( "These metrics evaluate the entire match probability distribution (Home/Draw/Away) at once." 
) # Display Headline Metrics (Updated to 3 columns) head1, head2, head3 = st.columns(3) with head1: st.metric( "Multi-class Log Loss", f"{global_ll:.4f}", ) with head2: st.metric( "Multi-class Brier Score", f"{global_brier:.4f}", ) with head3: st.metric( "Ranked Probability Score", f"{global_rps:.4f}", ) st.divider() # --- 3. BREAKDOWN METRICS (Keep these for debugging) --- st.markdown("### Outcome Breakdown") st.caption( "Diagnostic metrics to see which specific outcome the model struggles with." ) col1, col2, col3 = st.columns(3) # Home b_home, ll_home = calculate_metrics( proj_file["home_win"], proj_file["home_win_prob"] ) with col1: st.markdown("#### Home Win") st.metric("Binary Brier", f"{b_home:.4f}") st.metric("Binary Log Loss", f"{ll_home:.4f}") # Draw b_draw, ll_draw = calculate_metrics( proj_file["draw"], proj_file["draw_prob"] ) with col2: st.markdown("#### Draw") st.metric("Binary Brier", f"{b_draw:.4f}") st.metric("Binary Log Loss", f"{ll_draw:.4f}") # Away b_away, ll_away = calculate_metrics( proj_file["away_win"], proj_file["away_win_prob"] ) with col3: st.markdown("#### Away Win") st.metric("Binary Brier", f"{b_away:.4f}") st.metric("Binary Log Loss", f"{ll_away:.4f}") st.divider() # Row 3: Clean Sheets (No changes needed here as CS is binary) actual_cs = pd.concat( [proj_file["home_clean_sheet"], proj_file["away_clean_sheet"]], ignore_index=True, ) pred_cs = pd.concat( [ proj_file["home_clean_sheet_odds"], proj_file["away_clean_sheet_odds"], ], ignore_index=True, ) b_cs, ll_cs = calculate_metrics(actual_cs, pred_cs) c1, c2 = st.columns([1, 3]) with c1: st.markdown("#### Clean Sheets") st.metric("CS Brier", f"{b_cs:.4f}") st.metric("CS Log Loss", f"{ll_cs:.4f}") with c2: fig_hist = px.histogram( pred_cs, nbins=20, title="Distribution of CS Probabilities" ) fig_hist.update_layout( height=300, margin=dict(l=20, r=20, t=30, b=20), plot_bgcolor="rgba(0,0,0,0.5)", paper_bgcolor="rgba(0,0,0,0.5)", ) st.plotly_chart(fig_hist, use_container_width=True) with 
tab2: st.warning( "Note: Data excludes GW1 & GW2 projections (I forgot to save the projections :sweat_smile:)." ) # --- 1. Prepare Data by Gameweek --- # We need to extract the Gameweek from the column names or if you have a 'GW' column # Since your data structure seems to be wide (one row per match), we assume there's a 'GW' column. # If not, we can infer it or you might need to ensure your excel has a 'GW' column. # CHECK: Does your dataframe have a 'GW' or 'Gameweek' column? # If not, add this line to your data loading step: # proj_file['GW'] = proj_file['gameweek'] # or whatever your column is named ll, bs, rps = st.tabs( ["Log Loss", "Brier Score", "Ranked Probability Score (RPS)"] ) with ll: if "GW" in proj_file.columns: # Group by Gameweek gw_metrics = [] for gw in sorted(proj_file["GW"].unique()): gw_data = proj_file[proj_file["GW"] == gw] # Get matrices for this specific GW probs_gw = gw_data[ ["home_win_prob", "draw_prob", "away_win_prob"] ].values actuals_gw = gw_data[["home_win", "draw", "away_win"]].values y_true_gw = actuals_gw.argmax(axis=1) # Calculate Weekly Log Loss ll_weekly = log_loss(y_true_gw, probs_gw, labels=[0, 1, 2]) gw_metrics.append( { "Gameweek": gw, "Weekly Log Loss": ll_weekly, "Match Count": len(gw_data), } ) trend_df = pd.DataFrame(gw_metrics) # Calculate Cumulative Log Loss (The "Real" Score) # We need to reconstruct the full history to do this accurately cumulative_scores = [] all_probs = [] all_true = [] for gw in sorted(proj_file["GW"].unique()): gw_data = proj_file[proj_file["GW"] == gw] all_probs.append( gw_data[ ["home_win_prob", "draw_prob", "away_win_prob"] ].values ) all_true.append( gw_data[["home_win", "draw", "away_win"]].values.argmax( axis=1 ) ) # Stack all data up to this point curr_probs = np.vstack(all_probs) curr_true = np.concatenate(all_true) cum_ll = log_loss(curr_true, curr_probs, labels=[0, 1, 2]) cumulative_scores.append(cum_ll) trend_df["Cumulative Log Loss"] = cumulative_scores # --- 2. 
Plotting --- # Create a dual-line chart fig_trend = go.Figure() # Line 1: Weekly Volatility (Faint) fig_trend.add_trace( go.Scatter( x=trend_df["Gameweek"], y=trend_df["Weekly Log Loss"], mode="lines+markers", name="Weekly Score", line=dict(color="#10f770", width=1, dash="dot"), hovertemplate="GW%{x}: %{y:.3f}", ) ) # Line 2: Cumulative Trend (Strong) fig_trend.add_trace( go.Scatter( x=trend_df["Gameweek"], y=trend_df["Cumulative Log Loss"], mode="lines+markers", name="Cumulative Trend", line=dict(color="#FF4B4B", width=3), hovertemplate="Avg up to GW%{x}: %{y:.3f}", ) ) # Add Benchmark Lines (The Context) fig_trend.add_hline( y=1.0986, line_dash="dot", line_color="yellow", annotation_text="Naive Model (1.06)", annotation_position="top right", ) fig_trend.add_hline( y=0.98, line_dash="dot", line_color="#2cacdf", annotation_text="Bookie Closing (0.98)", annotation_position="bottom right", ) # Styling to match your transparent theme fig_trend.update_layout( title="Log Loss Trend", xaxis_title="Gameweek", yaxis_title="Log Loss (Lower values are better)", height=900, hovermode="x unified", plot_bgcolor="rgba(0,0,0,0.8)", paper_bgcolor="rgba(0,0,0,0.8)", yaxis=dict( autorange="reversed" ), # Invert Y axis so "Lower" (Better) is higher up visually? # Actually, standard is usually 0 at bottom. Let's keep standard but remember lower is better. ) # Note: I didn't invert Y-axis because it can be confusing, but keep in mind the trend should go DOWN. st.plotly_chart(fig_trend, use_container_width=True) # Insight Text current_trend = trend_df["Cumulative Log Loss"].iloc[-1] start_trend = trend_df["Cumulative Log Loss"].iloc[0] improvement = start_trend - current_trend if improvement > 0: st.success( f"Model has improved by {improvement:.4f} since the start of tracking." ) else: st.info( f"Mdel performance has degraded slightly ({improvement:.4f})." 
) with bs: brier_metrics = [] cumulative_probs = [] cumulative_actuals = [] # Sort by Gameweek to ensure correct cumulative calculation sorted_gws = sorted(proj_file["GW"].unique()) for gw in sorted_gws: gw_data = proj_file[proj_file["GW"] == gw] # Extract matrices probs_gw = gw_data[ ["home_win_prob", "draw_prob", "away_win_prob"] ].values # One-hot encode the actuals (Home/Draw/Away columns) actuals_gw_onehot = gw_data[["home_win", "draw", "away_win"]].values # A. Weekly Brier Score # Formula: Mean of sum of squared differences for all 3 outcomes brier_weekly = np.mean( np.sum((probs_gw - actuals_gw_onehot) ** 2, axis=1) ) # B. Update Cumulative Lists cumulative_probs.append(probs_gw) cumulative_actuals.append(actuals_gw_onehot) # C. Calculate Cumulative Brier (The "Real" Score) curr_probs_stack = np.vstack(cumulative_probs) curr_actuals_stack = np.vstack(cumulative_actuals) brier_cum = np.mean( np.sum((curr_probs_stack - curr_actuals_stack) ** 2, axis=1) ) brier_metrics.append( { "Gameweek": gw, "Weekly Brier": brier_weekly, "Cumulative Brier": brier_cum, } ) brier_df = pd.DataFrame(brier_metrics) # --- 2. 
Plotting --- fig_brier = go.Figure() # Line 1: Weekly Volatility (Faint) fig_brier.add_trace( go.Scatter( x=brier_df["Gameweek"], y=brier_df["Weekly Brier"], mode="lines+markers", name="Weekly Score", line=dict(color="#ec0ba9", width=1, dash="dot"), hovertemplate="GW%{x}: %{y:.4f}", ) ) # Line 2: Cumulative Trend (Strong Blue) fig_brier.add_trace( go.Scatter( x=brier_df["Gameweek"], y=brier_df["Cumulative Brier"], mode="lines+markers", name="Cumulative Trend", line=dict(color="#0068C9", width=3), hovertemplate="Avg up to GW%{x}: %{y:.4f}", ) ) # Add Reference Line (Random Guess) # Random guess (0.33/0.33/0.33) results in a Brier of ~0.667 fig_brier.add_hline( y=0.667, line_dash="dot", line_color="yellow", annotation_text="Naive Model (0.667)", annotation_position="bottom right", ) # Styling fig_brier.update_layout( title="Multi-class Brier Score History", xaxis_title="Gameweek", yaxis_title="Brier Score (Lower values are better)", height=900, hovermode="x unified", plot_bgcolor="rgba(0,0,0,0.8)", paper_bgcolor="rgba(0,0,0,0.8)", yaxis=dict(autorange="reversed"), ) st.plotly_chart(fig_brier, use_container_width=True) # Insight Logic curr_brier = brier_df["Cumulative Brier"].iloc[-1] if curr_brier < 0.60: st.success( f"Cumulative Brier Score ({curr_brier:.3f}) is well below the random baseline." ) elif curr_brier < 0.66: st.success( f"Beating the naive model with the current Brier Score ({curr_brier:.3f})" ) else: st.success( f"Score ({curr_brier:.4f}) is close to or worse than random guessing." 
) with rps: rps_metrics = [] cumulative_rps_sum = 0 match_count = 0 sorted_gws = sorted(proj_file["GW"].unique()) for gw in sorted_gws: gw_data = proj_file[proj_file["GW"] == gw] # Extract Probs p_home = gw_data["home_win_prob"].values p_draw = gw_data["draw_prob"].values # We don't technically need p_away for the calculation, just H and D for the CDFs # Extract Actuals (1 or 0) obs_home = gw_data["home_win"].values obs_draw = gw_data[ "draw_prob" ].values # Wait, this should be actual outcome, not prob # Let's fix the extraction of actuals to be safe obs_home = gw_data["home_win"].values obs_draw = gw_data["draw"].values # --- CALCULATE RPS FOR THIS BATCH --- # RPS Formula for 3 outcomes (Home, Draw, Away): # RPS = 0.5 * [ (CDF_pred_1 - CDF_obs_1)^2 + (CDF_pred_2 - CDF_obs_2)^2 ] # 1. Cumulative Distribution Functions (CDF) cdf_pred_1 = p_home cdf_pred_2 = p_home + p_draw cdf_obs_1 = obs_home cdf_obs_2 = obs_home + obs_draw # 2. Sum of squared differences rps_per_match = 0.5 * ( (cdf_pred_1 - cdf_obs_1) ** 2 + (cdf_pred_2 - cdf_obs_2) ** 2 ) # Weekly Stats rps_weekly_avg = np.mean(rps_per_match) # Cumulative Stats cumulative_rps_sum += np.sum(rps_per_match) match_count += len(gw_data) rps_cumulative_avg = cumulative_rps_sum / match_count rps_metrics.append( { "Gameweek": gw, "Weekly RPS": rps_weekly_avg, "Cumulative RPS": rps_cumulative_avg, } ) rps_df = pd.DataFrame(rps_metrics) # --- 2. 
Plotting --- fig_rps = go.Figure() # Line 1: Weekly Volatility (Faint Green) fig_rps.add_trace( go.Scatter( x=rps_df["Gameweek"], y=rps_df["Weekly RPS"], mode="lines+markers", name="Weekly RPS", line=dict(color="#e68f0e", width=1, dash="dot"), hovertemplate="GW%{x}: %{y:.4f}", ) ) # Line 2: Cumulative Trend (Solid Green) fig_rps.add_trace( go.Scatter( x=rps_df["Gameweek"], y=rps_df["Cumulative RPS"], mode="lines+markers", name="Cumulative Trend", line=dict(color="#09B4AB", width=3), # SeaGreen hovertemplate="Avg up to GW%{x}: %{y:.4f}", ) ) # Add Reference Line (Random Guess) # Random guess (0.33/0.33/0.33) results in RPS ~0.27 fig_rps.add_hline( y=0.27, line_dash="dot", line_color="yellow", annotation_text="Naive Model (~0.27)", annotation_position="bottom right", ) # Styling fig_rps.update_layout( title="RPS History", xaxis_title="Gameweek", yaxis_title="RPS (Lower values are Better)", height=900, hovermode="x unified", plot_bgcolor="rgba(0,0,0,0.8)", paper_bgcolor="rgba(0,0,0,0.8)", yaxis=dict(autorange="reversed"), ) st.plotly_chart(fig_rps, use_container_width=True) # Insight Logic curr_rps = rps_df["Cumulative RPS"].iloc[-1] # Benchmarks for RPS if curr_rps < 0.20: st.success( f"RPS ({curr_rps:.4f}) is exceptionally low, superb accuracy." ) elif curr_rps < 0.22: st.success( f"RPS ({curr_rps:.4f}) is above average, decentish performance." ) else: st.success( f"RPS ({curr_rps:.4f}) is approaching random guessing territory (~0.27)." ) # ========================================== # TAB 2: GOALS & xG ACCURACY # ========================================== with main_tab2: st.warning( "Note: Data excludes GW1 & GW2 projections (I forgot to save the projections :sweat_smile:)." 
) # Prepare Data actual_goals_all = pd.concat( [proj_file["home_goals"], proj_file["away_goals"]], ignore_index=True ) expected_goals_all = pd.concat( [proj_file["expected_home_goals"], proj_file["expected_away_goals"]], ignore_index=True, ) xg_all = pd.concat( [proj_file["xg_home"], proj_file["xg_away"]], ignore_index=True ) # Create a clean DF for plotting plot_df = pd.DataFrame( { "Actual Goals": actual_goals_all, "Predicted Goals": expected_goals_all, "xG Generated": xg_all, } ) col_g1, col_g2 = st.columns(2) # -- Left Side: Projections vs Actuals -- with col_g1: st.subheader("Goals v/s Projected Goals") rmse_goals = np.sqrt( mean_squared_error(actual_goals_all, expected_goals_all) ) mae_goals = mean_absolute_error(actual_goals_all, expected_goals_all) # Metrics Row m1, m2 = st.columns(2) m1.metric("RMSE", f"{rmse_goals:.3f}") m2.metric("MAE", f"{mae_goals:.3f}") # Interactive Scatter Plot fig_goals = px.scatter( plot_df, x="Predicted Goals", y="Actual Goals", trendline="ols", title="Goals v/s Projected Goals", opacity=0.5, ) fig_goals.update_layout( height=400, plot_bgcolor="rgba(0,0,0,0.8)", paper_bgcolor="rgba(0,0,0,0.8)", ) st.plotly_chart( fig_goals, use_container_width=True, config={"staticPlot": False} ) # -- Right Side: Projections vs xG -- with col_g2: st.subheader("xG v/s Projected Goals") rmse_xg = np.sqrt(mean_squared_error(xg_all, expected_goals_all)) mae_xg = mean_absolute_error(xg_all, expected_goals_all) m1, m2 = st.columns(2) m1.metric("RMSE", f"{rmse_xg:.3f}") m2.metric("MAE", f"{mae_xg:.3f}") # Interactive Scatter Plot fig_xg = px.scatter( plot_df, x="Predicted Goals", y="xG Generated", trendline="ols", title="xG v/s Projected Goals", color_discrete_sequence=["#FF4B4B"], opacity=0.5, ) fig_xg.update_layout( height=400, plot_bgcolor="rgba(0,0,0,0.8)", paper_bgcolor="rgba(0,0,0,0.8)", ) st.plotly_chart(fig_xg, use_container_width=True) # ========================================== # TAB 3: xMINS & xPTS ACCURACY # 
========================================== with main_tab3: st.warning( "Note: Data excludes GW1 & GW2 projections (I forgot to save the projections :sweat_smile:)." ) # Helper to render the fancy dataframe def render_stats_display(stats_dict): if stats_dict: c1, c2 = st.columns(2) # We combine them into one dataframe for a cleaner look or keep separate # Let's keep separate but use column_config with c1: st.markdown("#### xMins Accuracy") st.dataframe( stats_dict["xMins"], hide_index=True, use_container_width=True, column_config={ "R2": st.column_config.ProgressColumn( "R2 Score", help="R-Squared Score (Higher is better)", format="%.3f", min_value=-1, max_value=1, ), "MAE": st.column_config.NumberColumn("MAE", format="%.3f"), "RMSE": st.column_config.NumberColumn( "RMSE", format="%.3f" ), }, ) with c2: st.markdown("#### xPts Accuracy") st.dataframe( stats_dict["xPts"], hide_index=True, use_container_width=True, column_config={ "R2": st.column_config.ProgressColumn( "R2 Score", help="R-Squared Score (Higher is better)", format="%.3f", min_value=-1, max_value=1, ), "MAE": st.column_config.NumberColumn("MAE", format="%.3f"), "RMSE": st.column_config.NumberColumn( "RMSE", format="%.3f" ), }, ) else: st.error("No data found for the defined gameweeks.") sub_tab_nolimit, sub_tab_nonzero = st.tabs( ["Show All Players", "Active Players Only (1+ xMins)"] ) with sub_tab_nolimit: st.caption("Including bench/injured/unavailable/non-starter players.") stats_no_limit = get_player_stats(pts_file, min_xmins=0) render_stats_display(stats_no_limit) with sub_tab_nonzero: st.caption("Excluding players with 0 projected xMins.") stats_limit = get_player_stats(pts_file, min_xmins=1) render_stats_display(stats_limit) with tab3: st.subheader("Team Ratings Overview") items = [ "The Attack rating is the **projected goals for** versus an average team (does not account for home advantage).", "The Defence rating is the **projected goals against** versus an average team (does not account for home 
advantage).", "Diff is just the difference between the Attack and Defence ratings.", ] bulleted_string = "\n\n".join([f":arrow_right: {item}" for item in items]) st.info(bulleted_string) @st.cache_data(ttl=3600) # Cache for 24 hours def get_image_base64(file_path): """Cache team logo encoding - significant speedup""" if os.path.exists(file_path): with open(file_path, "rb") as f: data = f.read() encoded = base64.b64encode(data).decode() return f"data:image/png;base64,{encoded}" return None df = load_team_ratings_cached() df.index = df.index + 1 # 1. COMPLETE MAP OF TEAM NAMES TO RELIABLE LOGO URLs # Using crests.football-data.org IDs which are stable and reliable. team_ids = { "Arsenal": 57, "Manchester City": 65, "Liverpool": 64, "Chelsea": 61, "Newcastle United": 67, "Aston Villa": 58, "Manchester United": 66, "Brentford": 402, "Brighton and Hove Albion": 397, "AFC Bournemouth": 1044, "Tottenham Hotspur": 73, "Crystal Palace": 354, "Fulham": 63, "Nottingham Forest": 351, "Everton": 62, "Leeds United": 341, "West Ham United": 563, "Wolverhampton Wanderers": 76, "Sunderland": 71, "Burnley": 328, } # Create the URL column dynamically # We use .get() to avoid errors if a team name has a typo, and fallback to None df["TeamID"] = df["Team"].map(team_ids) # Generate the Base64 strings for the images df["Logo"] = df["TeamID"].apply( lambda x: get_image_base64(f"logos/{int(x)}.png") if pd.notnull(x) else None ) tab_plot, tab_data = st.tabs(["Scatter Plot", "Data Table"]) with tab_plot: # Base Chart Definitions base = alt.Chart(df).encode( x=alt.X( "Defence", scale=alt.Scale(reverse=True, domain=[0.6, 1.9], clamp=True), axis=alt.Axis(title="Better Defence ⮕)", tickCount=5, titlePadding=40), ), y=alt.Y( "Attack", scale=alt.Scale(domain=[0.6, 2.0]), axis=alt.Axis(title="Better Attack ⮕)", tickCount=5, titlePadding=50), ), tooltip=["Team", "Attack", "Defence", "Diff"], ) # Layer 1: The Team Logos images = base.mark_image(width=40, height=50).encode(url="Logo") # Layer 2: 
Fallback Dots (just in case a logo is missing) # Combine layers and increase chart size chart = ( (images) .interactive() .properties( height=660, padding={"left": 20, "top": 20, "right": 20, "bottom": 20}, ) ) st.altair_chart(chart, use_container_width=True) with tab_data: st.dataframe( df[["Team", "Attack", "Defence", "Diff"]], height=740, use_container_width=True, ) with tab4: st.header("Projections for the upcoming fixtures") def get_fresh_data(): # Load the file df = load_fixture_projections_cached() # Ensure no whitespace issues in headers df.columns = df.columns.str.strip() # Select top 10 rows df_subset = df.head(10) return df_subset df = get_fresh_data() # We use the original column names (keys) but map them to nice labels df["home_win_prob"] = df["home_win_prob"] * 100 df["away_win_prob"] = df["away_win_prob"] * 100 df["draw_prob"] = df["draw_prob"] * 100 df["home_clean_sheet_odds"] = df["home_clean_sheet_odds"] * 100 df["away_clean_sheet_odds"] = df["away_clean_sheet_odds"] * 100 st.dataframe( df, column_order=[ "home_team", "home_win_prob", "draw_prob", "away_win_prob", "away_team", "expected_home_goals", "expected_away_goals", "home_clean_sheet_odds", "away_clean_sheet_odds", ], column_config={ "home_team": "Home Team", "away_team": "Away Team", "home_win_prob": st.column_config.ProgressColumn( "Home Win", format="%.1f%%", # Display as percentage min_value=0, max_value=100, ), "draw_prob": st.column_config.ProgressColumn( "Draw", format="%.1f%%", min_value=0, max_value=100, ), "away_win_prob": st.column_config.ProgressColumn( "Away Win", format="%.1f%%", min_value=0, max_value=100, ), "expected_home_goals": st.column_config.NumberColumn( "Home xG", format="%.2f" ), "expected_away_goals": st.column_config.NumberColumn( "Away xG", format="%.2f" ), "home_clean_sheet_odds": st.column_config.ProgressColumn( "Home CS%", format="%.1f%%", min_value=0, max_value=100, ), # Format 0.3 as 30% "away_clean_sheet_odds": st.column_config.ProgressColumn( "Away CS%", 
format="%.1f%%", min_value=0, max_value=100, ), }, hide_index=True, use_container_width=True, height=438, row_height=40, )