import os
import pickle
import warnings

import joblib
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from statsmodels.tsa.arima.model import ARIMA

warnings.filterwarnings('ignore')


class StockForecaster:
    """
    Stock forecasting helper built around an ARIMA model and a simplified
    "LSTM" stand-in.

    The ARIMA side wraps ``statsmodels``' ``ARIMA``. The LSTM side does not
    train a neural network: it keeps the min-max scaled series and
    extrapolates the recent average step (see ``forecast_lstm``).
    """

    def __init__(self):
        self.arima_model = None
        self.lstm_model = None
        self.scaler = MinMaxScaler()
        self.optimal_arima_params = (1, 1, 1)  # Default parameters
        # Records that the grid search has run, so that a search which
        # legitimately selects (1, 1, 1) is not repeated on every training.
        self._arima_params_searched = False

    def preprocess_data(self, data):
        """
        Preprocess stock data for forecasting.

        The caller's object is never modified; date parsing and indexing
        are performed on a copy.

        Args:
            data (pd.DataFrame): Stock data with 'date' and 'close' columns

        Returns:
            pd.Series: The 'close' column with missing values dropped. If
                there is no 'close' column (or ``data`` is not a
                DataFrame), ``data.dropna()`` is returned instead.
        """
        if isinstance(data, pd.DataFrame):
            if 'date' in data.columns:
                # assign() returns a new frame, leaving the input untouched.
                data = (
                    data.assign(date=pd.to_datetime(data['date']))
                    .set_index('date')
                    .sort_index()
                )

            if 'close' in data.columns:
                return data['close'].dropna()

        return data.dropna()

    def find_optimal_arima_params(self, ts_data, max_p=3, max_d=2, max_q=3):
        """
        Find optimal ARIMA parameters using AIC criterion.

        Every (p, d, q) combination within the given bounds is fitted and
        the order with the lowest AIC is kept. Combinations that fail to
        fit are skipped; if none succeed, (1, 1, 1) is returned.

        Args:
            ts_data (pd.Series): Time series data
            max_p, max_d, max_q (int): Maximum values for ARIMA parameters

        Returns:
            tuple: Optimal (p, d, q) parameters
        """
        best_aic = np.inf
        best_params = (1, 1, 1)

        for p in range(max_p + 1):
            for d in range(max_d + 1):
                for q in range(max_q + 1):
                    try:
                        fitted_model = ARIMA(ts_data, order=(p, d, q)).fit()
                    except Exception:
                        # Skip orders that cannot be fitted. Catching
                        # Exception (not a bare except) keeps Ctrl-C and
                        # SystemExit able to stop the search.
                        continue

                    aic = fitted_model.aic
                    if aic < best_aic:
                        best_aic = aic
                        best_params = (p, d, q)

        self.optimal_arima_params = best_params
        self._arima_params_searched = True
        return best_params

    def train_arima(self, ts_data):
        """
        Train ARIMA model on time series data.

        The parameter search runs only while ``optimal_arima_params`` still
        holds the default (1, 1, 1) and no search has been performed yet.

        Args:
            ts_data (pd.Series): Time series data

        Returns:
            statsmodels.ARIMAResults: Fitted ARIMA model, or None if
                training failed (the error is printed).
        """
        try:
            # Find optimal parameters if not set. getattr() tolerates
            # instances created without __init__ (e.g. unpickled objects).
            already_searched = getattr(self, '_arima_params_searched', False)
            if not already_searched and self.optimal_arima_params == (1, 1, 1):
                self.find_optimal_arima_params(ts_data)

            # Fit ARIMA model
            arima_model = ARIMA(ts_data, order=self.optimal_arima_params)
            self.arima_model = arima_model.fit()
            return self.arima_model

        except Exception as e:
            print(f"ARIMA training error: {e}")
            return None

    def create_lstm_sequences(self, data, sequence_length=60):
        """
        Create sequences for LSTM training.

        Each sample is a window of ``sequence_length`` consecutive values
        and its target is the value immediately after the window.

        Args:
            data (np.array): Scaled time series data
            sequence_length (int): Length of input sequences

        Returns:
            tuple: (X, y) arrays for LSTM training; both are empty when
                ``data`` is not longer than ``sequence_length``.
        """
        targets = range(sequence_length, len(data))
        X = [data[i - sequence_length:i] for i in targets]
        y = [data[i] for i in targets]
        return np.array(X), np.array(y)

    def train_simple_lstm(self, ts_data, sequence_length=60):
        """
        Train a simplified LSTM model or use trend-based prediction.

        No network is trained: the scaler is fitted on the series and the
        scaled values are stored for the trend-based forecast.

        Args:
            ts_data (pd.Series): Time series data
            sequence_length (int): Sequence length for LSTM

        Returns:
            dict: Model information ('type', 'data', 'sequence_length'),
                or None if scaling failed (the error is printed).
        """
        try:
            # Scale the data
            scaled_data = self.scaler.fit_transform(ts_data.values.reshape(-1, 1))

            # For demo purposes, we'll use a trend-based approach
            # In production, you'd train an actual LSTM here
            self.lstm_model = {
                'type': 'trend_based',
                'data': scaled_data,
                'sequence_length': sequence_length
            }
            return self.lstm_model

        except Exception as e:
            print(f"LSTM training error: {e}")
            return None

    def forecast_arima(self, steps=30):
        """
        Generate ARIMA forecast.

        Args:
            steps (int): Number of steps to forecast

        Returns:
            np.array: Forecasted values, or None if forecasting failed
                (the error is printed).

        Raises:
            ValueError: If no ARIMA model has been trained or loaded.
        """
        if self.arima_model is None:
            raise ValueError("ARIMA model not trained. Call train_arima first.")

        try:
            forecast = self.arima_model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception as e:
            print(f"ARIMA forecast error: {e}")
            return None

    def forecast_lstm(self, steps=30):
        """
        Generate LSTM forecast (simplified trend-based approach).

        The forecast starts from the last scaled observation and adds the
        mean step of the last (up to) 10 observations, damped by 0.1 and
        growing linearly with the horizon, then maps the result back to
        the original price scale.

        Args:
            steps (int): Number of steps to forecast

        Returns:
            np.array: Forecasted values, or None if forecasting failed
                (the error is printed).

        Raises:
            ValueError: If no model has been trained or loaded.
        """
        if self.lstm_model is None:
            raise ValueError("LSTM model not trained. Call train_simple_lstm first.")

        try:
            # Simple trend-based forecast for demo
            scaled_data = self.lstm_model['data']

            # Calculate recent trend. With fewer than two points there is
            # no step to average (np.mean of an empty diff would be NaN),
            # so fall back to a flat forecast.
            recent_data = scaled_data[-10:].flatten()
            if recent_data.size > 1:
                recent_trend = np.mean(np.diff(recent_data))
            else:
                recent_trend = 0.0

            # Generate forecast
            last_value = scaled_data[-1][0]
            forecast_scaled = [
                [last_value + recent_trend * (i + 1) * 0.1]
                for i in range(steps)
            ]

            # Inverse transform
            forecast = self.scaler.inverse_transform(forecast_scaled).flatten()
            return forecast

        except Exception as e:
            print(f"LSTM forecast error: {e}")
            return None

    def save_models(self, filepath_prefix):
        """
        Save trained models to disk.

        Writes ``<prefix>_arima.pkl`` when an ARIMA model exists, and
        ``<prefix>_lstm.pkl`` plus ``<prefix>_scaler.pkl`` when the trend
        model exists. Failures are printed rather than raised.

        Args:
            filepath_prefix (str): Prefix for saved model files
        """
        try:
            # Create the target directory first; otherwise a prefix such as
            # "models/stock_forecaster" fails when "models" does not exist.
            target_dir = os.path.dirname(filepath_prefix)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)

            if self.arima_model is not None:
                with open(f"{filepath_prefix}_arima.pkl", 'wb') as f:
                    pickle.dump(self.arima_model, f)

            if self.lstm_model is not None:
                # The scaler is only fitted together with the trend model,
                # so the two are persisted as a pair.
                joblib.dump(self.lstm_model, f"{filepath_prefix}_lstm.pkl")
                joblib.dump(self.scaler, f"{filepath_prefix}_scaler.pkl")

            print(f"Models saved with prefix: {filepath_prefix}")

        except Exception as e:
            print(f"Error saving models: {e}")

    def load_models(self, filepath_prefix):
        """
        Load trained models from disk.

        Files that do not exist are skipped silently, leaving the
        corresponding attribute unchanged. Failures are printed rather
        than raised.

        SECURITY: pickle and joblib execute arbitrary code while loading.
        Only load files that this application wrote itself or that come
        from a trusted source.

        Args:
            filepath_prefix (str): Prefix for saved model files
        """
        try:
            # Load ARIMA model
            arima_path = f"{filepath_prefix}_arima.pkl"
            if os.path.exists(arima_path):
                with open(arima_path, 'rb') as f:
                    self.arima_model = pickle.load(f)

            # Load LSTM model
            lstm_path = f"{filepath_prefix}_lstm.pkl"
            if os.path.exists(lstm_path):
                self.lstm_model = joblib.load(lstm_path)

            # Load scaler
            scaler_path = f"{filepath_prefix}_scaler.pkl"
            if os.path.exists(scaler_path):
                self.scaler = joblib.load(scaler_path)

            print(f"Models loaded from prefix: {filepath_prefix}")

        except Exception as e:
            print(f"Error loading models: {e}")


# Example usage and model training script
if __name__ == "__main__":
    # This would be run to pre-train models
    forecaster = StockForecaster()

    # Example with sample data (a synthetic random walk around 100)
    dates = pd.date_range('2020-01-01', periods=1000, freq='D')
    prices = 100 + np.cumsum(np.random.randn(1000) * 0.5)
    sample_data = pd.DataFrame({
        'date': dates,
        'close': prices
    })

    # Preprocess and train
    ts_data = forecaster.preprocess_data(sample_data)

    print("Training ARIMA model...")
    forecaster.train_arima(ts_data)

    print("Training LSTM model...")
    forecaster.train_simple_lstm(ts_data)

    # Generate forecasts
    arima_forecast = forecaster.forecast_arima(30)
    lstm_forecast = forecaster.forecast_lstm(30)

    print(f"ARIMA forecast shape: {arima_forecast.shape if arima_forecast is not None else 'None'}")
    print(f"LSTM forecast shape: {lstm_forecast.shape if lstm_forecast is not None else 'None'}")

    # Save models
    forecaster.save_models("models/stock_forecaster")