| """ |
| Training Pipeline |
| ================== |
| End-to-end training with multi-task learning, |
| data loading, and proper financial time-series splits. |
| """ |
|
|
| import torch |
| import torch.nn as nn |
| import torch.optim as optim |
| from torch.utils.data import Dataset, DataLoader |
| import numpy as np |
| import pandas as pd |
| from typing import Dict, List, Tuple, Optional |
| import os |
| import json |
| import time |
|
|
| from trading_intelligence.feature_engine import FeatureEngine |
| from trading_intelligence.prediction_model import TradingTransformer, MultiTaskLoss |
|
|
|
|
class FinancialTimeSeriesDataset(Dataset):
    """
    PyTorch Dataset pairing financial feature sequences with their targets.

    Sample ``i`` is the pair ``(X[i], y[i])``. Samples are stored in the order
    they are given; the dataset neither splits nor reorders them, so keeping
    the data chronological (walk-forward splits, no leakage) is the caller's
    responsibility.
    """

    def __init__(self, X: np.ndarray, y: np.ndarray):
        """
        Args:
            X: (N, num_features, lookback_window) feature sequences
            y: (N, num_targets) target values

        Both inputs may be NumPy arrays, tensors, or nested sequences of any
        numeric dtype; they are copied and stored as float32 tensors.

        Raises:
            ValueError: if X and y do not contain the same number of samples.
        """
        self.X = self._to_float_tensor(X)
        self.y = self._to_float_tensor(y)

        # Fail at construction time instead of with an IndexError deep inside
        # a DataLoader worker when the shorter array runs out.
        if len(self.X) != len(self.y):
            raise ValueError(
                f"X and y must have the same number of samples, "
                f"got {len(self.X)} and {len(self.y)}"
            )

    @staticmethod
    def _to_float_tensor(data) -> torch.Tensor:
        """Return an independent float32 tensor copy of ``data``."""
        # torch.as_tensor accepts arrays, tensors and sequences of any numeric
        # dtype (the legacy torch.FloatTensor constructor rejects tensors of a
        # different dtype). copy=True keeps the dataset isolated from later
        # in-place changes to the caller's array.
        return torch.as_tensor(data).to(dtype=torch.float32, copy=True)

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(features, targets)`` pair stored at ``idx``."""
        return self.X[idx], self.y[idx]
|
|
|
|
class TrainingPipeline:
    """
    Complete training pipeline for the trading intelligence system.

    Features:
    1. Data loading and feature engineering
    2. Walk-forward temporal splits
    3. Multi-task training (direction + return + risk)
    4. Learning rate scheduling
    5. Early stopping with patience
    6. Comprehensive logging

    Typical use is ``prepare_data`` -> ``train`` -> ``save_model``;
    ``load_model`` restores a checkpoint written by ``save_model``.
    """

    # Short metric name used in epoch summaries -> key in the dict returned
    # by the loss function.
    _LOSS_KEYS = {
        'total': 'total_loss',
        'direction': 'direction_loss',
        'return': 'return_loss',
        'risk': 'risk_loss',
    }

    def __init__(
        self,
        lookback_window: int = 60,
        prediction_horizons: Optional[List[int]] = None,
        d_model: int = 128,
        n_heads: int = 8,
        n_layers: int = 3,
        d_ff: int = 256,
        patch_len: int = 8,
        stride: int = 4,
        dropout: float = 0.1,
        learning_rate: float = 1e-3,
        batch_size: int = 64,
        max_epochs: int = 100,
        patience: int = 10,
        device: str = 'auto',
    ):
        """
        Store hyper-parameters and build the feature engine.

        The model, loss, optimizer and scheduler are not created here; they
        are built by ``prepare_data`` or ``load_model`` once the number of
        input channels is known.

        Args:
            lookback_window: Passed to FeatureEngine and used as the model's
                ``seq_len``.
            prediction_horizons: Horizons to predict; defaults to
                ``[1, 5, 20]`` when None. The list is copied.
            d_model, n_heads, n_layers, d_ff, patch_len, stride, dropout:
                Forwarded unchanged to TradingTransformer.
            learning_rate: Initial learning rate for AdamW.
            batch_size: Batch size of the DataLoaders built by
                ``prepare_data``.
            max_epochs: Upper bound on the number of training epochs.
            patience: Consecutive epochs without validation-loss improvement
                tolerated before early stopping.
            device: Torch device string, or 'auto' to use CUDA when available
                and CPU otherwise.
        """
        self.lookback_window = lookback_window
        # A list literal as the default would be one object shared by every
        # instance; build a fresh list per instance instead.
        self.prediction_horizons = (
            [1, 5, 20] if prediction_horizons is None else list(prediction_horizons)
        )
        self.d_model = d_model
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.d_ff = d_ff
        self.patch_len = patch_len
        self.stride = stride
        self.dropout = dropout
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.max_epochs = max_epochs
        self.patience = patience

        if device == 'auto':
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        else:
            self.device = torch.device(device)

        self.feature_engine = FeatureEngine(lookback_window, self.prediction_horizons)
        self.model = None
        self.loss_fn = None
        self.optimizer = None
        self.scheduler = None
        self.training_history = []

        # Populated by prepare_data() / load_model(); defined here so the
        # attributes always exist.
        self.norm_params = {}
        self.num_channels = None
        self.X_test = None
        self.y_test = None

    def prepare_data(self, df: pd.DataFrame,
                     train_ratio: float = 0.7,
                     val_ratio: float = 0.15) -> Tuple[DataLoader, DataLoader, DataLoader]:
        """
        Prepare data with walk-forward temporal splits.

        Samples are split in their original order: the first ``train_ratio``
        of them for training, the next ``val_ratio`` for validation and the
        remainder for testing. Only the training loader shuffles (within the
        training split).

        Side effects: stores ``norm_params``, ``X_test``, ``y_test`` and
        ``num_channels`` on the instance and (re)initializes the model, loss,
        optimizer and scheduler, discarding any previously trained or loaded
        weights.

        Args:
            df: Raw OHLCV DataFrame
            train_ratio: Fraction for training (earliest data)
            val_ratio: Fraction for validation (middle)

        Returns:
            train_loader, val_loader, test_loader

        Raises:
            ValueError: if no sample with entirely finite features and
                targets remains.
        """
        features_df = self.feature_engine.compute_all_features(df)

        # NOTE(review): normalization runs on the full frame before the
        # split. If normalize_features derives its statistics from the rows
        # it is given, validation/test data influence the training inputs
        # (look-ahead leakage) — confirm against FeatureEngine.
        features_df, self.norm_params = self.feature_engine.normalize_features(features_df)

        # Interleaved per horizon as (direction, return); _parse_targets
        # relies on this column order.
        target_cols = []
        for h in self.prediction_horizons:
            target_cols.extend([f'target_direction_{h}', f'target_return_{h}'])

        X, y = self.feature_engine.create_sequences(features_df, target_cols=target_cols)

        # Drop every sample containing a NaN/inf in its features or targets.
        valid_mask = np.isfinite(X).all(axis=(1, 2)) & np.isfinite(y).all(axis=1)
        X = X[valid_mask]
        y = y[valid_mask]

        n = len(X)
        if n == 0:
            raise ValueError(
                "No valid samples remain after removing non-finite values; "
                "cannot build training data."
            )

        print(f"Total valid samples: {len(X)}")
        print(f"Features per sample: {X.shape[1]} channels x {X.shape[2]} timesteps")
        print(f"Targets per sample: {y.shape[1]}")

        train_end = int(n * train_ratio)
        val_end = int(n * (train_ratio + val_ratio))

        X_train, y_train = X[:train_end], y[:train_end]
        X_val, y_val = X[train_end:val_end], y[train_end:val_end]
        X_test, y_test = X[val_end:], y[val_end:]

        print(f"Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")

        # Keep the raw test arrays available on the instance.
        self.X_test = X_test
        self.y_test = y_test

        train_dataset = FinancialTimeSeriesDataset(X_train, y_train)
        val_dataset = FinancialTimeSeriesDataset(X_val, y_val)
        test_dataset = FinancialTimeSeriesDataset(X_test, y_test)

        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=self.batch_size, shuffle=False)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)

        # The channel count is only known once the features exist, so the
        # model is built here rather than in __init__.
        self.num_channels = X.shape[1]
        self._init_model()

        return train_loader, val_loader, test_loader

    def _init_model(self):
        """
        Initialize model, loss, optimizer, and scheduler.

        Requires ``self.num_channels`` to be set. Always creates fresh
        objects, so any existing weights and optimizer state are discarded.
        """
        self.model = TradingTransformer(
            num_channels=self.num_channels,
            seq_len=self.lookback_window,
            patch_len=self.patch_len,
            stride=self.stride,
            d_model=self.d_model,
            n_heads=self.n_heads,
            n_layers=self.n_layers,
            d_ff=self.d_ff,
            num_horizons=len(self.prediction_horizons),
            dropout=self.dropout,
        ).to(self.device)

        self.loss_fn = MultiTaskLoss(
            num_horizons=len(self.prediction_horizons)
        ).to(self.device)

        total_params = sum(p.numel() for p in self.model.parameters())
        print(f"Model initialized: {total_params:,} parameters")
        print(f"Device: {self.device}")

        # The loss module's own parameters are optimized jointly with the
        # model's.
        self.optimizer = optim.AdamW(
            list(self.model.parameters()) + list(self.loss_fn.parameters()),
            lr=self.learning_rate,
            weight_decay=1e-4
        )

        self.scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
            self.optimizer, T_0=10, T_mult=2
        )

    def _parse_targets(self, y_batch: torch.Tensor) -> Dict[str, torch.Tensor]:
        """
        Parse target tensor into direction and return components.

        Columns are expected interleaved per horizon as
        ``[dir_h0, ret_h0, dir_h1, ret_h1, ...]``, the order in which
        ``prepare_data`` requests them.

        Args:
            y_batch: (batch, 2 * num_horizons) target tensor.

        Returns:
            Dict with 'direction' and 'returns', each (batch, num_horizons).
        """
        num_horizons = len(self.prediction_horizons)

        # Even columns hold directions, odd columns hold returns.
        directions = torch.stack([y_batch[:, 2 * i] for i in range(num_horizons)], dim=1)
        returns = torch.stack([y_batch[:, 2 * i + 1] for i in range(num_horizons)], dim=1)

        return {
            'direction': directions,
            'returns': returns,
        }

    @classmethod
    def _accumulate_losses(cls, totals: Dict[str, float], losses: Dict[str, torch.Tensor]) -> None:
        """Add one batch's loss components to the running ``totals`` in place."""
        for name, loss_key in cls._LOSS_KEYS.items():
            totals[name] += losses[loss_key].item()

    @staticmethod
    def _average_losses(totals: Dict[str, float], num_batches: int) -> Dict[str, float]:
        """Return per-batch averages of ``totals`` (zeros when no batch was seen)."""
        divisor = max(num_batches, 1)
        return {name: value / divisor for name, value in totals.items()}

    def train_epoch(self, train_loader: DataLoader) -> Dict[str, float]:
        """
        Train for one epoch.

        Args:
            train_loader: Iterable of ``(X_batch, y_batch)`` pairs.

        Returns:
            Mean per-batch 'total', 'direction', 'return' and 'risk' losses.
        """
        self.model.train()
        epoch_losses = {name: 0 for name in self._LOSS_KEYS}
        num_batches = 0

        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            predictions = self.model(X_batch)
            targets = self._parse_targets(y_batch)
            losses = self.loss_fn(predictions, targets)

            self.optimizer.zero_grad()
            losses['total_loss'].backward()

            # Clip the model's gradients to limit the size of a single
            # update; the loss module's parameters are not clipped.
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)

            self.optimizer.step()

            self._accumulate_losses(epoch_losses, losses)
            num_batches += 1

        return self._average_losses(epoch_losses, num_batches)

    @torch.no_grad()
    def validate(self, val_loader: DataLoader) -> Dict[str, float]:
        """
        Validate model.

        Runs with gradients disabled and the model in eval mode.

        Args:
            val_loader: Iterable of ``(X_batch, y_batch)`` pairs.

        Returns:
            Mean per-batch 'total', 'direction', 'return' and 'risk' losses
            and, when at least one batch was seen, ``direction_acc_{h}`` for
            every prediction horizon ``h``.
        """
        self.model.eval()
        epoch_losses = {name: 0 for name in self._LOSS_KEYS}
        all_direction_preds = []
        all_direction_targets = []
        num_batches = 0

        for X_batch, y_batch in val_loader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            predictions = self.model(X_batch)
            targets = self._parse_targets(y_batch)
            losses = self.loss_fn(predictions, targets)

            self._accumulate_losses(epoch_losses, losses)

            # Threshold the predicted probability at 0.5 to get a hard
            # up/down call per horizon.
            dir_preds = (torch.sigmoid(predictions['direction_logits']) > 0.5).float()
            all_direction_preds.append(dir_preds.cpu())
            all_direction_targets.append(targets['direction'].cpu())
            num_batches += 1

        avg_losses = self._average_losses(epoch_losses, num_batches)

        # Accuracy is computed over all samples at once so a smaller final
        # batch is not over-weighted.
        if all_direction_preds:
            all_preds = torch.cat(all_direction_preds, dim=0)
            all_targets = torch.cat(all_direction_targets, dim=0)
            for i, h in enumerate(self.prediction_horizons):
                acc = (all_preds[:, i] == all_targets[:, i]).float().mean().item()
                avg_losses[f'direction_acc_{h}'] = acc

        return avg_losses

    def train(self, train_loader: DataLoader, val_loader: DataLoader) -> Dict:
        """
        Full training loop with early stopping.

        Trains for up to ``max_epochs`` epochs, stepping the scheduler once
        per epoch, and stops early after ``patience`` consecutive epochs
        without a new best validation loss. The model weights of the best
        epoch are restored before returning. One record per epoch is appended
        to ``self.training_history``.

        Returns:
            Dict with 'best_val_loss', 'total_epochs' (length of the
            accumulated history) and 'history'.
        """
        best_val_loss = float('inf')
        patience_counter = 0
        best_model_state = None

        print(f"\n{'='*60}")
        print(f"Starting Training ({self.max_epochs} max epochs)")
        print(f"{'='*60}")

        for epoch in range(self.max_epochs):
            start = time.time()

            # Read the learning rate before the scheduler advances, so the
            # logged value is the one this epoch actually trained with
            # rather than the next epoch's.
            epoch_lr = self.optimizer.param_groups[0]['lr']

            train_losses = self.train_epoch(train_loader)
            val_metrics = self.validate(val_loader)

            self.scheduler.step()

            elapsed = time.time() - start

            epoch_record = {
                'epoch': epoch + 1,
                'train_loss': train_losses['total'],
                'val_loss': val_metrics['total'],
                'train_dir_loss': train_losses['direction'],
                'val_dir_loss': val_metrics['direction'],
                'train_ret_loss': train_losses['return'],
                'val_ret_loss': val_metrics['return'],
                'lr': epoch_lr,
                'elapsed': elapsed,
            }
            for h in self.prediction_horizons:
                key = f'direction_acc_{h}'
                if key in val_metrics:
                    epoch_record[key] = val_metrics[key]

            self.training_history.append(epoch_record)

            acc_str = " | ".join([
                f"DA-{h}d: {val_metrics.get(f'direction_acc_{h}', 0):.1%}"
                for h in self.prediction_horizons
            ])
            print(
                f"Epoch {epoch+1:3d}/{self.max_epochs} | "
                f"Train: {train_losses['total']:.4f} | "
                f"Val: {val_metrics['total']:.4f} | "
                f"{acc_str} | "
                f"LR: {epoch_lr:.6f} | "
                f"{elapsed:.1f}s"
            )

            if val_metrics['total'] < best_val_loss:
                best_val_loss = val_metrics['total']
                patience_counter = 0
                # Snapshot on CPU so the copy does not occupy device memory.
                best_model_state = {
                    k: v.cpu().clone() for k, v in self.model.state_dict().items()
                }
            else:
                patience_counter += 1
                if patience_counter >= self.patience:
                    print(f"\nEarly stopping at epoch {epoch+1} (patience={self.patience})")
                    break

        if best_model_state is not None:
            self.model.load_state_dict(best_model_state)
            self.model.to(self.device)
            print(f"Restored best model (val_loss={best_val_loss:.4f})")

        return {
            'best_val_loss': best_val_loss,
            'total_epochs': len(self.training_history),
            'history': self.training_history,
        }

    def save_model(self, path: str):
        """
        Save model and training artifacts.

        The checkpoint holds the model and loss state dicts, normalization
        parameters, feature names, the architecture config and the training
        history. Missing parent directories are created.

        Args:
            path: Destination file path.

        Raises:
            RuntimeError: if the model has not been initialized yet.
        """
        if self.model is None or self.loss_fn is None:
            raise RuntimeError(
                "Cannot save: model is not initialized. "
                "Call prepare_data() or load_model() first."
            )

        # dirname is '' for a bare filename; fall back to the current dir.
        os.makedirs(os.path.dirname(path) or '.', exist_ok=True)

        save_dict = {
            'model_state': self.model.state_dict(),
            'loss_fn_state': self.loss_fn.state_dict(),
            'norm_params': getattr(self, 'norm_params', {}),
            'feature_names': self.feature_engine.feature_names,
            'config': {
                'lookback_window': self.lookback_window,
                'prediction_horizons': self.prediction_horizons,
                'num_channels': self.num_channels,
                'd_model': self.d_model,
                'n_heads': self.n_heads,
                'n_layers': self.n_layers,
                'd_ff': self.d_ff,
                'patch_len': self.patch_len,
                'stride': self.stride,
                'dropout': self.dropout,
            },
            'training_history': self.training_history,
        }

        torch.save(save_dict, path)
        print(f"Model saved to {path}")

    def load_model(self, path: str):
        """
        Load model from checkpoint.

        Architecture settings stored in the checkpoint override the values
        given to the constructor. The model and loss are rebuilt and their
        weights restored; the optimizer and scheduler are created fresh
        (their state is not part of the checkpoint).

        Args:
            path: Checkpoint file written by ``save_model``.
        """
        # SECURITY: weights_only=False lets torch.load unpickle arbitrary
        # objects, which can execute code. Only load checkpoints from trusted
        # sources. It is kept because the checkpoint also stores non-tensor
        # artifacts (norm_params, feature names, history).
        checkpoint = torch.load(path, map_location=self.device, weights_only=False)
        config = checkpoint['config']

        engine_settings_before = (self.lookback_window, list(self.prediction_horizons))

        self.num_channels = config['num_channels']
        self.d_model = config.get('d_model', self.d_model)
        self.n_heads = config.get('n_heads', self.n_heads)
        self.n_layers = config.get('n_layers', self.n_layers)
        self.d_ff = config.get('d_ff', self.d_ff)
        self.patch_len = config.get('patch_len', self.patch_len)
        self.stride = config.get('stride', self.stride)
        self.dropout = config.get('dropout', self.dropout)
        self.lookback_window = config.get('lookback_window', self.lookback_window)
        if 'prediction_horizons' in config:
            self.prediction_horizons = list(config['prediction_horizons'])

        # The feature engine was built with the constructor's window and
        # horizons. If the checkpoint uses different ones, rebuild it so the
        # sequences it produces match the restored model.
        engine_settings_after = (self.lookback_window, list(self.prediction_horizons))
        if engine_settings_after != engine_settings_before:
            self.feature_engine = FeatureEngine(
                self.lookback_window, self.prediction_horizons
            )

        self._init_model()

        self.model.load_state_dict(checkpoint['model_state'])
        self.loss_fn.load_state_dict(checkpoint['loss_fn_state'])
        self.norm_params = checkpoint.get('norm_params', {})
        self.feature_engine.feature_names = checkpoint.get('feature_names', [])
        self.training_history = checkpoint.get('training_history', [])

        print(f"Model loaded from {path}")
|
|