# %% [markdown]
# # COP Results Joiner
# This notebook joins all Excel files from `data/cop_modelling` into a single Parquet file.

# %%
import pandas as pd
import os
from pathlib import Path

# %%
# Define paths
# Try to resolve data path dynamically based on current working directory
current_dir = Path.cwd()
if (current_dir / "data" / "cop_modelling").exists():
    data_path = current_dir / "data" / "cop_modelling"
elif (current_dir.parent / "data" / "cop_modelling").exists():
    data_path = current_dir.parent / "data" / "cop_modelling"
else:
    # Fallback
    data_path = Path("..") / "data" / "cop_modelling"

output_file = data_path / "joined_results.parquet"

# Configuration
LOAD_FROM_PARQUET = True  # Set to False to rebuild from Excel files


# %%
def _coerce_column(series):
    """Return *series* as numbers when every value parses, else as strings.

    Missing cells stay missing in the string case. A plain ``astype(str)``
    would turn them into the literal text ``'nan'``, which the later
    ``df.dropna()`` cannot remove.
    """
    try:
        return pd.to_numeric(series, errors='raise')
    except (ValueError, TypeError):
        return series.astype(str).where(series.notna())


if LOAD_FROM_PARQUET and output_file.exists():
    print(f"Loading data directly from {output_file.name}...")
    joined_df = pd.read_parquet(output_file)
    print(f"Loaded shape: {joined_df.shape}")
else:
    # Get all Excel files. Sorting makes the joined row order independent of
    # the file system's listing order; "~$" files are Excel lock files that
    # appear while a workbook is open and are not real workbooks.
    excel_files = sorted(
        p for p in data_path.glob("*.xlsx") if not p.name.startswith("~$")
    )
    print(f"Found {len(excel_files)} files in {data_path.resolve()}: {[f.name for f in excel_files]}")

    # Load and join
    dfs = []
    for f in excel_files:
        try:
            # Results are in 'Results' sheet
            sheet_df = pd.read_excel(f, sheet_name='Results')

            # Drop the first row (which usually contains units)
            sheet_df = sheet_df.iloc[1:].reset_index(drop=True)

            # Add a column to identify the source
            sheet_df['source_file'] = f.name

            # Convert columns to numerical if possible, else convert to strings
            for col in sheet_df.columns:
                sheet_df[col] = _coerce_column(sheet_df[col])

            dfs.append(sheet_df)
        except Exception as e:
            # Best effort: one unreadable workbook must not abort the join.
            print(f"Error reading {f}: {e}")

    if not dfs:
        raise ValueError(f"No objects to concatenate. Could not find or read any valid Excel files in {data_path.resolve()}.")

    joined_df = pd.concat(dfs, ignore_index=True)
    print(f"Joined shape: {joined_df.shape}")

    # Save to parquet
    joined_df.to_parquet(output_file)
    print(f"Saved to {output_file}")

# %%
# Quick preview
joined_df
print(joined_df.columns)

# %%
df = joined_df.copy()

# Sink-side temperature spread (original note: "larger" -- presumably the
# larger of the two spreads; TODO confirm)
df['t_diff_senke'] = df['T_Vorlauf_Senke'] - df['T_Rücklauf_Senke']

# Source-side temperature spread (original note: "smaller" -- TODO confirm)
df['t_diff_quelle'] = df['T_Vorlauf_Quelle'] - df['T_Rücklauf_Quelle']

# Temperature lift: sink flow temperature minus source return temperature
df['temp_hub'] = df['T_Vorlauf_Senke'] - df['T_Rücklauf_Quelle']

print(df['t_diff_quelle'].value_counts())
print(df['t_diff_senke'].value_counts())
print(df['Kompressor_Nr_Stufe1'].value_counts())

#%%
import pandas as pd
import plotly.graph_objects as go
import ipywidgets as widgets
from IPython.display import display, clear_output

print(df.columns)

# ============================================================
# Prepare dataframe
# ============================================================

# Create new column
df['temp_hub'] = df['T_Vorlauf_Senke'] - df['T_Rücklauf_Quelle']

# Keep required columns
df = df[[
    'Medium_Senke',
    'Kältemittel',
    'T_Vorlauf_Quelle',
    'T_Rücklauf_Quelle',
    'T_Rücklauf_Senke',
    'T_Vorlauf_Senke',
    'Kompressor_Nr_Stufe1',
    'COP',
    'COP_Lorenz',
    'source_file',
    't_diff_senke',
    't_diff_quelle',
    'temp_hub'
]].copy()

# Rows with a missing value in any kept column are discarded
df = df.dropna()

# Convert columns for filtering
df['Kältemittel_filter'] = df['Kältemittel'].astype(str)
# float -> int -> str so that a value such as 1.0 becomes the label "1"
df['Kompressor_filter'] = df['Kompressor_Nr_Stufe1'].astype(float).astype(int).astype(str)

# Combine Kältemittel and compressor stage correctly
df['Kältemittel_stufen'] = (
    df['Kältemittel_filter'] + '_' + df['Kompressor_filter']
)

# Sort dataframe by temperature columns
df = df.sort_values(
    by=['T_Rücklauf_Quelle', 'T_Vorlauf_Senke'],
    ascending=[True, True]
)

#%%
# ============================================================
# Train ML models for COP prediction:
# Linear Regression, Polynomial Regression, MLP, XGBoost,
# and Symbolic Regression with PySR
#
# Target: COP
# Predictors: all other variables EXCEPT COP_Lorenz
#
# Metrics: R2, MAE, RMSE, WAPE
# Best model selected by Test_RMSE
# SHAP feature importance for best model
# PySR symbolic formula printed
# ============================================================

import warnings
warnings.filterwarnings("ignore")

import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from IPython.display import display

from sklearn.base import clone
from sklearn.model_selection import train_test_split, KFold, cross_validate
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder, PolynomialFeatures
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import (
    mean_squared_error,
    mean_absolute_error,
    r2_score,
    make_scorer
)

# If needed, install:
# pip install xgboost shap pysr
from xgboost import XGBRegressor
import shap

# ============================================================
# PySR import
# ============================================================
# `pysr_available` gates every PySR step further down. It must be True when
# the import succeeds; previously both branches set it to False, so PySR was
# always skipped and reported as "not installed" even when it was present.
try:
    from pysr import PySRRegressor
except ImportError:
    pysr_available = False
    print("PySR is not installed.")
    print("Install it with:")
    print("pip install pysr")
    print("Note: PySR also needs Julia. First run can take some time.")
else:
    pysr_available = True


# ============================================================
# 0. Define WAPE metric
# ============================================================
def wape(y_true, y_pred):
    """Return the Weighted Absolute Percentage Error in percent.

    WAPE = sum(|y_true - y_pred|) / sum(|y_true|) * 100

    Parameters
    ----------
    y_true, y_pred : array-like
        Observed and predicted values; both are converted with
        ``np.asarray``.

    Returns
    -------
    float
        The WAPE, or ``nan`` when ``sum(|y_true|)`` is zero and the ratio
        is undefined.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    denominator = np.sum(np.abs(y_true))
    if denominator == 0:
        return np.nan

    return np.sum(np.abs(y_true - y_pred)) / denominator * 100


# Lower WAPE is better, so sklearn negates the score (greater_is_better=False).
wape_scorer = make_scorer(wape, greater_is_better=False)

# ============================================================
# 1. Prepare dataframe
# ============================================================
data = df.copy()

# Optional: create temp_hub if not already existing
if 'temp_hub' not in data.columns:
    data['temp_hub'] = data['T_Vorlauf_Senke'] - data['T_Rücklauf_Quelle']

# Target
target_col = 'COP'

# Columns to remove from predictors
drop_cols = [
    'COP',        # target
    'COP_Lorenz'  # explicitly excluded
]

# Remove rows without target
data = data.dropna(subset=[target_col]).copy()

# Define X and y
X = data.drop(columns=drop_cols, errors='ignore')
y = data[target_col]

print("Target:", target_col)
print("\nPredictor columns:")
print(X.columns.tolist())

# ============================================================
# 2. Detect numeric and categorical columns
# ============================================================
numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
categorical_features = X.select_dtypes(exclude=[np.number]).columns.tolist()

print("\nNumeric features:")
print(numeric_features)
print("\nCategorical features:")
print(categorical_features)

# ============================================================
# 3. Train/test split
# ============================================================
# 80/20 split with a fixed seed so repeated runs use the same partition
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42
)

# ============================================================
# 4.
# OneHotEncoder compatibility
# ============================================================
# The dense-output keyword is tried as `sparse_output` first; if the
# installed scikit-learn rejects it with TypeError, `sparse` is used instead.
_onehot_common = {'handle_unknown': 'ignore'}
try:
    onehot = OneHotEncoder(sparse_output=False, **_onehot_common)
except TypeError:
    onehot = OneHotEncoder(sparse=False, **_onehot_common)

# ============================================================
# 5. Preprocessors
# ============================================================
# Numeric branch: median imputation, then standardisation
numeric_transformer_standard = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])

# Categorical branch: most-frequent imputation, then one-hot encoding
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', onehot),
])

preprocessor_standard = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer_standard, numeric_features),
        ('cat', categorical_transformer, categorical_features),
    ],
    remainder='drop',
)

# Same numeric branch with degree-2 polynomial terms added before scaling
numeric_transformer_poly = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('poly', PolynomialFeatures(degree=2, include_bias=False)),
    ('scaler', StandardScaler()),
])

preprocessor_poly = ColumnTransformer(
    transformers=[
        ('num_poly', numeric_transformer_poly, numeric_features),
        ('cat', categorical_transformer, categorical_features),
    ],
    remainder='drop',
)


# ============================================================
# 6. Helper functions
# ============================================================
def to_dense(X_array):
    """Return ``X_array.toarray()`` when that method exists, else the input."""
    return X_array.toarray() if hasattr(X_array, "toarray") else X_array


def get_feature_names(preprocessor, X_transformed):
    """Return the output column names of a fitted preprocessor.

    When ``get_feature_names_out`` fails for any reason, placeholder names
    ``x0, x1, ...`` are generated, one per column of ``X_transformed``.
    """
    try:
        names = preprocessor.get_feature_names_out()
    except Exception:
        n_columns = X_transformed.shape[1]
        return [f"x{i}" for i in range(n_columns)]
    return list(names)


def make_pysr_model(niterations=100, verbosity=0, progress=False):
    """Build an unfitted PySR symbolic regressor.

    ``niterations``, ``verbosity`` and ``progress`` are forwarded as given;
    all remaining search settings are fixed here. A larger ``niterations``
    gives the search more time to find better formulas.
    """
    settings = {
        'niterations': niterations,
        'binary_operators': ["+", "-", "*", "/"],
        'unary_operators': ["square", "cube", "abs"],
        'model_selection': "best",
        'maxsize': 25,
        'populations': 15,
        'population_size': 50,
        'parsimony': 0.001,
        'random_state': 42,
        'verbosity': verbosity,
        'progress': progress,
        'warm_start': False,
    }
    return PySRRegressor(**settings)


def evaluate_metrics(y_true, y_pred):
    """Return a dict with R2, MAE, RMSE and WAPE (in percent)."""
    squared_error = mean_squared_error(y_true, y_pred)
    return {
        'R2': r2_score(y_true, y_pred),
        'MAE': mean_absolute_error(y_true, y_pred),
        'RMSE': np.sqrt(squared_error),
        'WAPE_%': wape(y_true, y_pred),
    }


def predict_any_model(model_object, X_raw):
    """Predict from raw inputs with either kind of stored model.

    ``model_object`` is an sklearn ``Pipeline``, or a dict tagged
    ``"type": "pysr"`` that carries a fitted ``"preprocessor"`` and
    ``"model"``. Anything else raises ``ValueError``.
    """
    if isinstance(model_object, Pipeline):
        return model_object.predict(X_raw)

    is_pysr_bundle = (
        isinstance(model_object, dict)
        and model_object.get("type") == "pysr"
    )
    if not is_pysr_bundle:
        raise ValueError("Unknown model object type.")

    fitted_preprocessor = model_object["preprocessor"]
    symbolic_model = model_object["model"]
    dense_inputs = to_dense(fitted_preprocessor.transform(X_raw))
    return symbolic_model.predict(dense_inputs)


# ============================================================
# 7. Define standard sklearn models
# ============================================================
def _as_pipeline(preprocessor, estimator):
    """Chain a preprocessor and an estimator under the step names used below."""
    return Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('model', estimator),
    ])


models = {
    "Linear Regression": _as_pipeline(
        preprocessor_standard,
        LinearRegression(),
    ),
    "Polynomial Regression Degree 2": _as_pipeline(
        preprocessor_poly,
        Ridge(alpha=1.0),
    ),
    "MLP Regressor": _as_pipeline(
        preprocessor_standard,
        MLPRegressor(
            hidden_layer_sizes=(128, 64),
            activation='relu',
            solver='adam',
            alpha=0.0005,
            learning_rate_init=0.001,
            max_iter=1000,
            random_state=42,
            early_stopping=True,
        ),
    ),
    "XGBoost": _as_pipeline(
        preprocessor_standard,
        XGBRegressor(
            n_estimators=500,
            max_depth=4,
            learning_rate=0.03,
            subsample=0.9,
            colsample_bytree=0.9,
            objective='reg:squarederror',
            random_state=42,
            n_jobs=-1,
        ),
    ),
}

# ============================================================
# 8.
# Cross-validation and test evaluation
# ============================================================
def _print_banner(title):
    """Print *title* framed by two separator lines, preceded by a blank line."""
    print("\n============================================================")
    print(title)
    print("============================================================")


cv = KFold(n_splits=5, shuffle=True, random_state=42)

results = []
fitted_models = {}

# ------------------------------------------------------------
# 8.1 Train normal sklearn models
# ------------------------------------------------------------
# Scorers prefixed "neg_" are negated by sklearn; signs are flipped back below.
cv_scoring = {
    'r2': 'r2',
    'neg_mae': 'neg_mean_absolute_error',
    'neg_rmse': 'neg_root_mean_squared_error',
    'neg_wape': wape_scorer,
}

for name, pipe in models.items():
    _print_banner(f"Training model: {name}")

    cv_scores = cross_validate(
        pipe,
        X_train,
        y_train,
        cv=cv,
        scoring=cv_scoring,
        n_jobs=-1,
        return_train_score=False,
    )

    r2_per_fold = cv_scores['test_r2']
    wape_per_fold = cv_scores['test_neg_wape']

    cv_r2_mean = r2_per_fold.mean()
    cv_r2_std = r2_per_fold.std()
    cv_mae_mean = -cv_scores['test_neg_mae'].mean()
    cv_rmse_mean = -cv_scores['test_neg_rmse'].mean()
    cv_wape_mean = -wape_per_fold.mean()
    cv_wape_std = wape_per_fold.std()

    # Fit on full training data
    pipe.fit(X_train, y_train)

    # Predict test
    y_pred = pipe.predict(X_test)
    test_metrics = evaluate_metrics(y_test, y_pred)

    fitted_models[name] = pipe

    summary_row = {
        'Model': name,
        'CV_R2_mean': cv_r2_mean,
        'CV_R2_std': cv_r2_std,
        'CV_MAE_mean': cv_mae_mean,
        'CV_RMSE_mean': cv_rmse_mean,
        'CV_WAPE_mean_%': cv_wape_mean,
        'CV_WAPE_std_%': cv_wape_std,
    }
    # Adds Test_R2, Test_MAE, Test_RMSE and Test_WAPE_% in that order
    summary_row.update(
        {f'Test_{metric}': value for metric, value in test_metrics.items()}
    )
    results.append(summary_row)

# ------------------------------------------------------------
# 8.2 Train PySR Symbolic Regression
# ------------------------------------------------------------
if not pysr_available:
    print("\nSkipping PySR Symbolic Regression because PySR is not installed.")
else:
    _print_banner("Training model: PySR Symbolic Regression")

    # You can increase these for better symbolic equations.
    # If PySR is too slow, reduce them.
    PYSR_CV_NITERATIONS = 40
    PYSR_FINAL_NITERATIONS = 120

    cv_r2_scores = []
    cv_mae_scores = []
    cv_rmse_scores = []
    cv_wape_scores = []

    for fold_number, (train_idx, val_idx) in enumerate(cv.split(X_train), start=1):
        print(f"\nPySR CV fold {fold_number}/5")

        X_tr_fold, X_val_fold = X_train.iloc[train_idx], X_train.iloc[val_idx]
        y_tr_fold, y_val_fold = y_train.iloc[train_idx], y_train.iloc[val_idx]

        # Fit fresh preprocessor for this fold
        fold_preprocessor = clone(preprocessor_standard)
        X_tr_proc = to_dense(fold_preprocessor.fit_transform(X_tr_fold))
        X_val_proc = to_dense(fold_preprocessor.transform(X_val_fold))

        # Fit PySR for this fold
        fold_pysr = make_pysr_model(
            niterations=PYSR_CV_NITERATIONS,
            verbosity=0,
            progress=False,
        )
        fold_pysr.fit(X_tr_proc, np.asarray(y_tr_fold))

        y_val_pred = fold_pysr.predict(X_val_proc)
        fold_metrics = evaluate_metrics(y_val_fold, y_val_pred)

        cv_r2_scores.append(fold_metrics['R2'])
        cv_mae_scores.append(fold_metrics['MAE'])
        cv_rmse_scores.append(fold_metrics['RMSE'])
        cv_wape_scores.append(fold_metrics['WAPE_%'])

    # Fit final PySR model on full training data
    print("\nFitting final PySR model on full training data...")

    pysr_preprocessor = clone(preprocessor_standard)
    X_train_pysr = to_dense(pysr_preprocessor.fit_transform(X_train))
    X_test_pysr = to_dense(pysr_preprocessor.transform(X_test))

    pysr_feature_names = get_feature_names(pysr_preprocessor, X_train_pysr)

    pysr_model = make_pysr_model(
        niterations=PYSR_FINAL_NITERATIONS,
        verbosity=1,
        progress=True,
    )
    pysr_model.fit(X_train_pysr, np.asarray(y_train))

    y_pred_pysr = pysr_model.predict(X_test_pysr)
    test_metrics_pysr = evaluate_metrics(y_test, y_pred_pysr)

    # Stored as a tagged dict because the preprocessor and the PySR model
    # are kept as separate objects rather than one Pipeline.
    fitted_models["PySR Symbolic Regression"] = {
        "type": "pysr",
        "preprocessor": pysr_preprocessor,
        "model": pysr_model,
        "feature_names": pysr_feature_names,
    }

    pysr_row = {
        'Model': "PySR Symbolic Regression",
        'CV_R2_mean': np.mean(cv_r2_scores),
        'CV_R2_std': np.std(cv_r2_scores),
        'CV_MAE_mean': np.mean(cv_mae_scores),
        'CV_RMSE_mean': np.mean(cv_rmse_scores),
        'CV_WAPE_mean_%': np.mean(cv_wape_scores),
        'CV_WAPE_std_%': np.std(cv_wape_scores),
    }
    pysr_row.update(
        {f'Test_{metric}': value for metric, value in test_metrics_pysr.items()}
    )
    results.append(pysr_row)

# ============================================================
# 9. Model comparison
# ============================================================
results_df = pd.DataFrame(results).sort_values(by='Test_RMSE', ascending=True)

_print_banner("MODEL COMPARISON")
display(results_df)

# ============================================================
# 10. Select best model
# ============================================================
# Best model by RMSE: results_df is sorted ascending, so row 0 wins
best_model_name = results_df.iloc[0]['Model']
best_model = fitted_models[best_model_name]

_print_banner("BEST MODEL")
print(best_model_name)

print("\nBest model metrics:")
display(results_df[results_df['Model'] == best_model_name])

# If you prefer best by WAPE instead, use this:
# results_df = pd.DataFrame(results).sort_values(by='Test_WAPE_%', ascending=True)
# best_model_name = results_df.iloc[0]['Model']
# best_model = fitted_models[best_model_name]

# ============================================================
# 11.
# Print PySR symbolic formula
# ============================================================
if not (pysr_available and "PySR Symbolic Regression" in fitted_models):
    print("\nNo PySR formula available.")
else:
    print("\n============================================================")
    print("PYSR SYMBOLIC REGRESSION FORMULA")
    print("============================================================")

    pysr_info = fitted_models["PySR Symbolic Regression"]
    pysr_model_final = pysr_info["model"]
    pysr_feature_names = pysr_info["feature_names"]

    print("\nBest PySR equation as string:")
    print(pysr_model_final)

    # SymPy rendering is best effort; on failure the plain string is used below
    try:
        print("\nBest PySR equation as SymPy expression:")
        sympy_formula = pysr_model_final.sympy()
        print(sympy_formula)
    except Exception as e:
        print("\nCould not print SymPy formula.")
        print(e)
        sympy_formula = None

    print("\nAll discovered PySR equations:")
    try:
        display(pysr_model_final.equations_)
    except Exception as e:
        print("Could not display equations table.")
        print(e)

    # Feature mapping x0, x1, x2, ... to transformed columns
    print("\nFeature mapping for PySR formula:")
    variable_labels = [f"x{i}" for i in range(len(pysr_feature_names))]
    mapping_df = pd.DataFrame({
        "PySR_variable": variable_labels,
        "Original_transformed_feature": pysr_feature_names,
    })

    # If possible, show only variables used in the formula
    try:
        if sympy_formula is not None:
            formula_string = str(sympy_formula)
        else:
            formula_string = str(pysr_model_final)

        # Collect the distinct indices N of every "xN" token in the formula
        index_tokens = re.findall(r'\bx(\d+)\b', formula_string)
        used_indices = sorted({int(token) for token in index_tokens})

        if used_indices:
            used_mapping_df = mapping_df.iloc[used_indices]
            print("\nVariables used in best PySR formula:")
            display(used_mapping_df)
        else:
            display(mapping_df)
    except Exception:
        # Any failure above falls back to showing the complete mapping
        display(mapping_df)

# ============================================================
# 12.
# Visualize predicted vs actual for best model
# ============================================================
y_pred_best = predict_any_model(best_model, X_test)

# Actual-vs-predicted scatter with the identity line as reference
identity_ends = [y_test.min(), y_test.max()]

plt.figure(figsize=(7, 6))
plt.scatter(y_test, y_pred_best, alpha=0.7)
plt.plot(identity_ends, identity_ends, 'r--', linewidth=2)
plt.xlabel("Actual COP")
plt.ylabel("Predicted COP")
plt.title(f"Actual vs Predicted COP - {best_model_name}")
plt.grid(True)
plt.show()

# Residuals (actual minus predicted) against the prediction
residuals = y_test - y_pred_best

plt.figure(figsize=(7, 5))
plt.scatter(y_pred_best, residuals, alpha=0.7)
plt.axhline(0, color='red', linestyle='--')
plt.xlabel("Predicted COP")
plt.ylabel("Residuals")
plt.title(f"Residual Plot - {best_model_name}")
plt.grid(True)
plt.show()

# ============================================================
# 13. Model comparison plots
# ============================================================
# One bar chart per test metric: (results column, y-axis label, title)
comparison_charts = (
    ('Test_WAPE_%', "Test WAPE (%)", "Model Comparison by Test WAPE"),
    ('Test_RMSE', "Test RMSE", "Model Comparison by Test RMSE"),
)

for metric_column, axis_label, chart_title in comparison_charts:
    plt.figure(figsize=(10, 5))
    plt.bar(results_df['Model'], results_df[metric_column])
    plt.ylabel(axis_label)
    plt.title(chart_title)
    plt.xticks(rotation=30, ha='right')
    plt.grid(axis='y')
    plt.tight_layout()
    plt.show()

# ============================================================
# 14.
# SHAP explanation for best model
# ============================================================
print("\n============================================================")
print("SHAP FEATURE IMPORTANCE")
print("============================================================")

# Extract fitted preprocessor and estimator
if isinstance(best_model, Pipeline):
    pipeline_steps = best_model.named_steps
    best_preprocessor = pipeline_steps['preprocessor']
    best_estimator = pipeline_steps['model']
elif isinstance(best_model, dict) and best_model.get("type") == "pysr":
    best_preprocessor = best_model["preprocessor"]
    best_estimator = best_model["model"]
else:
    raise ValueError("Unknown best model type.")

# Transform train/test data
X_train_transformed = to_dense(best_preprocessor.transform(X_train))
X_test_transformed = to_dense(best_preprocessor.transform(X_test))

feature_names = get_feature_names(best_preprocessor, X_train_transformed)

# Convert to DataFrame for SHAP, keeping the original row indices
X_train_shap = pd.DataFrame(
    X_train_transformed,
    columns=feature_names,
    index=X_train.index,
)
X_test_shap = pd.DataFrame(
    X_test_transformed,
    columns=feature_names,
    index=X_test.index,
)

# To keep SHAP fast, sample test rows if dataset is large
max_shap_rows = 200
X_shap_sample = (
    X_test_shap.sample(max_shap_rows, random_state=42)
    if len(X_test_shap) > max_shap_rows
    else X_test_shap.copy()
)

# Background sample for SHAP
max_background_rows = 100
X_background = (
    X_train_shap.sample(max_background_rows, random_state=42)
    if len(X_train_shap) > max_background_rows
    else X_train_shap.copy()
)

print("Best model for SHAP:", best_model_name)
print("SHAP sample shape:", X_shap_sample.shape)
print("Background shape:", X_background.shape)

# ============================================================
# 15.
# SHAP explainer depending on model type
# ============================================================
use_tree_explainer = best_model_name == "XGBoost"

if not use_tree_explainer:
    print("Using KernelExplainer for non-tree model...")
    print("This can be slower for Linear/Polynomial/MLP/PySR models.")

    X_background_np = X_background.values
    X_shap_sample_np = X_shap_sample.values

    def model_predict_preprocessed(X_array):
        # The estimator is called directly because the rows handed to SHAP
        # are already preprocessed.
        return best_estimator.predict(X_array)

    explainer = shap.KernelExplainer(
        model_predict_preprocessed,
        X_background_np,
    )
    shap_values = explainer.shap_values(X_shap_sample_np, nsamples=100)
else:
    print("Using TreeExplainer for XGBoost...")
    explainer = shap.TreeExplainer(best_estimator)
    shap_values = explainer.shap_values(X_shap_sample)

# If SHAP returns a list, take first element
if isinstance(shap_values, list):
    shap_values = shap_values[0]

# ============================================================
# 16. SHAP summary plots
# ============================================================
print("\nCreating SHAP bar plot...")
shap.summary_plot(
    shap_values,
    X_shap_sample,
    plot_type="bar",
    max_display=25,
    show=True,
)

print("\nCreating SHAP beeswarm plot...")
shap.summary_plot(
    shap_values,
    X_shap_sample,
    max_display=25,
    show=True,
)

# ============================================================
# 17.
# Table of most important features
# ============================================================
# Mean absolute SHAP value per feature, averaged over the explained rows
mean_abs_shap = np.abs(shap_values).mean(axis=0)

shap_importance = pd.DataFrame({
    'feature': feature_names,
    'mean_abs_shap': mean_abs_shap,
})
shap_importance = shap_importance.sort_values(
    by='mean_abs_shap',
    ascending=False,
)

print("\nTop 30 most important features:")
display(shap_importance.head(30))

#%%
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization
####### Optimization

#%%
# ============================================================
# Black-box optimization of trained MLP / XGBoost model
# Objective: maximize predicted COP
# ============================================================

import numpy as np
import pandas as pd
from scipy.optimize import differential_evolution

# ============================================================
# 1. Select model to optimize
# ============================================================
# Choose one of your trained models:
# "XGBoost"
# "MLP Regressor"
# or use best_model directly
MODEL_TO_OPTIMIZE = "XGBoost"  # change to "MLP Regressor" if you want

model_to_optimize = fitted_models[MODEL_TO_OPTIMIZE]

print("Optimizing model:", MODEL_TO_OPTIMIZE)

# ============================================================
# 2. Recreate X if needed
# ============================================================
data_opt = df.copy()

if 'temp_hub' not in data_opt.columns:
    lift = data_opt['T_Vorlauf_Senke'] - data_opt['T_Rücklauf_Quelle']
    data_opt['temp_hub'] = lift

target_col = 'COP'
drop_cols = ['COP', 'COP_Lorenz']

# Keep only rows that have a target value
data_opt = data_opt.dropna(subset=[target_col]).copy()

X_opt = data_opt.drop(columns=drop_cols, errors='ignore')
y_opt = data_opt[target_col]

print("Available input columns:")
print(X_opt.columns.tolist())

# ============================================================
# 3.
# Choose one existing row as base operating point
# ============================================================
# This row provides fixed values for variables that are NOT optimized,
# for example Kältemittel, Medium_Senke, source_file, etc.
base_row = X_opt.iloc[0].copy()

print("\nBase row before optimization:")
display(pd.DataFrame([base_row]))

# ============================================================
# 4. Define input variables to optimize
# ============================================================
# These are the continuous variables the optimizer can change.
# You can modify this list.
input_variables = [
    'T_Rücklauf_Quelle',
    'T_Vorlauf_Quelle',
    'T_Rücklauf_Senke',
    'T_Vorlauf_Senke',
]

# Check if all variables exist
known_columns = X_opt.columns
missing_input_vars = [
    name for name in input_variables if name not in known_columns
]
if missing_input_vars:
    raise ValueError(f"These input variables are missing in X_opt: {missing_input_vars}")

print("\nOptimized input variables:")
print(input_variables)

# ============================================================
# 5. Define bounds for each input variable
# ============================================================
# Option A:
# Use data-driven bounds from your real dataset.
# This avoids crazy extrapolation outside the training domain.
# Each bound is the (5 %, 95 %) quantile pair of the observed column.
bounds = [
    (X_opt[name].quantile(0.05), X_opt[name].quantile(0.95))
    for name in input_variables
]

print("\nData-driven bounds:")
for name, limits in zip(input_variables, bounds):
    print(f"{name}: {limits}")

# ------------------------------------------------------------
# Option B:
# If you prefer manual physical bounds, use this instead:
# ------------------------------------------------------------
# bounds = [
#     (-10, 30),   # T_Rücklauf_Quelle
#     (-5, 35),    # T_Vorlauf_Quelle
#     (20, 70),    # T_Rücklauf_Senke
#     (30, 90),    # T_Vorlauf_Senke
# ]

# ============================================================
# 6.
# Optional: fix categorical / discrete values
# ============================================================
# You can force specific values here.
# Only use values that exist in your original data.
# If you do not want to force anything, leave this dictionary empty.
fixed_values = {
    # Example:
    # 'Kältemittel': 'R290',
    # 'Medium_Senke': 'Wasser',
    # 'Kompressor_Nr_Stufe1': 1,
}

# Overrides for columns that the base row does not have are ignored
for column_name, forced_value in fixed_values.items():
    if column_name in base_row.index:
        base_row[column_name] = forced_value


# ============================================================
# 7. Helper function: build candidate input row
# ============================================================
# Derived column -> (minuend column, subtrahend column)
_DERIVED_COLUMNS = (
    ('temp_hub', 'T_Vorlauf_Senke', 'T_Rücklauf_Quelle'),
    ('t_diff_senke', 'T_Vorlauf_Senke', 'T_Rücklauf_Senke'),
    ('t_diff_quelle', 'T_Vorlauf_Quelle', 'T_Rücklauf_Quelle'),
)


def build_candidate_row(x_values):
    """Return a full model input row for one optimizer candidate.

    The row starts as a copy of ``base_row``; the entries named in
    ``input_variables`` are overwritten with ``x_values`` (paired by
    position). Every derived difference column that exists in the row,
    together with both of its source columns, is then recomputed.
    """
    candidate = base_row.copy()

    # Set optimized input values
    for name, value in zip(input_variables, x_values):
        candidate[name] = value

    # Recalculate dependent variables if they exist
    present = candidate.index
    for derived, minuend, subtrahend in _DERIVED_COLUMNS:
        if minuend in present and subtrahend in present and derived in present:
            candidate[derived] = candidate[minuend] - candidate[subtrahend]

    return candidate


# ============================================================
# 8. Physical constraints / penalty function
# ============================================================
# (minuend column, subtrahend column, lowest realistic difference,
#  highest realistic difference), evaluated in this order:
#   1. sink flow must exceed sink return
#   2. source flow must exceed source return
#   3. temperature lift / hub
_DIFFERENCE_CONSTRAINTS = (
    ('T_Vorlauf_Senke', 'T_Rücklauf_Senke', 2, 30),
    ('T_Vorlauf_Quelle', 'T_Rücklauf_Quelle', 1, 25),
    ('T_Vorlauf_Senke', 'T_Rücklauf_Quelle', 10, 90),
)


def constraint_penalty(row):
    """Return a non-negative penalty for physically unrealistic candidates.

    For each constrained temperature difference whose two columns are
    present in ``row``: a non-positive difference adds 1e6, and leaving the
    realistic range adds 1e4 per unit of violation. The optimizer minimizes
    the objective, so a larger penalty makes a candidate worse.
    """
    penalty = 0.0

    for minuend, subtrahend, lowest, highest in _DIFFERENCE_CONSTRAINTS:
        if minuend not in row.index or subtrahend not in row.index:
            continue

        difference = row[minuend] - row[subtrahend]

        # Hard violation: wrong sign (or zero)
        if difference <= 0:
            penalty += 1e6

        # Optional realistic range
        if difference < lowest:
            penalty += 1e4 * (lowest - difference)
        if difference > highest:
            penalty += 1e4 * (difference - highest)

    return penalty


# ============================================================
# 9. Objective function
# ============================================================
def objective(x_values):
    """Return ``-predicted COP + penalty`` for one candidate.

    scipy minimizes this value, so minimizing it maximizes the predicted
    COP. If the prediction raises, the error is printed and 1e9 is
    returned so that the candidate is rejected.
    """
    candidate = build_candidate_row(x_values)
    penalty = constraint_penalty(candidate)

    # Single-row frame with the column order used for training
    X_candidate = pd.DataFrame([candidate])[X_opt.columns]

    # Predict COP
    try:
        predicted_cop = model_to_optimize.predict(X_candidate)[0]
    except Exception as e:
        print("Prediction error:", e)
        return 1e9
    else:
        # Minimize negative COP
        return -predicted_cop + penalty


# ============================================================
# 10.
# Run black-box optimization
# ============================================================
# Differential-evolution settings; the fixed seed keeps runs repeatable
de_settings = {
    'strategy': 'best1bin',
    'maxiter': 150,
    'popsize': 20,
    'tol': 1e-6,
    'mutation': (0.5, 1.0),
    'recombination': 0.7,
    'seed': 42,
    'polish': True,
    'workers': 1,
}

result = differential_evolution(objective, bounds=bounds, **de_settings)

# ============================================================
# 11. Extract optimized result
# ============================================================
best_x = result.x
best_row = build_candidate_row(best_x)

# Single-row frame in the column order used for training
best_input_df = pd.DataFrame([best_row])[X_opt.columns]

best_predicted_cop = model_to_optimize.predict(best_input_df)[0]

print("\n============================================================")
print("OPTIMIZATION RESULT")
print("============================================================")

print("\nOptimization success:", result.success)
print("Optimizer message:", result.message)

print("\nBest optimized input variables:")
for name, value in zip(input_variables, best_x):
    print(f"{name}: {value:.4f}")

print(f"\nPredicted maximum COP: {best_predicted_cop:.4f}")

print("\nFull optimized input row:")
display(best_input_df)

# ============================================================
# 12.
# Compare base row vs optimized row
# ============================================================
base_input_df = pd.DataFrame([base_row])
base_input_df = base_input_df[X_opt.columns]

base_predicted_cop = model_to_optimize.predict(base_input_df)[0]

# Side-by-side view of every input column before and after optimization
comparison_df = pd.DataFrame({
    'Variable': X_opt.columns,
    'Base_value': base_input_df.iloc[0].values,
    'Optimized_value': best_input_df.iloc[0].values
})

print("\n============================================================")
print("BASE VS OPTIMIZED")
print("============================================================")

print(f"Base predicted COP: {base_predicted_cop:.4f}")
print(f"Optimized predicted COP: {best_predicted_cop:.4f}")
print(f"Improvement: {best_predicted_cop - base_predicted_cop:.4f}")

display(comparison_df)

#%%
import os
import subprocess
import sys
from pathlib import Path

# Set in the environment of the nbconvert child process. The executed
# notebook contains the export call itself, so without this marker every
# export would start another export, without end.
_EXPORT_GUARD_ENV = "COP_MODELLING_EXPORT_IN_PROGRESS"


def export_to_html(script_name="cop_modelling.py"):
    """
    Exports the current script/notebook to an HTML file.

    The script is converted to a notebook with jupytext, then executed and
    rendered to HTML with nbconvert. Failures are reported on stdout and are
    never raised. The intermediate notebook is removed after a successful
    export only.

    The call is a no-op when it runs inside a notebook that is itself being
    executed by an export, which prevents recursive exports.

    Note: If you are using an Interactive Window in VS Code, you can also
    just click the 'Export' button in the toolbar at the top of the window!

    Parameters
    ----------
    script_name : str
        Path of the percent-format ``.py`` script to export.
    """
    if os.environ.get(_EXPORT_GUARD_ENV):
        print("Skipping HTML export: this run is already part of an export.")
        return

    # Swap only the final suffix. A plain str.replace would also rewrite
    # ".py" elsewhere in the path, and would leave a suffix-less name
    # unchanged so that the cleanup below deleted the script itself.
    notebook_path = Path(script_name).with_suffix(".ipynb")
    html_path = notebook_path.with_suffix(".html")
    notebook_name = str(notebook_path)

    print(f"Exporting {script_name} to HTML...")
    try:
        # First, we need to convert the .py script to a .ipynb notebook using jupytext
        print("1. Converting .py to .ipynb format...")
        subprocess.run(
            [sys.executable, "-m", "jupytext", "--to", "notebook", script_name],
            check=True,
        )

        print("2. Executing notebook and generating HTML...")
        child_env = dict(os.environ)
        child_env[_EXPORT_GUARD_ENV] = "1"
        result = subprocess.run(
            [sys.executable, "-m", "jupyter", "nbconvert", "--to", "html", "--execute", notebook_name],
            capture_output=True,
            text=True,
            env=child_env,
        )
    except (subprocess.CalledProcessError, OSError) as e:
        print(f"Error during export: {e}")
        print("Make sure you have jupyter, nbconvert, and jupytext installed.")
        return

    if result.returncode == 0:
        print(f"Successfully exported to HTML! Look for {html_path} in your directory.")
        # Optional: Clean up the intermediate .ipynb file
        if notebook_path.exists():
            notebook_path.unlink()
    else:
        print("Failed to export. Error:")
        print(result.stderr)


# Exports to HTML whenever you "Run All"; comment the line below out to disable.
export_to_html("cop_modelling.py")

# %%