Spaces:
Build error
Build error
| import os | |
| import pickle | |
| import joblib | |
| import pandas as pd | |
| import numpy as np | |
| from statsmodels.tsa.arima.model import ARIMA | |
| from sklearn.preprocessing import MinMaxScaler | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
class StockForecaster:
    """
    Stock forecasting helper built around an ARIMA model and an "LSTM" slot.

    The ARIMA side fits a statsmodels ``ARIMA`` model, optionally after an AIC
    grid search over (p, d, q). The "LSTM" side does not train a neural
    network: it stores the min-max scaled series and extrapolates the recent
    average step (a trend-based placeholder).

    Attributes are plain instance fields:
        arima_model: Fitted ARIMA results object, or None before training.
        lstm_model: Dict describing the trend-based model, or None.
        scaler: MinMaxScaler fitted by ``train_simple_lstm``.
        optimal_arima_params: (p, d, q) order used by ``train_arima``.
    """

    # Order used until a grid search (or the caller) picks another one.
    _DEFAULT_ARIMA_ORDER = (1, 1, 1)
    # Number of trailing scaled points used to estimate the recent trend.
    _TREND_WINDOW = 10
    # Factor applied to the extrapolated trend so the forecast drifts slowly.
    _TREND_DAMPING = 0.1

    def __init__(self):
        self.arima_model = None
        self.lstm_model = None
        self.scaler = MinMaxScaler()
        self.optimal_arima_params = self._DEFAULT_ARIMA_ORDER  # Default parameters
        # Set once a grid search has run, so a search whose winner happens to
        # be the default order is not repeated on every train_arima call.
        self._arima_params_searched = False

    def preprocess_data(self, data):
        """
        Preprocess stock data for forecasting.

        The input is never modified; date parsing happens on a copy.

        Args:
            data (pd.DataFrame | pd.Series): Stock data. A DataFrame may carry
                a 'date' column (parsed and used as a sorted index) and a
                'close' column (returned as the series).

        Returns:
            pd.Series: The 'close' column with NaNs dropped when present;
            otherwise the input itself with NaNs dropped.
        """
        if isinstance(data, pd.DataFrame):
            if 'date' in data.columns:
                # assign() returns a new frame, leaving the caller's data intact.
                data = data.assign(date=pd.to_datetime(data['date']))
                data = data.set_index('date').sort_index()
            if 'close' in data.columns:
                return data['close'].dropna()
        return data.dropna()

    def find_optimal_arima_params(self, ts_data, max_p=3, max_d=2, max_q=3):
        """
        Find optimal ARIMA parameters using AIC criterion.

        Every (p, d, q) combination up to the given maxima is fitted; the one
        with the lowest AIC wins and is stored in ``optimal_arima_params``.

        Args:
            ts_data (pd.Series): Time series data
            max_p, max_d, max_q (int): Maximum values for ARIMA parameters

        Returns:
            tuple: Optimal (p, d, q) parameters; (1, 1, 1) if no fit succeeded
        """
        best_aic = np.inf
        best_params = self._DEFAULT_ARIMA_ORDER
        for p in range(max_p + 1):
            for d in range(max_d + 1):
                for q in range(max_q + 1):
                    try:
                        aic = ARIMA(ts_data, order=(p, d, q)).fit().aic
                    except Exception:
                        # Some orders cannot be fitted on a given series;
                        # skip them. KeyboardInterrupt still propagates.
                        continue
                    if aic < best_aic:
                        best_aic = aic
                        best_params = (p, d, q)
        self.optimal_arima_params = best_params
        self._arima_params_searched = True
        return best_params

    def train_arima(self, ts_data):
        """
        Train ARIMA model on time series data.

        Runs the AIC grid search first when the order is still the default and
        no search has been performed on this instance.

        Args:
            ts_data (pd.Series): Time series data

        Returns:
            statsmodels.ARIMAResults: Fitted ARIMA model, or None if training
            failed (the error is printed).
        """
        try:
            # Find optimal parameters if not set
            needs_search = (
                self.optimal_arima_params == self._DEFAULT_ARIMA_ORDER
                and not getattr(self, '_arima_params_searched', False)
            )
            if needs_search:
                self.find_optimal_arima_params(ts_data)
            # Fit ARIMA model
            self.arima_model = ARIMA(ts_data, order=self.optimal_arima_params).fit()
            return self.arima_model
        except Exception as e:
            print(f"ARIMA training error: {e}")
            return None

    def create_lstm_sequences(self, data, sequence_length=60):
        """
        Create sequences for LSTM training.

        Each sample is a window of ``sequence_length`` consecutive values and
        its target is the value immediately after the window.

        Args:
            data (np.array): Scaled time series data
            sequence_length (int): Length of input sequences

        Returns:
            tuple: (X, y) arrays for LSTM training; both empty when ``data``
            has no more than ``sequence_length`` entries.
        """
        target_positions = range(sequence_length, len(data))
        X = [data[i - sequence_length:i] for i in target_positions]
        y = [data[i] for i in target_positions]
        return np.array(X), np.array(y)

    def train_simple_lstm(self, ts_data, sequence_length=60):
        """
        Train a simplified LSTM model or use trend-based prediction.

        No network is trained: the series is min-max scaled with
        ``self.scaler`` and stored for ``forecast_lstm`` to extrapolate.

        Args:
            ts_data (pd.Series): Time series data
            sequence_length (int): Sequence length for LSTM (stored only)

        Returns:
            dict: Model information ('type', 'data', 'sequence_length'), or
            None if scaling failed (the error is printed).
        """
        try:
            # Scale the data
            scaled_data = self.scaler.fit_transform(ts_data.values.reshape(-1, 1))
            # For demo purposes, we'll use a trend-based approach
            # In production, you'd train an actual LSTM here
            self.lstm_model = {
                'type': 'trend_based',
                'data': scaled_data,
                'sequence_length': sequence_length,
            }
            return self.lstm_model
        except Exception as e:
            print(f"LSTM training error: {e}")
            return None

    def forecast_arima(self, steps=30):
        """
        Generate ARIMA forecast.

        Args:
            steps (int): Number of steps to forecast

        Returns:
            np.array: Forecasted values, or None if forecasting failed (the
            error is printed).

        Raises:
            ValueError: If no ARIMA model has been trained or loaded.
        """
        if self.arima_model is None:
            raise ValueError("ARIMA model not trained. Call train_arima first.")
        try:
            forecast = self.arima_model.forecast(steps=steps)
            return forecast.values if hasattr(forecast, 'values') else forecast
        except Exception as e:
            print(f"ARIMA forecast error: {e}")
            return None

    def forecast_lstm(self, steps=30):
        """
        Generate LSTM forecast (simplified trend-based approach).

        The mean step over the last few scaled points is taken as the trend;
        step k of the forecast is ``last + trend * k * damping`` in scaled
        space, then mapped back to price space with ``self.scaler``.

        Args:
            steps (int): Number of steps to forecast

        Returns:
            np.array: Forecasted values, or None if forecasting failed (the
            error is printed).

        Raises:
            ValueError: If no model has been trained or loaded.
        """
        if self.lstm_model is None:
            raise ValueError("LSTM model not trained. Call train_simple_lstm first.")
        try:
            # Simple trend-based forecast for demo
            scaled_data = self.lstm_model['data']
            # Calculate recent trend
            recent = scaled_data[-self._TREND_WINDOW:].flatten()
            # A single observation has no differences; treat the trend as flat
            # instead of propagating the NaN that mean-of-empty would produce.
            recent_trend = np.mean(np.diff(recent)) if recent.size > 1 else 0.0
            # Generate forecast
            last_value = scaled_data[-1][0]
            horizon = np.arange(1, steps + 1)
            forecast_scaled = last_value + recent_trend * horizon * self._TREND_DAMPING
            # Inverse transform
            return self.scaler.inverse_transform(forecast_scaled.reshape(-1, 1)).flatten()
        except Exception as e:
            print(f"LSTM forecast error: {e}")
            return None

    def save_models(self, filepath_prefix):
        """
        Save trained models to disk.

        Writes ``<prefix>_arima.pkl`` and ``<prefix>_lstm.pkl`` for the models
        that exist, plus ``<prefix>_scaler.pkl``. The prefix's parent
        directory is created when missing. Errors are printed, not raised.

        Args:
            filepath_prefix (str): Prefix for saved model files
        """
        try:
            parent_dir = os.path.dirname(filepath_prefix)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            if self.arima_model is not None:
                with open(f"{filepath_prefix}_arima.pkl", 'wb') as f:
                    pickle.dump(self.arima_model, f)
            if self.lstm_model is not None:
                joblib.dump(self.lstm_model, f"{filepath_prefix}_lstm.pkl")
            joblib.dump(self.scaler, f"{filepath_prefix}_scaler.pkl")
            print(f"Models saved with prefix: {filepath_prefix}")
        except Exception as e:
            print(f"Error saving models: {e}")

    def load_models(self, filepath_prefix):
        """
        Load trained models from disk.

        Each of the three files is optional; missing ones leave the
        corresponding attribute unchanged. Errors are printed, not raised.

        Args:
            filepath_prefix (str): Prefix for saved model files
        """
        # SECURITY: pickle/joblib execute arbitrary code while loading. Only
        # point this at files produced by save_models from a trusted source.
        try:
            # Load ARIMA model
            arima_path = f"{filepath_prefix}_arima.pkl"
            if os.path.exists(arima_path):
                with open(arima_path, 'rb') as f:
                    self.arima_model = pickle.load(f)
            # Load LSTM model
            lstm_path = f"{filepath_prefix}_lstm.pkl"
            if os.path.exists(lstm_path):
                self.lstm_model = joblib.load(lstm_path)
            # Load scaler
            scaler_path = f"{filepath_prefix}_scaler.pkl"
            if os.path.exists(scaler_path):
                self.scaler = joblib.load(scaler_path)
            print(f"Models loaded from prefix: {filepath_prefix}")
        except Exception as e:
            print(f"Error loading models: {e}")
# Example usage and model training script
def main():
    """Pre-train both models on synthetic data, print forecast shapes, save.

    Builds a 1000-day random-walk price series, trains the ARIMA and
    trend-based models on it, forecasts 30 steps with each, and writes the
    models under ``models/stock_forecaster``.
    """
    # This would be run to pre-train models
    forecaster = StockForecaster()

    # Example with sample data
    dates = pd.date_range('2020-01-01', periods=1000, freq='D')
    prices = 100 + np.cumsum(np.random.randn(1000) * 0.5)
    sample_data = pd.DataFrame({
        'date': dates,
        'close': prices,
    })

    # Preprocess and train
    ts_data = forecaster.preprocess_data(sample_data)
    print("Training ARIMA model...")
    forecaster.train_arima(ts_data)
    print("Training LSTM model...")
    forecaster.train_simple_lstm(ts_data)

    # Generate forecasts. Training reports failure by leaving the model unset,
    # and the forecast methods raise ValueError in that case, so skip a
    # forecast whose model is missing instead of aborting the script.
    arima_forecast = (
        forecaster.forecast_arima(30) if forecaster.arima_model is not None else None
    )
    lstm_forecast = (
        forecaster.forecast_lstm(30) if forecaster.lstm_model is not None else None
    )
    print(f"ARIMA forecast shape: {arima_forecast.shape if arima_forecast is not None else 'None'}")
    print(f"LSTM forecast shape: {lstm_forecast.shape if lstm_forecast is not None else 'None'}")

    # Save models. The target directory must exist before files are written.
    os.makedirs("models", exist_ok=True)
    forecaster.save_models("models/stock_forecaster")


if __name__ == "__main__":
    main()