# Repository page header captured together with the file (not Python code):
#   avinashhm's picture
#   Fix: trading_intelligence/training.py - all 174 tests passing
#   244371a verified
"""
Training Pipeline
==================
End-to-end training with multi-task learning,
data loading, and proper financial time-series splits.
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
import os
import json
import time
from trading_intelligence.feature_engine import FeatureEngine
from trading_intelligence.prediction_model import TradingTransformer, MultiTaskLoss
class FinancialTimeSeriesDataset(Dataset):
    """
    PyTorch Dataset pairing feature sequences with their targets.

    The dataset itself performs no shuffling or splitting; it simply exposes
    ``(X[i], y[i])`` pairs as float32 tensors in the order they were given.
    """

    def __init__(self, X: np.ndarray, y: np.ndarray):
        """
        Args:
            X: (N, num_features, lookback_window) feature sequences
            y: (N, num_targets) target values

        Raises:
            ValueError: If ``X`` and ``y`` do not contain the same number of
                samples.
        """
        # __len__ only looks at X, so a length mismatch would otherwise go
        # unnoticed until an IndexError mid-epoch (or silently drop targets).
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same number of samples, "
                f"got {len(X)} and {len(y)}"
            )
        self.X = torch.FloatTensor(X)
        self.y = torch.FloatTensor(y)

    def __len__(self) -> int:
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(features, targets)`` pair stored at ``idx``."""
        return self.X[idx], self.y[idx]
class TrainingPipeline:
    """
    Complete training pipeline for the trading intelligence system.

    Features:
    1. Data loading and feature engineering
    2. Walk-forward temporal splits
    3. Multi-task training (direction + return + risk)
    4. Learning rate scheduling
    5. Early stopping with patience
    6. Comprehensive logging
    """

    # Maps the per-epoch metric names reported by this class to the keys of
    # the dict returned by the loss function.
    _LOSS_KEYS = {
        'total': 'total_loss',
        'direction': 'direction_loss',
        'return': 'return_loss',
        'risk': 'risk_loss',
    }

    def __init__(
        self,
        lookback_window: int = 60,
        prediction_horizons: Optional[List[int]] = None,
        d_model: int = 128,
        n_heads: int = 8,
        n_layers: int = 3,
        d_ff: int = 256,
        patch_len: int = 8,
        stride: int = 4,
        dropout: float = 0.1,
        learning_rate: float = 1e-3,
        batch_size: int = 64,
        max_epochs: int = 100,
        patience: int = 10,
        device: str = 'auto',
    ):
        """
        Store hyper-parameters and build the feature engine.

        The model, loss, optimizer and scheduler are created later by
        ``_init_model`` (called from ``prepare_data`` and ``load_model``).

        Args:
            lookback_window: Sequence length passed to the feature engine and
                the model.
            prediction_horizons: Forecast horizons; defaults to ``[1, 5, 20]``.
            d_model, n_heads, n_layers, d_ff, patch_len, stride, dropout:
                Architecture settings forwarded to ``TradingTransformer``.
            learning_rate: Learning rate for AdamW.
            batch_size: Batch size used for all DataLoaders.
            max_epochs: Upper bound on training epochs.
            patience: Epochs without validation improvement before stopping.
            device: ``'auto'`` picks CUDA when available, otherwise CPU; any
                other value is passed to ``torch.device``.
        """
        self.lookback_window = lookback_window
        # Copy so that neither a shared default nor the caller's list is
        # aliased by this instance.
        self.prediction_horizons = (
            [1, 5, 20] if prediction_horizons is None else list(prediction_horizons)
        )
        self.d_model = d_model
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.d_ff = d_ff
        self.patch_len = patch_len
        self.stride = stride
        self.dropout = dropout
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.max_epochs = max_epochs
        self.patience = patience

        if device == 'auto':
            self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        else:
            self.device = torch.device(device)

        self.feature_engine = FeatureEngine(self.lookback_window, self.prediction_horizons)
        self.model = None
        self.loss_fn = None
        self.optimizer = None
        self.scheduler = None
        self.training_history = []

    def prepare_data(self, df: pd.DataFrame,
                     train_ratio: float = 0.7,
                     val_ratio: float = 0.15) -> Tuple[DataLoader, DataLoader, DataLoader]:
        """
        Prepare data with walk-forward temporal splits.

        Also stores the held-out test arrays on ``self.X_test`` /
        ``self.y_test`` and (re)initializes the model for the detected number
        of feature channels.

        Args:
            df: Raw OHLCV DataFrame
            train_ratio: Fraction for training (earliest data)
            val_ratio: Fraction for validation (middle)

        Returns:
            train_loader, val_loader, test_loader

        Raises:
            ValueError: If the ratios do not describe a valid split, or if no
                samples end up in the training split.
        """
        # Small tolerance so that e.g. 0.7 + 0.3 is not rejected for rounding.
        if not 0.0 < train_ratio <= 1.0 or val_ratio < 0.0 \
                or train_ratio + val_ratio > 1.0 + 1e-9:
            raise ValueError(
                f"Invalid split ratios: train_ratio={train_ratio}, val_ratio={val_ratio} "
                f"(need 0 < train_ratio <= 1, val_ratio >= 0, and a sum of at most 1)"
            )

        # Feature engineering
        features_df = self.feature_engine.compute_all_features(df)

        # Normalize features
        # NOTE(review): normalization runs on the full frame before the split,
        # which presumably lets validation/test rows influence the statistics
        # applied to the training rows - confirm against
        # FeatureEngine.normalize_features.
        features_df, self.norm_params = self.feature_engine.normalize_features(features_df)

        # Create target columns (direction and return for every horizon)
        target_cols = []
        for h in self.prediction_horizons:
            target_cols.extend([f'target_direction_{h}', f'target_return_{h}'])

        # Create sequences
        X, y = self.feature_engine.create_sequences(features_df, target_cols=target_cols)

        # Remove any NaN/Inf
        valid_mask = np.isfinite(X).all(axis=(1, 2)) & np.isfinite(y).all(axis=1)
        X = X[valid_mask]
        y = y[valid_mask]

        print(f"Total valid samples: {len(X)}")
        print(f"Features per sample: {X.shape[1]} channels x {X.shape[2]} timesteps")
        print(f"Targets per sample: {y.shape[1]}")

        # Temporal split (NO shuffling - preserves time order)
        n = len(X)
        train_end = int(n * train_ratio)
        val_end = int(n * (train_ratio + val_ratio))
        if train_end == 0:
            raise ValueError(
                f"Not enough valid samples ({n}) to build a training split "
                f"with train_ratio={train_ratio}"
            )

        X_train, y_train = X[:train_end], y[:train_end]
        X_val, y_val = X[train_end:val_end], y[train_end:val_end]
        X_test, y_test = X[val_end:], y[val_end:]
        print(f"Train: {len(X_train)}, Val: {len(X_val)}, Test: {len(X_test)}")

        # Store test data for evaluation
        self.X_test = X_test
        self.y_test = y_test

        # Create DataLoaders. Only the order of samples *within* the training
        # split is shuffled; the split boundaries themselves stay temporal.
        train_dataset = FinancialTimeSeriesDataset(X_train, y_train)
        val_dataset = FinancialTimeSeriesDataset(X_val, y_val)
        test_dataset = FinancialTimeSeriesDataset(X_test, y_test)
        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=self.batch_size, shuffle=False)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)

        # Initialize model with correct number of channels
        self.num_channels = X.shape[1]
        self._init_model()
        return train_loader, val_loader, test_loader

    def _init_model(self):
        """Initialize model, loss, optimizer, and scheduler."""
        self.model = TradingTransformer(
            num_channels=self.num_channels,
            seq_len=self.lookback_window,
            patch_len=self.patch_len,
            stride=self.stride,
            d_model=self.d_model,
            n_heads=self.n_heads,
            n_layers=self.n_layers,
            d_ff=self.d_ff,
            num_horizons=len(self.prediction_horizons),
            dropout=self.dropout,
        ).to(self.device)
        self.loss_fn = MultiTaskLoss(
            num_horizons=len(self.prediction_horizons)
        ).to(self.device)

        total_params = sum(p.numel() for p in self.model.parameters())
        print(f"Model initialized: {total_params:,} parameters")
        print(f"Device: {self.device}")

        # The loss module's own parameters are optimized jointly with the model.
        self.optimizer = optim.AdamW(
            list(self.model.parameters()) + list(self.loss_fn.parameters()),
            lr=self.learning_rate,
            weight_decay=1e-4
        )
        self.scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
            self.optimizer, T_0=10, T_mult=2
        )

    def _parse_targets(self, y_batch: torch.Tensor) -> Dict[str, torch.Tensor]:
        """
        Parse target tensor into direction and return components.

        Args:
            y_batch: (batch, 2 * num_horizons) tensor laid out as
                ``[dir_h0, ret_h0, dir_h1, ret_h1, ...]``.

        Returns:
            Dict with ``'direction'`` and ``'returns'`` tensors, each of shape
            (batch, num_horizons).
        """
        num_horizons = len(self.prediction_horizons)
        # y layout: [dir_1, ret_1, dir_5, ret_5, dir_20, ret_20]
        directions = torch.stack([y_batch[:, i * 2] for i in range(num_horizons)], dim=1)
        returns = torch.stack([y_batch[:, i * 2 + 1] for i in range(num_horizons)], dim=1)
        return {
            'direction': directions,
            'returns': returns,
        }

    def train_epoch(self, train_loader: DataLoader) -> Dict[str, float]:
        """
        Train for one epoch.

        Returns:
            Mean ``total``/``direction``/``return``/``risk`` loss over the
            batches seen (zeros if the loader yields no batches).
        """
        self.model.train()
        epoch_losses = dict.fromkeys(self._LOSS_KEYS, 0.0)
        num_batches = 0

        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            # Forward pass
            predictions = self.model(X_batch)
            targets = self._parse_targets(y_batch)

            # Compute loss
            losses = self.loss_fn(predictions, targets)

            # Backward pass
            self.optimizer.zero_grad()
            losses['total_loss'].backward()

            # Gradient clipping (model parameters only)
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()

            # Accumulate
            for name, loss_key in self._LOSS_KEYS.items():
                epoch_losses[name] += losses[loss_key].item()
            num_batches += 1

        # max(..., 1) avoids a division by zero for an empty loader.
        return {k: v / max(num_batches, 1) for k, v in epoch_losses.items()}

    @torch.no_grad()
    def validate(self, val_loader: DataLoader) -> Dict[str, float]:
        """
        Validate model.

        Returns:
            Mean losses as in ``train_epoch`` plus, when at least one batch
            was seen, ``direction_acc_{h}`` for every prediction horizon.
        """
        self.model.eval()
        epoch_losses = dict.fromkeys(self._LOSS_KEYS, 0.0)
        all_direction_preds = []
        all_direction_targets = []
        num_batches = 0

        for X_batch, y_batch in val_loader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            predictions = self.model(X_batch)
            targets = self._parse_targets(y_batch)
            losses = self.loss_fn(predictions, targets)

            for name, loss_key in self._LOSS_KEYS.items():
                epoch_losses[name] += losses[loss_key].item()

            # Track direction accuracy (logit -> probability -> 0/1 at 0.5)
            dir_preds = (torch.sigmoid(predictions['direction_logits']) > 0.5).float()
            all_direction_preds.append(dir_preds.cpu())
            all_direction_targets.append(targets['direction'].cpu())
            num_batches += 1

        avg_losses = {k: v / max(num_batches, 1) for k, v in epoch_losses.items()}

        # Direction accuracy per horizon
        if all_direction_preds:
            all_preds = torch.cat(all_direction_preds, dim=0)
            all_targets = torch.cat(all_direction_targets, dim=0)
            for i, h in enumerate(self.prediction_horizons):
                acc = (all_preds[:, i] == all_targets[:, i]).float().mean().item()
                avg_losses[f'direction_acc_{h}'] = acc
        return avg_losses

    def train(self, train_loader: DataLoader, val_loader: DataLoader) -> Dict:
        """
        Full training loop with early stopping.

        After the loop, the model and loss-module states from the epoch with
        the lowest validation loss are restored.

        Returns:
            Dict with ``best_val_loss``, ``total_epochs`` (length of the
            accumulated history) and ``history`` (list of per-epoch records).
        """
        best_val_loss = float('inf')
        patience_counter = 0
        best_model_state = None
        best_loss_state = None

        print(f"\n{'='*60}")
        print(f"Starting Training ({self.max_epochs} max epochs)")
        print(f"{'='*60}")

        for epoch in range(self.max_epochs):
            start = time.time()

            # Train
            train_losses = self.train_epoch(train_loader)

            # Validate
            val_metrics = self.validate(val_loader)

            # Update scheduler
            self.scheduler.step()
            elapsed = time.time() - start

            # Log
            epoch_record = {
                'epoch': epoch + 1,
                'train_loss': train_losses['total'],
                'val_loss': val_metrics['total'],
                'train_dir_loss': train_losses['direction'],
                'val_dir_loss': val_metrics['direction'],
                'train_ret_loss': train_losses['return'],
                'val_ret_loss': val_metrics['return'],
                'lr': self.optimizer.param_groups[0]['lr'],
                'elapsed': elapsed,
            }
            for h in self.prediction_horizons:
                key = f'direction_acc_{h}'
                if key in val_metrics:
                    epoch_record[key] = val_metrics[key]
            self.training_history.append(epoch_record)

            # Print progress
            acc_str = " | ".join([
                f"DA-{h}d: {val_metrics.get(f'direction_acc_{h}', 0):.1%}"
                for h in self.prediction_horizons
            ])
            print(
                f"Epoch {epoch+1:3d}/{self.max_epochs} | "
                f"Train: {train_losses['total']:.4f} | "
                f"Val: {val_metrics['total']:.4f} | "
                f"{acc_str} | "
                f"LR: {self.optimizer.param_groups[0]['lr']:.6f} | "
                f"{elapsed:.1f}s"
            )

            # Early stopping
            if val_metrics['total'] < best_val_loss:
                best_val_loss = val_metrics['total']
                patience_counter = 0
                # Snapshot on CPU so the copies do not hold device memory.
                best_model_state = {
                    k: v.cpu().clone() for k, v in self.model.state_dict().items()
                }
                # The loss module is trained too; keep the state that produced
                # best_val_loss so model and loss stay a consistent pair.
                best_loss_state = {
                    k: v.cpu().clone() for k, v in self.loss_fn.state_dict().items()
                }
            else:
                patience_counter += 1
                if patience_counter >= self.patience:
                    print(f"\nEarly stopping at epoch {epoch+1} (patience={self.patience})")
                    break

        # Restore best model (explicit None check: an empty state dict is
        # still a valid snapshot).
        if best_model_state is not None:
            self.model.load_state_dict(best_model_state)
            self.model.to(self.device)
            self.loss_fn.load_state_dict(best_loss_state)
            self.loss_fn.to(self.device)
            print(f"Restored best model (val_loss={best_val_loss:.4f})")

        return {
            'best_val_loss': best_val_loss,
            'total_epochs': len(self.training_history),
            'history': self.training_history,
        }

    def save_model(self, path: str):
        """
        Save model and training artifacts.

        Writes a single ``torch.save`` checkpoint containing the model and
        loss states, normalization parameters, feature names, architecture
        config and training history. The parent directory is created if
        needed.
        """
        os.makedirs(os.path.dirname(path) or '.', exist_ok=True)
        save_dict = {
            'model_state': self.model.state_dict(),
            'loss_fn_state': self.loss_fn.state_dict(),
            # norm_params only exists once prepare_data/load_model has run.
            'norm_params': getattr(self, 'norm_params', {}),
            'feature_names': self.feature_engine.feature_names,
            'config': {
                'lookback_window': self.lookback_window,
                'prediction_horizons': self.prediction_horizons,
                'num_channels': self.num_channels,
                'd_model': self.d_model,
                'n_heads': self.n_heads,
                'n_layers': self.n_layers,
                'd_ff': self.d_ff,
                'patch_len': self.patch_len,
                'stride': self.stride,
                'dropout': self.dropout,
            },
            'training_history': self.training_history,
        }
        torch.save(save_dict, path)
        print(f"Model saved to {path}")

    def load_model(self, path: str):
        """
        Load model from checkpoint.

        Restores the architecture settings stored in the checkpoint, rebuilds
        the model/loss/optimizer/scheduler, then loads the saved weights,
        normalization parameters, feature names and training history.
        """
        # SECURITY: weights_only=False unpickles arbitrary Python objects.
        # Only load checkpoints from a trusted source.
        checkpoint = torch.load(path, map_location=self.device, weights_only=False)
        config = checkpoint['config']

        # Remember what the feature engine was built with so it can be rebuilt
        # if the checkpoint disagrees.
        engine_settings = (self.lookback_window, list(self.prediction_horizons))

        # Restore all architecture params from checkpoint
        self.num_channels = config['num_channels']
        self.d_model = config.get('d_model', self.d_model)
        self.n_heads = config.get('n_heads', self.n_heads)
        self.n_layers = config.get('n_layers', self.n_layers)
        self.d_ff = config.get('d_ff', self.d_ff)
        self.patch_len = config.get('patch_len', self.patch_len)
        self.stride = config.get('stride', self.stride)
        self.dropout = config.get('dropout', self.dropout)
        self.lookback_window = config.get('lookback_window', self.lookback_window)
        if 'prediction_horizons' in config:
            self.prediction_horizons = list(config['prediction_horizons'])

        # Keep the feature engine in sync with the restored window/horizons;
        # otherwise it would keep producing sequences for the old settings.
        if (self.lookback_window, list(self.prediction_horizons)) != engine_settings:
            self.feature_engine = FeatureEngine(self.lookback_window, self.prediction_horizons)

        self._init_model()
        self.model.load_state_dict(checkpoint['model_state'])
        self.loss_fn.load_state_dict(checkpoint['loss_fn_state'])
        self.norm_params = checkpoint.get('norm_params', {})
        self.feature_engine.feature_names = checkpoint.get('feature_names', [])
        self.training_history = checkpoint.get('training_history', [])
        print(f"Model loaded from {path}")