llmopt-server / llmopt /core.py
Shrot101's picture
feat: initialize core LLMOpt framework including model routing, optimization engines, and frontend dashboard infrastructure.
bd238e9
"""
LLMOpt — main client interface.
This is the single entry point developers use:
from llmopt import LLMOpt, UserConstraints
client = LLMOpt()
result = client.generate("Explain quicksort with Python code", budget_mode="balanced")
print(result.response)
print(result.explain())
"""
from __future__ import annotations
import logging
import time
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Dict, List
from llmopt.analyzer.query_analyzer import QueryAnalyzer, QueryFeatures
from llmopt.estimator.complexity_estimator import ComplexityEstimator, ComplexityResult
from llmopt.engine.optimization_engine import OptimizationEngine, OptimizationResult, UserConstraints
from llmopt.engine.llmopt_engine import LLMOptEngine
from llmopt.engine.utility_engine import RoutingDecision
from llmopt.optimizer.prompt_optimizer import PromptOptimizer, OptimizedPrompt
from llmopt.router.model_router import ModelRouter, RoutedResponse
from llmopt.registry.model_registry import ModelRegistry
from llmopt.cache.semantic_cache import SemanticCache
from llmopt.evaluation.evaluator import LLMJudge, EvaluationResult
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Full pipeline result
# ---------------------------------------------------------------------------
@dataclass
class GenerateResult:
    """
    Outcome of a single ``LLMOpt.generate`` call.

    Bundles the model's answer with token/cost accounting, the savings measured
    against the GPT-4o baseline, and the intermediate pipeline objects so the
    routing decision can be inspected afterwards via ``explain()`` / ``to_dict()``.
    """

    # --- Model output ---------------------------------------------------------
    response: str
    model_used: str
    provider: str

    # --- Token and cost accounting --------------------------------------------
    input_tokens: int
    output_tokens: int
    total_tokens: int
    estimated_cost: float

    # --- Savings relative to the naive GPT-4o baseline ------------------------
    tokens_saved: int
    cost_saved: float
    compression_ratio: float

    # --- Intermediate pipeline state (debugging / explainability) -------------
    query_features: QueryFeatures
    complexity: ComplexityResult
    optimization: OptimizationResult
    optimized_prompt: OptimizedPrompt
    latency_ms: float
    # Judge scores; left as None unless evaluation was requested.
    evaluation: Optional[EvaluationResult] = None

    def explain(self) -> str:
        """
        Render a human-readable report of the routing decision.

        Returns
        -------
        A newline-joined, multi-line string framed by 55-character "=" rules,
        covering query analysis, the selected/fallback model, the scoring
        rationale, and token / cost / latency figures.
        """
        rule = "=" * 55
        cx = self.complexity
        opt = self.optimization
        fallback = opt.fallback_model or 'N/A'
        compression = 'yes' if opt.compression_enabled else 'no'

        report = [rule, "LLMOpt Decision Explanation", rule]

        # Query analysis section.
        report += [
            f"Query complexity : {cx.score:.3f} ({cx.tier})",
            f"Primary domain : {self.query_features.primary_domain}",
            f"Required reasoning: {cx.required_reasoning:.2f}",
            f"Required coding : {cx.required_coding:.2f}",
            f"Required math : {cx.required_math:.2f}",
            "",
        ]

        # Routing decision section, followed by one bullet per rationale entry.
        report += [
            f"Selected model : {self.model_used} ({self.provider})",
            f"Fallback model : {fallback}",
            f"Compression : {compression}",
            f"System prompt : {opt.system_prompt_style}",
            "",
            "Scoring rationale:",
        ]
        report.extend(f" • {reason}" for reason in opt.rationale)

        # Usage / savings section.
        pct = self.compression_ratio * 100
        report += [
            "",
            f"Tokens : {self.input_tokens} in / {self.output_tokens} out",
            f"Tokens saved : {self.tokens_saved} ({pct:.1f}% compression)",
            f"Cost : ${self.estimated_cost:.6f}",
            f"Cost saved : ${self.cost_saved:.6f} vs GPT-4o baseline",
            f"Latency : {self.latency_ms:.0f}ms",
            rule,
        ]
        return "\n".join(report)

    def to_dict(self) -> dict:
        """Return a flat dict of the headline metrics (no pipeline internals)."""
        scalar_fields = (
            "response",
            "model_used",
            "provider",
            "input_tokens",
            "output_tokens",
            "total_tokens",
            "estimated_cost",
            "tokens_saved",
            "cost_saved",
            "compression_ratio",
        )
        summary = {name: getattr(self, name) for name in scalar_fields}
        # Only the headline numbers of the complexity estimate are exported.
        summary["complexity_score"] = self.complexity.score
        summary["complexity_tier"] = self.complexity.tier
        summary["latency_ms"] = self.latency_ms
        return summary
# ---------------------------------------------------------------------------
# Main client
# ---------------------------------------------------------------------------
class LLMOpt:
    """
    Adaptive LLM Inference Optimization client.

    Usage:
        client = LLMOpt()
        result = client.generate("What is Python?", budget_mode="cheap")

    Pipeline (see ``generate``): analyze the query -> estimate complexity ->
    select a model (V2 utility engine by default, V1 OptimizationEngine
    otherwise) -> optimize the prompt -> route to the provider.
    """

    # GPT-4o per-1k-token prices (USD) used as the baseline for savings metrics.
    _BASELINE_MODEL = "gpt-4o"
    _BASELINE_INPUT_COST = 0.0025
    _BASELINE_OUTPUT_COST = 0.010

    def __init__(
        self,
        registry_path: Optional[Path] = None,
        ollama_base_url: Optional[str] = None,
        log_level: str = "WARNING",
        use_v2_engine: bool = True,
    ):
        """
        Build all pipeline components.

        Parameters
        ----------
        registry_path   : Optional path for the V1 ModelRegistry.
        ollama_base_url : Optional base URL forwarded to the ModelRouter.
        log_level       : Name of a ``logging`` level; unknown names fall back to WARNING.
        use_v2_engine   : If True (default), route with the V2 utility engine;
                          otherwise use the V1 OptimizationEngine.
        """
        # NOTE(review): basicConfig configures the *root* logger, which is
        # unusual for a library client; kept for backward compatibility.
        logging.basicConfig(level=getattr(logging, log_level.upper(), logging.WARNING))
        self.registry = ModelRegistry(registry_path)
        self.analyzer = QueryAnalyzer()
        self.estimator = ComplexityEstimator()
        self.engine = OptimizationEngine(self.registry)  # V1 — kept for fallback
        self.optimizer = PromptOptimizer()
        self.router = ModelRouter(ollama_base_url=ollama_base_url)

        # Load .env files when python-dotenv is installed (optional dependency),
        # so REDIS_URL can be picked up below.
        try:
            from dotenv import load_dotenv  # type: ignore
            load_dotenv()
            load_dotenv("config/.env")
        except ImportError:
            pass
        redis_url = os.environ.get("REDIS_URL")
        self.cache = SemanticCache(redis_url=redis_url)
        self.judge = LLMJudge(judge_model="gpt-4o-mini")

        # V2 Utility Engine — default active
        self._use_v2 = use_v2_engine
        self._v2_engine: Optional[LLMOptEngine] = None
        if use_v2_engine:
            self._v2_engine = LLMOptEngine(
                available_keys={},  # populated per-request via update_keys()
                include_ollama=True,
                log_level=logging.WARNING,
            )
            logger.info("[LLMOpt] V2 utility engine active.")

    # ------------------------------------------------------------------
    # Primary API
    # ------------------------------------------------------------------
    def generate(
        self,
        query: str,
        budget_mode: str = "balanced",
        max_cost_per_request: Optional[float] = None,
        quality_threshold: float = 0.60,
        exclude_providers: Optional[list[str]] = None,
        only_providers: Optional[list[str]] = None,
        prefer_local: bool = False,
        conversation_history: Optional[list[dict]] = None,
        temperature: float = 0.7,
        dry_run: bool = False,
        evaluate: bool = False,
        api_keys: Optional[Dict[str, str]] = None,
        alpha: Optional[float] = None,
        beta: Optional[float] = None,
        gamma: Optional[float] = None,
        compression_enabled: Optional[bool] = None,
    ) -> GenerateResult:
        """
        Full pipeline: analyze → estimate → optimize → compress → route → return.

        Parameters
        ----------
        query               : The user's input query.
        budget_mode         : "cheap" | "balanced" | "quality"
        max_cost_per_request: Hard cap in USD. None = unconstrained.
        quality_threshold   : Minimum model capability score [0–1]
                              (only forwarded to the V1 engine).
        exclude_providers   : List of provider names to skip.
        only_providers      : Restrict to these providers only.
        prefer_local        : Prefer Ollama models (restricts routing to "ollama").
        conversation_history: List of {"role":..., "content":...} dicts.
        temperature         : Sampling temperature passed to the model.
        dry_run             : If True, skips the actual LLM call (returns a mock)
                              and skips the cache and outcome feedback.
        evaluate            : If True (and not dry_run), score the response
                              with the LLM judge.
        api_keys            : Provider API keys (BYOK) forwarded to routing.
        alpha, beta, gamma  : Optional V1 objective-weight overrides.
        compression_enabled : Force prompt compression on/off; None lets the
                              active engine decide.

        Returns
        -------
        GenerateResult with the response, token/cost metrics and pipeline internals.
        """
        t0 = time.perf_counter()

        # 1. Analyze
        features = self.analyzer.analyze(query)
        logger.debug(
            "Features: domain=%s, tokens=%s", features.primary_domain, features.token_count
        )

        # 2. Estimate complexity
        complexity = self.estimator.estimate(features)
        logger.debug("Complexity: %.3f (%s)", complexity.score, complexity.tier)

        # The semantic cache is keyed on the query text alone, so it is bypassed
        # for dry runs and for multi-turn conversations.
        use_cache = not dry_run and not conversation_history

        # 2.5 Check Semantic Cache
        if use_cache:
            cached_response = self.cache.get(query)
            if cached_response:
                latency_ms = (time.perf_counter() - t0) * 1000
                logger.info("Returning cached response directly.")
                return self._cached_result(
                    query=query,
                    cached_response=cached_response,
                    features=features,
                    complexity=complexity,
                    budget_mode=budget_mode,
                    compression_enabled=compression_enabled,
                    conversation_history=conversation_history,
                    alpha=alpha,
                    beta=beta,
                    gamma=gamma,
                    latency_ms=latency_ms,
                )

        # 3. Build constraints
        constraints = UserConstraints(
            budget_mode=budget_mode,
            max_cost_per_request=max_cost_per_request,
            quality_threshold=quality_threshold,
            exclude_providers=exclude_providers or [],
            only_providers=only_providers or [],
            prefer_local=prefer_local,
            compression_enabled=compression_enabled,
        )
        if prefer_local:
            constraints.only_providers = ["ollama"]

        # 4. Optimize (select model + config)
        if self._use_v2 and self._v2_engine is not None:
            # NOTE(review): update_keys mutates the shared engine, so keys passed
            # for one request may remain set for later requests — confirm intent.
            if api_keys:
                self._v2_engine.update_keys(api_keys)
            v2_constraints = {
                "exclude_providers": exclude_providers or [],
                "only_providers": only_providers or [],
            }
            if max_cost_per_request is not None:
                v2_constraints["max_cost_per_request"] = max_cost_per_request
            if prefer_local:
                v2_constraints["only_providers"] = ["ollama"]
            decision = self._v2_engine.route(
                query_features=features,
                budget_mode=budget_mode,
                constraints=v2_constraints,
            )
            optimization = self._v2_to_optimization_result(
                decision,
                complexity,
                features,
                budget_mode=budget_mode,
                compression_enabled=compression_enabled,
            )
        else:
            optimization = self.engine.optimize(
                complexity=complexity,
                output_length_bucket=features.estimated_output_length,
                constraints=constraints,
                alpha=alpha,
                beta=beta,
                gamma=gamma,
            )
        logger.debug("Selected: %s", optimization.selected_model)

        # 5. Optimize prompt
        optimized_prompt = self.optimizer.optimize(
            query=query,
            system_prompt_style=optimization.system_prompt_style,
            compression_enabled=optimization.compression_enabled,
            conversation_history=conversation_history,
        )

        # 6. Build messages
        messages = self._build_messages(optimized_prompt, conversation_history)

        # 7. Route (or dry_run)
        if dry_run:
            routed = self._mock_response(optimization)
        else:
            in_cost, out_cost = self._cost_rates_per_1k(optimization)
            routed = self.router.route(
                model_name=optimization.selected_model,
                provider=optimization.provider,
                messages=messages,
                max_tokens=optimization.max_tokens,
                temperature=temperature,
                input_cost_per_1k=in_cost,
                output_cost_per_1k=out_cost,
                api_keys=api_keys,
            )

        latency_ms = (time.perf_counter() - t0) * 1000

        # Save to cache
        if use_cache:
            self.cache.set(query, routed.content)

        # 8. Compute savings vs baseline
        baseline_cost = self._baseline_cost(routed.input_tokens, routed.output_tokens)
        cost_saved = max(0.0, baseline_cost - routed.estimated_cost)

        # 9. Evaluate (if requested) and feed the optimizer. Dry-run mocks are
        # never fed back: their latency/cost figures are placeholders and would
        # skew the adaptive statistics.
        evaluation = None
        if evaluate and not dry_run:
            evaluation = self.judge.evaluate(query, routed.content)
        if not dry_run:
            self._record_feedback(routed, evaluation, constraints.budget_mode)

        return GenerateResult(
            response=routed.content,
            model_used=routed.model_used,
            provider=routed.provider,
            input_tokens=routed.input_tokens,
            output_tokens=routed.output_tokens,
            total_tokens=routed.total_tokens,
            estimated_cost=routed.estimated_cost,
            tokens_saved=optimized_prompt.tokens_saved,
            cost_saved=round(cost_saved, 6),
            compression_ratio=optimized_prompt.compression_ratio,
            query_features=features,
            complexity=complexity,
            optimization=optimization,
            optimized_prompt=optimized_prompt,
            latency_ms=round(latency_ms, 1),
            evaluation=evaluation,
        )

    # ------------------------------------------------------------------
    # Streaming variant
    # ------------------------------------------------------------------
    def stream(
        self,
        query: str,
        budget_mode: str = "balanced",
        api_keys: Optional[Dict[str, str]] = None,
        **kwargs,
    ):
        """
        Yield text chunks from the selected model.

        The analysis/routing pipeline still runs fully before streaming starts.
        Extra ``**kwargs`` are accepted for call-site compatibility but are
        currently ignored (no cache, history, or per-request constraints).
        """
        features = self.analyzer.analyze(query)
        complexity = self.estimator.estimate(features)
        if self._use_v2 and self._v2_engine is not None:
            if api_keys:
                self._v2_engine.update_keys(api_keys)
            decision = self._v2_engine.route(
                query_features=features,
                budget_mode=budget_mode,
            )
            optimization = self._v2_to_optimization_result(
                decision, complexity, features, budget_mode=budget_mode
            )
        else:
            constraints = UserConstraints(budget_mode=budget_mode)
            optimization = self.engine.optimize(
                complexity=complexity,
                output_length_bucket=features.estimated_output_length,
                constraints=constraints,
            )
        optimized_prompt = self.optimizer.optimize(
            query=query,
            system_prompt_style=optimization.system_prompt_style,
            compression_enabled=optimization.compression_enabled,
        )
        messages = self._build_messages(optimized_prompt, None)
        yield from self.router.stream(
            model_name=optimization.selected_model,
            messages=messages,
            max_tokens=optimization.max_tokens,
            provider=optimization.provider,
            api_keys=api_keys,
        )

    # ------------------------------------------------------------------
    # Explainability (standalone)
    # ------------------------------------------------------------------
    def explain(
        self,
        query: str,
        budget_mode: str = "balanced",
        alpha: Optional[float] = None,
        beta: Optional[float] = None,
        gamma: Optional[float] = None,
        compression_enabled: Optional[bool] = None,
        exclude_providers: Optional[list[str]] = None,
        only_providers: Optional[list[str]] = None,
        api_keys: Optional[Dict[str, str]] = None,
    ) -> dict:
        """
        Return a structured explanation of what LLMOpt would do for a query,
        without making an actual API call.

        The returned dict has the keys "query", "features", "complexity",
        "optimization" and "optimized_prompt" (the latter four via ``to_dict()``).
        """
        features = self.analyzer.analyze(query)
        complexity = self.estimator.estimate(features)
        if self._use_v2 and self._v2_engine is not None:
            if api_keys:
                self._v2_engine.update_keys(api_keys)
            v2_constraints = {
                "exclude_providers": exclude_providers or [],
                "only_providers": only_providers or [],
            }
            decision = self._v2_engine.route(
                query_features=features,
                budget_mode=budget_mode,
                constraints=v2_constraints,
            )
            optimization = self._v2_to_optimization_result(
                decision,
                complexity,
                features,
                budget_mode=budget_mode,
                compression_enabled=compression_enabled,
            )
        else:
            constraints = UserConstraints(
                budget_mode=budget_mode,
                compression_enabled=compression_enabled,
                exclude_providers=exclude_providers or [],
                only_providers=only_providers or [],
            )
            optimization = self.engine.optimize(
                complexity=complexity,
                output_length_bucket=features.estimated_output_length,
                constraints=constraints,
                alpha=alpha,
                beta=beta,
                gamma=gamma,
            )
        optimized_prompt = self.optimizer.optimize(
            query=query,
            system_prompt_style=optimization.system_prompt_style,
            compression_enabled=optimization.compression_enabled,
        )
        return {
            "query": query,
            "features": features.to_dict(),
            "complexity": complexity.to_dict(),
            "optimization": optimization.to_dict(),
            "optimized_prompt": optimized_prompt.to_dict(),
        }

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _cached_result(
        self,
        *,
        query: str,
        cached_response: str,
        features: QueryFeatures,
        complexity: ComplexityResult,
        budget_mode: str,
        compression_enabled: Optional[bool],
        conversation_history: Optional[list[dict]],
        alpha: Optional[float],
        beta: Optional[float],
        gamma: Optional[float],
        latency_ms: float,
    ) -> GenerateResult:
        """Build the GenerateResult for a semantic-cache hit (no provider call, zero cost)."""
        # A routing/prompt decision is still computed (via the V1 engine) so the
        # result carries explainability data and a baseline for savings.
        constraints = UserConstraints(
            budget_mode=budget_mode,
            compression_enabled=compression_enabled,
        )
        optimization = self.engine.optimize(
            complexity=complexity,
            output_length_bucket=features.estimated_output_length,
            constraints=constraints,
            alpha=alpha,
            beta=beta,
            gamma=gamma,
        )
        optimized_prompt = self.optimizer.optimize(
            query=query,
            system_prompt_style=optimization.system_prompt_style,
            compression_enabled=optimization.compression_enabled,
            conversation_history=conversation_history,
        )
        # The hit costs nothing, so the full baseline cost of the estimated
        # token counts counts as saved.
        baseline_cost = self._baseline_cost(
            optimization.estimated_input_tokens, optimization.estimated_output_tokens
        )
        return GenerateResult(
            response=cached_response,
            model_used="redis-semantic-cache",
            provider="cache",
            input_tokens=0,
            output_tokens=0,
            total_tokens=0,
            estimated_cost=0.0,
            tokens_saved=optimized_prompt.tokens_saved,
            cost_saved=round(baseline_cost, 6),
            compression_ratio=optimized_prompt.compression_ratio,
            query_features=features,
            complexity=complexity,
            optimization=optimization,
            optimized_prompt=optimized_prompt,
            latency_ms=round(latency_ms, 1),
        )

    def _baseline_cost(self, input_tokens: int, output_tokens: int) -> float:
        """USD cost of the given token counts at the GPT-4o baseline prices."""
        return (
            self._BASELINE_INPUT_COST * input_tokens / 1000
            + self._BASELINE_OUTPUT_COST * output_tokens / 1000
        )

    @staticmethod
    def _blended_rate_per_1k(
        estimated_cost: Optional[float],
        input_tokens: Optional[int],
        output_tokens: Optional[int],
    ) -> float:
        """
        Convert a whole-request cost estimate into a per-1k-token rate.

        Returns 0.0 when the token estimate is empty, since no rate can be derived.
        """
        total_tokens = (input_tokens or 0) + (output_tokens or 0)
        if total_tokens <= 0:
            return 0.0
        return (estimated_cost or 0.0) * 1000.0 / total_tokens

    def _cost_rates_per_1k(self, optimization: OptimizationResult) -> tuple[float, float]:
        """Return (input, output) USD-per-1k-token prices for the selected model."""
        if self._use_v2 and self._v2_engine is not None:
            # V2: look up from the merged V2 registry (knows all new model IDs)
            v2_spec = self._v2_engine._registry.get_model(optimization.selected_model)
            if v2_spec:
                return v2_spec["input_cost_per_1k"], v2_spec["output_cost_per_1k"]
            # Unknown model: estimated_cost is a whole-request figure, not a
            # per-1k price, so derive a blended per-1k rate from the token estimate.
            rate = self._blended_rate_per_1k(
                optimization.estimated_cost,
                optimization.estimated_input_tokens,
                optimization.estimated_output_tokens,
            )
            return rate, rate
        # V1: look up from the old ModelRegistry
        model_spec = self.registry.get(optimization.selected_model)
        return model_spec.input_cost_per_1k, model_spec.output_cost_per_1k

    def _record_feedback(
        self,
        routed: RoutedResponse,
        evaluation: Optional[EvaluationResult],
        budget_mode: str,
    ) -> None:
        """Feed a real (non-dry-run) request outcome back into the active engine."""
        if self._use_v2 and self._v2_engine is not None:
            # Feed outcome back into adaptive EMA updater; quality is only known
            # when an evaluation was run.
            self._v2_engine.record_outcome(
                model_id=routed.model_used,
                latency_ms=routed.latency_ms,
                success=True,
                quality_score=evaluation.overall if evaluation else None,
                cost_usd=routed.estimated_cost,
            )
        elif evaluation:
            # V1 path: feed Bayesian optimizer (requires a quality score).
            weight_alpha, weight_beta, weight_gamma = self.engine.bayes.get_weights(budget_mode)
            self.engine.bayes.record_outcome(
                budget_mode=budget_mode,
                alpha=weight_alpha,
                beta=weight_beta,
                gamma=weight_gamma,
                actual_cost=routed.estimated_cost,
                quality_score=evaluation.overall,
            )

    def _build_messages(
        self,
        prompt: OptimizedPrompt,
        history: Optional[list[dict]],
    ) -> list[dict]:
        """Assemble chat messages: system prompt, up to 6 recent history turns, user query."""
        messages = [{"role": "system", "content": prompt.system_prompt}]
        if history:
            # Include only last 6 turns to keep context manageable
            messages.extend(history[-6:])
        messages.append({"role": "user", "content": prompt.optimized_query})
        return messages

    def _mock_response(self, optimization: OptimizationResult) -> RoutedResponse:
        """Dry-run mock — returns a placeholder built from the estimates, without calling any API."""
        return RoutedResponse(
            content="[DRY RUN — no API call made]",
            model_used=optimization.selected_model,
            provider=optimization.provider,
            input_tokens=optimization.estimated_input_tokens,
            output_tokens=optimization.estimated_output_tokens,
            total_tokens=optimization.estimated_input_tokens + optimization.estimated_output_tokens,
            latency_ms=0.0,
            estimated_cost=optimization.estimated_cost,
        )

    @staticmethod
    def _v2_to_optimization_result(
        decision: RoutingDecision,
        complexity: ComplexityResult,
        features: QueryFeatures,
        *,
        budget_mode: Optional[str] = None,
        compression_enabled: Optional[bool] = None,
    ) -> OptimizationResult:
        """
        Compatibility shim: maps RoutingDecision (V2) → OptimizationResult (V1 shape).

        This lets all downstream pipeline stages (PromptOptimizer, ModelRouter,
        logging, GenerateResult) stay unchanged while routing is done by the
        utility engine.

        Parameters
        ----------
        decision            : V2 routing decision.
        complexity          : Complexity estimate (unused; kept for signature compatibility).
        features            : Query features used for token estimates.
        budget_mode         : Caller's budget mode, used when the decision's
                              explanation does not report one.
        compression_enabled : Explicit caller override for prompt compression;
                              None derives it from the budget mode ("cheap" → on).

        OptimizationResult fields (from optimization_engine.py):
            selected_model, provider, estimated_cost, estimated_input_tokens,
            estimated_output_tokens, max_tokens, compression_enabled,
            system_prompt_style, rationale, fallback_model, objective_score
        """
        ex = decision.explanation
        # Build a rationale list from the V2 explanation dict
        rationale = [
            f"engine=utility_v2 domain={ex.get('primary_domain', 'general')}",
            f"utility_score={decision.utility_score:.4f} budget_lambda={ex.get('lambda', '?')}",
            f"top_dims={list(ex.get('query_dimensions', {}).keys())[:3]}",
            f"candidates_evaluated={ex.get('candidates_evaluated', '?')}",
            f"registry_source={ex.get('registry_source', 'baseline')}",
        ]
        if decision.fallback_model_id:
            rationale.append(f"fallback={decision.fallback_model_id} ({decision.fallback_provider})")

        # Output length bucket → token estimate lookup (unknown buckets → 700).
        output_token_map = {"short": 300, "medium": 700, "long": 1500, "very_long": 3000}
        est_output = output_token_map.get(
            str(getattr(features, 'estimated_output_length', 'medium')).lower(), 700
        )
        est_input = max(getattr(features, 'token_count', 100), 100)

        # Budget mode drives compression and prompt style; prefer the engine's
        # reported mode, then the caller's, then "balanced".
        mode = ex.get("budget_mode") or budget_mode or "balanced"
        if compression_enabled is not None:
            compression = compression_enabled
        else:
            compression = (mode == "cheap")
        system_prompt_style = "minimal" if mode == "cheap" else "standard"
        max_tokens = min(est_output + 200, 4096)

        return OptimizationResult(
            selected_model=decision.model_id,
            provider=decision.provider,
            estimated_cost=decision.estimated_cost,
            estimated_input_tokens=est_input,
            estimated_output_tokens=est_output,
            max_tokens=max_tokens,
            compression_enabled=compression,
            system_prompt_style=system_prompt_style,
            rationale=rationale,
            fallback_model=decision.fallback_model_id,
            objective_score=1.0 - decision.utility_score,  # invert: lower is better (V1 convention)
        )