# forecast / model_utils.py
# samim-reza's picture
# Model uploaded
# f828b45 verified
import os
import pickle
import joblib
import pandas as pd
import numpy as np
from statsmodels.tsa.arima.model import ARIMA
from sklearn.preprocessing import MinMaxScaler
import warnings
warnings.filterwarnings('ignore')
class StockForecaster:
    """
    Stock forecaster exposing an ARIMA model and a trend-based "LSTM" stand-in.

    The ARIMA side fits a statsmodels ``ARIMA`` whose order is chosen by an
    AIC grid search. The "LSTM" side does not train a neural network: it
    stores the min-max scaled series and extrapolates its recent average
    step (see ``train_simple_lstm`` / ``forecast_lstm``).

    Training and forecasting methods report failures by printing a message
    and returning ``None`` rather than raising, except that forecasting with
    an untrained model raises ``ValueError``.
    """

    # Order used until a grid search replaces it. train_arima treats this exact
    # value as "not tuned yet" and runs the search when it sees it.
    _DEFAULT_ARIMA_ORDER = (1, 1, 1)
    # Number of trailing observations used to estimate the recent trend.
    _TREND_WINDOW = 10
    # Damping applied to the extrapolated trend in forecast_lstm.
    _TREND_DAMPING = 0.1

    def __init__(self):
        self.arima_model = None  # fitted ARIMA results, set by train_arima
        self.lstm_model = None   # dict describing the trend model, set by train_simple_lstm
        self.scaler = MinMaxScaler()
        self.optimal_arima_params = self._DEFAULT_ARIMA_ORDER

    def preprocess_data(self, data):
        """
        Preprocess stock data for forecasting.

        A DataFrame with a 'date' column is indexed by the parsed dates and
        sorted chronologically; if it has a 'close' column only that column
        is returned. The caller's DataFrame is left unmodified.

        Args:
            data (pd.DataFrame or pd.Series): Stock data, typically with
                'date' and 'close' columns

        Returns:
            pd.Series: The 'close' series with missing values dropped. If
                the input has no 'close' column (or is not a DataFrame) the
                input itself is returned with missing values dropped.
        """
        if isinstance(data, pd.DataFrame):
            if 'date' in data.columns:
                # assign() builds a new frame, so the parsed dates are never
                # written back into the caller's DataFrame.
                data = (
                    data.assign(date=pd.to_datetime(data['date']))
                    .set_index('date')
                    .sort_index()
                )
            if 'close' in data.columns:
                return data['close'].dropna()
        return data.dropna()

    def find_optimal_arima_params(self, ts_data, max_p=3, max_d=2, max_q=3):
        """
        Find optimal ARIMA parameters using AIC criterion.

        Every (p, d, q) combination in the inclusive ranges is fitted; the
        one with the lowest AIC wins and is stored on
        ``self.optimal_arima_params``. Combinations that fail to fit are
        skipped. If none fits, the default order (1, 1, 1) is kept.

        Args:
            ts_data (pd.Series): Time series data
            max_p, max_d, max_q (int): Maximum values for ARIMA parameters

        Returns:
            tuple: Optimal (p, d, q) parameters
        """
        best_aic = np.inf
        best_params = self._DEFAULT_ARIMA_ORDER

        for p in range(max_p + 1):
            for d in range(max_d + 1):
                for q in range(max_q + 1):
                    try:
                        aic = ARIMA(ts_data, order=(p, d, q)).fit().aic
                    except Exception:
                        # A candidate order that cannot be fitted is simply
                        # not a contender. Catching Exception (not a bare
                        # except) keeps Ctrl-C able to stop the search.
                        continue
                    if aic < best_aic:
                        best_aic = aic
                        best_params = (p, d, q)

        self.optimal_arima_params = best_params
        return best_params

    def train_arima(self, ts_data):
        """
        Train ARIMA model on time series data.

        Runs the AIC grid search first when the stored order is still the
        default (1, 1, 1), then fits the model with the stored order.

        Args:
            ts_data (pd.Series): Time series data

        Returns:
            statsmodels.ARIMAResults: Fitted ARIMA model, or None if
                training failed (the error is printed)
        """
        try:
            # Find optimal parameters if not set
            if self.optimal_arima_params == self._DEFAULT_ARIMA_ORDER:
                self.find_optimal_arima_params(ts_data)

            # Fit ARIMA model
            self.arima_model = ARIMA(
                ts_data, order=self.optimal_arima_params
            ).fit()
            return self.arima_model
        except Exception as e:
            print(f"ARIMA training error: {e}")
            return None

    def create_lstm_sequences(self, data, sequence_length=60):
        """
        Create sequences for LSTM training.

        Each sample is a window of ``sequence_length`` consecutive
        observations; its target is the observation right after the window.

        Args:
            data (np.array): Scaled time series data
            sequence_length (int): Length of input sequences

        Returns:
            tuple: (X, y) arrays for LSTM training. Both are empty when
                ``data`` has no more than ``sequence_length`` entries.
        """
        target_positions = range(sequence_length, len(data))
        X = [data[i - sequence_length:i] for i in target_positions]
        y = [data[i] for i in target_positions]
        return np.array(X), np.array(y)

    def train_simple_lstm(self, ts_data, sequence_length=60):
        """
        Train a simplified LSTM model or use trend-based prediction.

        No network is trained: the series is min-max scaled (fitting
        ``self.scaler``) and stored so ``forecast_lstm`` can extrapolate it.

        Args:
            ts_data (pd.Series): Time series data
            sequence_length (int): Sequence length for LSTM (stored only)

        Returns:
            dict: Model information ('type', 'data', 'sequence_length'), or
                None if scaling failed (the error is printed)
        """
        try:
            # Scale the data; the scaler expects a 2-D column
            scaled_data = self.scaler.fit_transform(
                np.asarray(ts_data).reshape(-1, 1)
            )

            # For demo purposes, we'll use a trend-based approach
            # In production, you'd train an actual LSTM here
            self.lstm_model = {
                'type': 'trend_based',
                'data': scaled_data,
                'sequence_length': sequence_length
            }
            return self.lstm_model
        except Exception as e:
            print(f"LSTM training error: {e}")
            return None

    def forecast_arima(self, steps=30):
        """
        Generate ARIMA forecast.

        Args:
            steps (int): Number of steps to forecast

        Returns:
            np.array: Forecasted values, or None if forecasting failed
                (the error is printed)

        Raises:
            ValueError: If no ARIMA model has been trained or loaded
        """
        if self.arima_model is None:
            raise ValueError("ARIMA model not trained. Call train_arima first.")

        try:
            forecast = self.arima_model.forecast(steps=steps)
            # The forecast may come back as a pandas object; hand back the
            # underlying array either way.
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception as e:
            print(f"ARIMA forecast error: {e}")
            return None

    def forecast_lstm(self, steps=30):
        """
        Generate LSTM forecast (simplified trend-based approach).

        Step ``k`` (1-based) of the forecast is, in scaled space,
        ``last_value + trend * k * 0.1`` where ``trend`` is the mean
        one-step change over the last 10 stored observations. The result is
        mapped back to price units with ``self.scaler``.

        Args:
            steps (int): Number of steps to forecast

        Returns:
            np.array: Forecasted values (empty when ``steps`` is not
                positive), or None if forecasting failed (the error is
                printed)

        Raises:
            ValueError: If no trend model has been trained or loaded
        """
        if self.lstm_model is None:
            raise ValueError("LSTM model not trained. Call train_simple_lstm first.")

        try:
            if steps <= 0:
                # Nothing to forecast; the scaler rejects empty input.
                return np.empty(0)

            scaled_data = self.lstm_model['data']

            # Calculate recent trend. With a single observation there are
            # no differences to average, so assume a flat trend rather than
            # propagating NaN through the whole forecast.
            recent = np.asarray(
                scaled_data[-self._TREND_WINDOW:], dtype=float
            ).ravel()
            recent_trend = float(np.mean(np.diff(recent))) if recent.size > 1 else 0.0

            # Generate forecast
            last_value = scaled_data[-1][0]
            horizon = np.arange(1, steps + 1)
            forecast_scaled = (
                last_value + recent_trend * horizon * self._TREND_DAMPING
            ).reshape(-1, 1)

            # Inverse transform
            return self.scaler.inverse_transform(forecast_scaled).flatten()
        except Exception as e:
            print(f"LSTM forecast error: {e}")
            return None

    def save_models(self, filepath_prefix):
        """
        Save trained models to disk.

        Writes ``<prefix>_arima.pkl`` when an ARIMA model exists, and
        ``<prefix>_lstm.pkl`` plus ``<prefix>_scaler.pkl`` when a trend
        model exists. The prefix's parent directory is created if needed.
        Errors are printed, not raised.

        Args:
            filepath_prefix (str): Prefix for saved model files
        """
        try:
            # Without this, a prefix such as "models/stock_forecaster" fails
            # on a fresh checkout where "models/" does not exist yet.
            target_dir = os.path.dirname(filepath_prefix)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)

            if self.arima_model is not None:
                with open(f"{filepath_prefix}_arima.pkl", 'wb') as f:
                    pickle.dump(self.arima_model, f)

            if self.lstm_model is not None:
                joblib.dump(self.lstm_model, f"{filepath_prefix}_lstm.pkl")
                joblib.dump(self.scaler, f"{filepath_prefix}_scaler.pkl")

            print(f"Models saved with prefix: {filepath_prefix}")
        except Exception as e:
            print(f"Error saving models: {e}")

    def load_models(self, filepath_prefix):
        """
        Load trained models from disk.

        Each of the three files written by ``save_models`` is loaded if it
        exists; missing files leave the corresponding attribute untouched.
        Errors are printed, not raised.

        SECURITY: pickle and joblib execute arbitrary code while loading.
        Only call this on files from a trusted source.

        Args:
            filepath_prefix (str): Prefix for saved model files
        """
        try:
            # Load ARIMA model
            arima_path = f"{filepath_prefix}_arima.pkl"
            if os.path.exists(arima_path):
                with open(arima_path, 'rb') as f:
                    self.arima_model = pickle.load(f)

            # Load LSTM model
            lstm_path = f"{filepath_prefix}_lstm.pkl"
            if os.path.exists(lstm_path):
                self.lstm_model = joblib.load(lstm_path)

            # Load scaler
            scaler_path = f"{filepath_prefix}_scaler.pkl"
            if os.path.exists(scaler_path):
                self.scaler = joblib.load(scaler_path)

            print(f"Models loaded from prefix: {filepath_prefix}")
        except Exception as e:
            print(f"Error loading models: {e}")
# Example usage and model training script
if __name__ == "__main__":
    # This would be run to pre-train models
    forecaster = StockForecaster()

    # Example with sample data: a synthetic random walk around 100, one
    # point per calendar day. Unseeded, so every run produces new prices.
    dates = pd.date_range('2020-01-01', periods=1000, freq='D')
    prices = 100 + np.cumsum(np.random.randn(1000) * 0.5)
    sample_data = pd.DataFrame({
        'date': dates,
        'close': prices
    })

    # Preprocess and train
    ts_data = forecaster.preprocess_data(sample_data)
    print("Training ARIMA model...")
    arima_fit = forecaster.train_arima(ts_data)
    print("Training LSTM model...")
    lstm_fit = forecaster.train_simple_lstm(ts_data)

    # Generate forecasts. The train_* calls return None on failure and the
    # forecast_* calls raise ValueError on an untrained model, so only
    # forecast with models that actually trained.
    arima_forecast = forecaster.forecast_arima(30) if arima_fit is not None else None
    lstm_forecast = forecaster.forecast_lstm(30) if lstm_fit is not None else None
    print(f"ARIMA forecast shape: {arima_forecast.shape if arima_forecast is not None else 'None'}")
    print(f"LSTM forecast shape: {lstm_forecast.shape if lstm_forecast is not None else 'None'}")

    # Save models. Make sure the target directory exists first; otherwise
    # the write fails on a fresh checkout.
    os.makedirs("models", exist_ok=True)
    forecaster.save_models("models/stock_forecaster")