import pandas as pd import numpy as np import math from scipy.stats import nbinom def poisson_probability_of_conceding_2_or_more_goals(lambd): """Calculates the probability of conceding 2 or more goals using Poisson distribution.""" p_0 = math.exp(-lambd) p_1 = lambd * math.exp(-lambd) return 1 - p_0 - p_1 def poisson_pmf(k, lambd): """Calculates the Poisson Probability Mass Function P(X=k).""" if k < 0: return 0.0 if lambd < 1e-9: # Treat very small lambda as zero for stability return 1.0 if k == 0 else 0.0 return (lambd**k * math.exp(-lambd)) / math.factorial(k) def neg_binom_probability_of_value(expected_mean, value, dispersion=1.0): """ Calculates the exact probability (PMF) of getting exactly 'value' events. Used for: Saves, Goals, Assists. """ if expected_mean <= 0: return 0.0 if dispersion <= 1.0: # Fallback to Poisson if no dispersion return poisson_pmf(value, expected_mean) # Convert Mean + Dispersion to n, p p = 1 / dispersion n = (expected_mean * p) / (1 - p) return nbinom.pmf(value, n, p) def neg_binom_probability_at_least(expected_mean, threshold, dispersion=1.0): """ Calculates probability of getting 'threshold' OR MORE events. Used for: DefCons (CBIT), Recoveries. """ if expected_mean <= 0: return 0.0 if dispersion <= 1.0: # Use existing Poisson logic if dispersion is low return 1 - poisson_cdf(threshold - 1, expected_mean) p = 1 / dispersion n = (expected_mean * p) / (1 - p) # Probability of X >= threshold is (1 - CDF(threshold - 1)) return 1 - nbinom.cdf(threshold - 1, n, p) def calculate_expected_conceded_points(lambd): """ Calculates the expected fantasy points from goals conceded based on a -1 point penalty for every 2 goals. """ total_expected_points = 0 max_goals_to_check = 10 for k in range(max_goals_to_check + 1): prob_k = poisson_pmf(k=k, lambd=lambd) points_for_k_goals = -(k // 2) total_expected_points += prob_k * points_for_k_goals return total_expected_points def poisson_cdf(k, lambd): """Calculates the Poisson Cumulative Distribution Function P(X<=k).""" if k < 0: return 0.0 if lambd < 1e-9: # Treat very small lambda as zero for stability return 1.0 if k >= 0 else 0.0 return sum(poisson_pmf(i, lambd) for i in range(math.floor(k) + 1)) def apply_team_skepticism(df, skepticism_factors): """ Applies a skepticism multiplier to a player's base points based on their team. """ if not skepticism_factors: return df for team_id, multiplier in skepticism_factors.items(): players_on_team = df[df["team"] == team_id].index df.loc[players_on_team, "base_pts"] *= multiplier return df def calculate_single_match_points( player, match_row, xMins_in_match, points_config, player_penalty_shares, is_gk=False, is_def=False, is_mid=False, is_fwd=False, ): """ Calculates points for a single match given the xMins and match projections. Includes full logic for CBIT, CBITR, Penalty Saves, and dynamic BPS. """ if xMins_in_match <= 0: return {"pts": 0.0, "xG": 0.0, "xA": 0.0, "CS": 0.0, "cbit": 0.0, "cbitr": 0.0} scaling_factor = xMins_in_match / 90.0 player_team_num = player["team"] player_pos = player["element_type"] # 1. Identify Home/Away and get Opponent Stats if player_team_num == match_row["home_team_num"]: team_proj_goals = match_row["mc_home_goals_mean"] team_conc_goals = match_row["mc_away_goals_mean"] team_proj_assists = match_row["mc_home_assists_xa_mean"] team_proj_cbit = match_row["mc_home_CBIT_mean"] team_proj_cbitr = match_row["mc_home_CBITR_mean"] team_proj_saves = match_row["mc_home_keeper_saves_mean"] team_proj_yc = match_row["mc_home_yc_mean"] team_proj_rc = match_row["mc_home_rc_mean"] cs_odds = match_row["home_clean_sheet_odds"] else: team_proj_goals = match_row["mc_away_goals_mean"] team_conc_goals = match_row["mc_home_goals_mean"] team_proj_assists = match_row["mc_away_assists_xa_mean"] team_proj_cbit = match_row["mc_away_CBIT_mean"] team_proj_cbitr = match_row["mc_away_CBITR_mean"] team_proj_saves = match_row["mc_away_keeper_saves_mean"] team_proj_yc = match_row["mc_away_yc_mean"] team_proj_rc = match_row["mc_away_rc_mean"] cs_odds = match_row["away_clean_sheet_odds"] # 2. Player Share Calculations proj_goals = player["xG_share"] * team_proj_goals proj_assists = player["xA_share"] * team_proj_assists proj_cbit = player["xCBIT_share"] * team_proj_cbit proj_cbitr = player["xCBITR_share"] * team_proj_cbitr proj_saves = 0 proj_pen_saves = 0 if is_gk: proj_saves = (player["baseline_xSaves_p90"] + team_proj_saves) / 2 proj_pen_saves = player["baseline_pksave_p90"] # --- GOALS & ASSISTS --- pts_goals = ( sum( poisson_pmf(k, proj_goals) * k * points_config["goal"][player_pos] for k in range(9) ) * scaling_factor ) pts_assists = ( sum( poisson_pmf(k, proj_assists) * k * points_config["assist"] for k in range(9) ) * scaling_factor ) # --- CLEAN SHEET & CONCEDED --- pts_cs = ( cs_odds * points_config["clean_sheet"][player_pos] if xMins_in_match >= 60 else (cs_odds * points_config["clean_sheet"][player_pos]) * scaling_factor ) pts_conc = ( calculate_expected_conceded_points(team_conc_goals) * scaling_factor if (is_gk or is_def) and team_conc_goals is not None else 0.0 ) # --- CARDS --- pts_yc = (player["YC_share"] * team_proj_yc * -1) * scaling_factor pts_rc = (player["RC_share"] * team_proj_rc * -3) * scaling_factor # --- SAVES & PENALTY SAVES (GK) --- pts_saves = 0.0 pts_pen_save = 0.0 if is_gk: expected_saves_pts_unscaled = sum( neg_binom_probability_of_value(proj_saves, k, dispersion=1.5) * ((k // 3) * points_config["saves_per_3"]) for k in range(21) ) pts_saves = expected_saves_pts_unscaled * scaling_factor expected_pen_saved_pts_unscaled = sum( poisson_pmf(k, proj_pen_saves) * (k * 5) for k in range(3) ) pts_pen_save = expected_pen_saved_pts_unscaled * scaling_factor # --- CBIT & CBITR --- pts_cbit = ( ( neg_binom_probability_at_least(proj_cbit, 10, dispersion=3.2) * 2 * scaling_factor ) if is_def else 0.0 ) pts_cbitr = 0.0 if is_mid: pts_cbitr = ( neg_binom_probability_at_least(proj_cbitr, 12, dispersion=2.8) * 2 * scaling_factor ) elif is_fwd: pts_cbitr = ( neg_binom_probability_at_least(proj_cbitr, 12, dispersion=1.7) * 2 * scaling_factor ) # --- PENALTY POINTS (Taker) --- pts_penalty = 0.0 if player_penalty_shares and player["id"] in player_penalty_shares: pen_share = player_penalty_shares[player["id"]] base_pen_pts = points_config["penalty_points_per_position"].get(player_pos, 0) pts_penalty = (base_pen_pts * pen_share) * scaling_factor # --- APPEARANCE --- pts_app = 2 if xMins_in_match > 60 else (1 if xMins_in_match > 0 else 0) # --- BONUS POINTS --- bps_floor = player["baseline_bps_floor_p90"] * scaling_factor bps_mins = 6 if xMins_in_match >= 60 else (3 if xMins_in_match > 0 else 0) scaled_goals = proj_goals * scaling_factor scaled_assists = proj_assists * scaling_factor scaled_saves = proj_saves * scaling_factor if is_gk else 0 scaled_pen_saves = proj_pen_saves * scaling_factor if is_gk else 0 scaled_yc = player["YC_share"] * team_proj_yc * scaling_factor scaled_rc = player["RC_share"] * team_proj_rc * scaling_factor bps_goals = scaled_goals * (24 if is_fwd else (18 if is_mid else 12)) bps_assists = scaled_assists * 9 bps_cs = cs_odds * 12 if (is_gk or is_def) and xMins_in_match >= 60 else 0 bps_saves = scaled_saves * 2 bps_pen_saves = scaled_pen_saves * 15 bps_cards = (scaled_yc * -3) + (scaled_rc * -9) total_projected_bps = ( bps_floor + bps_mins + bps_goals + bps_assists + bps_cs + bps_saves + bps_pen_saves + bps_cards ) pts_bonus = total_projected_bps / 29.4 if not is_gk else 0.0 # --- FINAL SUM --- total_pts = ( pts_goals + pts_assists + pts_cs + pts_conc + pts_yc + pts_rc + pts_saves + pts_pen_save + pts_cbit + pts_cbitr + pts_penalty + pts_app + pts_bonus ) return { "pts": total_pts, "xG": proj_goals * scaling_factor, "xA": proj_assists * scaling_factor, "CS": cs_odds if xMins_in_match >= 60 else cs_odds * scaling_factor, "cbit": proj_cbit * scaling_factor, "cbitr": proj_cbitr * scaling_factor, } def calculate_all_points( player_df_base, match_df, player_penalty_shares, MINS_SCALING_BONUS, pos_map, teams_dict_1, teams_dict, points_config, effective_xmins_overrides, MINS_THRESHOLD, RAMP_UP_PERIOD, decay_rates, ramp_up_rates, user_player_status_overrides, team_skepticism, effective_availability_multipliers, ): RAMP_UP_PERIOD = 3 player_df = player_df_base.copy() final_df_output = pd.DataFrame( { "Pos": player_df["element_type"].map(pos_map), "ID": player_df["id"], "Name": player_df["web_name"], "BV": player_df["now_cost"], "SV": player_df["now_cost"], "Team": player_df["Team"], } ) continuous_xMins_progression = player_df["baseline_xMins"].copy() has_baseline_xmins_override = getattr(player_df, "attrs", {}).get( "has_baseline_xmins_override", False ) all_baseline_overrides = getattr(player_df, "attrs", {}).get( "all_baseline_overrides", {} ) unique_gws = sorted(match_df["GW"].unique()) match_projections_col = {index: {} for index in player_df.index} for gw_idx, gw in enumerate(unique_gws): if has_baseline_xmins_override and gw == 1: for index, player in player_df.iterrows(): player_id = player["id"] if ( player_id in all_baseline_overrides and "baseline_xMins" in all_baseline_overrides[player_id] ): continuous_xMins_progression.loc[index] = all_baseline_overrides[ player_id ]["baseline_xMins"] gw_calc_df = pd.DataFrame(index=player_df.index) gw_calc_df["team"] = player_df["team"] gw_calc_df["id"] = player_df["id"] gw_calc_df["web_name"] = player_df["web_name"] gw_calc_df["player_name"] = player_df["name"] gw_calc_df["xG_share"] = player_df["xG_share"] gw_calc_df["xA_share"] = player_df["xA_share"] gw_calc_df["baseline_xMins"] = player_df["baseline_xMins"] gw_calc_df["baseline_bps_floor_p90"] = player_df["baseline_bps_floor_p90"] gw_calc_df["base_pts"] = 0.0 # VECTORIZED XMINS CALCULATION player_ids_array = player_df["id"].values n_players = len(player_ids_array) status_list = [ user_player_status_overrides.get(pid, {"status": "default"})["status"] for pid in player_ids_array ] weeks_out_list = [ user_player_status_overrides.get(pid, {}).get("weeks_out", 0) for pid in player_ids_array ] status_array = np.array(status_list, dtype=object) weeks_out_array = np.array(weeks_out_list) is_not_starter = status_array == "not_a_starter" is_suspended = status_array == "suspended" is_injured = status_array == "injured" is_default = ~(is_not_starter | is_suspended | is_injured) baseline_mins_array = player_df["baseline_xMins"].values prev_continuous_xmins_array = continuous_xMins_progression.values calculated_xmins_array = np.zeros(n_players, dtype=float) next_continuous_xmins_array = np.zeros(n_players, dtype=float) first_gw = min(unique_gws) is_first_gw = gw == first_gw is_available_first_gw = ~(is_not_starter | is_suspended | is_injured) # CASE 1: First GW + Available if is_first_gw: mask_first_available = is_available_first_gw calculated_xmins_array[mask_first_available] = baseline_mins_array[ mask_first_available ] calculated_xmins_array[is_not_starter] = 0 # CASE 3: Suspended mask_suspended_during = is_suspended & (gw <= weeks_out_array) mask_suspended_return = is_suspended & (gw == weeks_out_array + 1) mask_suspended_after = is_suspended & (gw > weeks_out_array + 1) calculated_xmins_array[mask_suspended_during] = 0 calculated_xmins_array[mask_suspended_return] = baseline_mins_array[ mask_suspended_return ] decay_rate_susp = decay_rates.get("suspended", decay_rates.get("default", 0.99)) ramp_rate_susp = ramp_up_rates.get("suspended", ramp_up_rates.get("default", 0)) mask_susp_decay = mask_suspended_after & ( prev_continuous_xmins_array >= MINS_THRESHOLD ) mask_susp_ramp = mask_suspended_after & ( prev_continuous_xmins_array < MINS_THRESHOLD ) calculated_xmins_array[mask_susp_decay] = ( prev_continuous_xmins_array[mask_susp_decay] * decay_rate_susp ) calculated_xmins_array[mask_susp_ramp] = np.minimum( prev_continuous_xmins_array[mask_susp_ramp] + ramp_rate_susp, 90 ) # CASE 4: Injured mask_injured_out = is_injured & (gw <= weeks_out_array) calculated_xmins_array[mask_injured_out] = 0 mask_injured_recovering = is_injured & (gw > weeks_out_array) weeks_since_injury_array = np.maximum(0, gw - weeks_out_array) mask_ramp_phase = mask_injured_recovering & ( weeks_since_injury_array <= RAMP_UP_PERIOD ) calculated_xmins_array[mask_ramp_phase] = ( baseline_mins_array[mask_ramp_phase] / RAMP_UP_PERIOD ) * weeks_since_injury_array[mask_ramp_phase] mask_post_ramp = mask_injured_recovering & ( weeks_since_injury_array > RAMP_UP_PERIOD ) decay_rate_default = decay_rates.get("default", 0.99) ramp_rate_default = ramp_up_rates.get( "default", ramp_up_rates.get("injured", 0) ) mask_post_decay = mask_post_ramp & ( prev_continuous_xmins_array >= MINS_THRESHOLD ) mask_post_ramp_up = mask_post_ramp & ( prev_continuous_xmins_array < MINS_THRESHOLD ) calculated_xmins_array[mask_post_decay] = ( prev_continuous_xmins_array[mask_post_decay] * decay_rate_default ) calculated_xmins_array[mask_post_ramp_up] = np.minimum( prev_continuous_xmins_array[mask_post_ramp_up] + ramp_rate_default, 90 ) # CASE 5: Default/healthy mask_default_calc = is_default & ~(is_first_gw & is_available_first_gw) element_type_array = player_df["element_type"].values is_gk = element_type_array == 1 mask_gk_default = mask_default_calc & is_gk calculated_xmins_array[mask_gk_default] = prev_continuous_xmins_array[ mask_gk_default ] mask_outfield_default = mask_default_calc & (~is_gk) mask_outf_decay = mask_outfield_default & ( prev_continuous_xmins_array >= MINS_THRESHOLD ) calculated_xmins_array[mask_outf_decay] = ( prev_continuous_xmins_array[mask_outf_decay] * decay_rate_default ) mask_outf_ramp = ( mask_outfield_default & (prev_continuous_xmins_array < MINS_THRESHOLD) & (baseline_mins_array > 0) ) calculated_xmins_array[mask_outf_ramp] = np.minimum( prev_continuous_xmins_array[mask_outf_ramp] + ramp_rate_default, 90 ) calculated_xmins_array = np.clip(calculated_xmins_array, 0, 90) next_continuous_xmins_array = calculated_xmins_array.copy() # APPLY OVERRIDES AND AVAILABILITY xMins_for_current_gw_display = calculated_xmins_array.copy() for idx in range(n_players): player_id = player_ids_array[idx] availability_mult = effective_availability_multipliers.get( player_id, {} ).get(gw, 1.0) xMins_for_current_gw_display[idx] *= availability_mult if ( player_id in effective_xmins_overrides and gw in effective_xmins_overrides[player_id] ): xMins_for_current_gw_display[idx] = effective_xmins_overrides[ player_id ][gw] xMins_for_current_gw_display = pd.Series( xMins_for_current_gw_display, index=player_df.index ) next_gw_continuous_xMins = pd.Series( next_continuous_xmins_array, index=player_df.index ) gw_calc_df[f"{gw}_xMins"] = xMins_for_current_gw_display # STREAMLINED MATCH SCORING LOOP gw_matches = match_df[match_df["GW"] == gw] for index, player in player_df.iterrows(): player_team_num = player["team"] my_matches = gw_matches[ (gw_matches["home_team_num"] == player_team_num) | (gw_matches["away_team_num"] == player_team_num) ] if my_matches.empty: gw_calc_df.loc[index, "base_pts"] = 0 gw_calc_df.loc[index, f"{gw}_xMins"] = 0 gw_calc_df.loc[index, "gw_xG"] = 0.0 gw_calc_df.loc[index, "gw_xA"] = 0.0 gw_calc_df.loc[index, "gw_CS"] = 0.0 gw_calc_df.loc[index, "gw_cbit"] = 0.0 gw_calc_df.loc[index, "gw_cbitr"] = 0.0 continue base_gw_mins = gw_calc_df.loc[index, f"{gw}_xMins"] mins_per_match = ( base_gw_mins * 0.97 if len(my_matches) > 1 and base_gw_mins > 35 else base_gw_mins ) total_gw_pts = 0 total_gw_xg = 0 total_gw_xa = 0 total_gw_cs = 0 total_gw_cbit = 0 total_gw_cbitr = 0 for _, match_row in my_matches.iterrows(): stats = calculate_single_match_points( player=player, match_row=match_row, xMins_in_match=mins_per_match, points_config=points_config, player_penalty_shares=player_penalty_shares, is_gk=(player["element_type"] == 1), is_def=(player["element_type"] == 2), is_mid=(player["element_type"] == 3), is_fwd=(player["element_type"] == 4), ) total_gw_pts += stats["pts"] total_gw_xg += stats["xG"] total_gw_xa += stats["xA"] total_gw_cs += stats["CS"] total_gw_cbit += stats["cbit"] total_gw_cbitr += stats["cbitr"] is_home = player_team_num == match_row["home_team_num"] opp_num = ( match_row["away_team_num"] if is_home else match_row["home_team_num"] ) match_id = ( f"{match_row['home_team_num']}_vs_{match_row['away_team_num']}" ) match_projections_col[index][match_id] = { "opponent_team_id": int(opp_num), "is_home": bool(is_home), "default_gw": int(gw), "Pts": round(stats["pts"], 3), "xMins": round(mins_per_match, 1), "xG": round(stats["xG"], 3), "xA": round(stats["xA"], 3), "CS": round(stats["CS"], 3), } gw_calc_df.loc[index, "base_pts"] = total_gw_pts gw_calc_df.loc[index, "gw_xG"] = total_gw_xg gw_calc_df.loc[index, "gw_xA"] = total_gw_xa gw_calc_df.loc[index, "gw_CS"] = total_gw_cs gw_calc_df.loc[index, "gw_cbit"] = total_gw_cbit gw_calc_df.loc[index, "gw_cbitr"] = total_gw_cbitr gw_calc_df = apply_team_skepticism(gw_calc_df, team_skepticism) gw_calc_df["total_pts"] = gw_calc_df["base_pts"] final_df_output[f"{gw}_xMins"] = round(gw_calc_df[f"{gw}_xMins"], 0) final_df_output[f"{gw}_Pts"] = round(gw_calc_df["total_pts"], 2) final_df_output[f"{gw}_xG"] = round(gw_calc_df["gw_xG"], 2) final_df_output[f"{gw}_xA"] = round(gw_calc_df["gw_xA"], 2) final_df_output[f"{gw}_CS"] = gw_calc_df["gw_CS"] final_df_output[f"{gw}_cbit"] = gw_calc_df["gw_cbit"] final_df_output[f"{gw}_cbitr"] = gw_calc_df["gw_cbitr"] continuous_xMins_progression = next_gw_continuous_xMins.copy() final_df_output["Total Points"] = final_df_output.filter(like="_Pts").sum(axis=1) final_df_output["Average Points"] = round( (final_df_output.filter(like="_Pts").sum(axis=1)) / len(unique_gws), 2 ) final_df_output["match_projections"] = pd.Series(match_projections_col) return final_df_output