""" LLMOpt — main client interface. This is the single entry point developers use: from llmopt import LLMOpt, UserConstraints client = LLMOpt() result = client.generate("Explain quicksort with Python code", budget_mode="balanced") print(result.response) print(result.explain()) """ from __future__ import annotations import logging import time import os from dataclasses import dataclass from pathlib import Path from typing import Optional, Dict, List from llmopt.analyzer.query_analyzer import QueryAnalyzer, QueryFeatures from llmopt.estimator.complexity_estimator import ComplexityEstimator, ComplexityResult from llmopt.engine.optimization_engine import OptimizationEngine, OptimizationResult, UserConstraints from llmopt.engine.llmopt_engine import LLMOptEngine from llmopt.engine.utility_engine import RoutingDecision from llmopt.optimizer.prompt_optimizer import PromptOptimizer, OptimizedPrompt from llmopt.router.model_router import ModelRouter, RoutedResponse from llmopt.registry.model_registry import ModelRegistry from llmopt.cache.semantic_cache import SemanticCache from llmopt.evaluation.evaluator import LLMJudge, EvaluationResult logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Full pipeline result # --------------------------------------------------------------------------- @dataclass class GenerateResult: # Core output response: str model_used: str provider: str # Cost / token metrics input_tokens: int output_tokens: int total_tokens: int estimated_cost: float # Savings metrics (vs naive GPT-4o baseline) tokens_saved: int cost_saved: float compression_ratio: float # Pipeline internals (for debugging / explainability) query_features: QueryFeatures complexity: ComplexityResult optimization: OptimizationResult optimized_prompt: OptimizedPrompt latency_ms: float evaluation: Optional[EvaluationResult] = None def explain(self) -> str: """Human-readable explanation of routing decisions.""" lines = [ "=" * 55, "LLMOpt Decision Explanation", "=" * 55, f"Query complexity : {self.complexity.score:.3f} ({self.complexity.tier})", f"Primary domain : {self.query_features.primary_domain}", f"Required reasoning: {self.complexity.required_reasoning:.2f}", f"Required coding : {self.complexity.required_coding:.2f}", f"Required math : {self.complexity.required_math:.2f}", "", f"Selected model : {self.model_used} ({self.provider})", f"Fallback model : {self.optimization.fallback_model or 'N/A'}", f"Compression : {'yes' if self.optimization.compression_enabled else 'no'}", f"System prompt : {self.optimization.system_prompt_style}", "", "Scoring rationale:", ] for r in self.optimization.rationale: lines.append(f" • {r}") lines += [ "", f"Tokens : {self.input_tokens} in / {self.output_tokens} out", f"Tokens saved : {self.tokens_saved} ({self.compression_ratio*100:.1f}% compression)", f"Cost : ${self.estimated_cost:.6f}", f"Cost saved : ${self.cost_saved:.6f} vs GPT-4o baseline", f"Latency : {self.latency_ms:.0f}ms", "=" * 55, ] return "\n".join(lines) def to_dict(self) -> dict: return { "response": self.response, "model_used": self.model_used, "provider": self.provider, "input_tokens": self.input_tokens, "output_tokens": self.output_tokens, "total_tokens": self.total_tokens, "estimated_cost": self.estimated_cost, "tokens_saved": self.tokens_saved, "cost_saved": self.cost_saved, "compression_ratio": self.compression_ratio, "complexity_score": self.complexity.score, "complexity_tier": self.complexity.tier, "latency_ms": self.latency_ms, } # --------------------------------------------------------------------------- # Main client # --------------------------------------------------------------------------- class LLMOpt: """ Adaptive LLM Inference Optimization client. Usage: client = LLMOpt() result = client.generate("What is Python?", budget_mode="cheap") """ # GPT-4o costs used as baseline for savings calculation _BASELINE_MODEL = "gpt-4o" _BASELINE_INPUT_COST = 0.0025 _BASELINE_OUTPUT_COST = 0.010 def __init__( self, registry_path: Optional[Path] = None, ollama_base_url: Optional[str] = None, log_level: str = "WARNING", use_v2_engine: bool = True, ): logging.basicConfig(level=getattr(logging, log_level.upper(), logging.WARNING)) self.registry = ModelRegistry(registry_path) self.analyzer = QueryAnalyzer() self.estimator = ComplexityEstimator() self.engine = OptimizationEngine(self.registry) # V1 — kept for fallback self.optimizer = PromptOptimizer() self.router = ModelRouter(ollama_base_url=ollama_base_url) # Initialize Semantic Cache (reads REDIS_URL from env if available) try: from dotenv import load_dotenv # type: ignore load_dotenv() load_dotenv("config/.env") except ImportError: pass redis_url = os.environ.get("REDIS_URL") self.cache = SemanticCache(redis_url=redis_url) self.judge = LLMJudge(judge_model="gpt-4o-mini") # V2 Utility Engine — default active self._use_v2 = use_v2_engine self._v2_engine: Optional[LLMOptEngine] = None if use_v2_engine: self._v2_engine = LLMOptEngine( available_keys={}, # populated per-request via update_keys() include_ollama=True, log_level=logging.WARNING, ) logger.info("[LLMOpt] V2 utility engine active.") # ------------------------------------------------------------------ # Primary API # ------------------------------------------------------------------ def generate( self, query: str, budget_mode: str = "balanced", max_cost_per_request: Optional[float] = None, quality_threshold: float = 0.60, exclude_providers: Optional[list[str]] = None, only_providers: Optional[list[str]] = None, prefer_local: bool = False, conversation_history: Optional[list[dict]] = None, temperature: float = 0.7, dry_run: bool = False, evaluate: bool = False, api_keys: Optional[Dict[str, str]] = None, alpha: Optional[float] = None, beta: Optional[float] = None, gamma: Optional[float] = None, compression_enabled: Optional[bool] = None, ) -> GenerateResult: """ Full pipeline: analyze → estimate → optimize → compress → route → return. Parameters ---------- query : The user's input query. budget_mode : "cheap" | "balanced" | "quality" max_cost_per_request: Hard cap in USD. None = unconstrained. quality_threshold : Minimum model capability score [0–1]. exclude_providers : List of provider names to skip. only_providers : Restrict to these providers only. prefer_local : Prefer Ollama models. conversation_history: List of {"role":..., "content":...} dicts. temperature : Sampling temperature passed to the model. dry_run : If True, skips actual LLM call (returns mock). """ t0 = time.perf_counter() # 1. Analyze features = self.analyzer.analyze(query) logger.debug(f"Features: domain={features.primary_domain}, tokens={features.token_count}") # 2. Estimate complexity complexity = self.estimator.estimate(features) logger.debug(f"Complexity: {complexity.score:.3f} ({complexity.tier})") # 2.5 Check Semantic Cache if not dry_run and not conversation_history: cached_response = self.cache.get(query) if cached_response: latency_ms = (time.perf_counter() - t0) * 1000 logger.info("Returning cached response directly.") constraints = UserConstraints( budget_mode=budget_mode, compression_enabled=compression_enabled, ) optimization = self.engine.optimize( complexity=complexity, output_length_bucket=features.estimated_output_length, constraints=constraints, alpha=alpha, beta=beta, gamma=gamma, ) optimized_prompt = self.optimizer.optimize( query=query, system_prompt_style=optimization.system_prompt_style, compression_enabled=optimization.compression_enabled, conversation_history=conversation_history, ) # Baseline cost for metrics calculation baseline_cost = ( self._BASELINE_INPUT_COST * optimization.estimated_input_tokens / 1000 + self._BASELINE_OUTPUT_COST * optimization.estimated_output_tokens / 1000 ) return GenerateResult( response=cached_response, model_used="redis-semantic-cache", provider="cache", input_tokens=0, output_tokens=0, total_tokens=0, estimated_cost=0.0, tokens_saved=optimized_prompt.tokens_saved, cost_saved=round(baseline_cost, 6), compression_ratio=optimized_prompt.compression_ratio, query_features=features, complexity=complexity, optimization=optimization, optimized_prompt=optimized_prompt, latency_ms=round(latency_ms, 1), ) # 3. Build constraints constraints = UserConstraints( budget_mode=budget_mode, max_cost_per_request=max_cost_per_request, quality_threshold=quality_threshold, exclude_providers=exclude_providers or [], only_providers=only_providers or [], prefer_local=prefer_local, compression_enabled=compression_enabled, ) if prefer_local: constraints.only_providers = ["ollama"] # 4. Optimize (select model + config) if self._use_v2 and self._v2_engine is not None: # Update BYOK keys for this request if api_keys: self._v2_engine.update_keys(api_keys) # Build constraints dict for V2 engine v2_constraints = { "exclude_providers": exclude_providers or [], "only_providers": only_providers or [], } if max_cost_per_request is not None: v2_constraints["max_cost_per_request"] = max_cost_per_request if prefer_local: v2_constraints["only_providers"] = ["ollama"] decision = self._v2_engine.route( query_features=features, budget_mode=budget_mode, constraints=v2_constraints, ) optimization = self._v2_to_optimization_result(decision, complexity, features) else: optimization = self.engine.optimize( complexity=complexity, output_length_bucket=features.estimated_output_length, constraints=constraints, alpha=alpha, beta=beta, gamma=gamma, ) logger.debug(f"Selected: {optimization.selected_model}") # 5. Optimize prompt optimized_prompt = self.optimizer.optimize( query=query, system_prompt_style=optimization.system_prompt_style, compression_enabled=optimization.compression_enabled, conversation_history=conversation_history, ) # 6. Build messages messages = self._build_messages(optimized_prompt, conversation_history) # 7. Route (or dry_run) if dry_run: routed = self._mock_response(optimization) else: # Fetch model spec from appropriate registry if self._use_v2 and self._v2_engine is not None: # V2: look up from the merged V2 registry (knows all new model IDs) v2_spec = self._v2_engine._registry.get_model(optimization.selected_model) in_cost = v2_spec["input_cost_per_1k"] if v2_spec else optimization.estimated_cost / 2 out_cost = v2_spec["output_cost_per_1k"] if v2_spec else optimization.estimated_cost / 2 else: # V1: look up from the old ModelRegistry model_spec = self.registry.get(optimization.selected_model) in_cost = model_spec.input_cost_per_1k out_cost = model_spec.output_cost_per_1k routed = self.router.route( model_name=optimization.selected_model, provider=optimization.provider, messages=messages, max_tokens=optimization.max_tokens, temperature=temperature, input_cost_per_1k=in_cost, output_cost_per_1k=out_cost, api_keys=api_keys, ) latency_ms = (time.perf_counter() - t0) * 1000 # Save to cache if not dry_run and not conversation_history: self.cache.set(query, routed.content) # 8. Compute savings vs baseline baseline_cost = ( self._BASELINE_INPUT_COST * routed.input_tokens / 1000 + self._BASELINE_OUTPUT_COST * routed.output_tokens / 1000 ) cost_saved = max(0.0, baseline_cost - routed.estimated_cost) # 9. Evaluate (if requested) and feed optimizer evaluation = None if evaluate and not dry_run: evaluation = self.judge.evaluate(query, routed.content) if self._use_v2 and self._v2_engine is not None: # Feed outcome back into adaptive EMA updater self._v2_engine.record_outcome( model_id=routed.model_used, latency_ms=routed.latency_ms, success=True, quality_score=evaluation.overall if evaluation else None, cost_usd=routed.estimated_cost, ) elif evaluation: # V1 path: feed Bayesian optimizer α, β, γ = self.engine.bayes.get_weights(constraints.budget_mode) self.engine.bayes.record_outcome( budget_mode=constraints.budget_mode, alpha=α, beta=β, gamma=γ, actual_cost=routed.estimated_cost, quality_score=evaluation.overall, ) return GenerateResult( response=routed.content, model_used=routed.model_used, provider=routed.provider, input_tokens=routed.input_tokens, output_tokens=routed.output_tokens, total_tokens=routed.total_tokens, estimated_cost=routed.estimated_cost, tokens_saved=optimized_prompt.tokens_saved, cost_saved=round(cost_saved, 6), compression_ratio=optimized_prompt.compression_ratio, query_features=features, complexity=complexity, optimization=optimization, optimized_prompt=optimized_prompt, latency_ms=round(latency_ms, 1), evaluation=evaluation, ) # ------------------------------------------------------------------ # Streaming variant # ------------------------------------------------------------------ def stream( self, query: str, budget_mode: str = "balanced", api_keys: Optional[Dict[str, str]] = None, **kwargs, ): """Yields text chunks. Pipeline still runs fully before streaming.""" features = self.analyzer.analyze(query) complexity = self.estimator.estimate(features) if self._use_v2 and self._v2_engine is not None: if api_keys: self._v2_engine.update_keys(api_keys) decision = self._v2_engine.route( query_features=features, budget_mode=budget_mode, ) optimization = self._v2_to_optimization_result(decision, complexity, features) else: constraints = UserConstraints(budget_mode=budget_mode) optimization = self.engine.optimize( complexity=complexity, output_length_bucket=features.estimated_output_length, constraints=constraints, ) optimized_prompt = self.optimizer.optimize( query=query, system_prompt_style=optimization.system_prompt_style, compression_enabled=optimization.compression_enabled, ) messages = self._build_messages(optimized_prompt, None) yield from self.router.stream( model_name=optimization.selected_model, messages=messages, max_tokens=optimization.max_tokens, provider=optimization.provider, api_keys=api_keys, ) # ------------------------------------------------------------------ # Explainability (standalone) # ------------------------------------------------------------------ def explain( self, query: str, budget_mode: str = "balanced", alpha: Optional[float] = None, beta: Optional[float] = None, gamma: Optional[float] = None, compression_enabled: Optional[bool] = None, exclude_providers: Optional[list[str]] = None, only_providers: Optional[list[str]] = None, api_keys: Optional[Dict[str, str]] = None, ) -> dict: """ Returns a structured explanation of what LLMOpt would do for a query, without making an actual API call. """ features = self.analyzer.analyze(query) complexity = self.estimator.estimate(features) if self._use_v2 and self._v2_engine is not None: if api_keys: self._v2_engine.update_keys(api_keys) v2_constraints = { "exclude_providers": exclude_providers or [], "only_providers": only_providers or [], } decision = self._v2_engine.route( query_features=features, budget_mode=budget_mode, constraints=v2_constraints, ) optimization = self._v2_to_optimization_result(decision, complexity, features) else: constraints = UserConstraints( budget_mode=budget_mode, compression_enabled=compression_enabled, exclude_providers=exclude_providers or [], only_providers=only_providers or [], ) optimization = self.engine.optimize( complexity=complexity, output_length_bucket=features.estimated_output_length, constraints=constraints, alpha=alpha, beta=beta, gamma=gamma, ) optimized_prompt = self.optimizer.optimize( query=query, system_prompt_style=optimization.system_prompt_style, compression_enabled=optimization.compression_enabled, ) return { "query": query, "features": features.to_dict(), "complexity": complexity.to_dict(), "optimization": optimization.to_dict(), "optimized_prompt": optimized_prompt.to_dict(), } # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _build_messages( self, prompt: OptimizedPrompt, history: Optional[list[dict]], ) -> list[dict]: messages = [{"role": "system", "content": prompt.system_prompt}] if history: # Include only last 6 turns to keep context manageable messages.extend(history[-6:]) messages.append({"role": "user", "content": prompt.optimized_query}) return messages def _mock_response(self, optimization: OptimizationResult) -> RoutedResponse: """Dry-run mock — returns a placeholder without calling any API.""" from llmopt.router.model_router import RoutedResponse return RoutedResponse( content="[DRY RUN — no API call made]", model_used=optimization.selected_model, provider=optimization.provider, input_tokens=optimization.estimated_input_tokens, output_tokens=optimization.estimated_output_tokens, total_tokens=optimization.estimated_input_tokens + optimization.estimated_output_tokens, latency_ms=0.0, estimated_cost=optimization.estimated_cost, ) @staticmethod def _v2_to_optimization_result( decision: RoutingDecision, complexity: ComplexityResult, features: QueryFeatures, ) -> OptimizationResult: """ Compatibility shim: maps RoutingDecision (V2) → OptimizationResult (V1 shape). This allows all downstream pipeline stages (PromptOptimizer, ModelRouter, logging, GenerateResult) to remain completely unchanged while the routing layer has been replaced by the utility engine. OptimizationResult fields (from optimization_engine.py): selected_model, provider, estimated_cost, estimated_input_tokens, estimated_output_tokens, max_tokens, compression_enabled, system_prompt_style, rationale, fallback_model, objective_score """ ex = decision.explanation # Build a rationale list from the V2 explanation dict rationale = [ f"engine=utility_v2 domain={ex.get('primary_domain', 'general')}", f"utility_score={decision.utility_score:.4f} budget_lambda={ex.get('lambda', '?')}", f"top_dims={list(ex.get('query_dimensions', {}).keys())[:3]}", f"candidates_evaluated={ex.get('candidates_evaluated', '?')}", f"registry_source={ex.get('registry_source', 'baseline')}", ] if decision.fallback_model_id: rationale.append(f"fallback={decision.fallback_model_id} ({decision.fallback_provider})") # Output length → token estimate lookup output_token_map = {"short": 300, "medium": 700, "long": 1500, "very_long": 3000} est_output = output_token_map.get( str(getattr(features, 'estimated_output_length', 'medium')).lower(), 700 ) est_input = max(getattr(features, 'token_count', 100), 100) # Budget mode drives compression and prompt style budget_mode = ex.get("budget_mode", "balanced") compression = (budget_mode == "cheap") system_prompt_style = "minimal" if budget_mode == "cheap" else "standard" max_tokens = min(est_output + 200, 4096) return OptimizationResult( selected_model=decision.model_id, provider=decision.provider, estimated_cost=decision.estimated_cost, estimated_input_tokens=est_input, estimated_output_tokens=est_output, max_tokens=max_tokens, compression_enabled=compression, system_prompt_style=system_prompt_style, rationale=rationale, fallback_model=decision.fallback_model_id, objective_score=1.0 - decision.utility_score, # invert: lower is better (V1 convention) )