# Hugging Face upload metadata (web-UI residue), commented out so the file parses:
# Tskunz's picture
# Adding the Model and App
# 7acfdcf verified
"""
Hybrid forecasting model combining Linear Trend + XGBoost Seasonality.
This model addresses XGBoost's poor extrapolation by decomposing the problem:
1. Linear Regression on time index for trend (extrapolates well)
2. XGBoost on detrended residuals for seasonality (captures non-linear patterns)
3. Combines both for final forecast: Prediction = Trend + Detrended_XGBoost
This approach gives the best of both worlds:
- Linear trend that projects accurately into the future
- XGBoost seasonality that learns complex patterns
"""
import pandas as pd
import numpy as np
from typing import Tuple, Optional, Dict
import logging
from pathlib import Path
import pickle
# Linear regression for trend
from sklearn.linear_model import LinearRegression
# XGBoost with GPU support
import xgboost as xgb
# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
from .config import (
model_config,
gpu_config,
MODELS_DIR,
RESULTS_DIR
)
# Configure root logging once at import time so INFO-level messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module, per the logging convention.
logger = logging.getLogger(__name__)
# Apply a consistent seaborn theme to every plot produced by this module.
sns.set_style("whitegrid")
class HybridTrendSeasonalityForecaster:
    """
    Hybrid forecasting model combining Linear Trend + XGBoost Seasonality.

    Architecture:
    1. Train a simple Linear Regression on time index [0, 1, 2, ..., n-1]
       to capture long-term trend that extrapolates well
    2. Compute detrended residuals: y_detrended = y_train - trend_pred_train
    3. Train XGBoost on detrended residuals using rich features
    4. Final prediction: y_pred = trend_pred + xgb_pred
    """

    def __init__(self, use_gpu: bool = True):
        """
        Initialize the hybrid forecaster.

        Args:
            use_gpu: Whether to use GPU acceleration for XGBoost. The effective
                setting is ANDed with the project-wide gpu_config.use_gpu flag.
        """
        self.use_gpu = use_gpu and gpu_config.use_gpu
        # Model components (populated by fit())
        self.trend_model: Optional[LinearRegression] = None  # Linear regression for trend
        self.xgb_model = None  # XGBoost booster for detrended residuals
        # Training data info, needed to place future time indices correctly
        self.n_train: Optional[int] = None
        self.trend_slope: Optional[float] = None
        self.trend_intercept: Optional[float] = None
        # Metadata
        self.feature_names = None
        self.train_stats: Dict = {}

    def _create_time_index(self, n_samples: int) -> np.ndarray:
        """Create a column-vector time index [0, 1, ..., n-1] for trend modeling."""
        return np.arange(n_samples).reshape(-1, 1)

    def fit(
        self,
        X_train: pd.DataFrame,
        y_train: pd.Series,
        X_val: Optional[pd.DataFrame] = None,
        y_val: Optional[pd.Series] = None,
        plot_decomposition: bool = True,
        val_start_idx: Optional[int] = None
    ):
        """
        Fit the hybrid forecasting model.

        Args:
            X_train: Training features
            y_train: Training target (load values)
            X_val: Validation features (optional)
            y_val: Validation target (optional)
            plot_decomposition: Whether to plot trend and seasonality components
            val_start_idx: Starting time index for validation data
                (if None, assumes validation occupies the tail of X_train)
        """
        logger.info("Fitting Hybrid Trend+Seasonality forecaster...")
        # Store feature names and training size (n_train anchors future indices)
        self.feature_names = X_train.columns.tolist()
        self.n_train = len(X_train)

        # ============================================================================
        # STEP 1: Fit Linear Trend Model
        # ============================================================================
        logger.info("Step 1: Fitting linear trend model...")
        time_index = self._create_time_index(len(y_train))
        self.trend_model = LinearRegression()
        self.trend_model.fit(time_index, y_train.values)
        # Get trend predictions for training data
        trend_pred_train = self.trend_model.predict(time_index)
        # Store slope and intercept for reference / serialization
        self.trend_slope = float(self.trend_model.coef_[0])
        self.trend_intercept = float(self.trend_model.intercept_)
        logger.info(f"  Trend equation: y = {self.trend_intercept:.2f} + {self.trend_slope:.6f} * t")
        logger.info(f"  Trend range: [{trend_pred_train.min():.2f}, {trend_pred_train.max():.2f}] MW")

        # ============================================================================
        # STEP 2: Compute Detrended Residuals
        # ============================================================================
        logger.info("Step 2: Computing detrended residuals...")
        y_detrended = y_train.values - trend_pred_train
        logger.info(f"  Detrended mean: {y_detrended.mean():.4f} MW (should be ≈0)")
        logger.info(f"  Detrended std: {y_detrended.std():.2f} MW")
        logger.info(f"  Detrended range: [{y_detrended.min():.2f}, {y_detrended.max():.2f}] MW")

        # ============================================================================
        # STEP 3: Train XGBoost on Detrended Residuals
        # ============================================================================
        logger.info("Step 3: Training XGBoost on detrended residuals...")
        params = {
            'objective': 'reg:squarederror',
            'max_depth': model_config.xgb_max_depth,
            'learning_rate': model_config.xgb_learning_rate,
            'subsample': model_config.xgb_subsample,
            'colsample_bytree': model_config.xgb_colsample_bytree,
            'reg_alpha': model_config.xgb_reg_alpha,
            'reg_lambda': model_config.xgb_reg_lambda,
            'tree_method': 'hist',
            'device': 'cuda' if self.use_gpu else 'cpu',
            'random_state': model_config.random_seed,
            'verbosity': 1
        }
        # Create DMatrix for training (label is the DETRENDED target)
        dtrain = xgb.DMatrix(X_train, label=y_detrended)

        # Prepare validation set if provided
        eval_set = []
        if X_val is not None and y_val is not None:
            # Determine validation start index (temporal position in full dataset)
            if val_start_idx is None:
                # Fallback: assume validation occupies the tail of the training span.
                # This may be wrong if validation was held out elsewhere — warn loudly.
                val_start_idx = self.n_train - len(y_val)
                logger.warning(f"val_start_idx not provided. Assuming validation at indices [{val_start_idx}:{self.n_train}]")
            # Predict trend for validation set at CORRECT time indices:
            # shift the local [0..m) index by the validation start position
            time_index_val = self._create_time_index(len(y_val))
            time_index_val_corrected = time_index_val + val_start_idx
            trend_pred_val = self.trend_model.predict(time_index_val_corrected)
            logger.info(f"Validation time indices: [{val_start_idx}:{val_start_idx + len(y_val)}]")
            # Compute detrended validation target
            y_detrended_val = y_val.values - trend_pred_val
            dval = xgb.DMatrix(X_val, label=y_detrended_val)
            eval_set = [(dtrain, 'train'), (dval, 'val')]

        # Train XGBoost model. BUGFIX: only enable early stopping when a real
        # validation set exists — stopping on training loss is meaningless
        # (training loss almost never degrades, so it silently never triggers
        # or, worse, stops on noise).
        evals_result = {}
        self.xgb_model = xgb.train(
            params,
            dtrain,
            num_boost_round=model_config.xgb_n_estimators,
            evals=eval_set if eval_set else [(dtrain, 'train')],
            early_stopping_rounds=(
                model_config.xgb_early_stopping_rounds if eval_set else None
            ),
            evals_result=evals_result,
            verbose_eval=50
        )

        # Store training statistics. best_iteration is only set by xgboost when
        # early stopping is active, hence the getattr fallback.
        best_iteration = getattr(self.xgb_model, 'best_iteration', None)
        self.train_stats = {
            'train_rmse': evals_result['train']['rmse'][-1] if 'train' in evals_result else None,
            'val_rmse': evals_result['val']['rmse'][-1] if 'val' in evals_result else None,
            'best_iteration': best_iteration,
            'n_features': len(self.feature_names),
            'trend_slope': self.trend_slope,
            'trend_intercept': self.trend_intercept
        }
        logger.info(f"Training complete. Best iteration: {best_iteration}")
        # BUGFIX: explicit None check — a legitimate 0.0 RMSE is falsy and
        # would previously have been skipped.
        if self.train_stats['val_rmse'] is not None:
            logger.info(f"Validation RMSE (on detrended): {self.train_stats['val_rmse']:.2f}")

        # Plot decomposition if requested
        if plot_decomposition:
            self._plot_decomposition(y_train.values, trend_pred_train, y_detrended)

    def predict(self, X: pd.DataFrame, start_idx: Optional[int] = None) -> np.ndarray:
        """
        Make predictions combining trend and seasonality.

        Args:
            X: Feature matrix
            start_idx: Starting time index (if None, assumes the prediction
                window starts immediately after the training data)

        Returns:
            Predictions array: trend_pred + xgb_pred

        Raises:
            ValueError: If the model has not been fitted.
        """
        if self.trend_model is None or self.xgb_model is None:
            raise ValueError("Model must be fitted before prediction")
        n_pred = len(X)
        # If start_idx not provided, assume continuation from training data
        if start_idx is None:
            start_idx = self.n_train

        # ============================================================================
        # PREDICT TREND: Simple linear extrapolation at shifted time indices
        # ============================================================================
        time_index_test = self._create_time_index(n_pred)
        time_index_test_shifted = time_index_test + start_idx
        trend_pred = self.trend_model.predict(time_index_test_shifted)

        # ============================================================================
        # PREDICT SEASONALITY: XGBoost on features (detrended scale)
        # ============================================================================
        dtest = xgb.DMatrix(X)
        seasonality_pred = self.xgb_model.predict(dtest)

        # ============================================================================
        # COMBINE: Final Prediction = Trend + Seasonality
        # ============================================================================
        y_pred = trend_pred + seasonality_pred
        return y_pred

    def evaluate(
        self,
        X_test: pd.DataFrame,
        y_test: pd.Series,
        plot: bool = True,
        start_idx: Optional[int] = None
    ) -> Dict[str, float]:
        """
        Evaluate model performance.

        Args:
            X_test: Test features
            y_test: Test target
            plot: Whether to plot predictions
            start_idx: Starting time index for the test set

        Returns:
            Dictionary with MAE, RMSE, MAPE and R2 metrics.
        """
        logger.info("Evaluating model on test set...")
        # Make predictions
        y_pred = self.predict(X_test, start_idx=start_idx)
        # Calculate metrics
        from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
        mae = mean_absolute_error(y_test, y_pred)
        rmse = np.sqrt(mean_squared_error(y_test, y_pred))
        # BUGFIX: guard MAPE against zero actuals — dividing by y_test with
        # zero-load hours produced inf/NaN. Only non-zero actuals contribute.
        actual = y_test.values
        nonzero = actual != 0
        if nonzero.any():
            mape = np.mean(np.abs((actual[nonzero] - y_pred[nonzero]) / actual[nonzero])) * 100
        else:
            mape = float('nan')
        r2 = r2_score(y_test, y_pred)
        metrics = {
            'MAE': mae,
            'RMSE': rmse,
            'MAPE': mape,
            'R2': r2
        }
        logger.info("Test Set Metrics:")
        for metric, value in metrics.items():
            logger.info(f"  {metric}: {value:.4f}")
        # Plot if requested
        if plot:
            self._plot_predictions(y_test.values, y_pred)
        return metrics

    def _plot_decomposition(
        self,
        y_observed: np.ndarray,
        y_trend: np.ndarray,
        y_detrended: np.ndarray
    ):
        """Plot observed series, linear trend, and detrended components, then save to RESULTS_DIR."""
        fig, axes = plt.subplots(3, 1, figsize=(15, 10))
        # Plot 1: Observed vs trend overlay
        axes[0].plot(y_observed, linewidth=1, color='steelblue', label='Observed')
        axes[0].plot(y_trend, linewidth=2, color='red', alpha=0.7, label='Linear Trend')
        axes[0].set_title('Observed Load vs Linear Trend', fontsize=12, fontweight='bold')
        axes[0].set_ylabel('Load (MW)')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        # Plot 2: Trend alone
        axes[1].plot(y_trend, linewidth=1.5, color='red')
        axes[1].set_title('Linear Trend Component', fontsize=12, fontweight='bold')
        axes[1].set_ylabel('Trend (MW)')
        axes[1].grid(True, alpha=0.3)
        # Plot 3: Detrended (Seasonality + Noise) around zero
        axes[2].plot(y_detrended, linewidth=1, color='green', alpha=0.7)
        axes[2].axhline(y=0, color='k', linestyle='--', alpha=0.3)
        axes[2].set_title('Detrended Component (Seasonality + Noise)', fontsize=12, fontweight='bold')
        axes[2].set_ylabel('Detrended Load (MW)')
        axes[2].set_xlabel('Time Index')
        axes[2].grid(True, alpha=0.3)
        plt.tight_layout()
        # Save plot
        plot_path = RESULTS_DIR / 'hybrid_decomposition.png'
        plt.savefig(plot_path, dpi=300, bbox_inches='tight')
        logger.info(f"Saved decomposition plot to {plot_path}")
        plt.close()

    def _plot_predictions(self, y_true: np.ndarray, y_pred: np.ndarray):
        """Plot actual vs predicted values (first week + scatter), then save to RESULTS_DIR."""
        fig, axes = plt.subplots(2, 1, figsize=(15, 10))
        # Time series plot (first 7 days of hourly data)
        n_plot = min(24 * 7, len(y_true))  # 1 week
        axes[0].plot(y_true[:n_plot], label='Actual', linewidth=2, alpha=0.7, color='steelblue')
        axes[0].plot(y_pred[:n_plot], label='Predicted', linewidth=2, alpha=0.7, color='coral')
        axes[0].set_title('Load Forecast vs Actual (First Week)', fontsize=12, fontweight='bold')
        axes[0].set_xlabel('Hour')
        axes[0].set_ylabel('Load (MW)')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        # Scatter plot against the perfect-prediction diagonal
        axes[1].scatter(y_true, y_pred, alpha=0.3, s=10, color='steelblue')
        axes[1].plot([y_true.min(), y_true.max()],
                     [y_true.min(), y_true.max()],
                     'r--', linewidth=2, label='Perfect Prediction')
        axes[1].set_title('Predicted vs Actual Load', fontsize=12, fontweight='bold')
        axes[1].set_xlabel('Actual Load (MW)')
        axes[1].set_ylabel('Predicted Load (MW)')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)
        plt.tight_layout()
        # Save plot
        plot_path = RESULTS_DIR / 'hybrid_forecast_predictions.png'
        plt.savefig(plot_path, dpi=300, bbox_inches='tight')
        logger.info(f"Saved prediction plot to {plot_path}")
        plt.close()

    def get_feature_importance(self, top_n: int = 20) -> pd.DataFrame:
        """
        Get feature importance from the XGBoost model.

        Args:
            top_n: Number of top features to return

        Returns:
            DataFrame with 'feature' and 'importance' (gain) columns,
            sorted descending; empty frame if the booster made no splits.

        Raises:
            ValueError: If the model has not been fitted.
        """
        if self.xgb_model is None:
            raise ValueError("Model must be fitted first")
        # Get importance scores (gain = average loss reduction per split)
        importance = self.xgb_model.get_score(importance_type='gain')
        # BUGFIX: an empty importance dict (no splits) previously raised
        # KeyError in sort_values — return a well-formed empty frame instead.
        if not importance:
            return pd.DataFrame(columns=['feature', 'importance'])
        # Convert to dataframe
        importance_df = pd.DataFrame([
            {'feature': k, 'importance': v}
            for k, v in importance.items()
        ]).sort_values('importance', ascending=False)
        return importance_df.head(top_n)

    def save_model(self, filename: str = "hybrid_forecaster.pkl"):
        """Serialize both sub-models and metadata to MODELS_DIR via pickle."""
        filepath = MODELS_DIR / filename
        model_data = {
            'trend_model': self.trend_model,
            'xgb_model': self.xgb_model,
            'n_train': self.n_train,
            'trend_slope': self.trend_slope,
            'trend_intercept': self.trend_intercept,
            'feature_names': self.feature_names,
            'train_stats': self.train_stats
        }
        with open(filepath, 'wb') as f:
            pickle.dump(model_data, f)
        logger.info(f"Saved model to {filepath}")

    def load_model(self, filename: str = "hybrid_forecaster.pkl"):
        """Load model from disk.

        SECURITY NOTE: pickle.load executes arbitrary code from the file;
        only load model files from trusted sources.
        """
        filepath = MODELS_DIR / filename
        with open(filepath, 'rb') as f:
            model_data = pickle.load(f)
        self.trend_model = model_data['trend_model']
        self.xgb_model = model_data['xgb_model']
        self.n_train = model_data['n_train']
        self.trend_slope = model_data['trend_slope']
        self.trend_intercept = model_data['trend_intercept']
        self.feature_names = model_data['feature_names']
        self.train_stats = model_data['train_stats']
        logger.info(f"Loaded model from {filepath}")
if __name__ == "__main__":
    # Minimal demo banner; real usage is via import with preprocessed data.
    banner = "=" * 60
    print(banner)
    print("Hybrid Trend+Seasonality Forecasting Demo")
    print(banner)
    logger.info("This module provides hybrid load forecasting")
    logger.info("Import and use with preprocessed ERCOT data")