|
|
from __future__ import annotations |
|
|
|
|
|
import random |
|
|
from dataclasses import dataclass |
|
|
from typing import Any, Dict, List, Optional, Tuple |
|
|
|
|
|
import numpy as np |
|
|
from sklearn.ensemble import ExtraTreesRegressor |
|
|
|
|
|
from edgeeda.agents.base import Action, Agent |
|
|
from edgeeda.config import Config |
|
|
from edgeeda.utils import sanitize_variant_prefix, stable_hash |
|
|
|
|
|
|
|
|
@dataclass
class Obs:
    """A single reward observation for one evaluated (variant, fidelity) pair."""

    # Knob settings encoded as a normalized feature vector (see _encode).
    x: np.ndarray
    # Scalar reward observed for these knobs at this fidelity.
    y: float
    # Name of the fidelity stage the reward was measured at.
    fidelity: str
    # Identifier of the design variant that produced this reward.
    variant: str
|
|
|
|
|
|
|
|
class SurrogateUCBAgent(Agent):
    """
    Agentic tuner:
    - Generates candidates (random)
    - Fits a lightweight surrogate (ExtraTrees) on observed rewards (for a given fidelity)
    - Chooses next action via UCB: mean + kappa * std (std estimated across trees)

    Multi-fidelity policy:
    - Always start at cheapest fidelity for new variants
    - Promote a subset to next fidelity when budget allows
    """

    def __init__(self, cfg: Config, kappa: float = 1.0, init_random: int = 6):
        """
        Args:
            cfg: Experiment configuration (knob specs, fidelity ladder, name).
            kappa: Exploration weight in the UCB score (mean + kappa * std).
            init_random: Minimum number of observations per fidelity before a
                surrogate is fit; below this, proposals are purely random.
        """
        self.cfg = cfg
        self.kappa = kappa
        self.init_random = init_random
        self.stage_names = cfg.flow.fidelities
        self.knob_names = list(cfg.tuning.knobs.keys())
        self.variant_prefix = sanitize_variant_prefix(cfg.experiment.name)

        # All reward observations, across every fidelity.
        self.obs: List[Obs] = []
        # variant name -> current index into self.stage_names.
        self.variant_stage: Dict[str, int] = {}
        # variant name -> the knob settings it was created/evaluated with.
        self._variant_knobs: Dict[str, Dict[str, Any]] = {}
        # Monotonic counter used to build unique variant names.
        self.counter = 0

    def _encode(self, knobs: Dict[str, Any]) -> np.ndarray:
        """Encode knob values as a min-max normalized float32 vector in [0, 1]."""
        xs = []
        for name in self.knob_names:
            spec = self.cfg.tuning.knobs[name]
            v = float(knobs[name])
            # Epsilon in the denominator guards against a zero-width knob range.
            xs.append((v - float(spec.min)) / max(1e-9, (float(spec.max) - float(spec.min))))
        return np.array(xs, dtype=np.float32)

    def _sample_knobs(self) -> Dict[str, Any]:
        """Draw one uniform-random knob assignment from the configured ranges."""
        out: Dict[str, Any] = {}
        for name, spec in self.cfg.tuning.knobs.items():
            if spec.type == "int":
                out[name] = random.randint(int(spec.min), int(spec.max))
            else:
                out[name] = float(spec.min) + random.random() * (float(spec.max) - float(spec.min))
                # Round floats so variant hashes stay stable across reruns.
                out[name] = round(out[name], 3)
        return out

    def _fit_surrogate(self, fidelity: str) -> Optional[ExtraTreesRegressor]:
        """Fit an ExtraTrees surrogate on observations at `fidelity`.

        Returns None while there are too few observations to fit anything
        useful (fewer than max(5, init_random)).
        """
        data = [o for o in self.obs if o.fidelity == fidelity]
        if len(data) < max(5, self.init_random):
            return None
        X = np.stack([o.x for o in data], axis=0)
        y = np.array([o.y for o in data], dtype=np.float32)
        model = ExtraTreesRegressor(
            n_estimators=128,
            random_state=0,
            min_samples_leaf=2,
            n_jobs=-1,
        )
        model.fit(X, y)
        return model

    def _predict_ucb(self, model: ExtraTreesRegressor, Xcand: np.ndarray) -> np.ndarray:
        """Return UCB scores (mean + kappa * std) for each candidate row.

        Std is estimated from the disagreement across the individual trees.
        """
        preds = np.stack([t.predict(Xcand) for t in model.estimators_], axis=0)
        mu = preds.mean(axis=0)
        sd = preds.std(axis=0)
        return mu + self.kappa * sd

    def propose(self) -> Action:
        """Propose the next evaluation: either promote an existing variant to
        the next fidelity, or create a new variant at the cheapest fidelity
        (surrogate-guided once enough observations exist).
        """
        self.counter += 1

        # --- Promotion path: occasionally push the best-performing
        # not-yet-maxed variant one fidelity up the ladder.
        promotable = [v for v, st in self.variant_stage.items() if st < len(self.stage_names) - 1]
        if promotable and random.random() < 0.35:
            best_v = None
            best_y = float("-inf")
            for v in promotable:
                st = self.variant_stage[v]
                fid = self.stage_names[st]
                # Rank variants by their best reward at their *current* fidelity.
                ys = [o.y for o in self.obs if o.fidelity == fid and o.variant == v]
                if ys:
                    y = max(ys)
                    if y > best_y:
                        best_y = y
                        best_v = v
            if best_v is not None:
                st = self.variant_stage[best_v] + 1
                self.variant_stage[best_v] = st
                # Reuse the recorded knobs.  NOTE: dict.get(key, _sample_knobs())
                # would evaluate the default eagerly, consuming RNG state (and
                # doing wasted work) even on a hit — so look up explicitly and
                # only sample if (unexpectedly) nothing was recorded.
                knobs = self._variant_knobs.get(best_v)
                if knobs is None:
                    knobs = self._sample_knobs()
                return Action(variant=best_v, fidelity=self.stage_names[st], knobs=knobs)

        # --- New-variant path: start at the cheapest fidelity.
        knobs = self._sample_knobs()

        fid0 = self.stage_names[0]
        model = self._fit_surrogate(fid0)

        if model is not None:
            # Surrogate available: score a small random candidate pool and
            # keep the candidate with the highest UCB instead of the first draw.
            cands = []
            Xc = []
            for _ in range(32):
                kk = self._sample_knobs()
                cands.append(kk)
                Xc.append(self._encode(kk))
            Xc = np.stack(Xc, axis=0)
            ucb = self._predict_ucb(model, Xc)
            best_i = int(np.argmax(ucb))
            knobs = cands[best_i]

        # Name is unique per proposal (counter) and content-addressed (hash).
        variant = f"{self.variant_prefix}_u{self.counter:05d}_{stable_hash(str(knobs))}"
        self.variant_stage[variant] = 0
        self._variant_knobs[variant] = knobs
        return Action(variant=variant, fidelity=fid0, knobs=knobs)

    def observe(self, action: Action, ok: bool, reward: Optional[float], metrics_flat: Optional[Dict[str, Any]]) -> None:
        """Record the outcome of an executed action.

        Successful runs with a reward become surrogate training data; the
        variant's knobs are recorded regardless so promotions can reuse them.
        `metrics_flat` is accepted for interface compatibility but unused here.
        """
        if ok and reward is not None:
            x = self._encode(action.knobs)
            self.obs.append(Obs(x=x, y=float(reward), fidelity=action.fidelity, variant=action.variant))

        self._variant_knobs[action.variant] = action.knobs
|
|