import numpy as np import pandas as pd import tensorflow as tf import os import tempfile import matplotlib.pyplot as plt from sklearn.preprocessing import MinMaxScaler from sklearn.metrics import r2_score, root_mean_squared_error from tensorflow.keras.models import Sequential from tensorflow.keras.layers import LSTM, Bidirectional, GRU, Dense from tensorflow.keras.callbacks import EarlyStopping from statsmodels.tsa.arima.model import ARIMA from statsmodels.tsa.holtwinters import ExponentialSmoothing from prophet import Prophet def prepare_data(df, lag=10): values = df.values.astype("float32") timestamps = df.index X, y, time = [], [], [] for i in range(len(values) - lag): X.append(values[i : i + lag]) y.append(values[i + lag, 0]) time.append(timestamps[i + lag]) return np.array(X), np.array(y), np.array(time) def scale_data(X, y): n_features = X.shape[-1] scaler_X = MinMaxScaler() X_scaled = scaler_X.fit_transform(X.reshape(-1, n_features)).reshape(X.shape) scaler_y = MinMaxScaler() y_scaled = scaler_y.fit_transform(y.reshape(-1, 1)).flatten() return X_scaled, y_scaled, scaler_X, scaler_y def invert_prediction(scaler, pred): return scaler.inverse_transform(pred.reshape(-1, 1)).flatten() def create_model(input_shape, model_type, neurons): model = Sequential() if model_type == "LSTM": model.add(LSTM(neurons, input_shape=input_shape)) elif model_type == "BiLSTM": model.add(Bidirectional(LSTM(neurons), input_shape=input_shape)) elif model_type == "GRU": model.add(GRU(neurons, input_shape=input_shape)) else: raise ValueError(f"Unsupported model type: {model_type}") model.add(Dense(1)) model.compile(loss="mse", optimizer="adam") return model def run_forecast( df, model_type, is_multivariate, horizon, lag, neurons, epochs, batch_size, future_horizon=0, device="CPU", arima_order=(5,1,0) ): if device == "GPU": physical_devices = tf.config.list_physical_devices('GPU') if physical_devices: tf.config.experimental.set_memory_growth(physical_devices[0], True) if model_type in ["ARIMA", "ExponentialSmoothing", "Prophet"]: return run_classical_forecast(df, model_type, horizon, future_horizon, arima_order) X, y, time = prepare_data(df, lag=lag) train_size = len(X) - horizon X_train, X_test = X[:train_size], X[train_size:] y_train, y_test = y[:train_size], y[train_size:] X_train, y_train, scaler_X, scaler_y = scale_data(X_train, y_train) X_test, y_test, _, _ = scale_data(X_test, y_test) model = create_model((lag, X.shape[-1]), model_type, neurons) early_stop = EarlyStopping(monitor="loss", patience=10, restore_best_weights=True) model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, verbose=1, shuffle=False, callbacks=[early_stop]) y_test_pred = invert_prediction(scaler_y, model.predict(X_test)) y_test = invert_prediction(scaler_y, y_test) future_pred = [] if future_horizon > 0: last_input = X_test[-1] for _ in range(future_horizon): pred_scaled = model.predict(last_input.reshape(1, lag, -1)) pred_inv = invert_prediction(scaler_y, pred_scaled) future_pred.append(pred_inv[0]) next_input = np.roll(last_input, -1, axis=0) next_input[-1, 0] = pred_scaled last_input = next_input rmse = root_mean_squared_error(y_test, y_test_pred) r2 = r2_score(y_test, y_test_pred) metrics = f"Test RMSE: {rmse:.3f}, Test R2: {r2:.3f}" export_df = pd.DataFrame({"Test_Actual": y_test, "Test_Predicted": y_test_pred}) export_path = os.path.join(tempfile.gettempdir(), "forecast_result.csv") export_df.to_csv(export_path, index=False) future_path = "" if future_pred: future_path = os.path.join(tempfile.gettempdir(), "forecast_future.csv") pd.DataFrame({"Future_Forecast": future_pred}).to_csv(future_path, index=False) fig = plt.figure(figsize=(12, 6)) plt.plot(y_test, label="Test Actual") plt.plot(y_test_pred, label="Test Predicted", linestyle="--") if future_pred: plt.plot(range(len(y_test), len(y_test) + future_horizon), future_pred, label="Future Forecast", linestyle="-.") plt.title("Forecast Result") plt.xlabel("Time Step") plt.ylabel("Value") plt.legend() return y_test_pred, y_test, future_pred, fig, metrics, export_path, future_path def run_classical_forecast(df, model_type, horizon, future_horizon=0, arima_order=(5,1,0)): ts = df.iloc[:, 0] ts.index = pd.to_datetime(df.index) if model_type == "ARIMA": model = ARIMA(ts, order=arima_order) model_fit = model.fit() forecast = model_fit.forecast(steps=horizon) elif model_type == "ExponentialSmoothing": model = ExponentialSmoothing(ts, trend="add", seasonal=None) model_fit = model.fit() forecast = model_fit.forecast(horizon) elif model_type == "Prophet": df_prophet = ts.reset_index() df_prophet.columns = ["ds", "y"] model = Prophet() model.fit(df_prophet) future = model.make_future_dataframe(periods=horizon) forecast_df = model.predict(future) forecast = forecast_df["yhat"].iloc[-horizon:].values else: raise ValueError(f"Model type {model_type} not supported.") actual = ts[-horizon:].values future_forecast = None if future_horizon > 0: if model_type == "Prophet": future = model.make_future_dataframe(periods=horizon + future_horizon) forecast_df = model.predict(future) future_forecast = forecast_df["yhat"].iloc[-future_horizon:].values else: future_forecast = model_fit.forecast(horizon + future_horizon)[-future_horizon:] rmse_val = np.sqrt(root_mean_squared_error(actual, forecast)) r2_val = r2_score(actual, forecast) metrics = f"Test RMSE: {rmse_val:.3f}, Test R2: {r2_val:.3f}" export_df = pd.DataFrame({"Test_Actual": actual, "Test_Predicted": forecast}) export_path = os.path.join(tempfile.gettempdir(), f"{model_type.lower()}_forecast_result.csv") export_df.to_csv(export_path, index=False) future_path = "" if future_forecast is not None: future_path = os.path.join(tempfile.gettempdir(), f"{model_type.lower()}_forecast_future.csv") pd.DataFrame({"Future_Forecast": future_forecast}).to_csv(future_path, index=False) fig = plt.figure(figsize=(12, 6)) # Clean forecast for plotting if isinstance(forecast, pd.Series): forecast = forecast.values if isinstance(actual, pd.Series): actual = actual.values plt.plot(actual, label="Test Actual") plt.plot(forecast, label="Test Predicted", linestyle="--") if future_forecast is not None: plt.plot(range(len(actual), len(actual) + len(future_forecast)), future_forecast, label="Future Forecast", linestyle="-.") plt.title(f"{model_type} Forecast Result") plt.legend() return forecast, actual, future_forecast, fig, metrics, export_path, future_path