import numpy as np
import pandas as pd
from collections import defaultdict
from datetime import datetime

from unidecode import unidecode
from sklearn.metrics import log_loss, accuracy_score

# Date formats observed in football-data.co.uk CSV exports.
_DATE_FORMATS = ("%d/%m/%Y", "%d/%m/%y", "%Y-%m-%d")


def _parse_date(x):
    """Parse one raw date cell; return pd.NaT when no known format matches."""
    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(str(x), fmt)
        except (ValueError, TypeError):
            continue
    return pd.NaT


def _clean_team(s):
    """ASCII-fold and whitespace-normalize a team name; NaN passes through."""
    if pd.isna(s):
        return s
    # unidecode strips accents; split/join collapses all internal whitespace.
    return " ".join(unidecode(str(s)).split())


def _implied_probs(row, prefix):
    """Convert one bookmaker's decimal odds to normalized implied probabilities.

    Reads columns `{prefix}h/d/a` from *row* and removes the overround by
    dividing each inverse odd by their sum. Returns a 3-element Series
    (H, D, A), all-NaN when any odd is missing or not a valid decimal odd
    (must be > 1.0), so bad quotes never pollute the averages.
    """
    nan3 = pd.Series([np.nan, np.nan, np.nan])
    h, d, a = (row.get(prefix + c) for c in ("h", "d", "a"))
    if any(pd.isna(v) for v in (h, d, a)):
        return nan3
    if min(h, d, a) <= 1.0:
        return nan3
    inv = np.array([1.0 / h, 1.0 / d, 1.0 / a], dtype=float)
    s = inv.sum()
    if s <= 0:
        return nan3
    return pd.Series(inv / s)


def _result_points(ftr, is_home):
    """League points one side earned given full-time result 'H'/'D'/'A'."""
    if ftr == "D":
        return 1
    if ftr == "H":
        return 3 if is_home else 0
    if ftr == "A":
        return 0 if is_home else 3
    return 0


def prepare_features(data_raw: pd.DataFrame, window: int = 7, verbose: bool = True):
    """Prepare features from raw EPL data.

    Returns (feat_df, X_cols, WINDOW, base_df)
      - feat_df: DataFrame with features aligned to training
      - X_cols: list of feature column names used for modeling
      - WINDOW: the rolling window used
      - base_df: base match DataFrame with cleaned columns (date, home, away, ftr, ...)
    """
    RENAME = {
        "Date": "date", "Time": "time", "HomeTeam": "home", "AwayTeam": "away",
        "FTHG": "fthg", "FTAG": "ftag", "FTR": "ftr",
        "HTHG": "hthg", "HTAG": "htag", "HTR": "htr",
        "Referee": "ref",
        "HS": "hs", "AS": "as", "HST": "hst", "AST": "ast",
        "HF": "hf", "AF": "af", "HC": "hc", "AC": "ac",
        "HY": "hy", "AY": "ay", "HR": "hr", "AR": "ar",
        # odds (Bet365, William Hill, Pinnacle(PS), VC)
        "B365H": "b365h", "B365D": "b365d", "B365A": "b365a",
        "WHH": "whh", "WHD": "whd", "WHA": "wha",
        "PSH": "psh", "PSD": "psd", "PSA": "psa",
        "VCH": "vch", "VCD": "vcd", "VCA": "vca",
    }
    df = data_raw.rename(columns=RENAME).copy()

    # -----------------------------
    # 1) Clean dates / team names; keep only rows with a decided result
    # -----------------------------
    df["date"] = df["date"].map(_parse_date)
    df = df[~df["date"].isna()].copy()
    df["home"] = df["home"].map(_clean_team)
    df["away"] = df["away"].map(_clean_team)
    df = df[(df["ftr"].isin(["H", "D", "A"]))
            & (~df["home"].isna()) & (~df["away"].isna())].copy()
    df.sort_values(["date", "home", "away"], inplace=True, ignore_index=True)

    # Explicit per-match id. The original code joined rolling features back via
    # positional indices, which only worked because the later sort on "date"
    # happened to be stable; an explicit key makes the join order-independent.
    df["match_id"] = np.arange(len(df))

    # -----------------------------
    # 2) Target: H/D/A -> 0/1/2
    # -----------------------------
    label_map = {"H": 0, "D": 1, "A": 2}
    df["y"] = df["ftr"].map(label_map)

    # -----------------------------
    # 3) Odds -> implied probabilities (normalize overround)
    # -----------------------------
    for bk in ["b365", "wh", "ps", "vc"]:
        if all((bk + c) in df.columns for c in ["h", "d", "a"]):
            probs = df.apply(lambda r: _implied_probs(r, bk),
                             axis=1, result_type="expand")
            df[[f"p_{bk}_H", f"p_{bk}_D", f"p_{bk}_A"]] = probs

    prob_cols = [c for c in df.columns
                 if c.startswith("p_") and c[-2:] in ["_H", "_D", "_A"]]

    def avg_prob(suffix):
        # Consensus probability = mean across whichever bookmakers were present.
        cols = [c for c in prob_cols if c.endswith(suffix)]
        return df[cols].mean(axis=1)

    df["p_odds_H"] = avg_prob("_H")
    df["p_odds_D"] = avg_prob("_D")
    df["p_odds_A"] = avg_prob("_A")

    # -----------------------------
    # 4) Leak-free features: rolling form + simple Elo
    # -----------------------------
    # Long-format table: one row per (match, team), stats from that team's view.
    tm_rows = []
    for i, r in df.iterrows():
        for is_home in (1, 0):
            me, op = ("h", "a") if is_home else ("a", "h")
            tm_rows.append({
                "match_id": i,
                "date": r["date"],
                "team": r["home"] if is_home else r["away"],
                "opp": r["away"] if is_home else r["home"],
                "is_home": is_home,
                "gf": r["fthg"] if is_home else r["ftag"],
                "ga": r["ftag"] if is_home else r["fthg"],
                "shots_f": r.get(me + "s", np.nan),
                "shots_a": r.get(op + "s", np.nan),
                "sot_f": r.get(me + "st", np.nan),
                "sot_a": r.get(op + "st", np.nan),
                "corn_f": r.get(me + "c", np.nan),
                "corn_a": r.get(op + "c", np.nan),
                "y_f": r.get(me + "y", np.nan),
                "y_a": r.get(op + "y", np.nan),
                "r_f": r.get(me + "r", np.nan),
                "r_a": r.get(op + "r", np.nan),
                "points": _result_points(r["ftr"], bool(is_home)),
            })
    tm = pd.DataFrame(tm_rows).sort_values(["team", "date"]).reset_index(drop=True)

    WINDOW = int(window)
    agg_cols = ["gf", "ga", "shots_f", "shots_a", "sot_f", "sot_a",
                "corn_f", "corn_a", "y_f", "r_f", "points"]
    for col in agg_cols:
        # BUGFIX: the shift must happen *inside* each team's group. The previous
        # version called .shift(1) on the flattened (team, idx) series, so the
        # first row of each team silently inherited the last rolling mean of
        # the alphabetically preceding team (cross-team leakage). transform()
        # keeps everything per-team; shift(1) then uses prior matches only.
        tm[f"roll_{col}"] = tm.groupby("team")[col].transform(
            lambda s: s.rolling(WINDOW, min_periods=1).mean().shift(1)
        )

    # Simple Elo with a fixed home advantage; each match row stores the
    # *pre-match* ratings, so the feature is leak-free by construction.
    BASE_ELO = 1500.0
    K = 20.0
    HOME_ADV = 60.0
    elo = defaultdict(lambda: BASE_ELO)
    elo_before_home, elo_before_away = [], []
    df_sorted = df.sort_values("date").reset_index(drop=True)
    for _, r in df_sorted.iterrows():
        h, a = r["home"], r["away"]
        eh, ea = elo[h], elo[a]
        elo_before_home.append(eh)
        elo_before_away.append(ea)
        # Expected home score from the logistic Elo curve (incl. home edge).
        ph = 1.0 / (1.0 + 10 ** (-((eh + HOME_ADV) - ea) / 400))
        if r["ftr"] == "H":
            oh = 1.0
        elif r["ftr"] == "D":
            oh = 0.5
        else:
            oh = 0.0
        elo[h] = eh + K * (oh - ph)
        elo[a] = ea - K * (oh - ph)  # zero-sum update
    df_sorted["elo_home"] = elo_before_home
    df_sorted["elo_away"] = elo_before_away
    df_sorted["elo_diff"] = df_sorted["elo_home"] - df_sorted["elo_away"]

    # Merge rolling features into match rows on the explicit match_id key
    # (immune to any row-order change introduced by the date re-sort above).
    roll_cols = [c for c in tm.columns if c.startswith("roll_")]
    hf = (tm[tm["is_home"] == 1][["match_id"] + roll_cols]
          .rename(columns={c: f"home_{c}" for c in roll_cols}))
    af = (tm[tm["is_home"] == 0][["match_id"] + roll_cols]
          .rename(columns={c: f"away_{c}" for c in roll_cols}))
    feat_df = (df_sorted
               .merge(hf, on="match_id", how="left")
               .merge(af, on="match_id", how="left"))

    # Fill missing market probabilities with the column mean (baseline prior).
    for c in ["p_odds_H", "p_odds_D", "p_odds_A"]:
        if c in feat_df.columns:
            feat_df[c] = feat_df[c].astype(float).fillna(feat_df[c].mean())

    role_feats = [f"home_{c}" for c in roll_cols] + [f"away_{c}" for c in roll_cols]
    elo_feats = ["elo_home", "elo_away", "elo_diff"]
    odds_feats = ["p_odds_H", "p_odds_D", "p_odds_A"]
    X_cols = role_feats + elo_feats + odds_feats
    for c in X_cols:
        if c not in feat_df.columns:
            feat_df[c] = np.nan
        # Median imputation as a last resort so models never see NaN.
        feat_df[c] = feat_df[c].astype(float).fillna(feat_df[c].median())

    # -----------------------------
    # 5) Time-based split (kept for compatibility, but not returned)
    # -----------------------------
    n = len(feat_df)
    idx_train = int(n * 0.70)
    idx_valid = int(n * 0.85)
    # BUGFIX: require every split slice to be non-empty before printing.
    # On tiny inputs an empty slice yields NaT, and formatting NaT with
    # %Y-%m-%d raises; the old "n > 0" guard did not prevent that.
    if verbose and 0 < idx_train < idx_valid < n:
        dates_train = feat_df["date"].iloc[:idx_train].max()
        valid_dates = feat_df["date"].iloc[idx_train:idx_valid]
        test_dates = feat_df["date"].iloc[idx_valid:]
        print(f"Train up to: {dates_train:%Y-%m-%d}")
        print(f"Valid: {valid_dates.min():%Y-%m-%d} .. {valid_dates.max():%Y-%m-%d}")
        print(f"Test : {test_dates.min():%Y-%m-%d} .. {test_dates.max():%Y-%m-%d}")

    return feat_df, X_cols, WINDOW, df_sorted