# blux-ca — ca/core/llm_adapter.py
# (restructure and upgrade of the ca Python files)
from __future__ import annotations

import hashlib
import json
import logging
import time
from dataclasses import dataclass, field, replace
from enum import Enum
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

from ca.config import load_config
# Configure logging
logger = logging.getLogger(__name__)
class LLMProvider(str, Enum):
"""Supported LLM providers."""
OPENAI = "openai"
ANTHROPIC = "anthropic"
LOCAL = "local"
OLLAMA = "ollama"
LITELLM = "litellm"
MOCK = "mock" # For testing
@dataclass
class LLMResponse:
"""Structured LLM response."""
content: str
model: str
tokens_used: int
cost_estimate: float = 0.0
latency_ms: float = 0.0
cache_hit: bool = False
error: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class LLMRequest:
"""Structured LLM request."""
system: str
user: str
model: Optional[str] = None
temperature: float = 0.7
max_tokens: int = 1000
provider: Optional[LLMProvider] = None
stream: bool = False
metadata: Dict[str, Any] = field(default_factory=dict)
class LLMAdapter:
    """
    Advanced LLM adapter with multiple backend support.

    Handles provider abstraction, response caching, rate limiting,
    error handling, and basic latency/cost monitoring. The public
    surface is ``call()``, ``clear_cache()`` and ``get_stats()``.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Args:
            config: The ``llm`` configuration section. When None, it is
                loaded from the project configuration via ``load_config()``.
        """
        self.config = config or load_config().get("llm", {})
        self.provider = LLMProvider(self.config.get("provider", "openai"))
        self.default_model = self.config.get("model", "gpt-3.5-turbo")
        self.temperature = self.config.get("temperature", 0.7)
        self.max_tokens = self.config.get("max_tokens", 1000)
        # Cache configuration: simple in-memory FIFO cache of responses.
        self.cache_enabled = self.config.get("cache_enabled", True)
        self.cache_max_size = self.config.get("cache_max_size", 1000)
        self._response_cache: Dict[str, LLMResponse] = {}
        self._cache_hits = 0  # running count of requests served from cache
        # Rate limiting: at most rate_limit_requests per rate_limit_period.
        self.rate_limit_requests = self.config.get("rate_limit_requests", 60)
        self.rate_limit_period = self.config.get("rate_limit_period", 60)  # seconds
        self._request_timestamps: List[float] = []
        # Initialize provider client eagerly so misconfiguration fails fast.
        self.client = self._initialize_client()
        logger.info(f"LLM Adapter initialized with provider: {self.provider.value}")

    def _initialize_client(self) -> Any:
        """Initialize the appropriate LLM client based on provider.

        Returns:
            The provider-specific client object (SDK client or stub dict).

        Raises:
            ImportError: when the provider's SDK package is not installed.
            ValueError: when the provider is not supported.
        """
        initializers = {
            LLMProvider.OPENAI: self._init_openai_client,
            LLMProvider.ANTHROPIC: self._init_anthropic_client,
            LLMProvider.LOCAL: self._init_local_client,
            LLMProvider.OLLAMA: self._init_ollama_client,
            LLMProvider.LITELLM: self._init_litellm_client,
            LLMProvider.MOCK: self._init_mock_client,
        }
        try:
            initializer = initializers.get(self.provider)
            if initializer is None:
                raise ValueError(f"Unsupported provider: {self.provider}")
            return initializer()
        except ImportError as e:
            logger.error(f"Failed to import required package for {self.provider}: {e}")
            raise
        except Exception as e:
            logger.error(f"Failed to initialize {self.provider} client: {e}")
            raise

    def _init_openai_client(self) -> Any:
        """Initialize OpenAI client (warns, but proceeds, without an API key)."""
        try:
            from openai import OpenAI
            api_key = self.config.get("api_key") or self.config.get("openai_api_key")
            if not api_key:
                logger.warning("OpenAI API key not found in config")
            return OpenAI(api_key=api_key)
        except ImportError:
            logger.error("OpenAI package not installed. Install with: pip install openai")
            raise

    def _init_anthropic_client(self) -> Any:
        """Initialize Anthropic client (warns, but proceeds, without an API key)."""
        try:
            import anthropic
            api_key = self.config.get("api_key") or self.config.get("anthropic_api_key")
            if not api_key:
                logger.warning("Anthropic API key not found in config")
            return anthropic.Anthropic(api_key=api_key)
        except ImportError:
            logger.error("Anthropic package not installed. Install with: pip install anthropic")
            raise

    def _init_local_client(self) -> Any:
        """Initialize local model client.

        Stub: returns a descriptor dict rather than a real client. Replace
        with a transformers / llama.cpp integration as needed.
        """
        try:
            logger.info("Using local model provider (stub implementation)")
            return {"type": "local", "model_path": self.config.get("model_path")}
        except Exception as e:
            logger.error(f"Failed to initialize local client: {e}")
            raise

    def _init_ollama_client(self) -> Any:
        """Initialize Ollama client pointed at ``ollama_host`` (default localhost)."""
        try:
            import ollama
            host = self.config.get("ollama_host", "http://localhost:11434")
            logger.info(f"Using Ollama at {host}")
            return ollama.Client(host=host)
        except ImportError:
            logger.error("Ollama package not installed. Install with: pip install ollama")
            raise

    def _init_litellm_client(self) -> Any:
        """Initialize LiteLLM (module-level client; configured via env vars)."""
        try:
            import litellm
            logger.info("Using LiteLLM for multi-provider support")
            return litellm
        except ImportError:
            logger.error("LiteLLM package not installed. Install with: pip install litellm")
            raise

    def _init_mock_client(self) -> Any:
        """Initialize mock client for testing (no network, no dependencies)."""
        logger.info("Using mock LLM client (testing only)")
        return {"type": "mock"}

    def _check_rate_limit(self) -> bool:
        """Check if we're within rate limits.

        Prunes timestamps older than the window as a side effect. A falsy
        ``rate_limit_requests`` (0/None) disables limiting entirely.

        Returns:
            True when another request may proceed, False when throttled.
        """
        if not self.rate_limit_requests:
            return True
        now = time.time()
        # Drop timestamps that have aged out of the sliding window.
        self._request_timestamps = [
            ts for ts in self._request_timestamps
            if now - ts < self.rate_limit_period
        ]
        if len(self._request_timestamps) >= self.rate_limit_requests:
            wait_time = self.rate_limit_period - (now - self._request_timestamps[0])
            logger.warning(f"Rate limit exceeded. Wait {wait_time:.1f}s")
            return False
        return True

    def _get_cache_key(self, request: LLMRequest) -> str:
        """Generate a deterministic cache key for a request.

        The effective model (request.model resolved against the default) is
        used so explicit-default and implicit requests share an entry, and
        max_tokens is included so differently-truncated responses do not
        collide on the same key.
        """
        model = request.model or self.default_model
        content = (
            f"{request.system}|{request.user}|{model}|"
            f"{request.temperature}|{request.max_tokens}"
        )
        return hashlib.sha256(content.encode()).hexdigest()

    def call(self, request: LLMRequest) -> LLMResponse:
        """
        Main method to call LLM with structured request.

        Never raises: provider failures are converted into an LLMResponse
        with ``error`` set.

        Args:
            request: LLMRequest object with system and user prompts
        Returns:
            LLMResponse object with content and metadata
        """
        start_time = time.time()
        # Check rate limit before doing any work.
        if not self._check_rate_limit():
            return LLMResponse(
                content="Rate limit exceeded. Please try again shortly.",
                model=request.model or self.default_model,
                tokens_used=0,
                cost_estimate=0.0,
                latency_ms=0.0,
                error="Rate limit exceeded",
                metadata={"rate_limited": True}
            )
        # Check cache. Return a copy so the stored entry is never mutated
        # (stamping cache_hit/latency onto the cached object would leak
        # into every future hit). Note: metadata dict is shallow-shared.
        cache_key = self._get_cache_key(request)
        if self.cache_enabled and cache_key in self._response_cache:
            self._cache_hits += 1
            logger.debug(f"Cache hit for request: {cache_key[:16]}...")
            return replace(
                self._response_cache[cache_key],
                cache_hit=True,
                latency_ms=(time.time() - start_time) * 1000,
            )
        try:
            # Record request timestamp for the rate-limit window.
            self._request_timestamps.append(time.time())
            # Dispatch to the appropriate provider backend.
            handlers = {
                LLMProvider.OPENAI: self._call_openai,
                LLMProvider.ANTHROPIC: self._call_anthropic,
                LLMProvider.LOCAL: self._call_local,
                LLMProvider.OLLAMA: self._call_ollama,
                LLMProvider.LITELLM: self._call_litellm,
                LLMProvider.MOCK: self._call_mock,
            }
            handler = handlers.get(self.provider)
            if handler is None:
                raise ValueError(f"Unsupported provider: {self.provider}")
            response = handler(request)
            # Calculate latency.
            response.latency_ms = (time.time() - start_time) * 1000
            # Cache successful responses, evicting oldest entry (FIFO)
            # when full (dicts preserve insertion order).
            if self.cache_enabled and not response.error:
                if len(self._response_cache) >= self.cache_max_size:
                    oldest_key = next(iter(self._response_cache))
                    del self._response_cache[oldest_key]
                self._response_cache[cache_key] = response
            logger.info(f"LLM call completed: {response.tokens_used} tokens, "
                        f"{response.latency_ms:.0f}ms, model: {response.model}")
            return response
        except Exception as e:
            logger.error(f"LLM call failed: {e}")
            return LLMResponse(
                content=f"Error processing request: {str(e)[:100]}",
                model=request.model or self.default_model,
                tokens_used=0,
                cost_estimate=0.0,
                latency_ms=(time.time() - start_time) * 1000,
                error=str(e),
                metadata={"failed": True}
            )

    def _call_openai(self, request: LLMRequest) -> LLMResponse:
        """Call OpenAI chat-completions API; raises on API errors."""
        try:
            model = request.model or self.default_model
            response = self.client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": request.system},
                    {"role": "user", "content": request.user}
                ],
                temperature=request.temperature,
                max_tokens=request.max_tokens,
                stream=request.stream
            )
            content = response.choices[0].message.content
            tokens_used = response.usage.total_tokens if response.usage else 0
            # Estimate cost (very rough).
            cost_estimate = self._estimate_openai_cost(model, tokens_used)
            return LLMResponse(
                content=content or "",
                model=model,
                tokens_used=tokens_used,
                cost_estimate=cost_estimate,
                metadata={
                    "finish_reason": response.choices[0].finish_reason,
                    "provider": "openai"
                }
            )
        except Exception as e:
            logger.error(f"OpenAI API error: {e}")
            raise

    def _call_anthropic(self, request: LLMRequest) -> LLMResponse:
        """Call Anthropic messages API; raises on API errors."""
        try:
            model = request.model or self.default_model
            response = self.client.messages.create(
                model=model,
                max_tokens=request.max_tokens,
                temperature=request.temperature,
                system=request.system,
                messages=[{"role": "user", "content": request.user}]
            )
            content = response.content[0].text
            tokens_used = response.usage.input_tokens + response.usage.output_tokens
            # Estimate cost (very rough).
            cost_estimate = self._estimate_anthropic_cost(model, tokens_used)
            return LLMResponse(
                content=content,
                model=model,
                tokens_used=tokens_used,
                cost_estimate=cost_estimate,
                metadata={
                    "finish_reason": response.stop_reason,
                    "provider": "anthropic"
                }
            )
        except Exception as e:
            logger.error(f"Anthropic API error: {e}")
            raise

    def _call_local(self, request: LLMRequest) -> LLMResponse:
        """Call local model (stub: echoes truncated prompts, no inference)."""
        model = request.model or self.default_model
        mock_response = f"[Local model: {model}]\n\nSystem: {request.system[:50]}...\n\nUser: {request.user[:100]}..."
        return LLMResponse(
            content=mock_response,
            model=model,
            tokens_used=len(mock_response.split()),
            cost_estimate=0.0,
            metadata={"provider": "local", "stub": True}
        )

    def _call_ollama(self, request: LLMRequest) -> LLMResponse:
        """Call Ollama chat API; token count is a whitespace-based estimate."""
        try:
            model = request.model or self.default_model
            response = self.client.chat(
                model=model,
                messages=[
                    {"role": "system", "content": request.system},
                    {"role": "user", "content": request.user}
                ],
                options={
                    "temperature": request.temperature,
                    "num_predict": request.max_tokens
                }
            )
            content = response["message"]["content"]
            # Ollama doesn't return token counts by default; rough estimate.
            tokens_used = len(content.split()) * 1.3
            return LLMResponse(
                content=content,
                model=model,
                tokens_used=int(tokens_used),
                cost_estimate=0.0,
                metadata={"provider": "ollama"}
            )
        except Exception as e:
            logger.error(f"Ollama API error: {e}")
            raise

    def _call_litellm(self, request: LLMRequest) -> LLMResponse:
        """Call via LiteLLM's OpenAI-compatible ``completion`` interface."""
        try:
            model = request.model or self.default_model
            response = self.client.completion(
                model=model,
                messages=[
                    {"role": "system", "content": request.system},
                    {"role": "user", "content": request.user}
                ],
                temperature=request.temperature,
                max_tokens=request.max_tokens
            )
            content = response.choices[0].message.content
            tokens_used = response.usage.total_tokens if hasattr(response, 'usage') else 0
            return LLMResponse(
                content=content,
                model=model,
                tokens_used=tokens_used,
                cost_estimate=0.0,  # LiteLLM might provide cost tracking
                metadata={"provider": "litellm"}
            )
        except Exception as e:
            logger.error(f"LiteLLM error: {e}")
            raise

    def _call_mock(self, request: LLMRequest) -> LLMResponse:
        """Mock LLM call for testing: deterministic canned text, no I/O."""
        model = request.model or self.default_model
        mock_response = f"[Mock LLM Response]\nSystem context: {request.system[:30]}...\n\nBased on your input: {request.user[:50]}...\n\nI understand and will respond thoughtfully."
        return LLMResponse(
            content=mock_response,
            model=model,
            tokens_used=len(mock_response.split()),
            cost_estimate=0.0,
            metadata={"provider": "mock", "test": True}
        )

    def _estimate_openai_cost(self, model: str, tokens: int) -> float:
        """Very rough OpenAI cost estimation (USD); 0.0 for unknown models."""
        # Prices per 1K tokens (as of 2024, approximate): (input, output).
        prices = {
            "gpt-4": (0.03, 0.06),
            "gpt-4-turbo": (0.01, 0.03),
            "gpt-3.5-turbo": (0.0005, 0.0015),
        }
        for model_prefix, (input_price, output_price) in prices.items():
            if model.startswith(model_prefix):
                # Rough estimate: assume 2:1 input:output token ratio.
                input_tokens = tokens * 0.67
                output_tokens = tokens * 0.33
                cost = (input_tokens / 1000 * input_price) + (output_tokens / 1000 * output_price)
                return round(cost, 4)
        return 0.0

    def _estimate_anthropic_cost(self, model: str, tokens: int) -> float:
        """Very rough Anthropic cost estimation (USD); 0.0 for unknown models."""
        # Prices per 1K tokens, approximate: (input, output).
        prices = {
            "claude-3-opus": (0.015, 0.075),
            "claude-3-sonnet": (0.003, 0.015),
            "claude-3-haiku": (0.00025, 0.00125),
        }
        for model_prefix, (input_price, output_price) in prices.items():
            if model.startswith(model_prefix):
                # Rough estimate: assume 2:1 input:output token ratio.
                input_tokens = tokens * 0.67
                output_tokens = tokens * 0.33
                cost = (input_tokens / 1000 * input_price) + (output_tokens / 1000 * output_price)
                return round(cost, 4)
        return 0.0

    def clear_cache(self) -> None:
        """Clear the response cache (hit counter is intentionally kept)."""
        self._response_cache.clear()
        logger.info("LLM response cache cleared")

    def get_stats(self) -> Dict[str, Any]:
        """Get adapter statistics.

        Returns:
            Dict with provider name, cache size, cumulative cache hits,
            and rate-limit window state. ``total_calls`` reflects only the
            timestamps still inside the rate-limit window, since older
            ones are pruned by _check_rate_limit.
        """
        return {
            "provider": self.provider.value,
            "cache_size": len(self._response_cache),
            "cache_hits": self._cache_hits,
            "total_calls": len(self._request_timestamps),
            "rate_limit": {
                "requests": self.rate_limit_requests,
                "period": self.rate_limit_period,
                "current_window": len(self._request_timestamps)
            }
        }
# Global adapter instance for convenience
_adapter_instance: Optional[LLMAdapter] = None
def get_llm_adapter(config: Optional[Dict[str, Any]] = None) -> LLMAdapter:
    """Return the process-wide LLMAdapter, creating it on first use.

    Note: ``config`` only takes effect on the very first call; subsequent
    calls return the existing singleton unchanged.
    """
    global _adapter_instance
    if _adapter_instance is not None:
        return _adapter_instance
    _adapter_instance = LLMAdapter(config)
    return _adapter_instance
def call_llm(system: str, user: str, **kwargs) -> str:
    """
    Convenience function for backward compatibility.

    Args:
        system: System prompt
        user: User prompt
        **kwargs: Additional arguments for LLMRequest

    Returns:
        LLM response text (or an error/fallback message on failure).
    """
    try:
        adapter = get_llm_adapter()
        response = adapter.call(LLMRequest(system=system, user=user, **kwargs))
        if not response.error:
            return response.content
        logger.error(f"LLM call failed: {response.error}")
        return f"[Error: {response.error}] Please try again or check configuration."
    except Exception as e:
        logger.error(f"Failed to call LLM: {e}")
        # Fall back to the simple rule-based responder if the adapter fails.
        return _fallback_llm(system, user)
def _fallback_llm(system: str, user: str) -> str:
"""Simple fallback LLM implementation."""
# Very basic rule-based responses for critical functionality
if "crisis" in system.lower() or "emergency" in system.lower():
return "I hear this feels urgent. Let's focus on grounding first. Take a breath."
elif "logical" in system.lower():
return "Let's break this down logically. What's the core question?"
elif "emotional" in system.lower():
return "I hear real feeling in this. Let's acknowledge what's present."
elif "shadow" in system.lower():
return "There may be patterns here worth exploring gently."
else:
return "I'm here to help you find clarity. Tell me more about what you're experiencing."