""" LSTM-based stock price forecaster. Uses historical OHLCV data to predict future closing prices. """ import torch import torch.nn as nn import numpy as np import pickle import json import os from datetime import datetime from typing import Dict, List, Optional, Tuple import logging logger = logging.getLogger(__name__) class LSTMModel(nn.Module): """LSTM neural network for time series forecasting.""" def __init__(self, input_size: int = 5, hidden_size: int = 128, num_layers: int = 2, output_size: int = 30, dropout: float = 0.2): super(LSTMModel, self).__init__() self.hidden_size = hidden_size self.num_layers = num_layers self.lstm = nn.LSTM( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True, dropout=dropout if num_layers > 1 else 0 ) self.fc = nn.Sequential( nn.Linear(hidden_size, 64), nn.ReLU(), nn.Dropout(dropout), nn.Linear(64, output_size) ) def forward(self, x: torch.Tensor) -> torch.Tensor: # x shape: (batch, seq_len, input_size) lstm_out, _ = self.lstm(x) # Take the last output last_output = lstm_out[:, -1, :] # Predict future prices predictions = self.fc(last_output) return predictions class MinMaxScaler: """Simple MinMax scaler for normalization.""" def __init__(self): self.min_vals = None self.max_vals = None self.fitted = False def fit(self, data: np.ndarray) -> 'MinMaxScaler': self.min_vals = data.min(axis=0) self.max_vals = data.max(axis=0) # Avoid division by zero self.range_vals = self.max_vals - self.min_vals self.range_vals[self.range_vals == 0] = 1 self.fitted = True return self def transform(self, data: np.ndarray) -> np.ndarray: if not self.fitted: raise ValueError("Scaler not fitted. Call fit() first.") return (data - self.min_vals) / self.range_vals def fit_transform(self, data: np.ndarray) -> np.ndarray: self.fit(data) return self.transform(data) def inverse_transform(self, data: np.ndarray, col_idx: int = 0) -> np.ndarray: """Inverse transform for a single column (default: close price at index 0).""" if not self.fitted: raise ValueError("Scaler not fitted. Call fit() first.") return data * self.range_vals[col_idx] + self.min_vals[col_idx] class StockForecaster: """ Stock price forecaster using LSTM neural network. The model is lazy-loaded on first use to avoid slow startup times. Supports training on historical OHLCV data and predicting future prices. """ MODEL_DIR = os.path.join(os.path.dirname(__file__), 'forecast_models') def __init__(self, sequence_length: int = 60, forecast_horizon: int = 30): self._models: Dict[str, LSTMModel] = {} self._scalers: Dict[str, MinMaxScaler] = {} self._device = "cuda" if torch.cuda.is_available() else "cpu" self.sequence_length = sequence_length self.forecast_horizon = forecast_horizon # Ensure model directory exists os.makedirs(self.MODEL_DIR, exist_ok=True) def _get_ticker_dir(self, ticker: str) -> str: """Get the directory path for a ticker's model files.""" return os.path.join(self.MODEL_DIR, ticker.upper()) def _load_model(self, ticker: str) -> bool: """ Load a trained model for a ticker from disk. Returns: True if model was loaded successfully, False otherwise. """ ticker = ticker.upper() if ticker in self._models: return True ticker_dir = self._get_ticker_dir(ticker) model_path = os.path.join(ticker_dir, 'model.pt') scaler_path = os.path.join(ticker_dir, 'scaler.pkl') if not os.path.exists(model_path) or not os.path.exists(scaler_path): return False try: logger.info(f"Loading forecast model for {ticker}...") # Load scaler with open(scaler_path, 'rb') as f: self._scalers[ticker] = pickle.load(f) # Build and load model model = LSTMModel(output_size=self.forecast_horizon) model.load_state_dict(torch.load(model_path, map_location=self._device, weights_only=True)) model.to(self._device) model.eval() self._models[ticker] = model logger.info(f"Forecast model for {ticker} loaded successfully on {self._device}") return True except Exception as e: logger.error(f"Failed to load forecast model for {ticker}: {e}") return False def _save_model(self, ticker: str, metadata: Dict) -> None: """Save trained model and scaler to disk.""" ticker = ticker.upper() ticker_dir = self._get_ticker_dir(ticker) os.makedirs(ticker_dir, exist_ok=True) model_path = os.path.join(ticker_dir, 'model.pt') scaler_path = os.path.join(ticker_dir, 'scaler.pkl') metadata_path = os.path.join(ticker_dir, 'metadata.json') # Save model weights torch.save(self._models[ticker].state_dict(), model_path) # Save scaler with open(scaler_path, 'wb') as f: pickle.dump(self._scalers[ticker], f) # Save metadata with open(metadata_path, 'w') as f: json.dump(metadata, f, indent=2) logger.info(f"Forecast model for {ticker} saved to {ticker_dir}") def _prepare_data(self, data: List[Dict]) -> Tuple[np.ndarray, np.ndarray]: """ Prepare OHLCV data for training. Args: data: List of dicts with keys: o, h, l, c, v (open, high, low, close, volume) Returns: Tuple of (X, y) numpy arrays for training """ # Extract OHLCV features - close first for easy inverse transform features = np.array([[d['c'], d['o'], d['h'], d['l'], d['v']] for d in data], dtype=np.float32) # Create sequences X, y = [], [] for i in range(len(features) - self.sequence_length - self.forecast_horizon + 1): X.append(features[i:i + self.sequence_length]) # Target: next forecast_horizon closing prices y.append(features[i + self.sequence_length:i + self.sequence_length + self.forecast_horizon, 0]) return np.array(X), np.array(y) def train(self, ticker: str, data: List[Dict], epochs: int = 50, learning_rate: float = 0.001, batch_size: int = 32) -> Dict: """ Train the LSTM model on historical price data. Args: ticker: Stock ticker symbol data: List of OHLCV dicts (must have at least sequence_length + forecast_horizon entries) epochs: Number of training epochs learning_rate: Learning rate for optimizer batch_size: Training batch size Returns: Dict with training results (loss, metadata) """ ticker = ticker.upper() if len(data) < self.sequence_length + self.forecast_horizon: raise ValueError(f"Insufficient data: need at least {self.sequence_length + self.forecast_horizon} data points") logger.info(f"Training forecast model for {ticker} with {len(data)} data points...") # Prepare data X, y = self._prepare_data(data) # Normalize features scaler = MinMaxScaler() X_flat = X.reshape(-1, X.shape[-1]) scaler.fit(X_flat) X_scaled = np.array([scaler.transform(seq) for seq in X]) # Normalize targets using close price stats y_scaled = (y - scaler.min_vals[0]) / scaler.range_vals[0] self._scalers[ticker] = scaler # Convert to tensors X_tensor = torch.FloatTensor(X_scaled).to(self._device) y_tensor = torch.FloatTensor(y_scaled).to(self._device) # Split train/validation (80/20) split_idx = int(len(X_tensor) * 0.8) X_train, X_val = X_tensor[:split_idx], X_tensor[split_idx:] y_train, y_val = y_tensor[:split_idx], y_tensor[split_idx:] # Build model model = LSTMModel(output_size=self.forecast_horizon) model.to(self._device) criterion = nn.MSELoss() optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate) # Training loop best_val_loss = float('inf') train_losses = [] for epoch in range(epochs): model.train() epoch_loss = 0 # Mini-batch training for i in range(0, len(X_train), batch_size): batch_X = X_train[i:i + batch_size] batch_y = y_train[i:i + batch_size] optimizer.zero_grad() outputs = model(batch_X) loss = criterion(outputs, batch_y) loss.backward() optimizer.step() epoch_loss += loss.item() avg_train_loss = epoch_loss / (len(X_train) // batch_size + 1) train_losses.append(avg_train_loss) # Validation model.eval() with torch.no_grad(): val_outputs = model(X_val) val_loss = criterion(val_outputs, y_val).item() if val_loss < best_val_loss: best_val_loss = val_loss if (epoch + 1) % 10 == 0: logger.info(f"Epoch {epoch + 1}/{epochs} - Train Loss: {avg_train_loss:.6f}, Val Loss: {val_loss:.6f}") self._models[ticker] = model # Save model and metadata metadata = { "ticker": ticker, "trained_at": datetime.utcnow().isoformat() + "Z", "training_epochs": epochs, "final_train_loss": float(train_losses[-1]), "final_val_loss": float(best_val_loss), "data_points": len(data), "sequence_length": self.sequence_length, "forecast_horizon": self.forecast_horizon, "model_version": "1.0" } self._save_model(ticker, metadata) logger.info(f"Training complete for {ticker}. Final val loss: {best_val_loss:.6f}") return { "status": "training_complete", "ticker": ticker, "epochs": epochs, "final_loss": float(train_losses[-1]), "validation_loss": float(best_val_loss), "data_points": len(data) } def predict(self, ticker: str, recent_data: List[Dict]) -> Dict: """ Generate price forecast using trained model. Args: ticker: Stock ticker symbol recent_data: Most recent OHLCV data (at least sequence_length entries) Returns: Dict with predictions and confidence bounds """ ticker = ticker.upper() # Load model if not in memory if ticker not in self._models: if not self._load_model(ticker): raise ValueError(f"No trained model found for {ticker}. Train the model first.") if len(recent_data) < self.sequence_length: raise ValueError(f"Need at least {self.sequence_length} data points for prediction") # Use last sequence_length data points data = recent_data[-self.sequence_length:] features = np.array([[d['c'], d['o'], d['h'], d['l'], d['v']] for d in data], dtype=np.float32) # Normalize scaler = self._scalers[ticker] features_scaled = scaler.transform(features) # Predict model = self._models[ticker] model.eval() X = torch.FloatTensor(features_scaled).unsqueeze(0).to(self._device) with torch.no_grad(): predictions_scaled = model(X).cpu().numpy()[0] # Inverse transform predictions predictions = scaler.inverse_transform(predictions_scaled, col_idx=0) # Calculate confidence bounds (simple approach: +/- percentage based on historical volatility) recent_closes = [d['c'] for d in recent_data[-30:]] volatility = np.std(recent_closes) / np.mean(recent_closes) confidence_pct = max(0.02, min(0.10, volatility * 2)) # 2-10% bounds upper_bound = predictions * (1 + confidence_pct) lower_bound = predictions * (1 - confidence_pct) # Get last date from data for generating forecast dates last_timestamp = recent_data[-1].get('t', 0) return { "predictions": predictions.tolist(), "upper_bound": upper_bound.tolist(), "lower_bound": lower_bound.tolist(), "last_timestamp": last_timestamp, "forecast_horizon": self.forecast_horizon } def has_model(self, ticker: str) -> bool: """Check if a trained model exists for the ticker.""" ticker = ticker.upper() if ticker in self._models: return True ticker_dir = self._get_ticker_dir(ticker) return os.path.exists(os.path.join(ticker_dir, 'model.pt')) def get_model_metadata(self, ticker: str) -> Optional[Dict]: """Get metadata for a trained model.""" ticker = ticker.upper() metadata_path = os.path.join(self._get_ticker_dir(ticker), 'metadata.json') if not os.path.exists(metadata_path): return None with open(metadata_path, 'r') as f: return json.load(f) def unload_model(self, ticker: str) -> None: """Unload a model from memory to free resources.""" ticker = ticker.upper() if ticker in self._models: del self._models[ticker] del self._scalers[ticker] if torch.cuda.is_available(): torch.cuda.empty_cache() logger.info(f"Forecast model for {ticker} unloaded") # Singleton instance _forecaster_instance: Optional[StockForecaster] = None def get_stock_forecaster() -> StockForecaster: """Get or create the singleton stock forecaster instance.""" global _forecaster_instance if _forecaster_instance is None: _forecaster_instance = StockForecaster() return _forecaster_instance