Spaces:
Running
Running
| # %% [markdown] | |
| # # COP Results Joiner | |
| # This notebook joins all Excel files from `data/cop_modelling` into a single Parquet file. | |
| # %% | |
| import pandas as pd | |
| import os | |
| from pathlib import Path | |
| # %% | |
# Define paths
# The notebook may be started from the project root or from a sub-folder,
# so probe the working directory first and then its parent.
current_dir = Path.cwd()
_data_subdir = Path("data") / "cop_modelling"
data_path = next(
    (
        base / _data_subdir
        for base in (current_dir, current_dir.parent)
        if (base / _data_subdir).exists()
    ),
    # Fallback: relative path one level above the working directory
    Path("..") / _data_subdir,
)
output_file = data_path / "joined_results.parquet"

# Configuration
LOAD_FROM_PARQUET = True  # Set to False to rebuild from Excel files
| # %% | |
# Either reuse the cached Parquet file or rebuild it from the Excel exports.
if LOAD_FROM_PARQUET and output_file.exists():
    print(f"Loading data directly from {output_file.name}...")
    joined_df = pd.read_parquet(output_file)
    print(f"Loaded shape: {joined_df.shape}")
else:
    # Get all Excel files; sorted so the row order of the joined frame does
    # not depend on the order the filesystem happens to list them in.
    excel_files = sorted(data_path.glob("*.xlsx"))
    print(f"Found {len(excel_files)} files in {data_path.resolve()}: {[f.name for f in excel_files]}")

    # Load and join
    dfs = []
    for f in excel_files:
        try:
            # Results are in 'Results' sheet
            df = pd.read_excel(f, sheet_name='Results')
            # Drop the first row (which usually contains units)
            df = df.iloc[1:].reset_index(drop=True)
            # Add a column to identify the source
            df['source_file'] = f.name
            # Convert columns to numerical if possible, else convert to strings
            for col in df.columns:
                try:
                    df[col] = pd.to_numeric(df[col], errors='raise')
                except (ValueError, TypeError):
                    # Stringify only the cells that hold a value. A plain
                    # astype(str) would turn missing cells into the text
                    # "nan", which later dropna() calls cannot detect.
                    df[col] = df[col].astype(str).where(df[col].notna())
            dfs.append(df)
        except Exception as e:
            # Best effort: report the unreadable file and continue with the rest.
            print(f"Error reading {f}: {e}")

    if not dfs:
        raise ValueError(f"No objects to concatenate. Could not find or read any valid Excel files in {data_path.resolve()}.")

    joined_df = pd.concat(dfs, ignore_index=True)
    print(f"Joined shape: {joined_df.shape}")

    # Save to parquet
    joined_df.to_parquet(output_file)
    print(f"Saved to {output_file}")
| # %% | |
# Quick preview
joined_df
print(joined_df.columns)
# %%
# Work on a copy so the joined frame stays untouched.
df = joined_df.copy()

sink_flow = df['T_Vorlauf_Senke']
source_return = df['T_Rücklauf_Quelle']

# Sink-side temperature spread (original note: "greater")
df['t_diff_senke'] = sink_flow - df['T_Rücklauf_Senke']
# Source-side temperature spread (original note: "lesser")
df['t_diff_quelle'] = df['T_Vorlauf_Quelle'] - source_return
# Temperature lift between sink flow and source return
df['temp_hub'] = sink_flow - source_return

# Frequency tables for the spreads and the stage-1 compressor number
for inspected_column in ('t_diff_quelle', 't_diff_senke', 'Kompressor_Nr_Stufe1'):
    print(df[inspected_column].value_counts())
| #%% | |
| import pandas as pd | |
| import plotly.graph_objects as go | |
| import ipywidgets as widgets | |
| from IPython.display import display, clear_output | |
print(df.columns)
# ============================================================
# Prepare dataframe
# ============================================================
# Temperature lift: sink flow minus source return
df['temp_hub'] = df['T_Vorlauf_Senke'] - df['T_Rücklauf_Quelle']

# Columns carried into the modelling frame
required_columns = [
    'Medium_Senke',
    'Kältemittel',
    'T_Vorlauf_Quelle',
    'T_Rücklauf_Quelle',
    'T_Rücklauf_Senke',
    'T_Vorlauf_Senke',
    'Kompressor_Nr_Stufe1',
    'COP',
    'COP_Lorenz',
    'source_file',
    't_diff_senke',
    't_diff_quelle',
    'temp_hub',
]
# Keep only those columns and discard every row with a missing value
df = df[required_columns].copy().dropna()

# String versions of refrigerant and compressor number, used for filtering
refrigerant_label = df['Kältemittel'].astype(str)
compressor_label = df['Kompressor_Nr_Stufe1'].astype(float).astype(int).astype(str)
df['Kältemittel_filter'] = refrigerant_label
df['Kompressor_filter'] = compressor_label

# Combined "<refrigerant>_<compressor number>" key
df['Kältemittel_stufen'] = df['Kältemittel_filter'] + '_' + df['Kompressor_filter']

# Order rows by source return temperature, then sink flow temperature (both ascending)
df = df.sort_values(by=['T_Rücklauf_Quelle', 'T_Vorlauf_Senke'])
| #%% | |
| # ============================================================ | |
| # Train ML models for COP prediction: | |
| # Linear Regression, Polynomial Regression, MLP, XGBoost, | |
| # and Symbolic Regression with PySR | |
| # | |
| # Target: COP | |
| # Predictors: all other variables EXCEPT COP_Lorenz | |
| # | |
| # Metrics: R2, MAE, RMSE, WAPE | |
| # Best model selected by Test_RMSE | |
| # SHAP feature importance for best model | |
| # PySR symbolic formula printed | |
| # ============================================================ | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
| import re | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| from IPython.display import display | |
| from sklearn.base import clone | |
| from sklearn.model_selection import train_test_split, KFold, cross_validate | |
| from sklearn.compose import ColumnTransformer | |
| from sklearn.pipeline import Pipeline | |
| from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures | |
| from sklearn.impute import SimpleImputer | |
| from sklearn.linear_model import LinearRegression, Ridge | |
| from sklearn.neural_network import MLPRegressor | |
| from sklearn.metrics import ( | |
| mean_squared_error, | |
| mean_absolute_error, | |
| r2_score, | |
| make_scorer | |
| ) | |
| # If needed, install: | |
| # pip install xgboost shap pysr | |
| from xgboost import XGBRegressor | |
| import shap | |
| # ============================================================ | |
| # PySR import | |
| # ============================================================ | |
# PySR is optional: record whether it could be imported so the later
# training and reporting sections can be skipped cleanly when it is missing.
try:
    from pysr import PySRRegressor
except ImportError:
    pysr_available = False
    print("PySR is not installed.")
    print("Install it with:")
    print("pip install pysr")
    print("Note: PySR also needs Julia. First run can take some time.")
else:
    # The import succeeded, so the symbolic-regression sections may run.
    pysr_available = True
| # ============================================================ | |
| # 0. Define WAPE metric | |
| # ============================================================ | |
def wape(y_true, y_pred):
    """
    Return the Weighted Absolute Percentage Error in percent.

    WAPE = sum(|y_true - y_pred|) / sum(|y_true|) * 100

    Returns NaN when the absolute sum of ``y_true`` is zero, because the
    ratio is undefined in that case.
    """
    actual = np.asarray(y_true)
    predicted = np.asarray(y_pred)

    total_abs_actual = np.abs(actual).sum()
    if total_abs_actual == 0:
        return np.nan

    total_abs_error = np.abs(actual - predicted).sum()
    return total_abs_error / total_abs_actual * 100
# WAPE is an error measure, so the scorer negates it (lower is better).
wape_scorer = make_scorer(wape, greater_is_better=False)
# ============================================================
# 1. Prepare dataframe
# ============================================================
data = df.copy()

# Create temp_hub only when the frame does not already carry it
if 'temp_hub' not in data.columns:
    data['temp_hub'] = data['T_Vorlauf_Senke'] - data['T_Rücklauf_Quelle']

# Target
target_col = 'COP'

# Columns kept out of the predictors: the target itself and COP_Lorenz
drop_cols = ['COP', 'COP_Lorenz']

# Rows without a target value cannot be used
data = data.dropna(subset=[target_col]).copy()

# Predictors and target
y = data[target_col]
X = data.drop(columns=drop_cols, errors='ignore')

print("Target:", target_col)
print("\nPredictor columns:", X.columns.tolist(), sep="\n")
# ============================================================
# 2. Detect numeric and categorical columns
# ============================================================
numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
categorical_features = X.select_dtypes(exclude=[np.number]).columns.tolist()

print("\nNumeric features:", numeric_features, sep="\n")
print("\nCategorical features:", categorical_features, sep="\n")
# ============================================================
# 3. Train/test split
# ============================================================
# 80/20 split with a fixed seed so the split is reproducible
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
| # ============================================================ | |
| # 4. OneHotEncoder compatibility | |
| # ============================================================ | |
# Newer OneHotEncoder versions take `sparse_output`; fall back to the older
# `sparse` keyword when the new one is rejected with a TypeError.
try:
    onehot = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
except TypeError:
    onehot = OneHotEncoder(handle_unknown='ignore', sparse=False)
# ============================================================
# 5. Preprocessors
# ============================================================
# Numeric columns: median imputation followed by standardisation
numeric_transformer_standard = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Categorical columns: most-frequent imputation followed by one-hot encoding
categorical_transformer = Pipeline([
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', onehot),
])

preprocessor_standard = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer_standard, numeric_features),
        ('cat', categorical_transformer, categorical_features),
    ],
    remainder='drop',
)

# Numeric columns for the polynomial model: impute, expand to degree-2
# terms (no bias column), then standardise
numeric_transformer_poly = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('poly', PolynomialFeatures(degree=2, include_bias=False)),
    ('scaler', StandardScaler()),
])

preprocessor_poly = ColumnTransformer(
    transformers=[
        ('num_poly', numeric_transformer_poly, numeric_features),
        ('cat', categorical_transformer, categorical_features),
    ],
    remainder='drop',
)
| # ============================================================ | |
| # 6. Helper functions | |
| # ============================================================ | |
def to_dense(X_array):
    """Return ``X_array.toarray()`` when that method exists, else ``X_array`` unchanged."""
    return X_array.toarray() if hasattr(X_array, "toarray") else X_array
def get_feature_names(preprocessor, X_transformed):
    """
    Return the transformed feature names as a list.

    Uses ``preprocessor.get_feature_names_out()``; if that call fails for any
    reason, falls back to generic names ``x0, x1, ...`` with one name per
    column of ``X_transformed``.
    """
    try:
        names = preprocessor.get_feature_names_out()
        return list(names)
    except Exception:
        column_count = X_transformed.shape[1]
        return [f"x{i}" for i in range(column_count)]
def make_pysr_model(niterations=100, verbosity=0, progress=False):
    """
    Build an unfitted PySR symbolic regression model.

    ``niterations``, ``verbosity`` and ``progress`` are passed straight to
    ``PySRRegressor``; every other setting is fixed here.
    Increase niterations for better formulas.
    """
    search_settings = {
        "niterations": niterations,
        "binary_operators": ["+", "-", "*", "/"],
        "unary_operators": ["square", "cube", "abs"],
        "model_selection": "best",
        "maxsize": 25,
        "populations": 15,
        "population_size": 50,
        "parsimony": 0.001,
        "random_state": 42,
        "verbosity": verbosity,
        "progress": progress,
        "warm_start": False,
    }
    return PySRRegressor(**search_settings)
def evaluate_metrics(y_true, y_pred):
    """Return a dict with R2, MAE, RMSE and WAPE (in percent) for the given predictions."""
    squared_error = mean_squared_error(y_true, y_pred)
    metrics = {}
    metrics['R2'] = r2_score(y_true, y_pred)
    metrics['MAE'] = mean_absolute_error(y_true, y_pred)
    # RMSE is the square root of the mean squared error
    metrics['RMSE'] = np.sqrt(squared_error)
    metrics['WAPE_%'] = wape(y_true, y_pred)
    return metrics
def predict_any_model(model_object, X_raw):
    """
    Predict from raw inputs with either kind of fitted model.

    A sklearn ``Pipeline`` predicts directly. A dict with ``"type" == "pysr"``
    holds a separate ``"preprocessor"`` and ``"model"``: the raw inputs are
    transformed, made dense, and then passed to the model.

    Raises ValueError for any other object.
    """
    if isinstance(model_object, Pipeline):
        return model_object.predict(X_raw)

    is_pysr_bundle = isinstance(model_object, dict) and model_object.get("type") == "pysr"
    if not is_pysr_bundle:
        raise ValueError("Unknown model object type.")

    preprocessor = model_object["preprocessor"]
    model = model_object["model"]
    features = to_dense(preprocessor.transform(X_raw))
    return model.predict(features)
| # ============================================================ | |
| # 7. Define standard sklearn models | |
| # ============================================================ | |
# Estimators with a longer parameter list are configured first so the
# pipeline table below stays easy to scan.
_mlp_estimator = MLPRegressor(
    hidden_layer_sizes=(128, 64),
    activation='relu',
    solver='adam',
    alpha=0.0005,
    learning_rate_init=0.001,
    max_iter=1000,
    random_state=42,
    early_stopping=True,
)
_xgb_estimator = XGBRegressor(
    n_estimators=500,
    max_depth=4,
    learning_rate=0.03,
    subsample=0.9,
    colsample_bytree=0.9,
    objective='reg:squarederror',
    random_state=42,
    n_jobs=-1,
)

# Each entry pairs a preprocessor with an estimator. Only the polynomial
# model uses the degree-2 preprocessor; it is fitted with Ridge (alpha=1.0).
models = {
    "Linear Regression": Pipeline([
        ('preprocessor', preprocessor_standard),
        ('model', LinearRegression()),
    ]),
    "Polynomial Regression Degree 2": Pipeline([
        ('preprocessor', preprocessor_poly),
        ('model', Ridge(alpha=1.0)),
    ]),
    "MLP Regressor": Pipeline([
        ('preprocessor', preprocessor_standard),
        ('model', _mlp_estimator),
    ]),
    "XGBoost": Pipeline([
        ('preprocessor', preprocessor_standard),
        ('model', _xgb_estimator),
    ]),
}
| # ============================================================ | |
| # 8. Cross-validation and test evaluation | |
| # ============================================================ | |
# Shuffled 5-fold splitter with a fixed seed, shared by every model
cv = KFold(n_splits=5, shuffle=True, random_state=42)
results = []
fitted_models = {}

# Scorers used during cross-validation. The error scorers are negated
# ("neg_*"), so their means are sign-flipped again when reported.
cv_scoring = {
    'r2': 'r2',
    'neg_mae': 'neg_mean_absolute_error',
    'neg_rmse': 'neg_root_mean_squared_error',
    'neg_wape': wape_scorer,
}
# ------------------------------------------------------------
# 8.1 Train normal sklearn models
# ------------------------------------------------------------
for name, pipe in models.items():
    print("\n============================================================")
    print(f"Training model: {name}")
    print("============================================================")

    cv_scores = cross_validate(
        pipe,
        X_train,
        y_train,
        cv=cv,
        scoring=cv_scoring,
        n_jobs=-1,
        return_train_score=False,
    )

    # Fit on the full training data, then score once on the held-out test set
    pipe.fit(X_train, y_train)
    y_pred = pipe.predict(X_test)
    test_metrics = evaluate_metrics(y_test, y_pred)

    fitted_models[name] = pipe
    results.append({
        'Model': name,
        'CV_R2_mean': cv_scores['test_r2'].mean(),
        'CV_R2_std': cv_scores['test_r2'].std(),
        'CV_MAE_mean': -cv_scores['test_neg_mae'].mean(),
        'CV_RMSE_mean': -cv_scores['test_neg_rmse'].mean(),
        'CV_WAPE_mean_%': -cv_scores['test_neg_wape'].mean(),
        # The standard deviation is unaffected by the sign flip
        'CV_WAPE_std_%': cv_scores['test_neg_wape'].std(),
        'Test_R2': test_metrics['R2'],
        'Test_MAE': test_metrics['MAE'],
        'Test_RMSE': test_metrics['RMSE'],
        'Test_WAPE_%': test_metrics['WAPE_%'],
    })
| # ------------------------------------------------------------ | |
| # 8.2 Train PySR Symbolic Regression | |
| # ------------------------------------------------------------ | |
# PySR is trained outside a sklearn Pipeline: the preprocessor is fitted
# separately and the regressor receives the dense transformed matrix.
if pysr_available:
    print("\n============================================================")
    print("Training model: PySR Symbolic Regression")
    print("============================================================")

    # You can increase these for better symbolic equations.
    # If PySR is too slow, reduce them.
    PYSR_CV_NITERATIONS = 40
    PYSR_FINAL_NITERATIONS = 120

    # Per-fold validation metrics, keyed like evaluate_metrics() output
    pysr_fold_metrics = {'R2': [], 'MAE': [], 'RMSE': [], 'WAPE_%': []}

    for fold_number, (train_idx, val_idx) in enumerate(cv.split(X_train), start=1):
        print(f"\nPySR CV fold {fold_number}/5")

        X_tr_fold, X_val_fold = X_train.iloc[train_idx], X_train.iloc[val_idx]
        y_tr_fold, y_val_fold = y_train.iloc[train_idx], y_train.iloc[val_idx]

        # A fresh preprocessor per fold, fitted on the fold's training part only
        fold_preprocessor = clone(preprocessor_standard)
        X_tr_proc = to_dense(fold_preprocessor.fit_transform(X_tr_fold))
        X_val_proc = to_dense(fold_preprocessor.transform(X_val_fold))

        # Fit PySR for this fold (shorter search than the final model)
        fold_pysr = make_pysr_model(
            niterations=PYSR_CV_NITERATIONS,
            verbosity=0,
            progress=False,
        )
        fold_pysr.fit(X_tr_proc, np.asarray(y_tr_fold))

        y_val_pred = fold_pysr.predict(X_val_proc)
        fold_metrics = evaluate_metrics(y_val_fold, y_val_pred)
        for metric_name, metric_values in pysr_fold_metrics.items():
            metric_values.append(fold_metrics[metric_name])

    # Fit final PySR model on full training data
    print("\nFitting final PySR model on full training data...")
    pysr_preprocessor = clone(preprocessor_standard)
    X_train_pysr = to_dense(pysr_preprocessor.fit_transform(X_train))
    X_test_pysr = to_dense(pysr_preprocessor.transform(X_test))
    pysr_feature_names = get_feature_names(pysr_preprocessor, X_train_pysr)

    pysr_model = make_pysr_model(
        niterations=PYSR_FINAL_NITERATIONS,
        verbosity=1,
        progress=True,
    )
    pysr_model.fit(X_train_pysr, np.asarray(y_train))

    y_pred_pysr = pysr_model.predict(X_test_pysr)
    test_metrics_pysr = evaluate_metrics(y_test, y_pred_pysr)

    # Stored as a dict because preprocessor and model are separate objects
    fitted_models["PySR Symbolic Regression"] = {
        "type": "pysr",
        "preprocessor": pysr_preprocessor,
        "model": pysr_model,
        "feature_names": pysr_feature_names,
    }
    results.append({
        'Model': "PySR Symbolic Regression",
        'CV_R2_mean': np.mean(pysr_fold_metrics['R2']),
        'CV_R2_std': np.std(pysr_fold_metrics['R2']),
        'CV_MAE_mean': np.mean(pysr_fold_metrics['MAE']),
        'CV_RMSE_mean': np.mean(pysr_fold_metrics['RMSE']),
        'CV_WAPE_mean_%': np.mean(pysr_fold_metrics['WAPE_%']),
        'CV_WAPE_std_%': np.std(pysr_fold_metrics['WAPE_%']),
        'Test_R2': test_metrics_pysr['R2'],
        'Test_MAE': test_metrics_pysr['MAE'],
        'Test_RMSE': test_metrics_pysr['RMSE'],
        'Test_WAPE_%': test_metrics_pysr['WAPE_%'],
    })
else:
    print("\nSkipping PySR Symbolic Regression because PySR is not installed.")
| # ============================================================ | |
| # 9. Model comparison | |
| # ============================================================ | |
# Rank all models by test RMSE, lowest first
results_df = pd.DataFrame(results).sort_values(by='Test_RMSE')
print("\n============================================================")
print("MODEL COMPARISON")
print("============================================================")
display(results_df)
# ============================================================
# 10. Select best model
# ============================================================
# Best model by RMSE: the first row of the sorted table
best_model_name = results_df['Model'].iloc[0]
best_model = fitted_models[best_model_name]
print("\n============================================================")
print("BEST MODEL")
print("============================================================")
print(best_model_name)
print("\nBest model metrics:")
is_best_row = results_df['Model'] == best_model_name
display(results_df.loc[is_best_row])
| # If you prefer best by WAPE instead, use this: | |
| # results_df = pd.DataFrame(results).sort_values(by='Test_WAPE_%', ascending=True) | |
| # best_model_name = results_df.iloc[0]['Model'] | |
| # best_model = fitted_models[best_model_name] | |
| # ============================================================ | |
| # 11. Print PySR symbolic formula | |
| # ============================================================ | |
# Report the symbolic formula only when a PySR model was actually trained.
if pysr_available and "PySR Symbolic Regression" in fitted_models:
    print("\n============================================================")
    print("PYSR SYMBOLIC REGRESSION FORMULA")
    print("============================================================")

    pysr_info = fitted_models["PySR Symbolic Regression"]
    pysr_model_final = pysr_info["model"]
    pysr_feature_names = pysr_info["feature_names"]

    print("\nBest PySR equation as string:")
    print(pysr_model_final)

    try:
        print("\nBest PySR equation as SymPy expression:")
        sympy_formula = pysr_model_final.sympy()
        print(sympy_formula)
    except Exception as e:
        print("\nCould not print SymPy formula.")
        print(e)
        # Marks the SymPy form as unavailable for the lookup further down
        sympy_formula = None

    print("\nAll discovered PySR equations:")
    try:
        display(pysr_model_final.equations_)
    except Exception as e:
        print("Could not display equations table.")
        print(e)

    # Feature mapping x0, x1, x2, ... to transformed columns
    print("\nFeature mapping for PySR formula:")
    mapping_df = pd.DataFrame({
        "PySR_variable": [f"x{i}" for i, _ in enumerate(pysr_feature_names)],
        "Original_transformed_feature": pysr_feature_names,
    })

    # If possible, show only variables used in the formula
    try:
        if sympy_formula is not None:
            formula_string = str(sympy_formula)
        else:
            formula_string = str(pysr_model_final)
        # Collect the distinct indices of every "x<number>" token in the text
        used_indices = sorted({
            int(index_text)
            for index_text in re.findall(r'\bx(\d+)\b', formula_string)
        })
        if used_indices:
            used_mapping_df = mapping_df.iloc[used_indices]
            print("\nVariables used in best PySR formula:")
            display(used_mapping_df)
        else:
            display(mapping_df)
    except Exception:
        # Any failure while narrowing the table falls back to the full mapping
        display(mapping_df)
else:
    print("\nNo PySR formula available.")
| # ============================================================ | |
| # 12. Visualize predicted vs actual for best model | |
| # ============================================================ | |
# Test-set predictions of the selected model
y_pred_best = predict_any_model(best_model, X_test)

# Actual vs predicted scatter with the identity line as reference
plt.figure(figsize=(7, 6))
plt.scatter(y_test, y_pred_best, alpha=0.7)
identity_limits = [y_test.min(), y_test.max()]
plt.plot(identity_limits, identity_limits, 'r--', linewidth=2)
plt.xlabel("Actual COP")
plt.ylabel("Predicted COP")
plt.title(f"Actual vs Predicted COP - {best_model_name}")
plt.grid(True)
plt.show()

# Residuals (actual minus predicted) against the predictions
residuals = y_test - y_pred_best
plt.figure(figsize=(7, 5))
plt.scatter(y_pred_best, residuals, alpha=0.7)
plt.axhline(0, color='red', linestyle='--')
plt.xlabel("Predicted COP")
plt.ylabel("Residuals")
plt.title(f"Residual Plot - {best_model_name}")
plt.grid(True)
plt.show()
# ============================================================
# 13. Model comparison plots
# ============================================================
# One bar chart per test metric, drawn in the order of results_df
for metric_column, axis_label, chart_title in (
    ('Test_WAPE_%', "Test WAPE (%)", "Model Comparison by Test WAPE"),
    ('Test_RMSE', "Test RMSE", "Model Comparison by Test RMSE"),
):
    plt.figure(figsize=(10, 5))
    plt.bar(results_df['Model'], results_df[metric_column])
    plt.ylabel(axis_label)
    plt.title(chart_title)
    plt.xticks(rotation=30, ha='right')
    plt.grid(axis='y')
    plt.tight_layout()
    plt.show()
| # ============================================================ | |
| # 14. SHAP explanation for best model | |
| # ============================================================ | |
print("\n============================================================")
print("SHAP FEATURE IMPORTANCE")
print("============================================================")

# Extract the fitted preprocessor and estimator of the best model
if isinstance(best_model, Pipeline):
    best_preprocessor = best_model.named_steps['preprocessor']
    best_estimator = best_model.named_steps['model']
elif isinstance(best_model, dict) and best_model.get("type") == "pysr":
    best_preprocessor = best_model["preprocessor"]
    best_estimator = best_model["model"]
else:
    raise ValueError("Unknown best model type.")

# SHAP is computed on the transformed (dense) feature matrix
X_train_transformed = to_dense(best_preprocessor.transform(X_train))
X_test_transformed = to_dense(best_preprocessor.transform(X_test))
feature_names = get_feature_names(best_preprocessor, X_train_transformed)

# Wrap in DataFrames so the plots carry feature names and original row labels
X_train_shap = pd.DataFrame(X_train_transformed, columns=feature_names, index=X_train.index)
X_test_shap = pd.DataFrame(X_test_transformed, columns=feature_names, index=X_test.index)

# To keep SHAP fast, explain at most this many test rows
max_shap_rows = 200
X_shap_sample = (
    X_test_shap.sample(max_shap_rows, random_state=42)
    if len(X_test_shap) > max_shap_rows
    else X_test_shap.copy()
)

# Background sample for SHAP, capped the same way on the training rows
max_background_rows = 100
X_background = (
    X_train_shap.sample(max_background_rows, random_state=42)
    if len(X_train_shap) > max_background_rows
    else X_train_shap.copy()
)

print("Best model for SHAP:", best_model_name)
print("SHAP sample shape:", X_shap_sample.shape)
print("Background shape:", X_background.shape)
| # ============================================================ | |
| # 15. SHAP explainer depending on model type | |
| # ============================================================ | |
# TreeExplainer is used for the XGBoost model; every other model goes
# through the model-agnostic KernelExplainer on plain arrays.
if best_model_name == "XGBoost":
    print("Using TreeExplainer for XGBoost...")
    explainer = shap.TreeExplainer(best_estimator)
    shap_values = explainer.shap_values(X_shap_sample)
else:
    print("Using KernelExplainer for non-tree model...")
    print("This can be slower for Linear/Polynomial/MLP/PySR models.")

    X_background_np = X_background.values
    X_shap_sample_np = X_shap_sample.values

    def model_predict_preprocessed(X_array):
        """Predict from an already preprocessed feature array."""
        return best_estimator.predict(X_array)

    explainer = shap.KernelExplainer(model_predict_preprocessed, X_background_np)
    shap_values = explainer.shap_values(X_shap_sample_np, nsamples=100)

# If SHAP returns a list, take first element
if isinstance(shap_values, list):
    shap_values = shap_values[0]
| # ============================================================ | |
| # 16. SHAP summary plots | |
| # ============================================================ | |
# Display settings shared by both SHAP summary plots
_summary_plot_options = {"max_display": 25, "show": True}

print("\nCreating SHAP bar plot...")
shap.summary_plot(shap_values, X_shap_sample, plot_type="bar", **_summary_plot_options)

print("\nCreating SHAP beeswarm plot...")
shap.summary_plot(shap_values, X_shap_sample, **_summary_plot_options)
# ============================================================
# 17. Table of most important features
# ============================================================
# Importance = mean absolute SHAP value per feature over the explained rows
mean_abs_shap = np.abs(shap_values).mean(axis=0)
shap_importance = pd.DataFrame(
    {'feature': feature_names, 'mean_abs_shap': mean_abs_shap}
).sort_values(by='mean_abs_shap', ascending=False)

print("\nTop 30 most important features:")
display(shap_importance.head(30))
| #%% | |
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
| #%% | |
| # ============================================================ | |
| # Black-box optimization of trained MLP / XGBoost model | |
| # Objective: maximize predicted COP | |
| # ============================================================ | |
| import numpy as np | |
| import pandas as pd | |
| from scipy.optimize import differential_evolution | |
| # ============================================================ | |
| # 1. Select model to optimize | |
| # ============================================================ | |
| # Choose one of your trained models: | |
| # "XGBoost" | |
| # "MLP Regressor" | |
| # or use best_model directly | |
MODEL_TO_OPTIMIZE = "XGBoost"  # change to "MLP Regressor" if you want
model_to_optimize = fitted_models[MODEL_TO_OPTIMIZE]
print("Optimizing model:", MODEL_TO_OPTIMIZE)
# ============================================================
# 2. Recreate X if needed
# ============================================================
# Rebuild the predictor frame from df the same way the training section did
data_opt = df.copy()
if 'temp_hub' not in data_opt.columns:
    data_opt['temp_hub'] = data_opt['T_Vorlauf_Senke'] - data_opt['T_Rücklauf_Quelle']

target_col = 'COP'
# Target and COP_Lorenz are not model inputs
drop_cols = ['COP', 'COP_Lorenz']

data_opt = data_opt.dropna(subset=[target_col]).copy()
y_opt = data_opt[target_col]
X_opt = data_opt.drop(columns=drop_cols, errors='ignore')

print("Available input columns:")
print(X_opt.columns.tolist())
| # ============================================================ | |
| # 3. Choose one existing row as base operating point | |
| # ============================================================ | |
| # This row provides fixed values for variables that are NOT optimized, | |
| # for example Kältemittel, Medium_Senke, source_file, etc. | |
# The first row of X_opt supplies the values of every column that is not optimized
base_row = X_opt.iloc[0].copy()
print("\nBase row before optimization:")
display(pd.DataFrame([base_row]))
# ============================================================
# 4. Define input variables to optimize
# ============================================================
# These are the continuous variables the optimizer can change.
# You can modify this list.
input_variables = [
    'T_Rücklauf_Quelle',
    'T_Vorlauf_Quelle',
    'T_Rücklauf_Senke',
    'T_Vorlauf_Senke',
]

# Stop early when a requested variable is not a column of X_opt
missing_input_vars = [v for v in input_variables if v not in X_opt.columns]
if missing_input_vars:
    raise ValueError(f"These input variables are missing in X_opt: {missing_input_vars}")

print("\nOptimized input variables:")
print(input_variables)
# ============================================================
# 5. Define bounds for each input variable
# ============================================================
# Option A:
# Use data-driven bounds from your real dataset.
# This avoids crazy extrapolation outside the training domain.
# Each bound is the (5 %, 95 %) quantile pair of the variable in X_opt.
bounds = [
    (X_opt[var].quantile(0.05), X_opt[var].quantile(0.95))
    for var in input_variables
]

print("\nData-driven bounds:")
for var, b in zip(input_variables, bounds):
    print(f"{var}: {b}")
| # ------------------------------------------------------------ | |
| # Option B: | |
| # If you prefer manual physical bounds, use this instead: | |
| # ------------------------------------------------------------ | |
| # bounds = [ | |
| # (-10, 30), # T_Rücklauf_Quelle | |
| # (-5, 35), # T_Vorlauf_Quelle | |
| # (20, 70), # T_Rücklauf_Senke | |
| # (30, 90), # T_Vorlauf_Senke | |
| # ] | |
| # ============================================================ | |
| # 6. Optional: fix categorical / discrete values | |
| # ============================================================ | |
| # You can force specific values here. | |
| # Only use values that exist in your original data. | |
| # If you do not want to force anything, leave this dictionary empty. | |
# Values forced onto the base row before optimization. Keys that are not
# columns of the base row are ignored.
fixed_values = {
    # Example:
    # 'Kältemittel': 'R290',
    # 'Medium_Senke': 'Wasser',
    # 'Kompressor_Nr_Stufe1': 1,
}
for col, val in fixed_values.items():
    if col in base_row.index:
        base_row[col] = val

# The frame also carries string columns derived from the refrigerant and the
# compressor number. Re-derive them after an override so the model does not
# receive a row whose derived columns contradict the forced values.
if 'Kältemittel' in fixed_values and all(
    c in base_row.index for c in ('Kältemittel', 'Kältemittel_filter')
):
    base_row['Kältemittel_filter'] = str(base_row['Kältemittel'])
if 'Kompressor_Nr_Stufe1' in fixed_values and all(
    c in base_row.index for c in ('Kompressor_Nr_Stufe1', 'Kompressor_filter')
):
    # Same float -> int -> str conversion the column was built with
    base_row['Kompressor_filter'] = str(int(float(base_row['Kompressor_Nr_Stufe1'])))
if ('Kältemittel' in fixed_values or 'Kompressor_Nr_Stufe1' in fixed_values) and all(
    c in base_row.index
    for c in ('Kältemittel_filter', 'Kompressor_filter', 'Kältemittel_stufen')
):
    base_row['Kältemittel_stufen'] = (
        base_row['Kältemittel_filter'] + '_' + base_row['Kompressor_filter']
    )
| # ============================================================ | |
| # 7. Helper function: build candidate input row | |
| # ============================================================ | |
def build_candidate_row(x_values):
    """Build the full model-input row for one optimizer candidate.

    A copy of the module-level ``base_row`` is taken, the entries named in
    ``input_variables`` are overwritten with ``x_values`` (paired by
    position), and the derived temperature-difference columns are recomputed
    from the new values. ``base_row`` itself is not modified.

    Args:
        x_values: Candidate values, one per entry of ``input_variables``.

    Returns:
        The updated copy of ``base_row``.
    """
    candidate = base_row.copy()

    # Write the optimizer's values into their columns.
    for column, value in zip(input_variables, x_values):
        candidate[column] = value

    # (derived column, minuend column, subtrahend column). A derived column
    # is refreshed only when it and both of its source columns are present.
    derived_columns = (
        ('temp_hub', 'T_Vorlauf_Senke', 'T_Rücklauf_Quelle'),
        ('t_diff_senke', 'T_Vorlauf_Senke', 'T_Rücklauf_Senke'),
        ('t_diff_quelle', 'T_Vorlauf_Quelle', 'T_Rücklauf_Quelle'),
    )
    for derived, minuend, subtrahend in derived_columns:
        needed = (minuend, subtrahend, derived)
        if any(label not in candidate.index for label in needed):
            continue
        candidate[derived] = candidate[minuend] - candidate[subtrahend]

    return candidate
| # ============================================================ | |
| # 8. Physical constraints / penalty function | |
| # ============================================================ | |
def constraint_penalty(row):
    """Return a non-negative penalty for physically unrealistic candidates.

    Three temperature differences are checked. Each one is skipped when
    either of its two columns is absent from ``row``:

    * sink spread:   T_Vorlauf_Senke  - T_Rücklauf_Senke,   realistic 2..30
    * source spread: T_Vorlauf_Quelle - T_Rücklauf_Quelle,  realistic 1..25
    * lift (hub):    T_Vorlauf_Senke  - T_Rücklauf_Quelle,  realistic 10..90

    For every difference:

    * a value <= 0 adds a flat 1e6;
    * a value below / above the realistic range adds 1e4 per unit of
      violation (this stacks with the flat 1e6 when the value is <= 0);
    * an undefined value (NaN / missing) adds a flat 1e6. Without this,
      every comparison on NaN is False and the candidate would be scored
      as perfectly feasible.

    Args:
        row: Object with an ``index`` supporting ``in`` and label lookup via
            ``row[label]``, e.g. the row built by ``build_candidate_row``.

    Returns:
        The accumulated penalty; 0.0 when all present constraints hold.
    """
    # (warm-side column, cold-side column, lowest realistic, highest realistic)
    delta_limits = (
        ('T_Vorlauf_Senke', 'T_Rücklauf_Senke', 2, 30),
        ('T_Vorlauf_Quelle', 'T_Rücklauf_Quelle', 1, 25),
        ('T_Vorlauf_Senke', 'T_Rücklauf_Quelle', 10, 90),
    )

    penalty = 0.0
    for warm_col, cold_col, lowest, highest in delta_limits:
        if warm_col not in row.index or cold_col not in row.index:
            continue

        delta = row[warm_col] - row[cold_col]

        # Undefined difference: treat as a hard violation instead of letting
        # the NaN slip through the comparisons below.
        if pd.isna(delta):
            penalty += 1e6
            continue

        # Hard violation: the difference has the wrong sign (or is zero).
        if delta <= 0:
            penalty += 1e6
        # Soft violations: linear in the distance to the realistic range.
        if delta < lowest:
            penalty += 1e4 * (lowest - delta)
        if delta > highest:
            penalty += 1e4 * (delta - highest)

    return penalty
| # ============================================================ | |
| # 9. Objective function | |
| # ============================================================ | |
def objective(x_values):
    """Objective handed to the optimizer: negative predicted COP plus penalty.

    The optimizer minimizes, so the predicted COP enters with a negative
    sign; the constraint penalty is added on top so that unrealistic
    candidates score worse.

    Args:
        x_values: Candidate values, one per entry of ``input_variables``.

    Returns:
        ``-predicted_cop + penalty``, or the sentinel ``1e9`` when the model
        raises or yields a non-finite prediction.
    """
    import math  # function-scope import: the block needs nothing new at file level

    row = build_candidate_row(x_values)
    penalty = constraint_penalty(row)

    # Single-row frame with the columns in exactly the training order.
    X_candidate = pd.DataFrame([row])
    X_candidate = X_candidate[X_opt.columns]

    try:
        predicted_cop = float(model_to_optimize.predict(X_candidate)[0])
    except Exception as e:
        # Best effort: one failing candidate must not abort the whole search.
        print("Prediction error:", e)
        return 1e9

    # A NaN/inf objective value cannot be ranked against other candidates;
    # score it like a failed prediction instead of passing it on.
    if not math.isfinite(predicted_cop):
        return 1e9

    # Minimize negative COP
    return -predicted_cop + penalty
| # ============================================================ | |
| # 10. Run black-box optimization | |
| # ============================================================ | |
# Solver settings gathered in one mapping so they can be tuned in one place.
de_options = {
    'strategy': 'best1bin',
    'maxiter': 150,
    'popsize': 20,
    'tol': 1e-6,
    'mutation': (0.5, 1.0),
    'recombination': 0.7,
    'seed': 42,
    'polish': True,
    'workers': 1,
}
result = differential_evolution(objective, bounds=bounds, **de_options)
| # ============================================================ | |
| # 11. Extract optimized result | |
| # ============================================================ | |
# Rebuild the full input row for the optimizer's best point and score it
# with the model alone (no penalty term).
best_x = result.x
best_row = build_candidate_row(best_x)
best_input_df = pd.DataFrame([best_row])[X_opt.columns]
best_predicted_cop = model_to_optimize.predict(best_input_df)[0]

for heading in (
    "\n============================================================",
    "OPTIMIZATION RESULT",
    "============================================================",
):
    print(heading)

print("\nOptimization success:", result.success)
print("Optimizer message:", result.message)

print("\nBest optimized input variables:")
for var, val in zip(input_variables, best_x):
    print(f"{var}: {val:.4f}")

print(f"\nPredicted maximum COP: {best_predicted_cop:.4f}")

print("\nFull optimized input row:")
display(best_input_df)
| # ============================================================ | |
| # 12. Compare base row vs optimized row | |
| # ============================================================ | |
# Score the unmodified base row with the same model for reference.
base_input_df = pd.DataFrame([base_row])[X_opt.columns]
base_predicted_cop = model_to_optimize.predict(base_input_df)[0]

# Side-by-side table: one line per model input column.
base_values = base_input_df.iloc[0].values
optimized_values = best_input_df.iloc[0].values
comparison_df = pd.DataFrame({
    'Variable': X_opt.columns,
    'Base_value': base_values,
    'Optimized_value': optimized_values,
})

cop_gain = best_predicted_cop - base_predicted_cop

for heading in (
    "\n============================================================",
    "BASE VS OPTIMIZED",
    "============================================================",
):
    print(heading)
print(f"Base predicted COP: {base_predicted_cop:.4f}")
print(f"Optimized predicted COP: {best_predicted_cop:.4f}")
print(f"Improvement: {cop_gain:.4f}")
display(comparison_df)
| #%% | |
| import subprocess | |
| import sys | |
def export_to_html(script_name="cop_modelling.py"):
    """Export a percent-format script to HTML via jupytext and nbconvert.

    Steps: convert ``script_name`` to a notebook with jupytext, execute that
    notebook and render it to HTML with nbconvert, then delete the
    intermediate notebook. Failures are reported on stdout; nothing is
    raised to the caller.

    The export executes the whole notebook. If the notebook itself calls
    ``export_to_html`` this would start another export from inside the
    first one, and so on. To prevent that, the child processes are started
    with the environment variable ``COP_EXPORT_IN_PROGRESS=1`` and the
    function returns immediately when it finds that variable set.

    Note: If you are using an Interactive Window in VS Code, you can also
    just click the 'Export' button in the toolbar at the top of the window!

    Args:
        script_name: Path of the ``.py`` script to export.

    Returns:
        None.
    """
    import os
    from pathlib import Path

    guard_var = "COP_EXPORT_IN_PROGRESS"
    if os.environ.get(guard_var) == "1":
        print("Skipping HTML export: already running inside an export.")
        return

    # Derive the output names from the real file suffix. A plain
    # str.replace(".py", ...) would rewrite every ".py" occurrence in the
    # path and would leave a name without ".py" unchanged.
    script_path = Path(script_name)
    notebook_path = script_path.with_suffix(".ipynb")
    html_path = script_path.with_suffix(".html")

    child_env = dict(os.environ)
    child_env[guard_var] = "1"

    # Set once jupytext has finished, so cleanup never deletes a notebook
    # this call did not write.
    notebook_written = False

    print(f"Exporting {script_name} to HTML...")
    try:
        # First, we need to convert the .py script to a .ipynb notebook using jupytext
        print("1. Converting .py to .ipynb format...")
        subprocess.run(
            [sys.executable, "-m", "jupytext", "--to", "notebook", script_name],
            check=True,
            env=child_env,
        )
        notebook_written = True

        print("2. Executing notebook and generating HTML...")
        result = subprocess.run(
            [sys.executable, "-m", "jupyter", "nbconvert", "--to", "html", "--execute", str(notebook_path)],
            capture_output=True,
            text=True,
            env=child_env,
        )
        if result.returncode == 0:
            print(f"Successfully exported to HTML! Look for {html_path} in your directory.")
        else:
            print("Failed to export. Error:")
            print(result.stderr)
    except Exception as e:
        # Best effort by design: report and carry on, never crash the notebook.
        print(f"Error during export: {e}")
        print("Make sure you have jupyter, nbconvert, and jupytext installed.")
    finally:
        # Remove the intermediate notebook on success and on failure alike.
        if notebook_written and notebook_path.exists():
            notebook_path.unlink()
# The call below is active: the HTML export runs every time this cell is
# executed (e.g. on "Run All"). Comment it out to disable the automatic export.
# NOTE(review): export_to_html executes the converted notebook with nbconvert,
# and that notebook contains this call as well -- confirm the export cannot
# re-trigger itself.
export_to_html("cop_modelling.py")
| # %% | |