# NOTE: repository-page residue (not Python) kept below as comments so the file parses:
# HeatTransPlan / cop_analysis / cop_modelling.py
# drzg15 — "Initial code commit with LFS for binaries" (c993983)
# %% [markdown]
# # COP Results Joiner
# This notebook joins all Excel files from `data/cop_modelling` into a single Parquet file.
# %%
import pandas as pd
import os
from pathlib import Path
# %%
# Define paths.
# Resolve the data directory dynamically so the script works both when run
# from the repository root and from the analysis subdirectory.
current_dir = Path.cwd()
if (current_dir / "data" / "cop_modelling").exists():
    data_path = current_dir / "data" / "cop_modelling"
elif (current_dir.parent / "data" / "cop_modelling").exists():
    data_path = current_dir.parent / "data" / "cop_modelling"
else:
    # Fallback: assume we are one level below the repository root.
    data_path = Path("..") / "data" / "cop_modelling"
# Cached joined dataset lives next to the raw Excel files.
output_file = data_path / "joined_results.parquet"
# Configuration
LOAD_FROM_PARQUET = True  # Set to False to rebuild from Excel files
# %%
# Load the cached parquet if allowed and present; otherwise rebuild it by
# joining every Excel workbook found in data_path.
if LOAD_FROM_PARQUET and output_file.exists():
    print(f"Loading data directly from {output_file.name}...")
    joined_df = pd.read_parquet(output_file)
    print(f"Loaded shape: {joined_df.shape}")
else:
    # Get all Excel files
    excel_files = list(data_path.glob("*.xlsx"))
    print(f"Found {len(excel_files)} files in {data_path.resolve()}: {[f.name for f in excel_files]}")
    # Load and join
    dfs = []
    for f in excel_files:
        try:
            # Results are in the 'Results' sheet of each workbook
            df = pd.read_excel(f, sheet_name='Results')
            # Drop the first row (which usually contains units)
            df = df.iloc[1:].reset_index(drop=True)
            # Add a column to identify the source workbook
            df['source_file'] = f.name
            # Convert columns to numeric where possible, otherwise to strings
            # (mixed-type object columns would otherwise break parquet export)
            for col in df.columns:
                try:
                    df[col] = pd.to_numeric(df[col], errors='raise')
                except (ValueError, TypeError):
                    df[col] = df[col].astype(str)
            dfs.append(df)
        except Exception as e:
            # Best-effort: skip unreadable workbooks but report them
            print(f"Error reading {f}: {e}")
    if not dfs:
        raise ValueError(f"No objects to concatenate. Could not find or read any valid Excel files in {data_path.resolve()}.")
    joined_df = pd.concat(dfs, ignore_index=True)
    print(f"Joined shape: {joined_df.shape}")
    # Cache to parquet so subsequent runs can skip the Excel parsing
    joined_df.to_parquet(output_file)
    print(f"Saved to {output_file}")
# %%
# Quick preview
joined_df
print(joined_df.columns)
# %%
df = joined_df.copy()
# Sink-side temperature spread: flow minus return (original note: "Mayor" = the larger spread)
df['t_diff_senke'] = df['T_Vorlauf_Senke'] - df['T_Rücklauf_Senke']
# Source-side temperature spread (original note: "Menor" = the smaller spread)
df['t_diff_quelle'] = df['T_Vorlauf_Quelle'] - df['T_Rücklauf_Quelle']
# Temperature lift of the heat pump: sink flow minus source return
df['temp_hub'] = df['T_Vorlauf_Senke'] - df['T_Rücklauf_Quelle']
print(df['t_diff_quelle'].value_counts())
print(df['t_diff_senke'].value_counts())
print(df['Kompressor_Nr_Stufe1'].value_counts())
#%%
import pandas as pd
import plotly.graph_objects as go
import ipywidgets as widgets
from IPython.display import display, clear_output
print(df.columns)
# ============================================================
# Prepare dataframe
# ============================================================
# Create new column (recomputed; identical to the assignment above)
df['temp_hub'] = df['T_Vorlauf_Senke'] - df['T_Rücklauf_Quelle']
# Keep required columns only
df = df[[
    'Medium_Senke',
    'Kältemittel',
    'T_Vorlauf_Quelle',
    'T_Rücklauf_Quelle',
    'T_Rücklauf_Senke',
    'T_Vorlauf_Senke',
    'Kompressor_Nr_Stufe1',
    'COP',
    'COP_Lorenz',
    'source_file',
    't_diff_senke',
    't_diff_quelle',
    'temp_hub'
]].copy()
df = df.dropna()
# String copies of the filter columns (used as labels / widget filters)
df['Kältemittel_filter'] = df['Kältemittel'].astype(str)
df['Kompressor_filter'] = df['Kompressor_Nr_Stufe1'].astype(float).astype(int).astype(str)
# Combine refrigerant and compressor stage into one label
df['Kältemittel_stufen'] = (
    df['Kältemittel_filter'] + '_' + df['Kompressor_filter']
)
# Sort dataframe by temperature columns
df = df.sort_values(
    by=['T_Rücklauf_Quelle', 'T_Vorlauf_Senke'],
    ascending=[True, True]
)
#%%
# ============================================================
# Train ML models for COP prediction:
# Linear Regression, Polynomial Regression, MLP, XGBoost,
# and Symbolic Regression with PySR
#
# Target: COP
# Predictors: all other variables EXCEPT COP_Lorenz
#
# Metrics: R2, MAE, RMSE, WAPE
# Best model selected by Test_RMSE
# SHAP feature importance for best model
# PySR symbolic formula printed
# ============================================================
import warnings
warnings.filterwarnings("ignore")
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import display
from sklearn.base import clone
from sklearn.model_selection import train_test_split, KFold, cross_validate
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import (
mean_squared_error,
mean_absolute_error,
r2_score,
make_scorer
)
# If needed, install:
# pip install xgboost shap pysr
from xgboost import XGBRegressor
import shap
# ============================================================
# PySR import
# ============================================================
# Optional dependency: symbolic regression is skipped when PySR is missing.
try:
    from pysr import PySRRegressor
    # BUG FIX: this flag was set to False even after a successful import,
    # which silently disabled the entire PySR training/reporting path.
    pysr_available = True
except ImportError:
    pysr_available = False
    print("PySR is not installed.")
    print("Install it with:")
    print("pip install pysr")
    print("Note: PySR also needs Julia. First run can take some time.")
# ============================================================
# 0. Define WAPE metric
# ============================================================
def wape(y_true, y_pred):
    """
    Weighted Absolute Percentage Error, in percent.

    WAPE = 100 * sum(|y_true - y_pred|) / sum(|y_true|).
    Returns NaN when the actuals sum to zero absolute magnitude.
    """
    actual = np.asarray(y_true)
    predicted = np.asarray(y_pred)
    total_magnitude = np.sum(np.abs(actual))
    if total_magnitude == 0:
        return np.nan
    absolute_error = np.sum(np.abs(actual - predicted))
    return absolute_error / total_magnitude * 100
# Negated scorer: sklearn maximizes scores, so lower WAPE -> higher score.
wape_scorer = make_scorer(wape, greater_is_better=False)
# ============================================================
# 1. Prepare dataframe
# ============================================================
data = df.copy()
# Optional: create temp_hub if not already existing
if 'temp_hub' not in data.columns:
    data['temp_hub'] = data['T_Vorlauf_Senke'] - data['T_Rücklauf_Quelle']
# Target
target_col = 'COP'
# Columns to remove from predictors
drop_cols = [
    'COP',  # target
    'COP_Lorenz'  # explicitly excluded from the predictors
]
# Remove rows without target
data = data.dropna(subset=[target_col]).copy()
# Define X and y
X = data.drop(columns=drop_cols, errors='ignore')
y = data[target_col]
print("Target:", target_col)
print("\nPredictor columns:")
print(X.columns.tolist())
# ============================================================
# 2. Detect numeric and categorical columns
# ============================================================
numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
categorical_features = X.select_dtypes(exclude=[np.number]).columns.tolist()
print("\nNumeric features:")
print(numeric_features)
print("\nCategorical features:")
print(categorical_features)
# ============================================================
# 3. Train/test split (fixed seed for reproducibility)
# ============================================================
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42
)
# ============================================================
# 4. OneHotEncoder compatibility
# ============================================================
# sklearn >= 1.2 renamed the `sparse` argument to `sparse_output`;
# try the new name first and fall back for older versions.
try:
    onehot = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
except TypeError:
    onehot = OneHotEncoder(handle_unknown='ignore', sparse=False)
# ============================================================
# 5. Preprocessors
# ============================================================
# Numeric columns: median imputation + standardization.
numeric_transformer_standard = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])
# Categorical columns: most-frequent imputation + one-hot encoding.
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', onehot)
])
preprocessor_standard = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer_standard, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='drop'
)
# Variant with degree-2 polynomial expansion of the numeric features
# (used by the polynomial regression model below).
numeric_transformer_poly = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('poly', PolynomialFeatures(degree=2, include_bias=False)),
    ('scaler', StandardScaler())
])
preprocessor_poly = ColumnTransformer(
    transformers=[
        ('num_poly', numeric_transformer_poly, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ],
    remainder='drop'
)
# ============================================================
# 6. Helper functions
# ============================================================
def to_dense(X_array):
    """Return a dense array for *X_array*; pass through if already dense."""
    return X_array.toarray() if hasattr(X_array, "toarray") else X_array
def get_feature_names(preprocessor, X_transformed):
    """
    Return the transformed feature names from *preprocessor*.

    Falls back to generic names x0, x1, ... (one per column of
    *X_transformed*) when the preprocessor cannot report names.
    """
    try:
        return list(preprocessor.get_feature_names_out())
    except Exception:
        pass
    return [f"x{i}" for i in range(X_transformed.shape[1])]
def make_pysr_model(niterations=100, verbosity=0, progress=False):
    """
    Build a PySRRegressor configured for the COP symbolic-regression search.

    Higher ``niterations`` generally yields better formulas at the cost of
    runtime; ``verbosity`` and ``progress`` only control console output.
    """
    search_config = {
        "niterations": niterations,
        "binary_operators": ["+", "-", "*", "/"],
        "unary_operators": ["square", "cube", "abs"],
        "model_selection": "best",
        "maxsize": 25,
        "populations": 15,
        "population_size": 50,
        "parsimony": 0.001,
        "random_state": 42,
        "verbosity": verbosity,
        "progress": progress,
        "warm_start": False,
    }
    return PySRRegressor(**search_config)
def evaluate_metrics(y_true, y_pred):
    """Return a dict with R2, MAE, RMSE and WAPE (%) for one prediction set."""
    mse = mean_squared_error(y_true, y_pred)
    metrics = {
        'R2': r2_score(y_true, y_pred),
        'MAE': mean_absolute_error(y_true, y_pred),
        'RMSE': np.sqrt(mse),
        'WAPE_%': wape(y_true, y_pred),
    }
    return metrics
def predict_any_model(model_object, X_raw):
    """
    Predict with either a fitted sklearn Pipeline or a PySR bundle.

    PySR models are stored as a dict {"type": "pysr", "preprocessor": ...,
    "model": ...}; their preprocessor is applied (and densified) first.
    """
    if isinstance(model_object, Pipeline):
        return model_object.predict(X_raw)
    if isinstance(model_object, dict) and model_object.get("type") == "pysr":
        transformed = model_object["preprocessor"].transform(X_raw)
        transformed = to_dense(transformed)
        return model_object["model"].predict(transformed)
    raise ValueError("Unknown model object type.")
# ============================================================
# 7. Define standard sklearn models
# ============================================================
# Each entry is a full Pipeline (preprocessing + estimator) so that
# cross_validate refits the preprocessing per fold.
models = {
    "Linear Regression": Pipeline(steps=[
        ('preprocessor', preprocessor_standard),
        ('model', LinearRegression())
    ]),
    "Polynomial Regression Degree 2": Pipeline(steps=[
        ('preprocessor', preprocessor_poly),
        # Ridge rather than plain OLS on the expanded feature set
        ('model', Ridge(alpha=1.0))
    ]),
    "MLP Regressor": Pipeline(steps=[
        ('preprocessor', preprocessor_standard),
        ('model', MLPRegressor(
            hidden_layer_sizes=(128, 64),
            activation='relu',
            solver='adam',
            alpha=0.0005,
            learning_rate_init=0.001,
            max_iter=1000,
            random_state=42,
            early_stopping=True
        ))
    ]),
    "XGBoost": Pipeline(steps=[
        ('preprocessor', preprocessor_standard),
        ('model', XGBRegressor(
            n_estimators=500,
            max_depth=4,
            learning_rate=0.03,
            subsample=0.9,
            colsample_bytree=0.9,
            objective='reg:squarederror',
            random_state=42,
            n_jobs=-1
        ))
    ])
}
# ============================================================
# 8. Cross-validation and test evaluation
# ============================================================
cv = KFold(n_splits=5, shuffle=True, random_state=42)
results = []        # one metrics dict per model
fitted_models = {}  # model name -> fitted Pipeline (or PySR bundle, see 8.2)
# ------------------------------------------------------------
# 8.1 Train normal sklearn models
# ------------------------------------------------------------
for name, pipe in models.items():
    print("\n============================================================")
    print(f"Training model: {name}")
    print("============================================================")
    # 5-fold CV on the training split only
    cv_scores = cross_validate(
        pipe,
        X_train,
        y_train,
        cv=cv,
        scoring={
            'r2': 'r2',
            'neg_mae': 'neg_mean_absolute_error',
            'neg_rmse': 'neg_root_mean_squared_error',
            'neg_wape': wape_scorer
        },
        n_jobs=-1,
        return_train_score=False
    )
    cv_r2_mean = cv_scores['test_r2'].mean()
    cv_r2_std = cv_scores['test_r2'].std()
    # Negate the sklearn "neg_*" scores back to positive error values
    cv_mae_mean = -cv_scores['test_neg_mae'].mean()
    cv_rmse_mean = -cv_scores['test_neg_rmse'].mean()
    cv_wape_mean = -cv_scores['test_neg_wape'].mean()
    cv_wape_std = cv_scores['test_neg_wape'].std()
    # Fit on full training data
    pipe.fit(X_train, y_train)
    # Evaluate on the held-out test split
    y_pred = pipe.predict(X_test)
    test_metrics = evaluate_metrics(y_test, y_pred)
    fitted_models[name] = pipe
    results.append({
        'Model': name,
        'CV_R2_mean': cv_r2_mean,
        'CV_R2_std': cv_r2_std,
        'CV_MAE_mean': cv_mae_mean,
        'CV_RMSE_mean': cv_rmse_mean,
        'CV_WAPE_mean_%': cv_wape_mean,
        'CV_WAPE_std_%': cv_wape_std,
        'Test_R2': test_metrics['R2'],
        'Test_MAE': test_metrics['MAE'],
        'Test_RMSE': test_metrics['RMSE'],
        'Test_WAPE_%': test_metrics['WAPE_%']
    })
# ------------------------------------------------------------
# 8.2 Train PySR Symbolic Regression
# ------------------------------------------------------------
# PySR is not a sklearn estimator here, so CV is done manually: a fresh
# preprocessor + PySR model per fold, then a final fit on all training data.
if pysr_available:
    print("\n============================================================")
    print("Training model: PySR Symbolic Regression")
    print("============================================================")
    # You can increase these for better symbolic equations.
    # If PySR is too slow, reduce them.
    PYSR_CV_NITERATIONS = 40
    PYSR_FINAL_NITERATIONS = 120
    cv_r2_scores = []
    cv_mae_scores = []
    cv_rmse_scores = []
    cv_wape_scores = []
    fold_number = 1
    for train_idx, val_idx in cv.split(X_train):
        print(f"\nPySR CV fold {fold_number}/5")
        X_tr_fold = X_train.iloc[train_idx]
        X_val_fold = X_train.iloc[val_idx]
        y_tr_fold = y_train.iloc[train_idx]
        y_val_fold = y_train.iloc[val_idx]
        # Fit a fresh preprocessor for this fold (no information leak)
        fold_preprocessor = clone(preprocessor_standard)
        X_tr_proc = fold_preprocessor.fit_transform(X_tr_fold)
        X_val_proc = fold_preprocessor.transform(X_val_fold)
        X_tr_proc = to_dense(X_tr_proc)
        X_val_proc = to_dense(X_val_proc)
        # Fit PySR for this fold (fewer iterations than the final fit)
        fold_pysr = make_pysr_model(
            niterations=PYSR_CV_NITERATIONS,
            verbosity=0,
            progress=False
        )
        fold_pysr.fit(X_tr_proc, np.asarray(y_tr_fold))
        y_val_pred = fold_pysr.predict(X_val_proc)
        fold_metrics = evaluate_metrics(y_val_fold, y_val_pred)
        cv_r2_scores.append(fold_metrics['R2'])
        cv_mae_scores.append(fold_metrics['MAE'])
        cv_rmse_scores.append(fold_metrics['RMSE'])
        cv_wape_scores.append(fold_metrics['WAPE_%'])
        fold_number += 1
    # Fit final PySR model on full training data
    print("\nFitting final PySR model on full training data...")
    pysr_preprocessor = clone(preprocessor_standard)
    X_train_pysr = pysr_preprocessor.fit_transform(X_train)
    X_test_pysr = pysr_preprocessor.transform(X_test)
    X_train_pysr = to_dense(X_train_pysr)
    X_test_pysr = to_dense(X_test_pysr)
    # Names of the transformed features (for mapping x0, x1, ... later)
    pysr_feature_names = get_feature_names(pysr_preprocessor, X_train_pysr)
    pysr_model = make_pysr_model(
        niterations=PYSR_FINAL_NITERATIONS,
        verbosity=1,
        progress=True
    )
    pysr_model.fit(X_train_pysr, np.asarray(y_train))
    y_pred_pysr = pysr_model.predict(X_test_pysr)
    test_metrics_pysr = evaluate_metrics(y_test, y_pred_pysr)
    # Stored as a dict bundle because PySR needs its own preprocessor;
    # predict_any_model() knows how to consume this shape.
    fitted_models["PySR Symbolic Regression"] = {
        "type": "pysr",
        "preprocessor": pysr_preprocessor,
        "model": pysr_model,
        "feature_names": pysr_feature_names
    }
    results.append({
        'Model': "PySR Symbolic Regression",
        'CV_R2_mean': np.mean(cv_r2_scores),
        'CV_R2_std': np.std(cv_r2_scores),
        'CV_MAE_mean': np.mean(cv_mae_scores),
        'CV_RMSE_mean': np.mean(cv_rmse_scores),
        'CV_WAPE_mean_%': np.mean(cv_wape_scores),
        'CV_WAPE_std_%': np.std(cv_wape_scores),
        'Test_R2': test_metrics_pysr['R2'],
        'Test_MAE': test_metrics_pysr['MAE'],
        'Test_RMSE': test_metrics_pysr['RMSE'],
        'Test_WAPE_%': test_metrics_pysr['WAPE_%']
    })
else:
    print("\nSkipping PySR Symbolic Regression because PySR is not installed.")
# ============================================================
# 9. Model comparison
# ============================================================
results_df = pd.DataFrame(results).sort_values(by='Test_RMSE', ascending=True)
print("\n============================================================")
print("MODEL COMPARISON")
print("============================================================")
display(results_df)
# ============================================================
# 10. Select best model
# ============================================================
# Best model = lowest test RMSE (first row after the sort above)
best_model_name = results_df.iloc[0]['Model']
best_model = fitted_models[best_model_name]
print("\n============================================================")
print("BEST MODEL")
print("============================================================")
print(best_model_name)
print("\nBest model metrics:")
display(results_df[results_df['Model'] == best_model_name])
# If you prefer best by WAPE instead, use this:
# results_df = pd.DataFrame(results).sort_values(by='Test_WAPE_%', ascending=True)
# best_model_name = results_df.iloc[0]['Model']
# best_model = fitted_models[best_model_name]
# ============================================================
# 11. Print PySR symbolic formula
# ============================================================
if pysr_available and "PySR Symbolic Regression" in fitted_models:
    print("\n============================================================")
    print("PYSR SYMBOLIC REGRESSION FORMULA")
    print("============================================================")
    pysr_info = fitted_models["PySR Symbolic Regression"]
    pysr_model_final = pysr_info["model"]
    pysr_feature_names = pysr_info["feature_names"]
    print("\nBest PySR equation as string:")
    print(pysr_model_final)
    # sympy() may fail on some equations; keep the report best-effort
    try:
        print("\nBest PySR equation as SymPy expression:")
        sympy_formula = pysr_model_final.sympy()
        print(sympy_formula)
    except Exception as e:
        print("\nCould not print SymPy formula.")
        print(e)
        sympy_formula = None
    print("\nAll discovered PySR equations:")
    try:
        display(pysr_model_final.equations_)
    except Exception as e:
        print("Could not display equations table.")
        print(e)
    # Feature mapping x0, x1, x2, ... to transformed columns
    print("\nFeature mapping for PySR formula:")
    mapping_df = pd.DataFrame({
        "PySR_variable": [f"x{i}" for i in range(len(pysr_feature_names))],
        "Original_transformed_feature": pysr_feature_names
    })
    # If possible, show only the variables actually used in the formula
    try:
        formula_string = str(sympy_formula) if sympy_formula is not None else str(pysr_model_final)
        used_indices = sorted(set(int(i) for i in re.findall(r'\bx(\d+)\b', formula_string)))
        if len(used_indices) > 0:
            used_mapping_df = mapping_df.iloc[used_indices]
            print("\nVariables used in best PySR formula:")
            display(used_mapping_df)
        else:
            display(mapping_df)
    except Exception:
        # Fallback: show the full mapping table
        display(mapping_df)
else:
    print("\nNo PySR formula available.")
# ============================================================
# 12. Visualize predicted vs actual for best model
# ============================================================
y_pred_best = predict_any_model(best_model, X_test)
plt.figure(figsize=(7, 6))
plt.scatter(y_test, y_pred_best, alpha=0.7)
# Identity line: perfect predictions would fall on it
plt.plot(
    [y_test.min(), y_test.max()],
    [y_test.min(), y_test.max()],
    'r--',
    linewidth=2
)
plt.xlabel("Actual COP")
plt.ylabel("Predicted COP")
plt.title(f"Actual vs Predicted COP - {best_model_name}")
plt.grid(True)
plt.show()
# Residual plot: visible structure would indicate systematic model error
residuals = y_test - y_pred_best
plt.figure(figsize=(7, 5))
plt.scatter(y_pred_best, residuals, alpha=0.7)
plt.axhline(0, color='red', linestyle='--')
plt.xlabel("Predicted COP")
plt.ylabel("Residuals")
plt.title(f"Residual Plot - {best_model_name}")
plt.grid(True)
plt.show()
# ============================================================
# 13. Model comparison plots
# ============================================================
plt.figure(figsize=(10, 5))
plt.bar(results_df['Model'], results_df['Test_WAPE_%'])
plt.ylabel("Test WAPE (%)")
plt.title("Model Comparison by Test WAPE")
plt.xticks(rotation=30, ha='right')
plt.grid(axis='y')
plt.tight_layout()
plt.show()
plt.figure(figsize=(10, 5))
plt.bar(results_df['Model'], results_df['Test_RMSE'])
plt.ylabel("Test RMSE")
plt.title("Model Comparison by Test RMSE")
plt.xticks(rotation=30, ha='right')
plt.grid(axis='y')
plt.tight_layout()
plt.show()
# ============================================================
# 14. SHAP explanation for best model
# ============================================================
print("\n============================================================")
print("SHAP FEATURE IMPORTANCE")
print("============================================================")
# Extract the fitted preprocessor and estimator from whichever container
# the best model uses (sklearn Pipeline or PySR bundle dict)
if isinstance(best_model, Pipeline):
    best_preprocessor = best_model.named_steps['preprocessor']
    best_estimator = best_model.named_steps['model']
elif isinstance(best_model, dict) and best_model.get("type") == "pysr":
    best_preprocessor = best_model["preprocessor"]
    best_estimator = best_model["model"]
else:
    raise ValueError("Unknown best model type.")
# Transform train/test data into the estimator's feature space
X_train_transformed = best_preprocessor.transform(X_train)
X_test_transformed = best_preprocessor.transform(X_test)
X_train_transformed = to_dense(X_train_transformed)
X_test_transformed = to_dense(X_test_transformed)
feature_names = get_feature_names(best_preprocessor, X_train_transformed)
# Convert to DataFrames so SHAP plots can show feature names
X_train_shap = pd.DataFrame(
    X_train_transformed,
    columns=feature_names,
    index=X_train.index
)
X_test_shap = pd.DataFrame(
    X_test_transformed,
    columns=feature_names,
    index=X_test.index
)
# To keep SHAP fast, sample test rows if the dataset is large
max_shap_rows = 200
if len(X_test_shap) > max_shap_rows:
    X_shap_sample = X_test_shap.sample(max_shap_rows, random_state=42)
else:
    X_shap_sample = X_test_shap.copy()
# Background sample for SHAP (reference data for the explainer)
max_background_rows = 100
if len(X_train_shap) > max_background_rows:
    X_background = X_train_shap.sample(max_background_rows, random_state=42)
else:
    X_background = X_train_shap.copy()
print("Best model for SHAP:", best_model_name)
print("SHAP sample shape:", X_shap_sample.shape)
print("Background shape:", X_background.shape)
# ============================================================
# 15. SHAP explainer depending on model type
# ============================================================
if best_model_name == "XGBoost":
    # Tree models get the fast, exact TreeExplainer
    print("Using TreeExplainer for XGBoost...")
    explainer = shap.TreeExplainer(best_estimator)
    shap_values = explainer.shap_values(X_shap_sample)
else:
    print("Using KernelExplainer for non-tree model...")
    print("This can be slower for Linear/Polynomial/MLP/PySR models.")
    X_background_np = X_background.values
    X_shap_sample_np = X_shap_sample.values
    # Wrapper: the estimator expects already-preprocessed arrays
    def model_predict_preprocessed(X_array):
        return best_estimator.predict(X_array)
    explainer = shap.KernelExplainer(
        model_predict_preprocessed,
        X_background_np
    )
    # nsamples limits the number of model evaluations per explained row
    shap_values = explainer.shap_values(
        X_shap_sample_np,
        nsamples=100
    )
# If SHAP returns a list (multi-output case), take the first element
if isinstance(shap_values, list):
    shap_values = shap_values[0]
# ============================================================
# 16. SHAP summary plots
# ============================================================
print("\nCreating SHAP bar plot...")
shap.summary_plot(
    shap_values,
    X_shap_sample,
    plot_type="bar",
    max_display=25,
    show=True
)
print("\nCreating SHAP beeswarm plot...")
shap.summary_plot(
    shap_values,
    X_shap_sample,
    max_display=25,
    show=True
)
# ============================================================
# 17. Table of most important features
# ============================================================
# Mean absolute SHAP value per feature as a global importance measure
mean_abs_shap = np.abs(shap_values).mean(axis=0)
shap_importance = pd.DataFrame({
    'feature': feature_names,
    'mean_abs_shap': mean_abs_shap
}).sort_values(by='mean_abs_shap', ascending=False)
print("\nTop 30 most important features:")
display(shap_importance.head(30))
#%%
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
#%%
# ============================================================
# Black-box optimization of trained MLP / XGBoost model
# Objective: maximize predicted COP
# ============================================================
import numpy as np
import pandas as pd
from scipy.optimize import differential_evolution
# ============================================================
# 1. Select model to optimize
# ============================================================
# Choose one of your trained models:
#   "XGBoost"
#   "MLP Regressor"
# or use best_model directly
MODEL_TO_OPTIMIZE = "XGBoost"  # change to "MLP Regressor" if you want
model_to_optimize = fitted_models[MODEL_TO_OPTIMIZE]
print("Optimizing model:", MODEL_TO_OPTIMIZE)
# ============================================================
# 2. Recreate X if needed
# ============================================================
# Same preparation as in the training section, so the optimizer feeds the
# model inputs with exactly the columns it was trained on.
data_opt = df.copy()
if 'temp_hub' not in data_opt.columns:
    data_opt['temp_hub'] = data_opt['T_Vorlauf_Senke'] - data_opt['T_Rücklauf_Quelle']
target_col = 'COP'
drop_cols = [
    'COP',
    'COP_Lorenz'
]
data_opt = data_opt.dropna(subset=[target_col]).copy()
X_opt = data_opt.drop(columns=drop_cols, errors='ignore')
y_opt = data_opt[target_col]
print("Available input columns:")
print(X_opt.columns.tolist())
# ============================================================
# 3. Choose one existing row as base operating point
# ============================================================
# This row provides fixed values for variables that are NOT optimized,
# for example Kältemittel, Medium_Senke, source_file, etc.
base_row = X_opt.iloc[0].copy()
print("\nBase row before optimization:")
display(pd.DataFrame([base_row]))
# ============================================================
# 4. Define input variables to optimize
# ============================================================
# These are the continuous variables the optimizer can change.
# You can modify this list.
input_variables = [
    'T_Rücklauf_Quelle',
    'T_Vorlauf_Quelle',
    'T_Rücklauf_Senke',
    'T_Vorlauf_Senke'
]
# Fail early if a requested variable is not a model input
missing_input_vars = [v for v in input_variables if v not in X_opt.columns]
if missing_input_vars:
    raise ValueError(f"These input variables are missing in X_opt: {missing_input_vars}")
print("\nOptimized input variables:")
print(input_variables)
# ============================================================
# 5. Define bounds for each input variable
# ============================================================
# Option A:
# Data-driven bounds (5th..95th percentile of the real dataset).
# This avoids extreme extrapolation outside the training domain.
bounds = []
for var in input_variables:
    lower = X_opt[var].quantile(0.05)
    upper = X_opt[var].quantile(0.95)
    bounds.append((lower, upper))
print("\nData-driven bounds:")
for var, b in zip(input_variables, bounds):
    print(f"{var}: {b}")
# ------------------------------------------------------------
# Option B:
# If you prefer manual physical bounds, use this instead:
# ------------------------------------------------------------
# bounds = [
#     (-10, 30),  # T_Rücklauf_Quelle
#     (-5, 35),   # T_Vorlauf_Quelle
#     (20, 70),   # T_Rücklauf_Senke
#     (30, 90),   # T_Vorlauf_Senke
# ]
# ============================================================
# 6. Optional: fix categorical / discrete values
# ============================================================
# You can force specific values here.
# Only use values that exist in your original data.
# If you do not want to force anything, leave this dictionary empty.
fixed_values = {
    # Example:
    # 'Kältemittel': 'R290',
    # 'Medium_Senke': 'Wasser',
    # 'Kompressor_Nr_Stufe1': 1,
}
for col, val in fixed_values.items():
    if col in base_row.index:
        base_row[col] = val
# ============================================================
# 7. Helper function: build candidate input row
# ============================================================
def build_candidate_row(x_values):
    """
    Assemble one complete model-input row from the optimizer's variables.

    Starts from the global ``base_row`` (which supplies fixed values such as
    refrigerant or sink medium), overwrites the optimized temperature
    variables, and refreshes the derived temperature-difference columns so
    they stay consistent with the new temperatures.
    """
    candidate = base_row.copy()
    for name, value in zip(input_variables, x_values):
        candidate[name] = value
    # (derived column, minuend, subtrahend) triples to keep consistent
    derived_features = [
        ('temp_hub', 'T_Vorlauf_Senke', 'T_Rücklauf_Quelle'),
        ('t_diff_senke', 'T_Vorlauf_Senke', 'T_Rücklauf_Senke'),
        ('t_diff_quelle', 'T_Vorlauf_Quelle', 'T_Rücklauf_Quelle'),
    ]
    for target, hot, cold in derived_features:
        if hot in candidate.index and cold in candidate.index and target in candidate.index:
            candidate[target] = candidate[hot] - candidate[cold]
    return candidate
# ============================================================
# 8. Physical constraints / penalty function
# ============================================================
def constraint_penalty(row):
    """
    Return a positive penalty for physically implausible candidate rows.

    Three temperature deltas are checked (each only when both columns are
    present in *row*): the sink spread, the source spread, and the
    temperature lift. Any non-positive delta costs 1e6; leaving the
    plausible range costs 1e4 per degree outside it. The optimizer
    minimizes the objective, so larger penalties push candidates away.
    """
    # (hot column, cold column, plausible lower bound, plausible upper bound)
    checks = [
        ('T_Vorlauf_Senke', 'T_Rücklauf_Senke', 2, 30),    # sink spread
        ('T_Vorlauf_Quelle', 'T_Rücklauf_Quelle', 1, 25),  # source spread
        ('T_Vorlauf_Senke', 'T_Rücklauf_Quelle', 10, 90),  # temperature lift
    ]
    penalty = 0.0
    for hot, cold, low, high in checks:
        if hot not in row.index or cold not in row.index:
            continue
        delta = row[hot] - row[cold]
        if delta <= 0:
            penalty += 1e6
        if delta < low:
            penalty += 1e4 * (low - delta)
        if delta > high:
            penalty += 1e4 * (delta - high)
    return penalty
# ============================================================
# 9. Objective function
# ============================================================
def objective(x_values):
    """
    Scalar objective minimized by scipy.

    COP should be maximized, so the return value is
    ``penalty - predicted_COP``. A failed prediction returns the huge
    sentinel 1e9 so the optimizer steers away from that candidate.
    """
    candidate = build_candidate_row(x_values)
    penalty = constraint_penalty(candidate)
    # Single-row frame with columns ordered exactly as during training
    X_candidate = pd.DataFrame([candidate])[X_opt.columns]
    try:
        predicted_cop = model_to_optimize.predict(X_candidate)[0]
    except Exception as e:
        print("Prediction error:", e)
        return 1e9
    return penalty - predicted_cop
# ============================================================
# 10. Run black-box optimization
# ============================================================
result = differential_evolution(
    objective,
    bounds=bounds,
    strategy='best1bin',
    maxiter=150,
    popsize=20,
    tol=1e-6,
    mutation=(0.5, 1.0),
    recombination=0.7,
    seed=42,      # reproducible optimization runs
    polish=True,  # local refinement of the best candidate
    workers=1
)
# ============================================================
# 11. Extract optimized result
# ============================================================
best_x = result.x
best_row = build_candidate_row(best_x)
best_input_df = pd.DataFrame([best_row])
best_input_df = best_input_df[X_opt.columns]
best_predicted_cop = model_to_optimize.predict(best_input_df)[0]
print("\n============================================================")
print("OPTIMIZATION RESULT")
print("============================================================")
print("\nOptimization success:", result.success)
print("Optimizer message:", result.message)
print("\nBest optimized input variables:")
for var, val in zip(input_variables, best_x):
    print(f"{var}: {val:.4f}")
print(f"\nPredicted maximum COP: {best_predicted_cop:.4f}")
print("\nFull optimized input row:")
display(best_input_df)
# ============================================================
# 12. Compare base row vs optimized row
# ============================================================
base_input_df = pd.DataFrame([base_row])
base_input_df = base_input_df[X_opt.columns]
base_predicted_cop = model_to_optimize.predict(base_input_df)[0]
comparison_df = pd.DataFrame({
    'Variable': X_opt.columns,
    'Base_value': base_input_df.iloc[0].values,
    'Optimized_value': best_input_df.iloc[0].values
})
print("\n============================================================")
print("BASE VS OPTIMIZED")
print("============================================================")
print(f"Base predicted COP: {base_predicted_cop:.4f}")
print(f"Optimized predicted COP: {best_predicted_cop:.4f}")
print(f"Improvement: {best_predicted_cop - base_predicted_cop:.4f}")
display(comparison_df)
#%%
import subprocess
import sys
import os


def export_to_html(script_name="cop_modelling.py"):
    """
    Export a jupytext percent-format script to an executed HTML report.

    Pipeline:
      1. Convert ``script_name`` (.py) to a .ipynb notebook via jupytext.
      2. Execute the notebook and render it to HTML via nbconvert.
      3. Delete the intermediate .ipynb on success.

    Progress and errors are printed; nothing is returned or raised.

    Note: If you are using an Interactive Window in VS Code, you can also
    just click the 'Export' button in the toolbar at the top of the window!
    """
    print(f"Exporting {script_name} to HTML...")
    # BUG FIX: the previous str.replace(".py", ".ipynb") replaced EVERY
    # occurrence of ".py" in the name (e.g. "my.python_run.py" would be
    # corrupted); strip only the trailing suffix instead.
    if script_name.endswith(".py"):
        stem = script_name[:-len(".py")]
    else:
        stem = script_name
    notebook_name = stem + ".ipynb"
    try:
        # Step 1: .py -> .ipynb; check=True raises on failure, which the
        # except block below reports.
        print("1. Converting .py to .ipynb format...")
        subprocess.run([sys.executable, "-m", "jupytext", "--to", "notebook", script_name], check=True)
        # Step 2: execute the notebook and render HTML; stderr is captured
        # so a failure can be reported verbatim.
        print("2. Executing notebook and generating HTML...")
        result = subprocess.run(
            [sys.executable, "-m", "jupyter", "nbconvert", "--to", "html", "--execute", notebook_name],
            capture_output=True, text=True
        )
        if result.returncode == 0:
            print(f"Successfully exported to HTML! Look for {stem + '.html'} in your directory.")
            # Clean up the intermediate notebook; the HTML is the artifact.
            if os.path.exists(notebook_name):
                os.remove(notebook_name)
        else:
            print("Failed to export. Error:")
            print(result.stderr)
    except Exception as e:
        print(f"Error during export: {e}")
        print("Make sure you have jupyter, nbconvert, and jupytext installed.")


# NOTE: this call runs on every "Run All"; comment it out to disable the
# automatic HTML export.
export_to_html("cop_modelling.py")
# %%