fishapi / app /utils /performance.py
kamau1's picture
Initial commit
bcc2f7b verified
"""
Performance monitoring and optimization utilities.
"""
import time
import psutil
import functools
from typing import Dict, Any, Callable, Optional
from contextlib import contextmanager
from app.core.logging import get_logger
logger = get_logger(__name__)
class PerformanceMonitor:
"""Performance monitoring utility."""
def __init__(self):
self.metrics = {
"requests_total": 0,
"requests_successful": 0,
"requests_failed": 0,
"total_processing_time": 0.0,
"average_processing_time": 0.0,
"min_processing_time": float('inf'),
"max_processing_time": 0.0
}
def record_request(self, processing_time: float, success: bool = True):
"""Record a request's performance metrics."""
self.metrics["requests_total"] += 1
if success:
self.metrics["requests_successful"] += 1
else:
self.metrics["requests_failed"] += 1
self.metrics["total_processing_time"] += processing_time
self.metrics["average_processing_time"] = (
self.metrics["total_processing_time"] / self.metrics["requests_total"]
)
if processing_time < self.metrics["min_processing_time"]:
self.metrics["min_processing_time"] = processing_time
if processing_time > self.metrics["max_processing_time"]:
self.metrics["max_processing_time"] = processing_time
def get_metrics(self) -> Dict[str, Any]:
"""Get current performance metrics."""
metrics = self.metrics.copy()
# Add system metrics
try:
metrics.update({
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent,
"memory_available_mb": psutil.virtual_memory().available / (1024 * 1024)
})
except Exception as e:
logger.warning(f"Failed to get system metrics: {str(e)}")
return metrics
def reset_metrics(self):
"""Reset all metrics."""
self.metrics = {
"requests_total": 0,
"requests_successful": 0,
"requests_failed": 0,
"total_processing_time": 0.0,
"average_processing_time": 0.0,
"min_processing_time": float('inf'),
"max_processing_time": 0.0
}
# Global performance monitor instance
performance_monitor = PerformanceMonitor()
@contextmanager
def measure_time():
"""Context manager to measure execution time."""
start_time = time.time()
try:
yield
finally:
end_time = time.time()
execution_time = end_time - start_time
logger.debug(f"Execution time: {execution_time:.3f}s")
def timed_function(func: Callable) -> Callable:
"""Decorator to measure function execution time."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
success = True
return result
except Exception as e:
success = False
raise
finally:
end_time = time.time()
execution_time = end_time - start_time
performance_monitor.record_request(execution_time, success)
logger.debug(f"{func.__name__} execution time: {execution_time:.3f}s")
return wrapper
async def timed_async_function(func: Callable) -> Callable:
"""Decorator to measure async function execution time."""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
success = True
return result
except Exception as e:
success = False
raise
finally:
end_time = time.time()
execution_time = end_time - start_time
performance_monitor.record_request(execution_time, success)
logger.debug(f"{func.__name__} execution time: {execution_time:.3f}s")
return wrapper
class SimpleCache:
"""Simple in-memory cache for inference results."""
def __init__(self, max_size: int = 100, ttl: int = 3600):
"""
Initialize cache.
Args:
max_size: Maximum number of items to cache
ttl: Time to live in seconds
"""
self.max_size = max_size
self.ttl = ttl
self.cache = {}
self.access_times = {}
def _is_expired(self, key: str) -> bool:
"""Check if a cache entry is expired."""
if key not in self.access_times:
return True
return time.time() - self.access_times[key] > self.ttl
def _evict_expired(self):
"""Remove expired entries."""
current_time = time.time()
expired_keys = [
key for key, access_time in self.access_times.items()
if current_time - access_time > self.ttl
]
for key in expired_keys:
self.cache.pop(key, None)
self.access_times.pop(key, None)
def _evict_lru(self):
"""Remove least recently used entry."""
if not self.access_times:
return
lru_key = min(self.access_times.keys(), key=lambda k: self.access_times[k])
self.cache.pop(lru_key, None)
self.access_times.pop(lru_key, None)
def get(self, key: str) -> Optional[Any]:
"""Get item from cache."""
if key not in self.cache or self._is_expired(key):
return None
self.access_times[key] = time.time()
return self.cache[key]
def set(self, key: str, value: Any):
"""Set item in cache."""
# Clean up expired entries
self._evict_expired()
# Evict LRU if at max size
while len(self.cache) >= self.max_size:
self._evict_lru()
self.cache[key] = value
self.access_times[key] = time.time()
def clear(self):
"""Clear all cache entries."""
self.cache.clear()
self.access_times.clear()
def size(self) -> int:
"""Get current cache size."""
return len(self.cache)
def stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
return {
"size": len(self.cache),
"max_size": self.max_size,
"ttl": self.ttl,
"hit_ratio": getattr(self, '_hits', 0) / max(getattr(self, '_requests', 1), 1)
}
# Global cache instance
inference_cache = SimpleCache(max_size=50, ttl=1800) # 30 minutes TTL
def get_system_info() -> Dict[str, Any]:
"""Get system information."""
try:
return {
"cpu_count": psutil.cpu_count(),
"cpu_percent": psutil.cpu_percent(interval=1),
"memory_total_gb": psutil.virtual_memory().total / (1024**3),
"memory_available_gb": psutil.virtual_memory().available / (1024**3),
"memory_percent": psutil.virtual_memory().percent,
"disk_usage_percent": psutil.disk_usage('/').percent
}
except Exception as e:
logger.error(f"Failed to get system info: {str(e)}")
return {"error": str(e)}