Spaces:
Sleeping
Sleeping
"""
LLMOpt — main client interface.

This is the single entry point developers use:

    from llmopt import LLMOpt, UserConstraints

    client = LLMOpt()
    result = client.generate("Explain quicksort with Python code", budget_mode="balanced")
    print(result.response)
    print(result.explain())
"""
| from __future__ import annotations | |
| import logging | |
| import time | |
| import os | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Optional, Dict, List | |
| from llmopt.analyzer.query_analyzer import QueryAnalyzer, QueryFeatures | |
| from llmopt.estimator.complexity_estimator import ComplexityEstimator, ComplexityResult | |
| from llmopt.engine.optimization_engine import OptimizationEngine, OptimizationResult, UserConstraints | |
| from llmopt.engine.llmopt_engine import LLMOptEngine | |
| from llmopt.engine.utility_engine import RoutingDecision | |
| from llmopt.optimizer.prompt_optimizer import PromptOptimizer, OptimizedPrompt | |
| from llmopt.router.model_router import ModelRouter, RoutedResponse | |
| from llmopt.registry.model_registry import ModelRegistry | |
| from llmopt.cache.semantic_cache import SemanticCache | |
| from llmopt.evaluation.evaluator import LLMJudge, EvaluationResult | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Full pipeline result | |
| # --------------------------------------------------------------------------- | |
@dataclass
class GenerateResult:
    """Result of one pass through the LLMOpt pipeline.

    Bundles the model response with token/cost metrics and the intermediate
    pipeline objects (features, complexity, optimization, optimized prompt)
    so a caller can inspect or explain the routing decision afterwards.

    The class is a dataclass: callers construct it with keyword arguments,
    so the generated ``__init__`` is required.
    """

    # Core output
    response: str
    model_used: str
    provider: str
    # Cost / token metrics
    input_tokens: int
    output_tokens: int
    total_tokens: int
    estimated_cost: float
    # Savings metrics (vs naive GPT-4o baseline)
    tokens_saved: int
    cost_saved: float
    compression_ratio: float
    # Pipeline internals (for debugging / explainability)
    query_features: QueryFeatures
    complexity: ComplexityResult
    optimization: OptimizationResult
    optimized_prompt: OptimizedPrompt
    latency_ms: float
    # Only populated when the caller asked for an evaluation.
    evaluation: Optional[EvaluationResult] = None

    def explain(self) -> str:
        """Return a human-readable, multi-line explanation of the routing decision.

        The report covers the complexity estimate, the selected and fallback
        models, each rationale entry from the optimization result, and the
        token / cost / latency figures stored on this object.
        """
        rule = "=" * 55
        lines = [
            rule,
            "LLMOpt Decision Explanation",
            rule,
            f"Query complexity : {self.complexity.score:.3f} ({self.complexity.tier})",
            f"Primary domain : {self.query_features.primary_domain}",
            f"Required reasoning: {self.complexity.required_reasoning:.2f}",
            f"Required coding : {self.complexity.required_coding:.2f}",
            f"Required math : {self.complexity.required_math:.2f}",
            "",
            f"Selected model : {self.model_used} ({self.provider})",
            f"Fallback model : {self.optimization.fallback_model or 'N/A'}",
            f"Compression : {'yes' if self.optimization.compression_enabled else 'no'}",
            f"System prompt : {self.optimization.system_prompt_style}",
            "",
            "Scoring rationale:",
        ]
        # One bullet per rationale entry recorded by the optimization stage.
        lines.extend(f" • {r}" for r in self.optimization.rationale)
        lines += [
            "",
            f"Tokens : {self.input_tokens} in / {self.output_tokens} out",
            f"Tokens saved : {self.tokens_saved} ({self.compression_ratio*100:.1f}% compression)",
            f"Cost : ${self.estimated_cost:.6f}",
            f"Cost saved : ${self.cost_saved:.6f} vs GPT-4o baseline",
            f"Latency : {self.latency_ms:.0f}ms",
            rule,
        ]
        return "\n".join(lines)

    def to_dict(self) -> dict:
        """Return the scalar metrics of this result as a plain dict.

        The pipeline-internal objects are not included; only the complexity
        score and tier are flattened into the output.
        """
        return {
            "response": self.response,
            "model_used": self.model_used,
            "provider": self.provider,
            "input_tokens": self.input_tokens,
            "output_tokens": self.output_tokens,
            "total_tokens": self.total_tokens,
            "estimated_cost": self.estimated_cost,
            "tokens_saved": self.tokens_saved,
            "cost_saved": self.cost_saved,
            "compression_ratio": self.compression_ratio,
            "complexity_score": self.complexity.score,
            "complexity_tier": self.complexity.tier,
            "latency_ms": self.latency_ms,
        }
| # --------------------------------------------------------------------------- | |
| # Main client | |
| # --------------------------------------------------------------------------- | |
class LLMOpt:
    """
    Adaptive LLM Inference Optimization client.

    Usage:
        client = LLMOpt()
        result = client.generate("What is Python?", budget_mode="cheap")
    """

    # GPT-4o costs used as baseline for savings calculation.
    # Both rates are applied per 1000 tokens (see _baseline_cost).
    _BASELINE_MODEL = "gpt-4o"
    _BASELINE_INPUT_COST = 0.0025
    _BASELINE_OUTPUT_COST = 0.010

    def __init__(
        self,
        registry_path: Optional[Path] = None,
        ollama_base_url: Optional[str] = None,
        log_level: str = "WARNING",
        use_v2_engine: bool = True,
    ):
        """Build every pipeline stage used by the client.

        Parameters
        ----------
        registry_path   : Passed to ModelRegistry (V1 registry).
        ollama_base_url : Passed to ModelRouter.
        log_level       : Name of a logging level; unknown names fall back
                          to WARNING.
        use_v2_engine   : When True, an LLMOptEngine is created and used for
                          routing; the V1 OptimizationEngine is always created.
        """
        logging.basicConfig(level=getattr(logging, log_level.upper(), logging.WARNING))
        self.registry = ModelRegistry(registry_path)
        self.analyzer = QueryAnalyzer()
        self.estimator = ComplexityEstimator()
        self.engine = OptimizationEngine(self.registry)  # V1 — kept for fallback
        self.optimizer = PromptOptimizer()
        self.router = ModelRouter(ollama_base_url=ollama_base_url)

        # Initialize Semantic Cache (reads REDIS_URL from env if available).
        # python-dotenv is optional: without it only the real environment is read.
        try:
            from dotenv import load_dotenv  # type: ignore
            load_dotenv()
            load_dotenv("config/.env")
        except ImportError:
            pass
        redis_url = os.environ.get("REDIS_URL")
        self.cache = SemanticCache(redis_url=redis_url)
        self.judge = LLMJudge(judge_model="gpt-4o-mini")

        # V2 Utility Engine — default active
        self._use_v2 = use_v2_engine
        self._v2_engine: Optional[LLMOptEngine] = None
        if use_v2_engine:
            self._v2_engine = LLMOptEngine(
                available_keys={},  # populated per-request via update_keys()
                include_ollama=True,
                log_level=logging.WARNING,
            )
            logger.info("[LLMOpt] V2 utility engine active.")

    # ------------------------------------------------------------------
    # Primary API
    # ------------------------------------------------------------------
    def generate(
        self,
        query: str,
        budget_mode: str = "balanced",
        max_cost_per_request: Optional[float] = None,
        quality_threshold: float = 0.60,
        exclude_providers: Optional[list[str]] = None,
        only_providers: Optional[list[str]] = None,
        prefer_local: bool = False,
        conversation_history: Optional[list[dict]] = None,
        temperature: float = 0.7,
        dry_run: bool = False,
        evaluate: bool = False,
        api_keys: Optional[Dict[str, str]] = None,
        alpha: Optional[float] = None,
        beta: Optional[float] = None,
        gamma: Optional[float] = None,
        compression_enabled: Optional[bool] = None,
    ) -> GenerateResult:
        """
        Full pipeline: analyze → estimate → optimize → compress → route → return.

        Parameters
        ----------
        query               : The user's input query.
        budget_mode         : "cheap" | "balanced" | "quality"
        max_cost_per_request: Hard cap in USD. None = unconstrained.
        quality_threshold   : Minimum model capability score [0–1].
        exclude_providers   : List of provider names to skip.
        only_providers      : Restrict to these providers only.
        prefer_local        : Prefer Ollama models (forces only_providers to
                              ["ollama"]).
        conversation_history: List of {"role":..., "content":...} dicts.
                              When given, the semantic cache is bypassed.
        temperature         : Sampling temperature passed to the model.
        dry_run             : If True, skips actual LLM call (returns mock)
                              and skips the cache and outcome recording.
        evaluate            : If True (and not dry_run), the response is
                              scored by the judge and attached to the result.
        api_keys            : Per-request keys; forwarded to the V2 engine
                              and to the router.
        alpha, beta, gamma  : Forwarded to the V1 engine's optimize() only.
        compression_enabled : Stored on UserConstraints for the V1 engine; the
                              V2 path derives compression from the budget mode.

        Returns
        -------
        GenerateResult with the response, metrics and pipeline internals.
        """
        t0 = time.perf_counter()
        use_v2 = self._use_v2 and self._v2_engine is not None
        # The cache is only consulted / filled for stateless, real requests.
        cacheable = not dry_run and not conversation_history

        # 1. Analyze
        features = self.analyzer.analyze(query)
        logger.debug(f"Features: domain={features.primary_domain}, tokens={features.token_count}")

        # 2. Estimate complexity
        complexity = self.estimator.estimate(features)
        logger.debug(f"Complexity: {complexity.score:.3f} ({complexity.tier})")

        # 2.5 Check Semantic Cache
        if cacheable:
            cached_response = self.cache.get(query)
            if cached_response:
                latency_ms = (time.perf_counter() - t0) * 1000
                logger.info("Returning cached response directly.")
                # The V1 engine is used here only to fill in the metadata
                # fields of the result; no model is called on a cache hit.
                constraints = UserConstraints(
                    budget_mode=budget_mode,
                    compression_enabled=compression_enabled,
                )
                optimization = self.engine.optimize(
                    complexity=complexity,
                    output_length_bucket=features.estimated_output_length,
                    constraints=constraints,
                    alpha=alpha,
                    beta=beta,
                    gamma=gamma,
                )
                optimized_prompt = self.optimizer.optimize(
                    query=query,
                    system_prompt_style=optimization.system_prompt_style,
                    compression_enabled=optimization.compression_enabled,
                    conversation_history=conversation_history,
                )
                # Baseline cost for metrics calculation: the whole estimated
                # baseline spend counts as saved because nothing was billed.
                baseline_cost = self._baseline_cost(
                    optimization.estimated_input_tokens,
                    optimization.estimated_output_tokens,
                )
                return GenerateResult(
                    response=cached_response,
                    model_used="redis-semantic-cache",
                    provider="cache",
                    input_tokens=0,
                    output_tokens=0,
                    total_tokens=0,
                    estimated_cost=0.0,
                    tokens_saved=optimized_prompt.tokens_saved,
                    cost_saved=round(baseline_cost, 6),
                    compression_ratio=optimized_prompt.compression_ratio,
                    query_features=features,
                    complexity=complexity,
                    optimization=optimization,
                    optimized_prompt=optimized_prompt,
                    latency_ms=round(latency_ms, 1),
                )

        # 3. Build constraints
        constraints = UserConstraints(
            budget_mode=budget_mode,
            max_cost_per_request=max_cost_per_request,
            quality_threshold=quality_threshold,
            exclude_providers=exclude_providers or [],
            only_providers=only_providers or [],
            prefer_local=prefer_local,
            compression_enabled=compression_enabled,
        )
        if prefer_local:
            constraints.only_providers = ["ollama"]

        # 4. Optimize (select model + config)
        if use_v2:
            # Update BYOK keys for this request
            if api_keys:
                self._v2_engine.update_keys(api_keys)
            # Build constraints dict for V2 engine
            v2_constraints = {
                "exclude_providers": exclude_providers or [],
                "only_providers": only_providers or [],
            }
            if max_cost_per_request is not None:
                v2_constraints["max_cost_per_request"] = max_cost_per_request
            if prefer_local:
                v2_constraints["only_providers"] = ["ollama"]
            decision = self._v2_engine.route(
                query_features=features,
                budget_mode=budget_mode,
                constraints=v2_constraints,
            )
            optimization = self._v2_to_optimization_result(decision, complexity, features)
        else:
            optimization = self.engine.optimize(
                complexity=complexity,
                output_length_bucket=features.estimated_output_length,
                constraints=constraints,
                alpha=alpha,
                beta=beta,
                gamma=gamma,
            )
        logger.debug(f"Selected: {optimization.selected_model}")

        # 5. Optimize prompt
        optimized_prompt = self.optimizer.optimize(
            query=query,
            system_prompt_style=optimization.system_prompt_style,
            compression_enabled=optimization.compression_enabled,
            conversation_history=conversation_history,
        )

        # 6. Build messages
        messages = self._build_messages(optimized_prompt, conversation_history)

        # 7. Route (or dry_run)
        if dry_run:
            routed = self._mock_response(optimization)
        else:
            # Fetch model spec from appropriate registry
            if use_v2:
                # V2: look up from the merged V2 registry (knows all new model IDs)
                v2_spec = self._v2_engine._registry.get_model(optimization.selected_model)
                # NOTE(review): when the spec is missing, half of the estimated
                # request cost is used as each per-1k rate — confirm this
                # fallback is intended.
                in_cost = v2_spec["input_cost_per_1k"] if v2_spec else optimization.estimated_cost / 2
                out_cost = v2_spec["output_cost_per_1k"] if v2_spec else optimization.estimated_cost / 2
            else:
                # V1: look up from the old ModelRegistry
                model_spec = self.registry.get(optimization.selected_model)
                in_cost = model_spec.input_cost_per_1k
                out_cost = model_spec.output_cost_per_1k
            routed = self.router.route(
                model_name=optimization.selected_model,
                provider=optimization.provider,
                messages=messages,
                max_tokens=optimization.max_tokens,
                temperature=temperature,
                input_cost_per_1k=in_cost,
                output_cost_per_1k=out_cost,
                api_keys=api_keys,
            )
        latency_ms = (time.perf_counter() - t0) * 1000

        # Save to cache
        if cacheable:
            self.cache.set(query, routed.content)

        # 8. Compute savings vs baseline (never reported as negative)
        baseline_cost = self._baseline_cost(routed.input_tokens, routed.output_tokens)
        cost_saved = max(0.0, baseline_cost - routed.estimated_cost)

        # 9. Evaluate (if requested) and feed optimizer
        evaluation = None
        if evaluate and not dry_run:
            evaluation = self.judge.evaluate(query, routed.content)

        # Dry runs produce a mock response (latency 0.0, no real call), so
        # they must not be recorded as real outcomes in either engine.
        if not dry_run:
            if use_v2:
                # Feed outcome back into adaptive EMA updater
                self._v2_engine.record_outcome(
                    model_id=routed.model_used,
                    latency_ms=routed.latency_ms,
                    success=True,
                    quality_score=evaluation.overall if evaluation else None,
                    cost_usd=routed.estimated_cost,
                )
            elif evaluation:
                # V1 path: feed Bayesian optimizer
                w_alpha, w_beta, w_gamma = self.engine.bayes.get_weights(constraints.budget_mode)
                self.engine.bayes.record_outcome(
                    budget_mode=constraints.budget_mode,
                    alpha=w_alpha,
                    beta=w_beta,
                    gamma=w_gamma,
                    actual_cost=routed.estimated_cost,
                    quality_score=evaluation.overall,
                )

        return GenerateResult(
            response=routed.content,
            model_used=routed.model_used,
            provider=routed.provider,
            input_tokens=routed.input_tokens,
            output_tokens=routed.output_tokens,
            total_tokens=routed.total_tokens,
            estimated_cost=routed.estimated_cost,
            tokens_saved=optimized_prompt.tokens_saved,
            cost_saved=round(cost_saved, 6),
            compression_ratio=optimized_prompt.compression_ratio,
            query_features=features,
            complexity=complexity,
            optimization=optimization,
            optimized_prompt=optimized_prompt,
            latency_ms=round(latency_ms, 1),
            evaluation=evaluation,
        )

    # ------------------------------------------------------------------
    # Streaming variant
    # ------------------------------------------------------------------
    def stream(
        self,
        query: str,
        budget_mode: str = "balanced",
        api_keys: Optional[Dict[str, str]] = None,
        **kwargs,
    ):
        """Yields text chunks. Pipeline still runs fully before streaming.

        Model selection and prompt optimization run first; the chunks are then
        yielded from the router's stream() call. No conversation history,
        caching or evaluation is applied here. Extra keyword arguments are
        accepted but not used by this method.
        """
        features = self.analyzer.analyze(query)
        complexity = self.estimator.estimate(features)
        if self._use_v2 and self._v2_engine is not None:
            if api_keys:
                self._v2_engine.update_keys(api_keys)
            decision = self._v2_engine.route(
                query_features=features,
                budget_mode=budget_mode,
            )
            optimization = self._v2_to_optimization_result(decision, complexity, features)
        else:
            constraints = UserConstraints(budget_mode=budget_mode)
            optimization = self.engine.optimize(
                complexity=complexity,
                output_length_bucket=features.estimated_output_length,
                constraints=constraints,
            )
        optimized_prompt = self.optimizer.optimize(
            query=query,
            system_prompt_style=optimization.system_prompt_style,
            compression_enabled=optimization.compression_enabled,
        )
        messages = self._build_messages(optimized_prompt, None)
        yield from self.router.stream(
            model_name=optimization.selected_model,
            messages=messages,
            max_tokens=optimization.max_tokens,
            provider=optimization.provider,
            api_keys=api_keys,
        )

    # ------------------------------------------------------------------
    # Explainability (standalone)
    # ------------------------------------------------------------------
    def explain(
        self,
        query: str,
        budget_mode: str = "balanced",
        alpha: Optional[float] = None,
        beta: Optional[float] = None,
        gamma: Optional[float] = None,
        compression_enabled: Optional[bool] = None,
        exclude_providers: Optional[list[str]] = None,
        only_providers: Optional[list[str]] = None,
        api_keys: Optional[Dict[str, str]] = None,
    ) -> dict:
        """
        Returns a structured explanation of what LLMOpt would do for a query,
        without making an actual API call.

        The returned dict has the keys "query", "features", "complexity",
        "optimization" and "optimized_prompt". alpha/beta/gamma and
        compression_enabled are only forwarded on the V1 engine path.
        """
        features = self.analyzer.analyze(query)
        complexity = self.estimator.estimate(features)
        if self._use_v2 and self._v2_engine is not None:
            if api_keys:
                self._v2_engine.update_keys(api_keys)
            v2_constraints = {
                "exclude_providers": exclude_providers or [],
                "only_providers": only_providers or [],
            }
            decision = self._v2_engine.route(
                query_features=features,
                budget_mode=budget_mode,
                constraints=v2_constraints,
            )
            optimization = self._v2_to_optimization_result(decision, complexity, features)
        else:
            constraints = UserConstraints(
                budget_mode=budget_mode,
                compression_enabled=compression_enabled,
                exclude_providers=exclude_providers or [],
                only_providers=only_providers or [],
            )
            optimization = self.engine.optimize(
                complexity=complexity,
                output_length_bucket=features.estimated_output_length,
                constraints=constraints,
                alpha=alpha,
                beta=beta,
                gamma=gamma,
            )
        optimized_prompt = self.optimizer.optimize(
            query=query,
            system_prompt_style=optimization.system_prompt_style,
            compression_enabled=optimization.compression_enabled,
        )
        return {
            "query": query,
            "features": features.to_dict(),
            "complexity": complexity.to_dict(),
            "optimization": optimization.to_dict(),
            "optimized_prompt": optimized_prompt.to_dict(),
        }

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _baseline_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Return what the given token counts would cost at the baseline rates."""
        return (
            self._BASELINE_INPUT_COST * input_tokens / 1000
            + self._BASELINE_OUTPUT_COST * output_tokens / 1000
        )

    def _build_messages(
        self,
        prompt: OptimizedPrompt,
        history: Optional[list[dict]],
    ) -> list[dict]:
        """Assemble the chat message list: system prompt, recent history, user query."""
        messages = [{"role": "system", "content": prompt.system_prompt}]
        if history:
            # Include only the last 6 history entries to keep context manageable
            messages.extend(history[-6:])
        messages.append({"role": "user", "content": prompt.optimized_query})
        return messages

    def _mock_response(self, optimization: OptimizationResult) -> RoutedResponse:
        """Dry-run mock — returns a placeholder without calling any API.

        Token counts and cost are copied from the optimization estimates.
        """
        return RoutedResponse(
            content="[DRY RUN — no API call made]",
            model_used=optimization.selected_model,
            provider=optimization.provider,
            input_tokens=optimization.estimated_input_tokens,
            output_tokens=optimization.estimated_output_tokens,
            total_tokens=optimization.estimated_input_tokens + optimization.estimated_output_tokens,
            latency_ms=0.0,
            estimated_cost=optimization.estimated_cost,
        )

    @staticmethod
    def _v2_to_optimization_result(
        decision: RoutingDecision,
        complexity: ComplexityResult,
        features: QueryFeatures,
    ) -> OptimizationResult:
        """
        Compatibility shim: maps RoutingDecision (V2) → OptimizationResult (V1 shape).

        This allows all downstream pipeline stages (PromptOptimizer, ModelRouter,
        logging, GenerateResult) to remain completely unchanged while the routing
        layer has been replaced by the utility engine.

        Declared static because it takes no ``self``; callers invoke it as
        ``self._v2_to_optimization_result(decision, complexity, features)``.
        ``complexity`` is accepted for signature parity but is not read.

        OptimizationResult fields (from optimization_engine.py):
            selected_model, provider, estimated_cost, estimated_input_tokens,
            estimated_output_tokens, max_tokens, compression_enabled,
            system_prompt_style, rationale, fallback_model, objective_score
        """
        ex = decision.explanation

        # Build a rationale list from the V2 explanation dict
        rationale = [
            f"engine=utility_v2 domain={ex.get('primary_domain', 'general')}",
            f"utility_score={decision.utility_score:.4f} budget_lambda={ex.get('lambda', '?')}",
            f"top_dims={list(ex.get('query_dimensions', {}).keys())[:3]}",
            f"candidates_evaluated={ex.get('candidates_evaluated', '?')}",
            f"registry_source={ex.get('registry_source', 'baseline')}",
        ]
        if decision.fallback_model_id:
            rationale.append(f"fallback={decision.fallback_model_id} ({decision.fallback_provider})")

        # Output length → token estimate lookup (unknown buckets count as 700)
        output_token_map = {"short": 300, "medium": 700, "long": 1500, "very_long": 3000}
        est_output = output_token_map.get(
            str(getattr(features, 'estimated_output_length', 'medium')).lower(), 700
        )
        # The input estimate is floored at 100 tokens.
        est_input = max(getattr(features, 'token_count', 100), 100)

        # Budget mode drives compression and prompt style
        budget_mode = ex.get("budget_mode", "balanced")
        compression = (budget_mode == "cheap")
        system_prompt_style = "minimal" if budget_mode == "cheap" else "standard"
        # Leave 200 tokens of headroom over the estimate, capped at 4096.
        max_tokens = min(est_output + 200, 4096)

        return OptimizationResult(
            selected_model=decision.model_id,
            provider=decision.provider,
            estimated_cost=decision.estimated_cost,
            estimated_input_tokens=est_input,
            estimated_output_tokens=est_output,
            max_tokens=max_tokens,
            compression_enabled=compression,
            system_prompt_style=system_prompt_style,
            rationale=rationale,
            fallback_model=decision.fallback_model_id,
            objective_score=1.0 - decision.utility_score,  # invert: lower is better (V1 convention)
        )