import asyncio
import gc
import hashlib
import json
import time
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

import psutil
import redis


@dataclass
class CacheEntry:
    response: str
    timestamp: datetime
    hit_count: int = 0


class PerformanceOptimizer:
    def __init__(self, cache_size: int = 100, cache_ttl_hours: int = 24,
                 use_redis: bool = False):
        """Initialize performance optimizer with caching"""
        self.cache_size = cache_size
        self.cache_ttl = timedelta(hours=cache_ttl_hours)

        # Use Redis if available, fall back to in-memory
        self.use_redis = use_redis
        if use_redis:
            try:
                self.redis_client = redis.Redis(
                    host='localhost',
                    port=6379,
                    decode_responses=True
                )
                self.redis_client.ping()
            except redis.exceptions.RedisError:
                print("Redis not available, using in-memory cache")
                self.use_redis = False
                self.cache = OrderedDict()
        else:
            self.cache = OrderedDict()

        # Metrics
        self.metrics = {
            "cache_hits": 0,
            "cache_misses": 0,
            "total_requests": 0,
            "average_response_time": 0,
            "memory_usage_mb": 0
        }

    def _hash_prompt(self, prompt: str) -> str:
        """Create hash for caching"""
        normalized = prompt.lower().strip()
        return hashlib.md5(normalized.encode()).hexdigest()

    def get_cached_response(self, prompt: str) -> Optional[str]:
        """Get response from cache if available"""
        self.metrics["total_requests"] += 1
        prompt_hash = self._hash_prompt(prompt)

        if self.use_redis:
            cached = self.redis_client.get(f"chat:{prompt_hash}")
            if cached:
                self.metrics["cache_hits"] += 1
                # Update hit count
                self.redis_client.hincrby(f"chat:stats:{prompt_hash}", "hits", 1)
                return json.loads(cached)["response"]
        else:
            if prompt_hash in self.cache:
                entry = self.cache[prompt_hash]
                # Check TTL
                if datetime.now() - entry.timestamp < self.cache_ttl:
                    self.metrics["cache_hits"] += 1
                    entry.hit_count += 1
                    # Move to end (LRU)
                    self.cache.move_to_end(prompt_hash)
                    return entry.response
                else:
                    # Expired
                    del self.cache[prompt_hash]

        self.metrics["cache_misses"] += 1
        return None

    def cache_response(self, prompt: str, response: str):
        """Cache a response"""
        prompt_hash = self._hash_prompt(prompt)

        if self.use_redis:
            cache_data = {
                "response": response,
                "timestamp": datetime.now().isoformat()
            }
            self.redis_client.setex(
                f"chat:{prompt_hash}",
                int(self.cache_ttl.total_seconds()),
                json.dumps(cache_data)
            )
            self.redis_client.hset(
                f"chat:stats:{prompt_hash}",
                mapping={"hits": 0, "created": datetime.now().isoformat()}
            )
        else:
            # LRU cache management
            if len(self.cache) >= self.cache_size:
                # Remove least recently used
                self.cache.popitem(last=False)

            self.cache[prompt_hash] = CacheEntry(
                response=response,
                timestamp=datetime.now()
            )

    def get_metrics(self) -> Dict[str, Any]:
        """Get performance metrics"""
        # Update memory usage
        process = psutil.Process()
        self.metrics["memory_usage_mb"] = process.memory_info().rss / 1024 / 1024

        # Calculate cache hit rate
        if self.metrics["total_requests"] > 0:
            self.metrics["cache_hit_rate"] = (
                self.metrics["cache_hits"] / self.metrics["total_requests"]
            )

        return self.metrics

    def clear_cache(self):
        """Clear all cached responses"""
        if self.use_redis:
            for key in self.redis_client.scan_iter("chat:*"):
                self.redis_client.delete(key)
        else:
            self.cache.clear()
        gc.collect()


class MemoryManager:
    def __init__(self, max_memory_mb: int = 8192):
        """Initialize memory manager"""
        self.max_memory_mb = max_memory_mb
        self.warning_threshold = 0.8   # Warn at 80% usage
        self.critical_threshold = 0.9  # Critical at 90% usage

    def check_memory(self) -> Dict[str, Any]:
        """Check current memory usage"""
        process = psutil.Process()
        memory_info = process.memory_info()
        current_mb = memory_info.rss / 1024 / 1024
        percentage = current_mb / self.max_memory_mb

        status = "normal"
        if percentage > self.critical_threshold:
            status = "critical"
        elif percentage > self.warning_threshold:
            status = "warning"

        return {
            "current_mb": round(current_mb, 2),
            "max_mb": self.max_memory_mb,
            "percentage": round(percentage * 100, 2),
            "status": status,
            "available_mb": round(self.max_memory_mb - current_mb, 2)
        }

    def optimize_if_needed(self) -> bool:
        """Run optimization if memory usage is high"""
        memory_status = self.check_memory()

        if memory_status["status"] in ["warning", "critical"]:
            # Force garbage collection
            gc.collect()

            # Clear unused objects
            if memory_status["status"] == "critical":
                # More aggressive cleanup
                gc.collect(2)

            return True

        return False


class RequestBatcher:
    def __init__(self, batch_size: int = 5, timeout_ms: int = 100):
        """Initialize request batcher for efficiency"""
        self.batch_size = batch_size
        self.timeout_ms = timeout_ms
        self.pending_requests = []
        self.results = {}

    async def add_request(self, request_id: str, prompt: str) -> str:
        """Add request to batch"""
        self.pending_requests.append({
            "id": request_id,
            "prompt": prompt,
            "timestamp": time.time()
        })

        # Process if batch is full
        if len(self.pending_requests) >= self.batch_size:
            await self._process_batch()
        else:
            # Wait for timeout
            await asyncio.sleep(self.timeout_ms / 1000)
            if request_id not in self.results:
                await self._process_batch()

        return self.results.get(request_id, "Error processing request")

    async def _process_batch(self):
        """Process pending requests as batch"""
        if not self.pending_requests:
            return

        batch = self.pending_requests[:self.batch_size]
        self.pending_requests = self.pending_requests[self.batch_size:]

        # Process batch (simulate concurrent processing)
        tasks = []
        for request in batch:
            # In production, this would call the LLM
            tasks.append(self._process_single(request))

        results = await asyncio.gather(*tasks)

        for request, result in zip(batch, results):
            self.results[request["id"]] = result

    async def _process_single(self, request: Dict[str, Any]) -> str:
        """Process single request (placeholder)"""
        # Simulate processing
        await asyncio.sleep(0.1)
        return f"Response to: {request['prompt']}"
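# --- Minimal usage sketch ---
# The block below is an illustrative sketch of how the three classes above
# could be wired together: check the cache before generating, cache the
# result afterwards, and let MemoryManager reclaim memory when needed.
# The request id "req-1" and the demo prompt are placeholder values, and
# the response still comes from RequestBatcher's simulated _process_single.

async def _demo():
    optimizer = PerformanceOptimizer(cache_size=50, cache_ttl_hours=1)
    memory = MemoryManager(max_memory_mb=4096)
    batcher = RequestBatcher(batch_size=3, timeout_ms=50)

    prompt = "What is the capital of France?"

    # First lookup misses, so generate a response and cache it.
    response = optimizer.get_cached_response(prompt)
    if response is None:
        response = await batcher.add_request("req-1", prompt)
        optimizer.cache_response(prompt, response)

    # A second lookup for the same prompt is served from the cache.
    assert optimizer.get_cached_response(prompt) == response

    memory.optimize_if_needed()
    print(optimizer.get_metrics())


if __name__ == "__main__":
    asyncio.run(_demo())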