Spaces:
Running
Running
| import numpy as np | |
| import pandas as pd | |
| import tensorflow as tf | |
| import os | |
| import tempfile | |
| import matplotlib.pyplot as plt | |
| from sklearn.preprocessing import MinMaxScaler | |
| from sklearn.metrics import r2_score, root_mean_squared_error | |
| from tensorflow.keras.models import Sequential | |
| from tensorflow.keras.layers import LSTM, Bidirectional, GRU, Dense | |
| from tensorflow.keras.callbacks import EarlyStopping | |
| from statsmodels.tsa.arima.model import ARIMA | |
| from statsmodels.tsa.holtwinters import ExponentialSmoothing | |
| from prophet import Prophet | |
def prepare_data(df, lag=10):
    """Build sliding-window samples for supervised sequence forecasting.

    Each sample is a window of ``lag`` consecutive rows (all columns); its
    target is the first column's value on the row immediately after the
    window, and its timestamp is that row's index label.

    Args:
        df: pandas DataFrame (or Series) of numeric values. The first column
            is the forecast target; a Series is treated as a single column.
        lag: Number of past rows in each input window. Must be >= 1.

    Returns:
        Tuple ``(X, y, time)`` where ``X`` is a float32 array of shape
        ``(n_samples, lag, n_features)``, ``y`` is a float32 array of shape
        ``(n_samples,)`` and ``time`` holds the index label of each target.
        ``n_samples`` is ``max(len(df) - lag, 0)``.

    Raises:
        ValueError: If ``lag`` is smaller than 1.
    """
    if lag < 1:
        raise ValueError(f"lag must be >= 1, got {lag}")

    values = df.values.astype("float32")
    if values.ndim == 1:
        # A Series has no column axis; give it one so indexing is uniform.
        values = values.reshape(-1, 1)

    n_samples = max(len(values) - lag, 0)
    # Row i of window_idx lists the source rows i .. i + lag - 1. Fancy
    # indexing keeps the (n_samples, lag, n_features) shape even when
    # n_samples is 0, so callers can always read X.shape[-1].
    window_idx = np.arange(n_samples)[:, None] + np.arange(lag)[None, :]
    X = values[window_idx]
    y = values[lag:, 0].copy()
    time = np.array(list(df.index[lag:]))
    return X, y, time
def scale_data(X, y, scaler_X=None, scaler_y=None):
    """Scale windowed inputs and targets, fitting new scalers when needed.

    Args:
        X: Array-like whose last axis is the feature axis. It is flattened to
            two dimensions for scaling and restored to its original shape.
        y: Array-like of targets; scaled as a single column.
        scaler_X: Optional already-fitted scaler for ``X``. When omitted, a
            new ``MinMaxScaler`` is fitted on ``X``.
        scaler_y: Optional already-fitted scaler for ``y``. When omitted, a
            new ``MinMaxScaler`` is fitted on ``y``.

    Returns:
        Tuple ``(X_scaled, y_scaled, scaler_X, scaler_y)``. ``X_scaled`` has
        the shape of ``X`` and ``y_scaled`` is one-dimensional.

    Pass the scalers returned for the training split when scaling held-out
    data, so both splits share the same transformation.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    flat_X = X.reshape(-1, X.shape[-1])
    column_y = y.reshape(-1, 1)

    if scaler_X is None:
        scaler_X = MinMaxScaler()
        flat_scaled = scaler_X.fit_transform(flat_X)
    else:
        flat_scaled = scaler_X.transform(flat_X)

    if scaler_y is None:
        scaler_y = MinMaxScaler()
        column_scaled = scaler_y.fit_transform(column_y)
    else:
        column_scaled = scaler_y.transform(column_y)

    X_scaled = np.asarray(flat_scaled).reshape(X.shape)
    y_scaled = np.asarray(column_scaled).flatten()
    return X_scaled, y_scaled, scaler_X, scaler_y
def invert_prediction(scaler, pred):
    """Map scaled predictions back to original units.

    Args:
        scaler: Fitted scaler exposing ``inverse_transform`` for a single
            column.
        pred: Scaled values in any array-like form (ndarray of any shape,
            list, or scalar); they are treated as one column.

    Returns:
        One-dimensional array of the inverse-transformed values.
    """
    # np.asarray lets lists and scalars through; the original required an
    # ndarray because it called ``pred.reshape`` directly.
    column = np.asarray(pred).reshape(-1, 1)
    return np.asarray(scaler.inverse_transform(column)).flatten()
def create_model(input_shape, model_type, neurons):
    """Build and compile a single-layer recurrent regressor.

    Args:
        input_shape: ``(timesteps, n_features)`` of one input sample.
        model_type: One of ``"LSTM"``, ``"BiLSTM"`` or ``"GRU"``.
        neurons: Number of units in the recurrent layer.

    Returns:
        A compiled Keras ``Sequential`` model (MSE loss, Adam optimizer)
        ending in a one-unit ``Dense`` output.

    Raises:
        ValueError: If ``model_type`` is not one of the supported names.
    """
    # Fail fast, before any layers are allocated.
    if model_type not in ("LSTM", "BiLSTM", "GRU"):
        raise ValueError(f"Unsupported model type: {model_type}")

    model = Sequential()
    # Declare the input explicitly rather than passing ``input_shape`` to the
    # first layer, which newer Keras releases warn against.
    model.add(tf.keras.Input(shape=input_shape))
    if model_type == "LSTM":
        model.add(LSTM(neurons))
    elif model_type == "BiLSTM":
        model.add(Bidirectional(LSTM(neurons)))
    else:
        model.add(GRU(neurons))
    model.add(Dense(1))
    model.compile(loss="mse", optimizer="adam")
    return model
def run_forecast(
    df,
    model_type,
    is_multivariate,
    horizon,
    lag,
    neurons,
    epochs,
    batch_size,
    future_horizon=0,
    device="CPU",
    arima_order=(5, 1, 0),
):
    """Train a forecaster, evaluate it on the last ``horizon`` steps and plot.

    Classical model types are delegated to ``run_classical_forecast``. For
    the recurrent types, the data is windowed with ``prepare_data``, the last
    ``horizon`` samples are held out, and scalers are fitted on the training
    samples only.

    Args:
        df: DataFrame whose first column is the forecast target.
        model_type: ``"LSTM"``, ``"BiLSTM"``, ``"GRU"``, ``"ARIMA"``,
            ``"ExponentialSmoothing"`` or ``"Prophet"``.
        is_multivariate: Accepted for interface compatibility; not read by
            this function (all columns of ``df`` are always used).
        horizon: Number of trailing samples held out for testing.
        lag: Window length (past steps per sample).
        neurons: Units in the recurrent layer.
        epochs: Maximum training epochs (early stopping may end sooner).
        batch_size: Training batch size.
        future_horizon: Steps to forecast beyond the end of ``df``.
        device: ``"GPU"`` enables memory growth on visible GPUs; any other
            value skips GPU configuration.
        arima_order: ``(p, d, q)`` order, used only for ``"ARIMA"``.

    Returns:
        Tuple ``(y_test_pred, y_test, future_pred, fig, metrics,
        export_path, future_path)``. ``future_pred`` is a list (empty when
        ``future_horizon`` is 0) and ``future_path`` is ``""`` when no future
        forecast was written.

    Raises:
        ValueError: If ``lag``/``horizon`` leave no training or test samples,
            or if ``model_type`` is unsupported.
    """
    if device == "GPU":
        # Memory growth has to be identical on every GPU and can only be set
        # before the runtime initialises them, so configure all devices and
        # tolerate repeated calls made after initialisation.
        for gpu in tf.config.list_physical_devices("GPU"):
            try:
                tf.config.experimental.set_memory_growth(gpu, True)
            except RuntimeError:
                pass  # already initialised; keep the existing setting

    if model_type in ["ARIMA", "ExponentialSmoothing", "Prophet"]:
        return run_classical_forecast(df, model_type, horizon, future_horizon, arima_order)

    n_samples = len(df) - lag
    if lag < 1 or horizon < 1 or horizon >= n_samples:
        raise ValueError(
            f"Need lag >= 1 and 1 <= horizon < len(df) - lag; "
            f"got lag={lag}, horizon={horizon}, len(df)={len(df)}"
        )

    X, y, _ = prepare_data(df, lag=lag)
    n_features = X.shape[-1]
    train_size = len(X) - horizon
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]

    # Fit the scalers on the training split only and reuse them for the test
    # split. Re-fitting on the test split (as before) leaks test statistics
    # and makes the later inverse transform of y_test meaningless.
    X_train, y_train, scaler_X, scaler_y = scale_data(X_train, y_train)
    X_test = scaler_X.transform(X_test.reshape(-1, n_features)).reshape(X_test.shape)
    # y_test is left in original units; it is only used for metrics/plots.

    model = create_model((lag, n_features), model_type, neurons)
    early_stop = EarlyStopping(monitor="loss", patience=10, restore_best_weights=True)
    model.fit(
        X_train,
        y_train,
        epochs=epochs,
        batch_size=batch_size,
        verbose=1,
        shuffle=False,
        callbacks=[early_stop],
    )
    y_test_pred = invert_prediction(scaler_y, model.predict(X_test))

    future_pred = []
    if future_horizon > 0:
        # Start from the last `lag` observed rows so the first forecast is
        # the step after the data ends. X_test[-1] ends one row earlier and
        # would just re-predict the final test point.
        raw_values = df.values.astype("float32").reshape(len(df), -1)
        raw_window = raw_values[-lag:].copy()
        for _ in range(future_horizon):
            scaled_window = scaler_X.transform(raw_window).reshape(1, lag, n_features)
            pred_value = invert_prediction(scaler_y, model.predict(scaled_window))[0]
            future_pred.append(pred_value)
            # Future values of the other columns are unknown: carry the most
            # recent row forward and overwrite only the target column. The
            # window is kept in original units so the input scaler is always
            # applied consistently.
            next_row = raw_window[-1].copy()
            next_row[0] = pred_value
            raw_window = np.vstack([raw_window[1:], next_row])

    rmse = root_mean_squared_error(y_test, y_test_pred)
    r2 = r2_score(y_test, y_test_pred)
    metrics = f"Test RMSE: {rmse:.3f}, Test R2: {r2:.3f}"

    export_df = pd.DataFrame({"Test_Actual": y_test, "Test_Predicted": y_test_pred})
    export_path = os.path.join(tempfile.gettempdir(), "forecast_result.csv")
    export_df.to_csv(export_path, index=False)

    future_path = ""
    if future_pred:
        future_path = os.path.join(tempfile.gettempdir(), "forecast_future.csv")
        pd.DataFrame({"Future_Forecast": future_pred}).to_csv(future_path, index=False)

    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(y_test, label="Test Actual")
    ax.plot(y_test_pred, label="Test Predicted", linestyle="--")
    if future_pred:
        future_steps = range(len(y_test), len(y_test) + len(future_pred))
        ax.plot(future_steps, future_pred, label="Future Forecast", linestyle="-.")
    ax.set_title("Forecast Result")
    ax.set_xlabel("Time Step")
    ax.set_ylabel("Value")
    ax.legend()

    return y_test_pred, y_test, future_pred, fig, metrics, export_path, future_path
def run_classical_forecast(df, model_type, horizon, future_horizon=0, arima_order=(5, 1, 0)):
    """Fit a classical model, score it on the last ``horizon`` points and plot.

    The model is fitted on everything except the final ``horizon``
    observations and then forecasts ``horizon + future_horizon`` steps: the
    first ``horizon`` steps are compared with the held-out observations and
    the remainder is the forecast beyond the end of the series.

    Args:
        df: DataFrame whose first column is the series and whose index can be
            parsed by ``pandas.to_datetime``.
        model_type: ``"ARIMA"``, ``"ExponentialSmoothing"`` or ``"Prophet"``.
        horizon: Number of trailing observations held out for evaluation.
            Must satisfy ``1 <= horizon < len(df)``.
        future_horizon: Steps to forecast past the end of the series.
        arima_order: ``(p, d, q)`` order, used only for ``"ARIMA"``.

    Returns:
        Tuple ``(forecast, actual, future_forecast, fig, metrics,
        export_path, future_path)``. ``forecast`` and ``actual`` are arrays of
        length ``horizon``; ``future_forecast`` is an array or ``None`` when
        ``future_horizon`` is not positive; ``future_path`` is ``""`` when no
        future forecast was written.

    Raises:
        ValueError: If ``model_type`` is unsupported or ``horizon`` is out of
            range.
    """
    if model_type not in ("ARIMA", "ExponentialSmoothing", "Prophet"):
        raise ValueError(f"Model type {model_type} not supported.")

    # Copy so that re-indexing never touches the caller's frame.
    ts = df.iloc[:, 0].copy()
    ts.index = pd.to_datetime(df.index)

    if horizon < 1 or horizon >= len(ts):
        raise ValueError(
            f"horizon must satisfy 1 <= horizon < len(df); "
            f"got horizon={horizon}, len(df)={len(ts)}"
        )

    # Hold the evaluation window out of the fit. Fitting on the full series
    # (as before) both leaks the test data and compares forecasts for
    # t+1..t+horizon with observations from t-horizon+1..t.
    train = ts.iloc[:-horizon]
    actual = ts.iloc[-horizon:].values
    extra_steps = future_horizon if future_horizon > 0 else 0
    total_steps = horizon + extra_steps

    if model_type == "Prophet":
        df_prophet = train.reset_index()
        df_prophet.columns = ["ds", "y"]
        model = Prophet()
        model.fit(df_prophet)
        # Use the series' own spacing when it can be inferred; fall back to
        # Prophet's daily default otherwise.
        try:
            freq = pd.infer_freq(train.index)
        except (TypeError, ValueError):
            freq = None
        future = model.make_future_dataframe(periods=total_steps, freq=freq or "D")
        path = model.predict(future)["yhat"].iloc[-total_steps:].values
    else:
        if model_type == "ARIMA":
            model = ARIMA(train, order=arima_order)
        else:
            model = ExponentialSmoothing(train, trend="add", seasonal=None)
        model_fit = model.fit()
        path = np.asarray(model_fit.forecast(total_steps))

    forecast = path[:horizon]
    future_forecast = path[horizon:] if extra_steps else None

    # root_mean_squared_error already returns the root; no extra sqrt.
    rmse_val = root_mean_squared_error(actual, forecast)
    r2_val = r2_score(actual, forecast)
    metrics = f"Test RMSE: {rmse_val:.3f}, Test R2: {r2_val:.3f}"

    export_df = pd.DataFrame({"Test_Actual": actual, "Test_Predicted": forecast})
    export_path = os.path.join(tempfile.gettempdir(), f"{model_type.lower()}_forecast_result.csv")
    export_df.to_csv(export_path, index=False)

    future_path = ""
    if future_forecast is not None:
        future_path = os.path.join(tempfile.gettempdir(), f"{model_type.lower()}_forecast_future.csv")
        pd.DataFrame({"Future_Forecast": future_forecast}).to_csv(future_path, index=False)

    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(actual, label="Test Actual")
    ax.plot(forecast, label="Test Predicted", linestyle="--")
    if future_forecast is not None:
        future_steps = range(len(actual), len(actual) + len(future_forecast))
        ax.plot(future_steps, future_forecast, label="Future Forecast", linestyle="-.")
    ax.set_title(f"{model_type} Forecast Result")
    ax.legend()

    return forecast, actual, future_forecast, fig, metrics, export_path, future_path