Spaces:
Sleeping
Sleeping
| # pip install torch botorch gpytorch pyro-ppl pandas numpy scikit-learn matplotlib gradio | |
| import torch, pandas as pd, numpy as np | |
| from botorch.models import SaasFullyBayesianSingleTaskGP | |
| from botorch.fit import fit_fully_bayesian_model_nuts | |
| from botorch.acquisition import qExpectedImprovement, qLogExpectedImprovement | |
| from botorch.optim import optimize_acqf | |
| from sklearn.preprocessing import MinMaxScaler | |
| import matplotlib.pyplot as plt | |
| from typing import List, Tuple, Optional, Dict | |
| import os | |
| import gradio as gr | |
| import tempfile | |
| import io | |
| from matplotlib.figure import Figure | |
| import time | |
class CustomizableBayesianOptimizer:
    """Suggest the next experiment for a weighted multi-output objective.

    The optimizer reads a CSV of past experiments, min-max normalizes the
    selected input and output columns, fits one ``SaasFullyBayesianSingleTaskGP``
    per output plus one on the weighted composite score, and maximizes
    ``qLogExpectedImprovement`` on the score model to propose the next point.
    """

    def __init__(
        self,
        data_file: str,
        input_features: List[str],
        output_features: List[str],
        objective_weights: Dict[str, float],
        input_bounds: Optional[Dict[str, Tuple[float, float]]] = None,
        output_bounds: Optional[Dict[str, Tuple[float, float]]] = None,
        num_samples: int = 128,  # Reduced from 512
        warmup_steps: int = 64,  # Reduced from 128
        thinning: int = 8,  # Reduced from 16
        use_gpu: bool = True,
        timeout: int = 600,  # 10 minute timeout
    ):
        """Load the data file and prepare normalized training tensors.

        Args:
            data_file: Path of the CSV file holding past experiments.
            input_features: Column names used as model inputs.
            output_features: Column names modeled as individual outputs.
            objective_weights: Column name -> weight used to build the
                composite score (negative weights penalize a column).
            input_bounds: Optional per-input ``(min, max)`` in original units.
            output_bounds: Optional per-output ``(min, max)`` used to clip
                predicted means.
            num_samples: ``num_samples`` passed to the NUTS fit.
            warmup_steps: ``warmup_steps`` passed to the NUTS fit.
            thinning: ``thinning`` passed to the NUTS fit.
            use_gpu: Use CUDA when it is available.
            timeout: Wall-clock budget in seconds, measured from construction.

        Raises:
            KeyError: If a requested feature column is missing from the data.
        """
        # The timeout clock starts here, so data loading counts against it.
        self.start_time = time.time()
        self.timeout = timeout
        # Check for GPU availability
        self.device = torch.device("cuda" if torch.cuda.is_available() and use_gpu else "cpu")
        print(f"Using device: {self.device}")
        self.data_file = data_file
        self.input_features = input_features
        self.output_features = output_features
        self.objective_weights = objective_weights
        self.input_bounds = input_bounds
        self.output_bounds = output_bounds
        self.mcmc_settings = {
            "num_samples": num_samples,
            "warmup_steps": warmup_steps,
            "thinning": thinning,
        }
        # Load data
        self.df = pd.read_csv(self.data_file)
        print(f"Loaded data with columns: {list(self.df.columns)}")
        print(f"Data shape: {self.df.shape}")
        # Initialize scalers
        self.scaler_x = MinMaxScaler(feature_range=(0, 1))
        self.scalers_y = {feat: MinMaxScaler(feature_range=(0, 1)) for feat in output_features}
        # Process data
        self.process_data()

    def _to_tensor(self, values):
        """Return ``values`` as a double tensor on the configured device."""
        return torch.as_tensor(values, dtype=torch.double, device=self.device)

    def process_data(self):
        """Validate the requested columns and build normalized training tensors.

        Sets ``X``, ``Y_dict``, ``score``, ``score_scaler`` and ``Y_score``.

        Raises:
            KeyError: If an input or output feature is not a column of the data.
        """
        available = list(self.df.columns)
        # Outputs are checked as well as inputs so a typo yields the same
        # descriptive error instead of a bare pandas KeyError.
        for kind, features in (("Input", self.input_features), ("Output", self.output_features)):
            for feat in features:
                if feat not in self.df.columns:
                    raise KeyError(
                        f"{kind} feature '{feat}' not found in data file. Available columns: {available}"
                    )
        # Normalize input features
        self.X = self._to_tensor(
            self.scaler_x.fit_transform(self.df[self.input_features].values)
        )
        # Normalize each output feature separately
        self.Y_dict = {
            feat: self._to_tensor(
                self.scalers_y[feat].fit_transform(self.df[feat].values.reshape(-1, 1))
            )
            for feat in self.output_features
        }
        # Calculate the composite score based on weights (raw, un-normalized columns)
        score = np.zeros(len(self.df))
        for feat, weight in self.objective_weights.items():
            if feat not in self.df.columns:
                # Kept lenient (no exception), but no longer silent.
                print(f"Warning: objective weight for unknown column '{feat}' was ignored")
                continue
            score += weight * self.df[feat].values
        self.score = score
        self.score_scaler = MinMaxScaler(feature_range=(0, 1))
        self.Y_score = self._to_tensor(
            self.score_scaler.fit_transform(score.reshape(-1, 1))
        )

    def check_timeout(self):
        """Raise ``TimeoutError`` once the time budget since construction is spent."""
        if time.time() - self.start_time > self.timeout:
            raise TimeoutError(f"Optimization timed out after {self.timeout} seconds")

    def _fit_saas_model(self, train_y):
        """Fit and return a SAAS GP on ``self.X`` / ``train_y`` using NUTS."""
        model = SaasFullyBayesianSingleTaskGP(self.X, train_y)
        fit_fully_bayesian_model_nuts(
            model,
            num_samples=self.mcmc_settings["num_samples"],
            warmup_steps=self.mcmc_settings["warmup_steps"],
            thinning=self.mcmc_settings["thinning"],
        )
        return model

    def build_models(self, update_progress=None):
        """Build individual models for each output and a model for the score.

        Args:
            update_progress: Optional callable ``(fraction, description)``
                invoked before each model is fitted.

        Raises:
            TimeoutError: If the time budget is exhausted between fits.
        """
        # Individual models for each output feature
        self.individual_models = {}
        total_features = len(self.output_features) + 1  # +1 for score model
        for i, feat in enumerate(self.output_features):
            print(f"Building model for {feat}...")
            if update_progress:
                update_progress(0.1 + (i / total_features) * 0.4, f"Building model for {feat}...")
            self.check_timeout()
            self.individual_models[feat] = self._fit_saas_model(self.Y_dict[feat])
        # Model for overall score
        print("Building model for combined score...")
        if update_progress:
            update_progress(0.5, "Building model for combined score...")
        self.check_timeout()
        self.score_model = self._fit_saas_model(self.Y_score)

    def _normalized_search_bounds(self, bounds=None):
        """Return the ``2 x d`` acquisition search box in normalized input space.

        Precedence: explicit ``bounds`` argument, then ``self.input_bounds``
        (inputs without an entry fall back to the observed data range), then
        the unit cube, i.e. the observed data range of every input.

        Raises:
            ValueError: If ``bounds`` has the wrong length or a lower bound
                exceeds its upper bound.
        """
        n_inputs = len(self.input_features)
        if bounds is not None:
            if len(bounds) != n_inputs:
                raise ValueError(
                    f"Expected {n_inputs} (min, max) pairs, got {len(bounds)}"
                )
            lower = [pair[0] for pair in bounds]
            upper = [pair[1] for pair in bounds]
        elif self.input_bounds:
            lower = list(self.scaler_x.data_min_)
            upper = list(self.scaler_x.data_max_)
            for i, feat in enumerate(self.input_features):
                if feat in self.input_bounds:
                    lower[i], upper[i] = self.input_bounds[feat]
        else:
            # Default normalized bounds [0,1] for all features
            return torch.tensor(
                [[0.0] * n_inputs, [1.0] * n_inputs],
                dtype=torch.double,
                device=self.device,
            )
        raw = np.array([lower, upper], dtype=float)
        if np.any(raw[0] > raw[1]):
            raise ValueError("Each lower bound must not exceed its upper bound")
        # The fitted scaler maps original units into the space the GP was trained in.
        return self._to_tensor(self.scaler_x.transform(raw))

    def optimize(self, bounds: Optional[List[Tuple[float, float]]] = None, update_progress=None):
        """Run Bayesian optimization to find the next experimental point.

        Args:
            bounds: Optional list of ``(min, max)`` per input, in original
                units and in ``input_features`` order. When omitted, the
                search box is taken from ``input_bounds`` if given, otherwise
                from the observed data range.
            update_progress: Optional callable ``(fraction, description)``.

        Returns:
            The suggested point in original units (also stored as
            ``next_point``); predictions are stored in ``predictions``.

        Raises:
            TimeoutError: If the time budget is exhausted between stages.
            ValueError: If the bounds are malformed.
        """
        if update_progress:
            update_progress(0.6, "Starting optimization...")
        self.check_timeout()
        # Search inside the requested box rather than clipping afterwards, so
        # the candidate really maximizes the acquisition within the bounds.
        search_bounds = self._normalized_search_bounds(bounds)
        # Create acquisition function
        acq = qLogExpectedImprovement(model=self.score_model, best_f=self.Y_score.max())
        # Optimize acquisition function
        if update_progress:
            update_progress(0.7, "Optimizing acquisition function...")
        self.check_timeout()
        cand, _ = optimize_acqf(
            acq_function=acq,
            bounds=search_bounds,
            q=1,
            num_restarts=10,  # Reduced from 25
            raw_samples=256,  # Reduced from 512
            options={"batch_limit": 5, "maxiter": 100},  # Reduced maxiter from 200
        )
        # Denormalize next point
        next_point = self.scaler_x.inverse_transform(cand.cpu().detach().numpy())[0]
        self.next_point = next_point
        # Final guard against round-off pushing the point just outside the bounds
        if self.input_bounds:
            for i, feat in enumerate(self.input_features):
                if feat in self.input_bounds:
                    low, high = self.input_bounds[feat]
                    self.next_point[i] = np.clip(self.next_point[i], low, high)
        # Predict values and uncertainties for next point
        if update_progress:
            update_progress(0.8, "Predicting outcomes for next point...")
        self.check_timeout()
        self.predict_for_next_point()
        return self.next_point

    @staticmethod
    def _posterior_mean_and_std(posterior) -> Tuple[float, float]:
        """Return ``(mean, std)`` of a single-point, single-output posterior.

        Fully Bayesian models return one prediction per MCMC sample; the
        ``mixture_mean`` / ``mixture_variance`` attributes combine them. Reading
        the first element of ``mean`` would report only the first MCMC sample,
        so the mixture statistics are preferred whenever they exist.
        """
        mean = getattr(posterior, "mixture_mean", None)
        variance = getattr(posterior, "mixture_variance", None)
        if mean is None or variance is None:
            mean, variance = posterior.mean, posterior.variance
        mean_value = float(mean.detach().cpu().reshape(-1)[0])
        variance_value = float(variance.detach().cpu().reshape(-1)[0])
        # Guard against a tiny negative variance produced by round-off.
        return mean_value, float(np.sqrt(max(variance_value, 0.0)))

    def predict_for_next_point(self):
        """Predict values and uncertainties for the next experimental point.

        Fills ``predictions`` with, per output feature, the de-normalized
        ``mean`` (clipped to ``output_bounds`` when given), ``std`` and
        ``uncertainty_percent`` (0 when the mean is exactly 0).

        Raises:
            TimeoutError: If the time budget is exhausted between outputs.
        """
        # Normalize the next point
        next_point_norm = self._to_tensor(
            self.scaler_x.transform(self.next_point.reshape(1, -1))
        )
        # Predict for each property
        self.predictions = {}
        for feat, model in self.individual_models.items():
            self.check_timeout()
            with torch.no_grad():  # inference only; no autograd graph needed
                posterior = model.posterior(next_point_norm)
                mean, std = self._posterior_mean_and_std(posterior)
            scaler = self.scalers_y[feat]
            # Denormalize predictions
            mean_denorm = scaler.inverse_transform([[mean]])[0, 0]
            # Scale the std by the same factor used in denormalizing the mean
            std_denorm = std * (scaler.data_max_[0] - scaler.data_min_[0])
            # Apply output bounds if provided
            if self.output_bounds and feat in self.output_bounds:
                low, high = self.output_bounds[feat]
                mean_denorm = np.clip(mean_denorm, low, high)
            self.predictions[feat] = {
                "mean": mean_denorm,
                "std": std_denorm,
                "uncertainty_percent": (std_denorm / abs(mean_denorm) * 100) if mean_denorm != 0 else 0,
            }

    def get_results(self):
        """Return the next point, its predictions and the objective weights."""
        return {
            "next_point": dict(zip(self.input_features, self.next_point)),
            "predictions": self.predictions,
            "objective_weights": self.objective_weights,
        }

    def visualize(self):
        """Plot the data and the suggested point; return the figure.

        Returns:
            A matplotlib ``Figure`` for two input features, otherwise ``None``.
        """
        if len(self.input_features) != 2:
            print("Visualization only supported for 2D input space")
            return None
        # For 2D input space
        fig = Figure(figsize=(10, 8))
        ax = fig.add_subplot(111)
        scatter = ax.scatter(
            self.df[self.input_features[0]],
            self.df[self.input_features[1]],
            c=self.score,
            cmap="viridis",
            s=50,
        )
        ax.scatter(
            self.next_point[0],
            self.next_point[1],
            c="red",
            s=150,
            edgecolors="black",
            marker="*",
            label="Next Suggested Point",
        )
        ax.set_xlabel(self.input_features[0])
        ax.set_ylabel(self.input_features[1])
        ax.set_title("Bayesian Optimization: Next Suggested Experiment")
        fig.colorbar(scatter, ax=ax, label="Objective Score")
        ax.legend()
        ax.grid(True)
        # Add prediction information as text
        info_text = "Predictions:\n" + "".join(
            f"{feat}: {pred['mean']:.3f} ± {pred['std']:.3f}\n"
            for feat, pred in self.predictions.items()
        )
        ax.text(
            0.02,
            0.02,
            info_text,
            transform=ax.transAxes,
            fontsize=10,
            bbox={"facecolor": "white", "alpha": 0.8, "pad": 5},
        )
        fig.tight_layout()
        return fig
# Gradio interface function
def _split_csv_field(text):
    """Split a comma-separated textbox value into stripped, non-empty tokens."""
    if not text:
        return []
    return [token.strip() for token in str(text).split(",") if token.strip()]


def _parse_bounds(min_text, max_text, features):
    """Build ``{feature: (min, max)}`` from two comma-separated strings.

    Returns an empty dict when either string is empty or when the number of
    values does not match ``features`` (such bounds are ignored, not rejected).
    Raises ``ValueError`` if a value is not a number.
    """
    if not (min_text and max_text):
        return {}
    lows = [float(token) for token in _split_csv_field(min_text)]
    highs = [float(token) for token in _split_csv_field(max_text)]
    if len(lows) != len(features) or len(highs) != len(features):
        return {}
    return {feat: (low, high) for feat, low, high in zip(features, lows, highs)}


def _save_upload_to_temp(csv_file):
    """Copy an uploaded CSV into a private temp file and return that file's path.

    Accepts a filesystem path (``str`` / ``os.PathLike``), an object exposing
    the path as ``.name``, or the raw file content as bytes.
    """
    if csv_file is None:
        raise ValueError("No CSV file was uploaded")
    if isinstance(csv_file, (bytes, bytearray)):
        payload = bytes(csv_file)
    else:
        if isinstance(csv_file, (str, os.PathLike)):
            source_path = csv_file
        else:
            source_path = getattr(csv_file, "name", None)
        if source_path is None:
            raise TypeError(f"Unsupported upload type: {type(csv_file).__name__}")
        with open(source_path, "rb") as source:
            payload = source.read()
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as temp_file:
        temp_file.write(payload)
        return temp_file.name


def run_optimization(
    csv_file,
    input_features,
    output_features,
    input_bounds_min,
    input_bounds_max,
    output_bounds_min,
    output_bounds_max,
    objective_weights,
    use_gpu,
    num_samples,
    warmup_steps,
    progress=gr.Progress()
):
    """Run one optimization round for the Gradio UI.

    Args:
        csv_file: Uploaded CSV (path, object with ``.name``, or raw bytes).
        input_features: Comma-separated input column names.
        output_features: Comma-separated output column names.
        input_bounds_min: Comma-separated lower bounds, one per input.
        input_bounds_max: Comma-separated upper bounds, one per input.
        output_bounds_min: Comma-separated lower bounds, one per output.
        output_bounds_max: Comma-separated upper bounds, one per output.
        objective_weights: Comma-separated weights, one per output.
        use_gpu: Whether to use CUDA when available.
        num_samples: Number of MCMC samples.
        warmup_steps: Number of MCMC warmup steps.
        progress: Gradio progress tracker.

    Returns:
        ``(result_text, figure, error_message)``. On success the error slot is
        ``None``; on any failure the first two slots are ``None`` and the
        third carries the message. Exceptions are reported, never raised.
    """
    # Define a simple callback function for progress updates
    def update_progress(fraction, desc=None):
        progress(fraction, desc)

    # Set a reasonable timeout for Hugging Face Spaces
    timeout = 300  # 5 minutes
    temp_path = None
    try:
        # Parse input fields (inside the try so malformed numbers are
        # reported through the error output instead of crashing the handler).
        input_names = _split_csv_field(input_features)
        output_names = _split_csv_field(output_features)
        input_bounds = _parse_bounds(input_bounds_min, input_bounds_max, input_names)
        output_bounds = _parse_bounds(output_bounds_min, output_bounds_max, output_names)
        # Parse weights
        weight_values = [float(token) for token in _split_csv_field(objective_weights)]
        if len(weight_values) != len(output_names):
            # Routed to the error slot, like every other failure.
            return None, None, "Error: Number of weights must match number of output features"
        weights = dict(zip(output_names, weight_values))

        # Save uploaded file temporarily
        temp_path = _save_upload_to_temp(csv_file)

        # Fractions are kept monotonic with those the optimizer reports itself
        # (0.1-0.5 while building models, 0.6-0.8 while optimizing).
        update_progress(0.05, "Initializing optimizer...")
        # Initialize and run optimizer
        optimizer = CustomizableBayesianOptimizer(
            data_file=temp_path,
            input_features=input_names,
            output_features=output_names,
            objective_weights=weights,
            input_bounds=input_bounds,
            output_bounds=output_bounds,
            num_samples=int(num_samples),
            warmup_steps=int(warmup_steps),
            use_gpu=use_gpu,
            timeout=timeout
        )
        update_progress(0.1, "Building models...")
        optimizer.build_models(update_progress=update_progress)
        update_progress(0.6, "Running optimization...")
        optimizer.optimize(update_progress=update_progress)
        update_progress(0.9, "Generating results...")
        results = optimizer.get_results()
        # Prepare results text
        lines = ["=== Next Experimental Point ===\n"]
        lines.extend(f"{feat}: {val:.4f}\n" for feat, val in results["next_point"].items())
        lines.append("\n=== Predictions with Uncertainty ===\n")
        lines.extend(
            f"{feat}: {pred['mean']:.4f} ± {pred['std']:.4f} ({pred['uncertainty_percent']:.2f}% uncertainty)\n"
            for feat, pred in results["predictions"].items()
        )
        result_text = "".join(lines)
        # Generate visualization
        fig = optimizer.visualize()
        return result_text, fig, None
    except TimeoutError:
        return None, None, f"Error: Computation timed out after {timeout} seconds. Try reducing the number of samples or warmup steps."
    except Exception as e:
        # Top-level UI boundary: report everything to the user with a traceback.
        import traceback
        error_message = f"Error: {str(e)}\n\n{traceback.format_exc()}"
        return None, None, error_message
    finally:
        # Clean up temp file on every path, including errors and early returns.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file must not mask the result.
                pass
# Create Gradio interface
# Two-column page: settings and the run button on the left, results on the right.
# NOTE(review): nesting below was reconstructed from a whitespace-stripped copy —
# confirm it against the deployed layout.
with gr.Blocks(title="Bayesian Optimization for Electrolyte Optimization") as demo:
    gr.Markdown("# Bayesian Optimization for Electrolytes")
    gr.Markdown("Upload your CSV file and configure the optimization settings.")
    with gr.Row():
        with gr.Column(scale=1):
            # Input section
            file_input = gr.File(label="Upload CSV File", file_types=[".csv"])
            # Feature lists and bounds are free-text, comma-separated fields
            # that run_optimization parses.
            input_features = gr.Textbox(label="Input Features (comma-separated)", placeholder="x1, x2", value="x1, x2")
            output_features = gr.Textbox(label="Output Features (comma-separated)", placeholder="CE (%), LCE, Overpotential (V)", value="CE (%), LCE, Overpotential (V)")
            input_bounds_min = gr.Textbox(label="Input Min Bounds (comma-separated, match order of features)", placeholder="1, 1", value="1, 1")
            input_bounds_max = gr.Textbox(label="Input Max Bounds (comma-separated, match order of features)", placeholder="6, 6", value="6, 6")
            output_bounds_min = gr.Textbox(label="Output Min Bounds (comma-separated, match order of features)", placeholder="0, 0, 0", value="0, 0, 0")
            output_bounds_max = gr.Textbox(label="Output Max Bounds (comma-separated, match order of features)", placeholder="100, 100, 10", value="100, 100, 10")
            objective_weights = gr.Textbox(
                label="Objective Weights (comma-separated, match order of output features, use negative for minimization)",
                placeholder="0.25, 0.25, -0.5",
                value="0.25, 0.25, -0.5"
            )
            with gr.Row():
                # Compute settings; the defaults match the optimizer class defaults.
                use_gpu = gr.Checkbox(label="Use GPU (if available)", value=True)
                num_samples = gr.Number(label="MCMC Samples", value=128, precision=0)
                warmup_steps = gr.Number(label="Warmup Steps", value=64, precision=0)
            submit_btn = gr.Button("Run Optimization")
        with gr.Column(scale=1):
            # Output section
            results_text = gr.Textbox(label="Results", lines=10)
            plot_output = gr.Plot(label="Optimization Plot")
            error_output = gr.Textbox(label="Errors (if any)", visible=True)
    # Connect the interface to the optimization function
    # `inputs` follows run_optimization's positional parameter order (the
    # `progress` argument is not listed); `outputs` receives its 3-tuple.
    submit_btn.click(
        fn=run_optimization,
        inputs=[
            file_input,
            input_features,
            output_features,
            input_bounds_min,
            input_bounds_max,
            output_bounds_min,
            output_bounds_max,
            objective_weights,
            use_gpu,
            num_samples,
            warmup_steps
        ],
        outputs=[results_text, plot_output, error_output]
    )
    gr.Markdown("""
    ## Instructions:
    1. Upload your CSV file with experimental data
    2. Specify input features (e.g., "x1, x2")
    3. Specify output features (e.g., "CE (%), LCE, Overpotential (V)")
    4. Set bounds for inputs and outputs (comma-separated, matching the order of features)
    5. Specify weights for each output (positive for maximization, negative for minimization)
    6. Click "Run Optimization"
    ## Performance Tips
    - Reduce MCMC Samples and Warmup Steps if facing timeout issues
    - Default settings (128 samples, 64 warmup) should work on Hugging Face Spaces
    - The computation is resource-intensive, so expect some wait time
    The default settings optimize with weights: 0.25 for CE, 0.25 for LCE, -0.5 for Overpotential
    """)
    # Add example data for quick testing
    # The example row is only offered when grid_experiment.csv exists in the
    # working directory; it fills the first eight inputs (not the compute settings).
    example_csv = "grid_experiment.csv" if os.path.exists("grid_experiment.csv") else None
    if example_csv:
        gr.Examples(
            examples=[[example_csv, "x1, x2", "CE (%), LCE, Overpotential (V)", "1, 1", "6, 6", "0, 0, 0", "100, 100, 10", "0.25, 0.25, -0.5"]],
            inputs=[file_input, input_features, output_features, input_bounds_min, input_bounds_max, output_bounds_min, output_bounds_max, objective_weights],
        )
# Launch the interface
# Guarded so importing this module builds `demo` without starting a server.
if __name__ == "__main__":
    demo.launch()