# Fast-Api / train_strategy_models.py
# Uploaded by: thibautmodrin
# Commit message: new push
# Commit: ac0f7d0
import pandas as pd
import numpy as np
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, classification_report
import ta
import joblib
import os
from sklearn.model_selection import train_test_split, TimeSeriesSplit
def preprocess_data(df):
    """Add technical indicators, strategy signals and calendar columns to ``df``.

    The frame is modified in place (new columns are assigned on it) and the
    same object is returned.

    Args:
        df: DataFrame with ``Close``, ``High``, ``Low`` and ``Open`` columns.
            Its index is converted with ``pd.to_datetime`` to derive the
            calendar columns.

    Returns:
        ``df`` with the indicator columns (``RSI``, ``ADX``,
        ``Volatility_20``, ``MACD``, the Ichimoku lines, ``Chikou_Span``),
        the four ``*_Signal`` columns, and ``hour`` / ``day`` / ``month``.
    """
    # Force the price columns to numeric; anything unparseable becomes NaN.
    for column in ('Close', 'High', 'Low', 'Open'):
        df[column] = pd.to_numeric(df[column], errors='coerce')

    close = df['Close']
    high = df['High']
    low = df['Low']

    # Momentum / trend / dispersion indicators.
    df['RSI'] = ta.momentum.RSIIndicator(close, window=14).rsi()
    df['ADX'] = ta.trend.ADXIndicator(high, low, close, window=14).adx()
    df['Volatility_20'] = close.rolling(window=20).std()
    df['MACD'] = ta.trend.MACD(close).macd()

    # Ichimoku lines.
    cloud = ta.trend.IchimokuIndicator(high, low)
    df['Tenkan_sen'] = cloud.ichimoku_conversion_line()
    df['Kijun_sen'] = cloud.ichimoku_base_line()
    df['Senkou_Span_A'] = cloud.ichimoku_a()
    df['Senkou_Span_B'] = cloud.ichimoku_b()
    # shift(-26) places the close from 26 rows later on each row, so the last
    # 26 rows are NaN.
    # NOTE(review): this column therefore carries future prices relative to
    # each row - confirm that is intended for features used in training.
    df['Chikou_Span'] = close.shift(-26)

    # Strategy signals; the helpers read the indicator columns added above.
    df['Ichimoku_ADX_Volatility_Signal'] = calculate_ichimoku_adx_volatility_signal(df)
    df['BB_Stoch_ATR_Signal'] = calculate_bb_stoch_atr_signal(df)
    df['Chikou_MACD_Pente_Signal'] = calculate_chikou_macd_signal(df)
    df['ADX_Stoch_Volatility_MA_Signal'] = calculate_adx_stoch_volatility_signal(df)

    # Calendar columns derived from the index.
    timestamps = pd.to_datetime(df.index)
    df['hour'] = timestamps.hour
    df['day'] = timestamps.day
    df['month'] = timestamps.month
    return df
def calculate_ichimoku_adx_volatility_signal(df):
    """Compute the Ichimoku + ADX + volatility signal.

    Args:
        df: DataFrame with ``Close``, ``Senkou_Span_A``, ``Senkou_Span_B``,
            ``ADX`` and ``Volatility_20`` columns.

    Returns:
        Float NumPy array with one entry per row: ``1`` when the close is
        above both Senkou spans, ``-1`` when it is below both, ``0``
        otherwise. Either direction also requires ``ADX > 25`` and
        ``Volatility_20`` below its own 5-row rolling mean.
    """
    close = df['Close']
    span_a = df['Senkou_Span_A']
    span_b = df['Senkou_Span_B']
    volatility = df['Volatility_20']

    # Filter shared by both directions.
    tradable = (df['ADX'] > 25) & (volatility < volatility.rolling(5).mean())

    above_cloud = (close > span_a) & (close > span_b)
    below_cloud = (close < span_a) & (close < span_b)

    long_mask = (above_cloud & tradable).to_numpy()
    short_mask = (below_cloud & tradable).to_numpy()

    # The short side is evaluated first so it wins if both masks were ever set.
    return np.where(short_mask, -1.0, np.where(long_mask, 1.0, 0.0))
def calculate_bb_stoch_atr_signal(df):
    """Compute the Bollinger Bands + Stochastic + ATR signal.

    Args:
        df: DataFrame with ``Close``, ``High`` and ``Low`` columns.

    Returns:
        Float NumPy array with one entry per row: ``1`` when the close is
        under the lower Bollinger band and the stochastic is below 20,
        ``-1`` when the close is over the upper band and the stochastic is
        above 80, ``0`` otherwise. Either direction also requires the ATR to
        be below its own 5-row rolling mean.
    """
    close = df['Close']
    high = df['High']
    low = df['Low']

    bands = ta.volatility.BollingerBands(close)
    stoch_k = ta.momentum.StochasticOscillator(high, low, close).stoch()
    atr_values = ta.volatility.AverageTrueRange(high, low, close).average_true_range()

    # Filter shared by both directions.
    calm_range = atr_values < atr_values.rolling(5).mean()

    oversold = (close < bands.bollinger_lband()) & (stoch_k < 20)
    overbought = (close > bands.bollinger_hband()) & (stoch_k > 80)

    long_mask = (oversold & calm_range).to_numpy()
    short_mask = (overbought & calm_range).to_numpy()

    # The short side is evaluated first so it wins if both masks were ever set.
    return np.where(short_mask, -1.0, np.where(long_mask, 1.0, 0.0))
def calculate_chikou_macd_signal(df):
    """Compute the Chikou Span + MACD signal.

    Args:
        df: DataFrame with ``Close``, ``Chikou_Span`` and ``MACD`` columns.

    Returns:
        Float NumPy array with one entry per row: ``1`` when ``Chikou_Span``
        is above the close from 26 rows earlier and ``MACD`` is positive,
        ``-1`` when it is below that close and ``MACD`` is negative, ``0``
        otherwise (including rows where a comparison involves NaN).
    """
    reference_close = df['Close'].shift(26)
    chikou = df['Chikou_Span']
    macd = df['MACD']

    long_mask = ((chikou > reference_close) & (macd > 0)).to_numpy()
    short_mask = ((chikou < reference_close) & (macd < 0)).to_numpy()

    # The short side is evaluated first so it wins if both masks were ever set.
    return np.where(short_mask, -1.0, np.where(long_mask, 1.0, 0.0))
def calculate_adx_stoch_volatility_signal(df):
    """Compute the ADX + Stochastic + volatility signal.

    Args:
        df: DataFrame with ``High``, ``Low``, ``Close``, ``ADX`` and
            ``Volatility_20`` columns.

    Returns:
        Float NumPy array with one entry per row: ``1`` when the stochastic
        is below 20, ``-1`` when it is above 80, ``0`` otherwise. Either
        direction also requires ``ADX > 25`` and ``Volatility_20`` below its
        own 5-row rolling mean.
    """
    stoch_k = ta.momentum.StochasticOscillator(
        df['High'], df['Low'], df['Close']
    ).stoch()
    volatility = df['Volatility_20']

    # Filter shared by both directions.
    tradable = (df['ADX'] > 25) & (volatility < volatility.rolling(5).mean())

    long_mask = (tradable & (stoch_k < 20)).to_numpy()
    short_mask = (tradable & (stoch_k > 80)).to_numpy()

    # The short side is evaluated first so it wins if both masks were ever set.
    return np.where(short_mask, -1.0, np.where(long_mask, 1.0, 0.0))
def calculate_strategy_performance(df, strategies, look_ahead=10):
    """Label each row with the best strategy by profit and by drawdown.

    For every row ``i`` that has ``look_ahead`` rows after it, the window
    ``i .. i + look_ahead`` (row ``i`` included) is scanned. For a buy signal
    (``1``) the profit is the window's highest ``High`` relative to
    ``Close[i]`` and the drawdown its lowest ``Low``; for a sell signal
    (``-1``) the two are swapped. Both are percentages of ``Close[i]``.
    Any other signal value scores ``0`` for both.

    Args:
        df: DataFrame with ``Close``, ``High``, ``Low`` and one signal column
            per entry of ``strategies``.
        strategies: Names of the signal columns, in label order.
        look_ahead: Number of rows after the current one to include in the
            window.

    Returns:
        Tuple ``(best_strategy_max_profit, best_strategy_max_drawdown)`` of
        integer arrays with one entry per row of ``df``:

        * the index of the strategy with the largest profit (``np.argmax``,
          so ``0`` when every profit is zero, e.g. rows with no signal and
          the last ``look_ahead`` rows);
        * the index of the strategy with the smallest strictly positive
          drawdown, or ``-1`` when no strategy has a positive drawdown.
    """
    n_rows = len(df)
    n_strategies = len(strategies)
    max_profits = np.zeros((n_rows, n_strategies))
    max_drawdowns = np.zeros((n_rows, n_strategies))

    # Pull the columns out once; per-row pandas lookups are loop-invariant work.
    closes = df['Close'].to_numpy()
    highs = df['High'].to_numpy()
    lows = df['Low'].to_numpy()
    signal_columns = [df[name].to_numpy() for name in strategies]

    for row in range(n_rows - look_ahead):
        entry = closes[row]
        # Window extremes depend only on the row, so compute them at most once
        # per row, and only if some strategy actually has a signal there.
        upside = downside = None
        for col, column_signals in enumerate(signal_columns):
            direction = column_signals[row]
            if direction != 1 and direction != -1:
                continue  # neutral: both matrices already hold 0
            if upside is None:
                stop = row + look_ahead + 1
                upside = (max(highs[row:stop]) - entry) / entry * 100
                downside = (entry - min(lows[row:stop])) / entry * 100
            if direction == 1:  # buy
                profit, drawdown = upside, downside
            else:  # sell
                profit, drawdown = downside, upside
            max_profits[row, col] = profit
            max_drawdowns[row, col] = drawdown if drawdown > 0 else 0

    # Best strategy for profit.
    best_strategy_max_profit = np.argmax(max_profits, axis=1)

    # Strategy with the smallest drawdown among those that have one.
    best_strategy_max_drawdown = np.full(n_rows, -1, dtype=int)
    for row, row_drawdowns in enumerate(max_drawdowns):
        active = np.flatnonzero(row_drawdowns > 0)
        if active.size:
            best_strategy_max_drawdown[row] = active[np.argmin(row_drawdowns[active])]

    return best_strategy_max_profit, best_strategy_max_drawdown
def _build_strategy_classifier():
    """Return an untrained XGBoost classifier with the settings shared by both models."""
    return XGBClassifier(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=5,
        random_state=42,
        eval_metric='mlogloss'
    )


def _fit_with_temporal_cv(model, X_train, y_train, X_test, y_test, splitter):
    """Print one score per time-series fold, then fit ``model`` on the full training set."""
    for fold, (train_index, val_index) in enumerate(splitter.split(X_train)):
        model.fit(X_train[train_index], y_train[train_index])
        fold_score = model.score(X_train[val_index], y_train[val_index])
        print(f"Fold {fold + 1} - Score: {fold_score:.4f}")

    # Final fit; the test split is passed only as an evaluation set whose
    # metric is logged during training.
    model.fit(
        X_train,
        y_train,
        eval_set=[(X_train, y_train), (X_test, y_test)],
        verbose=True
    )
    return model


def train_models(df):
    """Train the strategy-selection models with a chronological split.

    Two classifiers are trained on the same features (the four strategy
    signals plus ``RSI``, ``ADX``, ``Volatility_20`` and ``MACD``): one
    predicts the strategy with the highest profit, the other the strategy
    with the lowest drawdown. Labels come from
    ``calculate_strategy_performance``. Rows whose drawdown label is ``-1``
    are dropped for the drawdown model only. Each data set is split in row
    order, the first 80% for training and the rest for testing.

    Args:
        df: DataFrame containing the feature columns listed above and the
            columns required by ``calculate_strategy_performance``.

    Returns:
        Tuple ``(model_profit, model_drawdown, features, strategies,
        split_info)`` where ``split_info`` maps ``'profit'`` and
        ``'drawdown'`` to the first/last index values of their train and
        test periods.
    """
    strategies = [
        'Ichimoku_ADX_Volatility_Signal',
        'BB_Stoch_ATR_Signal',
        'Chikou_MACD_Pente_Signal',
        'ADX_Stoch_Volatility_MA_Signal'
    ]
    continuous_features = ['RSI', 'ADX', 'Volatility_20', 'MACD']
    features = strategies + continuous_features
    # Class ids are positions in ``strategies``. Passing them explicitly to
    # classification_report keeps it from raising when a test split happens
    # not to contain every class.
    class_labels = list(range(len(strategies)))

    # Labels: best strategy per row for profit and for drawdown.
    print("Calcul des performances des stratégies...")
    best_strategy_max_profit, best_strategy_max_drawdown = calculate_strategy_performance(df, strategies)

    X = df[features].values
    y_profit = best_strategy_max_profit

    # The drawdown model only sees rows that have a drawdown label.
    valid_drawdown_mask = best_strategy_max_drawdown != -1
    X_drawdown = X[valid_drawdown_mask]
    y_drawdown = best_strategy_max_drawdown[valid_drawdown_mask]

    # Chronological 80/20 split (no shuffling).
    train_size_profit = int(len(X) * 0.8)
    train_size_drawdown = int(len(X_drawdown) * 0.8)

    X_train_profit, X_test_profit = X[:train_size_profit], X[train_size_profit:]
    y_train_profit, y_test_profit = y_profit[:train_size_profit], y_profit[train_size_profit:]

    X_train_drawdown = X_drawdown[:train_size_drawdown]
    X_test_drawdown = X_drawdown[train_size_drawdown:]
    y_train_drawdown = y_drawdown[:train_size_drawdown]
    y_test_drawdown = y_drawdown[train_size_drawdown:]

    # Report the periods covered by each side of the profit split.
    print("\nPériode d'entraînement profit:")
    print(f"Du : {df.index[0]}")
    print(f"Au : {df.index[train_size_profit-1]}")
    print("\nPériode de test profit:")
    print(f"Du : {df.index[train_size_profit]}")
    print(f"Au : {df.index[-1]}")

    tscv = TimeSeriesSplit(n_splits=5)

    # Profit model.
    print("\nEntraînement du modèle de profit maximal...")
    model_profit = _build_strategy_classifier()
    print("\nValidation croisée temporelle pour le modèle de profit:")
    _fit_with_temporal_cv(
        model_profit, X_train_profit, y_train_profit, X_test_profit, y_test_profit, tscv
    )

    # Drawdown model.
    print("\nEntraînement du modèle de drawdown minimal...")
    model_drawdown = _build_strategy_classifier()
    print("\nValidation croisée temporelle pour le modèle de drawdown:")
    _fit_with_temporal_cv(
        model_drawdown, X_train_drawdown, y_train_drawdown, X_test_drawdown, y_test_drawdown, tscv
    )

    # Evaluation on the held-out tail.
    y_pred_profit = model_profit.predict(X_test_profit)
    y_pred_drawdown = model_drawdown.predict(X_test_drawdown)

    print("\nPerformance du modèle de profit maximal sur les données de test:")
    print(classification_report(
        y_test_profit,
        y_pred_profit,
        labels=class_labels,
        target_names=strategies
    ))
    print("\nPerformance du modèle de drawdown minimal sur les données de test:")
    print(classification_report(
        y_test_drawdown,
        y_pred_drawdown,
        labels=class_labels,
        target_names=strategies
    ))

    # Record the train/test periods so they can be saved with the models.
    drawdown_index = df.index[valid_drawdown_mask]
    split_info = {
        'profit': {
            'train_start': df.index[0],
            'train_end': df.index[train_size_profit-1],
            'test_start': df.index[train_size_profit],
            'test_end': df.index[-1]
        },
        'drawdown': {
            'train_start': drawdown_index[0],
            'train_end': drawdown_index[train_size_drawdown-1],
            'test_start': drawdown_index[train_size_drawdown],
            'test_end': drawdown_index[-1]
        }
    }
    return model_profit, model_drawdown, features, strategies, split_info
def save_models(model_profit, model_drawdown, features, strategies, split_info):
    """Save the trained models and their parameters under ``models/``.

    Writes three joblib files: ``models/model_profit.joblib``,
    ``models/model_drawdown.joblib`` and ``models/model_params.joblib``
    (a dict with the keys ``features``, ``strategies`` and ``split_info``).

    Args:
        model_profit: Trained profit model.
        model_drawdown: Trained drawdown model.
        features: Feature column names, in the order used for training.
        strategies: Strategy names, in label order.
        split_info: Train/test period description to store with the models.
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs('models', exist_ok=True)

    joblib.dump(model_profit, 'models/model_profit.joblib')
    joblib.dump(model_drawdown, 'models/model_drawdown.joblib')

    model_params = {
        'features': features,
        'strategies': strategies,
        'split_info': split_info  # train/test periods
    }
    joblib.dump(model_params, 'models/model_params.joblib')
    print("Modèles et paramètres sauvegardés dans le dossier 'models/'")
def predict_best_strategy(new_data):
    """Predict the best strategy for the most recent row of ``new_data``.

    Loads the two models and their parameters from ``models/``, runs
    ``preprocess_data`` on ``new_data`` (which adds columns to that frame in
    place) and scores the last row only.

    NOTE(review): ``preprocess_data`` builds ``Chikou_Span`` with
    ``shift(-26)``, so it is NaN on the last 26 rows and
    ``Chikou_MACD_Pente_Signal`` is 0 for the row scored here - confirm this
    matches what the models saw during training.

    Args:
        new_data: DataFrame with the columns ``preprocess_data`` expects.

    Returns:
        Dict with ``best_profit_strategy`` and ``best_drawdown_strategy``
        (strategy names) and ``profit_confidence`` / ``drawdown_confidence``
        (the highest predicted class probability of each model, as float).
    """
    # Load the models and parameters.
    model_profit = joblib.load('models/model_profit.joblib')
    model_drawdown = joblib.load('models/model_drawdown.joblib')
    params = joblib.load('models/model_params.joblib')

    # Indicators need the full history, so preprocess everything first...
    processed_data = preprocess_data(new_data)

    # ...but only the last row is reported, so only that row is scored.
    latest_features = processed_data[params['features']].values[-1:]

    profit_strategy_idx = model_profit.predict(latest_features)
    drawdown_strategy_idx = model_drawdown.predict(latest_features)
    profit_proba = model_profit.predict_proba(latest_features)
    drawdown_proba = model_drawdown.predict_proba(latest_features)

    # Map class ids back to strategy names.
    strategy_names = params['strategies']
    return {
        'best_profit_strategy': strategy_names[profit_strategy_idx[-1]],
        'profit_confidence': float(np.max(profit_proba[-1])),
        'best_drawdown_strategy': strategy_names[drawdown_strategy_idx[-1]],
        'drawdown_confidence': float(np.max(drawdown_proba[-1]))
    }
def main():
    """Load ``EURUSD_4H.csv``, preprocess it, train both models and save them."""
    print("Chargement des données...")
    price_frame = pd.read_csv(
        'EURUSD_4H.csv',
        sep=';',
        index_col=0,
        parse_dates=True,
    )

    print("Prétraitement des données...")
    price_frame = preprocess_data(price_frame)

    print("Entraînement des modèles...")
    # train_models returns its results in the order save_models expects.
    training_outputs = train_models(price_frame)

    print("Sauvegarde des modèles...")
    save_models(*training_outputs)


if __name__ == "__main__":
    main()