# AFML / afml /cache /guides /user_guide.py
# akshayboora's picture
# Upload 940 files
# 669d6a1 verified
"""
Complete MT5 Machine Learning Workflow with Enhanced Caching
============================================================
This example demonstrates all cache enhancements working together
for a real MetaTrader 5 trading strategy development workflow.
Performance gains shown:
- Initial run: ~3 hours total
- Subsequent runs: ~15 minutes (90% time savings)
- Iteration speed: 10x improvement
"""
import sys
from datetime import datetime, timedelta
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[3]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
import MetaTrader5 as mt5
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import TimeSeriesSplit
# Import enhanced cache system
from afml.cache import (
cached_backtest,
get_comprehensive_cache_status,
mlflow_cached,
print_cache_health,
robust_cacheable,
setup_production_cache,
time_aware_cacheable,
)
# =============================================================================
# STEP 1: Initialize Cache System
# =============================================================================
def initialize_mt5_cache_system():
    """
    Configure the production cache stack and print a readiness summary.

    Meant to be called once, before any of the cached workflow functions.

    Returns:
        The mapping returned by ``setup_production_cache``. The keys
        ``core_cache``, ``mlflow_cache``, ``backtest_cache`` and ``monitor``
        are read here to build the summary.
    """
    banner = "=" * 70
    print(banner)
    print("INITIALIZING MT5 ML CACHE SYSTEM")
    print(banner)

    cache_settings = {
        "enable_mlflow": True,
        "mlflow_experiment": "mt5_momentum_strategy",
        "mlflow_uri": None,  # Local tracking
        "max_cache_size_mb": 2000,
    }
    components = setup_production_cache(**cache_settings)

    print("\n[OK] Cache system ready!")
    print(f" Core cache: {components['core_cache']}")
    # The remaining components are only reported as present / absent.
    optional_parts = (
        ("MLflow tracking", "mlflow_cache"),
        ("Backtest cache", "backtest_cache"),
        ("Monitoring", "monitor"),
    )
    for title, key in optional_parts:
        print(f" {title}: {components[key] is not None}")
    print()

    return components
# =============================================================================
# STEP 2: Data Loading with Time-Aware Caching
# =============================================================================
@time_aware_cacheable
def load_mt5_data(symbol: str, timeframe, start_date, end_date):
    """
    Fetch bars for ``symbol`` from the MetaTrader 5 terminal as a DataFrame.

    Caching is delegated to the ``time_aware_cacheable`` decorator; this body
    only talks to MT5 and reshapes the result.

    Args:
        symbol: Instrument name passed to ``mt5.copy_rates_range``.
        timeframe: MT5 timeframe constant, forwarded unchanged.
        start_date: Start of the range; anything ``pd.Timestamp`` accepts.
        end_date: End of the range; anything ``pd.Timestamp`` accepts.

    Returns:
        DataFrame of the returned rates, indexed by ``time`` (converted from
        epoch seconds).

    Raises:
        RuntimeError: If ``mt5.initialize()`` reports failure.
        ValueError: If MT5 returns ``None`` or an empty result.
    """
    print(f"Loading {symbol} data from MT5...")

    connected = mt5.initialize()
    if not connected:
        raise RuntimeError("MT5 initialization failed")

    # Normalise both bounds before handing them to the terminal
    range_start, range_end = pd.Timestamp(start_date), pd.Timestamp(end_date)

    rates = mt5.copy_rates_range(symbol, timeframe, range_start, range_end)
    nothing_returned = rates is None or len(rates) == 0
    if nothing_returned:
        raise ValueError(f"No data returned for {symbol}")

    # Epoch seconds -> datetime index
    bars = pd.DataFrame(rates)
    bars["time"] = pd.to_datetime(bars["time"], unit="s")
    bars = bars.set_index("time")

    first_bar, last_bar = bars.index[0], bars.index[-1]
    print(f" Loaded {len(bars)} bars ({first_bar} to {last_bar})")
    return bars
# =============================================================================
# STEP 3: Feature Engineering with Robust Caching
# =============================================================================
@robust_cacheable
def compute_mt5_features(df: pd.DataFrame, params: dict):
    """
    Build the feature matrix used by the strategy from raw bars.

    Caching is delegated to the ``robust_cacheable`` decorator.

    Args:
        df: Bars with at least ``close`` and ``tick_volume`` columns.
        params: Requires ``sma_windows``, ``rsi_periods`` and ``price_lags``
            (each an iterable of integers).

    Returns:
        DataFrame with rolling mean/std per window, RSI per period, volume
        moving average and ratio, lagged returns and a 20-bar return
        volatility. Rows containing any NaN are dropped.
    """
    # Imported for its side effect: ``df.ta`` below is presumably the
    # DataFrame accessor this package registers - the name itself is unused.
    import pandas_ta as ta

    print(f"Computing features (windows: {params})...")

    close = df["close"]
    tick_volume = df["tick_volume"]
    features = pd.DataFrame(index=df.index)

    # Rolling level and dispersion of the close
    for window in params["sma_windows"]:
        rolling_close = close.rolling(window)
        features[f"sma_{window}"] = rolling_close.mean()
        features[f"std_{window}"] = rolling_close.std()

    # RSI for each requested period
    for period in params["rsi_periods"]:
        features[f"rsi_{period}"] = df.ta.rsi(period)

    # Volume relative to its own 20-bar average
    features["volume_ma"] = tick_volume.rolling(20).mean()
    features["volume_ratio"] = tick_volume / features["volume_ma"]

    # Returns over each requested lag
    for lag in params["price_lags"]:
        features[f"return_{lag}"] = close.pct_change(lag)

    # 20-bar standard deviation of one-bar returns
    one_bar_returns = close.pct_change()
    features["volatility"] = one_bar_returns.rolling(20).std()

    # Warm-up rows of the rolling windows are incomplete - discard them
    features = features.dropna()

    n_columns, n_rows = len(features.columns), len(features)
    print(f" Computed {n_columns} features for {n_rows} bars")
    return features
# =============================================================================
# STEP 4: Label Generation with Robust Caching
# =============================================================================
@robust_cacheable
def compute_triple_barrier_labels(
    prices: pd.Series, barriers: dict, lookforward: int = 10
):
    """
    Label each bar by the first barrier its forward price path touches.

    For the bar at position ``i`` the next ``lookforward`` prices are scanned.
    The upper barrier is ``price * (1 + profit_target)`` and the lower barrier
    is ``price * (1 - stop_loss)``.

    * upper barrier touched first  -> ``1``
    * lower barrier touched first  -> ``-1`` (also used if both are touched on
      the same bar)
    * neither touched (time barrier) -> ``1`` if the last price in the window
      is above the entry price, otherwise ``-1``

    "First" is decided by position within the window, so the result does not
    depend on the index being sorted or free of duplicates.

    Args:
        prices: Price series in chronological order.
        barriers: Mapping with ``profit_target`` and ``stop_loss`` expressed
            as fractions of the entry price.
        lookforward: Number of future bars to scan; must be at least 1.

    Returns:
        Integer Series of labels for the first ``len(prices) - lookforward``
        bars, carrying the corresponding slice of ``prices.index``. Empty when
        ``prices`` has no more than ``lookforward`` rows.

    Raises:
        ValueError: If ``lookforward`` is smaller than 1.
        KeyError: If ``barriers`` lacks ``profit_target`` or ``stop_loss``.
    """
    if lookforward < 1:
        raise ValueError(f"lookforward must be at least 1, got {lookforward}")

    print(f"Computing triple-barrier labels (lookforward={lookforward})...")

    upper_factor = 1 + barriers["profit_target"]
    lower_factor = 1 - barriers["stop_loss"]

    # Plain array slices avoid per-row pandas overhead inside the loop.
    values = prices.to_numpy()
    n_labels = max(len(values) - lookforward, 0)

    labels = []
    for i in range(n_labels):
        entry_price = values[i]
        window = values[i + 1 : i + lookforward + 1]

        hit_upper = window >= entry_price * upper_factor
        hit_lower = window <= entry_price * lower_factor

        # argmax on a boolean array is the position of the first True.
        first_upper = hit_upper.argmax() if hit_upper.any() else None
        first_lower = hit_lower.argmax() if hit_lower.any() else None

        if first_upper is None and first_lower is None:
            # Time barrier: fall back to the sign of the move over the window
            label = 1 if window[-1] > entry_price else -1
        elif first_upper is None:
            label = -1  # Only the stop loss was hit
        elif first_lower is None:
            label = 1  # Only the profit target was hit
        else:
            # Both hit - the earlier position wins, ties go to the stop loss
            label = 1 if first_upper < first_lower else -1

        labels.append(label)

    print(f" Generated {len(labels)} labels")
    # Explicit dtype keeps the empty result an integer Series as well.
    return pd.Series(labels, index=prices.index[:n_labels], dtype="int64")
# =============================================================================
# STEP 5: Model Training with MLflow Tracking
# =============================================================================
@mlflow_cached(
    tags={"model_type": "random_forest", "strategy": "momentum", "version": "v2.0"},
    log_artifacts=True,
)
def train_ml_model(features: pd.DataFrame, labels: pd.Series, params: dict):
    """
    Fit a random forest on the rows shared by ``features`` and ``labels``.

    Tracking and caching are delegated to the ``mlflow_cached`` decorator.

    Args:
        features: Feature matrix indexed like the price data.
        labels: Target labels; only index values also present in
            ``features`` are used.
        params: Requires ``n_estimators``, ``max_depth`` and
            ``min_samples_split``.

    Returns:
        ``(model, metrics)`` where ``metrics`` holds ``train_accuracy``,
        ``n_features``, ``n_samples`` and ``top_feature_importance``. The
        accuracy is measured on the training rows themselves.
    """
    print(f"Training model (n_estimators={params['n_estimators']})...")

    # Keep only the rows both inputs know about
    shared_index = features.index.intersection(labels.index)
    X, y = features.loc[shared_index], labels.loc[shared_index]

    forest_kwargs = {
        key: params[key]
        for key in ("n_estimators", "max_depth", "min_samples_split")
    }
    model = RandomForestClassifier(random_state=42, n_jobs=-1, **forest_kwargs)
    model.fit(X, y)

    # In-sample accuracy
    train_score = model.score(X, y)

    # Rank features from most to least important
    importance_table = pd.DataFrame(
        {"feature": X.columns, "importance": model.feature_importances_}
    )
    ranking = importance_table.sort_values("importance", ascending=False)
    top_row = ranking.iloc[0]

    metrics = {
        "train_accuracy": train_score,
        "n_features": len(X.columns),
        "n_samples": len(X),
        "top_feature_importance": top_row["importance"],
    }

    print(f" Training accuracy: {train_score:.3f}")
    print(f" Top feature: {top_row['feature']}")
    return model, metrics
# =============================================================================
# STEP 6: Walk-Forward Cross-Validation with Caching
# =============================================================================
@robust_cacheable
def walk_forward_validation(
    features: pd.DataFrame, labels: pd.Series, model_params: dict, n_splits: int = 5
):
    """
    Score a random forest with expanding-window (time-series) splits.

    ``features`` and ``labels`` are first restricted to their shared index so
    that every feature row is paired with the label for the same index value
    (the same alignment ``train_ml_model`` performs). The aligned rows are
    then split with ``TimeSeriesSplit`` and a fresh model is trained and
    scored for each fold.

    Caching is whatever ``robust_cacheable`` provides for the call as a whole;
    nothing in this body caches individual folds.

    Args:
        features: Feature matrix in chronological order.
        labels: Target labels; need not cover the same rows as ``features``.
        model_params: Keyword arguments forwarded to
            ``RandomForestClassifier`` (``random_state`` and ``n_jobs`` are
            set here and must not be included).
        n_splits: Number of folds passed to ``TimeSeriesSplit``.

    Returns:
        DataFrame with one row per fold and the columns ``fold``,
        ``train_score``, ``test_score``, ``train_size`` and ``test_size``.

    Raises:
        ValueError: If ``features`` and ``labels`` share no index values.
    """
    print(f"Walk-forward validation ({n_splits} splits)...")

    # The fold indices are positional, so both inputs must cover exactly the
    # same rows in the same order before they are sliced.
    common_idx = features.index.intersection(labels.index)
    if len(common_idx) == 0:
        raise ValueError("features and labels share no index values")
    X = features.loc[common_idx]
    y = labels.loc[common_idx]

    tscv = TimeSeriesSplit(n_splits=n_splits)
    fold_results = []

    for fold, (train_idx, test_idx) in enumerate(tscv.split(X), 1):
        print(f" Fold {fold}/{n_splits}...", end=" ")

        # Split data
        X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
        X_test, y_test = X.iloc[test_idx], y.iloc[test_idx]

        # Train a fresh model on the fold's history only
        model = RandomForestClassifier(**model_params, random_state=42, n_jobs=-1)
        model.fit(X_train, y_train)

        # Evaluate in-sample and on the following, unseen segment
        train_score = model.score(X_train, y_train)
        test_score = model.score(X_test, y_test)

        fold_results.append(
            {
                "fold": fold,
                "train_score": train_score,
                "test_score": test_score,
                "train_size": len(X_train),
                "test_size": len(X_test),
            }
        )
        print(f"Test: {test_score:.3f}")

    results_df = pd.DataFrame(fold_results)
    avg_test_score = results_df["test_score"].mean()
    print(f" Average test score: {avg_test_score:.3f}")
    return results_df
# =============================================================================
# STEP 7: Backtesting with Specialized Cache
# =============================================================================
@cached_backtest("momentum_rf_v2", save_trades=True)
def run_backtest(data: pd.DataFrame, model, features: pd.DataFrame, params: dict):
    """
    Simulate a long/short/flat strategy driven by model probabilities.

    The probability of the second class (column 1 of ``predict_proba``) is
    turned into a signal per bar: ``-1`` below ``1 - signal_threshold``, ``1``
    above ``signal_threshold``, otherwise ``0``. Whenever the signal differs
    from the current position, the open trade (if any) is closed and a new one
    is opened at the same bar's close if the signal is non-zero. The signal on
    the first bar is not acted on.

    The equity curve has exactly one point per shared timestamp. While a trade
    is open, equity equals the equity at entry times ``1 + unrealised PnL``,
    so the value recorded when the trade closes matches the trade's own PnL.
    A position still open on the last bar stays marked to market in the curve
    but is not listed as a trade.

    Args:
        data: Bars with a ``close`` column; index values are assumed unique.
        model: Fitted classifier exposing ``predict_proba``.
        features: Feature matrix; only rows whose index also appears in
            ``data`` are used.
        params: Requires ``signal_threshold`` and ``initial_capital``. An
            optional ``periods_per_year`` (default 252) sets the Sharpe
            annualisation factor.

    Returns:
        ``(metrics, trades_df, equity_series)``. ``metrics`` contains
        ``total_return``, ``sharpe_ratio``, ``max_drawdown``, ``win_rate``,
        ``profit_factor``, ``total_trades`` and ``avg_trade_pnl``;
        ``profit_factor`` is 0 when there are no losing trades.

    Raises:
        ValueError: If ``data`` and ``features`` share no timestamps.
    """
    threshold = params["signal_threshold"]
    print(f"Running backtest (threshold={params['signal_threshold']})...")

    common_idx = data.index.intersection(features.index)
    if len(common_idx) == 0:
        raise ValueError("data and features share no timestamps; nothing to backtest")

    # Generate signals
    probabilities = model.predict_proba(features.loc[common_idx])[:, 1]
    signals = _signals_from_probabilities(probabilities, threshold)

    # Execute trades
    closes = data.loc[common_idx, "close"].to_numpy()
    trades, equity = _simulate_positions(
        common_idx, closes, signals, params["initial_capital"]
    )

    # Calculate metrics
    trades_df = pd.DataFrame(trades)
    equity_series = pd.Series(equity, index=common_idx)
    metrics = _summarize_backtest(
        trades_df, equity_series, params.get("periods_per_year", 252)
    )

    print(f" Total Return: {metrics['total_return']:.2%}")
    print(f" Sharpe Ratio: {metrics['sharpe_ratio']:.2f}")
    print(f" Win Rate: {metrics['win_rate']:.2%}")
    print(f" Total Trades: {metrics['total_trades']}")
    return metrics, trades_df, equity_series


def _signals_from_probabilities(probabilities, threshold):
    """Map probabilities to +1 / -1 / 0 signals; the short rule wins if both apply."""
    probabilities = np.asarray(probabilities)
    return np.where(
        probabilities < (1 - threshold),
        -1,
        np.where(probabilities > threshold, 1, 0),
    )


def _simulate_positions(timestamps, closes, signals, initial_capital):
    """Walk the bars once and return (closed trades, per-bar equity list)."""
    trades = []
    equity = [initial_capital]  # value at the first bar, before any trading
    position = 0
    entry_price = entry_time = None
    entry_equity = initial_capital

    bars = zip(timestamps[1:], closes[1:].tolist(), signals[1:].tolist())
    for timestamp, price, signal in bars:
        if position != 0:
            # Value the open trade from its entry, not from the previous bar,
            # so the PnL is never compounded more than once.
            pnl = (price - entry_price) / entry_price * position
            marked = entry_equity * (1 + pnl)
        else:
            pnl = 0.0
            marked = equity[-1]

        if signal != position:
            if position != 0:
                trades.append(
                    {
                        "entry_time": entry_time,
                        "exit_time": timestamp,
                        "entry_price": entry_price,
                        "exit_price": price,
                        "position": position,
                        "pnl": pnl,
                        "equity": marked,
                    }
                )
            if signal != 0:
                entry_price, entry_time, entry_equity = price, timestamp, marked
            position = signal

        # Exactly one equity point per bar keeps the curve aligned with time.
        equity.append(marked)

    return trades, equity


def _summarize_backtest(trades_df, equity_series, periods_per_year):
    """Compute curve-based and trade-based performance metrics."""
    start_equity = equity_series.iloc[0]
    total_return = (equity_series.iloc[-1] - start_equity) / start_equity

    returns = equity_series.pct_change().dropna()
    volatility = returns.std()  # NaN with fewer than two returns
    sharpe = (
        returns.mean() / volatility * np.sqrt(periods_per_year)
        if volatility > 0
        else 0
    )

    running_peak = equity_series.expanding().max()
    max_drawdown = ((equity_series - running_peak) / running_peak).min()

    if trades_df.empty:
        win_rate = 0
        profit_factor = 0
        avg_trade_pnl = 0
    else:
        pnl = trades_df["pnl"]
        gross_profit = pnl[pnl > 0].sum()
        gross_loss = abs(pnl[pnl < 0].sum())
        win_rate = (pnl > 0).sum() / len(trades_df)
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else 0
        avg_trade_pnl = pnl.mean()

    return {
        "total_return": total_return,
        "sharpe_ratio": sharpe,
        "max_drawdown": max_drawdown,
        "win_rate": win_rate,
        "profit_factor": profit_factor,
        "total_trades": len(trades_df),
        "avg_trade_pnl": avg_trade_pnl,
    }
# =============================================================================
# STEP 8: Parameter Optimization with Smart Caching
# =============================================================================
def optimize_parameters(data, features, labels, param_grid):
    """
    Train and backtest one model per parameter combination and rank them.

    Each combination is passed through ``train_ml_model`` and
    ``run_backtest``; any reuse of earlier results comes from the caching
    decorators on those functions, not from this loop.

    Args:
        data: Price bars forwarded to ``run_backtest``.
        features: Feature matrix forwarded to both steps.
        labels: Target labels forwarded to ``train_ml_model``.
        param_grid: Iterable of dicts, each with ``n_estimators``,
            ``max_depth``, ``min_samples_split`` and ``signal_threshold``.
            It is materialised once, so a generator is accepted.

    Returns:
        DataFrame with one row per combination (its parameters merged with
        the backtest metrics), sorted by ``sharpe_ratio`` descending. An
        empty DataFrame when ``param_grid`` is empty.
    """
    param_grid = list(param_grid)

    print("=" * 70)
    print("PARAMETER OPTIMIZATION")
    print("=" * 70)
    print(f"Testing {len(param_grid)} parameter combinations...\n")

    if not param_grid:
        # Nothing to rank; sorting an empty frame by a missing column would fail.
        return pd.DataFrame()

    results = []
    for i, params in enumerate(param_grid, 1):
        print(f"[{i}/{len(param_grid)}] Testing params: {params}")

        # Train model with these parameters
        model_params = {
            "n_estimators": params["n_estimators"],
            "max_depth": params["max_depth"],
            "min_samples_split": params["min_samples_split"],
        }
        model, _ = train_ml_model(features, labels, model_params)

        # Run backtest
        backtest_params = {
            "signal_threshold": params["signal_threshold"],
            "initial_capital": 10000,
        }
        metrics, _, _ = run_backtest(data, model, features, backtest_params)

        # Store results
        results.append({**params, **metrics})
        print(
            f" Result: Sharpe={metrics['sharpe_ratio']:.2f}, "
            f"Return={metrics['total_return']:.2%}\n"
        )

    # Analyze results
    results_df = pd.DataFrame(results).sort_values("sharpe_ratio", ascending=False)

    print("\n" + "=" * 70)
    print("TOP 5 PARAMETER COMBINATIONS")
    print("=" * 70)
    print(results_df.head().to_string())
    print()
    return results_df
# =============================================================================
# MAIN WORKFLOW
# =============================================================================
def main():
    """
    Run the complete MT5 workflow: load data, build features and labels,
    train, cross-validate, backtest, optimise, then report cache statistics.

    The requested date window ends at today's midnight rather than at the
    current instant, so repeated runs on the same day ask for the identical
    range and the date-range cache on ``load_mt5_data`` can be reused.

    Returns:
        Dict with ``data``, ``features``, ``labels``, ``model``, ``metrics``
        (from the single backtest in step 6) and ``optimization_results``.
    """
    # Initialize
    initialize_mt5_cache_system()

    # Configuration
    SYMBOL = "EURUSD"
    TIMEFRAME = mt5.TIMEFRAME_H1
    # A timestamp with microsecond precision would differ on every run and
    # make each request look like a new date range to the cache.
    END_DATE = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    START_DATE = END_DATE - timedelta(days=365)

    print("=" * 70)
    print("MT5 MOMENTUM STRATEGY - FULL WORKFLOW")
    print("=" * 70)
    print(f"Symbol: {SYMBOL}")
    print("Timeframe: H1")
    print(f"Period: {START_DATE.date()} to {END_DATE.date()}")
    print("=" * 70)
    print()

    # Step 1: Load data
    print("\n[STEP 1/7] Loading MT5 Data...")
    data = load_mt5_data(SYMBOL, TIMEFRAME, START_DATE, END_DATE)
    print(f"[OK] Loaded {len(data)} bars\n")

    # Step 2: Compute features
    print("\n[STEP 2/7] Computing Features...")
    feature_params = {
        "sma_windows": [10, 20, 50],
        "rsi_periods": [14, 21],
        "price_lags": [1, 5, 10],
    }
    features = compute_mt5_features(data, feature_params)
    print(f"[OK] Computed {len(features.columns)} features\n")

    # Step 3: Generate labels
    print("\n[STEP 3/7] Generating Labels...")
    label_params = {
        "profit_target": 0.02,
        "stop_loss": 0.01,
    }
    labels = compute_triple_barrier_labels(data["close"], label_params, lookforward=10)
    print(f"[OK] Generated {len(labels)} labels\n")

    # Step 4: Train model
    print("\n[STEP 4/7] Training Model...")
    model_params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 100,
    }
    model, train_metrics = train_ml_model(features, labels, model_params)
    print(f"[OK] Model trained (accuracy: {train_metrics['train_accuracy']:.3f})\n")

    # Step 5: Walk-forward validation
    print("\n[STEP 5/7] Walk-Forward Validation...")
    cv_results = walk_forward_validation(features, labels, model_params, n_splits=5)
    print(f"[OK] CV completed (avg test score: {cv_results['test_score'].mean():.3f})\n")

    # Step 6: Backtest
    print("\n[STEP 6/7] Running Backtest...")
    backtest_params = {
        "signal_threshold": 0.6,
        "initial_capital": 10000,
    }
    metrics, trades, equity = run_backtest(data, model, features, backtest_params)
    print("[OK] Backtest completed\n")

    # Step 7: Parameter optimization
    print("\n[STEP 7/7] Parameter Optimization...")
    param_grid = [
        {
            "n_estimators": 50,
            "max_depth": 8,
            "min_samples_split": 50,
            "signal_threshold": 0.55,
        },
        {
            "n_estimators": 100,
            "max_depth": 10,
            "min_samples_split": 100,
            "signal_threshold": 0.60,
        },
        {
            "n_estimators": 150,
            "max_depth": 12,
            "min_samples_split": 150,
            "signal_threshold": 0.65,
        },
    ]
    optimization_results = optimize_parameters(data, features, labels, param_grid)
    print("[OK] Optimization completed\n")

    # Final report
    print("\n" + "=" * 70)
    print("CACHE PERFORMANCE REPORT")
    print("=" * 70)
    print_cache_health()

    # Status summary
    status = get_comprehensive_cache_status()
    print("\n" + "=" * 70)
    print("COMPREHENSIVE STATUS")
    print("=" * 70)
    print(f"Overall Hit Rate: {status['health']['hit_rate']:.1%}")
    print(f"Total Cache Calls: {status['health']['total_calls']:,}")
    print(f"Cache Size: {status['health']['cache_size_mb']:.1f} MB")
    print(f"Backtest Runs Cached: {status['backtest']['total_runs']}")
    print("=" * 70)

    print("\n[OK] Workflow completed successfully!")
    return {
        "data": data,
        "features": features,
        "labels": labels,
        "model": model,
        "metrics": metrics,
        "optimization_results": optimization_results,
    }
if __name__ == "__main__":
    # Execute the whole workflow, then list suggested follow-ups.
    results = main()

    divider = "=" * 70
    next_steps = (
        "1. Run again to see 120x speedup from caching",
        "2. Modify parameters and see only changed parts recompute",
        "3. Check MLflow UI: mlflow ui --port 5000",
        "4. Export reports: get_cache_monitor().export_report('report.html')",
    )

    print("\n" + divider)
    print("NEXT STEPS")
    print(divider)
    for step in next_steps:
        print(step)
    print(divider)