| """OptimizationCampaign: manages the full lifecycle of an optimization campaign."""
|
|
|
| import json
|
| import time
|
| from dataclasses import dataclass, field
|
| from pathlib import Path
|
| from typing import Callable, Dict, List, Optional, Tuple
|
|
|
| import torch
|
| from torch import Tensor
|
| import pandas as pd
|
|
|
| from physics_informed_bo.config import OptimizationConfig
|
| from physics_informed_bo.experiment.designer import ExperimentDesigner
|
| from physics_informed_bo.experiment.parameter_space import ParameterSpace
|
|
|
|
|
@dataclass
class ExperimentRecord:
    """Record of a single completed experiment in an optimization campaign.

    Instances are append-only history entries; OptimizationCampaign creates
    them both for user-reported results and (with iteration=0) for
    observations seeded from ``initial_data``.
    """

    # Campaign iteration that produced this experiment; 0 marks records
    # seeded from pre-existing initial data.
    iteration: int
    # Mapping of parameter name -> value that was tested.
    parameters: Dict[str, float]
    # Measured objective value for this parameter setting.
    objective: float
    # Wall-clock creation time (seconds since the epoch).
    timestamp: float = field(default_factory=time.time)
    # Free-form extra information, e.g. {"source": "initial_data"}.
    metadata: Dict = field(default_factory=dict)
|
|
|
|
|
class OptimizationCampaign:
    """Manages an end-to-end Bayesian optimization campaign.

    Provides:
    - Full experiment tracking and history
    - Save/load campaign state
    - Convergence monitoring
    - Human-in-the-loop workflow support
    - Export to DataFrame for analysis

    Example:
        campaign = OptimizationCampaign(
            name="polymer_optimization",
            parameter_space=space,
            physics_fn=my_physics_model,
            config=OptimizationConfig(max_iterations=30),
        )

        # Automated loop
        campaign.run_automated(objective_fn=evaluate_experiment)

        # Or human-in-the-loop
        next_exp = campaign.suggest_next()
        # ... run experiment manually ...
        campaign.report_result(next_exp, result_value)
    """

    def __init__(
        self,
        name: str,
        parameter_space: ParameterSpace,
        physics_fn: Optional[Callable[[Tensor], Tensor]] = None,
        initial_data: Optional[Tuple[Tensor, Tensor]] = None,
        config: Optional[OptimizationConfig] = None,
        maximize: bool = True,
    ):
        """Initialize the campaign and its underlying experiment designer.

        Args:
            name: Human-readable campaign identifier.
            parameter_space: Defines the tunable parameters.
            physics_fn: Optional physics model forwarded to the designer.
            initial_data: Optional (X, y) tensors of pre-existing
                observations; also seeded into the history as iteration-0
                records.
            config: Optimization settings; defaults to OptimizationConfig().
            maximize: True to maximize the objective, False to minimize.
        """
        self.name = name
        self.maximize = maximize
        self.config = config or OptimizationConfig()
        self.parameter_space = parameter_space

        self._designer = ExperimentDesigner(
            parameter_space=parameter_space,
            physics_fn=physics_fn,
            initial_data=initial_data,
            config=self.config,
        )

        self._history: List[ExperimentRecord] = []
        self._iteration = 0
        self._start_time = time.time()

        # Seed the history with any pre-existing observations so they are
        # visible to get_best(), convergence checks, and exports.
        if initial_data is not None:
            X_init, y_init = initial_data
            if y_init.dim() == 1:
                y_init = y_init.unsqueeze(-1)  # normalize to shape (n, 1)
            param_dicts = parameter_space.to_dict(X_init)
            for params, y_val in zip(param_dicts, y_init):
                self._history.append(
                    ExperimentRecord(
                        iteration=0,  # 0 marks pre-campaign data
                        parameters=params,
                        objective=float(y_val),
                        metadata={"source": "initial_data"},
                    )
                )

    def suggest_next(self, n: int = 1) -> List[Dict]:
        """Suggest the next experiment(s) to run.

        Advances the campaign's iteration counter, then queries the
        designer for candidate points.

        Args:
            n: Number of experiments to suggest.

        Returns:
            List of parameter dicts for suggested experiments.
        """
        self._iteration += 1
        candidates = self._designer.suggest(n)
        return self.parameter_space.to_dict(candidates)

    def report_result(
        self,
        parameters: Dict[str, float],
        objective: float,
        metadata: Optional[Dict] = None,
    ) -> None:
        """Report the result of a completed experiment.

        Appends a record to the history and feeds the observation back to
        the designer so the surrogate model is updated.

        Args:
            parameters: The parameter values that were tested.
            objective: The measured objective value.
            metadata: Optional metadata about the experiment.
        """
        record = ExperimentRecord(
            iteration=self._iteration,
            parameters=parameters,
            objective=objective,
            metadata=metadata or {},
        )
        self._history.append(record)

        # Convert to the designer's tensor representation: (1, d) X and
        # (1, 1) y, double precision to match the load() path below.
        X_new = self.parameter_space.from_dict(parameters).unsqueeze(0)
        y_new = torch.tensor([[objective]], dtype=torch.float64)
        self._designer.update(X_new, y_new)

    def run_automated(
        self,
        objective_fn: Callable[[Dict[str, float]], float],
        max_iterations: Optional[int] = None,
        batch_size: int = 1,
        callback: Optional[Callable] = None,
    ) -> pd.DataFrame:
        """Run a fully automated optimization loop.

        Args:
            objective_fn: Function that takes parameter dict and returns objective value.
            max_iterations: Max iterations (defaults to config.max_iterations).
            batch_size: Number of experiments per iteration.
            callback: Optional callback(iteration, best_so_far) called each iteration.

        Returns:
            DataFrame of all experiments.
        """
        max_iter = max_iterations or self.config.max_iterations

        for i in range(max_iter):
            suggestions = self.suggest_next(batch_size)

            for params in suggestions:
                objective = objective_fn(params)
                self.report_result(params, objective)

            if callback:
                best = self.get_best()
                callback(i + 1, best)

            # Stop early once no meaningful improvement is seen.
            if self._check_convergence():
                break

        return self.to_dataframe()

    def _check_convergence(self, window: int = 10, tolerance: float = 1e-4) -> bool:
        """Check if optimization has converged (no improvement in last `window` iterations).

        Args:
            window: Number of most recent records to compare against all
                earlier ones.
            tolerance: Minimum improvement considered meaningful.

        Returns:
            True when the recent window improved by less than `tolerance`
            over the best pre-window value.
        """
        # BUGFIX: require strictly more than `window` records. With exactly
        # `window` records the pre-window slice below is empty and
        # max()/min() on an empty sequence raises ValueError.
        if len(self._history) <= window:
            return False

        recent = [r.objective for r in self._history[-window:]]
        if self.maximize:
            best_recent = max(recent)
            best_before = max(r.objective for r in self._history[:-window])
            return best_recent - best_before < tolerance
        else:
            best_recent = min(recent)
            best_before = min(r.objective for r in self._history[:-window])
            return best_before - best_recent < tolerance

    def get_best(self) -> Dict:
        """Get the best experiment so far.

        Returns:
            Dict with "parameters" and "objective" of the best record, or
            empty parameters / None objective when no history exists.
        """
        if not self._history:
            return {"parameters": {}, "objective": None}

        if self.maximize:
            best = max(self._history, key=lambda r: r.objective)
        else:
            best = min(self._history, key=lambda r: r.objective)

        return {"parameters": best.parameters, "objective": best.objective}

    def to_dataframe(self) -> pd.DataFrame:
        """Export campaign history as a pandas DataFrame.

        Columns: iteration, objective, one column per parameter, timestamp.
        """
        records = []
        for r in self._history:
            row = {"iteration": r.iteration, "objective": r.objective}
            row.update(r.parameters)
            row["timestamp"] = r.timestamp
            records.append(row)
        return pd.DataFrame(records)

    def save(self, filepath: str) -> None:
        """Save campaign state to a JSON file.

        Note: record metadata must be JSON-serializable.
        """
        state = {
            "name": self.name,
            "maximize": self.maximize,
            "iteration": self._iteration,
            "history": [
                {
                    "iteration": r.iteration,
                    "parameters": r.parameters,
                    "objective": r.objective,
                    "timestamp": r.timestamp,
                    "metadata": r.metadata,
                }
                for r in self._history
            ],
        }
        Path(filepath).write_text(json.dumps(state, indent=2))

    def load(self, filepath: str) -> None:
        """Load campaign state from a JSON file.

        Restores name, maximize flag, iteration counter, and history, then
        replays all observations into the designer.

        NOTE(review): _start_time is intentionally not restored (elapsed
        time restarts at construction). Also, if this campaign was built
        with initial_data, replaying the full history here may feed the
        designer duplicate observations — verify against
        ExperimentDesigner.update semantics.
        """
        state = json.loads(Path(filepath).read_text())
        self.name = state["name"]
        self.maximize = state["maximize"]
        self._iteration = state["iteration"]
        self._history = [
            ExperimentRecord(**r) for r in state["history"]
        ]

        # Rebuild the designer's dataset from the restored history.
        if self._history:
            all_params = [r.parameters for r in self._history]
            X = torch.stack([self.parameter_space.from_dict(p) for p in all_params])
            y = torch.tensor(
                [r.objective for r in self._history], dtype=torch.float64
            ).unsqueeze(-1)
            self._designer.update(X, y)

    @property
    def n_experiments(self) -> int:
        """Total number of recorded experiments (including initial data)."""
        return len(self._history)

    def summary(self) -> Dict:
        """Campaign summary: name, counts, best result, elapsed time, and
        the designer's model summary."""
        best = self.get_best()
        return {
            "name": self.name,
            "n_experiments": self.n_experiments,
            "iteration": self._iteration,
            "best": best,
            "elapsed_time_s": time.time() - self._start_time,
            "model_summary": self._designer.summary(),
        }
|
|
|