# alanoee's picture
# Initial commit: Energy consumption prediction model
# 470deb6
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.linear_model import LinearRegression
import pickle
import joblib
import os
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
class EnergyConsumptionPredictor:
    """Forecast monthly energy consumption and the resulting cost.

    ``train`` fits three scikit-learn regressors on lag, moving-average and
    seasonal features derived from a ``Consumption`` column and keeps the one
    with the best cross-validated R^2.  When fewer than
    ``MIN_TRAINING_ROWS`` usable rows remain after feature creation, a
    per-month average ("baseline_seasonal") is used instead.

    The input frame must provide ``Date``, ``Month`` and ``Consumption``
    columns.  Every other column except ``Reading`` and ``Cost`` is treated
    as a model feature.
    NOTE(review): rows are presumably expected in chronological order, since
    lag features are built with positional shifts -- confirm with callers.
    """

    # Name stored in ``best_model_name`` when the per-month fallback is used.
    BASELINE_NAME = "baseline_seasonal"

    # Hardcoded tariff values used to turn predicted consumption into a cost.
    ENERGY_RATE = 0.6972
    DISTRIBUTION_MULTIPLIER = 0.5068
    VAT_RATE = 0.23

    # Window sizes / lags (in rows) of the engineered consumption features.
    MA_WINDOWS = (3, 6)
    LAGS = (1, 2, 3)

    # Below this many usable rows the seasonal baseline replaces the models.
    MIN_TRAINING_ROWS = 5

    def __init__(self):
        """Create the candidate models and an empty (untrained) state."""
        self.models = {
            'random_forest': RandomForestRegressor(n_estimators=100, random_state=42),
            'gradient_boosting': GradientBoostingRegressor(n_estimators=100, random_state=42),
            'linear_regression': LinearRegression(),
        }
        self.best_model = None
        self.best_model_name = None
        self.scaler = StandardScaler()
        self.feature_columns = None
        self.data_stats = {}
        # Month -> mean consumption; only populated by the baseline fallback.
        self.baseline_predictions = None

    # ------------------------------------------------------------------
    # Feature engineering
    # ------------------------------------------------------------------
    def _create_features(self, df):
        """Return a copy of ``df`` with rolling, lag and seasonal columns.

        Rolling statistics are computed over the rows *preceding* each row
        (``shift(1)``), so the value being predicted never leaks into its
        own features.  A window or lag is only added when ``df`` has more
        rows than its size.
        """
        features_df = df.copy()
        consumption = features_df['Consumption']

        # Only past observations may feed the rolling statistics.
        history = consumption.shift(1)
        for window in self.MA_WINDOWS:
            if len(df) > window:
                rolling = history.rolling(window=window)
                features_df[f'consumption_ma_{window}'] = rolling.mean()
                features_df[f'consumption_std_{window}'] = rolling.std()

        for lag in self.LAGS:
            if len(df) > lag:
                features_df[f'consumption_lag_{lag}'] = consumption.shift(lag)

        month = features_df['Month']
        features_df['is_winter'] = month.isin([12, 1, 2]).astype(int)
        features_df['is_summer'] = month.isin([6, 7, 8]).astype(int)
        features_df['is_transition'] = month.isin([3, 4, 5, 9, 10, 11]).astype(int)
        return features_df

    def _prepare_training_data(self, df):
        """Build the feature matrix ``X`` and target vector ``y``.

        Rows with any missing value (e.g. the first rows, which have no lag
        history) are dropped.  The selected column order is remembered in
        ``self.feature_columns`` for use at prediction time.
        """
        features_df = self._create_features(df).dropna()
        excluded = {'Date', 'Consumption', 'Reading', 'Cost'}
        self.feature_columns = [col for col in features_df.columns if col not in excluded]
        X = features_df[self.feature_columns].values
        y = features_df['Consumption'].values
        return X, y

    # ------------------------------------------------------------------
    # Training
    # ------------------------------------------------------------------
    def train(self, df):
        """Fit the candidate models and keep the best one.

        Args:
            df: Frame with ``Date``, ``Month``, ``Consumption`` and any
                additional feature columns.

        Returns:
            dict with ``r2_score``, ``rmse``, ``mae`` and ``model_name``.
            For a fitted model the three metrics are computed on the full
            training data after the final refit, and ``all_models`` holds
            the hold-out / cross-validation scores of every candidate.
        """
        # Coerce so string dates work with the date arithmetic used later.
        dates = pd.to_datetime(df['Date'])
        consumption = df['Consumption']
        self.data_stats = {
            'mean_consumption': consumption.mean(),
            'std_consumption': consumption.std(),
            'min_date': dates.min(),
            'max_date': dates.max(),
            'seasonal_patterns': df.groupby('Month')['Consumption'].mean().to_dict(),
        }

        X, y = self._prepare_training_data(df)
        if len(X) < self.MIN_TRAINING_ROWS:
            return self._train_baseline_model(df)

        X_scaled = self.scaler.fit_transform(X)
        # shuffle=False keeps the most recent rows as the hold-out set.
        X_train, X_test, y_train, y_test = train_test_split(
            X_scaled, y, test_size=0.2, random_state=42, shuffle=False
        )

        model_scores = {}
        for model_name, model in self.models.items():
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            cv_scores = cross_val_score(model, X_scaled, y, cv=3, scoring='r2')
            model_scores[model_name] = {
                'r2_score': r2_score(y_test, y_pred),
                'rmse': np.sqrt(mean_squared_error(y_test, y_pred)),
                'mae': mean_absolute_error(y_test, y_pred),
                'cv_score': cv_scores.mean(),
            }

        def selection_key(name):
            # R^2 is undefined (NaN) on single-sample folds; rank those last
            # so the comparison inside max() stays well defined.
            score = model_scores[name]['cv_score']
            return -np.inf if np.isnan(score) else score

        self.best_model_name = max(model_scores, key=selection_key)
        self.best_model = self.models[self.best_model_name]
        self.best_model.fit(X_scaled, y)

        final_predictions = self.best_model.predict(X_scaled)
        return {
            'r2_score': r2_score(y, final_predictions),
            'rmse': np.sqrt(mean_squared_error(y, final_predictions)),
            'mae': mean_absolute_error(y, final_predictions),
            'model_name': self.best_model_name,
            'all_models': model_scores,
        }

    def _train_baseline_model(self, df):
        """Fall back to per-month mean consumption when data is scarce.

        The reported metrics are placeholders derived from the standard
        deviation of ``Consumption``, not measured errors.
        """
        monthly_avg = df.groupby('Month')['Consumption'].mean()
        overall_mean = df['Consumption'].mean()
        self.baseline_predictions = monthly_avg.fillna(overall_mean).to_dict()
        self.best_model_name = self.BASELINE_NAME
        # Drop any estimator left over from a previous, larger training run.
        self.best_model = None
        spread = df['Consumption'].std()
        return {
            'r2_score': 0.0,
            'rmse': spread,
            'mae': spread * 0.8,
            'model_name': self.BASELINE_NAME,
        }

    # ------------------------------------------------------------------
    # Prediction
    # ------------------------------------------------------------------
    def _require_trained(self):
        """Raise ``ValueError`` unless a model or the baseline is available."""
        is_baseline = self.best_model_name == self.BASELINE_NAME
        if self.best_model is None and not is_baseline:
            raise ValueError("Model must be trained first. Use train() method.")

    def _future_dates(self, months):
        """Return ``months`` month-start dates following the last seen date.

        The start is the first month-begin strictly after the last training
        date, so a mid-month last reading does not skip the next month.
        """
        last_date = pd.Timestamp(self.data_stats['max_date']).normalize()
        start = last_date + pd.offsets.MonthBegin(1)
        return pd.date_range(start=start, periods=months, freq='MS')

    def _seasonal_value(self, month):
        """Mean training consumption for ``month`` (overall mean if unseen)."""
        return self.data_stats['seasonal_patterns'].get(
            month, self.data_stats['mean_consumption']
        )

    def _future_feature_row(self, date):
        """Build the feature dict for one future month-start ``date``.

        Lag and moving-average features cannot be observed for the future,
        so they are approximated with the seasonal means of the months that
        precede ``date`` (lag 1 -> previous month, and so on).
        """
        month = date.month
        features = {
            'Month': month,
            'Year': date.year,
            'DayOfYear': date.timetuple().tm_yday,
            'Quarter': date.quarter,
            'days_since_start': (date - self.data_stats['min_date']).days,
            'month_sin': np.sin(2 * np.pi * month / 12),
            'month_cos': np.cos(2 * np.pi * month / 12),
            'is_winter': int(month in [12, 1, 2]),
            'is_summer': int(month in [6, 7, 8]),
            'is_transition': int(month in [3, 4, 5, 9, 10, 11]),
        }

        depth = max(max(self.MA_WINDOWS), max(self.LAGS))
        # prior[k - 1] is the seasonal mean of the month k months before date.
        prior = [self._seasonal_value((month - 1 - k) % 12 + 1) for k in range(1, depth + 1)]
        for window in self.MA_WINDOWS:
            features[f'consumption_ma_{window}'] = sum(prior[:window]) / window
            features[f'consumption_std_{window}'] = self.data_stats['std_consumption']
        for lag in self.LAGS:
            features[f'consumption_lag_{lag}'] = prior[lag - 1]
        return features

    def _build_results(self, future_dates, predictions):
        """Assemble the forecast frame and append ``Predicted_Cost``.

        Cost = (energy + distribution fee) * (1 + VAT), where the
        distribution fee is a fixed multiple of the energy cost.
        """
        results_df = pd.DataFrame({
            'Date': future_dates,
            'Predicted_Consumption': predictions,
            'Month': future_dates.month,
            'Year': future_dates.year,
        })
        energy_cost = results_df['Predicted_Consumption'] * self.ENERGY_RATE
        distribution_fee = energy_cost * self.DISTRIBUTION_MULTIPLIER
        subtotal = energy_cost + distribution_fee
        vat = subtotal * self.VAT_RATE
        results_df['Predicted_Cost'] = subtotal + vat
        return results_df

    def predict_future(self, months=12):
        """Forecast consumption and cost for the next ``months`` months.

        Returns:
            DataFrame with ``Date``, ``Predicted_Consumption``, ``Month``,
            ``Year`` and ``Predicted_Cost`` columns.  Predictions include
            random noise drawn from NumPy's global RNG and are clipped at 0.

        Raises:
            ValueError: if neither a model nor the baseline has been trained.
            KeyError: if the model was trained on a feature column that
                cannot be derived for future dates.
        """
        self._require_trained()
        if self.best_model_name == self.BASELINE_NAME:
            return self._predict_baseline(months)

        noise_scale = self.data_stats['std_consumption'] * 0.1
        predictions = []
        future_dates = self._future_dates(months)
        for date in future_dates:
            features = self._future_feature_row(date)
            unknown = [col for col in self.feature_columns if col not in features]
            if unknown:
                raise KeyError(f"Cannot derive future values for feature columns: {unknown}")
            feature_vector = np.array([[features[col] for col in self.feature_columns]])
            feature_vector_scaled = self.scaler.transform(feature_vector)
            prediction = self.best_model.predict(feature_vector_scaled)[0]
            # Add some noise to make predictions more realistic
            prediction = max(0, prediction + np.random.normal(0, noise_scale))
            predictions.append(prediction)

        return self._build_results(future_dates, predictions)

    def _predict_baseline(self, months):
        """Forecast with per-month averages plus ~10% multiplicative noise."""
        future_dates = self._future_dates(months)
        fallback = self.data_stats['mean_consumption']
        predictions = [
            max(0, self.baseline_predictions.get(date.month, fallback) * (1 + np.random.normal(0, 0.1)))
            for date in future_dates
        ]
        return self._build_results(future_dates, predictions)

    def get_feature_importance(self):
        """Return ``{feature: importance}`` sorted descending.

        Empty when the selected model exposes no ``feature_importances_``
        (e.g. linear regression, the baseline, or an untrained instance).
        """
        if hasattr(self.best_model, 'feature_importances_'):
            importance_dict = dict(zip(self.feature_columns, self.best_model.feature_importances_))
            return dict(sorted(importance_dict.items(), key=lambda item: item[1], reverse=True))
        return {}

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    def save_model(self, filepath=None, format='joblib'):
        """Serialize the trained state to disk and return the file path.

        Args:
            filepath: Destination; a timestamped name in the current
                directory is generated when omitted.
            format: ``'joblib'`` or anything else for plain ``pickle``.

        Raises:
            ValueError: if nothing has been trained yet.
        """
        # The seasonal baseline has no estimator but is still a valid state.
        self._require_trained()

        if filepath is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            extension = 'joblib' if format == 'joblib' else 'pkl'
            filepath = f"energy_model_{self.best_model_name}_{timestamp}.{extension}"

        os.makedirs(os.path.dirname(filepath) or '.', exist_ok=True)

        model_data = {
            'best_model': self.best_model,
            'best_model_name': self.best_model_name,
            'scaler': self.scaler,
            'feature_columns': self.feature_columns,
            'data_stats': self.data_stats,
            'models': self.models,
            'baseline_predictions': getattr(self, 'baseline_predictions', None),
            'metadata': {
                'saved_at': datetime.now().isoformat(),
                'model_type': self.best_model_name,
                'feature_count': len(self.feature_columns) if self.feature_columns else 0,
            },
        }

        if format == 'joblib':
            joblib.dump(model_data, filepath)
        else:
            with open(filepath, 'wb') as f:
                pickle.dump(model_data, f)
        return filepath

    def load_model(self, filepath, format='auto'):
        """Restore state previously written by ``save_model``.

        With ``format='auto'`` a ``.pkl`` suffix selects pickle and anything
        else selects joblib.

        SECURITY: both joblib and pickle execute arbitrary code while
        deserializing -- only load files from a trusted source.

        Raises:
            FileNotFoundError: if ``filepath`` does not exist.
            ValueError: if the file cannot be read or lacks expected keys.
        """
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"File {filepath} does not exist.")

        if format == 'auto':
            format = 'pickle' if filepath.endswith('.pkl') else 'joblib'

        try:
            if format == 'joblib':
                model_data = joblib.load(filepath)
            else:
                with open(filepath, 'rb') as f:
                    model_data = pickle.load(f)

            self.best_model = model_data['best_model']
            self.best_model_name = model_data['best_model_name']
            self.scaler = model_data['scaler']
            self.feature_columns = model_data['feature_columns']
            self.data_stats = model_data['data_stats']
            self.models = model_data['models']
            self.baseline_predictions = model_data.get('baseline_predictions')
        except Exception as e:
            # Keep the original traceback attached for debugging.
            raise ValueError(f"Error loading model: {str(e)}") from e

    @classmethod
    def from_file(cls, filepath, format='auto'):
        """Alternate constructor: create an instance and load ``filepath``."""
        model = cls()
        model.load_model(filepath, format)
        return model