import pandas as pd import numpy as np from xgboost import XGBClassifier from sklearn.metrics import accuracy_score, classification_report import ta import joblib import os from sklearn.model_selection import train_test_split, TimeSeriesSplit def preprocess_data(df): """Prétraitement des données avec calcul des indicateurs techniques""" # Convertir les colonnes en float si nécessaire price_columns = ['Close', 'High', 'Low', 'Open'] for col in price_columns: df[col] = pd.to_numeric(df[col], errors='coerce') # Calculer les indicateurs techniques # RSI df['RSI'] = ta.momentum.RSIIndicator(df['Close'], window=14).rsi() # ADX adx = ta.trend.ADXIndicator(df['High'], df['Low'], df['Close'], window=14) df['ADX'] = adx.adx() # Volatilité df['Volatility_20'] = df['Close'].rolling(window=20).std() # MACD macd = ta.trend.MACD(df['Close']) df['MACD'] = macd.macd() # Ichimoku ichimoku = ta.trend.IchimokuIndicator(df['High'], df['Low']) df['Tenkan_sen'] = ichimoku.ichimoku_conversion_line() df['Kijun_sen'] = ichimoku.ichimoku_base_line() df['Senkou_Span_A'] = ichimoku.ichimoku_a() df['Senkou_Span_B'] = ichimoku.ichimoku_b() df['Chikou_Span'] = df['Close'].shift(-26) # Calcul des signaux des stratégies df['Ichimoku_ADX_Volatility_Signal'] = calculate_ichimoku_adx_volatility_signal(df) df['BB_Stoch_ATR_Signal'] = calculate_bb_stoch_atr_signal(df) df['Chikou_MACD_Pente_Signal'] = calculate_chikou_macd_signal(df) df['ADX_Stoch_Volatility_MA_Signal'] = calculate_adx_stoch_volatility_signal(df) # Ajouter les colonnes temporelles df['hour'] = pd.to_datetime(df.index).hour df['day'] = pd.to_datetime(df.index).day df['month'] = pd.to_datetime(df.index).month return df def calculate_ichimoku_adx_volatility_signal(df): """Calcul du signal Ichimoku + ADX + Volatilité""" signal = np.zeros(len(df)) # Conditions pour le signal bullish = (df['Close'] > df['Senkou_Span_A']) & \ (df['Close'] > df['Senkou_Span_B']) & \ (df['ADX'] > 25) & \ (df['Volatility_20'] < df['Volatility_20'].rolling(5).mean()) bearish = (df['Close'] < df['Senkou_Span_A']) & \ (df['Close'] < df['Senkou_Span_B']) & \ (df['ADX'] > 25) & \ (df['Volatility_20'] < df['Volatility_20'].rolling(5).mean()) signal[bullish] = 1 signal[bearish] = -1 return signal def calculate_bb_stoch_atr_signal(df): """Calcul du signal Bollinger + Stochastique + ATR""" bb = ta.volatility.BollingerBands(df['Close']) stoch = ta.momentum.StochasticOscillator(df['High'], df['Low'], df['Close']) atr = ta.volatility.AverageTrueRange(df['High'], df['Low'], df['Close']) signal = np.zeros(len(df)) bullish = (df['Close'] < bb.bollinger_lband()) & \ (stoch.stoch() < 20) & \ (atr.average_true_range() < atr.average_true_range().rolling(5).mean()) bearish = (df['Close'] > bb.bollinger_hband()) & \ (stoch.stoch() > 80) & \ (atr.average_true_range() < atr.average_true_range().rolling(5).mean()) signal[bullish] = 1 signal[bearish] = -1 return signal def calculate_chikou_macd_signal(df): """Calcul du signal Chikou Span + MACD""" signal = np.zeros(len(df)) bullish = (df['Chikou_Span'] > df['Close'].shift(26)) & \ (df['MACD'] > 0) bearish = (df['Chikou_Span'] < df['Close'].shift(26)) & \ (df['MACD'] < 0) signal[bullish] = 1 signal[bearish] = -1 return signal def calculate_adx_stoch_volatility_signal(df): """Calcul du signal ADX + Stochastique + Volatilité""" stoch = ta.momentum.StochasticOscillator(df['High'], df['Low'], df['Close']) signal = np.zeros(len(df)) bullish = (df['ADX'] > 25) & \ (stoch.stoch() < 20) & \ (df['Volatility_20'] < df['Volatility_20'].rolling(5).mean()) bearish = (df['ADX'] > 25) & \ (stoch.stoch() > 80) & \ (df['Volatility_20'] < df['Volatility_20'].rolling(5).mean()) signal[bullish] = 1 signal[bearish] = -1 return signal def calculate_strategy_performance(df, strategies, look_ahead=10): """ Calcule les performances (profit et drawdown) pour chaque stratégie """ max_profits = np.zeros((len(df), len(strategies))) max_drawdowns = np.zeros((len(df), len(strategies))) for i in range(len(df) - look_ahead): close_start = df['Close'].iloc[i] future_closes = df['Close'].iloc[i:i + look_ahead + 1].values future_highs = df['High'].iloc[i:i + look_ahead + 1].values future_lows = df['Low'].iloc[i:i + look_ahead + 1].values for j, strategy in enumerate(strategies): signal = df[strategy].iloc[i] if signal == 1: # Achat max_profit = (max(future_highs) - close_start) / close_start * 100 max_drawdown = (close_start - min(future_lows)) / close_start * 100 max_profits[i, j] = max_profit max_drawdowns[i, j] = max_drawdown if max_drawdown > 0 else 0 elif signal == -1: # Vente max_profit = (close_start - min(future_lows)) / close_start * 100 max_drawdown = (max(future_highs) - close_start) / close_start * 100 max_profits[i, j] = max_profit max_drawdowns[i, j] = max_drawdown if max_drawdown > 0 else 0 else: # Neutre max_profits[i, j] = 0 max_drawdowns[i, j] = 0 # Identifier la meilleure stratégie pour le profit best_strategy_max_profit = np.argmax(max_profits, axis=1) # Identifier la stratégie qui minimise le drawdown best_strategy_max_drawdown = np.full(len(df), -1, dtype=int) for i in range(len(df)): active_strategies = np.where(max_drawdowns[i, :] > 0)[0] if len(active_strategies) > 0: min_drawdown_idx = active_strategies[np.argmin(max_drawdowns[i, active_strategies])] best_strategy_max_drawdown[i] = min_drawdown_idx return best_strategy_max_profit, best_strategy_max_drawdown def train_models(df): """Entraînement des modèles de sélection de stratégie avec split temporel""" strategies = [ 'Ichimoku_ADX_Volatility_Signal', 'BB_Stoch_ATR_Signal', 'Chikou_MACD_Pente_Signal', 'ADX_Stoch_Volatility_MA_Signal' ] continuous_features = ['RSI', 'ADX', 'Volatility_20', 'MACD'] features = strategies + continuous_features # Calcul des meilleures stratégies basé sur les performances print("Calcul des performances des stratégies...") best_strategy_max_profit, best_strategy_max_drawdown = calculate_strategy_performance(df, strategies) # Préparation des données X = df[features].values y_profit = best_strategy_max_profit y_drawdown = best_strategy_max_drawdown # Supprimer les lignes où y_drawdown est -1 valid_drawdown_mask = y_drawdown != -1 X_drawdown = X[valid_drawdown_mask] y_drawdown = y_drawdown[valid_drawdown_mask] # Division temporelle des données (80% train, 20% test) train_size_profit = int(len(X) * 0.8) train_size_drawdown = int(len(X_drawdown) * 0.8) # Split pour le modèle de profit X_train_profit = X[:train_size_profit] X_test_profit = X[train_size_profit:] y_train_profit = y_profit[:train_size_profit] y_test_profit = y_profit[train_size_profit:] # Split pour le modèle de drawdown X_train_drawdown = X_drawdown[:train_size_drawdown] X_test_drawdown = X_drawdown[train_size_drawdown:] y_train_drawdown = y_drawdown[:train_size_drawdown] y_test_drawdown = y_drawdown[train_size_drawdown:] # Afficher la distribution temporelle print("\nPériode d'entraînement profit:") print(f"Du : {df.index[0]}") print(f"Au : {df.index[train_size_profit-1]}") print("\nPériode de test profit:") print(f"Du : {df.index[train_size_profit]}") print(f"Au : {df.index[-1]}") # Validation avec TimeSeriesSplit tscv = TimeSeriesSplit(n_splits=5) # Entraînement du modèle de profit print("\nEntraînement du modèle de profit maximal...") model_profit = XGBClassifier( n_estimators=100, learning_rate=0.1, max_depth=5, random_state=42, eval_metric='mlogloss' ) # Validation croisée temporelle pour le modèle de profit print("\nValidation croisée temporelle pour le modèle de profit:") for fold, (train_index, val_index) in enumerate(tscv.split(X_train_profit)): X_fold_train, X_fold_val = X_train_profit[train_index], X_train_profit[val_index] y_fold_train, y_fold_val = y_train_profit[train_index], y_train_profit[val_index] model_profit.fit(X_fold_train, y_fold_train) fold_score = model_profit.score(X_fold_val, y_fold_val) print(f"Fold {fold + 1} - Score: {fold_score:.4f}") # Entraînement final sur l'ensemble complet d'entraînement model_profit.fit( X_train_profit, y_train_profit, eval_set=[(X_train_profit, y_train_profit), (X_test_profit, y_test_profit)], verbose=True ) # Même processus pour le modèle de drawdown print("\nEntraînement du modèle de drawdown minimal...") model_drawdown = XGBClassifier( n_estimators=100, learning_rate=0.1, max_depth=5, random_state=42, eval_metric='mlogloss' ) print("\nValidation croisée temporelle pour le modèle de drawdown:") for fold, (train_index, val_index) in enumerate(tscv.split(X_train_drawdown)): X_fold_train, X_fold_val = X_train_drawdown[train_index], X_train_drawdown[val_index] y_fold_train, y_fold_val = y_train_drawdown[train_index], y_train_drawdown[val_index] model_drawdown.fit(X_fold_train, y_fold_train) fold_score = model_drawdown.score(X_fold_val, y_fold_val) print(f"Fold {fold + 1} - Score: {fold_score:.4f}") # Entraînement final model_drawdown.fit( X_train_drawdown, y_train_drawdown, eval_set=[(X_train_drawdown, y_train_drawdown), (X_test_drawdown, y_test_drawdown)], verbose=True ) # Évaluation des modèles y_pred_profit = model_profit.predict(X_test_profit) y_pred_drawdown = model_drawdown.predict(X_test_drawdown) print("\nPerformance du modèle de profit maximal sur les données de test:") print(classification_report( y_test_profit, y_pred_profit, target_names=strategies )) print("\nPerformance du modèle de drawdown minimal sur les données de test:") print(classification_report( y_test_drawdown, y_pred_drawdown, target_names=strategies )) # Sauvegarder les périodes d'entraînement et de test split_info = { 'profit': { 'train_start': df.index[0], 'train_end': df.index[train_size_profit-1], 'test_start': df.index[train_size_profit], 'test_end': df.index[-1] }, 'drawdown': { 'train_start': df.index[valid_drawdown_mask][0], 'train_end': df.index[valid_drawdown_mask][train_size_drawdown-1], 'test_start': df.index[valid_drawdown_mask][train_size_drawdown], 'test_end': df.index[valid_drawdown_mask][-1] } } return model_profit, model_drawdown, features, strategies, split_info def save_models(model_profit, model_drawdown, features, strategies, split_info): """Sauvegarde des modèles entraînés et leurs paramètres""" if not os.path.exists('models'): os.makedirs('models') joblib.dump(model_profit, 'models/model_profit.joblib') joblib.dump(model_drawdown, 'models/model_drawdown.joblib') model_params = { 'features': features, 'strategies': strategies, 'split_info': split_info # Sauvegarder les périodes d'entraînement et de test } joblib.dump(model_params, 'models/model_params.joblib') print("Modèles et paramètres sauvegardés dans le dossier 'models/'") def predict_best_strategy(new_data): """ Prédit la meilleure stratégie pour de nouvelles données """ # Charger les modèles et paramètres model_profit = joblib.load('models/model_profit.joblib') model_drawdown = joblib.load('models/model_drawdown.joblib') params = joblib.load('models/model_params.joblib') # Prétraiter les nouvelles données processed_data = preprocess_data(new_data) # Préparer les features X = processed_data[params['features']].values # Faire les prédictions profit_strategy_idx = model_profit.predict(X) drawdown_strategy_idx = model_drawdown.predict(X) # Obtenir les probabilités de prédiction profit_proba = model_profit.predict_proba(X) drawdown_proba = model_drawdown.predict_proba(X) # Convertir les indices en noms de stratégies profit_strategy = params['strategies'][profit_strategy_idx[-1]] drawdown_strategy = params['strategies'][drawdown_strategy_idx[-1]] return { 'best_profit_strategy': profit_strategy, 'profit_confidence': float(np.max(profit_proba[-1])), 'best_drawdown_strategy': drawdown_strategy, 'drawdown_confidence': float(np.max(drawdown_proba[-1])) } def main(): print("Chargement des données...") df=pd.read_csv('EURUSD_4H.csv',sep=';',index_col=0,parse_dates=True) print("Prétraitement des données...") df = preprocess_data(df) print("Entraînement des modèles...") model_profit, model_drawdown, features, strategies, split_info = train_models(df) print("Sauvegarde des modèles...") save_models(model_profit, model_drawdown, features, strategies, split_info) if __name__ == "__main__": main()