# Page header captured along with this file (not part of the program):
# NealKapadia · commit 372ac31 "Fix progress callback error" · 20.4 kB
# pip install torch botorch gpytorch pyro-ppl pandas numpy scikit-learn matplotlib gradio
import torch, pandas as pd, numpy as np
from botorch.models import SaasFullyBayesianSingleTaskGP
from botorch.fit import fit_fully_bayesian_model_nuts
from botorch.acquisition import qExpectedImprovement, qLogExpectedImprovement
from botorch.optim import optimize_acqf
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
from typing import List, Tuple, Optional, Dict
import os
import gradio as gr
import tempfile
import io
from matplotlib.figure import Figure
import time
class CustomizableBayesianOptimizer:
    """Suggest the next experiment from tabular data using SAAS-GP Bayesian optimization.

    The optimizer reads a CSV file, min-max scales the selected input columns and
    each output column to [0, 1], and combines the raw output columns into one
    weighted score. One fully Bayesian SAAS GP is fitted per output (used to
    predict outcomes at the suggested point) and one for the score (used by the
    acquisition function).
    """

    def __init__(
        self,
        data_file: str,
        input_features: List[str],
        output_features: List[str],
        objective_weights: Dict[str, float],
        input_bounds: Optional[Dict[str, Tuple[float, float]]] = None,
        output_bounds: Optional[Dict[str, Tuple[float, float]]] = None,
        num_samples: int = 128,  # Reduced from 512
        warmup_steps: int = 64,  # Reduced from 128
        thinning: int = 8,  # Reduced from 16
        use_gpu: bool = True,
        timeout: int = 600,  # 10 minute timeout
    ):
        """
        Initialize the Bayesian optimizer with customizable parameters.

        Args:
            data_file: Path of the CSV file holding the experimental data.
            input_features: Names of the columns used as model inputs.
            output_features: Names of the columns modelled as outputs.
            objective_weights: Weight per output column for the composite score.
            input_bounds: Optional ``(min, max)`` per input feature, in raw units.
            output_bounds: Optional ``(min, max)`` per output feature; predicted
                means are clipped to this range.
            num_samples: ``num_samples`` passed to the NUTS fit.
            warmup_steps: ``warmup_steps`` passed to the NUTS fit.
            thinning: ``thinning`` passed to the NUTS fit.
            use_gpu: Use CUDA when it is available.
            timeout: Seconds after construction at which ``check_timeout`` raises.

        Raises:
            KeyError: If a requested input or output column is missing.
        """
        # The timeout clock starts at construction, so it also covers data loading.
        self.start_time = time.time()
        self.timeout = timeout
        # Check for GPU availability
        self.device = torch.device("cuda" if torch.cuda.is_available() and use_gpu else "cpu")
        print(f"Using device: {self.device}")
        self.data_file = data_file
        self.input_features = input_features
        self.output_features = output_features
        self.objective_weights = objective_weights
        self.input_bounds = input_bounds
        self.output_bounds = output_bounds
        self.mcmc_settings = {
            "num_samples": num_samples,
            "warmup_steps": warmup_steps,
            "thinning": thinning,
        }
        # Load data
        self.df = pd.read_csv(self.data_file)
        print(f"Loaded data with columns: {list(self.df.columns)}")
        print(f"Data shape: {self.df.shape}")
        # Initialize scalers
        self.scaler_x = MinMaxScaler(feature_range=(0, 1))
        self.scalers_y = {feat: MinMaxScaler(feature_range=(0, 1)) for feat in output_features}
        # Process data
        self.process_data()

    def process_data(self):
        """Validate the requested columns and build the normalized training tensors.

        Sets ``X``, ``Y_dict``, ``score``, ``score_scaler`` and ``Y_score``.

        Raises:
            KeyError: If an input or output feature is not a column of the data.
        """
        available = list(self.df.columns)
        for feat in self.input_features:
            if feat not in self.df.columns:
                raise KeyError(f"Input feature '{feat}' not found in data file. Available columns: {available}")
        # Fail with the same kind of message for outputs instead of a bare pandas KeyError.
        for feat in self.output_features:
            if feat not in self.df.columns:
                raise KeyError(f"Output feature '{feat}' not found in data file. Available columns: {available}")
        # Normalize input features
        self.X = torch.as_tensor(
            self.scaler_x.fit_transform(self.df[self.input_features].values),
            dtype=torch.double,
            device=self.device,
        )
        # Normalize each output feature separately
        self.Y_dict = {}
        for feat in self.output_features:
            self.Y_dict[feat] = torch.as_tensor(
                self.scalers_y[feat].fit_transform(self.df[feat].values.reshape(-1, 1)),
                dtype=torch.double,
                device=self.device,
            )
        # Composite score: weighted sum of the RAW (unscaled) output columns.
        # Weights naming a column that is absent from the data are skipped.
        score = np.zeros(len(self.df))
        for feat, weight in self.objective_weights.items():
            if feat in self.df.columns:
                score += weight * self.df[feat].values
        self.score = score
        self.score_scaler = MinMaxScaler(feature_range=(0, 1))
        self.Y_score = torch.as_tensor(
            self.score_scaler.fit_transform(score.reshape(-1, 1)),
            dtype=torch.double,
            device=self.device,
        )

    def check_timeout(self):
        """Raise ``TimeoutError`` once more than ``timeout`` seconds have elapsed.

        This is a cooperative check: it only fires where it is called, so a
        long-running step is not interrupted part-way through.
        """
        if time.time() - self.start_time > self.timeout:
            raise TimeoutError(f"Optimization timed out after {self.timeout} seconds")

    def _fit_saas_gp(self, train_y):
        """Fit and return a fully Bayesian SAAS GP on ``self.X`` and ``train_y``."""
        model = SaasFullyBayesianSingleTaskGP(self.X, train_y)
        fit_fully_bayesian_model_nuts(
            model,
            num_samples=self.mcmc_settings["num_samples"],
            warmup_steps=self.mcmc_settings["warmup_steps"],
            thinning=self.mcmc_settings["thinning"],
        )
        return model

    def build_models(self, update_progress=None):
        """Build individual models for each output and a model for the score.

        Args:
            update_progress: Optional ``callable(fraction, description)`` used to
                report progress.

        Raises:
            TimeoutError: If the time budget is exhausted before a fit starts.
        """
        # Individual models for each output feature
        self.individual_models = {}
        total_features = len(self.output_features) + 1  # +1 for score model
        for i, feat in enumerate(self.output_features):
            print(f"Building model for {feat}...")
            if update_progress:
                update_progress(0.1 + (i / total_features) * 0.4, f"Building model for {feat}...")
            self.check_timeout()
            self.individual_models[feat] = self._fit_saas_gp(self.Y_dict[feat])
        # Model for overall score
        print("Building model for combined score...")
        if update_progress:
            update_progress(0.5, "Building model for combined score...")
        self.check_timeout()
        self.score_model = self._fit_saas_gp(self.Y_score)

    def _default_search_bounds(self):
        """Return the ``2 x d`` search box in normalized input space.

        The box is the observed data range ([0, 1] after scaling) intersected
        with ``input_bounds`` where those are given, so the acquisition optimum
        is sought inside the allowed region instead of being clipped into it
        afterwards. A feature whose bounds are inverted or do not overlap the
        data range keeps the full [0, 1] interval.
        """
        dim = len(self.input_features)
        lower = np.zeros(dim)
        upper = np.ones(dim)
        if self.input_bounds:
            raw = np.vstack([self.scaler_x.data_min_, self.scaler_x.data_max_]).astype(float)
            for idx, feat in enumerate(self.input_features):
                if feat in self.input_bounds:
                    raw[0, idx], raw[1, idx] = self.input_bounds[feat]
            scaled = self.scaler_x.transform(raw)
            for idx, feat in enumerate(self.input_features):
                if feat not in self.input_bounds:
                    continue
                lo = max(0.0, float(scaled[0, idx]))
                hi = min(1.0, float(scaled[1, idx]))
                if lo <= hi:
                    lower[idx], upper[idx] = lo, hi
        return torch.tensor(np.vstack([lower, upper]), dtype=torch.double, device=self.device)

    def optimize(self, bounds: Optional[List[Tuple[float, float]]] = None, update_progress=None):
        """Run Bayesian optimization to find next experimental point.

        Args:
            bounds: Optional ``(min, max)`` per input feature in raw units, in the
                order of ``input_features``. When omitted, the search covers the
                observed data range restricted by ``input_bounds``.
            update_progress: Optional ``callable(fraction, description)``.

        Returns:
            The suggested point in raw input units (also stored as ``next_point``).

        Raises:
            TimeoutError: If the time budget is exhausted between steps.
        """
        if update_progress:
            update_progress(0.6, "Starting optimization...")
        self.check_timeout()
        if bounds is None:
            bounds = self._default_search_bounds()
        else:
            # Normalize user-provided bounds (row 0 = minima, row 1 = maxima).
            raw_bounds = np.array([[b[0] for b in bounds], [b[1] for b in bounds]], dtype=float)
            bounds = torch.tensor(
                self.scaler_x.transform(raw_bounds),
                dtype=torch.double,
                device=self.device,
            )
        # Create acquisition function
        acq = qLogExpectedImprovement(model=self.score_model, best_f=self.Y_score.max())
        # Optimize acquisition function
        if update_progress:
            update_progress(0.7, "Optimizing acquisition function...")
        self.check_timeout()
        cand, _ = optimize_acqf(
            acq_function=acq,
            bounds=bounds,
            q=1,
            num_restarts=10,  # Reduced from 25
            raw_samples=256,  # Reduced from 512
            options={"batch_limit": 5, "maxiter": 100},  # Reduced maxiter from 200
        )
        # Denormalize next point
        self.next_point = self.scaler_x.inverse_transform(cand.cpu().detach().numpy())[0]
        # Safety net: keep the reported point inside the configured bounds.
        if self.input_bounds:
            for i, feat in enumerate(self.input_features):
                if feat in self.input_bounds:
                    low, high = self.input_bounds[feat]
                    self.next_point[i] = np.clip(self.next_point[i], low, high)
        # Predict values and uncertainties for next point
        if update_progress:
            update_progress(0.8, "Predicting outcomes for next point...")
        self.check_timeout()
        self.predict_for_next_point()
        return self.next_point

    @staticmethod
    def _mixture_moments(posterior):
        """Return ``(mean, std)`` of the posterior averaged over all MCMC samples.

        Assumes a single query point and a single-output model, so every element
        of ``posterior.mean`` belongs to one MCMC sample. The moments follow the
        law of total variance: ``Var = E[var_i] + Var[mean_i]``.
        """
        means = posterior.mean.detach().cpu().numpy().reshape(-1)
        variances = posterior.variance.detach().cpu().numpy().reshape(-1)
        mean = float(means.mean())
        variance = float(variances.mean() + means.var())
        return mean, float(np.sqrt(max(variance, 0.0)))

    def predict_for_next_point(self):
        """Predict values and uncertainties for the next experimental point.

        Fills ``self.predictions`` with ``mean``, ``std`` and
        ``uncertainty_percent`` per output feature, in raw output units.
        """
        # Normalize the next point
        next_point_norm = torch.tensor(
            self.scaler_x.transform(self.next_point.reshape(1, -1)),
            dtype=torch.double,
            device=self.device,
        )
        # Predict for each property
        self.predictions = {}
        for feat, model in self.individual_models.items():
            self.check_timeout()
            posterior = model.posterior(next_point_norm)
            # Use every MCMC sample; reading only element 0 would report the
            # prediction of a single hyperparameter draw.
            mean, std = self._mixture_moments(posterior)
            scaler = self.scalers_y[feat]
            # Denormalize predictions
            mean_denorm = float(scaler.inverse_transform([[mean]])[0, 0])
            # Scale the std by the same factor used in denormalizing the mean
            std_denorm = float(std * (scaler.data_max_[0] - scaler.data_min_[0]))
            # Apply output bounds if provided
            if self.output_bounds and feat in self.output_bounds:
                low, high = self.output_bounds[feat]
                mean_denorm = float(np.clip(mean_denorm, low, high))
            self.predictions[feat] = {
                "mean": mean_denorm,
                "std": std_denorm,
                "uncertainty_percent": (std_denorm / abs(mean_denorm) * 100) if mean_denorm != 0 else 0,
            }

    def get_results(self):
        """Return the next point, the predictions and the objective weights as a dict."""
        return {
            "next_point": dict(zip(self.input_features, self.next_point)),
            "predictions": self.predictions,
            "objective_weights": self.objective_weights,
        }

    def visualize(self):
        """Visualize the results and return figure.

        Returns:
            A matplotlib ``Figure`` for two input features, otherwise ``None``.
        """
        if len(self.input_features) != 2:
            print("Visualization only supported for 2D input space")
            return None
        x_name, y_name = self.input_features
        # For 2D input space
        fig = Figure(figsize=(10, 8))
        ax = fig.add_subplot(111)
        scatter = ax.scatter(
            self.df[x_name],
            self.df[y_name],
            c=self.score,
            cmap="viridis",
            s=50,
        )
        ax.scatter(
            self.next_point[0],
            self.next_point[1],
            c="red",
            s=150,
            edgecolors="black",
            marker="*",
            label="Next Suggested Point",
        )
        ax.set_xlabel(x_name)
        ax.set_ylabel(y_name)
        ax.set_title("Bayesian Optimization: Next Suggested Experiment")
        fig.colorbar(scatter, ax=ax, label="Objective Score")
        ax.legend()
        ax.grid(True)
        # Add prediction information as text
        info_text = "Predictions:\n" + "".join(
            f"{feat}: {pred['mean']:.3f} ± {pred['std']:.3f}\n"
            for feat, pred in self.predictions.items()
        )
        ax.text(0.02, 0.02, info_text, transform=ax.transAxes, fontsize=10,
                bbox={"facecolor": "white", "alpha": 0.8, "pad": 5})
        fig.tight_layout()
        return fig
# Gradio interface function
def run_optimization(
    csv_file,
    input_features,
    output_features,
    input_bounds_min,
    input_bounds_max,
    output_bounds_min,
    output_bounds_max,
    objective_weights,
    use_gpu,
    num_samples,
    warmup_steps,
    progress=gr.Progress()
):
    """Gradio callback: parse the form, run the optimizer and format the outcome.

    Args:
        csv_file: Uploaded CSV as a path string, an object with a ``name``
            attribute holding the path, or raw bytes.
        input_features: Comma-separated input column names.
        output_features: Comma-separated output column names.
        input_bounds_min, input_bounds_max: Comma-separated numbers, one per
            input feature. Ignored unless both lists match the feature count.
        output_bounds_min, output_bounds_max: Same, for the output features.
        objective_weights: Comma-separated weights, one per output feature.
        use_gpu: Whether to use CUDA when available.
        num_samples: MCMC sample count (converted with ``int``).
        warmup_steps: MCMC warmup steps (converted with ``int``).
        progress: Gradio progress tracker.

    Returns:
        ``(results_text, figure, error_message)``. On success the error is
        ``None``; on any failure the first two entries are ``None`` and the
        third carries the message.
    """
    # Define a simple callback function for progress updates
    def update_progress(fraction, desc=None):
        progress(fraction, desc)

    def split_names(text):
        # Drop empty entries so a trailing comma does not create a "" feature.
        return [tok.strip() for tok in text.split(",") if tok.strip()]

    def parse_floats(text):
        return [float(tok.strip()) for tok in text.split(",")]

    def parse_bounds(mins_text, maxs_text, features):
        # Best effort: bounds are ignored unless both lists match the features.
        if not (mins_text and maxs_text):
            return {}
        mins = parse_floats(mins_text)
        maxs = parse_floats(maxs_text)
        if len(mins) != len(features) or len(maxs) != len(features):
            return {}
        return {feat: (lo, hi) for feat, lo, hi in zip(features, mins, maxs)}

    # Set a reasonable timeout for Hugging Face Spaces
    timeout = 300  # 5 minutes
    temp_path = None
    try:
        if csv_file is None:
            return None, None, "Error: Please upload a CSV file"
        # Parse input fields
        input_features = split_names(input_features)
        output_features = split_names(output_features)
        if not input_features or not output_features:
            return None, None, "Error: At least one input feature and one output feature are required"
        # Parse bounds and weights
        try:
            input_bounds = parse_bounds(input_bounds_min, input_bounds_max, input_features)
            output_bounds = parse_bounds(output_bounds_min, output_bounds_max, output_features)
            weight_values = parse_floats(objective_weights)
        except ValueError as exc:
            return None, None, f"Error: Bounds and weights must be comma-separated numbers ({exc})"
        if len(weight_values) != len(output_features):
            # Reported in the error slot, like every other failure.
            return None, None, "Error: Number of weights must match number of output features"
        weights = dict(zip(output_features, weight_values))

        # Save uploaded file temporarily
        if isinstance(csv_file, (bytes, bytearray)):
            file_content = bytes(csv_file)
        else:
            source_path = csv_file if isinstance(csv_file, (str, os.PathLike)) else csv_file.name
            with open(source_path, "rb") as src:
                file_content = src.read()
        with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as temp_file:
            temp_file.write(file_content)
            temp_path = temp_file.name

        update_progress(0.1, "Initializing optimizer...")
        # Initialize and run optimizer
        optimizer = CustomizableBayesianOptimizer(
            data_file=temp_path,
            input_features=input_features,
            output_features=output_features,
            objective_weights=weights,
            input_bounds=input_bounds,
            output_bounds=output_bounds,
            num_samples=int(num_samples),
            warmup_steps=int(warmup_steps),
            use_gpu=use_gpu,
            timeout=timeout
        )
        # build_models and optimize report their own progress steps, so no extra
        # steps are emitted around them; that keeps the bar from jumping backwards.
        optimizer.build_models(update_progress=update_progress)
        optimizer.optimize(update_progress=update_progress)
        update_progress(0.9, "Generating results...")
        results = optimizer.get_results()
        # Prepare results text
        lines = ["=== Next Experimental Point ===\n"]
        for feat, val in results["next_point"].items():
            lines.append(f"{feat}: {val:.4f}\n")
        lines.append("\n=== Predictions with Uncertainty ===\n")
        for feat, pred in results["predictions"].items():
            lines.append(
                f"{feat}: {pred['mean']:.4f} ± {pred['std']:.4f} ({pred['uncertainty_percent']:.2f}% uncertainty)\n"
            )
        result_text = "".join(lines)
        # Generate visualization
        fig = optimizer.visualize()
        return result_text, fig, None
    except TimeoutError:
        return None, None, f"Error: Computation timed out after {timeout} seconds. Try reducing the number of samples or warmup steps."
    except Exception as e:
        # Top-level UI boundary: show the failure in the error box instead of crashing.
        import traceback
        error_message = f"Error: {str(e)}\n\n{traceback.format_exc()}"
        return None, None, error_message
    finally:
        # Clean up temp file on every path, not only on success.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                # Cleanup is best effort; a leftover temp file must not mask the result.
                pass
# Create Gradio interface
# Build the Gradio page: form inputs on the left, results on the right.
with gr.Blocks(title="Bayesian Optimization for Electrolyte Optimization") as demo:
    gr.Markdown("# Bayesian Optimization for Electrolytes")
    gr.Markdown("Upload your CSV file and configure the optimization settings.")
    with gr.Row():
        with gr.Column(scale=1):
            # Input section
            file_input = gr.File(label="Upload CSV File", file_types=[".csv"])
            input_features = gr.Textbox(
                label="Input Features (comma-separated)",
                placeholder="x1, x2",
                value="x1, x2",
            )
            output_features = gr.Textbox(
                label="Output Features (comma-separated)",
                placeholder="CE (%), LCE, Overpotential (V)",
                value="CE (%), LCE, Overpotential (V)",
            )
            input_bounds_min = gr.Textbox(
                label="Input Min Bounds (comma-separated, match order of features)",
                placeholder="1, 1",
                value="1, 1",
            )
            input_bounds_max = gr.Textbox(
                label="Input Max Bounds (comma-separated, match order of features)",
                placeholder="6, 6",
                value="6, 6",
            )
            output_bounds_min = gr.Textbox(
                label="Output Min Bounds (comma-separated, match order of features)",
                placeholder="0, 0, 0",
                value="0, 0, 0",
            )
            output_bounds_max = gr.Textbox(
                label="Output Max Bounds (comma-separated, match order of features)",
                placeholder="100, 100, 10",
                value="100, 100, 10",
            )
            objective_weights = gr.Textbox(
                label="Objective Weights (comma-separated, match order of output features, use negative for minimization)",
                placeholder="0.25, 0.25, -0.5",
                value="0.25, 0.25, -0.5",
            )
            with gr.Row():
                use_gpu = gr.Checkbox(label="Use GPU (if available)", value=True)
                num_samples = gr.Number(label="MCMC Samples", value=128, precision=0)
                warmup_steps = gr.Number(label="Warmup Steps", value=64, precision=0)
            submit_btn = gr.Button("Run Optimization")
        with gr.Column(scale=1):
            # Output section
            results_text = gr.Textbox(label="Results", lines=10)
            plot_output = gr.Plot(label="Optimization Plot")
            error_output = gr.Textbox(label="Errors (if any)", visible=True)

    # The eight form fields shared by the click handler and the examples table,
    # in the positional order run_optimization expects.
    example_inputs = [
        file_input,
        input_features,
        output_features,
        input_bounds_min,
        input_bounds_max,
        output_bounds_min,
        output_bounds_max,
        objective_weights,
    ]
    click_inputs = example_inputs + [use_gpu, num_samples, warmup_steps]

    # Connect the interface to the optimization function
    submit_btn.click(
        fn=run_optimization,
        inputs=click_inputs,
        outputs=[results_text, plot_output, error_output],
    )
    gr.Markdown("""
## Instructions:
1. Upload your CSV file with experimental data
2. Specify input features (e.g., "x1, x2")
3. Specify output features (e.g., "CE (%), LCE, Overpotential (V)")
4. Set bounds for inputs and outputs (comma-separated, matching the order of features)
5. Specify weights for each output (positive for maximization, negative for minimization)
6. Click "Run Optimization"
## Performance Tips
- Reduce MCMC Samples and Warmup Steps if facing timeout issues
- Default settings (128 samples, 64 warmup) should work on Hugging Face Spaces
- The computation is resource-intensive, so expect some wait time
The default settings optimize with weights: 0.25 for CE, 0.25 for LCE, -0.5 for Overpotential
""")
    # Offer the bundled example only when the CSV ships next to the app.
    if os.path.exists("grid_experiment.csv"):
        example_csv = "grid_experiment.csv"
    else:
        example_csv = None
    if example_csv is not None:
        gr.Examples(
            examples=[[example_csv, "x1, x2", "CE (%), LCE, Overpotential (V)", "1, 1", "6, 6", "0, 0, 0", "100, 100, 10", "0.25, 0.25, -0.5"]],
            inputs=example_inputs,
        )

# Launch the interface only when run as a script.
if __name__ == "__main__":
    demo.launch()