import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import datetime
import yfinance as yf
import joblib
from sklearn.preprocessing import MinMaxScaler
import json
from tqdm import tqdm
import os
from typing import List, Dict, Any, Union, Tuple


class BiLSTMModel(nn.Module):
    """Bidirectional LSTM regressor.

    Maps a (batch, seq_len, input_size) sequence to one value per sample,
    using only the last time step's concatenated forward/backward features.
    """

    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1):
        super(BiLSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Stacked bidirectional LSTM over the input sequence
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )

        # hidden_size * 2 because forward and backward states are concatenated
        self.fc = nn.Linear(hidden_size * 2, output_size)

    def forward(self, x):
        batch_size = x.size(0)
        # Explicit zero initial hidden/cell states; first dim is
        # num_layers * 2 because the LSTM is bidirectional.
        h0 = torch.zeros(self.num_layers * 2, batch_size, self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers * 2, batch_size, self.hidden_size).to(x.device)

        out, _ = self.lstm(x, (h0, c0))

        # Project only the final time step's output
        return self.fc(out[:, -1, :])


def predict_future(model, last_sequence, steps, scaler_diff, current_price):
    """Simulate `steps` future prices by blending model-predicted price
    differences with a Geometric Brownian Motion (GBM) stochastic component.

    Args:
        model: Trained BiLSTMModel predicting scaled price differences.
        last_sequence: (seq_len, 1) float tensor of scaled differences.
        steps: Number of future steps to generate.
        scaler_diff: Fitted scaler whose inverse_transform maps model output
            back to price-difference units.
        current_price: Last observed price, used as the simulation seed.

    Returns:
        (steps, 1) numpy array of simulated future prices (seed excluded).
    """
    model.eval()

    # Seed the price path with the current price; dropped before returning.
    future_prices = [current_price]
    current_sequence = last_sequence.clone()

    # Default GBM parameters — historical estimates are not computed here.
    daily_mu = 0.0002   # default daily drift
    daily_sigma = 0.02  # default daily volatility

    device = next(model.parameters()).device

    for _ in range(steps):
        with torch.no_grad():
            current_sequence_tensor = current_sequence.unsqueeze(0).to(device)
            pred_diff_scaled = model(current_sequence_tensor)

        # Inverse transform to get the predicted difference in price units
        pred_diff = scaler_diff.inverse_transform(pred_diff_scaled.cpu().numpy())[0][0]

        # One-day GBM increment for the stochastic component
        dt = 1
        drift = (daily_mu - 0.5 * daily_sigma ** 2) * dt
        diffusion = daily_sigma * np.sqrt(dt) * np.random.normal(0, 1)
        stochastic_factor = np.exp(drift + diffusion)
        adjustment = current_price * (stochastic_factor - 1)

        # Weighted blend, favouring the model's deterministic prediction
        blend_weight = 0.7
        blended_diff = (blend_weight * pred_diff) + ((1 - blend_weight) * adjustment)

        # Floor at 0.01 so the simulated price can never go non-positive
        next_price = max(0.01, current_price + blended_diff)

        future_prices.append(next_price)
        current_price = next_price

        # Slide the input window forward one step.
        # NOTE(review): the window is fed the model's raw scaled prediction,
        # not the blended diff — presumably to keep GBM noise out of the
        # recurrent input; confirm this is intentional.
        new_diff_scaled = torch.tensor([[pred_diff_scaled.item()]], dtype=torch.float32)
        current_sequence = torch.cat([current_sequence[1:], new_diff_scaled], dim=0)

    # Drop the seed price; return only the predicted future values
    return np.array(future_prices[1:]).reshape(-1, 1)


def fetch_and_prepare_data(
    ticker_symbol: str, seq_length: int
) -> Tuple[np.ndarray, float, pd.DatetimeIndex, pd.DataFrame]:
    """Fetch daily history for `ticker_symbol` and prepare differenced data.

    BUGFIX: the return annotation previously declared a 3-tuple, but the
    function has always returned a 4-tuple including the DataFrame.

    Args:
        ticker_symbol: Symbol understood by yfinance (e.g. "AAPL").
        seq_length: Model input window length; shorter histories are padded.

    Returns:
        Tuple of (differenced close prices as (n, 1) array, last close price,
        DatetimeIndex of the history, full history DataFrame).

    Raises:
        ValueError: If the downloaded history has no 'Close' column.
    """
    ticker = yf.Ticker(ticker_symbol)
    df = ticker.history(period="max", interval='1d')

    if 'Close' not in df.columns:
        raise ValueError(f"No 'Close' price data available for {ticker_symbol}")

    close_prices = df['Close'].values.astype(float).reshape(-1, 1)

    # First-order differences remove the trend before scaling/prediction
    diff_close_prices = np.diff(close_prices, axis=0)

    # Starting point for the simulated future path
    last_price = close_prices[-1][0]
    dates = df.index

    # Left-pad with zeros when history is shorter than the model's window
    if len(diff_close_prices) < seq_length:
        padding = np.zeros((seq_length - len(diff_close_prices), 1))
        diff_close_prices = np.vstack([padding, diff_close_prices])

    return diff_close_prices, last_price, dates, df


def predict_stock_prices(
    ticker_symbols: List[str],
    model_path: str,
    scaler_path: str,
    metadata_path: str
) -> Dict[str, Any]:
    """Predict stock prices for multiple ticker symbols for -15 to +15 years.

    Args:
        ticker_symbols: List of ticker symbols to predict.
        model_path: Path to the trained BiLSTM model state dict.
        scaler_path: Path to the saved scaler for differences.
        metadata_path: Path to the saved model metadata (needs 'seq_length').

    Returns:
        Dict mapping each symbol to a list of {"date": str, "value": float}
        records (historical then predicted), or to {"error": str} on failure.
    """
    # Fixed seeds so the GBM component is reproducible across runs
    torch.manual_seed(42)
    np.random.seed(42)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    model_metadata = joblib.load(metadata_path)
    seq_length = model_metadata['seq_length']

    model = BiLSTMModel().to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    scaler_diff = joblib.load(scaler_path)

    # Approximate number of trading days per year
    trading_days_per_year = 252

    result = {}

    for symbol in tqdm(ticker_symbols, desc="Processing tickers"):
        try:
            diff_close_prices, last_price, historical_dates, df = fetch_and_prepare_data(symbol, seq_length)

            # Scale only the most recent window the model will consume
            diff_scaled = scaler_diff.transform(diff_close_prices[-seq_length:])
            last_diff_sequence = torch.tensor(diff_scaled, dtype=torch.float32)

            # 15 years of trading days ahead
            future_days = trading_days_per_year * 15
            future_prices = predict_future(model, last_diff_sequence, future_days, scaler_diff, last_price)

            # NOTE(review): future dates advance by *calendar* days while
            # future_days counts *trading* days, so the horizon overshoots
            # 15 calendar years — confirm whether business days were intended.
            last_date = historical_dates[-1]
            future_dates = [last_date + datetime.timedelta(days=i+1) for i in range(future_days)]
            future_dates_str = [date.strftime('%Y-%m-%d') for date in future_dates]

            # Up to 15 years of history, or as much as is available
            past_days = min(len(historical_dates), trading_days_per_year * 15)
            historical_subset = historical_dates[-past_days:]
            historical_prices = df['Close'].values[-past_days:]
            historical_dates_str = [date.strftime('%Y-%m-%d') for date in historical_subset]

            all_dates = historical_dates_str + future_dates_str
            all_prices = np.concatenate([historical_prices, future_prices.flatten()])

            result[symbol] = [
                {"date": date, "value": float(value)}
                for date, value in zip(all_dates, all_prices)
            ]
        except Exception as e:
            # Best-effort batch: record the failure per symbol instead of
            # aborting the remaining tickers.
            print(f"Error processing {symbol}: {str(e)}")
            result[symbol] = {"error": str(e)}

    return result


def batch_predict_to_json(
    ticker_symbols: List[str],
    model_path: str,
    scaler_path: str,
    metadata_path: str,
    output_path: str = "stock_predictions.json"
) -> Dict[str, Any]:
    """Batch predict stock prices, save them to a JSON file, and return them.

    BUGFIX: previously `output_path` was ignored and no file was ever written,
    contradicting the function's name and documentation. The predictions dict
    is still returned (as before), so existing callers are unaffected.

    Args:
        ticker_symbols: List of ticker symbols.
        model_path: Path to the trained model.
        scaler_path: Path to the saved scaler.
        metadata_path: Path to the saved metadata.
        output_path: Path to save the output JSON.

    Returns:
        The predictions dictionary (also written to `output_path`).
    """
    predictions = predict_stock_prices(ticker_symbols, model_path, scaler_path, metadata_path)

    with open(output_path, 'w') as f:
        json.dump(predictions, f)

    return predictions


def get_stock_predictions(tickers):
    """Run batch prediction for `tickers` using the default saved artifacts.

    Example ticker list: ["AAPL", "MSFT", "GOOGL", "AMZN", "META"]
    """
    # Paths to saved model files
    model_path = "bilstm_stock_model.pth"
    scaler_path = "scaler_diff.pkl"
    metadata_path = "model_metadata.pkl"

    return batch_predict_to_json(tickers, model_path, scaler_path, metadata_path)