# pip install torch botorch gpytorch pyro-ppl pandas numpy scikit-learn matplotlib gradio
"""Gradio front end for weighted multi-output Bayesian optimization.

A CSV of past experiments is loaded, one fully Bayesian SAAS GP is fitted per
output feature plus one for a weighted composite score, and the next
experimental point is proposed by maximizing log expected improvement on the
composite score.
"""
import io
import os
import tempfile
import time
import traceback
from typing import Dict, List, Optional, Tuple

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from botorch.acquisition import qExpectedImprovement, qLogExpectedImprovement
from botorch.fit import fit_fully_bayesian_model_nuts
from botorch.models import SaasFullyBayesianSingleTaskGP
from botorch.optim import optimize_acqf
from matplotlib.figure import Figure
from sklearn.preprocessing import MinMaxScaler


class CustomizableBayesianOptimizer:
    """Suggest the next experiment from tabular data using SAAS GPs.

    Inputs and each output are min-max scaled to [0, 1] before modelling.
    A composite objective is formed as the weighted sum of the raw output
    columns; the acquisition function is optimized against a model of that
    composite score, while per-output models are used only for reporting
    predictions at the suggested point.
    """

    def __init__(
        self,
        data_file: str,
        input_features: List[str],
        output_features: List[str],
        objective_weights: Dict[str, float],
        input_bounds: Optional[Dict[str, Tuple[float, float]]] = None,
        output_bounds: Optional[Dict[str, Tuple[float, float]]] = None,
        num_samples: int = 128,  # Reduced from 512
        warmup_steps: int = 64,  # Reduced from 128
        thinning: int = 8,  # Reduced from 16
        use_gpu: bool = True,
        timeout: int = 600,  # 10 minute timeout
    ):
        """Initialize the Bayesian optimizer with customizable parameters.

        Args:
            data_file: Path to the CSV file holding the experimental data.
            input_features: Column names used as model inputs.
            output_features: Column names modelled as outputs.
            objective_weights: Weight per output column for the composite
                score (negative weights penalize a column).
            input_bounds: Optional ``{feature: (min, max)}`` in raw units;
                the suggested point is clipped to these after optimization.
            output_bounds: Optional ``{feature: (min, max)}`` in raw units;
                predicted means are clipped to these.
            num_samples: Number of NUTS samples per model.
            warmup_steps: Number of NUTS warmup steps per model.
            thinning: NUTS thinning factor.
            use_gpu: Use CUDA when it is available.
            timeout: Wall-clock budget in seconds, measured from
                construction. It is only checked between stages, so a single
                long model fit is not interrupted.

        Raises:
            KeyError: If an input or output feature is not a CSV column.
        """
        self.start_time = time.time()
        self.timeout = timeout

        # Check for GPU availability
        self.device = torch.device(
            "cuda" if torch.cuda.is_available() and use_gpu else "cpu"
        )
        print(f"Using device: {self.device}")

        self.data_file = data_file
        self.input_features = input_features
        self.output_features = output_features
        self.objective_weights = objective_weights
        self.input_bounds = input_bounds
        self.output_bounds = output_bounds
        self.mcmc_settings = {
            "num_samples": num_samples,
            "warmup_steps": warmup_steps,
            "thinning": thinning,
        }

        # Load data
        self.df = pd.read_csv(self.data_file)
        print(f"Loaded data with columns: {list(self.df.columns)}")
        print(f"Data shape: {self.df.shape}")

        # Initialize scalers
        self.scaler_x = MinMaxScaler(feature_range=(0, 1))
        self.scalers_y = {
            feat: MinMaxScaler(feature_range=(0, 1)) for feat in output_features
        }

        # Process data
        self.process_data()

    def process_data(self):
        """Validate the requested columns and build normalized tensors.

        Populates ``X``, ``Y_dict``, ``score``, ``score_scaler`` and
        ``Y_score``.

        Raises:
            KeyError: If an input or output feature is not a CSV column.
        """
        columns = list(self.df.columns)

        # Verify input features exist
        for feat in self.input_features:
            if feat not in self.df.columns:
                raise KeyError(
                    f"Input feature '{feat}' not found in data file. "
                    f"Available columns: {columns}"
                )

        # Outputs get the same explicit check so a typo produces a helpful
        # message instead of a bare pandas KeyError.
        for feat in self.output_features:
            if feat not in self.df.columns:
                raise KeyError(
                    f"Output feature '{feat}' not found in data file. "
                    f"Available columns: {columns}"
                )

        # Normalize input features
        self.X = torch.as_tensor(
            self.scaler_x.fit_transform(self.df[self.input_features].values),
            dtype=torch.double,
            device=self.device,
        )

        # Normalize each output feature separately
        self.Y_dict = {}
        for feat in self.output_features:
            self.Y_dict[feat] = torch.as_tensor(
                self.scalers_y[feat].fit_transform(
                    self.df[feat].values.reshape(-1, 1)
                ),
                dtype=torch.double,
                device=self.device,
            )

        # Calculate the composite score based on weights.
        # NOTE(review): weights are applied to the raw (unscaled) columns, so
        # outputs with a larger numeric range dominate the score - confirm
        # this is intended before changing it.
        score = np.zeros(len(self.df))
        for feat, weight in self.objective_weights.items():
            if feat in self.df.columns:
                score += weight * self.df[feat].values
            else:
                print(f"Warning: weight for unknown column '{feat}' ignored")

        self.score = score
        self.score_scaler = MinMaxScaler(feature_range=(0, 1))
        self.Y_score = torch.as_tensor(
            self.score_scaler.fit_transform(score.reshape(-1, 1)),
            dtype=torch.double,
            device=self.device,
        )

    def check_timeout(self):
        """Raise ``TimeoutError`` once the wall-clock budget is exhausted."""
        if time.time() - self.start_time > self.timeout:
            raise TimeoutError(
                f"Optimization timed out after {self.timeout} seconds"
            )

    def _fit_saas_model(self, train_y):
        """Fit one fully Bayesian SAAS GP on ``self.X`` against ``train_y``."""
        model = SaasFullyBayesianSingleTaskGP(self.X, train_y)
        fit_fully_bayesian_model_nuts(
            model,
            num_samples=self.mcmc_settings["num_samples"],
            warmup_steps=self.mcmc_settings["warmup_steps"],
            thinning=self.mcmc_settings["thinning"],
        )
        return model

    def build_models(self, update_progress=None):
        """Build individual models for each output and a model for the score.

        Args:
            update_progress: Optional ``callable(fraction, description)``
                invoked before each model fit. Fractions reported here lie
                in the range 0.1-0.5.

        Raises:
            TimeoutError: If the budget is exhausted before a fit starts.
        """
        # Individual models for each output feature
        self.individual_models = {}
        total_features = len(self.output_features) + 1  # +1 for score model

        for i, feat in enumerate(self.output_features):
            print(f"Building model for {feat}...")
            if update_progress:
                update_progress(
                    0.1 + (i / total_features) * 0.4,
                    f"Building model for {feat}...",
                )
            self.check_timeout()
            self.individual_models[feat] = self._fit_saas_model(self.Y_dict[feat])

        # Model for overall score
        print("Building model for combined score...")
        if update_progress:
            update_progress(0.5, "Building model for combined score...")
        self.check_timeout()
        self.score_model = self._fit_saas_model(self.Y_score)

    def optimize(
        self,
        bounds: Optional[List[Tuple[float, float]]] = None,
        update_progress=None,
    ):
        """Run Bayesian optimization to find the next experimental point.

        Args:
            bounds: Optional per-feature ``(min, max)`` search bounds in raw
                units, ordered like ``input_features``. When omitted the
                search covers the normalized unit cube, i.e. the range
                spanned by the training data.
            update_progress: Optional ``callable(fraction, description)``.
                Fractions reported here lie in the range 0.6-0.8.

        Returns:
            The suggested point in raw units, as a 1-D numpy array ordered
            like ``input_features``.

        Raises:
            TimeoutError: If the budget is exhausted between stages.
        """
        if update_progress:
            update_progress(0.6, "Starting optimization...")
        self.check_timeout()

        n_inputs = len(self.input_features)
        if bounds is None:
            # Default normalized bounds [0,1] for all features
            bounds = torch.tensor(
                [[0.0] * n_inputs, [1.0] * n_inputs],
                dtype=torch.double,
                device=self.device,
            )
        else:
            # Normalize user-provided bounds
            lower = self.scaler_x.transform(np.array([[b[0] for b in bounds]]))[0]
            upper = self.scaler_x.transform(np.array([[b[1] for b in bounds]]))[0]
            # Stack into one ndarray first; building a tensor from a list of
            # ndarrays is slow and triggers a PyTorch warning.
            bounds = torch.as_tensor(
                np.vstack([lower, upper]),
                dtype=torch.double,
                device=self.device,
            )

        # Create acquisition function
        acq = qLogExpectedImprovement(
            model=self.score_model, best_f=self.Y_score.max()
        )

        # Optimize acquisition function
        if update_progress:
            update_progress(0.7, "Optimizing acquisition function...")
        self.check_timeout()
        cand, _ = optimize_acqf(
            acq_function=acq,
            bounds=bounds,
            q=1,
            num_restarts=10,  # Reduced from 25
            raw_samples=256,  # Reduced from 512
            options={"batch_limit": 5, "maxiter": 100},  # Reduced maxiter from 200
        )

        # Denormalize next point
        self.next_point = self.scaler_x.inverse_transform(
            cand.cpu().detach().numpy()
        )[0]

        # Apply bounds if provided
        if self.input_bounds:
            for i, feat in enumerate(self.input_features):
                if feat in self.input_bounds:
                    low, high = self.input_bounds[feat]
                    self.next_point[i] = np.clip(self.next_point[i], low, high)

        # Predict values and uncertainties for next point
        if update_progress:
            update_progress(0.8, "Predicting outcomes for next point...")
        self.check_timeout()
        self.predict_for_next_point()

        return self.next_point

    def predict_for_next_point(self):
        """Predict each output's value and uncertainty at ``next_point``.

        Populates ``self.predictions`` as
        ``{feature: {"mean", "std", "uncertainty_percent"}}`` in raw units.
        """
        # Normalize the next point
        next_point_norm = torch.tensor(
            self.scaler_x.transform(self.next_point.reshape(1, -1)),
            dtype=torch.double,
            device=self.device,
        )

        # Predict for each property
        self.predictions = {}
        for feat, model in self.individual_models.items():
            self.check_timeout()
            with torch.no_grad():
                posterior = model.posterior(next_point_norm)

            # A fully Bayesian model returns one Gaussian per retained MCMC
            # sample. With a single query point and a single output, the
            # flattened arrays hold exactly those per-sample moments, so the
            # equal-weight mixture is: mean of the means, and mean of the
            # variances plus the spread of the means. Taking only element 0
            # would report a single MCMC sample.
            sample_means = posterior.mean.detach().cpu().numpy().flatten()
            sample_vars = posterior.variance.detach().cpu().numpy().flatten()
            mean = float(sample_means.mean())
            std = float(np.sqrt(sample_vars.mean() + sample_means.var()))

            # Denormalize predictions
            scaler = self.scalers_y[feat]
            mean_denorm = scaler.inverse_transform([[mean]])[0, 0]
            # Scale the std by the same factor used in denormalizing the mean
            std_denorm = std * (scaler.data_max_[0] - scaler.data_min_[0])

            # Apply output bounds if provided
            if self.output_bounds and feat in self.output_bounds:
                low, high = self.output_bounds[feat]
                mean_denorm = np.clip(mean_denorm, low, high)

            self.predictions[feat] = {
                "mean": mean_denorm,
                "std": std_denorm,
                # A relative figure is undefined at a zero mean; report 0.
                "uncertainty_percent": (
                    (std_denorm / abs(mean_denorm) * 100) if mean_denorm != 0 else 0
                ),
            }

    def get_results(self):
        """Return the next point, predictions and the weights that were used."""
        return {
            "next_point": dict(zip(self.input_features, self.next_point)),
            "predictions": self.predictions,
            "objective_weights": self.objective_weights,
        }

    def visualize(self):
        """Plot the data and the suggested point.

        Returns:
            A matplotlib ``Figure`` when there are exactly two input
            features, otherwise ``None``.
        """
        if len(self.input_features) != 2:
            print("Visualization only supported for 2D input space")
            return None

        # For 2D input space
        x_name, y_name = self.input_features
        fig = Figure(figsize=(10, 8))
        ax = fig.add_subplot(111)

        scatter = ax.scatter(
            self.df[x_name],
            self.df[y_name],
            c=self.score,
            cmap="viridis",
            s=50,
        )
        ax.scatter(
            self.next_point[0],
            self.next_point[1],
            c="red",
            s=150,
            edgecolors="black",
            marker="*",
            label="Next Suggested Point",
        )

        ax.set_xlabel(x_name)
        ax.set_ylabel(y_name)
        ax.set_title("Bayesian Optimization: Next Suggested Experiment")
        fig.colorbar(scatter, ax=ax, label="Objective Score")
        ax.legend()
        ax.grid(True)

        # Add prediction information as text
        info_text = "Predictions:\n" + "".join(
            f"{feat}: {pred['mean']:.3f} ± {pred['std']:.3f}\n"
            for feat, pred in self.predictions.items()
        )
        ax.text(
            0.02,
            0.02,
            info_text,
            transform=ax.transAxes,
            fontsize=10,
            bbox={"facecolor": "white", "alpha": 0.8, "pad": 5},
        )

        fig.tight_layout()
        return fig


def _split_csv(text: str) -> List[str]:
    """Split comma-separated text into stripped, non-empty tokens."""
    return [token.strip() for token in text.split(",") if token.strip()]


def _parse_floats(text: str) -> List[float]:
    """Parse comma-separated numbers; raises ``ValueError`` on bad tokens."""
    return [float(token) for token in _split_csv(text)]


def _parse_bounds(
    min_text: str, max_text: str, features: List[str], label: str
) -> Dict[str, Tuple[float, float]]:
    """Build ``{feature: (min, max)}``; empty when bounds are absent or miscounted."""
    if not (min_text and max_text):
        return {}
    lows = _parse_floats(min_text)
    highs = _parse_floats(max_text)
    if len(lows) != len(features) or len(highs) != len(features):
        # Best effort, as before: mismatched bounds are skipped, not fatal.
        print(
            f"Warning: ignoring {label} bounds because their count does not "
            f"match the number of {label} features"
        )
        return {}
    return {feat: (lo, hi) for feat, lo, hi in zip(features, lows, highs)}


def _save_upload_to_temp(csv_file) -> str:
    """Copy an uploaded CSV to a temporary file and return its path.

    Accepts raw bytes, a filesystem path (including Gradio's string-like
    upload value), or any object exposing the path as ``.name``.
    """
    if isinstance(csv_file, (bytes, bytearray)):
        content = bytes(csv_file)
    else:
        if isinstance(csv_file, (str, os.PathLike)):
            source_path = os.fspath(csv_file)
        else:
            source_path = csv_file.name
        with open(source_path, "rb") as src:
            content = src.read()

    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
        tmp.write(content)
        return tmp.name


# Gradio interface function
def run_optimization(
    csv_file,
    input_features,
    output_features,
    input_bounds_min,
    input_bounds_max,
    output_bounds_min,
    output_bounds_max,
    objective_weights,
    use_gpu,
    num_samples,
    warmup_steps,
    progress=gr.Progress(),
):
    """Run one optimization round for the Gradio UI.

    Args:
        csv_file: Uploaded file as delivered by ``gr.File``.
        input_features: Comma-separated input column names.
        output_features: Comma-separated output column names.
        input_bounds_min: Comma-separated lower bounds for the inputs.
        input_bounds_max: Comma-separated upper bounds for the inputs.
        output_bounds_min: Comma-separated lower bounds for the outputs.
        output_bounds_max: Comma-separated upper bounds for the outputs.
        objective_weights: Comma-separated weights, one per output feature.
        use_gpu: Whether to use CUDA when available.
        num_samples: Number of MCMC samples.
        warmup_steps: Number of MCMC warmup steps.
        progress: Gradio progress tracker.

    Returns:
        ``(result_text, figure, error_message)``. On success the error slot
        is ``None``; on any failure the first two slots are ``None`` and the
        third holds the message.
    """

    # Define a simple callback function for progress updates
    def update_progress(fraction, desc=None):
        progress(fraction, desc)

    if csv_file is None:
        return None, None, "Error: Please upload a CSV file"

    # Parse input fields; malformed numbers are reported, not raised.
    try:
        input_names = _split_csv(input_features)
        output_names = _split_csv(output_features)
        input_bounds = _parse_bounds(
            input_bounds_min, input_bounds_max, input_names, "input"
        )
        output_bounds = _parse_bounds(
            output_bounds_min, output_bounds_max, output_names, "output"
        )
        weight_values = _parse_floats(objective_weights)
    except ValueError as exc:
        return None, None, f"Error: Could not parse numeric settings: {exc}"

    if not input_names or not output_names:
        return None, None, "Error: At least one input and one output feature is required"

    # Parse weights
    if len(weight_values) != len(output_names):
        return (
            None,
            None,
            "Error: Number of weights must match number of output features",
        )
    weights = dict(zip(output_names, weight_values))

    update_progress(0.05, "Initializing optimizer...")

    # Set a reasonable timeout for Hugging Face Spaces
    timeout = 300  # 5 minutes
    temp_path = None

    # Initialize and run optimizer
    try:
        # Save uploaded file temporarily
        temp_path = _save_upload_to_temp(csv_file)

        optimizer = CustomizableBayesianOptimizer(
            data_file=temp_path,
            input_features=input_names,
            output_features=output_names,
            objective_weights=weights,
            input_bounds=input_bounds,
            output_bounds=output_bounds,
            num_samples=int(num_samples),
            warmup_steps=int(warmup_steps),
            use_gpu=use_gpu,
            timeout=timeout,
        )

        # Fractions are chosen to interleave with those reported inside
        # build_models (0.1-0.5) and optimize (0.6-0.8) so the bar never
        # moves backwards.
        update_progress(0.1, "Building models...")
        optimizer.build_models(update_progress=update_progress)

        update_progress(0.55, "Running optimization...")
        optimizer.optimize(update_progress=update_progress)

        update_progress(0.9, "Generating results...")
        results = optimizer.get_results()

        # Prepare results text
        lines = ["=== Next Experimental Point ===\n"]
        lines.extend(
            f"{feat}: {val:.4f}\n" for feat, val in results["next_point"].items()
        )
        lines.append("\n=== Predictions with Uncertainty ===\n")
        lines.extend(
            f"{feat}: {pred['mean']:.4f} ± {pred['std']:.4f} "
            f"({pred['uncertainty_percent']:.2f}% uncertainty)\n"
            for feat, pred in results["predictions"].items()
        )
        result_text = "".join(lines)

        # Generate visualization
        fig = optimizer.visualize()

        return result_text, fig, None

    except TimeoutError:
        return (
            None,
            None,
            f"Error: Computation timed out after {timeout} seconds. "
            "Try reducing the number of samples or warmup steps.",
        )
    except Exception as e:
        # Top-level UI boundary: surface the full traceback to the user.
        error_message = f"Error: {str(e)}\n\n{traceback.format_exc()}"
        return None, None, error_message
    finally:
        # Clean up temp file on success and on every failure path.
        if temp_path is not None and os.path.exists(temp_path):
            os.unlink(temp_path)


# Create Gradio interface
with gr.Blocks(title="Bayesian Optimization for Electrolyte Optimization") as demo:
    gr.Markdown("# Bayesian Optimization for Electrolytes")
    gr.Markdown("Upload your CSV file and configure the optimization settings.")

    with gr.Row():
        with gr.Column(scale=1):
            # Input section
            file_input = gr.File(label="Upload CSV File", file_types=[".csv"])

            input_features = gr.Textbox(
                label="Input Features (comma-separated)",
                placeholder="x1, x2",
                value="x1, x2",
            )
            output_features = gr.Textbox(
                label="Output Features (comma-separated)",
                placeholder="CE (%), LCE, Overpotential (V)",
                value="CE (%), LCE, Overpotential (V)",
            )

            input_bounds_min = gr.Textbox(
                label="Input Min Bounds (comma-separated, match order of features)",
                placeholder="1, 1",
                value="1, 1",
            )
            input_bounds_max = gr.Textbox(
                label="Input Max Bounds (comma-separated, match order of features)",
                placeholder="6, 6",
                value="6, 6",
            )

            output_bounds_min = gr.Textbox(
                label="Output Min Bounds (comma-separated, match order of features)",
                placeholder="0, 0, 0",
                value="0, 0, 0",
            )
            output_bounds_max = gr.Textbox(
                label="Output Max Bounds (comma-separated, match order of features)",
                placeholder="100, 100, 10",
                value="100, 100, 10",
            )

            objective_weights = gr.Textbox(
                label="Objective Weights (comma-separated, match order of output features, use negative for minimization)",
                placeholder="0.25, 0.25, -0.5",
                value="0.25, 0.25, -0.5",
            )

            with gr.Row():
                use_gpu = gr.Checkbox(label="Use GPU (if available)", value=True)
                num_samples = gr.Number(label="MCMC Samples", value=128, precision=0)
                warmup_steps = gr.Number(label="Warmup Steps", value=64, precision=0)

            submit_btn = gr.Button("Run Optimization")

        with gr.Column(scale=1):
            # Output section
            results_text = gr.Textbox(label="Results", lines=10)
            plot_output = gr.Plot(label="Optimization Plot")
            error_output = gr.Textbox(label="Errors (if any)", visible=True)

    # Connect the interface to the optimization function
    submit_btn.click(
        fn=run_optimization,
        inputs=[
            file_input,
            input_features,
            output_features,
            input_bounds_min,
            input_bounds_max,
            output_bounds_min,
            output_bounds_max,
            objective_weights,
            use_gpu,
            num_samples,
            warmup_steps,
        ],
        outputs=[results_text, plot_output, error_output],
    )

    gr.Markdown("""
    ## Instructions:
    1. Upload your CSV file with experimental data
    2. Specify input features (e.g., "x1, x2")
    3. Specify output features (e.g., "CE (%), LCE, Overpotential (V)")
    4. Set bounds for inputs and outputs (comma-separated, matching the order of features)
    5. Specify weights for each output (positive for maximization, negative for minimization)
    6. Click "Run Optimization"

    ## Performance Tips
    - Reduce MCMC Samples and Warmup Steps if facing timeout issues
    - Default settings (128 samples, 64 warmup) should work on Hugging Face Spaces
    - The computation is resource-intensive, so expect some wait time

    The default settings optimize with weights: 0.25 for CE, 0.25 for LCE, -0.5 for Overpotential
    """)

    # Add example data for quick testing
    example_csv = "grid_experiment.csv" if os.path.exists("grid_experiment.csv") else None
    if example_csv:
        gr.Examples(
            examples=[
                [
                    example_csv,
                    "x1, x2",
                    "CE (%), LCE, Overpotential (V)",
                    "1, 1",
                    "6, 6",
                    "0, 0, 0",
                    "100, 100, 10",
                    "0.25, 0.25, -0.5",
                ]
            ],
            inputs=[
                file_input,
                input_features,
                output_features,
                input_bounds_min,
                input_bounds_max,
                output_bounds_min,
                output_bounds_max,
                objective_weights,
            ],
        )

# Launch the interface
if __name__ == "__main__":
    demo.launch()