# energy_forecasting/models/custom_models.py
# Hosting-page residue kept for provenance: "kawaiipeace's picture", commit "update" (de78237).
import numpy as np
import pandas as pd
import tensorflow as tf
import os
import tempfile
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import r2_score, root_mean_squared_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Bidirectional, GRU, Dense
from tensorflow.keras.callbacks import EarlyStopping
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from prophet import Prophet
def prepare_data(df, lag=10):
    """Slice a frame into overlapping look-back windows for supervised learning.

    Every sample is made of ``lag`` consecutive rows (all columns). It is
    paired with the first-column value of the row that directly follows the
    window, and with that row's index label.

    Args:
        df: Frame whose values can be cast to ``float32``; column 0 is the
            value being predicted.
        lag: Number of consecutive rows in each input window.

    Returns:
        Tuple ``(X, y, time)`` of NumPy arrays: the stacked windows, the
        matching targets, and the index labels of the target rows.
    """
    data = df.values.astype("float32")
    index = df.index
    # Position of the row to predict for each sample; the window is the
    # ``lag`` rows that end just before it.
    target_rows = range(lag, len(data))

    windows = [data[end - lag : end] for end in target_rows]
    targets = [data[end, 0] for end in target_rows]
    labels = [index[end] for end in target_rows]
    return np.array(windows), np.array(targets), np.array(labels)
def scale_data(X, y):
    """Fit min-max scalers on ``X`` and ``y`` and return the scaled arrays.

    ``X`` is flattened to two dimensions (one column per entry of its last
    axis) for fitting, then restored to its original shape. ``y`` is scaled
    as a single column and returned flat.

    Args:
        X: Array whose last axis indexes the features.
        y: Array of target values.

    Returns:
        Tuple ``(X_scaled, y_scaled, scaler_X, scaler_y)`` where the last two
        items are the fitted ``MinMaxScaler`` instances.
    """
    feature_scaler = MinMaxScaler()
    flat_features = X.reshape(-1, X.shape[-1])
    X_scaled = feature_scaler.fit_transform(flat_features).reshape(X.shape)

    target_scaler = MinMaxScaler()
    target_column = y.reshape(-1, 1)
    y_scaled = target_scaler.fit_transform(target_column).flatten()

    return X_scaled, y_scaled, feature_scaler, target_scaler
def invert_prediction(scaler, pred):
    """Undo a scaler on predictions and return them as a flat 1-D array.

    Args:
        scaler: Object exposing ``inverse_transform`` for a 2-D column array.
        pred: Array of scaled values; any shape, it is reshaped to one column.

    Returns:
        1-D array produced by ``scaler.inverse_transform``.
    """
    column = pred.reshape(-1, 1)
    restored = scaler.inverse_transform(column)
    return restored.flatten()
def create_model(input_shape, model_type, neurons):
    """Build and compile a single-recurrent-layer Keras regressor.

    The network is one recurrent layer followed by ``Dense(1)``, compiled
    with mean-squared-error loss and the Adam optimizer.

    Args:
        input_shape: Shape passed to the recurrent layer as ``input_shape``.
        model_type: One of ``"LSTM"``, ``"BiLSTM"`` or ``"GRU"``.
        neurons: Number of units in the recurrent layer.

    Returns:
        The compiled ``Sequential`` model.

    Raises:
        ValueError: If ``model_type`` is not one of the supported names.
    """
    network = Sequential()

    if model_type == "BiLSTM":
        recurrent_layer = Bidirectional(LSTM(neurons), input_shape=input_shape)
    elif model_type in ("LSTM", "GRU"):
        layer_cls = LSTM if model_type == "LSTM" else GRU
        recurrent_layer = layer_cls(neurons, input_shape=input_shape)
    else:
        raise ValueError(f"Unsupported model type: {model_type}")

    network.add(recurrent_layer)
    network.add(Dense(1))
    network.compile(loss="mse", optimizer="adam")
    return network
def run_forecast(
    df,
    model_type,
    is_multivariate,
    horizon,
    lag,
    neurons,
    epochs,
    batch_size,
    future_horizon=0,
    device="CPU",
    arima_order=(5, 1, 0),
):
    """Train a forecaster, score it on the last ``horizon`` steps, and export results.

    Classical model types (``"ARIMA"``, ``"ExponentialSmoothing"``,
    ``"Prophet"``) are delegated to ``run_classical_forecast``. Any other
    type is built with ``create_model`` and trained on sliding windows from
    ``prepare_data``; the last ``horizon`` windows form the test split.

    Args:
        df: Input frame; column 0 is the target (see ``prepare_data``).
        model_type: Name of the model to run.
        is_multivariate: Accepted for interface compatibility; not read here.
        horizon: Number of trailing samples held out for testing (>= 1).
        lag: Look-back window length.
        neurons: Units in the recurrent layer.
        epochs: Maximum number of training epochs.
        batch_size: Training batch size.
        future_horizon: Number of steps to forecast recursively past the
            test split; ``0`` disables it.
        device: ``"GPU"`` requests memory growth on the first visible GPU.
        arima_order: Order forwarded to the classical ARIMA path.

    Returns:
        Tuple ``(y_test_pred, y_test, future_pred, fig, metrics, export_path,
        future_path)``. ``future_pred`` is a list (empty when no future
        forecast was requested) and ``future_path`` is ``""`` in that case.

    Raises:
        ValueError: If ``horizon`` is smaller than 1, or leaves no training
            window once the look-back windows have been built.
    """
    if horizon < 1:
        raise ValueError(f"horizon must be a positive number of steps, got {horizon}")

    if device == "GPU":
        physical_devices = tf.config.list_physical_devices("GPU")
        if physical_devices:
            try:
                tf.config.experimental.set_memory_growth(physical_devices[0], True)
            except RuntimeError:
                # TensorFlow refuses to change this once its runtime is
                # initialised (e.g. after an earlier run). Memory growth is
                # only an allocation hint, so carry on with current settings.
                pass

    if model_type in ("ARIMA", "ExponentialSmoothing", "Prophet"):
        return run_classical_forecast(df, model_type, horizon, future_horizon, arima_order)

    X, y, _ = prepare_data(df, lag=lag)
    if horizon >= len(X):
        raise ValueError(
            f"horizon={horizon} leaves no training data: only {len(X)} "
            f"window(s) are available with lag={lag}"
        )

    train_size = len(X) - horizon
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]

    # Fit the scalers on the training split only, then reuse them for the
    # test windows. Fitting fresh scalers on the test split would leak test
    # statistics and make the later inverse transform inconsistent.
    X_train, y_train, scaler_X, scaler_y = scale_data(X_train, y_train)
    n_features = X.shape[-1]
    X_test = scaler_X.transform(X_test.reshape(-1, n_features)).reshape(X_test.shape)

    model = create_model((lag, n_features), model_type, neurons)
    early_stop = EarlyStopping(monitor="loss", patience=10, restore_best_weights=True)
    model.fit(
        X_train,
        y_train,
        epochs=epochs,
        batch_size=batch_size,
        verbose=1,
        shuffle=False,
        callbacks=[early_stop],
    )

    # y_test was never scaled, so it is already in original units.
    y_test_pred = invert_prediction(scaler_y, model.predict(X_test))

    future_pred = []
    if future_horizon > 0:
        last_input = X_test[-1]
        for _ in range(future_horizon):
            pred_scaled = model.predict(last_input.reshape(1, lag, -1))
            pred_value = invert_prediction(scaler_y, pred_scaled)[0]
            future_pred.append(pred_value)
            # Slide the window one step (np.roll returns a copy) and write
            # the prediction into the target column. It must be expressed in
            # the feature scaler's units, i.e. ``value * scale_ + min_``.
            # Other columns of the new last row keep the values np.roll
            # wrapped around, since no future values exist for them.
            next_input = np.roll(last_input, -1, axis=0)
            next_input[-1, 0] = pred_value * scaler_X.scale_[0] + scaler_X.min_[0]
            last_input = next_input

    rmse = root_mean_squared_error(y_test, y_test_pred)
    r2 = r2_score(y_test, y_test_pred)
    metrics = f"Test RMSE: {rmse:.3f}, Test R2: {r2:.3f}"

    export_df = pd.DataFrame({"Test_Actual": y_test, "Test_Predicted": y_test_pred})
    export_path = os.path.join(tempfile.gettempdir(), "forecast_result.csv")
    export_df.to_csv(export_path, index=False)

    future_path = ""
    if future_pred:
        future_path = os.path.join(tempfile.gettempdir(), "forecast_future.csv")
        pd.DataFrame({"Future_Forecast": future_pred}).to_csv(future_path, index=False)

    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(y_test, label="Test Actual")
    ax.plot(y_test_pred, label="Test Predicted", linestyle="--")
    if future_pred:
        ax.plot(
            range(len(y_test), len(y_test) + future_horizon),
            future_pred,
            label="Future Forecast",
            linestyle="-.",
        )
    ax.set_title("Forecast Result")
    ax.set_xlabel("Time Step")
    ax.set_ylabel("Value")
    ax.legend()

    return y_test_pred, y_test, future_pred, fig, metrics, export_path, future_path
def run_classical_forecast(df, model_type, horizon, future_horizon=0, arima_order=(5, 1, 0)):
    """Fit a classical model on all but the last ``horizon`` points and score it.

    The first column of ``df`` is the series. The model is fitted on
    everything except the trailing ``horizon`` observations, which are held
    out as the test set. It then forecasts ``horizon + future_horizon``
    steps: the first ``horizon`` are compared with the held-out values and
    the rest, if requested, are the forecast beyond the end of the data.

    Args:
        df: Frame whose index can be parsed by ``pd.to_datetime``.
        model_type: ``"ARIMA"``, ``"ExponentialSmoothing"`` or ``"Prophet"``.
        horizon: Number of trailing observations held out for testing.
        future_horizon: Extra steps to forecast past the end of the series;
            ``0`` disables it.
        arima_order: ``order`` argument for ``ARIMA``.

    Returns:
        Tuple ``(forecast, actual, future_forecast, fig, metrics,
        export_path, future_path)``. ``forecast`` and ``actual`` are NumPy
        arrays of length ``horizon``. ``future_forecast`` is a NumPy array,
        or ``None`` when ``future_horizon`` is not positive, in which case
        ``future_path`` is ``""``.

    Raises:
        ValueError: If ``model_type`` is unsupported, or ``horizon`` is not
            between 1 and ``len(df) - 1``.
    """
    if model_type not in ("ARIMA", "ExponentialSmoothing", "Prophet"):
        raise ValueError(f"Model type {model_type} not supported.")

    # Copy so that re-indexing the series can never touch the caller's frame.
    ts = df.iloc[:, 0].copy()
    ts.index = pd.to_datetime(df.index)

    if not 0 < horizon < len(ts):
        raise ValueError(
            f"horizon={horizon} must be at least 1 and smaller than the "
            f"series length ({len(ts)})"
        )

    # Hold out the last ``horizon`` points; fitting on them would mean the
    # "test" forecast is really a forecast past the end of the data.
    train = ts.iloc[:-horizon]
    actual = ts.iloc[-horizon:].values
    total_steps = horizon + max(future_horizon, 0)

    if model_type == "ARIMA":
        model_fit = ARIMA(train, order=arima_order).fit()
        predicted = np.asarray(model_fit.forecast(steps=total_steps))
    elif model_type == "ExponentialSmoothing":
        model_fit = ExponentialSmoothing(train, trend="add", seasonal=None).fit()
        predicted = np.asarray(model_fit.forecast(total_steps))
    else:  # Prophet
        df_prophet = train.reset_index()
        df_prophet.columns = ["ds", "y"]
        model = Prophet()
        model.fit(df_prophet)
        # Prophet generates future dates at daily spacing unless told
        # otherwise. Use the series' own frequency when pandas can infer it.
        try:
            freq = pd.infer_freq(ts.index)
        except ValueError:
            freq = None
        future = model.make_future_dataframe(periods=total_steps, freq=freq or "D")
        predicted = model.predict(future)["yhat"].iloc[-total_steps:].values

    forecast = predicted[:horizon]
    future_forecast = predicted[horizon:] if future_horizon > 0 else None

    # root_mean_squared_error already returns the RMSE; no extra sqrt needed.
    rmse_val = root_mean_squared_error(actual, forecast)
    r2_val = r2_score(actual, forecast)
    metrics = f"Test RMSE: {rmse_val:.3f}, Test R2: {r2_val:.3f}"

    export_df = pd.DataFrame({"Test_Actual": actual, "Test_Predicted": forecast})
    export_path = os.path.join(tempfile.gettempdir(), f"{model_type.lower()}_forecast_result.csv")
    export_df.to_csv(export_path, index=False)

    future_path = ""
    if future_forecast is not None:
        future_path = os.path.join(tempfile.gettempdir(), f"{model_type.lower()}_forecast_future.csv")
        pd.DataFrame({"Future_Forecast": future_forecast}).to_csv(future_path, index=False)

    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(actual, label="Test Actual")
    ax.plot(forecast, label="Test Predicted", linestyle="--")
    if future_forecast is not None:
        ax.plot(
            range(len(actual), len(actual) + len(future_forecast)),
            future_forecast,
            label="Future Forecast",
            linestyle="-.",
        )
    ax.set_title(f"{model_type} Forecast Result")
    ax.set_xlabel("Time Step")
    ax.set_ylabel("Value")
    ax.legend()

    return forecast, actual, future_forecast, fig, metrics, export_path, future_path