Spaces:
Running
Running
| """ | |
| Optimization Engine — determines the optimal inference configuration. | |
| Solves: | |
| x* = argmin J(x) = α·Cost(x) + β·Tokens(x) - γ·Quality(x) | |
| Subject to: | |
| Capability(model) >= Complexity(query) | |
| Quality >= threshold | |
| Latency <= max_latency | |
| V1: deterministic rule engine with fixed per-budget-mode weights. | |
| V2: BayesianWeightOptimizer (Optuna) learns α,β,γ from past outcomes. | |
| Falls back to V1 fixed weights if optuna is not installed. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Optional | |
| from llmopt.registry.model_registry import ModelRegistry, ModelSpec | |
| from llmopt.estimator.complexity_estimator import ComplexityResult | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The data directory sits three levels above this file; the JSON file inside
# it holds the recorded weight/outcome trials used by the V2 optimizer.
DATA_DIR = Path(__file__).parent.parent.parent.joinpath("data")
BAYES_STUDY_PATH = DATA_DIR.joinpath("bayesian_study.json")
| # --------------------------------------------------------------------------- | |
| # User constraints schema | |
| # --------------------------------------------------------------------------- | |
@dataclass
class UserConstraints:
    """
    Caller-supplied constraints that bound the optimization search space.

    Every field has a default, so ``UserConstraints()`` describes a
    "balanced" request with no hard caps. ``exclude_providers`` and
    ``only_providers`` may be passed as ``None``; ``__post_init__`` replaces
    ``None`` with a fresh empty list so instances never share a list.
    """

    budget_mode: str = "balanced"  # cheap | balanced | quality
    max_cost_per_request: Optional[float] = None  # USD hard cap
    # Bound compared against a model's latency_score: the engine's hard filter
    # drops models whose latency_score is *below* this value.
    max_latency_score: Optional[float] = None
    quality_threshold: float = 0.60  # min acceptable quality proxy
    exclude_providers: Optional[list[str]] = None  # e.g. ["ollama"] for cloud-only
    only_providers: Optional[list[str]] = None  # e.g. ["openai"]
    prefer_local: bool = False  # prefer Ollama models
    compression_enabled: Optional[bool] = None  # None = auto-decide

    def __post_init__(self) -> None:
        """Normalise the optional provider lists to empty lists."""
        if self.exclude_providers is None:
            self.exclude_providers = []
        if self.only_providers is None:
            self.only_providers = []
| # --------------------------------------------------------------------------- | |
| # Optimization result schema | |
| # --------------------------------------------------------------------------- | |
@dataclass
class OptimizationResult:
    """
    The configuration chosen for one request.

    Instances are built with keyword arguments by the optimization engine,
    so the class must be a dataclass to get a generated ``__init__``.
    """

    selected_model: str
    provider: str
    estimated_cost: float  # USD for this request
    estimated_input_tokens: int
    estimated_output_tokens: int
    max_tokens: int  # hard cap to pass to the API
    compression_enabled: bool
    system_prompt_style: str  # "verbose" | "concise" | "minimal"
    rationale: list[str]
    fallback_model: Optional[str] = None
    objective_score: float = 0.0  # lower is better

    def to_dict(self) -> dict:
        """
        Return the fields as a plain ``dict``.

        The ``rationale`` list is copied so that mutating the returned dict
        cannot alter this result object.
        """
        payload = self.__dict__.copy()
        payload["rationale"] = list(self.rationale)
        return payload
| # --------------------------------------------------------------------------- | |
| # Objective weights per budget mode | |
| # --------------------------------------------------------------------------- | |
# Fixed (V1) objective weights, keyed by budget mode.
# Each tuple is (α, β, γ) = (cost weight, token weight, quality weight).
_BUDGET_WEIGHTS = dict(
    cheap=(0.60, 0.30, 0.10),
    balanced=(0.40, 0.20, 0.40),
    quality=(0.10, 0.10, 0.80),
)

# max_tokens cap for each output-length bucket.
_MAX_TOKENS_MAP = dict(
    short=150,
    medium=400,
    long=900,
    very_long=1800,
)
| # --------------------------------------------------------------------------- | |
| # Bayesian Weight Optimizer (V2) | |
| # --------------------------------------------------------------------------- | |
class BayesianWeightOptimizer:
    """
    Uses Optuna to pick α,β,γ weights for J(x) from accumulated feedback
    (cost vs quality trade-offs recorded for past requests).

    Recorded outcomes are persisted per budget mode in ``BAYES_STUDY_PATH``.
    ``get_weights`` replays them into an Optuna study and returns the
    parameters of the best (lowest-outcome) recorded trial. Falls back to the
    fixed V1 weights if optuna is not installed, if there is too little
    history, or if replaying the history fails.
    """

    # Minimum number of recorded outcomes before learned weights are used.
    MIN_TRIALS = 5

    def __init__(self):
        self._optuna_available = False
        self._studies: dict = {}  # one fully seeded study per budget_mode
        try:
            import optuna  # type: ignore
        except ImportError:
            logger.info("BayesianWeightOptimizer: Optuna not installed. Using V1 fixed weights.")
            return
        optuna.logging.set_verbosity(optuna.logging.WARNING)
        self._optuna_available = True
        logger.info("BayesianWeightOptimizer: Optuna available. Using Bayesian weight tuning.")

    def get_weights(self, budget_mode: str) -> tuple[float, float, float]:
        """
        Returns (α, β, γ) weights for the given budget mode.

        Uses the recorded trials when optuna is available and at least
        ``MIN_TRIALS`` outcomes exist for ``budget_mode``; otherwise returns
        the V1 fixed weights (unknown modes map to the "balanced" weights).
        """
        defaults = _BUDGET_WEIGHTS.get(budget_mode, _BUDGET_WEIGHTS["balanced"])
        if not self._optuna_available:
            return defaults

        history = self._load_history(budget_mode)
        if len(history) < self.MIN_TRIALS:
            # Not enough data yet — use V1 defaults.
            logger.debug(f"Bayesian: Only {len(history)} trials for '{budget_mode}', using V1 defaults.")
            return defaults

        try:
            study = self._studies.get(budget_mode)
            if study is None:
                # Cache only after seeding succeeds, so a malformed record can
                # never leave a half-seeded study behind for later calls.
                study = self._build_study(history)
                self._studies[budget_mode] = study
            best = study.best_params
            alpha, beta, gamma = best["alpha"], best["beta"], best["gamma"]
            logger.debug(f"Bayesian weights for '{budget_mode}': α={alpha:.3f} β={beta:.3f} γ={gamma:.3f}")
            return alpha, beta, gamma
        except Exception as e:
            # Deliberately broad: any failure in the optional tuning path
            # must degrade to the fixed weights rather than fail the request.
            logger.warning(f"Bayesian weight retrieval failed: {e}. Using V1 defaults.")
            return defaults

    @staticmethod
    def _build_study(history: list):
        """Create a minimizing Optuna study seeded with every record in ``history``."""
        import optuna  # type: ignore

        # Search ranges; built once because they are identical for every trial.
        distributions = {
            "alpha": optuna.distributions.FloatDistribution(0.05, 0.90),
            "beta": optuna.distributions.FloatDistribution(0.05, 0.60),
            "gamma": optuna.distributions.FloatDistribution(0.05, 0.90),
        }
        study = optuna.create_study(direction="minimize")
        for record in history:
            study.add_trial(
                optuna.trial.create_trial(
                    params={
                        "alpha": record["alpha"],
                        "beta": record["beta"],
                        "gamma": record["gamma"],
                    },
                    distributions=distributions,
                    value=record["outcome"],
                )
            )
        return study

    @staticmethod
    def _outcome_score(actual_cost: float, quality_score: float) -> float:
        """
        Composite score to minimise: high cost = bad, low quality = bad.

        Cost is divided by an assumed ceiling of $0.02 and capped at 1.0;
        quality is divided by 10 (scores are assumed to be on a 1-10 scale).
        """
        cost_norm = min(actual_cost / 0.02, 1.0)
        quality_norm = quality_score / 10.0
        return cost_norm - quality_norm

    def record_outcome(
        self,
        budget_mode: str,
        alpha: float, beta: float, gamma: float,
        actual_cost: float,
        quality_score: float,
    ) -> None:
        """
        Records the outcome of a request for ``budget_mode``.

        Appends the weights used, the raw cost/quality and the composite
        outcome to the persisted history, then drops the cached study so the
        next ``get_weights`` call rebuilds it from the updated history.
        Call this after receiving a response + evaluation score.
        """
        history = self._load_history(budget_mode)
        history.append({
            "alpha": alpha, "beta": beta, "gamma": gamma,
            "actual_cost": actual_cost,
            "quality_score": quality_score,
            "outcome": self._outcome_score(actual_cost, quality_score),
        })
        self._save_history(budget_mode, history)
        # Invalidate the in-memory study so it reloads next time
        self._studies.pop(budget_mode, None)

    @staticmethod
    def _read_store() -> dict:
        """
        Load the whole history file as a dict keyed by budget mode.

        Returns an empty dict when the file is missing, unreadable, not valid
        JSON, or does not contain a JSON object at the top level.
        """
        if not BAYES_STUDY_PATH.exists():
            return {}
        try:
            data = json.loads(BAYES_STUDY_PATH.read_text(encoding="utf-8"))
        except (OSError, ValueError) as e:
            # ValueError covers both JSON and text-decoding errors.
            logger.warning("Bayesian: could not read %s (%s); treating history as empty.", BAYES_STUDY_PATH, e)
            return {}
        if not isinstance(data, dict):
            logger.warning("Bayesian: %s does not contain a JSON object; treating history as empty.", BAYES_STUDY_PATH)
            return {}
        return data

    def _load_history(self, budget_mode: str) -> list:
        """Return the recorded trials for ``budget_mode`` (empty list if none)."""
        history = self._read_store().get(budget_mode, [])
        return history if isinstance(history, list) else []

    def _save_history(self, budget_mode: str, history: list) -> None:
        """Persist ``history`` for ``budget_mode``, keeping other modes' entries."""
        store = self._read_store()
        store[budget_mode] = history
        BAYES_STUDY_PATH.parent.mkdir(parents=True, exist_ok=True)
        # Write to a sibling temp file and rename it over the target so an
        # interrupted write cannot leave a truncated history file.
        tmp_path = BAYES_STUDY_PATH.with_name(BAYES_STUDY_PATH.name + ".tmp")
        tmp_path.write_text(json.dumps(store, indent=2), encoding="utf-8")
        tmp_path.replace(BAYES_STUDY_PATH)
| # --------------------------------------------------------------------------- | |
| # Engine | |
| # --------------------------------------------------------------------------- | |
class OptimizationEngine:
    """
    Core decision engine. Selects model + config that minimizes
    J(x) = α·Cost + β·Tokens - γ·Quality under user constraints.
    V2: Uses BayesianWeightOptimizer to learn optimal α,β,γ weights over time.
    """

    def __init__(self, registry: ModelRegistry):
        self.registry = registry
        self.bayes = BayesianWeightOptimizer()

    def optimize(
        self,
        complexity: ComplexityResult,
        output_length_bucket: str,
        constraints: Optional[UserConstraints] = None,
    ) -> OptimizationResult:
        """
        Pick the model and request configuration for one query.

        Steps: fetch weights for the budget mode, ask the registry for
        capable models, apply the hard filters, score the survivors with
        ``_objective`` and take the lowest score (the runner-up becomes
        ``fallback_model``).

        Constraints are best-effort: if no model is capable, the single most
        capable registry model is used; if the hard filters then reject every
        candidate, all registry models are scored unfiltered. Both
        relaxations are logged.

        Raises:
            ValueError: if the registry contains no models at all.
        """
        if constraints is None:
            constraints = UserConstraints()
        alpha, beta, gamma = self.bayes.get_weights(constraints.budget_mode)
        logger.debug(f"Using weights α={alpha:.3f} β={beta:.3f} γ={gamma:.3f} for mode '{constraints.budget_mode}'")

        input_tokens = complexity.estimated_input_tokens
        output_tokens = complexity.estimated_output_tokens

        # --- 1. Build candidate set ---
        candidates = self.registry.capable_of(
            complexity=complexity.score,
            min_reasoning=complexity.required_reasoning * 0.85,  # 15% headroom
            min_coding=complexity.required_coding * 0.85,
            min_math=complexity.required_math * 0.85,
            exclude_providers=constraints.exclude_providers,
            only_providers=constraints.only_providers or None,
        )
        if not candidates:
            # Hard fallback: use the most capable model in registry
            everything = list(self.registry.all_models())
            if not everything:
                raise ValueError("OptimizationEngine: model registry is empty; no model can be selected.")
            logger.warning("No model meets the capability requirements; falling back to the most capable model.")
            candidates = [max(everything, key=lambda m: m.capability_score)]

        # --- 2. Apply hard filters ---
        candidates = self._apply_hard_filters(candidates, constraints, complexity)
        if not candidates:
            logger.warning("No candidate satisfies the hard constraints; scoring all registry models unfiltered.")
            candidates = list(self.registry.all_models())
            if not candidates:
                raise ValueError("OptimizationEngine: model registry is empty; no model can be selected.")

        # --- 3. Score candidates via objective function ---
        # The normalisation ceiling is the same for every candidate, so it is
        # computed once instead of once per model.
        max_cost = self._max_cost_in_registry(input_tokens, output_tokens)
        scored = []
        for model in candidates:
            score, rationale = self._objective(
                model,
                alpha, beta, gamma,
                input_tokens,
                output_tokens,
                max_cost=max_cost,
            )
            scored.append((score, model, rationale))
        scored.sort(key=lambda entry: entry[0])
        best_score, best_model, best_rationale = scored[0]
        fallback = scored[1][1].model_name if len(scored) > 1 else None

        # --- 4. Determine configuration ---
        max_tokens = _MAX_TOKENS_MAP.get(output_length_bucket, 400)
        compression = self._should_compress(constraints, complexity, constraints.budget_mode)
        system_prompt_style = self._system_prompt_style(constraints.budget_mode, complexity.score)
        estimated_cost = best_model.cost_per_request(input_tokens, output_tokens)
        return OptimizationResult(
            selected_model=best_model.model_name,
            provider=best_model.provider,
            estimated_cost=round(estimated_cost, 6),
            estimated_input_tokens=input_tokens,
            estimated_output_tokens=output_tokens,
            max_tokens=max_tokens,
            compression_enabled=compression,
            system_prompt_style=system_prompt_style,
            rationale=best_rationale,
            fallback_model=fallback,
            objective_score=round(best_score, 4),
        )

    # ------------------------------------------------------------------
    # Objective function J(x) = α·Cost + β·Tokens - γ·Quality
    # ------------------------------------------------------------------
    def _objective(
        self,
        model: ModelSpec,
        α: float, β: float, γ: float,
        input_tokens: int,
        output_tokens: int,
        *,
        max_cost: Optional[float] = None,
    ) -> tuple[float, list[str]]:
        """
        Normalized objective score for one model. Lower is better.

        Cost is divided by the most expensive registry model's cost for the
        same token counts. ``max_cost`` lets a caller supply that ceiling when
        scoring many models; when omitted it is computed from the registry.

        Returns:
            ``(J, rationale)`` where ``rationale`` is a list of
            human-readable strings describing the score.
        """
        if max_cost is None:
            max_cost = self._max_cost_in_registry(input_tokens, output_tokens)
        total_tokens = input_tokens + output_tokens
        # The small epsilon keeps the divisions defined when a ceiling is 0.
        cost_norm = model.cost_per_request(input_tokens, output_tokens) / (max_cost + 1e-9)
        token_norm = total_tokens / (total_tokens + 1e-9)  # uniform here; prompt optimizer changes this
        quality_norm = model.capability_score  # higher = better
        J = α * cost_norm + β * token_norm - γ * quality_norm
        rationale = [
            f"model={model.model_name}",
            f"provider={model.provider}",
            f"capability={model.capability_score:.3f}",
            f"cost_norm={cost_norm:.4f}",
            f"J={J:.4f} (α={α},β={β},γ={γ})",
        ]
        return J, rationale

    # ------------------------------------------------------------------
    # Hard filters
    # ------------------------------------------------------------------
    def _apply_hard_filters(
        self,
        candidates: list[ModelSpec],
        constraints: UserConstraints,
        complexity: ComplexityResult,
    ) -> list[ModelSpec]:
        """
        Return the candidates that pass the cost cap, the latency bound and
        the quality floor, in their original order.
        """
        cost_cap = constraints.max_cost_per_request
        latency_bound = constraints.max_latency_score
        kept = []
        for m in candidates:
            # Cost cap
            if cost_cap is not None:
                est = m.cost_per_request(
                    complexity.estimated_input_tokens,
                    complexity.estimated_output_tokens,
                )
                if est > cost_cap:
                    continue
            # Latency bound: models scoring below the bound are dropped.
            # NOTE(review): this treats a higher latency_score as faster, which
            # makes "max_latency_score" act as a minimum — confirm against
            # ModelSpec.latency_score before changing the comparison.
            if latency_bound is not None and m.latency_score < latency_bound:
                continue
            # Quality floor: capability_score >= quality_threshold
            if m.capability_score < constraints.quality_threshold:
                continue
            kept.append(m)
        return kept

    # ------------------------------------------------------------------
    # Config helpers
    # ------------------------------------------------------------------
    def _should_compress(
        self,
        constraints: UserConstraints,
        complexity: ComplexityResult,
        budget_mode: str,
    ) -> bool:
        """
        Decide whether prompt compression is enabled.

        An explicit ``constraints.compression_enabled`` always wins.
        Otherwise compress in "cheap"/"balanced" mode, or whenever the
        estimated input exceeds 800 tokens.
        """
        if constraints.compression_enabled is not None:
            return constraints.compression_enabled
        # Auto: compress for long contexts or cheap modes
        return budget_mode in ("cheap", "balanced") or complexity.estimated_input_tokens > 800

    def _system_prompt_style(self, budget_mode: str, complexity_score: float) -> str:
        """Return "minimal", "verbose" or "concise" for the mode and complexity."""
        if budget_mode == "cheap":
            return "minimal"
        if budget_mode == "quality" or complexity_score > 0.75:
            return "verbose"
        return "concise"

    def _max_cost_in_registry(self, input_tokens: int, output_tokens: int) -> float:
        """
        Return the highest per-request cost among all registry models for
        the given token counts, or 1.0 when the registry is empty.
        """
        return max(
            (
                m.cost_per_request(input_tokens, output_tokens)
                for m in self.registry.all_models()
            ),
            default=1.0,
        )