Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| import numpy as np | |
| from xgboost import XGBClassifier | |
| from sklearn.metrics import accuracy_score, classification_report | |
| import ta | |
| import joblib | |
| import os | |
| from sklearn.model_selection import train_test_split, TimeSeriesSplit | |
def preprocess_data(df):
    """Enrich a price frame with indicators, strategy signals and calendar columns.

    The frame is modified in place and the same object is returned.

    Args:
        df: DataFrame with 'Close', 'High', 'Low' and 'Open' columns. Its
            index is passed through ``pd.to_datetime`` to derive the
            'hour', 'day' and 'month' columns.

    Returns:
        The same ``df`` with the indicator, signal and calendar columns added.
    """
    # Coerce the price columns to numeric; unparseable values become NaN.
    for column in ('Close', 'High', 'Low', 'Open'):
        df[column] = pd.to_numeric(df[column], errors='coerce')

    close, high, low = df['Close'], df['High'], df['Low']

    # Momentum / trend / volatility indicators.
    df['RSI'] = ta.momentum.RSIIndicator(close, window=14).rsi()
    df['ADX'] = ta.trend.ADXIndicator(high, low, close, window=14).adx()
    df['Volatility_20'] = close.rolling(window=20).std()
    df['MACD'] = ta.trend.MACD(close).macd()

    # Ichimoku components.
    ichimoku = ta.trend.IchimokuIndicator(high, low)
    df['Tenkan_sen'] = ichimoku.ichimoku_conversion_line()
    df['Kijun_sen'] = ichimoku.ichimoku_base_line()
    df['Senkou_Span_A'] = ichimoku.ichimoku_a()
    df['Senkou_Span_B'] = ichimoku.ichimoku_b()
    # shift(-26) pulls the close from 26 rows later, so the last 26 values
    # of this column are NaN.
    df['Chikou_Span'] = close.shift(-26)

    # Strategy signals; each builder reads the indicator columns set above.
    signal_builders = (
        ('Ichimoku_ADX_Volatility_Signal', calculate_ichimoku_adx_volatility_signal),
        ('BB_Stoch_ATR_Signal', calculate_bb_stoch_atr_signal),
        ('Chikou_MACD_Pente_Signal', calculate_chikou_macd_signal),
        ('ADX_Stoch_Volatility_MA_Signal', calculate_adx_stoch_volatility_signal),
    )
    for column, build_signal in signal_builders:
        df[column] = build_signal(df)

    # Calendar columns derived from the index.
    timestamps = pd.to_datetime(df.index)
    df['hour'] = timestamps.hour
    df['day'] = timestamps.day
    df['month'] = timestamps.month

    return df
def calculate_ichimoku_adx_volatility_signal(df):
    """Build the Ichimoku + ADX + volatility signal.

    Args:
        df: DataFrame with 'Close', 'Senkou_Span_A', 'Senkou_Span_B',
            'ADX' and 'Volatility_20' columns.

    Returns:
        A float NumPy array with one entry per row: 1 when the close is above
        both Senkou spans, -1 when it is below both, 0 otherwise. A non-zero
        value also requires ADX > 25 and 'Volatility_20' below its own 5-row
        rolling mean.
    """
    close = df['Close']
    span_a = df['Senkou_Span_A']
    span_b = df['Senkou_Span_B']
    volatility = df['Volatility_20']

    # Filter shared by both directions: strong trend and contracting volatility.
    tradable = (df['ADX'] > 25) & (volatility < volatility.rolling(5).mean())

    above_cloud = (close > span_a) & (close > span_b) & tradable
    below_cloud = (close < span_a) & (close < span_b) & tradable

    # The bearish condition is listed first so it takes precedence, mirroring
    # the order of the masked assignments it replaces.
    return np.select(
        [below_cloud.to_numpy(), above_cloud.to_numpy()],
        [-1.0, 1.0],
        default=0.0,
    )
def calculate_bb_stoch_atr_signal(df):
    """Build the Bollinger Bands + Stochastic + ATR signal.

    Args:
        df: DataFrame with 'Close', 'High' and 'Low' columns.

    Returns:
        A float NumPy array with one entry per row: 1 when the close is below
        the lower band with the stochastic under 20, -1 when the close is
        above the upper band with the stochastic over 80, 0 otherwise. A
        non-zero value also requires the ATR to be below its own 5-row
        rolling mean.
    """
    close, high, low = df['Close'], df['High'], df['Low']

    bands = ta.volatility.BollingerBands(close)
    stoch_k = ta.momentum.StochasticOscillator(high, low, close).stoch()
    atr = ta.volatility.AverageTrueRange(high, low, close).average_true_range()

    # Filter shared by both directions: the ATR is contracting.
    calm_market = atr < atr.rolling(5).mean()

    oversold = (close < bands.bollinger_lband()) & (stoch_k < 20) & calm_market
    overbought = (close > bands.bollinger_hband()) & (stoch_k > 80) & calm_market

    # The bearish condition is listed first so it takes precedence, mirroring
    # the order of the masked assignments it replaces.
    return np.select(
        [overbought.to_numpy(), oversold.to_numpy()],
        [-1.0, 1.0],
        default=0.0,
    )
def calculate_chikou_macd_signal(df):
    """Build the Chikou Span + MACD signal without look-ahead.

    The Chikou span is the current close plotted 26 periods back, so
    "Chikou above price" at the present bar means the current close is above
    the close of 26 bars ago. The earlier version read a 'Chikou_Span' column
    built as ``Close.shift(-26)``, which compared a close 26 bars in the
    future with a close 26 bars in the past. That leaked future prices into
    the signal and left it at 0 on the most recent 26 rows, which are the
    rows used for live prediction.

    Args:
        df: DataFrame with 'Close' and 'MACD' columns.

    Returns:
        A float NumPy array with one entry per row: 1 when the close is above
        the close 26 rows earlier and MACD > 0, -1 when it is below and
        MACD < 0, 0 otherwise. The first 26 rows are 0 because they have no
        26-row history.
    """
    close = df['Close']
    close_26_ago = close.shift(26)
    macd = df['MACD']

    bullish = ((close > close_26_ago) & (macd > 0)).to_numpy()
    bearish = ((close < close_26_ago) & (macd < 0)).to_numpy()

    signal = np.zeros(len(df))
    signal[bullish] = 1
    signal[bearish] = -1
    return signal
def calculate_adx_stoch_volatility_signal(df):
    """Build the ADX + Stochastic + volatility signal.

    Args:
        df: DataFrame with 'High', 'Low', 'Close', 'ADX' and 'Volatility_20'
            columns.

    Returns:
        A float NumPy array with one entry per row: 1 when the stochastic is
        under 20, -1 when it is over 80, 0 otherwise. A non-zero value also
        requires ADX > 25 and 'Volatility_20' below its own 5-row rolling
        mean.
    """
    stoch_k = ta.momentum.StochasticOscillator(
        df['High'], df['Low'], df['Close']
    ).stoch()
    volatility = df['Volatility_20']

    # Filter shared by both directions: strong trend and contracting volatility.
    tradable = (df['ADX'] > 25) & (volatility < volatility.rolling(5).mean())

    oversold = tradable & (stoch_k < 20)
    overbought = tradable & (stoch_k > 80)

    # The bearish condition is listed first so it takes precedence, mirroring
    # the order of the masked assignments it replaces.
    return np.select(
        [overbought.to_numpy(), oversold.to_numpy()],
        [-1.0, 1.0],
        default=0.0,
    )
def calculate_strategy_performance(df, strategies, look_ahead=10):
    """Label each row with the best strategy by profit and by drawdown.

    For every row ``i`` with at least ``look_ahead`` rows after it, the
    window ``i .. i + look_ahead`` (current row included) gives the highest
    high and lowest low. For a buy signal (1) the profit is the move up to
    the highest high and the drawdown the move down to the lowest low, both
    as a percentage of the close at ``i``. A sell signal (-1) swaps the two.
    A neutral signal scores 0 for both.

    Args:
        df: DataFrame with 'Close', 'High', 'Low' and one column per entry
            of ``strategies`` holding signals in {1, -1, other}.
        strategies: Column names of the strategy signals.
        look_ahead: Number of rows after the current one included in the
            window.

    Returns:
        A tuple ``(best_strategy_max_profit, best_strategy_max_drawdown)`` of
        integer arrays with one entry per row:
        - index of the strategy with the largest profit (0 when every profit
          is 0, including the last ``look_ahead`` rows);
        - index of the strategy with the smallest strictly positive
          drawdown, or -1 when no strategy has a positive drawdown.
    """
    n_rows = len(df)
    n_strategies = len(strategies)
    max_profits = np.zeros((n_rows, n_strategies))
    max_drawdowns = np.zeros((n_rows, n_strategies))

    # Pull the columns out once; per-row .iloc access inside the loop is slow.
    closes = df['Close'].to_numpy()
    highs = df['High'].to_numpy()
    lows = df['Low'].to_numpy()
    signals = df[list(strategies)].to_numpy()

    for i in range(n_rows - look_ahead):
        row_signals = signals[i]
        buys = row_signals == 1
        sells = row_signals == -1
        if not (buys.any() or sells.any()):
            continue  # every strategy is neutral: scores stay at 0

        entry = closes[i]
        window_end = i + look_ahead + 1
        # Built-in max/min are kept (rather than ndarray.max/min) so windows
        # containing NaN yield the same values as before.
        upside = (max(highs[i:window_end]) - entry) / entry * 100
        downside = (entry - min(lows[i:window_end])) / entry * 100

        # Buy: profit is the upside, risk is the downside. Sell: the reverse.
        max_profits[i, buys] = upside
        max_drawdowns[i, buys] = downside if downside > 0 else 0
        max_profits[i, sells] = downside
        max_drawdowns[i, sells] = upside if upside > 0 else 0

    best_strategy_max_profit = np.argmax(max_profits, axis=1)

    # Smallest strictly positive drawdown per row. Non-positive entries are
    # masked with +inf so argmin ignores them; rows without any positive
    # drawdown get -1.
    positive = max_drawdowns > 0
    masked = np.where(positive, max_drawdowns, np.inf)
    best_strategy_max_drawdown = np.where(
        positive.any(axis=1), np.argmin(masked, axis=1), -1
    ).astype(int)

    return best_strategy_max_profit, best_strategy_max_drawdown
def _fit_with_temporal_cv(X_train, y_train, X_test, y_test, cv, label):
    """Cross-validate then fit one XGBoost strategy classifier.

    Prints a score per temporal fold, then refits the model on the whole
    training split. The test split is passed as ``eval_set`` only so that
    XGBoost logs its metric per boosting round; no early stopping is
    configured.

    Args:
        X_train, y_train: Training features and labels (NumPy arrays).
        X_test, y_test: Held-out features and labels (NumPy arrays).
        cv: Splitter exposing ``split(X)``, e.g. ``TimeSeriesSplit``.
        label: Model name inserted in the cross-validation header line.

    Returns:
        The fitted ``XGBClassifier``.
    """
    model = XGBClassifier(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=5,
        random_state=42,
        eval_metric='mlogloss',
    )

    print(f"\nValidation croisée temporelle pour le modèle de {label}:")
    for fold, (train_index, val_index) in enumerate(cv.split(X_train)):
        model.fit(X_train[train_index], y_train[train_index])
        fold_score = model.score(X_train[val_index], y_train[val_index])
        print(f"Fold {fold + 1} - Score: {fold_score:.4f}")

    # Final fit on the complete training split.
    model.fit(
        X_train,
        y_train,
        eval_set=[(X_train, y_train), (X_test, y_test)],
        verbose=True,
    )
    return model


def train_models(df):
    """Train the strategy-selection models using a chronological split.

    Two classifiers are trained: one predicting the strategy with the highest
    profit, one predicting the strategy with the lowest positive drawdown.
    Rows without a drawdown label (-1) are dropped for the second model.
    Each dataset is split 80% / 20% in row order, with no shuffling.

    Args:
        df: Preprocessed DataFrame containing the strategy signal columns,
            'RSI', 'ADX', 'Volatility_20', 'MACD' and the price columns
            needed by ``calculate_strategy_performance``.

    Returns:
        A tuple ``(model_profit, model_drawdown, features, strategies,
        split_info)`` where ``split_info`` maps 'profit' and 'drawdown' to
        the first/last index values of their train and test periods.
    """
    strategies = [
        'Ichimoku_ADX_Volatility_Signal',
        'BB_Stoch_ATR_Signal',
        'Chikou_MACD_Pente_Signal',
        'ADX_Stoch_Volatility_MA_Signal'
    ]
    continuous_features = ['RSI', 'ADX', 'Volatility_20', 'MACD']
    features = strategies + continuous_features

    # Labels: best strategy per row according to forward performance.
    print("Calcul des performances des stratégies...")
    best_strategy_max_profit, best_strategy_max_drawdown = calculate_strategy_performance(df, strategies)

    X = df[features].values
    y_profit = best_strategy_max_profit

    # Keep only rows that have a drawdown label.
    valid_drawdown_mask = best_strategy_max_drawdown != -1
    X_drawdown = X[valid_drawdown_mask]
    y_drawdown = best_strategy_max_drawdown[valid_drawdown_mask]

    # Chronological 80/20 split for each dataset.
    train_size_profit = int(len(X) * 0.8)
    train_size_drawdown = int(len(X_drawdown) * 0.8)

    X_train_profit, X_test_profit = X[:train_size_profit], X[train_size_profit:]
    y_train_profit, y_test_profit = y_profit[:train_size_profit], y_profit[train_size_profit:]

    X_train_drawdown, X_test_drawdown = X_drawdown[:train_size_drawdown], X_drawdown[train_size_drawdown:]
    y_train_drawdown, y_test_drawdown = y_drawdown[:train_size_drawdown], y_drawdown[train_size_drawdown:]

    print("\nPériode d'entraînement profit:")
    print(f"Du : {df.index[0]}")
    print(f"Au : {df.index[train_size_profit-1]}")
    print("\nPériode de test profit:")
    print(f"Du : {df.index[train_size_profit]}")
    print(f"Au : {df.index[-1]}")

    tscv = TimeSeriesSplit(n_splits=5)

    print("\nEntraînement du modèle de profit maximal...")
    model_profit = _fit_with_temporal_cv(
        X_train_profit, y_train_profit, X_test_profit, y_test_profit, tscv, 'profit'
    )

    print("\nEntraînement du modèle de drawdown minimal...")
    model_drawdown = _fit_with_temporal_cv(
        X_train_drawdown, y_train_drawdown, X_test_drawdown, y_test_drawdown, tscv, 'drawdown'
    )

    y_pred_profit = model_profit.predict(X_test_profit)
    y_pred_drawdown = model_drawdown.predict(X_test_drawdown)

    # Pass the full label set explicitly: without it, classification_report
    # raises ValueError whenever a class is missing from the test split,
    # because the inferred classes no longer match len(target_names).
    class_labels = list(range(len(strategies)))

    print("\nPerformance du modèle de profit maximal sur les données de test:")
    print(classification_report(
        y_test_profit,
        y_pred_profit,
        labels=class_labels,
        target_names=strategies
    ))
    print("\nPerformance du modèle de drawdown minimal sur les données de test:")
    print(classification_report(
        y_test_drawdown,
        y_pred_drawdown,
        labels=class_labels,
        target_names=strategies
    ))

    # Record the train/test periods so they can be saved with the models.
    drawdown_index = df.index[valid_drawdown_mask]
    split_info = {
        'profit': {
            'train_start': df.index[0],
            'train_end': df.index[train_size_profit-1],
            'test_start': df.index[train_size_profit],
            'test_end': df.index[-1]
        },
        'drawdown': {
            'train_start': drawdown_index[0],
            'train_end': drawdown_index[train_size_drawdown-1],
            'test_start': drawdown_index[train_size_drawdown],
            'test_end': drawdown_index[-1]
        }
    }
    return model_profit, model_drawdown, features, strategies, split_info
def save_models(model_profit, model_drawdown, features, strategies, split_info):
    """Persist the trained models and their metadata under ``models/``.

    Writes three joblib files: ``model_profit.joblib``,
    ``model_drawdown.joblib`` and ``model_params.joblib``. The last one holds
    a dict with the keys 'features', 'strategies' and 'split_info'.

    Args:
        model_profit: Trained profit model.
        model_drawdown: Trained drawdown model.
        features: Feature column names, in the order used for training.
        strategies: Strategy names, indexed by class label.
        split_info: Train/test period boundaries.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs('models', exist_ok=True)

    joblib.dump(model_profit, 'models/model_profit.joblib')
    joblib.dump(model_drawdown, 'models/model_drawdown.joblib')

    model_params = {
        'features': features,
        'strategies': strategies,
        'split_info': split_info,
    }
    joblib.dump(model_params, 'models/model_params.joblib')
    print("Modèles et paramètres sauvegardés dans le dossier 'models/'")
def predict_best_strategy(new_data):
    """Predict the best strategy for the most recent row of ``new_data``.

    Loads the models and parameters saved under ``models/``, preprocesses a
    copy of ``new_data`` (the caller's frame is left untouched) and scores
    only the last row, since that is the only row the result uses.

    Args:
        new_data: Raw price DataFrame accepted by ``preprocess_data``.

    Returns:
        A dict with:
        - 'best_profit_strategy': name of the predicted max-profit strategy;
        - 'profit_confidence': highest class probability of the profit model;
        - 'best_drawdown_strategy': name of the predicted min-drawdown
          strategy;
        - 'drawdown_confidence': highest class probability of the drawdown
          model.
    """
    model_profit = joblib.load('models/model_profit.joblib')
    model_drawdown = joblib.load('models/model_drawdown.joblib')
    params = joblib.load('models/model_params.joblib')

    # preprocess_data adds columns in place, so work on a copy to avoid
    # mutating the caller's DataFrame.
    processed_data = preprocess_data(new_data.copy())

    # Keep a 2-D slice holding only the latest row.
    latest = processed_data[params['features']].values[-1:]

    profit_strategy_idx = int(model_profit.predict(latest)[0])
    drawdown_strategy_idx = int(model_drawdown.predict(latest)[0])
    profit_proba = model_profit.predict_proba(latest)[0]
    drawdown_proba = model_drawdown.predict_proba(latest)[0]

    strategies = params['strategies']
    return {
        'best_profit_strategy': strategies[profit_strategy_idx],
        'profit_confidence': float(np.max(profit_proba)),
        'best_drawdown_strategy': strategies[drawdown_strategy_idx],
        'drawdown_confidence': float(np.max(drawdown_proba))
    }
def main():
    """Run the pipeline: load the CSV, preprocess, train, then save the models."""
    print("Chargement des données...")
    csv_options = {'sep': ';', 'index_col': 0, 'parse_dates': True}
    prices = pd.read_csv('EURUSD_4H.csv', **csv_options)

    print("Prétraitement des données...")
    prices = preprocess_data(prices)

    print("Entraînement des modèles...")
    # train_models returns its values in the order save_models expects.
    training_outputs = train_models(prices)

    print("Sauvegarde des modèles...")
    save_models(*training_outputs)


if __name__ == "__main__":
    main()