import os
import json
import joblib
import datetime
import warnings

import requests
import numpy as np
import pandas as pd
from io import BytesIO
from tqdm import tqdm

from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, roc_auc_score, brier_score_loss, confusion_matrix
from sklearn.pipeline import Pipeline
from sklearn.multioutput import MultiOutputClassifier
from sklearn.calibration import CalibratedClassifierCV
from imblearn.over_sampling import SMOTE
from imblearn.combine import SMOTEENN, SMOTETomek
import matplotlib.pyplot as plt
import seaborn as sns

warnings.filterwarnings("ignore", category=UserWarning)

# Football-data.co.uk league codes mapped to human-readable names.
LEAGUES = {
    'F1': 'France Ligue 1', 'F2': 'France Ligue 2',
    'E0': 'England Premier League', 'E1': 'England Championship',
    'D1': 'Germany Bundesliga', 'D2': 'Germany 2. Bundesliga',
    'I1': 'Italy Serie A', 'I2': 'Italy Serie B',
    'SP1': 'Spain La Liga', 'SP2': 'Spain Segunda Division',
}
REQUIRED_COLS = ['Date', 'HomeTeam', 'AwayTeam', 'FTHG', 'FTAG', 'FTR']
BASE_URL = "https://www.football-data.co.uk/mmz4281"
TARGET_COLUMNS = ['Home_Win', 'Away_Win', 'Draw', 'Over2.5', 'BTTS']

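# Example of the URL layout implied by BASE_URL and LEAGUES (assumed from the
# constants above, not verified against every season): the 2023-24 Premier
# League file would be fetched from
#   https://www.football-data.co.uk/mmz4281/2324/E0.csv
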
def fetch_football_data():
    """Download the last six completed seasons of match data for the major European leagues."""
    current_year = datetime.datetime.now().year
    # Season codes are the last two digits of the start and end years, e.g. 2023-24 -> "2324".
    seasons = [f"{str(y - 1)[-2:]}{str(y)[-2:]}" for y in range(current_year - 6, current_year)]
    all_data = []

    for season in tqdm(seasons, desc="Downloading seasons"):
        for league_code, league_name in LEAGUES.items():
            try:
                url = f"{BASE_URL}/{season}/{league_code}.csv"
                response = requests.get(url, timeout=15)
                response.raise_for_status()

                # Try a few encodings; decode the raw bytes rather than response.text,
                # otherwise the `encoding` argument would have no effect.
                for encoding in ['utf-8', 'latin1', 'iso-8859-1']:
                    try:
                        df = pd.read_csv(BytesIO(response.content), encoding=encoding,
                                         parse_dates=['Date'], dayfirst=True, on_bad_lines='warn')
                        break
                    except Exception:
                        continue
                else:
                    print(f"⚠️ Encoding error: {league_code} {season}")
                    continue

                if not all(col in df.columns for col in REQUIRED_COLS):
                    print(f"⚠️ Missing columns: {league_code} {season}")
                    continue

                df['Season'] = season
                df['League'] = league_name
                df['League_Code'] = league_code
                all_data.append(df)
            except Exception as e:
                print(f"⚠️ Error {league_code} {season}: {str(e)}")
                continue

    if not all_data:
        raise ValueError("No valid data could be loaded.")

    result_df = pd.concat(all_data, ignore_index=True).sort_values('Date')
    print(f"📊 Loaded {len(result_df)} matches across {len(seasons)} seasons")
    return result_df

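# A minimal sketch of the season-code scheme assumed by fetch_football_data()
# (illustrative only; `_season_code` is not used elsewhere in this script):
def _season_code(start_year: int) -> str:
    """Return football-data.co.uk's code for the season starting in `start_year`."""
    return f"{start_year % 100:02d}{(start_year + 1) % 100:02d}"
# _season_code(2023) -> "2324"
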
def preprocess_data(df):
    """Clean the raw data and engineer additional features."""
    df.columns = [col.strip() for col in df.columns]

    # Map each canonical column name to the source columns that may hold it,
    # in order of preference.
    mapping = {
        'HST': ['HST', 'HS', 'HSTS'],
        'AST': ['AST', 'AS', 'ASTS'],
        'HF': ['HF', 'HomeF', 'HFauls'],
        'AF': ['AF', 'AwayF', 'AFauls'],
        'HY': ['HY', 'HomeY'],
        'AY': ['AY', 'AwayY'],
        'HR': ['HR', 'HomeR'],
        'AR': ['AR', 'AwayR'],
        'HC': ['HC'], 'AC': ['AC'],
        'B365H': ['B365H', 'BbHwin'], 'B365D': ['B365D', 'BbDwin'], 'B365A': ['B365A', 'BbAwin'],
        'B365O2.5': ['B365O2.5', 'BbOver'], 'B365U2.5': ['B365U2.5', 'BbUnder'],
        'B365GG': ['B365GG', 'BBBTS']
    }

    for target, sources in mapping.items():
        for col in sources:
            if col in df.columns:
                df[target] = pd.to_numeric(df[col], errors='coerce')
                break

    # Match-level differential features. df.get(col, 0) falls back to a scalar 0
    # when a column is absent from this data source.
    df['Goal_Diff'] = df['FTHG'] - df['FTAG']
    df['Total_Goals'] = df['FTHG'] + df['FTAG']
    df['Shot_Diff'] = df.get('HST', 0) - df.get('AST', 0)
    df['Corners_Diff'] = df.get('HC', 0) - df.get('AC', 0)
    df['Fouls_Diff'] = df.get('HF', 0) - df.get('AF', 0)
    df['Yellow_Diff'] = df.get('HY', 0) - df.get('AY', 0)
    df['Red_Diff'] = df.get('HR', 0) - df.get('AR', 0)

    # Binary targets: both teams scored, and more than 2.5 total goals.
    df['BTTS'] = ((df['FTHG'] > 0) & (df['FTAG'] > 0)).astype(int)
    df['Over2.5'] = (df['Total_Goals'] > 2.5).astype(int)

    # Rolling form features per team. Each rolling mean is shift(1)-ed so a row
    # only sees matches played strictly before it (no target leakage). Note that
    # home form is computed from home matches only, and away form from away
    # matches only -- a deliberate simplification.
    for team in ['Home', 'Away']:
        team_col = f'{team}Team'

        for stat in ['FTHG', 'FTAG', 'Total_Goals', 'BTTS', 'Over2.5']:
            if stat in df.columns:
                df[f'{team}_{stat}_Last3'] = df.groupby(team_col)[stat].transform(
                    lambda x: x.rolling(3, min_periods=1).mean().shift(1))
                df[f'{team}_{stat}_Last5'] = df.groupby(team_col)[stat].transform(
                    lambda x: x.rolling(5, min_periods=1).mean().shift(1))

        # Goals conceded: the home side concedes FTAG, the away side concedes FTHG.
        conceded_col = 'FTAG' if team == 'Home' else 'FTHG'
        df[f'{team}_Goals_Conceded_Last3'] = df.groupby(team_col)[conceded_col].transform(
            lambda x: x.rolling(3, min_periods=1).mean().shift(1))
        df[f'{team}_Goals_Conceded_Last5'] = df.groupby(team_col)[conceded_col].transform(
            lambda x: x.rolling(5, min_periods=1).mean().shift(1))

        # Cards: combine yellow and red into one rolling figure, so the red-card
        # column no longer overwrites the yellow-card rolling mean (in the
        # original loop both wrote to the same `{team}_Cards_Last3` column).
        yellow_col, red_col = ('HY', 'HR') if team == 'Home' else ('AY', 'AR')
        if yellow_col in df.columns and red_col in df.columns:
            cards = df[yellow_col].fillna(0) + df[red_col].fillna(0)
            df[f'{team}_Cards_Last3'] = cards.groupby(df[team_col]).transform(
                lambda x: x.rolling(3, min_periods=1).mean().shift(1))

    # Implied probabilities from Bet365 odds; 0.5 is a neutral placeholder when
    # the odds column is missing.
    odds_columns = {
        'Implied_Prob_Home': 'B365H',
        'Implied_Prob_Draw': 'B365D',
        'Implied_Prob_Away': 'B365A',
        'Implied_Prob_Over2.5': 'B365O2.5',
        'Implied_Prob_BTTS': 'B365GG'
    }

    for prob_col, odds_col in odds_columns.items():
        if odds_col in df.columns:
            df[prob_col] = 1 / df[odds_col]
        else:
            df[prob_col] = 0.5

    # Remove the bookmaker's margin (overround) from the 1X2 probabilities:
    # raw 1/odds sum to slightly more than 1, so rescale them to sum to 1.
    if all(col in df.columns for col in ['Implied_Prob_Home', 'Implied_Prob_Draw', 'Implied_Prob_Away']):
        total_prob = (df['Implied_Prob_Home'].fillna(0) +
                      df['Implied_Prob_Draw'].fillna(0) +
                      df['Implied_Prob_Away'].fillna(0))
        for prob_col in ['Implied_Prob_Home', 'Implied_Prob_Draw', 'Implied_Prob_Away']:
            df[prob_col] = df[prob_col] / total_prob

    # Points earned by each side, then rolling form over several windows.
    df['Points_Home'] = df['FTR'].map({'H': 3, 'D': 1, 'A': 0})
    df['Points_Away'] = df['FTR'].map({'A': 3, 'D': 1, 'H': 0})

    for period in [3, 5, 10]:
        df[f'Home_Form{period}'] = df.groupby('HomeTeam')['Points_Home'].transform(
            lambda x: x.rolling(period, min_periods=1).mean().shift(1))
        df[f'Away_Form{period}'] = df.groupby('AwayTeam')['Points_Away'].transform(
            lambda x: x.rolling(period, min_periods=1).mean().shift(1))

    # Drop rows without a usable result or date.
    df = df.dropna(subset=['FTHG', 'FTAG', 'FTR', 'Date'])
    df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
    df = df.dropna(subset=['Date']).reset_index(drop=True)

    # Calendar features.
    df['DayOfWeek'] = df['Date'].dt.dayofweek
    df['Month'] = df['Date'].dt.month

    # Pre-match league ranking: replay each season/league chronologically and
    # rank teams by points per match *before* the current fixture.
    for season in df['Season'].unique():
        for league in df['League'].unique():
            season_league_mask = (df['Season'] == season) & (df['League'] == league)
            season_league_data = df[season_league_mask].sort_values('Date')

            team_points = {}
            team_matches = {}

            for idx, row in season_league_data.iterrows():
                home_team = row['HomeTeam']
                away_team = row['AwayTeam']

                for team in [home_team, away_team]:
                    team_points.setdefault(team, 0)
                    team_matches.setdefault(team, 0)

                # Rank all teams seen so far by points per match played.
                standings = sorted(team_points,
                                   key=lambda t: team_points[t] / max(1, team_matches[t]),
                                   reverse=True)
                ranks = {t: rank for rank, t in enumerate(standings, start=1)}
                df.loc[idx, 'Home_Rank'] = ranks[home_team]
                df.loc[idx, 'Away_Rank'] = ranks[away_team]

                # Update the table with this match's result.
                if row['FTR'] == 'H':
                    team_points[home_team] += 3
                elif row['FTR'] == 'A':
                    team_points[away_team] += 3
                else:
                    team_points[home_team] += 1
                    team_points[away_team] += 1

                team_matches[home_team] += 1
                team_matches[away_team] += 1

    df['Rank_Diff'] = df['Home_Rank'] - df['Away_Rank']

    return df

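# A minimal sketch of the leakage-safe rolling pattern used throughout
# preprocess_data(): shift(1) ensures row i never sees the result of match i.
# (Illustrative only; `_rolling_form_demo` is not called by the pipeline.)
def _rolling_form_demo():
    points = pd.Series([3, 0, 1, 3, 3])  # one team's points, in date order
    last3 = points.rolling(3, min_periods=1).mean().shift(1)
    # last3 -> [NaN, 3.0, 1.5, 1.33, 1.33]: each entry averages earlier matches only
    return last3
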
def prepare_features(df):
    """Build the feature matrix X and the multi-label target frame y."""
    features = [
        'Shot_Diff',
        'Home_Total_Goals_Last5',
        'Away_Total_Goals_Last5',
        'Home_BTTS_Last5',
        'Implied_Prob_Home'
    ]

    available_features = [f for f in features if f in df.columns]

    print(f"Available features: {len(available_features)}/{len(features)}")
    if len(available_features) < len(features):
        print(f"Missing features: {set(features) - set(available_features)}")

    X = df[available_features].copy()

    # Coerce everything to numeric, then impute remaining NaNs with the column median.
    X = X.apply(pd.to_numeric, errors='coerce')
    for col in X.columns:
        X[col] = X[col].fillna(X[col].median())

    # One binary target per market, in the same order as TARGET_COLUMNS.
    y = pd.DataFrame({
        'Home_Win': (df['FTR'] == 'H').astype(int),
        'Away_Win': (df['FTR'] == 'A').astype(int),
        'Draw': (df['FTR'] == 'D').astype(int),
        'Over2.5': (df['Total_Goals'] > 2.5).astype(int),
        'BTTS': ((df['FTHG'] > 0) & (df['FTAG'] > 0)).astype(int)
    })

    # Report class balance for each target.
    for col in y.columns:
        positive_rate = y[col].mean() * 100
        print(f"{col}: {positive_rate:.1f}% positive")

    return X, y

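# A worked example of the overround removal done in preprocess_data(), since
# Implied_Prob_Home feeds straight into the feature matrix. The odds below are
# made up for illustration; `_overround_demo` is not used by the pipeline.
def _overround_demo():
    odds = {'H': 2.10, 'D': 3.40, 'A': 3.60}
    raw = {k: 1 / v for k, v in odds.items()}
    margin = sum(raw.values())             # ~1.048, i.e. a ~4.8% bookmaker margin
    fair = {k: p / margin for k, p in raw.items()}
    return fair                            # probabilities now sum to exactly 1.0
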
def plot_feature_importance(model, feature_names, targets, save_dir="ml/plots"):
    """Plot per-target feature importances for the fitted estimators."""
    os.makedirs(save_dir, exist_ok=True)
    estimators = model.named_steps['model'].estimators_

    for target, estimator in zip(targets, estimators):
        # Calibrated estimators wrap the XGBoost model; look inside them. The
        # wrapped-model attribute is `estimator` in recent scikit-learn and
        # `base_estimator` in older releases.
        inner = getattr(estimator, 'estimator', None) or getattr(estimator, 'base_estimator', None)
        if inner is not None and hasattr(inner, 'feature_importances_'):
            importance = inner.feature_importances_
        elif hasattr(estimator, 'feature_importances_'):
            importance = estimator.feature_importances_
        else:
            print(f"The estimator for {target} has no feature_importances_ attribute")
            continue

        # Top 15 features, most important first.
        indices = np.argsort(importance)[::-1]
        n_shown = min(15, len(feature_names))

        plt.figure(figsize=(10, 8))
        plt.title(f'Feature importance for {target}')
        plt.barh(range(n_shown), importance[indices][:n_shown], align='center')
        plt.yticks(range(n_shown), [feature_names[i] for i in indices[:n_shown]])
        plt.xlabel('Relative importance')
        plt.tight_layout()
        plt.savefig(f"{save_dir}/feature_importance_{target}.png")
        plt.close()

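# train_model() below uses a random, stratified split. Because matches are
# time-ordered and the rolling features summarise the past, a random split lets
# training see matches played after some test matches. A chronological split
# (a sketch, not used by this script) would avoid that leakage:
def _chronological_split(X, y, test_size=0.2):
    """Split time-ordered X/y so the test set is strictly the most recent matches."""
    cutoff = int(len(X) * (1 - test_size))
    return X.iloc[:cutoff], X.iloc[cutoff:], y.iloc[:cutoff], y.iloc[cutoff:]
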
def train_model(X, y):
    """Train one tuned XGBoost model per target."""
    # NOTE: the random split is kept from the original design; see the
    # chronological-split sketch above for a leakage-free alternative.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y['Draw'])

    # `use_label_encoder` was dropped: it is deprecated and removed in XGBoost 2.x.
    base_model = XGBClassifier(
        eval_metric='logloss',
        random_state=42,
        n_jobs=-1
    )

    # Per-target hyperparameter grids. Draw gets a deeper grid plus
    # scale_pos_weight because draws are the rarest outcome.
    param_grids = {
        'Home_Win': {
            'n_estimators': [300, 400, 500],
            'max_depth': [3, 4, 5],
            'learning_rate': [0.01, 0.03],
            'subsample': [0.8, 0.9],
            'colsample_bytree': [0.8, 0.9]
        },
        'Away_Win': {
            'n_estimators': [300, 400, 500],
            'max_depth': [3, 4, 5],
            'learning_rate': [0.01, 0.03],
            'subsample': [0.8, 0.9],
            'colsample_bytree': [0.8, 0.9]
        },
        'Draw': {
            'n_estimators': [400, 500, 600],
            'max_depth': [4, 5, 6],
            'learning_rate': [0.005, 0.01, 0.02],
            'subsample': [0.8, 0.9],
            'colsample_bytree': [0.8, 0.9],
            'scale_pos_weight': [3]
        },
        'Over2.5': {
            'n_estimators': [300, 400, 500],
            'max_depth': [4, 5, 6],
            'learning_rate': [0.01, 0.03],
            'subsample': [0.8, 0.9],
            'colsample_bytree': [0.8, 0.9]
        },
        'BTTS': {
            'n_estimators': [300, 400, 500],
            'max_depth': [4, 5, 6],
            'learning_rate': [0.01, 0.03],
            'subsample': [0.8, 0.9],
            'colsample_bytree': [0.8, 0.9]
        }
    }

    # Per-target resampling to counter class imbalance.
    resampling_methods = {
        'Home_Win': SMOTE(random_state=42),
        'Away_Win': SMOTE(random_state=42),
        'Draw': SMOTETomek(random_state=42),
        'Over2.5': SMOTEENN(random_state=42),
        'BTTS': SMOTEENN(random_state=42)
    }

    best_params = {}
    estimators = []
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    print("\n🔍 Tuning hyperparameters for each target...")

    for target in y_train.columns:
        print(f"\nTuning {target}...")

        grid_search = GridSearchCV(
            estimator=base_model,
            param_grid=param_grids[target],
            cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=42),
            scoring='roc_auc',
            n_jobs=-1,
            verbose=0
        )

        # Resample the training split to counter imbalance. CAVEAT (kept from
        # the original design): resampling before cross-validation lets synthetic
        # samples leak across CV folds; an imblearn Pipeline inside GridSearchCV
        # would be stricter.
        resampler = resampling_methods[target]
        X_train_res, y_train_res = resampler.fit_resample(X_train_scaled, y_train[target])

        grid_search.fit(X_train_res, y_train_res)

        best_params[target] = grid_search.best_params_
        print(f"Best parameters for {target}: {best_params[target]}")
        print(f"Best score: {grid_search.best_score_:.4f}")

        # Refit with early stopping. In XGBoost >= 2.0, early_stopping_rounds is
        # a constructor argument, not a fit() argument. CAVEAT: using the test
        # set as the early-stopping eval_set leaks information into training;
        # kept from the original design, but a held-out validation split would
        # be cleaner.
        best_model = XGBClassifier(**best_params[target], random_state=42,
                                   eval_metric='logloss', early_stopping_rounds=20)
        best_model.fit(
            X_train_res, y_train_res,
            eval_set=[(X_test_scaled, y_test[target])],
            verbose=False
        )

        # Calibrate probabilities for the targets where they matter most.
        if target in ['Draw', 'Over2.5', 'BTTS']:
            print(f"Calibrating probabilities for {target}...")
            # cv='prefit' is deprecated in recent scikit-learn releases; newer
            # versions wrap the fitted model in sklearn.frozen.FrozenEstimator instead.
            calibrated_model = CalibratedClassifierCV(
                best_model,
                method='isotonic',
                cv='prefit'
            )
            calibrated_model.fit(X_train_scaled, y_train[target])
            estimators.append(calibrated_model)
        else:
            estimators.append(best_model)

    # Assemble the per-target models into a MultiOutputClassifier shell. This is
    # a hack: estimators_ is set by hand instead of calling fit(), which works
    # because predict()/predict_proba() only consult estimators_.
    multi_model = MultiOutputClassifier(base_model)
    multi_model.estimators_ = estimators

    pipeline = Pipeline([
        ('scaler', scaler),
        ('model', multi_model)
    ])

    plot_feature_importance(pipeline, X.columns, y.columns)

    return pipeline, X_test, y_test, best_params

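# evaluate_model() below reports the Brier score: the mean squared error between
# predicted probabilities and binary outcomes (0 is perfect; always predicting
# 0.5 scores 0.25). A hand-rolled check with illustrative values only:
def _brier_demo():
    y_true = np.array([1, 0, 1, 0])
    y_prob = np.array([0.9, 0.2, 0.6, 0.4])
    return np.mean((y_prob - y_true) ** 2)  # (0.01 + 0.04 + 0.16 + 0.16) / 4 = 0.0925
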
def evaluate_model(model, X_test, y_test):
    """Evaluate the model and generate diagnostic plots."""
    try:
        y_pred = model.predict(X_test)
        # The per-target estimators were trained on scaled inputs, so scale
        # X_test through the pipeline's scaler before calling them directly
        # (passing raw X_test here was a bug).
        X_test_scaled = model.named_steps['scaler'].transform(X_test)
        y_proba = [est.predict_proba(X_test_scaled)[:, 1]
                   for est in model.named_steps['model'].estimators_]

        os.makedirs("ml/plots2", exist_ok=True)

        proba_results = {}
        print("\n==== EVALUATION RESULTS ====")

        plt.figure(figsize=(12, 10))

        for i, target in enumerate(y_test.columns):
            proba = y_proba[i]
            proba_results[target] = proba

            print(f"\n=== Evaluation for {target} ===")
            print(classification_report(y_test[target], y_pred[:, i]))

            roc_auc = roc_auc_score(y_test[target], proba)
            brier = brier_score_loss(y_test[target], proba)
            print(f"ROC AUC: {roc_auc:.3f}")
            print(f"Brier score: {brier:.3f}")

            cm = confusion_matrix(y_test[target], y_pred[:, i])
            print(f"Confusion matrix:\n{cm}")

            plt.subplot(2, 3, i + 1)
            sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=False)
            plt.title(f'Confusion matrix: {target}\nAUC: {roc_auc:.3f}')
            plt.ylabel('Actual')
            plt.xlabel('Predicted')

        plt.tight_layout()
        plt.savefig("ml/plots2/confusion_matrices.png")
        plt.close()

        proba_df = pd.DataFrame(proba_results, index=X_test.index)

        proba_df['Highest_Proba'] = proba_df[y_test.columns].idxmax(axis=1)
        proba_df['Highest_Proba_Value'] = proba_df[y_test.columns].max(axis=1)

        print("\n=== Mean probabilities ===")
        mean_probas = proba_df[y_test.columns].mean().sort_values(ascending=False)
        print(mean_probas)

        # Distribution of predicted probabilities per target.
        plt.figure(figsize=(12, 8))
        for i, target in enumerate(y_test.columns):
            plt.subplot(2, 3, i + 1)
            sns.histplot(proba_df[target], bins=20, kde=True)
            plt.axvline(0.5, color='r', linestyle='--')
            plt.title(f'Probability distribution: {target}')
            plt.xlabel('Probability')
            plt.ylabel('Frequency')

        plt.tight_layout()
        plt.savefig("ml/plots2/probability_distributions.png")
        plt.close()

        # Reliability (calibration) curves: bucket predictions into 10 bins and
        # compare mean predicted probability with observed frequency per bin.
        plt.figure(figsize=(12, 8))

        for i, target in enumerate(y_test.columns):
            plt.subplot(2, 3, i + 1)

            n_bins = 10
            bins = np.linspace(0, 1, n_bins + 1)
            binned_preds = np.digitize(proba_df[target], bins) - 1
            bin_accs = np.zeros(n_bins)
            bin_confs = np.zeros(n_bins)
            bin_sizes = np.zeros(n_bins)

            for j in range(n_bins):
                bin_mask = binned_preds == j
                if np.sum(bin_mask) > 0:
                    bin_accs[j] = np.mean(y_test[target].values[bin_mask])
                    bin_confs[j] = np.mean(proba_df[target].values[bin_mask])
                    bin_sizes[j] = np.sum(bin_mask)

            plt.plot(bin_confs, bin_accs, marker='o', linewidth=2, label='Calibration')
            plt.plot([0, 1], [0, 1], linestyle='--', color='gray', label='Perfect calibration')
            plt.title(f'Calibration curve: {target}')
            plt.xlabel('Predicted probability')
            plt.ylabel('Observed frequency')
            plt.legend(loc='lower right')

            # Annotate each point with the number of predictions in its bin.
            for j in range(n_bins):
                if bin_sizes[j] > 0:
                    plt.text(bin_confs[j], bin_accs[j], f' {int(bin_sizes[j])}',
                             ha='left', va='center', fontsize=8)

        plt.tight_layout()
        plt.savefig("ml/plots2/calibration_curves.png")
        plt.close()

        print("\n=== Top 5 most confident predictions ===")
        print(proba_df.sort_values('Highest_Proba_Value', ascending=False).head(5))

        # Error analysis: the most confidently wrong predictions per target.
        print("\n=== Error analysis ===")

        for target in y_test.columns:
            false_positives = proba_df[
                (y_test[target] == 0) & (proba_df[target] > 0.75)
            ].sort_values(target, ascending=False)

            if not false_positives.empty:
                print(f"\nMost confident false positives for {target}:")
                print(false_positives.head(3))

            false_negatives = proba_df[
                (y_test[target] == 1) & (proba_df[target] < 0.25)
            ].sort_values(target)

            if not false_negatives.empty:
                print(f"\nMost confident false negatives for {target}:")
                print(false_negatives.head(3))

        return proba_df

    except Exception as e:
        print(f"\n❌ Evaluation error: {str(e)}")
        import traceback
        traceback.print_exc()
        return None

def save_model_info(model, X, best_params, targets):
    """Persist the model and its metadata."""
    os.makedirs("ml", exist_ok=True)

    joblib.dump(model, "ml/multi_output_model_5.joblib")

    model_info = {
        "features": list(X.columns),
        "targets": targets,
        "best_params": best_params,
        "created_at": datetime.datetime.now().isoformat()
    }

    with open("ml/model_info_5.json", "w") as f:
        json.dump(model_info, f, indent=2)

    print("\n✅ Model saved to ml/multi_output_model_5.joblib")
    print("✅ Model metadata saved to ml/model_info_5.json")

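# A minimal sketch of reloading the artifacts saved above in another process
# (assumes the same library versions used for training):
#
#     model = joblib.load("ml/multi_output_model_5.joblib")
#     with open("ml/model_info_5.json") as f:
#         info = json.load(f)
#     probas = predict_new_matches(model, upcoming_df, info["features"])
#
# `upcoming_df` is a hypothetical DataFrame of fixtures carrying the same
# engineered feature columns.
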
def predict_new_matches(model, features_df, feature_names):
    """Predict outcome probabilities for new matches."""
    X_new = features_df[feature_names].copy()

    # As in evaluate_model, scale through the pipeline's scaler before calling
    # the per-target estimators directly (raw inputs here were a bug).
    X_new_scaled = model.named_steps['scaler'].transform(X_new)
    y_proba = [est.predict_proba(X_new_scaled)[:, 1]
               for est in model.named_steps['model'].estimators_]

    # Keep the input index so results can be joined back to match metadata.
    results = pd.DataFrame(index=X_new.index)

    for i, target in enumerate(TARGET_COLUMNS):
        results[target] = y_proba[i]

    results['Highest_Proba'] = results[TARGET_COLUMNS].idxmax(axis=1)
    results['Highest_Proba_Value'] = results[TARGET_COLUMNS].max(axis=1)

    return results

def main():
    """Entry point."""
    try:
        print("🚀 Starting the analysis and modelling process...")

        print("\n⏳ Downloading data...")
        df = fetch_football_data()

        print("\n🧹 Preprocessing data...")
        df = preprocess_data(df)
        print(f"\n📊 {len(df)} matches ready for analysis.")

        print("\n🔧 Preparing features...")
        X, y = prepare_features(df)
        print(f"\nFeatures used ({len(X.columns)}):")
        print(", ".join(X.columns))

        print("\n🤖 Training the model...")
        model, X_test, y_test, best_params = train_model(X, y)

        print("\n🔍 Final evaluation...")
        evaluate_model(model, X_test, y_test)

        save_model_info(model, X, best_params, list(y.columns))

        latest_data = df.sort_values('Date').tail(100)
        print(f"\n📝 The {len(latest_data)} most recent matches are available for future predictions.")

        print("\n🔮 Sample predictions for 5 test matches:")
        sample_matches = X_test.head(5)
        predictions = predict_new_matches(model, sample_matches, X.columns)

        if 'HomeTeam' in df.columns and 'AwayTeam' in df.columns:
            sample_info = df.loc[sample_matches.index, ['HomeTeam', 'AwayTeam']]
            predictions = pd.concat([sample_info, predictions], axis=1)

        print(predictions)

        print("\n✅ Process completed successfully!")

    except Exception as e:
        print(f"\n❌ Error: {str(e)}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()