"""
Training Pipeline
==================
End-to-end training with multi-task learning, data loading,
and proper financial time-series splits.
"""

import json
import os
import time
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

from trading_intelligence.feature_engine import FeatureEngine
from trading_intelligence.prediction_model import MultiTaskLoss, TradingTransformer


class FinancialTimeSeriesDataset(Dataset):
    """
    PyTorch Dataset wrapping pre-built feature sequences and their targets.

    The dataset only stores and indexes the arrays it is given; splitting the
    data in time order is the caller's responsibility.
    """

    def __init__(self, X: np.ndarray, y: np.ndarray):
        """
        Args:
            X: (N, num_features, lookback_window) feature sequences
            y: (N, num_targets) target values

        Raises:
            ValueError: If X and y do not contain the same number of samples.
        """
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same number of samples "
                f"(got {len(X)} and {len(y)})"
            )
        # FloatTensor yields float32 tensors regardless of the numpy dtype.
        self.X = torch.FloatTensor(X)
        self.y = torch.FloatTensor(y)

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the (features, targets) pair at position ``idx``."""
        return self.X[idx], self.y[idx]


class TrainingPipeline:
    """
    Complete training pipeline for the trading intelligence system.

    Features:
    1. Data loading and feature engineering
    2. Walk-forward temporal splits
    3. Multi-task training (direction + return + risk)
    4. Learning rate scheduling
    5. Early stopping with patience
    6. Comprehensive logging
    """

    # Horizons used when the caller does not supply any.
    DEFAULT_PREDICTION_HORIZONS = (1, 5, 20)

    # Maps the keys reported per epoch to the keys returned by the loss module.
    _LOSS_KEYS = {
        'total': 'total_loss',
        'direction': 'direction_loss',
        'return': 'return_loss',
        'risk': 'risk_loss',
    }

    def __init__(
        self,
        lookback_window: int = 60,
        prediction_horizons: Optional[List[int]] = None,
        d_model: int = 128,
        n_heads: int = 8,
        n_layers: int = 3,
        d_ff: int = 256,
        patch_len: int = 8,
        stride: int = 4,
        dropout: float = 0.1,
        learning_rate: float = 1e-3,
        batch_size: int = 64,
        max_epochs: int = 100,
        patience: int = 10,
        device: str = 'auto',
    ):
        """
        Store hyper-parameters and build the feature engine.

        The model, loss, optimizer and scheduler are created later by
        ``prepare_data`` or ``load_model`` (both call ``_init_model``).

        Args:
            lookback_window: Sequence length passed to the feature engine and model.
            prediction_horizons: Horizons to predict; defaults to [1, 5, 20].
            d_model, n_heads, n_layers, d_ff, patch_len, stride, dropout:
                Forwarded unchanged to ``TradingTransformer``.
            learning_rate: Learning rate for AdamW.
            batch_size: Batch size for all DataLoaders.
            max_epochs: Upper bound on training epochs.
            patience: Epochs without validation improvement before stopping.
            device: 'auto' picks CUDA when available, else CPU; any other
                value is passed to ``torch.device``.
        """
        self.lookback_window = lookback_window
        # Copy so that instances never share (or mutate) one default list.
        if prediction_horizons is None:
            self.prediction_horizons = list(self.DEFAULT_PREDICTION_HORIZONS)
        else:
            self.prediction_horizons = list(prediction_horizons)
        self.d_model = d_model
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.d_ff = d_ff
        self.patch_len = patch_len
        self.stride = stride
        self.dropout = dropout
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.max_epochs = max_epochs
        self.patience = patience

        if device == 'auto':
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        else:
            self.device = torch.device(device)

        self.feature_engine = FeatureEngine(self.lookback_window, self.prediction_horizons)
        self.model = None
        self.loss_fn = None
        self.optimizer = None
        self.scheduler = None
        self.training_history = []
        # Populated by prepare_data() / load_model().
        self.norm_params = {}
        self.num_channels = None

    def _require_model(self) -> None:
        """Raise a clear error when the model has not been initialized yet."""
        if self.model is None or self.loss_fn is None:
            raise RuntimeError(
                "Model is not initialized; call prepare_data() or load_model() first."
            )

    @classmethod
    def _new_loss_totals(cls) -> Dict[str, float]:
        """Return a zeroed accumulator with one entry per reported loss."""
        return {name: 0.0 for name in cls._LOSS_KEYS}

    @classmethod
    def _accumulate_losses(cls, totals: Dict[str, float], losses: Dict[str, torch.Tensor]) -> None:
        """Add one batch's scalar losses to the running totals (in place)."""
        for name, loss_key in cls._LOSS_KEYS.items():
            totals[name] += losses[loss_key].item()

    @staticmethod
    def _average_losses(totals: Dict[str, float], num_batches: int) -> Dict[str, float]:
        """Average the totals over the batch count; an empty loader yields zeros."""
        divisor = max(num_batches, 1)
        return {name: value / divisor for name, value in totals.items()}

    def prepare_data(self, df: pd.DataFrame, train_ratio: float = 0.7,
                     val_ratio: float = 0.15) -> Tuple[DataLoader, DataLoader, DataLoader]:
        """
        Prepare data with walk-forward temporal splits.

        Also stores the test arrays on ``self.X_test`` / ``self.y_test`` and
        (re)initializes the model for the detected number of channels.

        Args:
            df: Raw OHLCV DataFrame
            train_ratio: Fraction for training (earliest data)
            val_ratio: Fraction for validation (middle)

        Returns:
            train_loader, val_loader, test_loader

        Raises:
            ValueError: If the ratios are out of range, no finite samples
                remain, or the training split would be empty.
        """
        # Small tolerance so float sums such as 0.85 + 0.15 are not rejected.
        if not 0.0 < train_ratio <= 1.0 or val_ratio < 0.0 or train_ratio + val_ratio > 1.0 + 1e-9:
            raise ValueError(
                f"Invalid split ratios: train_ratio={train_ratio}, val_ratio={val_ratio} "
                f"(need 0 < train_ratio, val_ratio >= 0 and train_ratio + val_ratio <= 1)"
            )

        # Feature engineering
        features_df = self.feature_engine.compute_all_features(df)

        # Normalize features
        # NOTE(review): normalization runs before the temporal split. If
        # normalize_features fits its statistics on the whole frame, validation
        # and test data leak into training - confirm against FeatureEngine.
        features_df, self.norm_params = self.feature_engine.normalize_features(features_df)

        # Target columns, interleaved per horizon: direction first, then return.
        target_cols = []
        for h in self.prediction_horizons:
            target_cols.extend([f'target_direction_{h}', f'target_return_{h}'])

        # Create sequences
        X, y = self.feature_engine.create_sequences(features_df, target_cols=target_cols)

        # Remove any sample containing NaN/Inf in its features or targets
        valid_mask = np.isfinite(X).all(axis=(1, 2)) & np.isfinite(y).all(axis=1)
        X = X[valid_mask]
        y = y[valid_mask]

        n = len(X)
        if n == 0:
            raise ValueError("No valid samples remain after removing NaN/Inf rows")

        print(f"Total valid samples: {len(X)}")
        print(f"Features per sample: {X.shape[1]} channels x {X.shape[2]} timesteps")
        print(f"Targets per sample: {y.shape[1]}")

        # Temporal split: the sample order is kept, so train precedes val,
        # which precedes test.
        train_end = int(n * train_ratio)
        val_end = int(n * (train_ratio + val_ratio))
        if train_end == 0:
            raise ValueError(
                f"Training split is empty ({n} samples with train_ratio={train_ratio})"
            )

        X_train, y_train = X[:train_end], y[:train_end]
        X_val, y_val = X[train_end:val_end], y[train_end:val_end]
        X_test, y_test = X[val_end:], y[val_end:]

        print(f"Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")

        # Store test data for evaluation
        self.X_test = X_test
        self.y_test = y_test

        # Create DataLoaders
        train_dataset = FinancialTimeSeriesDataset(X_train, y_train)
        val_dataset = FinancialTimeSeriesDataset(X_val, y_val)
        test_dataset = FinancialTimeSeriesDataset(X_test, y_test)

        # Only the order of batches inside the training split is shuffled;
        # the split boundaries above are unaffected.
        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=self.batch_size, shuffle=False)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)

        # Initialize model with correct number of channels
        self.num_channels = X.shape[1]
        self._init_model()

        return train_loader, val_loader, test_loader

    def _init_model(self):
        """Initialize model, loss, optimizer, and scheduler.

        Requires ``self.num_channels`` to be set. Any previously created
        model/optimizer state is replaced.
        """
        self.model = TradingTransformer(
            num_channels=self.num_channels,
            seq_len=self.lookback_window,
            patch_len=self.patch_len,
            stride=self.stride,
            d_model=self.d_model,
            n_heads=self.n_heads,
            n_layers=self.n_layers,
            d_ff=self.d_ff,
            num_horizons=len(self.prediction_horizons),
            dropout=self.dropout,
        ).to(self.device)

        self.loss_fn = MultiTaskLoss(
            num_horizons=len(self.prediction_horizons)
        ).to(self.device)

        total_params = sum(p.numel() for p in self.model.parameters())
        print(f"Model initialized: {total_params:,} parameters")
        print(f"Device: {self.device}")

        # The loss module's own parameters are optimized alongside the model's.
        self.optimizer = optim.AdamW(
            list(self.model.parameters()) + list(self.loss_fn.parameters()),
            lr=self.learning_rate,
            weight_decay=1e-4
        )

        self.scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
            self.optimizer, T_0=10, T_mult=2
        )

    def _parse_targets(self, y_batch: torch.Tensor) -> Dict[str, torch.Tensor]:
        """Parse target tensor into direction and return components.

        Args:
            y_batch: (batch, 2 * num_horizons) tensor whose columns alternate
                direction and return per horizon, matching the ``target_cols``
                order built in ``prepare_data``.

        Returns:
            Dict with 'direction' and 'returns', each (batch, num_horizons).
        """
        num_horizons = len(self.prediction_horizons)

        # y layout: [dir_1, ret_1, dir_5, ret_5, dir_20, ret_20]
        # Explicit column indexing (not strided slicing) so that a tensor with
        # too few columns raises instead of being silently truncated.
        directions = torch.stack([y_batch[:, i * 2] for i in range(num_horizons)], dim=1)
        returns = torch.stack([y_batch[:, i * 2 + 1] for i in range(num_horizons)], dim=1)

        return {
            'direction': directions,
            'returns': returns,
        }

    def train_epoch(self, train_loader: DataLoader) -> Dict[str, float]:
        """Train for one epoch.

        Returns:
            Mean 'total', 'direction', 'return' and 'risk' losses over the
            epoch's batches (zeros when the loader is empty).

        Raises:
            RuntimeError: If the model has not been initialized.
        """
        self._require_model()
        self.model.train()
        epoch_losses = self._new_loss_totals()
        num_batches = 0

        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            # Forward pass
            predictions = self.model(X_batch)
            targets = self._parse_targets(y_batch)

            # Compute loss
            losses = self.loss_fn(predictions, targets)

            # Backward pass
            self.optimizer.zero_grad()
            losses['total_loss'].backward()

            # Gradient clipping (model parameters only, as in the optimizer
            # setup the loss parameters are left unclipped)
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)

            self.optimizer.step()

            # Accumulate
            self._accumulate_losses(epoch_losses, losses)
            num_batches += 1

        return self._average_losses(epoch_losses, num_batches)

    @torch.no_grad()
    def validate(self, val_loader: DataLoader) -> Dict[str, float]:
        """Validate model.

        Returns:
            Mean losses as in ``train_epoch`` plus, when at least one batch
            was seen, ``direction_acc_{h}`` for every prediction horizon.

        Raises:
            RuntimeError: If the model has not been initialized.
        """
        self._require_model()
        self.model.eval()
        epoch_losses = self._new_loss_totals()
        all_direction_preds = []
        all_direction_targets = []
        num_batches = 0

        for X_batch, y_batch in val_loader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            predictions = self.model(X_batch)
            targets = self._parse_targets(y_batch)
            losses = self.loss_fn(predictions, targets)

            self._accumulate_losses(epoch_losses, losses)

            # Track direction accuracy: probability above 0.5 counts as "up"
            dir_preds = (torch.sigmoid(predictions['direction_logits']) > 0.5).float()
            all_direction_preds.append(dir_preds.cpu())
            all_direction_targets.append(targets['direction'].cpu())

            num_batches += 1

        avg_losses = self._average_losses(epoch_losses, num_batches)

        # Direction accuracy per horizon
        if all_direction_preds:
            all_preds = torch.cat(all_direction_preds, dim=0)
            all_targets = torch.cat(all_direction_targets, dim=0)

            for i, h in enumerate(self.prediction_horizons):
                acc = (all_preds[:, i] == all_targets[:, i]).float().mean().item()
                avg_losses[f'direction_acc_{h}'] = acc

        return avg_losses

    def train(self, train_loader: DataLoader, val_loader: DataLoader) -> Dict:
        """
        Full training loop with early stopping.

        Trains for at most ``max_epochs`` epochs, stops once the validation
        loss has not improved for ``patience`` consecutive epochs, and then
        restores the weights of the best epoch.

        Returns:
            Dict with 'best_val_loss', 'total_epochs' and 'history' (the
            per-epoch records, also kept on ``self.training_history``).

        Raises:
            RuntimeError: If the model has not been initialized.
        """
        self._require_model()
        best_val_loss = float('inf')
        patience_counter = 0
        best_model_state = None

        print(f"\n{'='*60}")
        print(f"Starting Training ({self.max_epochs} max epochs)")
        print(f"{'='*60}")

        for epoch in range(self.max_epochs):
            start = time.time()

            # Train
            train_losses = self.train_epoch(train_loader)

            # Validate
            val_metrics = self.validate(val_loader)

            # Update scheduler
            self.scheduler.step()

            elapsed = time.time() - start
            # Read after scheduler.step(), i.e. the rate for the next epoch.
            lr = self.optimizer.param_groups[0]['lr']

            # Log
            epoch_record = {
                'epoch': epoch + 1,
                'train_loss': train_losses['total'],
                'val_loss': val_metrics['total'],
                'train_dir_loss': train_losses['direction'],
                'val_dir_loss': val_metrics['direction'],
                'train_ret_loss': train_losses['return'],
                'val_ret_loss': val_metrics['return'],
                'lr': lr,
                'elapsed': elapsed,
            }

            for h in self.prediction_horizons:
                key = f'direction_acc_{h}'
                if key in val_metrics:
                    epoch_record[key] = val_metrics[key]

            self.training_history.append(epoch_record)

            # Print progress
            acc_str = " | ".join([
                f"DA-{h}d: {val_metrics.get(f'direction_acc_{h}', 0):.1%}"
                for h in self.prediction_horizons
            ])
            print(
                f"Epoch {epoch+1:3d}/{self.max_epochs} | "
                f"Train: {train_losses['total']:.4f} | "
                f"Val: {val_metrics['total']:.4f} | "
                f"{acc_str} | "
                f"LR: {lr:.6f} | "
                f"{elapsed:.1f}s"
            )

            # Early stopping
            if val_metrics['total'] < best_val_loss:
                best_val_loss = val_metrics['total']
                patience_counter = 0
                # Snapshot on CPU so the copy does not hold device memory.
                best_model_state = {
                    k: v.cpu().clone() for k, v in self.model.state_dict().items()
                }
            else:
                patience_counter += 1
                if patience_counter >= self.patience:
                    print(f"\nEarly stopping at epoch {epoch+1} (patience={self.patience})")
                    break

        # Restore best model (None means no epoch ever improved on +inf)
        if best_model_state is not None:
            self.model.load_state_dict(best_model_state)
            self.model.to(self.device)
            print(f"Restored best model (val_loss={best_val_loss:.4f})")

        return {
            'best_val_loss': best_val_loss,
            'total_epochs': len(self.training_history),
            'history': self.training_history,
        }

    def save_model(self, path: str):
        """Save model and training artifacts.

        Writes one checkpoint containing the model and loss state dicts, the
        normalization parameters, feature names, architecture config and
        training history. The parent directory is created when missing.

        Raises:
            RuntimeError: If the model has not been initialized.
        """
        self._require_model()
        os.makedirs(os.path.dirname(path) or '.', exist_ok=True)

        save_dict = {
            'model_state': self.model.state_dict(),
            'loss_fn_state': self.loss_fn.state_dict(),
            'norm_params': getattr(self, 'norm_params', {}),
            'feature_names': self.feature_engine.feature_names,
            'config': {
                'lookback_window': self.lookback_window,
                'prediction_horizons': self.prediction_horizons,
                'num_channels': self.num_channels,
                'd_model': self.d_model,
                'n_heads': self.n_heads,
                'n_layers': self.n_layers,
                'd_ff': self.d_ff,
                'patch_len': self.patch_len,
                'stride': self.stride,
                'dropout': self.dropout,
            },
            'training_history': self.training_history,
        }

        torch.save(save_dict, path)
        print(f"Model saved to {path}")

    def load_model(self, path: str):
        """Load model from checkpoint.

        Architecture settings stored in the checkpoint override the values
        given to the constructor; the model, loss, optimizer and scheduler
        are rebuilt before the saved weights are loaded.
        """
        # SECURITY: weights_only=False unpickles arbitrary Python objects.
        # Only load checkpoints that come from a trusted source.
        checkpoint = torch.load(path, map_location=self.device, weights_only=False)

        config = checkpoint['config']
        previous_geometry = (self.lookback_window, list(self.prediction_horizons))

        # Restore all architecture params from checkpoint
        self.num_channels = config['num_channels']
        self.d_model = config.get('d_model', self.d_model)
        self.n_heads = config.get('n_heads', self.n_heads)
        self.n_layers = config.get('n_layers', self.n_layers)
        self.d_ff = config.get('d_ff', self.d_ff)
        self.patch_len = config.get('patch_len', self.patch_len)
        self.stride = config.get('stride', self.stride)
        self.dropout = config.get('dropout', self.dropout)
        self.lookback_window = config.get('lookback_window', self.lookback_window)
        if 'prediction_horizons' in config:
            self.prediction_horizons = list(config['prediction_horizons'])

        # The feature engine was built from the constructor arguments; rebuild
        # it when the checkpoint changes the window or horizons so that it
        # stays in sync with the restored model.
        if (self.lookback_window, list(self.prediction_horizons)) != previous_geometry:
            self.feature_engine = FeatureEngine(self.lookback_window, self.prediction_horizons)

        self._init_model()
        self.model.load_state_dict(checkpoint['model_state'])
        self.loss_fn.load_state_dict(checkpoint['loss_fn_state'])
        self.norm_params = checkpoint.get('norm_params', {})
        self.feature_engine.feature_names = checkpoint.get('feature_names', [])
        self.training_history = checkpoint.get('training_history', [])

        print(f"Model loaded from {path}")