fpl-solver / fpl_streamlit_app.py
AnayShukla's picture
Clean Production Release
f7cecf3
import streamlit as st
import pandas as pd
import numpy as np
import json
import math
import requests
import os
from scipy.stats import nbinom
from PIL import Image
import altair as alt
import base64
import sklearn # noqa: F401
from sklearn.metrics import (
brier_score_loss,
mean_squared_error,
mean_absolute_error,
r2_score,
log_loss,
)
import plotly.express as px
import plotly.graph_objects as go
import hashlib
# --- Configuration for Admin Access ---
# SECURITY: a credential committed to source control is readable by anyone who
# can read the repository. The password is therefore taken from the
# FPL_ADMIN_PASSWORD environment variable when it is set; the literal below is
# only a backward-compatible fallback and should be rotated and removed.
ADMIN_PASSWORD = os.environ.get(
    "FPL_ADMIN_PASSWORD",
    "Monkeyrocks11$$",  # <<< IMPORTANT: CHANGE THIS TO A STRONG PASSWORD IN REAL USE!
)
# JSON file used by load_player_groups / save_player_groups.
GROUP_FILE = "player_groups.json"

# One row per club: (numeric team id, full club name, short display name).
# Both lookup tables below are derived from this single table so the two
# directions cannot drift apart.
_TEAM_TABLE = (
    (1, "Arsenal", "Arsenal"),
    (2, "Aston Villa", "Aston Villa"),
    (3, "Burnley", "Burnley"),
    (4, "AFC Bournemouth", "Bournemouth"),
    (5, "Brentford", "Brentford"),
    (6, "Brighton and Hove Albion", "Brighton"),
    (7, "Chelsea", "Chelsea"),
    (8, "Crystal Palace", "Crystal Palace"),
    (9, "Everton", "Everton"),
    (10, "Fulham", "Fulham"),
    (11, "Leeds United", "Leeds"),
    (12, "Liverpool", "Liverpool"),
    (13, "Manchester City", "Man City"),
    (14, "Manchester United", "Man Utd"),
    (15, "Newcastle United", "Newcastle"),
    (16, "Nottingham Forest", "Nott'm Forest"),
    (17, "Sunderland", "Sunderland"),
    (18, "Tottenham Hotspur", "Spurs"),
    (19, "West Ham United", "West Ham"),
    (20, "Wolverhampton Wanderers", "Wolves"),
)

# Full club name -> numeric team id.
TEAMS_DICT = {full_name: team_id for team_id, full_name, _ in _TEAM_TABLE}

# Numeric team id -> short display name.
TEAMS_DICT_REVERSE = {team_id: short_name for team_id, _, short_name in _TEAM_TABLE}
@st.cache_data(ttl=3600)
def get_base64_of_bin_file(bin_file):
    """Return the contents of ``bin_file`` as a base64-encoded text string.

    The file is opened in binary mode. The decorator caches the result with a
    TTL of 3600 seconds.
    """
    with open(bin_file, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode()
def set_bg_hack(main_bg, opacity=0.15):
    """
    Embed an image file as the app background via an injected CSS data URL.

    Parameters
    ----------
    main_bg : str
        The file path to the image.
    opacity : float
        The opacity of the overlay (0.0 to 1.0).
        0.0 = fully visible image, 1.0 = solid color cover.
    """
    # Derive the MIME subtype from the file extension instead of always
    # claiming PNG (the app passes a .jpg here). Extension-less paths keep the
    # previous "png" default.
    extension = os.path.splitext(main_bg)[1].lstrip(".").lower()
    main_bg_ext = {"": "png", "jpg": "jpeg", "jpe": "jpeg", "svg": "svg+xml"}.get(
        extension, extension
    )
    # Read and encode the image
    b64_encoded = get_base64_of_bin_file(main_bg)
    # CSS to inject
    # The linear-gradient overlays a semi-transparent black layer
    st.markdown(
        f"""
<style>
.stApp {{
background: linear-gradient(rgba(0, 0, 0, {opacity}), rgba(0, 0, 0, {opacity})),
url(data:image/{main_bg_ext};base64,{b64_encoded});
background-size: cover;
background-position: center center;
background-repeat: no-repeat;
background-attachment: fixed;
}}
/* Mobile optimization - lighter background on small screens */
@media (max-width: 768px) {{
.stApp {{
background-attachment: scroll; /* Better performance on mobile */
background-size: cover;
}}
}}
</style>
""",
        unsafe_allow_html=True,
    )
def sidebar_bg(side_bg):
    """Embed an image file as the sidebar background via injected CSS.

    The image is drawn on a ``::before`` pseudo-element behind the sidebar
    content and darkened with ``filter: brightness(0.35)``.

    Parameters
    ----------
    side_bg : str
        The file path to the image.
    """
    # Derive the MIME subtype from the file extension rather than assuming PNG;
    # extension-less paths keep the previous "png" default.
    extension = os.path.splitext(side_bg)[1].lstrip(".").lower()
    side_bg_ext = {"": "png", "jpg": "jpeg", "jpe": "jpeg", "svg": "svg+xml"}.get(
        extension, extension
    )
    # Reuse the cached encoder so the file is not re-read and re-encoded on
    # every call, matching how set_bg_hack loads its image.
    b64_encoded = get_base64_of_bin_file(side_bg)
    st.markdown(
        f"""
<style>
[data-testid="stSidebar"] > div:first-child {{
background: none;
}}
/* The Virtual Layer */
[data-testid="stSidebar"]::before {{
content: "";
position: absolute;
top: 0;
left: 0;
width: 100%;
height: 100%;
/* The Image */
background: url(data:image/{side_bg_ext};base64,{b64_encoded});
background-repeat: no-repeat;
background-position: center;
background-size: cover;
/* CONTRAST / OPACITY CONTROLS */
/* 1. To make it darker (better contrast for white text): */
filter: brightness(0.35);
/* 2. Alternatively, to make it see-through (opacity): */
/* opacity: 1.0; */
/* Keep it behind the text */
z-index: -1;
}}
</style>
""",
        unsafe_allow_html=True,
    )
# Apply the sidebar and page background images.
side_bg = "luigiside.png"
sidebar_bg(side_bg)
set_bg_hack("luigismansion.jpg")

# Layout rules: fixed-width sidebar on desktop, full-width sidebar and
# horizontally scrollable data frames at viewport widths up to 768px.
_LAYOUT_CSS = """
<style>
/* Desktop sidebar */
section[data-testid="stSidebar"] {
width: 340px !important;
}
/* Mobile responsive adjustments */
@media (max-width: 768px) {
section[data-testid="stSidebar"] {
width: 100% !important;
}
/* Make tables scroll horizontally on mobile */
.stDataFrame {
overflow-x: auto;
-webkit-overflow-scrolling: touch;
}
}
</style>
"""
st.markdown(_LAYOUT_CSS, unsafe_allow_html=True)
def set_custom_font(font_path):
    """Embed the TrueType font at ``font_path`` and apply it through injected CSS.

    The font file is base64-encoded into an ``@font-face`` rule named
    'MyCustomFont'. The same stylesheet also sets a global text shadow,
    responsive font sizes, and restores the icon font for Material icons.

    Parameters
    ----------
    font_path : str
        The file path to the .ttf font.
    """
    with open(font_path, "rb") as font_file:
        b64 = base64.b64encode(font_file.read()).decode()
    font_css = f"""
<style>
/* 1. Load your custom font */
@font-face {{
font-family: 'MyCustomFont';
src: url(data:font/ttf;base64,{b64}) format('truetype');
}}
/* 2. Apply it to the whole app */
html, body, [class*="css"] {{
font-family: 'MyCustomFont', sans-serif;
}}
/* 3. Icon Fix */
[data-testid="stIconMaterial"] {{
font-family: "Material Symbols Rounded", "Material Icons" !important;
font-weight: normal !important;
/* Optional: Remove shadow from icons if they look blurry */
/* text-shadow: none !important; */
}}
/* Global Font Family & SHADOW FIX */
* {{
font-family: 'MyCustomFont', sans-serif !important;
/* Black shadow with 4px blur for visibility */
text-shadow: 10px 2px 20px rgba(0, 0, 0, 0.8) !important;
}}
/* Specific Sizes - Responsive with clamp() */
h1 {{
font-size: clamp(20px, 5vw, 30px) !important;
text-shadow: 3px 3px 6px rgba(0, 0, 0, 0.9) !important;
}}
h2, h3 {{
font-size: clamp(16px, 4vw, 22px) !important;
}}
p, div, label {{
font-size: clamp(10px, 2.5vw, 12px) !important;
}}
button p {{
font-size: clamp(8px, 2vw, 10px) !important;
}}
/* Mobile specific: reduce shadow intensity */
@media (max-width: 768px) {{
* {{
text-shadow: 5px 1px 10px rgba(0, 0, 0, 0.8) !important;
}}
}}
</style>
"""
    st.markdown(font_css, unsafe_allow_html=True)
# Inject the font stylesheet built from luigifont.ttf.
set_custom_font("luigifont.ttf")
def load_player_groups():
    """Load the saved player groups from ``GROUP_FILE``.

    Returns:
        The parsed JSON content of ``GROUP_FILE``, or an empty dict when the
        file is missing, cannot be read, or does not contain valid JSON.
    """
    # EAFP: open directly instead of checking existence first, and catch only
    # the failures that mean "no usable file" (OSError covers a missing or
    # unreadable file; ValueError covers JSON and text-decoding errors) rather
    # than swallowing everything with a bare except.
    try:
        with open(GROUP_FILE, "r") as f:
            return json.load(f)
    except (OSError, ValueError):
        return {}
def save_player_groups(groups):
    """Write ``groups`` to ``GROUP_FILE`` as JSON with 4-space indentation.

    The file is opened in write mode, so any previous content is replaced.
    """
    with open(GROUP_FILE, "w") as group_handle:
        json.dump(groups, group_handle, indent=4)
# Player groups are read from disk only when the session has none yet.
if "player_groups" not in st.session_state:
    st.session_state.player_groups = load_player_groups()

# --- Initialize all session state variables at the top level ---
# Every key below is created with its default only when it is absent, so values
# already stored in the session are left alone on reruns.
_SESSION_DEFAULTS = {
    "initialized": False,
    "finalized_df": None,
    "match_df": None,
    # Player penalty shares: start empty, overridden by loaded data.
    "player_penalty_shares": {},
    "admin_persistent_availability_multipliers": {},
    "user_availability_multipliers": {},  # Session-only
    "selected_availability_player": None,
    "selected_availability_player_non_admin": None,
    "MINS_THRESHOLD": 30,
    # MINS_SCALING_BONUS is explicitly 0.0.
    "MINS_SCALING_BONUS": 0.0,
    "pos_map": {},
    "teams_dict_1": {},
    "teams_dict": {},
    # CBIT/CBITR multipliers are intentionally absent from points_config
    # (per the original comments they are handled by tiered logic).
    "points_config": {
        "goal": {1: 10, 2: 6, 3: 5, 4: 4},
        "assist": 3,
        "clean_sheet": {1: 4, 2: 4, 3: 1, 4: 0},
        "saves_per_3": 1,
        "penalty_points_per_position": {2: 0.8, 3: 0.6, 4: 0.5},
    },
    "user_xmins_overrides": {},  # Session-only overrides for ALL users
    "admin_persistent_xmins_overrides": {},  # Admin-specific, saved to file
    "user_baseline_overrides": {},
    "user_player_status_overrides": {},
    "output_df": None,
    "team_baselines": None,
    "admin_persistent_share_overrides": {},
    # Fallback decay and ramp-up rates; load_admin_overrides replaces them when
    # rates_config.json is present.
    "decay_rates": {
        "default": 0.99,
        "suspended": 0.99,
        "injured_decay": 0.99,
        "rotational_risk": 0.95,
    },
    "ramp_up_rates": {
        "default": 3,
        "injured": 9,
        "suspended": 3,
        "starter": 0,
        "rotational_risk": 2,
    },
    "team_skepticism": {},
    "RAMP_UP_PERIOD": 3,
    # Currently selected players in the dropdowns.
    "selected_xm_player": None,
    "selected_status_player": None,
    "selected_baseline_player": None,
    "selected_penalty_player": None,
    # Admin login status.
    "is_admin_logged_in": False,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    st.session_state.setdefault(_state_key, _state_default)
@st.cache_data(ttl=300, show_spinner="Loading FPL data...")
def fetch_fpl_data_cached():
    """
    Fetch the FPL ``bootstrap-static`` payload and return it as parsed JSON.

    The decorator caches the result with a TTL of 300 seconds (5 minutes).
    Any failure (network error, non-2xx status, invalid JSON) is reported with
    ``st.error`` and ``None`` is returned instead of raising.
    NOTE(review): a ``None`` result is presumably cached for the TTL as well -
    confirm against Streamlit's caching behaviour.
    """
    bootstrap_url = "https://fantasy.premierleague.com/api/bootstrap-static/"
    try:
        reply = requests.get(bootstrap_url, timeout=10)
        reply.raise_for_status()
        payload = reply.json()
    except Exception as e:
        st.error(f"Failed to fetch FPL data: {e}")
        return None
    return payload
@st.cache_data(ttl=3600, show_spinner=False)
def load_team_ratings_cached():
    """Read ``team_ratings_dual_speed.csv`` into a DataFrame (cached, TTL 3600 s)."""
    return pd.read_csv("team_ratings_dual_speed.csv")
@st.cache_data(ttl=3600, show_spinner=False)
def load_fixture_projections_cached():
    """Read ``ewmapois_model.csv`` into a DataFrame (cached, TTL 3600 s).

    Column names are stripped of surrounding whitespace before returning.
    """
    projections = pd.read_csv("ewmapois_model.csv")
    cleaned_columns = projections.columns.str.strip()
    projections.columns = cleaned_columns
    return projections
# --- Persistence Functions ---
def make_json_serializable(obj):
    """Recursively convert ``obj`` into JSON-serializable built-in types.

    Conversion rules:
      * dict -> dict with every key converted with ``str`` and values converted
      * list / tuple -> list with every element converted
      * objects exposing ``tolist()`` (numpy arrays and numpy scalars)
        -> the converted result of ``tolist()``
      * other objects exposing ``item()`` -> the converted result of ``item()``
      * int, float, str, bool and None -> returned unchanged
      * anything else -> ``str(obj)``

    Returns:
        A structure made only of dict, list, int, float, str, bool and None.
    """
    if isinstance(obj, dict):
        return {str(k): make_json_serializable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [make_json_serializable(item) for item in obj]
    # ``tolist`` must be tried before ``item``: numpy arrays expose both, and
    # ``ndarray.item()`` raises ValueError for arrays with more than one
    # element. ``tolist`` handles arrays and numpy scalars alike. The result is
    # converted again because it may still hold non-JSON values (for example
    # the contents of an object array).
    if hasattr(obj, "tolist"):
        return make_json_serializable(obj.tolist())
    if hasattr(obj, "item"):
        return make_json_serializable(obj.item())
    if isinstance(obj, (int, float, str, bool)) or obj is None:
        return obj
    # For any other type, fall back to its string representation
    return str(obj)
def save_admin_overrides():
    """
    Persist the admin-controlled session state to JSON files in the working
    directory: baseline overrides, player status overrides, penalty shares,
    the rates configuration, admin xMins overrides, admin availability
    multipliers and admin share overrides.

    Every file except the share overrides is flushed and fsynced before it is
    closed. A failure while writing the share overrides is reported with
    ``st.error`` but does not change the return value.

    Returns:
        True when the writes completed (share-override errors excepted),
        False when any other step raised; the error is shown with ``st.error``.
    """

    def _write_json(filename, payload, sync=True):
        # Serialize payload to filename; optionally force it to disk.
        with open(filename, "w", encoding="utf-8") as out:
            json.dump(payload, out, indent=4, ensure_ascii=False)
            if sync:
                out.flush()
                os.fsync(out.fileno())

    try:
        state = st.session_state
        # Each payload is converted immediately before its own file is
        # written, so an error leaves the earlier files already saved.
        _write_json(
            "user_baseline_overrides.json",
            make_json_serializable(state.user_baseline_overrides),
        )
        _write_json(
            "user_player_status_overrides.json",
            make_json_serializable(state.user_player_status_overrides),
        )
        _write_json(
            "player_penalty_shares.json",
            make_json_serializable(state.player_penalty_shares),
        )
        _write_json(
            "rates_config.json",
            {
                "decay_rates": make_json_serializable(state.decay_rates),
                "ramp_up_rates": make_json_serializable(state.ramp_up_rates),
                "RAMP_UP_PERIOD": make_json_serializable(state.RAMP_UP_PERIOD),
                "MINS_THRESHOLD": make_json_serializable(state.MINS_THRESHOLD),
            },
        )
        _write_json(
            "admin_persistent_xmins_overrides.json",
            make_json_serializable(state.admin_persistent_xmins_overrides),
        )
        _write_json(
            "admin_persistent_availability_multipliers.json",
            make_json_serializable(state.admin_persistent_availability_multipliers),
        )
        share_payload = make_json_serializable(state.admin_persistent_share_overrides)
        try:
            _write_json(
                "admin_persistent_share_overrides.json", share_payload, sync=False
            )
        except Exception as e:
            st.error(f"Error saving share overrides: {e}")
        return True
    except Exception as e:
        st.error(f"Error saving admin overrides: {e}")
        return False
def load_admin_overrides():
    """
    Populate the admin-controlled session state from JSON files in the working
    directory. ``user_xmins_overrides`` (session-only) is not loaded here.

    For each file that exists and is non-empty the parsed content replaces the
    session value, with player ids (and gameweek keys for the nested xMins and
    availability maps) converted from JSON strings back to ``int``. When a file
    is missing or empty the value is reset to ``{}``; penalty shares fall back
    to a built-in default table, and the rates stay as they are. Share
    overrides are loaded whenever the file exists and have their own error
    handling.

    Returns:
        True on success, False when an unexpected error occurred; the error is
        shown with ``st.error``.
    """

    def _has_content(path):
        # True when path exists and is not a zero-byte file.
        return os.path.exists(path) and os.path.getsize(path) > 0

    def _read_json(path):
        # Parse the UTF-8 JSON document stored at path.
        with open(path, "r", encoding="utf-8") as src:
            return json.load(src)

    def _int_keys(raw):
        # {"12": v} -> {12: v}
        return {int(pid): payload for pid, payload in raw.items()}

    def _nested_int_keys(raw):
        # {"12": {"3": v}} -> {12: {3: v}}
        return {
            int(pid): {int(gw): val for gw, val in gw_dict.items()}
            for pid, gw_dict in raw.items()
        }

    try:
        state = st.session_state

        # Baseline overrides
        if _has_content("user_baseline_overrides.json"):
            state.user_baseline_overrides = _int_keys(
                _read_json("user_baseline_overrides.json")
            )
        else:
            state.user_baseline_overrides = {}

        # Player status overrides
        if _has_content("user_player_status_overrides.json"):
            state.user_player_status_overrides = _int_keys(
                _read_json("user_player_status_overrides.json")
            )
        else:
            state.user_player_status_overrides = {}

        # Player penalty shares
        if _has_content("player_penalty_shares.json"):
            state.player_penalty_shares = _int_keys(
                _read_json("player_penalty_shares.json")
            )
        else:
            # Default penalty shares (player id -> share) if file not found
            state.player_penalty_shares = {
                16: 0.65,
                17: 0.15,
                30: 0.05,
                666: 0.3,
                48: 0.4,
                64: 0.7,
                81: 0.9,
                97: 0.25,
                136: 0.8,
                121: 0.09,
                178: 0.8,
                158: 0.05,
                202: 0.25,
                215: 0.6,
                216: 0.02,
                235: 0.9,
                249: 0.1,
                266: 0.6,
                267: 0.04,
                283: 0.4,
                299: 0.85,
                311: 0.1,
                310: 0.1,
                337: 0.6,
                327: 0.55,
                343: 0.4,
                362: 0.7,
                381: 0.95,
                382: 0.1,
                386: 0.05,
                430: 0.95,
                413: 0.15,
                449: 0.9,
                119: 0.1,
                450: 0.05,
                499: 0.85,
                485: 0.2,
                474: 0.02,
                525: 0.85,
                515: 0.25,
                596: 0.9,
                612: 0.8,
                624: 0.25,
                625: 0.04,
                647: 0.1,
                654: 0.85,
            }

        # Rates config: each value falls back to what the session already holds.
        if _has_content("rates_config.json"):
            rates_config = _read_json("rates_config.json")
            for rate_field in (
                "decay_rates",
                "ramp_up_rates",
                "RAMP_UP_PERIOD",
                "MINS_THRESHOLD",
            ):
                setattr(
                    state,
                    rate_field,
                    rates_config.get(rate_field, getattr(state, rate_field)),
                )

        # Admin-persistent xMins overrides
        if _has_content("admin_persistent_xmins_overrides.json"):
            state.admin_persistent_xmins_overrides = _nested_int_keys(
                _read_json("admin_persistent_xmins_overrides.json")
            )
        else:
            state.admin_persistent_xmins_overrides = {}

        # Admin-persistent availability multipliers
        if _has_content("admin_persistent_availability_multipliers.json"):
            state.admin_persistent_availability_multipliers = _nested_int_keys(
                _read_json("admin_persistent_availability_multipliers.json")
            )
        else:
            state.admin_persistent_availability_multipliers = {}

        # Share overrides: loaded as-is (keys are not converted) and guarded by
        # their own handler so a bad file does not fail the whole load.
        try:
            if os.path.exists("admin_persistent_share_overrides.json"):
                state.admin_persistent_share_overrides = _read_json(
                    "admin_persistent_share_overrides.json"
                )
            else:
                state.admin_persistent_share_overrides = {}
        except Exception as e:
            st.error(f"Error loading share overrides: {e}")
            state.admin_persistent_share_overrides = {}
        return True
    except Exception as e:
        st.error(f"Error loading admin overrides: {e}")
        return False
# --- Data Loading and Initial Processing Functions ---
# ORIGINAL FUNCTION: merge_player_baseline_stats
@st.cache_data(ttl=3600)  # cache entries expire after 3600 seconds
def load_baseline_stats_cached(gk_path, outfield_path):
    """Read the goalkeeper and outfield baseline CSVs and normalise whitespace.

    Column names and the ``player_name`` values of both frames are stripped of
    surrounding whitespace.

    Returns:
        A ``(gk_stats_df, outfield_stats_df)`` tuple of DataFrames.
    """
    gk_frame = pd.read_csv(gk_path)
    outfield_frame = pd.read_csv(outfield_path)
    both_frames = (gk_frame, outfield_frame)
    # Strip the headers first, then the player names, for both frames.
    for frame in both_frames:
        frame.columns = frame.columns.str.strip()
    for frame in both_frames:
        frame["player_name"] = frame["player_name"].str.strip()
    return gk_frame, outfield_frame
def merge_player_baseline_stats(
    df, gk_stats_csv_path, outfield_stats_csv_path, gk_element_type=1
):
    """Attach per-player baseline statistics to the player frame.

    Goalkeepers (rows whose ``element_type`` equals ``gk_element_type``) are
    left-joined against the goalkeeper stats CSV and all other players against
    the outfield stats CSV, matching ``name`` to ``player_name``. Every
    ``baseline_*`` column from either CSV is present in the result; missing
    values in those columns are filled with 0.

    Args:
        df: Player DataFrame with at least ``id``, ``name`` and
            ``element_type`` columns. It is not modified.
        gk_stats_csv_path: Path of the goalkeeper baseline CSV.
        outfield_stats_csv_path: Path of the outfield baseline CSV.
        gk_element_type: ``element_type`` value that identifies goalkeepers.

    Returns:
        A new DataFrame sorted by ``id``, or None when the CSVs could not be
        loaded (the error is shown with ``st.error``).
    """
    main_df = df.copy()
    # Both CSVs come from the cached loader, which has already stripped the
    # headers and player names. They must not be re-read here: a second
    # read_csv would discard that clean-up and can break the name join.
    try:
        gk_stats_df, outfield_stats_df = load_baseline_stats_cached(
            gk_stats_csv_path, outfield_stats_csv_path
        )
    except FileNotFoundError as e:
        # Name the file that is actually missing; either CSV can be the cause.
        missing_path = e.filename or f"{gk_stats_csv_path} / {outfield_stats_csv_path}"
        st.error(
            f"Error: {missing_path} not found. Please ensure it's in the same directory as the app."
        )
        return None  # Indicate failure
    except Exception as e:
        st.error(
            f"Error loading {gk_stats_csv_path} / {outfield_stats_csv_path}: {e}"
        )
        return None
    main_df.columns = main_df.columns.str.strip()
    main_df["name"] = main_df["name"].str.strip()
    gk_mask = main_df["element_type"] == gk_element_type
    gk_players = main_df[gk_mask].copy()
    outfield_players = main_df[~gk_mask].copy()
    gk_merged = gk_players.merge(
        gk_stats_df, left_on="name", right_on="player_name", how="left"
    )
    outfield_merged = outfield_players.merge(
        outfield_stats_df, left_on="name", right_on="player_name", how="left"
    )
    gk_baseline_cols = [
        col for col in gk_stats_df.columns if col.startswith("baseline_")
    ]
    outfield_baseline_cols = [
        col for col in outfield_stats_df.columns if col.startswith("baseline_")
    ]
    # Give both halves the same baseline columns so the concat lines up.
    for col in outfield_baseline_cols:
        if col not in gk_merged.columns:
            gk_merged[col] = np.nan
    for col in gk_baseline_cols:
        if col not in outfield_merged.columns:
            outfield_merged[col] = np.nan
    # The join key from the stats side duplicates ``name``.
    if "player_name" in gk_merged.columns:
        gk_merged = gk_merged.drop("player_name", axis=1)
    if "player_name" in outfield_merged.columns:
        outfield_merged = outfield_merged.drop("player_name", axis=1)
    final_df = pd.concat([gk_merged, outfield_merged], ignore_index=True)
    final_df = final_df.sort_values("id").reset_index(drop=True)
    # Fill NaNs specifically for baseline columns with 0.
    all_baseline_cols_combined = list(set(gk_baseline_cols + outfield_baseline_cols))
    for col in all_baseline_cols_combined:
        if col in final_df.columns:
            final_df[col] = final_df[col].fillna(0)
    return final_df
@st.cache_data(ttl=3600)
def load_data_and_setup_initial_df():
    """Build the initial player DataFrame from the FPL API and baseline CSVs.

    Steps: fetch the bootstrap payload, keep the player columns the app uses,
    convert ``now_cost`` by dividing by 10, apply the name replacements from
    ``rename.json``, merge the goalkeeper/outfield baseline stats, and derive
    ``Avg_BPS`` per position.

    Returns:
        The prepared DataFrame, or None when ``rename.json`` or the baseline
        CSVs could not be loaded (the error is shown with ``st.error``).

    Raises:
        RuntimeError: when the FPL payload is unavailable or has no
            ``elements`` list.
    """
    r = fetch_fpl_data_cached()
    # fetch_fpl_data_cached returns None on failure; indexing it would raise an
    # opaque TypeError. Raise a descriptive error instead of returning None so
    # that the failure is not stored as this function's cached result.
    if not r or "elements" not in r:
        raise RuntimeError("FPL player data is unavailable (bootstrap-static fetch failed).")
    players = pd.DataFrame(r["elements"])
    players["name"] = players["first_name"] + " " + players["second_name"]
    # Select the needed columns directly. The previous explicit drop of ~40
    # other columns was redundant with this selection and raised KeyError
    # whenever the API payload lacked any one of them.
    columns_order = [
        "id",
        "name",
        "web_name",
        "element_type",
        "now_cost",
        "team",
        "chance_of_playing_this_round",  # Keep this for info, but not directly for decay
        "news",
    ]
    players = players[columns_order].copy()
    players["now_cost"] = players["now_cost"] / 10
    # Load the name replacement map
    try:
        with open("rename.json", "r", encoding="utf-8") as file:
            rename_dict = json.load(file)
    except FileNotFoundError:
        st.error(
            "Error: rename.json not found. Please ensure it's in the same directory as the app."
        )
        return None
    except Exception as e:
        st.error(f"Error loading rename.json: {e}")
        return None
    players["name"] = players["name"].replace(rename_dict)
    # Merge player baseline statistics
    finalized_df = merge_player_baseline_stats(
        players,
        "statistical_weighted_baselines_gk.csv",
        "statistical_weighted_baselines.csv",
        gk_element_type=1,
    )
    if finalized_df is None:  # Propagate error if merge_player_baseline_stats failed
        return None
    # Calculate Avg_BPS: goalkeepers (element_type 1) use their own BPS
    # baseline; element types 2, 3 and 4 add their position-specific baseline
    # to the neutral one.
    finalized_df["Avg_BPS"] = 0.0
    finalized_df.loc[finalized_df["element_type"] == 1, "Avg_BPS"] = finalized_df[
        "baseline_gk_bps_p90"
    ].astype(float)
    finalized_df.loc[finalized_df["element_type"] == 2, "Avg_BPS"] = (
        finalized_df["baseline_Neutral_BPS_p90"] + finalized_df["baseline_Def_BPS_p90"]
    )
    finalized_df.loc[finalized_df["element_type"] == 3, "Avg_BPS"] = (
        finalized_df["baseline_Neutral_BPS_p90"] + finalized_df["baseline_Mid_BPS_p90"]
    )
    finalized_df.loc[finalized_df["element_type"] == 4, "Avg_BPS"] = (
        finalized_df["baseline_Neutral_BPS_p90"] + finalized_df["baseline_Fwd_BPS_p90"]
    )
    return finalized_df
@st.cache_data(ttl=3600)  # cache entries expire after 3600 seconds
def load_team_baselines_cached():
    """Read sheet ``Sheet2`` of ``team_totals.xlsx`` into a DataFrame (cached)."""
    return pd.read_excel("team_totals.xlsx", sheet_name="Sheet2")
@st.cache_data
def load_data_and_setup_initial_df_2(finalized_df):
    """Add team totals and player shares to the player frame and load fixtures.

    Team totals from ``team_totals.xlsx`` are mapped onto each player, the
    fixture projections from ``ewmapois_model.csv`` get numeric home/away team
    ids, a ``Team`` display-name column is added, and each player's share of
    the team totals is computed (a zero team total yields a share of 0).

    Args:
        finalized_df: Player DataFrame produced by
            ``load_data_and_setup_initial_df``. It is not modified.

    Returns:
        ``(finalized_df, match_df, teams_dict, teams_dict_1)``, or four Nones
        when an input file could not be loaded (the error is shown with
        ``st.error``).
    """
    # Work on a copy: the argument is this cached function's cache key, and
    # mutating the caller's frame would make the visible side effects differ
    # between a cache miss (body runs) and a cache hit (body skipped).
    finalized_df = finalized_df.copy()
    teams_dict = TEAMS_DICT
    teams_dict_1 = TEAMS_DICT_REVERSE
    # Load team_totals.xlsx from file
    try:
        team_baselines = load_team_baselines_cached()
        team_baselines["Teams"] = team_baselines["Teams"].replace(teams_dict)
    except FileNotFoundError:
        st.error(
            "Error: team_totals.xlsx not found. Please ensure it's in the same directory as the app."
        )
        return None, None, None, None  # Indicate failure
    except Exception as e:
        st.error(f"Error loading team_totals.xlsx: {e}")
        return None, None, None, None
    # Map team stats to players (these are for calculating shares later)
    team_stat_columns = (
        ("Team_xG", "xG"),
        ("Team_xA", "xA"),
        ("Team_xCBIT", "CBIT"),
        ("Team_xCBITR", "CBITR"),
        ("Team_YC", "YC"),
        ("Team_RC", "RC"),
    )
    totals_by_team = team_baselines.set_index("Teams")
    for player_col, team_col in team_stat_columns:
        finalized_df[player_col] = finalized_df["team"].map(
            totals_by_team[team_col].to_dict()
        )
    # Load ewmapois_model.csv (match data) from file
    try:
        match_df = load_fixture_projections_cached()
    except FileNotFoundError:
        st.error(
            "Error: ewmapois_model.csv not found. Please ensure it's in the same directory as the app."
        )
        return None, None, None, None  # Indicate failure
    except Exception as e:
        st.error(f"Error loading ewmapois_model.csv: {e}")
        return None, None, None, None
    # Map team names to numbers for match_df
    match_df["home_team_num"] = match_df["home_team"].map(teams_dict)
    match_df["away_team_num"] = match_df["away_team"].map(teams_dict)
    # 'Team' holds the display name for each numeric team id
    finalized_df["Team"] = finalized_df["team"].map(teams_dict_1)
    # Share = player baseline / team total. Zero totals are replaced with NaN
    # so the division yields NaN, which the fillna below turns into 0.
    share_columns = (
        ("xG_share", "baseline_xG_p90", "Team_xG"),
        ("xA_share", "baseline_xA_p90", "Team_xA"),
        ("xCBIT_share", "baseline_CBIT_p90", "Team_xCBIT"),
        ("xCBITR_share", "baseline_CBITR_p90", "Team_xCBITR"),
        ("YC_share", "baseline_yc_p90", "Team_YC"),
        ("RC_share", "baseline_rc_p90", "Team_RC"),
    )
    for share_col, baseline_col, total_col in share_columns:
        finalized_df[share_col] = finalized_df[baseline_col] / finalized_df[
            total_col
        ].replace(0, np.nan)
    # Fill every remaining NaN in the frame with 0
    finalized_df.fillna(0, inplace=True)
    # Recompute the team totals and shares through the shared helper, which
    # works on its own copy.
    finalized_df = recalculate_player_shares(finalized_df, team_baselines)
    # NOTE(review): this session write only happens when the function body
    # runs; on a cache hit it is presumably skipped - confirm callers do not
    # depend on it.
    st.session_state.finalized_df = finalized_df.copy()
    return finalized_df, match_df, teams_dict, teams_dict_1
def poisson_probability_of_conceding_2_or_more_goals(lambd):
    """Return P(X >= 2) for a Poisson variable with mean ``lambd``.

    Computed as 1 - P(X = 0) - P(X = 1).
    """
    prob_zero = math.exp(-lambd)
    prob_one = lambd * prob_zero
    return 1 - prob_zero - prob_one
def poisson_pmf(k, lambd):
    """Return the Poisson probability mass P(X = k) for mean ``lambd``.

    Negative ``k`` has probability 0. A mean below 1e-9 is treated as zero, so
    all mass sits on ``k == 0``.
    """
    if k < 0:
        return 0.0
    if lambd < 1e-9:
        # Degenerate case: very small lambda is treated as zero for stability.
        return float(k == 0)
    numerator = lambd**k * math.exp(-lambd)
    return numerator / math.factorial(k)
def neg_binom_probability_of_value(expected_mean, value, dispersion=1.0):
    """
    Calculates the exact probability (PMF) of getting exactly 'value' events.
    Used for: Saves, Goals, Assists.

    Args:
        expected_mean: Expected number of events.
        value: Event count whose probability is wanted.
        dispersion: Variance-to-mean ratio. Values <= 1.0 fall back to the
            Poisson PMF; larger values use a negative binomial with
            ``p = 1 / dispersion`` and ``n = mean * p / (1 - p)``.

    Returns:
        P(X == value). A non-positive mean is treated as a point mass at zero,
        giving 1.0 for ``value == 0`` and 0.0 otherwise.
    """
    if expected_mean <= 0:
        # With no expected events the only possible outcome is zero events, so
        # P(X = 0) must be 1 (matching poisson_pmf for a near-zero mean).
        return 1.0 if value == 0 else 0.0
    if dispersion <= 1.0:  # Fallback to Poisson if no dispersion
        return poisson_pmf(value, expected_mean)
    # Convert Mean + Dispersion to n, p
    p = 1 / dispersion
    n = (expected_mean * p) / (1 - p)
    return nbinom.pmf(value, n, p)
def neg_binom_probability_at_least(expected_mean, threshold, dispersion=1.0):
    """
    Calculates probability of getting 'threshold' OR MORE events.
    Used for: DefCons (CBIT), Recoveries.

    Args:
        expected_mean: Expected number of events.
        threshold: Minimum event count.
        dispersion: Variance-to-mean ratio. Values <= 1.0 use the Poisson CDF;
            larger values use a negative binomial with ``p = 1 / dispersion``
            and ``n = mean * p / (1 - p)``.

    Returns:
        P(X >= threshold). A non-positive mean is treated as a point mass at
        zero, giving 1.0 when ``threshold <= 0`` and 0.0 otherwise.
    """
    if expected_mean <= 0:
        # Zero events are certain, so only a threshold of zero (or below) can
        # be met; this agrees with the other branches, where P(X >= 0) is 1.
        return 1.0 if threshold <= 0 else 0.0
    if dispersion <= 1.0:
        # Use existing Poisson logic if dispersion is low
        return 1 - poisson_cdf(threshold - 1, expected_mean)
    p = 1 / dispersion
    n = (expected_mean * p) / (1 - p)
    # Probability of X >= threshold is (1 - CDF(threshold - 1))
    return 1 - nbinom.cdf(threshold - 1, n, p)
def calculate_expected_conceded_points(lambd):
    """
    Return the expected fantasy points from goals conceded, where every two
    goals conceded cost one point and goals conceded are Poisson with mean
    ``lambd``.

    Outcomes from 0 to 10 goals are considered; anything above is ignored.
    """
    goal_cap = 10
    expected_points = 0
    for goals in range(goal_cap + 1):
        # -(goals // 2): 0 and 1 goals cost nothing, 2-3 cost 1, 4-5 cost 2, ...
        penalty = -(goals // 2)
        # Weight the penalty by the probability of conceding exactly `goals`.
        expected_points += poisson_pmf(k=goals, lambd=lambd) * penalty
    return expected_points
def poisson_cdf(k, lambd):
    """Return the Poisson cumulative probability P(X <= k) for mean ``lambd``.

    Negative ``k`` gives 0. A mean below 1e-9 is treated as zero. ``k`` is
    floored before the PMF terms are summed.
    """
    if k < 0:
        return 0.0
    if lambd < 1e-9:
        # Treat very small lambda as zero for stability
        return 1.0 if k >= 0 else 0.0
    highest_count = math.floor(k)
    return sum(poisson_pmf(count, lambd) for count in range(highest_count + 1))
def recalculate_player_shares(finalized_df_copy, team_baselines):
    """
    Recompute each player's share of their team's totals.

    Team totals (xG, xA, CBIT, CBITR, YC, RC) are mapped onto the players by
    team id, then each share is the player's per-90 baseline divided by the
    team total. Zero team totals produce a share of 0. Neither input is
    modified; every NaN in the returned frame is replaced with 0.

    Returns:
        A new DataFrame with the ``Team_*`` and ``*_share`` columns refreshed.
    """
    players = finalized_df_copy.copy()  # Work on a copy
    # Make sure the 'Teams' column holds numeric ids, since players are
    # matched on their numeric 'team' value.
    totals = team_baselines.copy()
    totals["Teams"] = totals["Teams"].replace(TEAMS_DICT)
    totals_by_team = totals.set_index("Teams")

    # (player column, team-totals column)
    team_total_columns = (
        ("Team_xG", "xG"),
        ("Team_xA", "xA"),
        ("Team_xCBIT", "CBIT"),
        ("Team_xCBITR", "CBITR"),
        ("Team_YC", "YC"),
        ("Team_RC", "RC"),
    )
    for player_col, total_col in team_total_columns:
        players[player_col] = players["team"].map(totals_by_team[total_col].to_dict())

    # (share column, player baseline column, mapped team-total column)
    share_columns = (
        ("xG_share", "baseline_xG_p90", "Team_xG"),
        ("xA_share", "baseline_xA_p90", "Team_xA"),
        ("xCBIT_share", "baseline_CBIT_p90", "Team_xCBIT"),
        ("xCBITR_share", "baseline_CBITR_p90", "Team_xCBITR"),
        ("YC_share", "baseline_yc_p90", "Team_YC"),
        ("RC_share", "baseline_rc_p90", "Team_RC"),
    )
    for share_col, baseline_col, team_col in share_columns:
        # A zero total becomes NaN so the division cannot divide by zero.
        safe_total = players[team_col].replace(0, np.nan)
        players[share_col] = players[baseline_col] / safe_total

    # NaNs (including those from zero totals) become 0.
    players.fillna(0, inplace=True)
    return players
def apply_team_skepticism(df, skepticism_factors):
    """
    Scale ``base_pts`` in place for every player on the listed teams.

    Args:
        df: DataFrame with ``team`` and ``base_pts`` columns; modified in place.
        skepticism_factors: Mapping of team id -> multiplier. When empty or
            None, ``df`` is returned untouched.

    Returns:
        The same ``df`` object.
    """
    if skepticism_factors:
        for team_id, multiplier in skepticism_factors.items():
            # Index labels of the rows belonging to this team
            team_rows = df.loc[df["team"] == team_id].index
            df.loc[team_rows, "base_pts"] *= multiplier
    return df
# --- Main Calculation Logic (encapsulated) ---
def get_df_hash(df):
    """Return an MD5 hex digest computed from the per-row hashes of ``df``.

    The row hashes include the index.
    """
    row_hashes = pd.util.hash_pandas_object(df, index=True)
    digest = hashlib.md5()
    digest.update(row_hashes.values)
    return digest.hexdigest()
def calculate_single_match_points(
    player,
    match_row,
    xMins_in_match,
    points_config,
    is_gk=False,
    is_def=False,
    is_mid=False,
    is_fwd=False,
):
    """
    Project the expected fantasy points one player earns in one fixture.

    Parameters
    ----------
    player : mapping (e.g. a DataFrame row)
        Keys read: ``team``, ``element_type``, ``id``, ``xG_share``,
        ``xA_share``, ``xCBIT_share``, ``xCBITR_share``, ``YC_share``,
        ``RC_share``, ``baseline_bps_floor_p90`` and, for goalkeepers,
        ``baseline_xSaves_p90`` and ``baseline_pksave_p90``.
    match_row : mapping
        One fixture. Keys read: ``home_team_num``, the
        ``mc_{home,away}_*_mean`` team projections (goals, assists_xa, CBIT,
        CBITR, keeper_saves, yc, rc) and ``{home,away}_clean_sheet_odds``.
    xMins_in_match : float
        Expected minutes in this fixture. Values ``<= 0`` short-circuit to 0.0.
    points_config : dict
        Keys read: ``goal`` and ``clean_sheet`` (both keyed by
        ``element_type``), ``assist``, ``saves_per_3`` and
        ``penalty_points_per_position``.
    is_gk, is_def, is_mid, is_fwd : bool
        Position flags selecting the position-specific components.

    Returns
    -------
    float
        Sum of the goal, assist, clean-sheet, goals-conceded, card, save,
        penalty-save, CBIT/CBITR, penalty-taker, appearance and bonus
        components.

    Notes
    -----
    * Penalty-taker shares are read from
      ``st.session_state.player_penalty_shares``.
    * Two appearance points are awarded from 60 expected minutes upwards
      (``>= 60``), the same threshold this function uses for full
      clean-sheet credit and for the minutes-played BPS.
    * Goalkeepers receive no projected bonus points.
    """
    if xMins_in_match <= 0:
        return 0.0

    # Per-90 projections are scaled linearly by the share of the match played.
    scaling_factor = xMins_in_match / 90.0
    player_pos = player["element_type"]

    # 1. Pick the team-level projections for the player's side of the fixture.
    #    Any team that is not the home side is treated as the away side.
    if player["team"] == match_row["home_team_num"]:
        team_proj_goals = match_row["mc_home_goals_mean"]
        team_conc_goals = match_row["mc_away_goals_mean"]
        team_proj_assists = match_row["mc_home_assists_xa_mean"]
        team_proj_cbit = match_row["mc_home_CBIT_mean"]
        team_proj_cbitr = match_row["mc_home_CBITR_mean"]
        team_proj_saves = match_row["mc_home_keeper_saves_mean"]
        team_proj_yc = match_row["mc_home_yc_mean"]
        team_proj_rc = match_row["mc_home_rc_mean"]
        cs_odds = match_row["home_clean_sheet_odds"]
    else:
        team_proj_goals = match_row["mc_away_goals_mean"]
        team_conc_goals = match_row["mc_home_goals_mean"]
        team_proj_assists = match_row["mc_away_assists_xa_mean"]
        team_proj_cbit = match_row["mc_away_CBIT_mean"]
        team_proj_cbitr = match_row["mc_away_CBITR_mean"]
        team_proj_saves = match_row["mc_away_keeper_saves_mean"]
        team_proj_yc = match_row["mc_away_yc_mean"]
        team_proj_rc = match_row["mc_away_rc_mean"]
        cs_odds = match_row["away_clean_sheet_odds"]

    # 2. Player-level projections: the player's share of the team projection.
    proj_goals = player["xG_share"] * team_proj_goals
    proj_assists = player["xA_share"] * team_proj_assists
    proj_cbit = player["xCBIT_share"] * team_proj_cbit
    proj_cbitr = player["xCBITR_share"] * team_proj_cbitr

    # Goalkeeper-only projections; saves blend the keeper's own baseline with
    # the team-level projection for this fixture.
    proj_saves = 0
    proj_pen_saves = 0
    if is_gk:
        proj_saves = (player["baseline_xSaves_p90"] + team_proj_saves) / 2
        proj_pen_saves = player["baseline_pksave_p90"]

    # --- GOALS (Poisson, truncated at 8 goals) ---
    pts_per_goal = points_config["goal"][player_pos]
    pts_goals = 0.0
    for k in range(9):
        pts_goals += poisson_pmf(k, proj_goals) * k * pts_per_goal
    pts_goals *= scaling_factor

    # --- ASSISTS (Poisson, truncated at 8 assists) ---
    pts_assists = 0.0
    for k in range(9):
        pts_assists += poisson_pmf(k, proj_assists) * k * points_config["assist"]
    pts_assists *= scaling_factor

    # --- CLEAN SHEET ---
    # Full credit from 60 expected minutes, otherwise scaled by minutes.
    if xMins_in_match >= 60:
        pts_cs = cs_odds * points_config["clean_sheet"][player_pos]
    else:
        pts_cs = (cs_odds * points_config["clean_sheet"][player_pos]) * scaling_factor

    # --- CONCEDED (GK / DEF only) ---
    pts_conc = 0.0
    if (is_gk or is_def) and team_conc_goals is not None:
        pts_conc = calculate_expected_conceded_points(team_conc_goals) * scaling_factor

    # --- CARDS (-1 per yellow, -3 per red) ---
    pts_yc = (player["YC_share"] * team_proj_yc * -1) * scaling_factor
    pts_rc = (player["RC_share"] * team_proj_rc * -3) * scaling_factor

    # --- SAVES and PENALTY SAVES (GK only) ---
    pts_saves = 0.0
    pts_pen_save = 0.0
    if is_gk:
        # Save points step every 3 saves, so the expectation is taken over
        # the save-count distribution (0..20 saves).
        expected_saves_pts_unscaled = 0.0
        for k_saves in range(21):
            prob_k = neg_binom_probability_of_value(proj_saves, k_saves, dispersion=1.5)
            pts_k = (k_saves // 3) * points_config["saves_per_3"]
            expected_saves_pts_unscaled += prob_k * pts_k
        pts_saves = expected_saves_pts_unscaled * scaling_factor

        # 5 points per penalty save, expectation over 0..2 saves.
        expected_pen_saved_pts_unscaled = 0.0
        for k_pen in range(3):
            prob_k = poisson_pmf(k_pen, proj_pen_saves)
            pts_k = k_pen * 5
            expected_pen_saved_pts_unscaled += prob_k * pts_k
        pts_pen_save = expected_pen_saved_pts_unscaled * scaling_factor

    # --- CBIT (Defenders): 2 points for reaching 10 ---
    pts_cbit = 0.0
    if is_def:
        cbit_threshold = 10
        prob_hit = neg_binom_probability_at_least(
            proj_cbit, cbit_threshold, dispersion=3.2
        )
        pts_cbit = prob_hit * 2 * scaling_factor

    # --- CBITR (Mids/Fwds): 2 points for reaching 12 ---
    # Only the dispersion differs between the two positions.
    pts_cbitr = 0.0
    if is_mid or is_fwd:
        cbitr_threshold = 12
        cbitr_dispersion = 2.8 if is_mid else 1.7
        prob_hit = neg_binom_probability_at_least(
            proj_cbitr, cbitr_threshold, dispersion=cbitr_dispersion
        )
        pts_cbitr = prob_hit * 2 * scaling_factor

    # --- PENALTY POINTS (Taker) ---
    pts_penalty = 0.0
    penalty_shares = st.session_state.player_penalty_shares
    if player["id"] in penalty_shares:
        pen_share = penalty_shares[player["id"]]
        base_pen_pts = points_config["penalty_points_per_position"].get(player_pos, 0)
        pts_penalty = (base_pen_pts * pen_share) * scaling_factor

    # --- APPEARANCE (Per Match) ---
    # ">= 60" keeps this consistent with the clean-sheet and BPS thresholds
    # used elsewhere in this function.
    pts_app = 0.0
    if xMins_in_match >= 60:
        pts_app = 2
    elif xMins_in_match > 0:
        pts_app = 1

    # --- BONUS POINTS (reconstructed from expected BPS) ---
    # 1. Floor
    bps_floor = player["baseline_bps_floor_p90"] * scaling_factor
    # 2. Minutes-played BPS
    bps_mins = 6 if xMins_in_match >= 60 else (3 if xMins_in_match > 0 else 0)
    # 3. Event BPS, approximated from the minute-scaled expected counts
    scaled_goals = proj_goals * scaling_factor
    scaled_assists = proj_assists * scaling_factor
    scaled_saves = proj_saves * scaling_factor if is_gk else 0
    scaled_pen_saves = proj_pen_saves * scaling_factor if is_gk else 0
    scaled_yc = player["YC_share"] * team_proj_yc * scaling_factor
    scaled_rc = player["RC_share"] * team_proj_rc * scaling_factor

    if is_fwd:
        bps_goals = scaled_goals * 24
    elif is_mid:
        bps_goals = scaled_goals * 18
    else:
        bps_goals = scaled_goals * 12  # Def/GK
    bps_assists = scaled_assists * 9

    bps_cs = 0
    if (is_gk or is_def) and xMins_in_match >= 60:
        # Expected CS BPS = probability of a clean sheet * 12 BPS
        bps_cs = cs_odds * 12

    bps_saves = scaled_saves * 2
    bps_pen_saves = scaled_pen_saves * 15
    bps_cards = (scaled_yc * -3) + (scaled_rc * -9)

    total_projected_bps = (
        bps_floor
        + bps_mins
        + bps_goals
        + bps_assists
        + bps_cs
        + bps_saves
        + bps_pen_saves
        + bps_cards
    )

    pts_bonus = 0.0
    if not is_gk:
        # Conversion used by this model: total projected BPS / 29.4
        pts_bonus = total_projected_bps / 29.4

    # --- FINAL SUM ---
    total_match_pts = (
        pts_goals
        + pts_assists
        + pts_cs
        + pts_conc
        + pts_yc
        + pts_rc
        + pts_saves
        + pts_pen_save
        + pts_cbit
        + pts_cbitr
        + pts_penalty
        + pts_app
        + pts_bonus
    )
    return total_match_pts
# @st.cache_data(show_spinner=False, hash_funcs={pd.DataFrame: get_df_hash})
def calculate_all_points(
    player_df_base,
    match_df,
    player_penalty_shares,
    MINS_SCALING_BONUS,
    pos_map,
    teams_dict_1,
    teams_dict,
    points_config,
    effective_xmins_overrides,  # Merged (admin + session) per-GW xMins overrides
    MINS_THRESHOLD,  # Decay/ramp switch point for carried-over minutes
    RAMP_UP_PERIOD,  # NOTE: overwritten with 3 at the top of the body
    decay_rates,  # Per-status decay multipliers ("default", "suspended")
    ramp_up_rates,  # Per-status additive ramp-up minutes
    user_player_status_overrides,  # {player_id: {"status": ..., "weeks_out": ...}}
    team_skepticism,
    effective_availability_multipliers,
):
    """
    Build the per-gameweek expected-minutes and expected-points table.

    For every gameweek found in ``match_df["GW"]`` (processed in ascending
    order) the function:

    1. derives each player's expected minutes from their status
       (``not_a_starter`` / ``suspended`` / ``injured`` / anything else) and
       from the minutes carried over from the previous gameweek;
    2. applies the availability multiplier and then the manual xMins
       override for that gameweek (both affect only the minutes used for
       scoring and display in that gameweek; the value carried into the next
       gameweek is the pre-override figure);
    3. scores every fixture the player's team has in that gameweek with
       ``calculate_single_match_points`` (blank gameweeks score 0 points and
       0 minutes);
    4. applies ``apply_team_skepticism`` and stores the rounded results.

    Parameters
    ----------
    player_df_base : pandas.DataFrame
        Player baselines; copied before use. Its ``attrs`` may carry
        ``has_baseline_xmins_override`` and ``all_baseline_overrides``.
    match_df : pandas.DataFrame
        Fixture projections; must contain ``GW``, ``home_team_num`` and
        ``away_team_num`` plus whatever ``calculate_single_match_points`` reads.
    pos_map : dict
        Maps ``element_type`` to the position label stored in ``Pos``.
    points_config : dict
        Forwarded to ``calculate_single_match_points``.
    effective_xmins_overrides, effective_availability_multipliers : dict
        ``{player_id: {gw: value}}``.
    MINS_THRESHOLD, decay_rates, ramp_up_rates, user_player_status_overrides,
    team_skepticism
        See the inline comments in the body.
    player_penalty_shares, MINS_SCALING_BONUS, teams_dict_1, teams_dict
        Accepted but never read by this function.
    RAMP_UP_PERIOD
        Accepted but immediately replaced by the constant 3.

    Returns
    -------
    pandas.DataFrame
        Columns ``Pos``, ``ID``, ``Name``, ``BV``, ``SV``, ``Team``, then
        ``"{gw}_xMins"`` and ``"{gw}_Pts"`` per gameweek, plus
        ``Total Points`` and ``Average Points``.

    Side effects
    ------------
    Resets ``st.session_state.group_cache`` to an empty dict.
    """
    # NOTE(review): this discards the RAMP_UP_PERIOD argument supplied by the
    # caller — confirm the hard-coded value is intentional.
    RAMP_UP_PERIOD = 3
    player_df = player_df_base.copy()  # Work on a copy of the base player_df
    # Initialize the output DataFrame structure
    final_df_output = pd.DataFrame(
        {
            "Pos": player_df["element_type"].map(pos_map),
            "ID": player_df["id"],
            "Name": player_df["web_name"],
            "BV": player_df["now_cost"],
            "SV": player_df["now_cost"],
            "Team": player_df["Team"],
        }
    )
    # Persistence Setup: minutes carried from one gameweek into the next
    continuous_xMins_progression = player_df["baseline_xMins"].copy()
    has_baseline_xmins_override = getattr(player_df, "attrs", {}).get(
        "has_baseline_xmins_override", False
    )
    all_baseline_overrides = getattr(player_df, "attrs", {}).get(
        "all_baseline_overrides", {}
    )
    unique_gws = sorted(match_df["GW"].unique())
    st.session_state.group_cache = {}
    # --- MAIN GAMEWEEK LOOP ---
    for gw_idx, gw in enumerate(unique_gws):
        # 1. Apply Baseline Overrides
        # NOTE(review): this tests for the literal gameweek 1, whereas the
        # "first gameweek" logic further down uses min(unique_gws) — confirm
        # which is intended when the fixture list does not start at GW 1.
        if has_baseline_xmins_override and gw == 1:
            for index, player in player_df.iterrows():
                player_id = player["id"]
                if (
                    player_id in all_baseline_overrides
                    and "baseline_xMins" in all_baseline_overrides[player_id]
                ):
                    continuous_xMins_progression.loc[index] = all_baseline_overrides[
                        player_id
                    ]["baseline_xMins"]
        # 2. Setup GW DataFrame (one row per player, same index as player_df)
        gw_calc_df = pd.DataFrame(index=player_df.index)
        gw_calc_df["team"] = player_df["team"]
        gw_calc_df["id"] = player_df["id"]
        gw_calc_df["web_name"] = player_df["web_name"]
        gw_calc_df["player_name"] = player_df["name"]
        gw_calc_df["xG_share"] = player_df["xG_share"]
        gw_calc_df["xA_share"] = player_df["xA_share"]
        gw_calc_df["baseline_xMins"] = player_df["baseline_xMins"]
        gw_calc_df["baseline_bps_floor_p90"] = player_df["baseline_bps_floor_p90"]
        gw_calc_df["base_pts"] = 0.0
        # Placeholders; both names are reassigned after the vectorized step.
        xMins_for_current_gw_display = pd.Series(index=player_df.index, dtype=float)
        next_gw_continuous_xMins = pd.Series(index=player_df.index, dtype=float)
        # ============================================================
        # VECTORIZED XMINS CALCULATION
        # Later mask assignments overwrite earlier ones, so the order of
        # the cases below matters.
        # ============================================================
        player_ids_array = player_df["id"].values
        n_players = len(player_ids_array)
        # Players without a status entry are treated as "default".
        status_list = [
            user_player_status_overrides.get(pid, {"status": "default"})["status"]
            for pid in player_ids_array
        ]
        weeks_out_list = [
            user_player_status_overrides.get(pid, {}).get("weeks_out", 0)
            for pid in player_ids_array
        ]
        status_array = np.array(status_list, dtype=object)
        weeks_out_array = np.array(weeks_out_list)
        is_not_starter = status_array == "not_a_starter"
        is_suspended = status_array == "suspended"
        is_injured = status_array == "injured"
        # Any status other than the three above falls into the default path.
        is_default = ~(is_not_starter | is_suspended | is_injured)
        baseline_mins_array = player_df["baseline_xMins"].values
        prev_continuous_xmins_array = continuous_xMins_progression.values
        calculated_xmins_array = np.zeros(n_players, dtype=float)
        next_continuous_xmins_array = np.zeros(n_players, dtype=float)
        first_gw = min(unique_gws)
        is_first_gw = gw == first_gw
        is_available_first_gw = ~(is_not_starter | is_suspended | is_injured)
        # CASE 1: First GW + Available -> start from the baseline minutes
        if is_first_gw:
            mask_first_available = is_available_first_gw
            calculated_xmins_array[mask_first_available] = baseline_mins_array[
                mask_first_available
            ]
        # CASE 2: Not a starter -> always 0 minutes
        calculated_xmins_array[is_not_starter] = 0
        # CASE 3: Suspended
        # NOTE(review): weeks_out is compared directly with the gameweek
        # number — presumably it holds the last gameweek missed; confirm.
        mask_suspended_during = is_suspended & (gw <= weeks_out_array)
        mask_suspended_return = is_suspended & (gw == weeks_out_array + 1)
        mask_suspended_after = is_suspended & (gw > weeks_out_array + 1)
        calculated_xmins_array[mask_suspended_during] = 0
        # First gameweek back: straight to the baseline minutes
        calculated_xmins_array[mask_suspended_return] = baseline_mins_array[
            mask_suspended_return
        ]
        decay_rate_susp = decay_rates.get("suspended", decay_rates.get("default", 0.99))
        ramp_rate_susp = ramp_up_rates.get("suspended", ramp_up_rates.get("default", 0))
        # After the return week: decay when at/above the threshold, otherwise
        # ramp up additively (capped at 90).
        mask_susp_decay = mask_suspended_after & (
            prev_continuous_xmins_array >= MINS_THRESHOLD
        )
        mask_susp_ramp = mask_suspended_after & (
            prev_continuous_xmins_array < MINS_THRESHOLD
        )
        calculated_xmins_array[mask_susp_decay] = (
            prev_continuous_xmins_array[mask_susp_decay] * decay_rate_susp
        )
        calculated_xmins_array[mask_susp_ramp] = np.minimum(
            prev_continuous_xmins_array[mask_susp_ramp] + ramp_rate_susp, 90
        )
        # CASE 4: Injured
        mask_injured_out = is_injured & (gw <= weeks_out_array)
        calculated_xmins_array[mask_injured_out] = 0
        mask_injured_recovering = is_injured & (gw > weeks_out_array)
        weeks_since_injury_array = np.maximum(0, gw - weeks_out_array)
        # Ramp phase: linear build-up to the baseline over RAMP_UP_PERIOD weeks
        mask_ramp_phase = mask_injured_recovering & (
            weeks_since_injury_array <= RAMP_UP_PERIOD
        )
        calculated_xmins_array[mask_ramp_phase] = (
            baseline_mins_array[mask_ramp_phase] / RAMP_UP_PERIOD
        ) * weeks_since_injury_array[mask_ramp_phase]
        # After the ramp phase the player follows the default decay/ramp rules
        mask_post_ramp = mask_injured_recovering & (
            weeks_since_injury_array > RAMP_UP_PERIOD
        )
        decay_rate_default = decay_rates.get("default", 0.99)
        ramp_rate_default = ramp_up_rates.get(
            "default", ramp_up_rates.get("injured", 0)
        )
        mask_post_decay = mask_post_ramp & (
            prev_continuous_xmins_array >= MINS_THRESHOLD
        )
        mask_post_ramp_up = mask_post_ramp & (
            prev_continuous_xmins_array < MINS_THRESHOLD
        )
        calculated_xmins_array[mask_post_decay] = (
            prev_continuous_xmins_array[mask_post_decay] * decay_rate_default
        )
        calculated_xmins_array[mask_post_ramp_up] = np.minimum(
            prev_continuous_xmins_array[mask_post_ramp_up] + ramp_rate_default, 90
        )
        # CASE 5: Default/healthy (every gameweek except the first)
        mask_default_calc = is_default & ~(is_first_gw & is_available_first_gw)
        element_type_array = player_df["element_type"].values
        is_gk = element_type_array == 1
        # Goalkeepers simply carry the previous gameweek's minutes forward
        mask_gk_default = mask_default_calc & is_gk
        calculated_xmins_array[mask_gk_default] = prev_continuous_xmins_array[
            mask_gk_default
        ]
        # Outfield players decay at/above the threshold ...
        mask_outfield_default = mask_default_calc & (~is_gk)
        mask_outf_decay = mask_outfield_default & (
            prev_continuous_xmins_array >= MINS_THRESHOLD
        )
        calculated_xmins_array[mask_outf_decay] = (
            prev_continuous_xmins_array[mask_outf_decay] * decay_rate_default
        )
        # ... and ramp up below it, but only if they have a non-zero baseline
        mask_outf_ramp = (
            mask_outfield_default
            & (prev_continuous_xmins_array < MINS_THRESHOLD)
            & (baseline_mins_array > 0)
        )
        calculated_xmins_array[mask_outf_ramp] = np.minimum(
            prev_continuous_xmins_array[mask_outf_ramp] + ramp_rate_default, 90
        )
        calculated_xmins_array = np.clip(calculated_xmins_array, 0, 90)
        # Snapshot taken BEFORE overrides/availability: this is what carries
        # into the next gameweek.
        next_continuous_xmins_array = calculated_xmins_array.copy()
        # ==============================================
        # APPLY OVERRIDES AND AVAILABILITY
        # ==============================================
        xMins_for_current_gw_display = calculated_xmins_array.copy()
        for idx in range(n_players):
            player_id = player_ids_array[idx]
            # Apply Availability (multiplier defaults to 1.0)
            availability_mult = 1.0
            if player_id in effective_availability_multipliers:
                if gw in effective_availability_multipliers[player_id]:
                    availability_mult = effective_availability_multipliers[player_id][
                        gw
                    ]
            xMins_for_current_gw_display[idx] *= availability_mult
            # Apply Manual Overrides (replace the value, availability included)
            if player_id in effective_xmins_overrides:
                if gw in effective_xmins_overrides[player_id]:
                    xMins_for_current_gw_display[idx] = effective_xmins_overrides[
                        player_id
                    ][gw]
        xMins_for_current_gw_display = pd.Series(
            xMins_for_current_gw_display, index=player_df.index
        )
        next_gw_continuous_xMins = pd.Series(
            next_continuous_xmins_array, index=player_df.index
        )
        # Assign calculated xMins to DataFrame for the current GW
        gw_calc_df[f"{gw}_xMins"] = xMins_for_current_gw_display
        # ============================================================
        # MATCH SCORING LOOP
        # ============================================================
        gw_matches = match_df[match_df["GW"] == gw]
        for index, player in player_df.iterrows():
            player_team_num = player["team"]
            # Find matches for this player in this GW
            my_matches = gw_matches[
                (gw_matches["home_team_num"] == player_team_num)
                | (gw_matches["away_team_num"] == player_team_num)
            ]
            # If Blank GW, score is 0
            if my_matches.empty:
                gw_calc_df.loc[index, "base_pts"] = 0
                gw_calc_df.loc[index, f"{gw}_xMins"] = 0
                continue
            # --- DGW FATIGUE LOGIC ---
            # Get the base xMins calculated for this week (e.g. 90)
            base_gw_mins = gw_calc_df.loc[index, f"{gw}_xMins"]
            mins_per_match = base_gw_mins  # Default for Single GW
            # If the team has more than one match AND the player is expected
            # to play more than 35 minutes, apply a 0.97 factor per match to
            # simulate rotation/fatigue.
            # E.g. 90 mins becomes 87.3 mins PER MATCH (174.6 total over two).
            if len(my_matches) > 1 and base_gw_mins > 35:
                mins_per_match = base_gw_mins * 0.97
            # Sum points for all matches in the gameweek
            total_gw_pts = 0.0
            for _, match_row in my_matches.iterrows():
                # Score this single fixture
                pts = calculate_single_match_points(
                    player=player,
                    match_row=match_row,
                    xMins_in_match=mins_per_match,
                    points_config=points_config,
                    is_gk=(player["element_type"] == 1),
                    is_def=(player["element_type"] == 2),
                    is_mid=(player["element_type"] == 3),
                    is_fwd=(player["element_type"] == 4),
                )
                total_gw_pts += pts
            gw_calc_df.loc[index, "base_pts"] = total_gw_pts
        # Apply Team Skepticism
        gw_calc_df = apply_team_skepticism(gw_calc_df, team_skepticism)
        # Final Total Points for this GW
        gw_calc_df["total_pts"] = gw_calc_df["base_pts"]
        # Store in Final Output
        final_df_output[f"{gw}_xMins"] = round(gw_calc_df[f"{gw}_xMins"], 0)
        final_df_output[f"{gw}_Pts"] = round(gw_calc_df["total_pts"], 2)
        # Update progression for next loop
        continuous_xMins_progression = next_gw_continuous_xMins.copy()
    # Calculate Totals and Averages across every "{gw}_Pts" column
    final_df_output["Total Points"] = final_df_output.filter(like="_Pts").sum(axis=1)
    final_df_output["Average Points"] = round(
        (final_df_output.filter(like="_Pts").sum(axis=1)) / len(unique_gws), 2
    )
    return final_df_output
def get_modified_finalized_df():
    """
    Return a copy of ``st.session_state.finalized_df`` with the baseline
    overrides from ``st.session_state.user_baseline_overrides`` applied.

    * ``baseline_xMins``, ``baseline_pksave_p90`` and ``Avg_BPS`` overrides
      replace the stored value outright.
    * Every other overridden stat is treated as a multiplier on the pristine
      value held in ``finalized_df``.
    * ``Avg_BPS`` is then rebuilt from the (possibly adjusted) positional BPS
      baselines for every player whose ``Avg_BPS`` was not overridden directly.

    The returned frame's ``attrs`` carry ``has_baseline_xmins_override`` and
    ``all_baseline_overrides``. Returns ``None`` when no ``finalized_df`` is
    loaded.
    """
    pristine_df = st.session_state.finalized_df
    if pristine_df is None:
        return None

    baseline_overrides = st.session_state.user_baseline_overrides
    modified_df = pristine_df.copy()

    # Stats whose override is the new value itself rather than a multiplier
    direct_override_stats = ("baseline_xMins", "baseline_pksave_p90", "Avg_BPS")

    all_overrides = baseline_overrides.copy()
    has_baseline_xmins_override = any(
        "baseline_xMins" in player_overrides
        for player_overrides in all_overrides.values()
    )

    for player_id, overrides in baseline_overrides.items():
        mask = modified_df["id"] == player_id
        if not mask.any():
            continue
        for stat_name, override_value in overrides.items():
            if stat_name not in modified_df.columns:
                continue
            if stat_name in direct_override_stats:
                new_value = override_value
            else:
                # Multipliers always scale the untouched source value
                pristine_rows = pristine_df[pristine_df["id"] == player_id]
                new_value = pristine_rows.iloc[0][stat_name] * override_value
            modified_df.loc[mask, stat_name] = new_value

    # Outfield positions add their position-specific BPS to the neutral BPS
    outfield_bps_column = {
        2: "baseline_Def_BPS_p90",
        3: "baseline_Mid_BPS_p90",
        4: "baseline_Fwd_BPS_p90",
    }
    for index, row in modified_df.iterrows():
        # A directly overridden Avg_BPS has already been written above
        if "Avg_BPS" in baseline_overrides.get(row["id"], {}):
            continue
        position = row["element_type"]
        if position == 1:
            modified_df.loc[index, "Avg_BPS"] = row["baseline_gk_bps_p90"]
        elif position in outfield_bps_column:
            modified_df.loc[index, "Avg_BPS"] = (
                row["baseline_Neutral_BPS_p90"] + row[outfield_bps_column[position]]
            )

    modified_df.attrs["has_baseline_xmins_override"] = has_baseline_xmins_override
    modified_df.attrs["all_baseline_overrides"] = all_overrides
    return modified_df
# Function to recalculate with current overrides
def recalculate_projections():
    """
    Rebuild ``st.session_state.output_df`` from the current overrides.

    Steps: apply baseline overrides (``get_modified_finalized_df``), recompute
    player shares against ``team_baselines``, layer the session-only xMins and
    availability overrides on top of the admin-persistent ones, then run
    ``calculate_all_points``. Does nothing when no ``finalized_df`` is loaded.
    """
    state = st.session_state
    if state.finalized_df is None:
        return

    def _layer_overrides(admin_layer, session_layer):
        # Admin values form the base; session values win on conflicts.
        merged = {pid: per_gw.copy() for pid, per_gw in admin_layer.items()}
        for pid, per_gw in session_layer.items():
            merged.setdefault(pid, {}).update(per_gw)
        return merged

    # Shares must be recomputed so non-xMins baseline edits take effect
    players_with_shares = recalculate_player_shares(
        get_modified_finalized_df(), state.team_baselines
    )

    effective_xmins_overrides = _layer_overrides(
        state.admin_persistent_xmins_overrides,
        state.user_xmins_overrides,
    )
    effective_availability_multipliers = _layer_overrides(
        state.admin_persistent_availability_multipliers,
        state.user_availability_multipliers,
    )

    state.output_df = calculate_all_points(
        player_df_base=players_with_shares,
        match_df=state.match_df,
        player_penalty_shares=state.player_penalty_shares,
        MINS_SCALING_BONUS=state.MINS_SCALING_BONUS,
        pos_map=state.pos_map,
        teams_dict_1=state.teams_dict_1,
        teams_dict=state.teams_dict,
        points_config=state.points_config,
        effective_xmins_overrides=effective_xmins_overrides,
        MINS_THRESHOLD=state.MINS_THRESHOLD,
        RAMP_UP_PERIOD=state.RAMP_UP_PERIOD,
        decay_rates=state.decay_rates,
        ramp_up_rates=state.ramp_up_rates,
        user_player_status_overrides=state.user_player_status_overrides,
        team_skepticism=state.team_skepticism,
        effective_availability_multipliers=effective_availability_multipliers,
    )
def apply_loaded_baseline_overrides_on_startup():
    """
    Populate ``st.session_state.bps_std_devs`` from the pristine finalized_df.

    Despite the name, no baseline override is written to ``finalized_df``
    here; that happens in ``get_modified_finalized_df()``. The stored value is
    the standard deviation of ``Avg_BPS`` per ``element_type``, as a dict.
    Does nothing when ``finalized_df`` is ``None``.
    """
    pristine_df = st.session_state.finalized_df
    if pristine_df is None:
        return
    # Computed on the un-overridden data so the initial display is stable
    bps_spread_by_position = pristine_df.groupby("element_type")["Avg_BPS"].std()
    st.session_state.bps_std_devs = bps_spread_by_position.to_dict()
def update_sync_value_from_slider(sync_key, slider_key):
    """Copy the slider's current session value into the ``sync_key`` entry."""
    slider_value = st.session_state[slider_key]
    st.session_state[sync_key] = slider_value
def update_sync_value_from_number_input(sync_key, num_input_key):
    """Copy the number input's current session value into the ``sync_key`` entry."""
    entered_value = st.session_state[num_input_key]
    st.session_state[sync_key] = entered_value
# --- Streamlit Application Layout and Logic ---
# Page chrome: icon, wide layout, title and intro text.
im = Image.open("image.png")
page_title = "Luigi's Mansion"
st.set_page_config(page_title=page_title, page_icon=im, layout="wide")
st.title(page_title)
for intro_markdown in (
    "Yes you can play around with xMins here (I made it more for my convenience rather than yours but nonetheless, enjoy!)",
    "---",
):
    st.markdown(intro_markdown)

# Load any persisted admin overrides (baselines, status, penalties, global rates).
# user_xmins_overrides is session-only and is therefore NOT loaded here.
load_admin_overrides()
# Initial call to setup and calculate data (only runs once or on explicit rerun)
if not st.session_state.initialized:
    # The spinner stays up while data is loaded and the first projection runs
    with st.spinner(
        "Preparing data and running initial projections... This may take a while..."
    ):
        initial_finalized_df = load_data_and_setup_initial_df()
        if initial_finalized_df is None:
            # Nothing can be projected without the base player data
            st.stop()

        # Team baselines are read once; team names are replaced via TEAMS_DICT
        teams_dict = TEAMS_DICT
        try:
            st.session_state.team_baselines = pd.read_excel(
                "team_totals.xlsx", sheet_name="Sheet2"
            )
            st.session_state.team_baselines["Teams"] = st.session_state.team_baselines[
                "Teams"
            ].replace(teams_dict)
        except FileNotFoundError:
            st.error(
                "Error: team_totals.xlsx not found. Please ensure it's in the same directory as the app."
            )
            st.stop()
        except Exception as e:
            st.error(f"Error loading team_totals.xlsx: {e}")
            st.stop()

        finalized_df, match_df, teams_dict_updated, teams_dict_1_updated = (
            load_data_and_setup_initial_df_2(initial_finalized_df)
        )
        if finalized_df is None:
            # The second loader failed, so its other return values are unusable too
            st.stop()

        # Static configuration for the projection run
        st.session_state.MINS_SCALING_BONUS = 0.0
        st.session_state.pos_map = {1: "G", 2: "D", 3: "M", 4: "F"}
        st.session_state.teams_dict_1 = teams_dict_1_updated
        st.session_state.teams_dict = teams_dict_updated
        # CBIT/CBITR have no entries here: they are scored with fixed
        # thresholds inside the match-points calculation.
        st.session_state.points_config = {
            "goal": {1: 10, 2: 6, 3: 5, 4: 4},
            "assist": 3,
            "clean_sheet": {1: 4, 2: 4, 3: 1, 4: 0},
            "saves_per_3": 1,
            "penalty_points_per_position": {2: 0.9, 3: 0.7, 4: 0.5},
        }

        # Store the base dataframes
        st.session_state.match_df = match_df
        st.session_state.finalized_df = finalized_df

        # Seeds bps_std_devs from the pristine data (no overrides are applied here)
        apply_loaded_baseline_overrides_on_startup()
        # First full calculation, incorporating every loaded override
        recalculate_projections()
        st.session_state.initialized = True

        # Normalise the four "selected player" entries to the unique
        # "<web_name> (ID: <id>)" display form.
        if (
            st.session_state.output_df is not None
            and not st.session_state.output_df.empty
        ):
            player_id_to_display_name_map = {
                row["id"]: f"{row['web_name']} (ID: {row['id']})"
                for _, row in st.session_state.finalized_df[
                    ["id", "web_name"]
                ].iterrows()
            }
            player_display_names = list(player_id_to_display_name_map.values())

            for selection_key in (
                "selected_xm_player",
                "selected_status_player",
                "selected_baseline_player",
                "selected_penalty_player",
            ):
                current_selection = getattr(st.session_state, selection_key)
                if current_selection:
                    # Look the player up by web_name (text before " (ID:")
                    # and re-derive the display name from the ID found.
                    selected_web_name = current_selection.split(" (ID:")[0]
                    player_id_from_name = st.session_state.finalized_df[
                        st.session_state.finalized_df["web_name"] == selected_web_name
                    ]["id"].iloc[0]
                    setattr(
                        st.session_state,
                        selection_key,
                        player_id_to_display_name_map.get(player_id_from_name),
                    )
                elif player_display_names:
                    # Nothing selected yet: default to the first player
                    setattr(st.session_state, selection_key, player_display_names[0])
    st.success(
        "Data loaded and initial calculations complete! Use the sidebar to make adjustments."
    )
# --- SIDEBAR EDITING FUNCTIONALITY ---
with st.sidebar:
st.header("Adjust Player Stats")
# Admin Login Section
st.subheader("Admin Login")
admin_password_input = st.text_input(
"Enter Admin Password", type="password", key="admin_password_input"
)
if admin_password_input == ADMIN_PASSWORD:
st.session_state.is_admin_logged_in = True
st.success("Admin logged in!")
elif admin_password_input != "" and admin_password_input != ADMIN_PASSWORD:
st.session_state.is_admin_logged_in = False
st.error("Incorrect password.")
if st.session_state.is_admin_logged_in:
if st.button("Log Out Admin", key="admin_logout_button"):
st.session_state.is_admin_logged_in = False
st.rerun()
st.divider()
# Create a mapping for player names and IDs for display
# This ensures unique selection even for players with the same web_name
player_id_to_display_name_map = {}
player_display_name_to_id_map = {} # Initialize the reverse map
player_display_names = []
# Ensure finalized_df exists before attempting to create the map and list
if st.session_state.finalized_df is not None:
player_id_to_display_name_map = {
row["id"]: f"{row['web_name']} (ID: {row['id']})"
for _, row in st.session_state.finalized_df[["id", "web_name"]].iterrows()
}
player_display_names = list(player_id_to_display_name_map.values())
player_display_name_to_id_map = { # Populate the reverse map
v: k for k, v in player_id_to_display_name_map.items()
}
# Minutes Editor Section (Always visible for all users)
st.subheader("Adjust Expected Minutes (Weekly Override)")
# Ensure output_df is not None before trying to access its columns
gw_cols = []
if st.session_state.output_df is not None:
gw_cols = [
c for c in st.session_state.output_df.columns if c.endswith("_xMins")
]
gw_choices = sorted(
[int(col.split("_")[0]) for col in gw_cols]
) # Ensure sorted integers
# Determine initial index for selected_xm_player
initial_xm_index = 0
if (
st.session_state.selected_xm_player
and st.session_state.selected_xm_player in player_display_names
):
initial_xm_index = player_display_names.index(
st.session_state.selected_xm_player
)
elif player_display_names: # Check if list is not empty before accessing
st.session_state.selected_xm_player = player_display_names[0]
initial_xm_index = 0
player_selected_display = st.selectbox(
"Select Player for xMins Override",
player_display_names,
key="xm_sidebar_player",
index=initial_xm_index,
on_change=lambda: setattr(
st.session_state, "selected_xm_player", st.session_state.xm_sidebar_player
),
)
# Get the actual player ID from the selected display name
player_id = player_display_name_to_id_map.get(player_selected_display)
gw_selected = st.selectbox(
"Select Gameweek for xMins Override", gw_choices, key="xm_sidebar_gw"
)
# Render the weekly xMins override editor only when a player and gameweek are
# selected and st.session_state.output_df is available.
if (
player_id is not None and gw_selected and st.session_state.output_df is not None
): # Use player_id for lookup and check output_df
gw = int(gw_selected)
# Get the current projected xMins for this player and GW (which includes baseline and decay)
current_projected_xmins_col = f"{gw}_xMins"
# Use player_id for lookup in output_df
# NOTE(review): .iloc[0] raises IndexError when no output_df row matches this ID,
# and the column lookup raises KeyError when "<gw>_xMins" is missing -- confirm
# both are always present.
current_projected_xmins = st.session_state.output_df[
st.session_state.output_df["ID"] == player_id
][current_projected_xmins_col].iloc[0]
if st.session_state.is_admin_logged_in:
st.info(
"Your changes here will be **saved** across sessions because you are logged in as admin."
)
# Use the admin persistent override if it exists, otherwise use the current projected value as default
default_val = st.session_state.admin_persistent_xmins_overrides.get(
player_id, {}
).get(gw, current_projected_xmins)
new_xmins = st.number_input(
"Expected Minutes",
0.0, # min_value
90.0, # max_value
float(default_val), # value
step=1.0, # one-minute increments
format="%.2f",
key=f"xmins_sidebar_slider_{player_id}_{gw}",
)
col1, col2 = st.columns([1, 1])
# Update: store the value under overrides[player_id][gw], recalculate, save.
if col1.button(
"Update xMins Override", key=f"update_xmins_sidebar_{player_id}_{gw}"
):
st.session_state.admin_persistent_xmins_overrides.setdefault(
player_id, {}
)[gw] = float(new_xmins)
with st.spinner("Recalculating..."):
recalculate_projections()
save_admin_overrides() # Auto-save for admin
st.success("Minutes override updated and saved!")
st.info(
"Please wait 2-3 seconds for the data to fully update before making further changes."
)
st.rerun()
# Reset: drop this gameweek's entry and remove the player's dict once it is empty.
if col2.button(
"Reset xMins Override", key=f"reset_xmins_sidebar_{player_id}_{gw}"
):
if player_id in st.session_state.admin_persistent_xmins_overrides:
st.session_state.admin_persistent_xmins_overrides[player_id].pop(
gw, None
)
if not st.session_state.admin_persistent_xmins_overrides[player_id]:
del st.session_state.admin_persistent_xmins_overrides[player_id]
with st.spinner("Recalculating..."):
recalculate_projections()
save_admin_overrides() # Auto-save for admin
st.success("xMins override reset to default and saved!")
st.info(
"Please wait 2-3 seconds for the data to fully update before making further changes."
)
st.rerun()
# --- Connected Groups editor ---
# Lets the user create, edit, and delete named groups of players stored in
# st.session_state.player_groups and written out through save_player_groups().
st.markdown("---")
st.subheader("🔗 Connected Groups (Redistribution)")
# The editor needs a non-empty finalized_df to build the list of player options.
if (
st.session_state.finalized_df is None
or st.session_state.finalized_df.empty
):
st.warning(
"⚠️ Data not loaded. Please click 'Update Projections' first."
)
else:
df_source = st.session_state.finalized_df.copy() # Work on a copy
# 1. Find Name Column
# The first candidate present in the frame wins.
name_col = None
for c in ["Name", "web_name", "player_name", "name", "Player"]:
if c in df_source.columns:
name_col = c
break
# 2. Find ID Column (Crucial for differentiation)
id_col = "id" if "id" in df_source.columns else None
if not name_col or not id_col:
st.error(
f"❌ Missing Name or ID column. Cols: {list(df_source.columns)}"
)
else:
# 3. Create Unique Display Names: "Name (ID)"
# This handles duplicate names like 'Wilson'
df_source["unique_display"] = (
df_source[name_col].astype(str)
+ " ("
+ df_source[id_col].astype(str)
+ ")"
)
all_player_options = sorted(
df_source["unique_display"].unique().tolist()
)
# 4. Group Selector
existing_groups = list(st.session_state.player_groups.keys())
group_action = st.radio(
"Action",
["Edit Existing", "Create New"],
horizontal=True,
key="grp_action_unique",
)
# Stays None when "Edit Existing" is chosen but no groups exist yet.
selected_group_name = None
if group_action == "Create New":
selected_group_name = st.text_input("Enter New Group Name")
elif existing_groups:
selected_group_name = st.selectbox(
"Select Group to Edit", existing_groups
)
# 5. Member Selector
if selected_group_name:
current_members = st.session_state.player_groups.get(
selected_group_name, []
)
# Stored members that are no longer valid options are dropped from the
# default selection.
updated_members = st.multiselect(
f"Members of '{selected_group_name}'",
options=all_player_options,
default=[
p for p in current_members if p in all_player_options
],
)
c1, c2 = st.columns(2)
# Save stores the selection under the group name, replacing any existing
# entry with that name.
if c1.button("💾 Save Group"):
st.session_state.player_groups[selected_group_name] = (
updated_members
)
save_player_groups(st.session_state.player_groups)
st.success(f"Saved group '{selected_group_name}'")
if c2.button("❌ Delete Group"):
if selected_group_name in st.session_state.player_groups:
del st.session_state.player_groups[selected_group_name]
save_player_groups(st.session_state.player_groups)
st.rerun()
# Non-admin branch of the weekly xMins override editor: values are kept in
# st.session_state.user_xmins_overrides and save_admin_overrides() is not called.
else: # Not admin logged in
st.info(
"Your changes here are only for your current session and will NOT be saved."
)
st.info(
"Remember to press the 'Update Button' once you have entered the desired value."
)
# Use the user session-only override if it exists, otherwise use the current projected value as default
default_val = st.session_state.user_xmins_overrides.get(player_id, {}).get(
gw, current_projected_xmins
)
new_xmins = st.number_input(
"Expected Minutes",
0.0, # min_value
90.0, # max_value
float(default_val), # value
step=1.0, # one-minute increments
format="%.2f",
key=f"xmins_sidebar_slider_{player_id}_{gw}",
)
col1, col2 = st.columns([1, 1])
# Update: store the value under overrides[player_id][gw] and recalculate.
if col1.button(
"Update xMins Override", key=f"update_xmins_sidebar_{player_id}_{gw}"
):
st.session_state.user_xmins_overrides.setdefault(player_id, {})[gw] = (
float(new_xmins)
)
with st.spinner("Recalculating..."):
recalculate_projections()
st.success("Minutes override updated for this session!")
st.info(
"Please wait 2-3 seconds for the data to fully update before making further changes."
)
st.rerun()
# Reset: drop this gameweek's entry; an emptied per-player dict is left in place.
if col2.button(
"Reset xMins Override", key=f"reset_xmins_sidebar_{player_id}_{gw}"
):
if player_id in st.session_state.user_xmins_overrides:
st.session_state.user_xmins_overrides[player_id].pop(gw, None)
with st.spinner("Recalculating..."):
recalculate_projections()
st.success("xMins override reset to default for this session!")
st.info(
"Please wait 2-3 seconds for the data to fully update before making further changes."
)
st.rerun()
st.divider()
# --- Non-Admin Baseline xMins Adjustment ---
# Session-only editor for a single stat, "baseline_xMins", stored in
# st.session_state.user_baseline_overrides[player_id].
if not st.session_state.is_admin_logged_in:
st.subheader("Adjust Baseline Expected Minutes (Temporary)")
st.info(
"Your changes here are only for your current session and will NOT be saved."
)
st.info(
"Remember to press the 'Update Button' once you have entered the desired value."
)
# Determine initial index for selected_baseline_player
# Falls back to the first player when nothing valid is remembered.
initial_baseline_index = 0
if (
st.session_state.selected_baseline_player
and st.session_state.selected_baseline_player in player_display_names
):
initial_baseline_index = player_display_names.index(
st.session_state.selected_baseline_player
)
elif player_display_names:
st.session_state.selected_baseline_player = player_display_names[0]
initial_baseline_index = 0
# on_change copies the widget value into st.session_state.selected_baseline_player.
baseline_player_selected_display_non_admin = st.selectbox(
"Select Player for Baseline xMins Editing",
options=player_display_names,
key="baseline_sidebar_player_non_admin",
index=initial_baseline_index,
on_change=lambda: setattr(
st.session_state,
"selected_baseline_player",
st.session_state.baseline_sidebar_player_non_admin,
),
)
if (
baseline_player_selected_display_non_admin
and st.session_state.finalized_df is not None
):
player_id_non_admin = player_display_name_to_id_map.get(
baseline_player_selected_display_non_admin
)
# Only allow editing of baseline_xMins for non-admins
selected_stat_non_admin = "baseline_xMins"
# Get current value (either overridden or original)
if (
player_id_non_admin in st.session_state.user_baseline_overrides
and selected_stat_non_admin
in st.session_state.user_baseline_overrides[player_id_non_admin]
):
current_val_non_admin = st.session_state.user_baseline_overrides[
player_id_non_admin
][selected_stat_non_admin]
else:
# NOTE(review): .iloc[0] raises IndexError when finalized_df has no row for
# this id (including a None id from the map lookup) -- confirm it cannot happen.
player_row_non_admin = st.session_state.finalized_df[
st.session_state.finalized_df["id"] == player_id_non_admin
].iloc[0]
current_val_non_admin = player_row_non_admin[selected_stat_non_admin]
new_stat_value_non_admin = st.number_input(
"Expected Baseline Minutes",
min_value=0.0, # min_value
max_value=90.0, # max_value
value=float(current_val_non_admin), # Ensure float
step=1.0, # one-minute increments
format="%.2f",
key=f"baseline_slider_non_admin_{player_id_non_admin}",
)
col1_non_admin, col2_non_admin = st.columns([1, 1])
# Update: store the entered value for this player and recalculate.
if col1_non_admin.button(
"Update Baseline xMins",
key=f"update_baseline_non_admin_{player_id_non_admin}",
):
if player_id_non_admin not in st.session_state.user_baseline_overrides:
st.session_state.user_baseline_overrides[player_id_non_admin] = {}
st.session_state.user_baseline_overrides[player_id_non_admin][
selected_stat_non_admin
] = new_stat_value_non_admin
with st.spinner("Recalculating..."):
recalculate_projections()
st.success("Baseline xMins updated for this session!")
st.rerun()
# Reset: remove the stat entry and drop the player's dict once it is empty.
if col2_non_admin.button(
"Reset Baseline xMins",
key=f"reset_baseline_non_admin_{player_id_non_admin}",
):
if (
player_id_non_admin in st.session_state.user_baseline_overrides
and selected_stat_non_admin
in st.session_state.user_baseline_overrides[player_id_non_admin]
):
del st.session_state.user_baseline_overrides[player_id_non_admin][
selected_stat_non_admin
]
if not st.session_state.user_baseline_overrides[
player_id_non_admin
]:
del st.session_state.user_baseline_overrides[
player_id_non_admin
]
with st.spinner("Recalculating..."):
recalculate_projections()
st.success("Baseline xMins reset to default for this session!")
st.rerun()
st.divider()
# --- Non-admin availability editor ---
# Session-only per-player, per-gameweek multiplier stored in
# st.session_state.user_availability_multipliers.
if not st.session_state.is_admin_logged_in:
st.subheader("Adjust Availability (Temporary)")
st.info(
"Your changes here are only for your current session and will NOT be saved."
)
st.info(
"Remember to press the 'Update Button' once you have entered the desired value."
)
# Determine initial index for selected_availability_player_non_admin
# Falls back to the first player when nothing valid is remembered.
initial_availability_index_non_admin = 0
if (
st.session_state.selected_availability_player_non_admin
and st.session_state.selected_availability_player_non_admin
in player_display_names
):
initial_availability_index_non_admin = player_display_names.index(
st.session_state.selected_availability_player_non_admin
)
elif player_display_names:
st.session_state.selected_availability_player_non_admin = (
player_display_names[0]
)
initial_availability_index_non_admin = 0
# on_change copies the widget value into
# st.session_state.selected_availability_player_non_admin.
availability_player_selected_display_non_admin = st.selectbox(
"Select Player to edit Availability for",
options=player_display_names,
key="availability_sidebar_player_non_admin",
index=initial_availability_index_non_admin,
on_change=lambda: setattr(
st.session_state,
"selected_availability_player_non_admin",
st.session_state.availability_sidebar_player_non_admin,
),
)
if availability_player_selected_display_non_admin:
player_id = player_display_name_to_id_map.get(
availability_player_selected_display_non_admin
)
gw_selected = st.selectbox(
"Select the gameweek to edit Availability for",
gw_choices,
key="availability_sidebar_gw_non_admin",
)
if player_id is not None and gw_selected:
gw = int(gw_selected)
# Look for an admin-saved multiplier for this player and gameweek.
admin_multiplier = None
if (
player_id
in st.session_state.admin_persistent_availability_multipliers
and gw
in st.session_state.admin_persistent_availability_multipliers[
player_id
]
):
admin_multiplier = (
st.session_state.admin_persistent_availability_multipliers[
player_id
][gw]
)
# Get current multiplier or default to 1.0
# An admin-saved multiplier, when present, is shown in preference to the
# session-only value.
if admin_multiplier is not None:
current_multiplier = admin_multiplier
else:
current_multiplier = (
st.session_state.user_availability_multipliers.get(
player_id, {}
).get(gw, 1.0)
)
st.info("Availability is set as a multiplier, ranging from 0-1")
new_multiplier = st.slider(
"Availability",
min_value=0.0,
max_value=1.0,
step=0.01,
value=float(current_multiplier),
key=f"availability_slider_non_admin_{player_id}_{gw}",
)
# Shows the stored multiplier, not the slider's pending value.
st.info(f"Availability for GW{gw} set at: {(current_multiplier):.2f}")
col1, col2 = st.columns([1, 1])
# Update writes to the session-only dict even when an admin value was displayed.
if col1.button(
"Update Availability",
key=f"update_availability_non_admin_{player_id}_{gw}",
):
st.session_state.user_availability_multipliers.setdefault(
player_id, {}
)[gw] = new_multiplier
with st.spinner("Recalculating..."):
recalculate_projections()
st.success("Availability multiplier updated for this session!")
st.rerun()
# Reset removes only the session-only entry for this gameweek.
if col2.button(
"Reset Availability",
key=f"reset_availability_non_admin_{player_id}_{gw}",
):
if player_id in st.session_state.user_availability_multipliers:
st.session_state.user_availability_multipliers[player_id].pop(
gw, None
)
with st.spinner("Recalculating..."):
recalculate_projections()
st.success(
"Availability multiplier reset to default for this session!"
)
st.rerun()
st.divider()
# Admin-Only Sections (Conditional Rendering)
if st.session_state.is_admin_logged_in:
# Removed the top-level "Save Admin Changes" button.
# Saves will now be automatic with each update/reset.
# Adjust Player Status Section
st.subheader("Adjust Player Status (Admin)")
# Determine initial index for selected_status_player
# Falls back to the first player when nothing valid is remembered.
initial_status_index = 0
if (
st.session_state.selected_status_player
and st.session_state.selected_status_player in player_display_names
):
initial_status_index = player_display_names.index(
st.session_state.selected_status_player
)
elif (
player_display_names
): # Added check to ensure player_display_names is not empty
st.session_state.selected_status_player = player_display_names[0]
initial_status_index = 0
# on_change copies the widget value into st.session_state.selected_status_player.
status_player_selected_display = st.selectbox(
"Select Player to Adjust Status",
options=player_display_names,
key="status_sidebar_player",
index=initial_status_index,
on_change=lambda: setattr(
st.session_state,
"selected_status_player",
st.session_state.status_sidebar_player,
),
)
# Status editor for the selected player. The override is stored in
# st.session_state.user_player_status_overrides[player_id] as
# {"status": <str>} plus "weeks_out" for suspended/injured players.
if status_player_selected_display:  # Use player_id from map
    player_id = player_display_name_to_id_map.get(status_player_selected_display)
    # Get current status info for the player
    current_status_info = st.session_state.user_player_status_overrides.get(
        player_id, {"status": "default"}
    )
    current_status_type = current_status_info.get("status", "default")
    current_weeks_out = current_status_info.get("weeks_out", 0)
    status_options = [
        "default",
        "not_a_starter",
        "suspended",
        "injured",
        "rotational_risk",
    ]
    # A stored override may carry a status that is not offered here; fall back
    # to "default" instead of letting list.index() raise ValueError.
    if current_status_type not in status_options:
        current_status_type = "default"
    new_status_type = st.selectbox(
        "Select Status",
        options=status_options,
        index=status_options.index(current_status_type),
        key=f"status_selector_{player_id}",
    )
    new_weeks_out = current_weeks_out
    if new_status_type in ["suspended", "injured"]:
        # Coerce a missing or negative stored value so it satisfies min_value=0.
        new_weeks_out = st.number_input(
            "Weeks Out (Player returns after this GW)",
            min_value=0,
            value=max(0, int(current_weeks_out or 0)),
            step=1,
            key=f"weeks_out_input_{player_id}",
        )
    col1, col2 = st.columns([1, 1])
    if col1.button("Update Status", key=f"update_status_{player_id}"):
        updated_status_info = {"status": new_status_type}
        # "weeks_out" is only recorded for statuses that use it.
        if new_status_type in ["suspended", "injured"]:
            updated_status_info["weeks_out"] = int(new_weeks_out)
        st.session_state.user_player_status_overrides[player_id] = updated_status_info
        with st.spinner("Recalculating..."):
            recalculate_projections()
        save_admin_overrides()  # Automatic save
        st.success(f"Status for {status_player_selected_display} updated and saved!")
        st.rerun()
    if col2.button("Reset Status", key=f"reset_status_{player_id}"):
        if player_id in st.session_state.user_player_status_overrides:
            del st.session_state.user_player_status_overrides[player_id]
        with st.spinner("Recalculating..."):
            recalculate_projections()
        save_admin_overrides()  # Automatic save
        st.success(
            f"Status for {status_player_selected_display} reset to default and saved!"
        )
        st.rerun()
st.divider()
# Baseline Stats Editor Section (Existing)
st.subheader("Adjust Baseline Stats (Admin)")
# Pre-select the remembered player, falling back to the first one.
initial_baseline_index = 0
if (
st.session_state.selected_baseline_player
and st.session_state.selected_baseline_player in player_display_names
):
initial_baseline_index = player_display_names.index(
st.session_state.selected_baseline_player
)
elif (
player_display_names
): # Added check to ensure player_display_names is not empty
st.session_state.selected_baseline_player = player_display_names[0]
initial_baseline_index = 0
# on_change copies the widget value into st.session_state.selected_baseline_player.
baseline_player_selected_display = st.selectbox(
"Select Player for Baseline Editing",
options=player_display_names,
key="baseline_sidebar_player",
index=initial_baseline_index,
on_change=lambda: setattr(
st.session_state,
"selected_baseline_player",
st.session_state.baseline_sidebar_player,
),
)
if (
baseline_player_selected_display
and st.session_state.finalized_df is not None
): # Use player_id from map and check finalized_df
player_id = player_display_name_to_id_map.get(
baseline_player_selected_display
)
# Define stats that are direct value overrides vs. multiplier overrides
# Direct stats store the entered value itself; multiplier stats store a
# factor that the editor displays multiplied by the original baseline.
direct_override_stats = ["baseline_xMins", "baseline_pksave_p90", "Avg_BPS"]
multiplier_override_stats = [
"baseline_xSaves_p90",
"baseline_xA_p90",
"baseline_yc_p90",
"baseline_rc_p90",
"baseline_xG_p90",
"baseline_CBIT_p90",
"baseline_CBITR_p90",
]
# Filter to only stats that exist for this player.
# Only the original baselines from finalized_df are needed. The previous
# output_df row lookup was never used and crashed when output_df was None or
# had no row for the player.
_baseline_rows = st.session_state.finalized_df[
    st.session_state.finalized_df["id"] == player_id
]
all_editable_stats = direct_override_stats + multiplier_override_stats
if _baseline_rows.empty:
    # No row to edit: offer no stats, so the selectbox below yields None and
    # the editor widgets are skipped instead of raising IndexError.
    st.warning("Selected player was not found in the loaded baseline data.")
    available_stats = []
else:
    player_row_original_baselines = _baseline_rows.iloc[0]
    available_stats = [
        stat
        for stat in all_editable_stats
        if stat in player_row_original_baselines.index
    ]
selected_stat = st.selectbox(
    "Select Stat to Edit", available_stats, key="stat_selector"
)
# Editor widgets for the chosen stat: a slider and a number input that share
# one session-state value through their on_change callbacks.
if selected_stat:
# Define a unique session state key for this specific player's selected baseline stat
sync_key_baseline_admin = (
f"sync_baseline_admin_{player_id}_{selected_stat}"
)
# Determine the default value for the input widget based on whether it's a direct or multiplier stat
if selected_stat in direct_override_stats:
# For direct override stats, show the currently overridden value, or the original baseline
default_input_value = st.session_state.user_baseline_overrides.get(
player_id, {}
).get(selected_stat, player_row_original_baselines[selected_stat])
min_val, max_val, step_val = (
0.0,
10.0,
0.1,
) # Default, will be overridden below
# Per-stat widget bounds and step, chosen by substring of the stat name.
if "xMins" in selected_stat:
min_val, max_val, step_val = 0.00, 90.00, 1.0
elif "Avg_BPS" in selected_stat:
min_val, max_val, step_val = 0.0, 100.0, 0.1
elif "pksave_p90" in selected_stat:
min_val, max_val, step_val = 0.0, 5.0, 0.01
# Ensure value is float
default_input_value = float(default_input_value)
# Both widgets read the synced value when it exists, else the default.
st.slider(
f"Direct Value for {selected_stat} (Slider)",
min_value=min_val,
max_value=max_val,
value=st.session_state.get(
sync_key_baseline_admin, default_input_value
), # Reads from synchronized state
step=step_val,
format="%.2f" if step_val < 0.1 else "%g",
key=f"slider_baseline_admin_{player_id}_{selected_stat}",
on_change=update_sync_value_from_slider,
args=(
sync_key_baseline_admin,
f"slider_baseline_admin_{player_id}_{selected_stat}",
),
)
st.number_input(
f"Direct Value for {selected_stat} (Input)",
min_value=min_val,
max_value=max_val,
value=st.session_state.get(
sync_key_baseline_admin, default_input_value
), # Reads from synchronized state
step=step_val,
format="%.2f" if step_val < 0.1 else "%g",
key=f"num_input_baseline_admin_{player_id}_{selected_stat}",
on_change=update_sync_value_from_number_input,
args=(
sync_key_baseline_admin,
f"num_input_baseline_admin_{player_id}_{selected_stat}",
),
)
else: # Multiplier override stats
# For multiplier stats, show the currently set multiplier, or default to 1.0
default_input_value = st.session_state.user_baseline_overrides.get(
player_id, {}
).get(selected_stat, 1.0)
min_val, max_val, step_val = 0.0, 5.0, 0.01
# Ensure value is float
default_input_value = float(default_input_value)
st.write(
f"Original value for {selected_stat}: {player_row_original_baselines[selected_stat]:.2f}"
)
st.slider(
f"Multiplier for {selected_stat} (Slider)",
min_value=min_val,
max_value=max_val,
value=st.session_state.get(
sync_key_baseline_admin, default_input_value
), # Reads from synchronized state
step=step_val,
format="%.2f",
key=f"slider_baseline_admin_{player_id}_{selected_stat}",
on_change=update_sync_value_from_slider,
args=(
sync_key_baseline_admin,
f"slider_baseline_admin_{player_id}_{selected_stat}",
),
)
st.number_input(
f"Multiplier for {selected_stat} (Input)",
min_value=min_val,
max_value=max_val,
value=st.session_state.get(
sync_key_baseline_admin, default_input_value
), # Reads from synchronized state
step=step_val,
format="%.2f",
key=f"num_input_baseline_admin_{player_id}_{selected_stat}",
on_change=update_sync_value_from_number_input,
args=(
sync_key_baseline_admin,
f"num_input_baseline_admin_{player_id}_{selected_stat}",
),
)
# Show the effective value if a multiplier is applied
# (original baseline multiplied by the synced or default multiplier).
current_multiplier = st.session_state.get(
sync_key_baseline_admin, default_input_value
)
effective_value = (
player_row_original_baselines[selected_stat]
* current_multiplier
)
st.info(f"Effective value: {effective_value:.2f}")
# Update / Reset buttons for the selected baseline stat.
col1, col2 = st.columns([1, 1])
if col1.button("Update Stat", key=f"update_baseline_{player_id}_{selected_stat}"):
    # The synced value is read with .get() everywhere above, so it may not
    # exist yet. Fall back to the default the widgets are showing instead of
    # raising KeyError when Update is pressed before any widget is changed.
    st.session_state.user_baseline_overrides.setdefault(player_id, {})[
        selected_stat
    ] = st.session_state.get(sync_key_baseline_admin, default_input_value)
    with st.spinner("Recalculating..."):
        recalculate_projections()
    save_admin_overrides()
    st.success("Baseline stat updated and saved!")
    # No st.rerun() here: Streamlit is left to handle reruns naturally.
if col2.button("Reset Stat", key=f"reset_baseline_{player_id}_{selected_stat}"):
    _baseline_overrides = st.session_state.user_baseline_overrides
    if (
        player_id in _baseline_overrides
        and selected_stat in _baseline_overrides[player_id]
    ):
        del _baseline_overrides[player_id][selected_stat]
        # Drop the player's entry once its last stat override is gone.
        if not _baseline_overrides[player_id]:
            del _baseline_overrides[player_id]
    with st.spinner("Recalculating..."):
        recalculate_projections()
    save_admin_overrides()
    st.success("Stat reset to default and saved!")
    # No st.rerun() here: Streamlit is left to handle reruns naturally.
st.divider()
# Adjust Player Penalty Share Section
# Per-player share stored in st.session_state.player_penalty_shares[player_id].
st.subheader("Adjust Player Penalty Share (Admin)")
# Determine initial index for selected_penalty_player
# Falls back to the first player when nothing valid is remembered.
initial_penalty_index = 0
if (
st.session_state.selected_penalty_player
and st.session_state.selected_penalty_player in player_display_names
):
initial_penalty_index = player_display_names.index(
st.session_state.selected_penalty_player
)
elif (
player_display_names
): # Added check to ensure player_display_names is not empty
st.session_state.selected_penalty_player = player_display_names[0]
initial_penalty_index = 0
# on_change copies the widget value into st.session_state.selected_penalty_player.
penalty_player_selected_display = st.selectbox(
"Select Player for Penalty Share Editing",
options=player_display_names,
key="penalty_sidebar_player",
index=initial_penalty_index,
on_change=lambda: setattr(
st.session_state,
"selected_penalty_player",
st.session_state.penalty_sidebar_player,
),
)
if penalty_player_selected_display: # Use player_id from map
player_id = player_display_name_to_id_map.get(
penalty_player_selected_display
)
# Get current penalty share or default to 0.0
current_penalty_share = st.session_state.player_penalty_shares.get(
player_id, 0.0
)
new_penalty_share = st.slider(
"Penalty Share (0.0 to 1.0)",
min_value=0.0,
max_value=1.0,
value=float(current_penalty_share),
step=0.01,
key=f"penalty_slider_{player_id}",
)
col1, col2 = st.columns([1, 1])
# Update: store the slider value, recalculate, save.
if col1.button("Update Penalty Share", key=f"update_penalty_{player_id}"):
st.session_state.player_penalty_shares[player_id] = new_penalty_share
with st.spinner("Recalculating..."):
recalculate_projections()
save_admin_overrides() # Automatic save
st.success("Penalty share updated and saved!")
st.rerun()
# Reset: remove the player's entry, after which the editor shows 0.0.
if col2.button("Reset Penalty Share", key=f"reset_penalty_{player_id}"):
if player_id in st.session_state.player_penalty_shares:
del st.session_state.player_penalty_shares[player_id]
with st.spinner("Recalculating..."):
recalculate_projections()
save_admin_overrides() # Automatic save
st.success("Penalty share reset to default and saved!")
st.rerun()
st.divider()
# --- Admin availability multiplier editor ---
# Per-player, per-gameweek multiplier stored in
# st.session_state.admin_persistent_availability_multipliers and saved through
# save_admin_overrides().
st.subheader("Adjust Availability Multiplier (Admin)")
# Determine initial index for selected_availability_player
# Falls back to the first player when nothing valid is remembered.
initial_availability_index = 0
if (
st.session_state.selected_availability_player
and st.session_state.selected_availability_player in player_display_names
):
initial_availability_index = player_display_names.index(
st.session_state.selected_availability_player
)
elif player_display_names:
st.session_state.selected_availability_player = player_display_names[0]
initial_availability_index = 0
# on_change copies the widget value into st.session_state.selected_availability_player.
availability_player_selected_display = st.selectbox(
"Select Player for Availability Multiplier",
options=player_display_names,
key="availability_sidebar_player",
index=initial_availability_index,
on_change=lambda: setattr(
st.session_state,
"selected_availability_player",
st.session_state.availability_sidebar_player,
),
)
if availability_player_selected_display:
player_id = player_display_name_to_id_map.get(
availability_player_selected_display
)
gw_selected = st.selectbox(
"Select Gameweek for Availability Multiplier",
gw_choices,
key="availability_sidebar_gw",
)
if player_id is not None and gw_selected:
gw = int(gw_selected)
# Get current multiplier or default to 1.0
current_multiplier = (
st.session_state.admin_persistent_availability_multipliers.get(
player_id, {}
).get(gw, 1.0)
)
new_multiplier = st.slider(
"Availability Multiplier",
min_value=0.0,
max_value=1.0,
step=0.01,
value=float(current_multiplier),
key=f"availability_slider_{player_id}_{gw}",
)
col1, col2 = st.columns([1, 1])
# Update: store the slider value under multipliers[player_id][gw].
if col1.button(
"Update Multiplier", key=f"update_availability_{player_id}_{gw}"
):
st.session_state.admin_persistent_availability_multipliers.setdefault(
player_id, {}
)[gw] = new_multiplier
with st.spinner("Recalculating..."):
recalculate_projections()
save_admin_overrides()
st.success("Availability multiplier updated and saved!")
st.rerun()
# Reset: drop this gameweek's entry and remove the player's dict once empty.
if col2.button(
"Reset Multiplier", key=f"reset_availability_{player_id}_{gw}"
):
if (
player_id
in st.session_state.admin_persistent_availability_multipliers
):
st.session_state.admin_persistent_availability_multipliers[
player_id
].pop(gw, None)
if not st.session_state.admin_persistent_availability_multipliers[
player_id
]:
del st.session_state.admin_persistent_availability_multipliers[
player_id
]
with st.spinner("Recalculating..."):
recalculate_projections()
save_admin_overrides()
st.success("Availability multiplier reset to default and saved!")
st.rerun()
st.divider()
# Adjust Global Rates Section
# Every input below writes its value straight back into st.session_state on
# each run; the "Apply" button only triggers recalculation and saving.
with st.expander("Adjust Global Decay/Ramp-up Rates (Admin)"):
st.write(
"These rates apply if a player's status doesn't have a specific rate defined."
)
# Decay rates
st.subheader("Decay Rates (Multiplicative)")
# Assigning to an existing key while iterating .items() does not resize the dict.
for status, rate in st.session_state.decay_rates.items():
new_rate = st.number_input(
f"{status.replace('_', ' ').title()} Decay Rate (e.g., 0.99 for 1% decay)",
min_value=0.0,
max_value=1.0,
value=float(rate),
step=0.01,
key=f"decay_rate_{status}",
)
st.session_state.decay_rates[status] = new_rate
# Ramp-up rates
st.subheader("Ramp-up Rates (Additive)")
for status, rate in st.session_state.ramp_up_rates.items():
new_rate = st.number_input(
f"{status.replace('_', ' ').title()} Ramp-up Rate (minutes per GW)",
min_value=0.0,
max_value=90.0,
value=float(rate),
step=1.0,
key=f"ramp_up_rate_{status}",
)
st.session_state.ramp_up_rates[status] = new_rate
# Ramp-up Period
new_ramp_up_period = st.number_input(
"Injured Player Ramp-up Period (Weeks)",
min_value=1,
max_value=10,
value=st.session_state.RAMP_UP_PERIOD,
step=1,
key="ramp_up_period_input",
)
st.session_state.RAMP_UP_PERIOD = new_ramp_up_period
# Mins Threshold
new_mins_threshold = st.number_input(
"Minutes Threshold (for decay vs. ramp-up)",
min_value=0,
max_value=90,
value=st.session_state.MINS_THRESHOLD,
step=1,
key="mins_threshold_input",
)
st.session_state.MINS_THRESHOLD = new_mins_threshold
if st.button("Apply Global Rate Changes", key="apply_global_rates"):
with st.spinner("Recalculating with new rates..."):
recalculate_projections()
save_admin_overrides() # Automatic save
st.success("Global rates updated and saved!")
st.rerun()
st.divider()
# Global Reset Section
# Two-step confirmation. A Streamlit button only returns True on the run
# triggered by its own click, so a confirm button nested inside the first
# button's branch can never fire. The pending confirmation is therefore kept
# in st.session_state["global_reset_pending"] across reruns.
st.subheader("Global Reset (Admin)")
if st.button(
    "Reset ALL Admin Edits (Baselines, Status, Rates, Penalties, Weekly xMins)",
    key="global_reset",
    type="secondary",
):
    if st.session_state.is_admin_logged_in:
        st.session_state["global_reset_pending"] = True
    else:
        st.error("You must be logged in as an admin to perform a global reset.")

if st.session_state.get("global_reset_pending", False):
    # Use st.warning for confirmation instead of window.confirm
    st.warning(
        "Are you sure you want to reset ALL admin-controlled edits to their default values? This action cannot be undone and will affect all future sessions."
    )
    # Separate buttons for the actual confirmation and for backing out.
    if st.button("Confirm Global Reset", key="confirm_global_reset"):
        st.session_state["global_reset_pending"] = False
        st.session_state.user_baseline_overrides = {}
        st.session_state.user_player_status_overrides = {}
        st.session_state.admin_persistent_xmins_overrides = {}  # Reset admin persistent xMins
        st.session_state.player_penalty_shares = {  # Reset to default hardcoded values
            16: 0.65,
            17: 0.15,
            30: 0.05,
            666: 0.3,
            48: 0.4,
            64: 0.7,
            81: 0.9,
            97: 0.25,
            136: 0.8,
            121: 0.09,
            178: 0.8,
            158: 0.05,
            202: 0.25,
            215: 0.6,
            216: 0.02,
            235: 0.9,
            249: 0.1,
            266: 0.6,
            267: 0.04,
            283: 0.4,
            299: 0.85,
            311: 0.1,
            310: 0.1,
            337: 0.6,
            327: 0.55,
            343: 0.4,
            362: 0.7,
            381: 0.95,
            382: 0.1,
            386: 0.05,
            430: 0.95,
            413: 0.15,
            449: 0.9,
            119: 0.1,
            450: 0.05,
            499: 0.85,
            485: 0.2,
            474: 0.02,
            525: 0.85,
            515: 0.25,
            596: 0.9,
            612: 0.8,
            624: 0.25,
            625: 0.04,
            647: 0.1,
            654: 0.85,
        }
        # Reset rates to their absolute initial defaults (hardcoded defaults)
        st.session_state.decay_rates = {
            "default": 0.99,
            "suspended": 0.95,
            "injured_decay": 0.95,
            "rotational_risk": 0.97,
        }
        st.session_state.ramp_up_rates = {
            "default": 5,
            "injured": 10,
            "suspended": 5,
            "starter": 0,
            "rotational_risk": 0,
        }
        st.session_state.RAMP_UP_PERIOD = 3
        st.session_state.MINS_THRESHOLD = 45
        with st.spinner("Resetting all admin-controlled edits..."):
            recalculate_projections()
        save_admin_overrides()  # Save the reset state to files
        st.success("All admin-controlled edits reset to default and saved!")
        st.rerun()
    if st.button("Cancel", key="cancel_global_reset"):
        # Dismiss the pending confirmation without changing anything.
        st.session_state["global_reset_pending"] = False
        st.rerun()
st.divider()
# Show current ADMIN-controlled overrides info
def _saved_override_label(pid):
    """Return a display label for player id *pid*.

    Prefers the entry in player_id_to_display_name_map, then the "web_name"
    of the matching finalized_df row, and finally the placeholder "ID: <pid>".
    The frame is only filtered when the map has no entry, and only once.
    """
    if pid in player_id_to_display_name_map:
        return player_id_to_display_name_map[pid]
    frame = st.session_state.finalized_df
    if frame is not None:
        matches = frame[frame["id"] == pid]
        if not matches.empty:
            return matches["web_name"].iloc[0]
    return f"ID: {pid}"


st.subheader("Current Admin Overrides (Saved)")
if st.session_state.user_baseline_overrides:
    st.write("**Baseline Overrides:**")
    for pid, stat_dict in st.session_state.user_baseline_overrides.items():
        player_name_display = _saved_override_label(pid)
        st.write(f"• {player_name_display}: {len(stat_dict)} stats")
else:
    st.write("No saved baseline overrides.")
if st.session_state.user_player_status_overrides:
    st.write("**Player Status Overrides:**")
    for pid, status_dict in st.session_state.user_player_status_overrides.items():
        player_name_display = _saved_override_label(pid)
        # Read "status" with the same default the status editor uses, so an
        # entry without that key does not raise KeyError.
        status_str = status_dict.get("status", "default")
        if "weeks_out" in status_dict:
            status_str += f" (out until GW {status_dict['weeks_out']})"
        st.write(f"• {player_name_display}: {status_str}")
else:
    st.write("No saved player status overrides.")
if st.session_state.player_penalty_shares:
    st.write("**Penalty Share Overrides:**")
    for pid, share_value in st.session_state.player_penalty_shares.items():
        player_name_display = _saved_override_label(pid)
        if not player_name_display.startswith("ID:"):  # Only show name if we found it
            st.write(f"• {player_name_display}: {share_value:.2f}")
        else:
            st.write(f"• Unknown Player (ID: {pid}): {share_value:.2f}")
else:
    st.write("No saved penalty share overrides.")
if st.session_state.admin_persistent_xmins_overrides:
    st.write("**Weekly xMins Overrides (Admin Saved):**")
    for pid, gw_dict in st.session_state.admin_persistent_xmins_overrides.items():
        player_name_display = _saved_override_label(pid)
        st.write(f"• {player_name_display}: {gw_dict}")
else:
    st.write("No saved weekly xMins overrides by admin.")
# Shown in place of the admin-only sections when no admin is logged in.
else:
st.info("Log in as admin to see and adjust advanced settings.")
# Show current LOCAL (session-only) xMins overrides info (visible to all users, but explicitly stated as temporary)
st.divider()
st.subheader("Current Local Overrides (Per Session)")
if st.session_state.user_xmins_overrides:
st.write("**Minutes Overrides (Local Session):**")
for pid, gw_dict in st.session_state.user_xmins_overrides.items():
# Use the display name from the map if available, otherwise fallback to original web_name
# NOTE(review): the fallback passed to .get() is evaluated for every player,
# even when the map already has the name, and filters output_df twice.
player_name_display = player_id_to_display_name_map.get(
pid,
st.session_state.output_df[st.session_state.output_df["ID"] == pid][
"Name"
].iloc[0]
if st.session_state.output_df is not None
and not st.session_state.output_df[
st.session_state.output_df["ID"] == pid
].empty
else f"ID: {pid}",
)
st.write(f"• {player_name_display}: {gw_dict}")
else:
st.write("No local xMins overrides for this session.")
tab1, tab2, tab3, tab4 = st.tabs(
    ["Projections", "Accuracy Dashboard", "Team Ratings", "Upcoming Fixtures"]
)
# --- MAIN CONTENT AREA ---
with tab1:
    st.subheader("Projected Player Points")
    # Other sections treat output_df as possibly None; guard here too so the
    # tab does not crash on None.to_csv() before projections exist.
    if st.session_state.output_df is None:
        st.info("No projections are available yet.")
    else:
        # Display the main table (read-only now since editing is in sidebar)
        st.dataframe(
            st.session_state.output_df,
            hide_index=True,
            use_container_width=True,
            height=600,
        )
        # Download button
        csv_data = st.session_state.output_df.to_csv(index=False).encode("utf-8")
        st.download_button(
            label="Download Table as CSV",
            data=csv_data,
            file_name="luigis_mansion.csv",
            mime="text/csv",
            key="download_csv_button",
        )
    # The introduction and usage instructions are shown regardless of data.
    st.markdown("---")
    st.markdown(
        "Luigi attempts a thing! I have finally managed to make my very own player model, think it might be subpar to the great ones but it's a start at least and I am relatively happy with that. Hope you all have a fun time tinkering!"
    )
    st.markdown("**Instructions:**")
    st.markdown(
        "- Use the **sidebar** to adjust player minutes, weekly or across the horizon."
    )
    st.markdown(
        "- Please wait **1-2 seconds** after pressing the Update button as your changes get processed and updated."
    )
    st.markdown(
        "- Currently, the overrides **CANNOT** be applied simultaneously, so don't forget to press the **Update** button else your changes will not be processed."
    )
    st.markdown(
        "- The Reset Override button **ONLY** resets the xMins of the player chosen. To revert back to original projections, simply **reload** the page."
    )
    st.markdown(
        "- The CSV is compatible with **Sertalp + Moose's Solver**, you can either rename the file to 'fplreview' or set 'luigis_mansion' as the datasource in the settings json."
    )
    st.markdown(
        "- You can download your customized projections as a CSV file (will be downloaded as 'luigis_mansion.csv')."
    )
def local_css():
    """Inject the dashboard's custom stylesheet into the page.

    The CSS styles ``stMetric`` cards and, inside a portrait media query
    (max-width 768px), makes Plotly/Altair chart containers horizontally
    scrollable with a fixed 900px-wide inner element.
    """
    stylesheet = """
<style>
/* 1. Metric Card Styles (Keep as is) */
div[data-testid="stMetric"] {
background-color: var(--secondary-background-color);
border: 2px solid var(--border-color-light);
padding: 10px;
border-radius: 10px;
}
/* 2. MOBILE PORTRAIT FIX (The "Landscape in Portrait" Mode) */
/* Targets mobile devices in Portrait orientation */
@media only screen and (orientation: portrait) and (max-width: 768px) {
/* Target the Plotly & Altair containers */
[data-testid="stPlotlyChart"], [data-testid="stAltairChart"] {
overflow-x: auto !important; /* Force horizontal scroll */
-webkit-overflow-scrolling: touch; /* Smooth scroll on iOS */
padding-bottom: 10px;
border-right: 2px solid #333; /* Visual cue for scrolling */
}
/* FORCE the inner content to be WIDE (Simulate Desktop) */
[data-testid="stPlotlyChart"] > div,
[data-testid="stAltairChart"] > canvas {
width: 900px !important; /* Force wide width */
min-width: 900px !important;
max-width: none !important;
}
/* Adjust height to be shorter so it looks like a wide strip */
[data-testid="stPlotlyChart"] > div {
height: 450px !important;
}
/* Add a visual hint telling user to scroll */
[data-testid="stPlotlyChart"]::after {
content: 'Swipe left/right to view chart';
display: block;
text-align: center;
font-size: 12px;
color: #aaa;
margin-top: 5px;
font-style: italic;
}
}
</style>
"""
    # unsafe_allow_html is required for the raw <style> tag to be emitted.
    st.markdown(stylesheet, unsafe_allow_html=True)
# Apply CSS
local_css()
with tab2:
    # proj_file feeds the match-outcome and goals metrics further down;
    # pts_file is what get_player_stats() is called with.
    proj_file, pts_file = (
        pd.read_excel(workbook)
        for workbook in ("projections_check.xlsx", "points_check.xlsx")
    )
def calculate_metrics(y_true, y_prob):
    """Score one binary outcome with the Brier score and log loss.

    Args:
        y_true: Observed outcomes; assumed to be 0/1 indicators (the callers
            pass indicator columns such as ``home_win`` or ``draw``).
        y_prob: Predicted probability of the positive (1) outcome.

    Returns:
        Tuple ``(brier, log_loss)``.
    """
    brier = brier_score_loss(y_true, y_prob)
    # Pin the label set: without it log_loss raises ValueError whenever the
    # sample happens to contain only one class (e.g. a slice with no draws).
    ll = log_loss(y_true, y_prob, labels=[0, 1])
    return brier, ll
def multiclass_brier_score(y_true_onehot, y_prob):
    """Return the multi-class Brier score.

    The score is the mean over rows of the summed squared differences
    between predicted probabilities and one-hot actuals.

    Args:
        y_true_onehot: 2-D array-like of one-hot actual outcomes
            (rows = observations, columns = classes).
        y_prob: 2-D array-like of predicted probabilities, same shape.

    Returns:
        The mean per-row squared error as a float.
    """
    # Coerce to plain arrays so nested lists work and DataFrames are compared
    # positionally instead of being aligned on (differing) column labels.
    actual = np.asarray(y_true_onehot, dtype=float)
    predicted = np.asarray(y_prob, dtype=float)
    return np.mean(np.sum((predicted - actual) ** 2, axis=1))
def get_player_stats(df, min_xmins=0, min_xpts=0):
    """Pool per-gameweek projections against actuals and score them.

    For every gameweek from 3 to 38 whose four columns (``{gw}_xMins``,
    ``{gw}_Mins``, ``{gw}_Pts``, ``{gw}_Actuals``) are all present in ``df``,
    the rows that meet both thresholds are pooled across gameweeks.

    Args:
        df: Wide frame with the per-gameweek columns described above.
        min_xmins: Minimum projected minutes for a row to be included.
        min_xpts: Minimum projected points for a row to be included.

    Returns:
        ``None`` when no row qualifies; otherwise a dict with keys ``"xMins"``
        and ``"xPts"``, each a one-row DataFrame with the columns
        ``Metric``, ``MAE``, ``RMSE`` and ``R2``.
    """
    all_xmins = []
    all_actual_mins = []
    all_xpts = []
    all_actual_pts = []
    for gw in range(3, 39):
        xmins_col = f"{gw}_xMins"
        actual_mins_col = f"{gw}_Mins"
        xpts_col = f"{gw}_Pts"
        actual_pts_col = f"{gw}_Actuals"
        required = (xmins_col, actual_mins_col, xpts_col, actual_pts_col)
        if not all(col in df.columns for col in required):
            continue
        mask = (
            (df[xmins_col] >= min_xmins)
            & (df[xpts_col] >= min_xpts)
            # The sklearn metrics reject NaN, so rows without a recorded
            # actual are left out instead of crashing the dashboard.
            & df[actual_mins_col].notna()
            & df[actual_pts_col].notna()
        )
        all_xmins.extend(df.loc[mask, xmins_col].tolist())
        all_actual_mins.extend(df.loc[mask, actual_mins_col].tolist())
        all_xpts.extend(df.loc[mask, xpts_col].tolist())
        all_actual_pts.extend(df.loc[mask, actual_pts_col].tolist())
    if not all_xmins:
        return None

    def _score(label, actual, predicted):
        # One-row frame so the result can be handed straight to st.dataframe.
        return pd.DataFrame(
            [
                {
                    "Metric": label,
                    "MAE": mean_absolute_error(actual, predicted),
                    "RMSE": np.sqrt(mean_squared_error(actual, predicted)),
                    "R2": r2_score(actual, predicted),
                }
            ]
        )

    return {
        "xMins": _score("Minutes", all_actual_mins, all_xmins),
        "xPts": _score("Points", all_actual_pts, all_xpts),
    }
# --- 3. Main Interface Tabs ---
st.title("Model Performance Dashboard")
main_tab1, main_tab2, main_tab3 = st.tabs(
    ["Outcome Accuracy", "Goals & xG", "xMins & xPts"]
)
# ==========================================
# TAB 1: OUTCOME ACCURACY
# ==========================================
with main_tab1:
    # NOTE: these names rebind the outer tab1/tab2 created for the top-level
    # page tabs; the code below relies on the rebinding.
    tab1, tab2 = st.tabs(["Model Performance", "Trend"])
    with tab1:
        st.warning(
            "Note: Data excludes GW1 & GW2 projections (I forgot to save the projections :sweat_smile:)."
        )
        # --- 1. PREPARE DATA FOR MULTI-CLASS METRICS ---
        # Column order is Home / Draw / Away in both matrices.
        probs_matrix = proj_file[
            ["home_win_prob", "draw_prob", "away_win_prob"]
        ].values
        actuals_matrix = proj_file[["home_win", "draw", "away_win"]].values
        # argmax turns each one-hot row into a class index (0, 1 or 2).
        y_true_labels = actuals_matrix.argmax(axis=1)
        # --- 2. CALCULATE GLOBAL METRICS ---
        # labels is pinned (as in the per-gameweek trend code) so the call
        # does not fail when one of the three outcomes never occurs.
        global_ll = log_loss(y_true_labels, probs_matrix, labels=[0, 1, 2])
        global_brier = multiclass_brier_score(actuals_matrix, probs_matrix)
        # Ranked Probability Score: compare cumulative predicted and observed
        # distributions at the Home and Home+Draw cut points.
        p_h = proj_file["home_win_prob"].values
        p_d = proj_file["draw_prob"].values
        o_h = proj_file["home_win"].values
        o_d = proj_file["draw"].values
        rps_per_match = 0.5 * ((p_h - o_h) ** 2 + ((p_h + p_d) - (o_h + o_d)) ** 2)
        global_rps = np.mean(rps_per_match)
        st.markdown("### Model Performance")
        st.caption(
            "These metrics evaluate the entire match probability distribution (Home/Draw/Away) at once."
        )
        headline = (
            ("Multi-class Log Loss", global_ll),
            ("Multi-class Brier Score", global_brier),
            ("Ranked Probability Score", global_rps),
        )
        for cell, (metric_label, metric_value) in zip(st.columns(3), headline):
            with cell:
                st.metric(metric_label, f"{metric_value:.4f}")
        st.divider()
        # --- 3. BREAKDOWN METRICS ---
        st.markdown("### Outcome Breakdown")
        st.caption(
            "Diagnostic metrics to see which specific outcome the model struggles with."
        )
        outcomes = (
            ("#### Home Win", "home_win", "home_win_prob"),
            ("#### Draw", "draw", "draw_prob"),
            ("#### Away Win", "away_win", "away_win_prob"),
        )
        for cell, (heading, actual_col, prob_col) in zip(st.columns(3), outcomes):
            outcome_brier, outcome_ll = calculate_metrics(
                proj_file[actual_col], proj_file[prob_col]
            )
            with cell:
                st.markdown(heading)
                st.metric("Binary Brier", f"{outcome_brier:.4f}")
                st.metric("Binary Log Loss", f"{outcome_ll:.4f}")
        st.divider()
        # Clean sheets: pool the home and away sides into one binary problem.
        actual_cs = pd.concat(
            [proj_file["home_clean_sheet"], proj_file["away_clean_sheet"]],
            ignore_index=True,
        )
        pred_cs = pd.concat(
            [
                proj_file["home_clean_sheet_odds"],
                proj_file["away_clean_sheet_odds"],
            ],
            ignore_index=True,
        )
        b_cs, ll_cs = calculate_metrics(actual_cs, pred_cs)
        c1, c2 = st.columns([1, 3])
        with c1:
            st.markdown("#### Clean Sheets")
            st.metric("CS Brier", f"{b_cs:.4f}")
            st.metric("CS Log Loss", f"{ll_cs:.4f}")
        with c2:
            fig_hist = px.histogram(
                pred_cs, nbins=20, title="Distribution of CS Probabilities"
            )
            fig_hist.update_layout(
                height=300,
                margin=dict(l=20, r=20, t=30, b=20),
                plot_bgcolor="rgba(0,0,0,0.5)",
                paper_bgcolor="rgba(0,0,0,0.5)",
            )
            st.plotly_chart(fig_hist, use_container_width=True)
with tab2:
    st.warning(
        "Note: Data excludes GW1 & GW2 projections (I forgot to save the projections :sweat_smile:)."
    )
    # The trend charts group matches by the "GW" column of proj_file.
    ll, bs, rps = st.tabs(
        ["Log Loss", "Brier Score", "Ranked Probability Score (RPS)"]
    )
    with ll:
        if "GW" not in proj_file.columns:
            # Say why nothing is plotted instead of leaving the tab blank.
            st.info("Trend charts need a 'GW' column in the projections data.")
        else:
            gw_metrics = []
            all_probs = []
            all_true = []
            # Single pass: score each gameweek on its own and also on all
            # matches seen so far (the cumulative score).
            for gw in sorted(proj_file["GW"].unique()):
                gw_data = proj_file[proj_file["GW"] == gw]
                probs_gw = gw_data[
                    ["home_win_prob", "draw_prob", "away_win_prob"]
                ].values
                # One-hot Home/Draw/Away -> class index 0/1/2.
                y_true_gw = gw_data[["home_win", "draw", "away_win"]].values.argmax(
                    axis=1
                )
                all_probs.append(probs_gw)
                all_true.append(y_true_gw)
                # labels is pinned because a single gameweek may not contain
                # all three outcomes.
                weekly_ll = log_loss(y_true_gw, probs_gw, labels=[0, 1, 2])
                cumulative_ll = log_loss(
                    np.concatenate(all_true),
                    np.vstack(all_probs),
                    labels=[0, 1, 2],
                )
                gw_metrics.append(
                    {
                        "Gameweek": gw,
                        "Weekly Log Loss": weekly_ll,
                        "Match Count": len(gw_data),
                        "Cumulative Log Loss": cumulative_ll,
                    }
                )
            trend_df = pd.DataFrame(gw_metrics)
            # --- Plotting: weekly score (faint) and cumulative score (bold) ---
            fig_trend = go.Figure()
            fig_trend.add_trace(
                go.Scatter(
                    x=trend_df["Gameweek"],
                    y=trend_df["Weekly Log Loss"],
                    mode="lines+markers",
                    name="Weekly Score",
                    line=dict(color="#10f770", width=1, dash="dot"),
                    hovertemplate="GW%{x}: %{y:.3f}<extra></extra>",
                )
            )
            fig_trend.add_trace(
                go.Scatter(
                    x=trend_df["Gameweek"],
                    y=trend_df["Cumulative Log Loss"],
                    mode="lines+markers",
                    name="Cumulative Trend",
                    line=dict(color="#FF4B4B", width=3),
                    hovertemplate="Avg up to GW%{x}: <b>%{y:.3f}</b><extra></extra>",
                )
            )
            # Benchmarks. 1.0986 is ln(3), the log loss of predicting 1/3 for
            # every outcome, so the label is rounded from that value.
            fig_trend.add_hline(
                y=1.0986,
                line_dash="dot",
                line_color="yellow",
                annotation_text="Naive Model (1.10)",
                annotation_position="top right",
            )
            fig_trend.add_hline(
                y=0.98,
                line_dash="dot",
                line_color="#2cacdf",
                annotation_text="Bookie Closing (0.98)",
                annotation_position="bottom right",
            )
            fig_trend.update_layout(
                title="Log Loss Trend",
                xaxis_title="Gameweek",
                yaxis_title="Log Loss (Lower values are better)",
                height=900,
                hovermode="x unified",
                plot_bgcolor="rgba(0,0,0,0.8)",
                paper_bgcolor="rgba(0,0,0,0.8)",
                # The y-axis is reversed so that lower (better) scores are
                # drawn higher on the chart.
                yaxis=dict(autorange="reversed"),
            )
            st.plotly_chart(fig_trend, use_container_width=True)
            # Insight text: compare the first and the latest cumulative score.
            current_trend = trend_df["Cumulative Log Loss"].iloc[-1]
            start_trend = trend_df["Cumulative Log Loss"].iloc[0]
            improvement = start_trend - current_trend
            if improvement > 0:
                st.success(
                    f"Model has improved by {improvement:.4f} since the start of tracking."
                )
            else:
                st.info(
                    f"Model performance has degraded slightly ({improvement:.4f})."
                )
with bs:
    if "GW" not in proj_file.columns:
        # Same guard as the log-loss tab; without it this tab raises KeyError.
        st.info("Trend charts need a 'GW' column in the projections data.")
    else:
        brier_metrics = []
        # Running totals give the cumulative mean without re-stacking every
        # earlier gameweek on each iteration.
        squared_error_total = 0.0
        match_total = 0
        # Gameweeks are visited in ascending order so the cumulative column
        # is a true running score.
        for gw in sorted(proj_file["GW"].unique()):
            gw_data = proj_file[proj_file["GW"] == gw]
            probs_gw = gw_data[["home_win_prob", "draw_prob", "away_win_prob"]].values
            # One-hot actuals in the same Home/Draw/Away column order.
            actuals_gw_onehot = gw_data[["home_win", "draw", "away_win"]].values
            # Per-match Brier: squared differences summed over the 3 outcomes.
            per_match = np.sum((probs_gw - actuals_gw_onehot) ** 2, axis=1)
            squared_error_total += per_match.sum()
            match_total += len(per_match)
            brier_metrics.append(
                {
                    "Gameweek": gw,
                    "Weekly Brier": np.mean(per_match),
                    "Cumulative Brier": squared_error_total / match_total,
                }
            )
        brier_df = pd.DataFrame(brier_metrics)
        # --- Plotting: weekly score (faint) and cumulative score (bold) ---
        fig_brier = go.Figure()
        fig_brier.add_trace(
            go.Scatter(
                x=brier_df["Gameweek"],
                y=brier_df["Weekly Brier"],
                mode="lines+markers",
                name="Weekly Score",
                line=dict(color="#ec0ba9", width=1, dash="dot"),
                hovertemplate="GW%{x}: %{y:.4f}<extra></extra>",
            )
        )
        fig_brier.add_trace(
            go.Scatter(
                x=brier_df["Gameweek"],
                y=brier_df["Cumulative Brier"],
                mode="lines+markers",
                name="Cumulative Trend",
                line=dict(color="#0068C9", width=3),
                hovertemplate="Avg up to GW%{x}: <b>%{y:.4f}</b><extra></extra>",
            )
        )
        # Reference line: predicting 1/3 for every outcome scores
        # (2/3)^2 + 2 * (1/3)^2 = 0.667 on every match.
        fig_brier.add_hline(
            y=0.667,
            line_dash="dot",
            line_color="yellow",
            annotation_text="Naive Model (0.667)",
            annotation_position="bottom right",
        )
        fig_brier.update_layout(
            title="Multi-class Brier Score History",
            xaxis_title="Gameweek",
            yaxis_title="Brier Score (Lower values are better)",
            height=900,
            hovermode="x unified",
            plot_bgcolor="rgba(0,0,0,0.8)",
            paper_bgcolor="rgba(0,0,0,0.8)",
            # Reversed so that lower (better) scores are drawn higher.
            yaxis=dict(autorange="reversed"),
        )
        st.plotly_chart(fig_brier, use_container_width=True)
        # Insight message based on the latest cumulative score.
        curr_brier = brier_df["Cumulative Brier"].iloc[-1]
        if curr_brier < 0.60:
            st.success(
                f"Cumulative Brier Score ({curr_brier:.3f}) is well below the random baseline."
            )
        elif curr_brier < 0.66:
            st.success(
                f"Beating the naive model with the current Brier Score ({curr_brier:.3f})"
            )
        else:
            # Not a success: flag it as a warning rather than a green box.
            st.warning(
                f"Score ({curr_brier:.4f}) is close to or worse than random guessing."
            )
with rps:
    if "GW" not in proj_file.columns:
        # Same guard as the log-loss tab; without it this tab raises KeyError.
        st.info("Trend charts need a 'GW' column in the projections data.")
    else:
        rps_metrics = []
        cumulative_rps_sum = 0
        match_count = 0
        for gw in sorted(proj_file["GW"].unique()):
            gw_data = proj_file[proj_file["GW"] == gw]
            # Predicted probabilities; the away probability is implied by the
            # other two and is not needed for the cumulative distributions.
            p_home = gw_data["home_win_prob"].values
            p_draw = gw_data["draw_prob"].values
            # Observed outcome indicator columns.
            obs_home = gw_data["home_win"].values
            obs_draw = gw_data["draw"].values
            # RPS for the ordered outcomes Home < Draw < Away:
            # 0.5 * [(CDF_pred_1 - CDF_obs_1)^2 + (CDF_pred_2 - CDF_obs_2)^2]
            cdf_pred_1 = p_home
            cdf_pred_2 = p_home + p_draw
            cdf_obs_1 = obs_home
            cdf_obs_2 = obs_home + obs_draw
            rps_per_match = 0.5 * (
                (cdf_pred_1 - cdf_obs_1) ** 2 + (cdf_pred_2 - cdf_obs_2) ** 2
            )
            # Running totals give the mean over all matches seen so far.
            cumulative_rps_sum += np.sum(rps_per_match)
            match_count += len(gw_data)
            rps_metrics.append(
                {
                    "Gameweek": gw,
                    "Weekly RPS": np.mean(rps_per_match),
                    "Cumulative RPS": cumulative_rps_sum / match_count,
                }
            )
        rps_df = pd.DataFrame(rps_metrics)
        # --- Plotting: weekly score (faint) and cumulative score (bold) ---
        fig_rps = go.Figure()
        fig_rps.add_trace(
            go.Scatter(
                x=rps_df["Gameweek"],
                y=rps_df["Weekly RPS"],
                mode="lines+markers",
                name="Weekly RPS",
                line=dict(color="#e68f0e", width=1, dash="dot"),
                hovertemplate="GW%{x}: %{y:.4f}<extra></extra>",
            )
        )
        fig_rps.add_trace(
            go.Scatter(
                x=rps_df["Gameweek"],
                y=rps_df["Cumulative RPS"],
                mode="lines+markers",
                name="Cumulative Trend",
                line=dict(color="#09B4AB", width=3),
                hovertemplate="Avg up to GW%{x}: <b>%{y:.4f}</b><extra></extra>",
            )
        )
        # Reference line used by the dashboard for a naive model.
        fig_rps.add_hline(
            y=0.27,
            line_dash="dot",
            line_color="yellow",
            annotation_text="Naive Model (~0.27)",
            annotation_position="bottom right",
        )
        fig_rps.update_layout(
            title="RPS History",
            xaxis_title="Gameweek",
            yaxis_title="RPS (Lower values are Better)",
            height=900,
            hovermode="x unified",
            plot_bgcolor="rgba(0,0,0,0.8)",
            paper_bgcolor="rgba(0,0,0,0.8)",
            # Reversed so that lower (better) scores are drawn higher.
            yaxis=dict(autorange="reversed"),
        )
        st.plotly_chart(fig_rps, use_container_width=True)
        # Insight message based on the latest cumulative score.
        curr_rps = rps_df["Cumulative RPS"].iloc[-1]
        if curr_rps < 0.20:
            st.success(
                f"RPS ({curr_rps:.4f}) is exceptionally low, superb accuracy."
            )
        elif curr_rps < 0.22:
            st.success(
                f"RPS ({curr_rps:.4f}) is above average, decentish performance."
            )
        else:
            # Not a success: flag it as a warning rather than a green box.
            st.warning(
                f"RPS ({curr_rps:.4f}) is approaching random guessing territory (~0.27)."
            )
# ==========================================
# TAB 2: GOALS & xG ACCURACY
# ==========================================
with main_tab2:
    st.warning(
        "Note: Data excludes GW1 & GW2 projections (I forgot to save the projections :sweat_smile:)."
    )

    def _stack_sides(home_col, away_col):
        # Home values first, then away values, as one re-indexed series.
        return pd.concat(
            [proj_file[home_col], proj_file[away_col]], ignore_index=True
        )

    actual_goals_all = _stack_sides("home_goals", "away_goals")
    expected_goals_all = _stack_sides("expected_home_goals", "expected_away_goals")
    xg_all = _stack_sides("xg_home", "xg_away")
    # Tidy frame used by both scatter plots.
    plot_df = pd.DataFrame(
        {
            "Actual Goals": actual_goals_all,
            "Predicted Goals": expected_goals_all,
            "xG Generated": xg_all,
        }
    )

    def _render_goal_panel(
        container, heading, observed, y_field, scatter_kwargs, chart_kwargs
    ):
        # Error metrics of the projected goals against `observed`, followed
        # by a scatter plot with an OLS trendline.
        with container:
            st.subheader(heading)
            rmse = np.sqrt(mean_squared_error(observed, expected_goals_all))
            mae = mean_absolute_error(observed, expected_goals_all)
            rmse_cell, mae_cell = st.columns(2)
            rmse_cell.metric("RMSE", f"{rmse:.3f}")
            mae_cell.metric("MAE", f"{mae:.3f}")
            fig = px.scatter(
                plot_df,
                x="Predicted Goals",
                y=y_field,
                trendline="ols",
                title=heading,
                opacity=0.5,
                **scatter_kwargs,
            )
            fig.update_layout(
                height=400,
                plot_bgcolor="rgba(0,0,0,0.8)",
                paper_bgcolor="rgba(0,0,0,0.8)",
            )
            st.plotly_chart(fig, use_container_width=True, **chart_kwargs)

    col_g1, col_g2 = st.columns(2)
    # -- Left Side: Projections vs Actuals --
    _render_goal_panel(
        col_g1,
        "Goals v/s Projected Goals",
        actual_goals_all,
        "Actual Goals",
        {},
        {"config": {"staticPlot": False}},
    )
    # -- Right Side: Projections vs xG --
    _render_goal_panel(
        col_g2,
        "xG v/s Projected Goals",
        xg_all,
        "xG Generated",
        {"color_discrete_sequence": ["#FF4B4B"]},
        {},
    )
# ==========================================
# TAB 3: xMINS & xPTS ACCURACY
# ==========================================
with main_tab3:
    st.warning(
        "Note: Data excludes GW1 & GW2 projections (I forgot to save the projections :sweat_smile:)."
    )

    def render_stats_display(stats_dict):
        """Show the xMins and xPts accuracy tables side by side.

        ``stats_dict`` is the value returned by ``get_player_stats``; a falsy
        value (``None``) renders an error message instead of the tables.
        """
        if not stats_dict:
            st.error("No data found for the defined gameweeks.")
            return

        def _metric_columns():
            # Fresh config per table: R2 as a progress bar on [-1, 1],
            # MAE/RMSE as three-decimal numbers.
            return {
                "R2": st.column_config.ProgressColumn(
                    "R2 Score",
                    help="R-Squared Score (Higher is better)",
                    format="%.3f",
                    min_value=-1,
                    max_value=1,
                ),
                "MAE": st.column_config.NumberColumn("MAE", format="%.3f"),
                "RMSE": st.column_config.NumberColumn("RMSE", format="%.3f"),
            }

        panels = zip(
            st.columns(2),
            ("#### xMins Accuracy", "#### xPts Accuracy"),
            ("xMins", "xPts"),
        )
        for panel, heading, key in panels:
            with panel:
                st.markdown(heading)
                st.dataframe(
                    stats_dict[key],
                    hide_index=True,
                    use_container_width=True,
                    column_config=_metric_columns(),
                )

    sub_tab_nolimit, sub_tab_nonzero = st.tabs(
        ["Show All Players", "Active Players Only (1+ xMins)"]
    )
    with sub_tab_nolimit:
        st.caption("Including bench/injured/unavailable/non-starter players.")
        render_stats_display(get_player_stats(pts_file, min_xmins=0))
    with sub_tab_nonzero:
        st.caption("Excluding players with 0 projected xMins.")
        render_stats_display(get_player_stats(pts_file, min_xmins=1))
with tab3:
    st.subheader("Team Ratings Overview")
    # Explanatory notes, rendered as arrow-prefixed paragraphs in one info box.
    rating_notes = (
        "The Attack rating is the **projected goals for** versus an average team (does not account for home advantage).",
        "The Defence rating is the **projected goals against** versus an average team (does not account for home advantage).",
        "Diff is just the difference between the Attack and Defence ratings.",
    )
    st.info("\n\n".join(f":arrow_right: {note}" for note in rating_notes))
@st.cache_data(ttl=3600)  # ttl is in seconds: cached entries expire after one hour
def get_image_base64(file_path):
    """Return the file at ``file_path`` as a base64 PNG data URI.

    Returns ``None`` when the path does not exist. The result is cached so
    each logo is read and encoded only once per cache lifetime.
    """
    if not os.path.exists(file_path):
        return None
    with open(file_path, "rb") as image_file:
        payload = base64.b64encode(image_file.read()).decode()
    return f"data:image/png;base64,{payload}"
df = load_team_ratings_cached()
# Shift the index by one so the data table does not start counting at zero.
df.index = df.index + 1
# Team name -> numeric id. The id names the local logo file logos/<id>.png
# (presumably the football-data.org crest ids; verify before adding teams).
team_ids = {
    "Arsenal": 57,
    "Manchester City": 65,
    "Liverpool": 64,
    "Chelsea": 61,
    "Newcastle United": 67,
    "Aston Villa": 58,
    "Manchester United": 66,
    "Brentford": 402,
    "Brighton and Hove Albion": 397,
    "AFC Bournemouth": 1044,
    "Tottenham Hotspur": 73,
    "Crystal Palace": 354,
    "Fulham": 63,
    "Nottingham Forest": 351,
    "Everton": 62,
    "Leeds United": 341,
    "West Ham United": 563,
    "Wolverhampton Wanderers": 76,
    "Sunderland": 71,
    "Burnley": 328,
}
# Series.map leaves NaN for any team name missing from team_ids, and those
# rows get no logo.
df["TeamID"] = df["Team"].map(team_ids)
# Encode each logo as a base64 data URI for the image marks.
df["Logo"] = df["TeamID"].apply(
    lambda x: get_image_base64(f"logos/{int(x)}.png") if pd.notnull(x) else None
)
tab_plot, tab_data = st.tabs(["Scatter Plot", "Data Table"])
with tab_plot:
    # The x-axis is reversed so lower Defence values sit further right,
    # which is the direction the "Better Defence" title points to.
    base = alt.Chart(df).encode(
        x=alt.X(
            "Defence",
            scale=alt.Scale(reverse=True, domain=[0.6, 1.9], clamp=True),
            axis=alt.Axis(title="Better Defence ⮕", tickCount=5, titlePadding=40),
        ),
        y=alt.Y(
            "Attack",
            scale=alt.Scale(domain=[0.6, 2.0]),
            axis=alt.Axis(title="Better Attack ⮕", tickCount=5, titlePadding=50),
        ),
        tooltip=["Team", "Attack", "Defence", "Diff"],
    )
    # One image mark per team. There is no fallback layer: a team without a
    # logo has no image URL to draw.
    images = base.mark_image(width=40, height=50).encode(url="Logo")
    chart = images.interactive().properties(
        height=660,
        padding={"left": 20, "top": 20, "right": 20, "bottom": 20},
    )
    st.altair_chart(chart, use_container_width=True)
with tab_data:
    st.dataframe(
        df[["Team", "Attack", "Defence", "Diff"]],
        height=740,
        use_container_width=True,
    )
with tab4:
    st.header("Projections for the upcoming fixtures")

    def get_fresh_data():
        """Return the first ten fixture projections as an independent frame.

        Column headers are stripped of surrounding whitespace. The result is
        an explicit copy, so the caller can add or rescale columns without
        pandas' chained-assignment warning and without touching the frame
        returned by ``load_fixture_projections_cached``.
        """
        df_subset = load_fixture_projections_cached().head(10).copy()
        # Ensure no whitespace issues in headers
        df_subset.columns = df_subset.columns.str.strip()
        return df_subset
df = get_fresh_data()
# Rescale these columns by 100 so the progress bars can use a 0-100 range.
for pct_col in (
    "home_win_prob",
    "away_win_prob",
    "draw_prob",
    "home_clean_sheet_odds",
    "away_clean_sheet_odds",
):
    df[pct_col] = df[pct_col] * 100


def _percent_bar(label):
    # Progress bar on a 0-100 scale, shown with one decimal and a % sign.
    return st.column_config.ProgressColumn(
        label,
        format="%.1f%%",
        min_value=0,
        max_value=100,
    )


st.dataframe(
    df,
    column_order=[
        "home_team",
        "home_win_prob",
        "draw_prob",
        "away_win_prob",
        "away_team",
        "expected_home_goals",
        "expected_away_goals",
        "home_clean_sheet_odds",
        "away_clean_sheet_odds",
    ],
    # Keys are the raw column names; values supply the display labels.
    column_config={
        "home_team": "Home Team",
        "away_team": "Away Team",
        "home_win_prob": _percent_bar("Home Win"),
        "draw_prob": _percent_bar("Draw"),
        "away_win_prob": _percent_bar("Away Win"),
        "expected_home_goals": st.column_config.NumberColumn(
            "Home xG", format="%.2f"
        ),
        "expected_away_goals": st.column_config.NumberColumn(
            "Away xG", format="%.2f"
        ),
        "home_clean_sheet_odds": _percent_bar("Home CS%"),
        "away_clean_sheet_odds": _percent_bar("Away CS%"),
    },
    hide_index=True,
    use_container_width=True,
    height=438,
    row_height=40,
)