Spaces:
Running
Running
| import itertools as _itertools | |
| import json | |
| import os | |
| from typing import Any, Dict, List, Optional | |
| import numpy as np | |
| import pandas as pd | |
| import requests | |
| from fastapi import FastAPI, HTTPException, Depends | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy.orm.attributes import flag_modified | |
| import engine | |
| from auth import router as auth_router | |
| from fpl_api import get_fpl_team_data | |
| from solver import run_milp_model | |
| from solver_engine import prep_solver_data | |
| from database import get_db, User, SessionLocal, GlobalConfig | |
| # --- PYDANTIC MODELS FOR REACT PAYLOAD --- | |
class PlayerData(BaseModel):
    """One market player as posted by the React frontend in solver requests."""

    id: int
    name: str
    # run_sensitivity_analysis groups players by the codes "G", "D", "M", "F".
    pos: str
    team: str
    now_cost: float
    # Optional; defaults to None when the client omits it.
    sell_price: Optional[float] = None
    evs: Dict[str, float]  # JSON keys are strings; horizon GW keyed
class SolveRequest(BaseModel):
    """Request body accepted by ``run_solver`` (and extended by ``SensitivityRequest``)."""

    team_id: int
    horizon_gws: List[int]
    # Element type is left open (Any); ids are not validated here.
    current_squad_ids: List[Any]
    market_players: List[PlayerData]
    in_the_bank: float
    free_transfers: int
    # Free-form solver options. run_sensitivity_analysis reads "use_wc" /
    # "use_fh" from here and forces "iterations" to 1.
    settings: dict = {}
    comprehensive_settings: dict = {}
class ChipSolveRequest(SolveRequest):
    """Solve request that additionally lists candidate gameweeks per chip.

    Every other field (``team_id``, ``horizon_gws``, ``current_squad_ids``,
    ``market_players``, ``in_the_bank``, ``free_transfers``, ``settings``,
    ``comprehensive_settings``) is inherited from ``SolveRequest`` with the
    same type, order and default, instead of being re-declared verbatim.
    This mirrors how ``SensitivityRequest`` extends ``SolveRequest``.
    """

    # { "wc": [gw, ...], "fh": [gw, ...], "bb": [gw, ...], "tc": [gw, ...] }
    chip_gw_options: Dict[str, List[int]] = {}
class SettingsPayload(BaseModel):
    """Body of the save-settings request handled by ``save_user_settings``."""

    # Used to look up the User row via User.default_team_id.
    team_id: int
    # Stored under the "quick" key of User.solver_settings.
    quick_settings: Dict[str, Any]
    # Stored under the "advanced" key of User.solver_settings.
    advanced_settings: Dict[str, Any]
# FastAPI application instance; the auth router and CORS middleware below attach to it.
app = FastAPI(title="Luigi's Mansion FPL API")
def read_root():
    """Return a static status payload confirming that the API is running."""
    payload = dict(
        status="success",
        message="Luigi's Mansion FPL Solver API is live and running!",
    )
    return payload
# Mount the routes exported as `router` by the auth module.
app.include_router(auth_router)
# NOTE(review): every origin, method and header is allowed while credentials
# are also enabled. Confirm this is intended; an explicit origin allow-list is
# the safer configuration when cookies or auth headers are in play.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Shared secret checked by the admin-only write endpoints (update_player,
# update_fixture_overrides). Supply it through the ADMIN_PASSWORD environment
# variable. The literal is kept only as a backward-compatible fallback; because
# it is committed to source it should be rotated and then removed.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "Monkeyrocks11$$")
# Club names listed in team-id order: the 1-based position is the numeric id
# that load_fpl_data matches against the FPL `team` column.
_TEAM_NAMES_BY_ID = (
    "Arsenal",
    "Aston Villa",
    "Burnley",
    "AFC Bournemouth",
    "Brentford",
    "Brighton and Hove Albion",
    "Chelsea",
    "Crystal Palace",
    "Everton",
    "Fulham",
    "Leeds United",
    "Liverpool",
    "Manchester City",
    "Manchester United",
    "Newcastle United",
    "Nottingham Forest",
    "Sunderland",
    "Tottenham Hotspur",
    "West Ham United",
    "Wolverhampton Wanderers",
)
# name -> id and id -> name lookups derived from the same ordered tuple.
TEAMS_DICT = {
    club: team_id for team_id, club in enumerate(_TEAM_NAMES_BY_ID, start=1)
}
TEAMS_DICT_REVERSE = dict(enumerate(_TEAM_NAMES_BY_ID, start=1))
# element_type -> one-letter position code.
POS_MAP = dict(zip(range(1, 5), "GDMF"))
# Scoring table handed to engine.calculate_all_points; the inner dicts are
# keyed by element_type (see POS_MAP).
POINTS_CONFIG = dict(
    goal={1: 10, 2: 6, 3: 5, 4: 4},
    assist=3,
    clean_sheet={1: 4, 2: 4, 3: 1, 4: 0},
    saves_per_3=1,
    penalty_points_per_position={2: 0.9, 3: 0.7, 4: 0.5},
)
class AppData:
    """Process-wide holder for the loaded DataFrames and admin override state."""

    # DataFrames assigned by load_fpl_data(); None until it has run.
    finalized_df = None
    match_df = None
    output_df = None

    # Override state; load_fpl_data() replaces these with the persisted values.
    player_penalty_shares = {}
    admin_xmins_overrides = {}
    admin_baseline_overrides = {}
    player_status_overrides = {}
    availability_multipliers = {}
    admin_fixture_overrides = {}

    # Tuning tables passed straight through to engine.calculate_all_points.
    decay_rates = dict(
        default=0.99,
        suspended=0.99,
        injured_decay=0.99,
        rotational_risk=0.95,
    )
    ramp_up_rates = dict(
        default=3,
        injured=9,
        suspended=3,
        starter=0,
        rotational_risk=2,
    )
    MINS_THRESHOLD = 30
    RAMP_UP_PERIOD = 3


# Single shared instance used by every endpoint in this module.
app_data = AppData()
def load_db_int_keys(db_key, default):
    """Load the GlobalConfig value stored under ``db_key`` with int-cast keys.

    Returns ``default`` when no row exists or its value is empty. The session
    is always closed before returning.
    """
    session = SessionLocal()
    try:
        row = session.query(GlobalConfig).filter(GlobalConfig.key == db_key).first()
        if not (row and row.value):
            return default
        # Stored keys are converted back to int for in-memory lookups.
        return {int(raw_key): item for raw_key, item in row.value.items()}
    finally:
        session.close()
def load_db_string_keys(db_key, default):
    """Load the GlobalConfig value stored under ``db_key`` as-is.

    Returns ``default`` when no row exists or its value is empty. The session
    is always closed before returning.
    """
    session = SessionLocal()
    try:
        row = session.query(GlobalConfig).filter(GlobalConfig.key == db_key).first()
        if not (row and row.value):
            return default
        return row.value
    finally:
        session.close()
def save_config_to_db(db_key, value):
    """Insert or overwrite the GlobalConfig row for ``db_key`` and commit.

    The session is closed whether or not the commit succeeds.
    """
    session = SessionLocal()
    try:
        existing = (
            session.query(GlobalConfig).filter(GlobalConfig.key == db_key).first()
        )
        if not existing:
            session.add(GlobalConfig(key=db_key, value=value))
        else:
            existing.value = value
            # Explicitly mark the column dirty so the new value is written.
            flag_modified(existing, "value")
        session.commit()
    finally:
        session.close()
def load_fpl_data():
    """Build every in-memory table the API serves and store it on ``app_data``.

    Steps, in order:
      1. Download the FPL bootstrap-static payload and keep a fixed set of
         player columns.
      2. Left-join goalkeeper and outfield baseline CSVs onto the players by
         name; unmatched values become 0.
      3. Derive ``Avg_BPS`` per position and each player's share of team
         totals read from ``team_totals.xlsx``.
      4. Load match projections from ``ewmapois_model.csv``.
      5. Load admin overrides from the database and apply baseline xMins
         overrides on top of the CSV values.
      6. Run ``engine.calculate_all_points`` and decorate its output with
         display columns (photo, Price, xG, xA, per-GW CS% and DefconHit).

    Reads local files relative to the working directory and performs one HTTP
    request. Errors before step 6's decoration propagate; errors during the
    decoration are printed and swallowed.
    """
    print("Fetching FPL API data...")
    r = requests.get(
        "https://fantasy.premierleague.com/api/bootstrap-static/", timeout=10
    ).json()
    players = pd.DataFrame(r["elements"])
    players["name"] = players["first_name"] + " " + players["second_name"]
    players = players[
        [
            "id",
            "name",
            "web_name",
            "element_type",
            "now_cost",
            "team",
            "chance_of_playing_this_round",
            "news",
            "photo",
        ]
    ]
    # presumably converts the API's tenths-of-a-million price to millions — confirm
    players["now_cost"] = players["now_cost"] / 10
    # Optional name mapping so API names line up with the baseline CSV names.
    if os.path.exists("rename.json"):
        with open("rename.json", "r", encoding="utf-8") as f:
            players["name"] = players["name"].replace(json.load(f))
    print("Loading baseline stats...")
    gk_stats_df = pd.read_csv("statistical_weighted_baselines_gk.csv").rename(
        columns=lambda x: x.strip()
    )
    outfield_stats_df = pd.read_csv("statistical_weighted_baselines.csv").rename(
        columns=lambda x: x.strip()
    )
    gk_stats_df["player_name"] = gk_stats_df["player_name"].str.strip()
    outfield_stats_df["player_name"] = outfield_stats_df["player_name"].str.strip()
    # element_type 1 is the goalkeeper code (see POS_MAP); each group gets its own baseline file.
    gk_mask = players["element_type"] == 1
    gk_merged = players[gk_mask].merge(
        gk_stats_df, left_on="name", right_on="player_name", how="left"
    )
    outfield_merged = players[~gk_mask].merge(
        outfield_stats_df, left_on="name", right_on="player_name", how="left"
    )
    final_df = (
        pd.concat([gk_merged, outfield_merged], ignore_index=True)
        .sort_values("id")
        .reset_index(drop=True)
    )
    # Players without a baseline row (left-join misses) get zeros everywhere.
    final_df.fillna(0, inplace=True)
    # Avg_BPS: GK uses its own column; outfield adds the neutral and position-specific parts.
    final_df["Avg_BPS"] = 0.0
    final_df.loc[final_df["element_type"] == 1, "Avg_BPS"] = final_df[
        "baseline_gk_bps_p90"
    ].astype(float)
    final_df.loc[final_df["element_type"] == 2, "Avg_BPS"] = (
        final_df["baseline_Neutral_BPS_p90"] + final_df["baseline_Def_BPS_p90"]
    )
    final_df.loc[final_df["element_type"] == 3, "Avg_BPS"] = (
        final_df["baseline_Neutral_BPS_p90"] + final_df["baseline_Mid_BPS_p90"]
    )
    final_df.loc[final_df["element_type"] == 4, "Avg_BPS"] = (
        final_df["baseline_Neutral_BPS_p90"] + final_df["baseline_Fwd_BPS_p90"]
    )
    print("Loading team and match projections...")
    team_baselines = pd.read_excel("team_totals.xlsx", sheet_name="Sheet2")
    # Convert team names to numeric ids so they can be joined on final_df["team"].
    team_baselines["Teams"] = team_baselines["Teams"].replace(TEAMS_DICT)
    for stat in ["xG", "xA", "CBIT", "CBITR", "YC", "RC"]:
        final_df[f"Team_{stat}"] = final_df["team"].map(
            team_baselines.set_index("Teams")[stat].to_dict()
        )
    final_df["Team"] = final_df["team"].map(TEAMS_DICT_REVERSE)
    # Player share = player per-90 baseline / team total. A zero team total is
    # turned into NaN first so the division yields NaN, then zeroed by the
    # fillna below.
    final_df["xG_share"] = final_df["baseline_xG_p90"] / final_df["Team_xG"].replace(
        0, np.nan
    )
    final_df["xA_share"] = final_df["baseline_xA_p90"] / final_df["Team_xA"].replace(
        0, np.nan
    )
    final_df["xCBIT_share"] = final_df["baseline_CBIT_p90"] / final_df[
        "Team_CBIT"
    ].replace(0, np.nan)
    final_df["xCBITR_share"] = final_df["baseline_CBITR_p90"] / final_df[
        "Team_CBITR"
    ].replace(0, np.nan)
    final_df["YC_share"] = final_df["baseline_yc_p90"] / final_df["Team_YC"].replace(
        0, np.nan
    )
    final_df["RC_share"] = final_df["baseline_rc_p90"] / final_df["Team_RC"].replace(
        0, np.nan
    )
    final_df.fillna(0, inplace=True)
    match_df = pd.read_csv("ewmapois_model.csv").rename(columns=lambda x: x.strip())
    match_df["home_team_num"] = match_df["home_team"].map(TEAMS_DICT)
    match_df["away_team_num"] = match_df["away_team"].map(TEAMS_DICT)
    app_data.finalized_df = final_df
    app_data.match_df = match_df
    # --- LOAD ALL DB OVERRIDES ---
    app_data.player_penalty_shares = load_db_int_keys(
        "penalty_shares", {16: 0.65, 17: 0.15}
    )
    raw_xmins = load_db_int_keys("admin_xmins", {})
    # Normalise the nested gameweek keys: digit strings become ints, anything
    # else stays a string.
    processed_overrides = {}
    for pid, gws in raw_xmins.items():
        processed_gws = {}
        for gw_key, val in gws.items():
            if str(gw_key).isdigit():
                processed_gws[int(gw_key)] = val
            else:
                processed_gws[str(gw_key)] = val
        processed_overrides[int(pid)] = processed_gws
    app_data.admin_xmins_overrides = processed_overrides
    app_data.admin_baseline_overrides = load_db_int_keys("admin_baselines", {})
    app_data.player_status_overrides = load_db_int_keys("player_status", {})
    app_data.availability_multipliers = load_db_int_keys("availability", {})
    app_data.admin_fixture_overrides = load_db_string_keys("admin_fixtures", {})
    # --- THE FALLBACK LOGIC ---
    # Apply stored baseline overrides on top of the CSV data. Players without an override keep the CSV value.
    for pid, overrides in app_data.admin_baseline_overrides.items():
        if pid in app_data.finalized_df["id"].values:
            if "baseline_xMins" in overrides:
                app_data.finalized_df.loc[
                    app_data.finalized_df["id"] == pid, "baseline_xMins"
                ] = overrides["baseline_xMins"]
    print("Running initial FPL point engine...")
    app_data.output_df = engine.calculate_all_points(
        player_df_base=app_data.finalized_df,
        match_df=app_data.match_df,
        player_penalty_shares=app_data.player_penalty_shares,
        MINS_SCALING_BONUS=0.0,
        pos_map=POS_MAP,
        teams_dict_1=TEAMS_DICT_REVERSE,
        teams_dict=TEAMS_DICT,
        points_config=POINTS_CONFIG,
        effective_xmins_overrides=app_data.admin_xmins_overrides,
        MINS_THRESHOLD=app_data.MINS_THRESHOLD,
        RAMP_UP_PERIOD=app_data.RAMP_UP_PERIOD,
        decay_rates=app_data.decay_rates,
        ramp_up_rates=app_data.ramp_up_rates,
        user_player_status_overrides=app_data.player_status_overrides,
        team_skepticism={},
        effective_availability_multipliers=app_data.availability_multipliers,
    )
    # Inject baseline_xMins into the output so the frontend can display it
    app_data.output_df["baseline_xMins"] = app_data.output_df["ID"].map(
        app_data.finalized_df.set_index("id")["baseline_xMins"]
    )
    # --- ADVANCED STATS INJECTION (best effort) ---
    # NOTE(review): the broad except below means a failure part-way through
    # leaves output_df with only some of these columns; it is reported only via print.
    try:
        finalized_idx = app_data.finalized_df.set_index("id")
        # 1. Add photo and Price so Transfer Market can display images and cost
        if "photo" in finalized_idx.columns:
            app_data.output_df["photo"] = app_data.output_df["ID"].map(
                finalized_idx["photo"]
            )
        if "now_cost" in finalized_idx.columns:
            app_data.output_df["Price"] = app_data.output_df["ID"].map(
                finalized_idx["now_cost"]
            )
        # 2. Map xG and xA: per-90 baseline scaled by baseline minutes, 0 when
        #    the player or the column is missing.
        app_data.output_df["xG"] = app_data.output_df["ID"].map(
            lambda pid: round(
                (finalized_idx.loc[pid, "baseline_xG_p90"] / 90)
                * finalized_idx.loc[pid, "baseline_xMins"],
                2,
            )
            if pid in finalized_idx.index and "baseline_xG_p90" in finalized_idx.columns
            else 0
        )
        app_data.output_df["xA"] = app_data.output_df["ID"].map(
            lambda pid: round(
                (finalized_idx.loc[pid, "baseline_xA_p90"] / 90)
                * finalized_idx.loc[pid, "baseline_xMins"],
                2,
            )
            if pid in finalized_idx.index and "baseline_xA_p90" in finalized_idx.columns
            else 0
        )
        # 3. Per-gameweek CS% and DefconHit display columns
        unique_gws = sorted(app_data.match_df["GW"].unique())
        for gw in unique_gws:
            # NOTE(review): the guard tests for the "{gw}_xG" column but the
            # body reads "{gw}_CS" — confirm the engine always emits both.
            if f"{gw}_xG" in app_data.output_df.columns:
                # Format CS% for the GW
                app_data.output_df[f"{gw}_CS_Pct"] = (
                    app_data.output_df[f"{gw}_CS"] * 100
                ).apply(lambda x: f"{x:.0f}%")

                # Calculate HIT% for the GW using the stored CBIT / CBITR.
                # The closure reads `gw` from the enclosing loop; it is applied
                # immediately below, within the same iteration.
                def calc_hit_gw(row):
                    pos = row["Pos"]
                    if pos == "D":
                        cbit = row.get(f"{gw}_cbit", 0)
                        prob = engine.neg_binom_probability_at_least(
                            cbit, 10, dispersion=3.2
                        )
                    elif pos == "M":
                        cbitr = row.get(f"{gw}_cbitr", 0)
                        prob = engine.neg_binom_probability_at_least(
                            cbitr, 12, dispersion=2.8
                        )
                    elif pos == "F":
                        cbitr = row.get(f"{gw}_cbitr", 0)
                        prob = engine.neg_binom_probability_at_least(
                            cbitr, 12, dispersion=1.7
                        )
                    else:
                        # Any other position (e.g. "G") shows a dash instead of a percentage.
                        return "-"
                    return f"{prob * 100:.0f}%"

                app_data.output_df[f"{gw}_DefconHit"] = app_data.output_df.apply(
                    calc_hit_gw, axis=1
                )
    except Exception as e:
        print(f"WARNING: Could not inject advanced stats. Reason: {e}")
def startup_event():
    """Populate ``app_data`` by running the full data load once."""
    load_fpl_data()
def get_projections():
    """Return every row of the projections table as a list of dicts.

    Missing values are converted to ``None`` so the response serialises as
    JSON ``null``.

    Raises:
        HTTPException: 503 while ``app_data.output_df`` has not been built yet.
    """
    if app_data.output_df is None:
        raise HTTPException(status_code=503, detail="Loading")
    frame = app_data.output_df
    # Cast to object before masking: on float columns ``where(..., None)``
    # keeps NaN (None is coerced back to NaN), and NaN is not valid JSON.
    clean_df = frame.astype(object).where(frame.notna(), None)
    return clean_df.to_dict(orient="records")
class UpdateRequest(BaseModel):
    """Body of the player-minutes edit request handled by ``update_player``."""

    player_id: int
    # New baseline_xMins for the player; None leaves the baseline untouched.
    baseline_edit: Optional[float] = None
    # Per-gameweek minute overrides. update_player converts all-digit keys to
    # int and keeps any other key as a string.
    gw_edits: Dict[str, float] = {}
    # When True the edits are persisted globally and admin_password is checked.
    is_admin: bool = False
    admin_password: Optional[str] = None
def update_player(req: UpdateRequest):
    """Recalculate one player's projection with edited minutes.

    Non-admin requests are previews: the edits are applied to a copy of the
    player's row, the engine is re-run for that player only, and nothing is
    stored. Admin requests additionally persist the edits (database and
    in-memory overrides) and write the recalculated values back into
    ``app_data.output_df``.

    Changes from the previous implementation:
      * the player is validated *before* anything is persisted, so an unknown
        ``player_id`` can no longer leave orphan overrides in the database;
      * an admin baseline edit is also written to ``app_data.finalized_df``,
        matching what ``load_fpl_data`` does with stored baseline overrides,
        so later recalculations do not fall back to the stale CSV baseline;
      * a 503 is returned while the data is still loading.

    Returns:
        dict: the recalculated row for the player.

    Raises:
        HTTPException: 401 for a wrong admin password, 503 if data is not
            loaded yet, 404 if the player id is unknown.
    """
    if req.is_admin and req.admin_password != ADMIN_PASSWORD:
        raise HTTPException(status_code=401, detail="Invalid admin password")

    if app_data.finalized_df is None:
        raise HTTPException(status_code=503, detail="Loading")

    player_mask = app_data.finalized_df["id"] == req.player_id
    player_df = app_data.finalized_df[player_mask].copy()
    if player_df.empty:
        raise HTTPException(status_code=404, detail="Player not found")

    # Normalise gameweek keys once: digit strings -> int, anything else -> str.
    gw_edits = {
        (int(gw_str) if str(gw_str).isdigit() else str(gw_str)): xmins
        for gw_str, xmins in req.gw_edits.items()
    }

    if req.is_admin:
        # Persist admin edits: in-memory override tables first, then the database.
        if req.baseline_edit is not None:
            app_data.admin_baseline_overrides.setdefault(req.player_id, {})[
                "baseline_xMins"
            ] = req.baseline_edit
            save_config_to_db("admin_baselines", app_data.admin_baseline_overrides)
            # Keep the base table in step with the stored override.
            app_data.finalized_df.loc[player_mask, "baseline_xMins"] = (
                req.baseline_edit
            )
        if gw_edits:
            app_data.admin_xmins_overrides.setdefault(req.player_id, {}).update(
                gw_edits
            )
            save_config_to_db("admin_xmins", app_data.admin_xmins_overrides)

    current_baseline = player_df.iloc[0]["baseline_xMins"]
    if req.baseline_edit is not None:
        player_df["baseline_xMins"] = req.baseline_edit
        current_baseline = req.baseline_edit

    # Stored admin overrides for this player, with this request's edits on top.
    effective_overrides = {
        req.player_id: {
            **app_data.admin_xmins_overrides.get(req.player_id, {}),
            **gw_edits,
        }
    }

    # Recalculate using all the existing status/penalty configs
    updated_row_df = engine.calculate_all_points(
        player_df_base=player_df,
        match_df=app_data.match_df,
        player_penalty_shares=app_data.player_penalty_shares,
        MINS_SCALING_BONUS=0.0,
        pos_map=POS_MAP,
        teams_dict_1=TEAMS_DICT_REVERSE,
        teams_dict=TEAMS_DICT,
        points_config=POINTS_CONFIG,
        effective_xmins_overrides=effective_overrides,
        MINS_THRESHOLD=app_data.MINS_THRESHOLD,
        RAMP_UP_PERIOD=app_data.RAMP_UP_PERIOD,
        decay_rates=app_data.decay_rates,
        ramp_up_rates=app_data.ramp_up_rates,
        user_player_status_overrides=app_data.player_status_overrides,
        team_skepticism={},
        effective_availability_multipliers=app_data.availability_multipliers,
    )
    updated_row_df["baseline_xMins"] = current_baseline

    # Admin edits are global: copy the new values into the shared output table,
    # touching only columns that already exist there.
    if req.is_admin and app_data.output_df is not None:
        idx = app_data.output_df[app_data.output_df["ID"] == req.player_id].index
        if not idx.empty:
            row_idx = idx[0]
            for col in updated_row_df.columns:
                if col in app_data.output_df.columns:
                    app_data.output_df.at[row_idx, col] = updated_row_df[col].values[0]

    return updated_row_df.iloc[0].to_dict()
def get_ratings():
    """Return the rows of ``team_ratings_dual_speed.csv`` as a list of dicts.

    Column names and the ``Team`` values are stripped of surrounding
    whitespace. Returns an empty list when the file does not exist.
    """
    ratings_path = "team_ratings_dual_speed.csv"
    if not os.path.exists(ratings_path):
        return []
    ratings = pd.read_csv(ratings_path)
    ratings.rename(columns=lambda label: label.strip(), inplace=True)
    # Team names may carry trailing spaces in the CSV.
    ratings["Team"] = ratings["Team"].str.strip()
    return ratings.to_dict(orient="records")
def get_fixtures():
    """Return the match projections as a list of dicts.

    Only the identifying columns and the probability / expected-goal columns
    are included; the numeric ones are cast to float and rounded to 3 decimals.
    Returns an empty list when no match data has been loaded.
    """
    if app_data.match_df is None:
        return []
    id_columns = ["GW", "home_team", "away_team"]
    numeric_columns = [
        "home_win_prob",
        "draw_prob",
        "away_win_prob",
        "expected_home_goals",
        "expected_away_goals",
        "home_clean_sheet_odds",
        "away_clean_sheet_odds",
    ]
    # Work on a copy so the shared match table is never mutated.
    fixtures = app_data.match_df[id_columns + numeric_columns].copy()
    for column in numeric_columns:
        fixtures[column] = fixtures[column].astype(float).round(3)
    return fixtures.to_dict(orient="records")
def get_accuracy_players():
    """Return the player accuracy sheet as a list of dicts.

    Reads ``points_check.xlsx`` when it exists, otherwise the CSV export
    ``points_check.xlsx - Sheet1.csv``. Column names are stripped and missing
    values replaced with 0. Returns an empty list when neither file exists.
    """
    # (path, reader) pairs in priority order; the first existing file wins.
    sources = (
        ("points_check.xlsx", pd.read_excel),
        ("points_check.xlsx - Sheet1.csv", pd.read_csv),
    )
    for path, reader in sources:
        if not os.path.exists(path):
            continue
        sheet = reader(path)
        sheet.columns = sheet.columns.str.strip()
        sheet.fillna(0, inplace=True)
        return sheet.to_dict(orient="records")
    return []
def get_accuracy_matches():
    """Return the match accuracy sheet as a list of dicts.

    Reads ``projections_check.xlsx`` when it exists, otherwise the CSV export
    ``projections_check.xlsx - Sheet1.csv``. Column names are stripped and
    missing values replaced with 0. Returns an empty list when neither file
    exists.
    """
    # (path, reader) pairs in priority order; the first existing file wins.
    sources = (
        ("projections_check.xlsx", pd.read_excel),
        ("projections_check.xlsx - Sheet1.csv", pd.read_csv),
    )
    for path, reader in sources:
        if not os.path.exists(path):
            continue
        sheet = reader(path)
        sheet.columns = sheet.columns.str.strip()
        sheet.fillna(0, inplace=True)
        return sheet.to_dict(orient="records")
    return []
async def get_manager_team(team_id: int):
    """Return a manager's squad merged with the local projections.

    Fetches bank, free transfers and squad picks via ``get_fpl_team_data`` and,
    for every pick that has a projection row, returns that row with ``Price``
    and ``sell_price`` set to the pick's selling price and ``photo`` taken
    from the base player table. Picks without a projection row are skipped.

    Raises:
        HTTPException: 500 carrying the underlying error text on any failure.
    """
    try:
        fpl_data = get_fpl_team_data(team_id)
        picks = []
        for pick in fpl_data["squad"]:
            player_id = pick["id"]
            projections = app_data.output_df
            matches = projections[projections["ID"] == player_id]
            if matches.empty:
                # No local projection for this player: leave it out.
                continue
            record = matches.iloc[0].to_dict()

            # The manager's own selling price replaces the market price.
            selling_price = pick["selling_price"]
            record["Price"] = selling_price
            record["sell_price"] = selling_price

            base_players = app_data.finalized_df
            base_match = base_players[base_players["id"] == player_id]
            if not base_match.empty and "photo" in base_match.columns:
                record["photo"] = base_match.iloc[0]["photo"]
            else:
                record["photo"] = ""
            picks.append(record)

        response = {
            "in_the_bank": fpl_data["in_the_bank"],
            "free_transfers": fpl_data["free_transfers"],
            "picks": picks,
        }
        return response
    except Exception as e:
        print(f"Error fetching team: {e}")
        raise HTTPException(status_code=500, detail=str(e))
def _load_default_solver_settings() -> dict:
    """Read ``comprehensive_settings.json`` located next to this module.

    Returns the parsed JSON, or an empty dict when the file does not exist.
    """
    module_dir = os.path.dirname(__file__)
    settings_path = os.path.join(module_dir, "comprehensive_settings.json")
    if not os.path.exists(settings_path):
        return {}
    with open(settings_path, "r", encoding="utf-8") as handle:
        defaults = json.load(handle)
    return defaults
def get_solver_default_settings():
    """Return the default solver settings read by ``_load_default_solver_settings``."""
    return _load_default_solver_settings()
async def run_solver(payload: SolveRequest):
    """Run one solve for the posted request and return the solver's result.

    The request is converted to a plain dict, passed through
    ``prep_solver_data`` and then ``run_milp_model``.

    Raises:
        HTTPException: 400 carrying the underlying error text on any failure.
    """
    try:
        # Use model_dump() when the model offers it, otherwise fall back to dict().
        if hasattr(payload, "model_dump"):
            request_data = payload.model_dump()
        else:
            request_data = payload.dict()
        prepared = prep_solver_data(request_data)
        return run_milp_model(prepared)
    except Exception as e:
        print(f"Error: {e}")
        raise HTTPException(status_code=400, detail=str(e))
class SensitivityRequest(SolveRequest):
    """``SolveRequest`` plus the knobs for ``run_sensitivity_analysis``."""

    # Number of noisy solves to attempt.
    num_sims: int = 50
    analysis_gw: Optional[int] = None  # kept for compat, ignored internally
async def run_sensitivity_analysis(payload: SensitivityRequest):
    """
    Runs num_sims solves (iterations=1, per-player noise). For regular GWs
    aggregates buy/sell/move transfers; for WC/FH GWs aggregates squad
    selection % (PSB), lineup %, and positional lineup combinations.

    Each simulation multiplies all of a player's EVs by a single Gaussian
    factor (mean 1.0, sd 0.12, clamped to [0.3, 2.5]). Simulations that raise
    or do not return a successful solution are skipped; every percentage is
    relative to the number of simulations that did succeed.

    Returns a dict with ``status``, ``num_sims``, ``valid_runs``,
    ``horizon_gws`` and ``gw_results`` (keyed by the gameweek as a string).

    Raises:
        HTTPException: 400 carrying the underlying error text on any failure,
            including the case where every simulation failed.
    """
    import random
    from collections import Counter

    positions = ("G", "D", "M", "F")

    try:
        data_dict = (
            payload.model_dump() if hasattr(payload, "model_dump") else payload.dict()
        )
        num_sims = int(data_dict.pop("num_sims", 50))
        data_dict.pop("analysis_gw", None)
        horizon_gws = [int(g) for g in data_dict.get("horizon_gws", [])]
        base_settings = dict(data_dict.get("settings") or {})
        base_settings["iterations"] = 1

        # Detect which GWs have WC/FH active
        chip_free_gws: set[int] = {
            int(g)
            for chip_key in ("use_wc", "use_fh")
            for g in base_settings.get(chip_key) or []
        }

        id_to_name: dict[int, str] = {}
        id_to_pos: dict[int, str] = {}
        for p in data_dict["market_players"]:
            pid = int(p["id"])
            id_to_name[pid] = p["name"]
            id_to_pos[pid] = p["pos"]

        print(
            f"Sensitivity: running {num_sims} sims across {len(horizon_gws)} GWs "
            f"(chip-free GWs: {chip_free_gws or 'none'})..."
        )

        # Regular GW accumulators
        gw_data: dict[int, dict] = {
            gw: {
                "buys": Counter(),
                "sells": Counter(),
                "moves": Counter(),
                "lineups": Counter(),
                "no_transfer": 0,
            }
            for gw in horizon_gws
            if gw not in chip_free_gws
        }
        # WC/FH GW accumulators: per-player squad & lineup counts + combo counters
        wc_data: dict[int, dict] = {
            gw: {
                "squad": Counter(),  # {name: count}  (in the 15-man squad)
                "lineup": Counter(),  # {name: count}  (in the 11-man lineup)
                "combos": {pos: Counter() for pos in positions},
            }
            for gw in horizon_gws
            if gw in chip_free_gws
        }

        valid_runs = 0
        for sim_idx in range(num_sims):
            noisy_players = []
            for p in data_dict["market_players"]:
                # One multiplicative factor per player per sim, shared by all of that player's GWs.
                noise = random.gauss(1.0, 0.12)
                noise = max(0.3, min(2.5, noise))
                noisy_evs = {k: round(float(v) * noise, 4) for k, v in p["evs"].items()}
                noisy_players.append({**p, "evs": noisy_evs})
            sim_data = {
                **data_dict,
                "market_players": noisy_players,
                "settings": {**base_settings},
            }
            try:
                solver_input = prep_solver_data(sim_data)
                result = run_milp_model(solver_input)
                if result["status"] != "success" or not result["solutions"]:
                    continue
                sol = result["solutions"][0]
            except Exception as sim_err:
                print(f"  Sim {sim_idx + 1} failed: {sim_err}")
                continue
            valid_runs += 1

            for gw_plan in sol["plan"]:
                gw = gw_plan["gw"]
                if gw in wc_data:
                    # --- WC/FH GW: squad & lineup selection ---
                    tally = wc_data[gw]
                    lineup = list(gw_plan.get("lineup", []))
                    lineup_ids = set(lineup)
                    for pid in lineup + list(gw_plan.get("bench", [])):
                        name = id_to_name.get(pid, str(pid))
                        tally["squad"][name] += 1
                        if pid in lineup_ids:
                            tally["lineup"][name] += 1

                    # Lineup combos per position
                    pos_players: dict[str, list[str]] = {pos: [] for pos in positions}
                    for pid in sorted(lineup_ids):
                        pos = id_to_pos.get(pid, "M")
                        if pos in pos_players:
                            pos_players[pos].append(id_to_name.get(pid, str(pid)))
                    for pos, names in pos_players.items():
                        combo = frozenset(names)
                        if combo:
                            tally["combos"][pos][combo] += 1
                elif gw in gw_data:
                    # --- Regular GW: buy/sell/move ---
                    tally = gw_data[gw]
                    transfers_out_ids = gw_plan.get("transfers_out", [])
                    transfers_in_ids = gw_plan.get("transfers_in", [])
                    if not transfers_in_ids:
                        tally["no_transfer"] += 1
                    else:
                        buy_names = [
                            id_to_name.get(pid, str(pid)) for pid in transfers_in_ids
                        ]
                        sell_names = [
                            id_to_name.get(pid, str(pid)) for pid in transfers_out_ids
                        ]
                        tally["buys"].update(buy_names)
                        tally["sells"].update(sell_names)
                        sorted_buys = sorted(buy_names)
                        sorted_sells = sorted(sell_names)
                        if sorted_buys and sorted_sells:
                            mk = (
                                f"{', '.join(sorted_sells)} -> {', '.join(sorted_buys)}"
                            )
                            tally["moves"][mk] += 1
                    tally["lineups"].update(
                        id_to_name.get(pid, str(pid))
                        for pid in gw_plan.get("lineup", [])
                    )

        if valid_runs == 0:
            raise RuntimeError(
                "All sensitivity simulations failed. Check squad/budget settings."
            )

        def to_pct_list(counter: dict, top_n: int = 20) -> list:
            # Highest counts first; ties keep first-seen order.
            return [
                {"name": k, "count": v, "pct": round(v / valid_runs * 100, 1)}
                for k, v in sorted(counter.items(), key=lambda x: -x[1])[:top_n]
            ]

        # NOTE(review): tallies are keyed by player name, so two market players
        # sharing a name are merged here — confirm names are unique upstream.
        name_to_pos: dict[str, str] = {v: id_to_pos[k] for k, v in id_to_name.items()}
        gw_results: dict[str, dict] = {}

        # --- Regular GW results ---
        for gw in horizon_gws:
            if gw in chip_free_gws or gw not in gw_data:
                continue
            d = gw_data[gw]
            pos_groups: dict[str, list] = {pos: [] for pos in positions}
            for name, cnt in sorted(d["lineups"].items(), key=lambda x: -x[1]):
                pos = name_to_pos.get(name, "M")
                pct = round(cnt / valid_runs * 100, 1)
                if pos in pos_groups:
                    pos_groups[pos].append({"name": name, "pct": pct, "count": cnt})
            gw_results[str(gw)] = {
                "is_chip_free": False,
                "buys": to_pct_list(d["buys"]),
                "sells": to_pct_list(d["sells"]),
                "moves": to_pct_list(d["moves"]),
                "lineups": {pos: rows[:8] for pos, rows in pos_groups.items()},
                "no_transfer_pct": round(d["no_transfer"] / valid_runs * 100, 1),
            }

        # --- WC/FH GW results ---
        for gw in horizon_gws:
            if gw not in wc_data:
                continue
            wd = wc_data[gw]
            # Per-player squad/lineup pct grouped by position
            player_data_by_pos: dict[str, list] = {pos: [] for pos in positions}
            for name in set(wd["squad"]) | set(wd["lineup"]):
                sq_cnt = wd["squad"].get(name, 0)
                lu_cnt = wd["lineup"].get(name, 0)
                pos = name_to_pos.get(name, "M")
                if pos in player_data_by_pos:
                    player_data_by_pos[pos].append(
                        {
                            "name": name,
                            "squad_pct": round(sq_cnt / valid_runs * 100, 1),
                            "lineup_pct": round(lu_cnt / valid_runs * 100, 1),
                            "squad_count": sq_cnt,
                            "lineup_count": lu_cnt,
                        }
                    )
            # Sort each position by squad_pct descending and keep the top 12
            for pos in player_data_by_pos:
                player_data_by_pos[pos].sort(key=lambda x: -x["squad_pct"])
                player_data_by_pos[pos] = player_data_by_pos[pos][:12]

            # Lineup combos per position (top 5)
            combo_data: dict[str, list] = {}
            for pos in positions:
                sorted_combos = sorted(wd["combos"][pos].items(), key=lambda x: -x[1])[
                    :5
                ]
                combo_data[pos] = [
                    {
                        "combination": ", ".join(sorted(combo)),
                        "pct": round(cnt / valid_runs * 100, 1),
                        "count": cnt,
                    }
                    for combo, cnt in sorted_combos
                ]
            gw_results[str(gw)] = {
                "is_chip_free": True,
                "players": player_data_by_pos,
                "combos": combo_data,
            }

        return {
            "status": "success",
            "num_sims": num_sims,
            "valid_runs": valid_runs,
            "horizon_gws": horizon_gws,
            "gw_results": gw_results,
        }
    except Exception as e:
        print(f"Sensitivity error: {e}")
        raise HTTPException(status_code=400, detail=str(e))
def _generate_chip_combos(chip_gw_options: dict) -> list[dict]:
    """
    Generate all valid chip combinations from the per-chip GW option lists.

    Rules:
    - At most one chip per GW.
    - Each chip type used at most once.
    - Returns list of dicts like {"use_wc": [], "use_fh": [37], "use_bb": [33], "use_tc": []}.

    The "no chips at all" combination is always the first entry. Repeated GW
    values within one chip's option list are collapsed (first occurrence
    wins) so the same combination is never emitted twice, and a missing or
    ``None`` option mapping is treated as "no options".
    """
    chip_types = ["wc", "fh", "bb", "tc"]
    chip_gw_options = chip_gw_options or {}

    options: list[list] = []
    for c in chip_types:
        # dict.fromkeys de-duplicates while keeping first-seen order.
        gws = list(dict.fromkeys(int(g) for g in (chip_gw_options.get(c) or [])))
        options.append([None] + gws)  # None = don't use this chip

    valid: list[dict] = []
    for combo in _itertools.product(*options):
        # combo = (wc_gw|None, fh_gw|None, bb_gw|None, tc_gw|None)
        used_gws = [g for g in combo if g is not None]
        if len(used_gws) != len(set(used_gws)):
            continue  # Two chips assigned to same GW — invalid
        valid.append(
            {
                f"use_{c}": ([g] if g is not None else [])
                for c, g in zip(chip_types, combo)
            }
        )
    return valid
async def run_chip_solver(payload: ChipSolveRequest):
    """
    Solve once per valid chip combination built from ``chip_gw_options`` and
    return the successful solutions, best score first.

    The request's settings are overlaid with fixed chip-solve values:
    decay_base=1.017, ft_value=0, an empty ft_value_list, itb_value=0,
    ft_use_penalty=0, no_transfer_last_gws=0, iterations=1. Only the first 30
    combinations are evaluated. Each returned solution is tagged with
    ``chip_combo`` and a 1-based ``combo_id``.

    Raises:
        HTTPException: 400 carrying the underlying error text on any failure,
            including when no combination yields a solution.
    """
    try:
        if hasattr(payload, "model_dump"):
            data_dict = payload.model_dump()
        else:
            data_dict = payload.dict()
        chip_gw_options: dict = data_dict.pop("chip_gw_options", {})

        # Fixed chip-solve settings (per run_parallel.py conventions)
        chip_fixed = dict(
            decay_base=1.017,
            ft_value=0.0,
            ft_value_list={},
            itb_value=0.0,
            ft_use_penalty=0.0,
            no_transfer_last_gws=0,
            iterations=1,
        )
        base_settings = {**data_dict.get("settings", {}), **chip_fixed}

        candidate_combos = _generate_chip_combos(chip_gw_options)
        if not candidate_combos:
            raise Exception(
                "No valid chip combinations generated from the supplied GW options."
            )
        # Cap at 30 combinations to keep runtime reasonable
        candidate_combos = candidate_combos[:30]
        print(
            f"Chip solve: evaluating {len(candidate_combos)} valid chip combination(s)..."
        )

        solved = []
        for combo_id, combo in enumerate(candidate_combos, start=1):
            request_for_combo = {
                **data_dict,
                "settings": {**base_settings, **combo},
            }
            try:
                result = run_milp_model(prep_solver_data(request_for_combo))
                if result["status"] == "success" and result["solutions"]:
                    best = result["solutions"][0]
                    best["chip_combo"] = combo  # tag which chips were used
                    best["combo_id"] = combo_id
                    solved.append(best)
            except Exception as combo_err:
                # A failing combination is reported and skipped, not fatal.
                print(f"  Combo {combo_id} ({combo}) failed: {combo_err}")
                continue

        if not solved:
            raise Exception("All chip combinations failed to find optimal solutions.")

        def score(solution):
            # objective_score when truthy, else ev, else 0.
            return float(solution.get("objective_score") or solution.get("ev") or 0)

        solved.sort(key=score, reverse=True)
        return {"status": "success", "solutions": solved}
    except Exception as e:
        print(f"Chip solve error: {e}")
        raise HTTPException(status_code=400, detail=str(e))
async def save_user_settings(payload: SettingsPayload, db: Session = Depends(get_db)):
    """Store the posted quick/advanced settings on the user owning ``team_id``.

    Raises:
        HTTPException: 404 when no user has ``default_team_id == payload.team_id``.
    """
    owner = db.query(User).filter(User.default_team_id == payload.team_id).first()
    if not owner:
        raise HTTPException(status_code=404, detail="User not found")

    owner.solver_settings = dict(
        quick=payload.quick_settings,
        advanced=payload.advanced_settings,
    )
    # Mark the JSON column as changed explicitly so the commit writes it.
    flag_modified(owner, "solver_settings")
    db.commit()

    confirmation = {"status": "success", "message": "Settings saved to cloud."}
    return confirmation
| # 3. The LOAD Route (GET) | |
async def load_user_settings(team_id: int, db: Session = Depends(get_db)):
    """Return the saved quick/advanced settings for the user owning ``team_id``.

    When the user is missing or has nothing saved, both settings are returned
    as ``None`` so the client can fall back to its local defaults.
    """
    owner = db.query(User).filter(User.default_team_id == team_id).first()
    saved = owner.solver_settings if owner else None

    quick = advanced = None
    if saved:
        quick = saved.get("quick")
        advanced = saved.get("advanced")
    return {
        "status": "success",
        "quick_settings": quick,
        "advanced_settings": advanced,
    }
class FixtureOverrideRequest(BaseModel):
    """Body of the fixture-override update handled by ``update_fixture_overrides``."""

    # Replaces app_data.admin_fixture_overrides wholesale (not merged).
    overrides: Dict[str, Any]
    # The update is rejected with 401 unless this is True and the password matches.
    is_admin: bool = False
    admin_password: Optional[str] = None
def get_fixture_overrides():
    """Return the in-memory global fixture overrides (no authentication check here)."""
    # Serves the global fixtures to everyone who loads the website
    return app_data.admin_fixture_overrides
def update_fixture_overrides(req: FixtureOverrideRequest):
    """Replace the global fixture overrides in memory and persist them.

    Raises:
        HTTPException: 401 when the request is not flagged as admin
            ("Unauthorized") or the admin password does not match
            ("Invalid admin password").
    """
    if not req.is_admin:
        raise HTTPException(status_code=401, detail="Unauthorized")
    if req.admin_password != ADMIN_PASSWORD:
        raise HTTPException(status_code=401, detail="Invalid admin password")

    # Update the in-memory copy first so readers see it immediately,
    # then store the same value in the database.
    app_data.admin_fixture_overrides = req.overrides
    save_config_to_db("admin_fixtures", app_data.admin_fixture_overrides)
    return dict(status="success", message="Global fixtures updated!")
def get_xmins_overrides():
    """Return the in-memory admin per-gameweek minutes overrides, keyed by player id."""
    return app_data.admin_xmins_overrides
def get_ratings_history():
    """Return the rows of ``team_ratings_history.csv`` as a list of dicts.

    Best effort: any error while reading or converting the file (including
    the file being absent) yields an empty list.
    """
    history_path = "team_ratings_history.csv"
    try:
        return pd.read_csv(history_path).to_dict(orient="records")
    except Exception:
        return []