"""Performance monitoring and optimization utilities."""

import functools
import time
from contextlib import contextmanager
from typing import Any, Callable, Dict, Optional

import psutil

from app.core.logging import get_logger

logger = get_logger(__name__)


class PerformanceMonitor:
    """Accumulate request counts and processing-time statistics.

    The running totals live in ``self.metrics``. No locking is performed.
    """

    def __init__(self):
        self.metrics = self._initial_metrics()

    @staticmethod
    def _initial_metrics() -> Dict[str, Any]:
        """Return a fresh metrics dict with all counters zeroed."""
        return {
            "requests_total": 0,
            "requests_successful": 0,
            "requests_failed": 0,
            "total_processing_time": 0.0,
            "average_processing_time": 0.0,
            # Starts at +inf so the first recorded request always becomes
            # the minimum.
            "min_processing_time": float("inf"),
            "max_processing_time": 0.0,
        }

    def record_request(self, processing_time: float, success: bool = True):
        """Record a request's performance metrics.

        Args:
            processing_time: Duration of the request in seconds.
            success: Whether the request completed without error.
        """
        metrics = self.metrics
        metrics["requests_total"] += 1
        if success:
            metrics["requests_successful"] += 1
        else:
            metrics["requests_failed"] += 1

        metrics["total_processing_time"] += processing_time
        metrics["average_processing_time"] = (
            metrics["total_processing_time"] / metrics["requests_total"]
        )
        metrics["min_processing_time"] = min(
            metrics["min_processing_time"], processing_time
        )
        metrics["max_processing_time"] = max(
            metrics["max_processing_time"], processing_time
        )

    def get_metrics(self) -> Dict[str, Any]:
        """Get current performance metrics.

        Returns:
            A copy of the request metrics, extended with ``cpu_percent``,
            ``memory_percent`` and ``memory_available_mb`` when psutil can
            supply them. If the system lookup fails, a warning is logged and
            only the request metrics are returned.
        """
        metrics = self.metrics.copy()

        # Before any request is recorded the internal minimum is +inf, which
        # is not valid JSON; report 0.0 in the snapshot instead.
        if metrics["min_processing_time"] == float("inf"):
            metrics["min_processing_time"] = 0.0

        # Add system metrics
        try:
            # Query memory once so both memory figures come from the same
            # sample.
            memory = psutil.virtual_memory()
            metrics.update({
                "cpu_percent": psutil.cpu_percent(),
                "memory_percent": memory.percent,
                "memory_available_mb": memory.available / (1024 * 1024),
            })
        except Exception as e:
            logger.warning(f"Failed to get system metrics: {e}")

        return metrics

    def reset_metrics(self):
        """Reset all metrics."""
        self.metrics = self._initial_metrics()


# Global performance monitor instance
performance_monitor = PerformanceMonitor()


@contextmanager
def measure_time():
    """Context manager to measure execution time.

    Logs the elapsed time of the ``with`` body at debug level, including
    when the body raises. Nothing is yielded to the caller.
    """
    start_time = time.perf_counter()
    try:
        yield
    finally:
        execution_time = time.perf_counter() - start_time
        logger.debug(f"Execution time: {execution_time:.3f}s")


def _record_timing(name: str, start_time: float, success: bool) -> None:
    """Record one timed call on the global monitor and log its duration."""
    execution_time = time.perf_counter() - start_time
    performance_monitor.record_request(execution_time, success)
    logger.debug(f"{name} execution time: {execution_time:.3f}s")


def timed_function(func: Callable) -> Callable:
    """Decorator to measure function execution time.

    Each call is recorded on the global ``performance_monitor``; a call
    counts as failed when it raises. Exceptions propagate unchanged.
    """
    name = getattr(func, "__name__", repr(func))

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        # Assume failure until the call returns, so that ``finally`` always
        # sees a bound value (also for KeyboardInterrupt / SystemExit).
        success = False
        try:
            result = func(*args, **kwargs)
            success = True
            return result
        finally:
            _record_timing(name, start_time, success)

    return wrapper


def timed_async_function(func: Callable) -> Callable:
    """Decorator to measure async function execution time.

    This is a regular (non-async) function: a decorator must return the
    wrapper itself, not a coroutine that would produce it. Each awaited call
    is recorded on the global ``performance_monitor``; a call counts as
    failed when it raises or is cancelled. Exceptions propagate unchanged.
    """
    name = getattr(func, "__name__", repr(func))

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        # asyncio.CancelledError derives from BaseException, so ``success``
        # must be bound before the await for ``finally`` to be safe.
        success = False
        try:
            result = await func(*args, **kwargs)
            success = True
            return result
        finally:
            _record_timing(name, start_time, success)

    return wrapper


class SimpleCache:
    """Simple in-memory cache for inference results.

    Entries expire ``ttl`` seconds after their last access (a ``set`` or a
    successful ``get``), and the least recently accessed entry is evicted
    when a new key is inserted at capacity. No locking is performed.
    """

    def __init__(self, max_size: int = 100, ttl: int = 3600):
        """
        Initialize cache.

        Args:
            max_size: Maximum number of items to cache. A value <= 0
                disables caching (``set`` becomes a no-op).
            ttl: Time to live in seconds
        """
        self.max_size = max_size
        self.ttl = ttl
        self.cache = {}
        self.access_times = {}
        # Lookup counters backing the ``hit_ratio`` reported by stats().
        self._hits = 0
        self._requests = 0

    def _is_expired(self, key: str) -> bool:
        """Check if a cache entry is expired (unknown keys count as expired)."""
        if key not in self.access_times:
            return True
        return time.time() - self.access_times[key] > self.ttl

    def _remove(self, key: str):
        """Drop a key from both the value store and the access-time index."""
        self.cache.pop(key, None)
        self.access_times.pop(key, None)

    def _evict_expired(self):
        """Remove expired entries."""
        current_time = time.time()
        expired_keys = [
            key
            for key, access_time in self.access_times.items()
            if current_time - access_time > self.ttl
        ]
        for key in expired_keys:
            self._remove(key)

    def _evict_lru(self):
        """Remove least recently used entry."""
        if not self.access_times:
            return
        lru_key = min(self.access_times, key=self.access_times.get)
        self._remove(lru_key)

    def get(self, key: str) -> Optional[Any]:
        """Get item from cache.

        Returns:
            The cached value, or ``None`` when the key is missing or expired.
            A stored ``None`` is therefore indistinguishable from a miss.
        """
        self._requests += 1

        if key not in self.cache:
            return None
        if self._is_expired(key):
            # Drop the stale entry now rather than waiting for the next set().
            self._remove(key)
            return None

        self._hits += 1
        self.access_times[key] = time.time()
        return self.cache[key]

    def set(self, key: str, value: Any):
        """Set item in cache.

        Expired entries are purged first. When inserting a new key at
        capacity, least recently used entries are evicted to make room;
        overwriting an existing key never evicts other entries.
        """
        if self.max_size <= 0:
            # Nothing can be stored; without this guard the eviction loop
            # below could never make room and would not terminate.
            return

        # Clean up expired entries
        self._evict_expired()

        # Evict LRU if at max size (only needed when adding a new key)
        if key not in self.cache:
            while len(self.cache) >= self.max_size and self.access_times:
                self._evict_lru()

        self.cache[key] = value
        self.access_times[key] = time.time()

    def clear(self):
        """Clear all cache entries (hit/request counters are kept)."""
        self.cache.clear()
        self.access_times.clear()

    def size(self) -> int:
        """Get current cache size."""
        return len(self.cache)

    def stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            A dict with ``size``, ``max_size``, ``ttl`` and ``hit_ratio``
            (hits divided by ``get`` calls; 0 when ``get`` was never called).
        """
        hits = getattr(self, "_hits", 0)
        requests = getattr(self, "_requests", 0)
        return {
            "size": len(self.cache),
            "max_size": self.max_size,
            "ttl": self.ttl,
            # max(..., 1) avoids dividing by zero before the first lookup.
            "hit_ratio": hits / max(requests, 1),
        }


# Global cache instance
inference_cache = SimpleCache(max_size=50, ttl=1800)  # 30 minutes TTL


def get_system_info() -> Dict[str, Any]:
    """Get system information.

    Returns:
        A dict with CPU count and utilisation, memory totals and the usage
        percentage of the ``/`` filesystem. On failure the error is logged
        and ``{"error": <message>}`` is returned instead.

    Note:
        ``psutil.cpu_percent(interval=1)`` samples over a one-second
        interval, so this call blocks for about that long.
    """
    try:
        # Take a single memory snapshot so the three figures are consistent.
        memory = psutil.virtual_memory()
        return {
            "cpu_count": psutil.cpu_count(),
            "cpu_percent": psutil.cpu_percent(interval=1),
            "memory_total_gb": memory.total / (1024**3),
            "memory_available_gb": memory.available / (1024**3),
            "memory_percent": memory.percent,
            "disk_usage_percent": psutil.disk_usage('/').percent,
        }
    except Exception as e:
        logger.error(f"Failed to get system info: {e}")
        return {"error": str(e)}