File size: 7,409 Bytes
bcc2f7b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
"""
Performance monitoring and optimization utilities.
"""
import time
import psutil
import functools
from typing import Dict, Any, Callable, Optional
from contextlib import contextmanager
from app.core.logging import get_logger
# Module-level logger, obtained from the project's logging helper and named
# after this module; every function and class below logs through it.
logger = get_logger(__name__)
class PerformanceMonitor:
    """Accumulate request counts and processing-time statistics.

    All counters live in the public ``metrics`` dict. ``record_request``
    updates them, ``get_metrics`` returns a copy enriched with system
    readings from ``psutil``, and ``reset_metrics`` restores the initial
    state.
    """

    @staticmethod
    def _initial_metrics() -> Dict[str, Any]:
        """Return a fresh metrics dict with every counter at its start value."""
        return {
            "requests_total": 0,
            "requests_successful": 0,
            "requests_failed": 0,
            "total_processing_time": 0.0,
            "average_processing_time": 0.0,
            # +inf is a sentinel meaning "no request recorded yet"; any real
            # duration compares smaller and replaces it.
            "min_processing_time": float('inf'),
            "max_processing_time": 0.0,
        }

    def __init__(self):
        """Create a monitor with all counters zeroed."""
        self.metrics = self._initial_metrics()

    def record_request(self, processing_time: float, success: bool = True):
        """Record one request's outcome and duration.

        Args:
            processing_time: Duration of the request, in the same unit the
                caller uses consistently (the decorators in this module
                pass seconds).
            success: Whether the request completed without error.
        """
        stats = self.metrics
        stats["requests_total"] += 1
        stats["requests_successful" if success else "requests_failed"] += 1

        stats["total_processing_time"] += processing_time
        stats["average_processing_time"] = (
            stats["total_processing_time"] / stats["requests_total"]
        )
        stats["min_processing_time"] = min(
            stats["min_processing_time"], processing_time
        )
        stats["max_processing_time"] = max(
            stats["max_processing_time"], processing_time
        )

    def get_metrics(self) -> Dict[str, Any]:
        """Return a copy of the request metrics plus current system readings.

        Returns:
            A new dict with the request counters and, when ``psutil``
            succeeds, ``cpu_percent``, ``memory_percent`` and
            ``memory_available_mb``. If reading system metrics fails, a
            warning is logged and only the request counters are returned.
        """
        metrics = self.metrics.copy()

        # Do not leak the +inf sentinel to callers: it is not representable
        # in standard JSON. Report 0.0 until a request has been recorded.
        if metrics["min_processing_time"] == float('inf'):
            metrics["min_processing_time"] = 0.0

        # Add system metrics
        try:
            cpu_percent = psutil.cpu_percent()
            # Take one snapshot so both memory figures describe the same
            # instant.
            memory = psutil.virtual_memory()
            metrics.update({
                "cpu_percent": cpu_percent,
                "memory_percent": memory.percent,
                "memory_available_mb": memory.available / (1024 * 1024),
            })
        except Exception as e:
            # Best effort: system metrics are optional extras.
            logger.warning(f"Failed to get system metrics: {str(e)}")
        return metrics

    def reset_metrics(self):
        """Reset all metrics to their initial values."""
        self.metrics = self._initial_metrics()
# Global performance monitor instance, created once at import time.
# The timing decorators defined in this module record every measured call
# on this instance via record_request().
performance_monitor = PerformanceMonitor()
@contextmanager
def measure_time():
    """Context manager that logs how long its ``with`` body took.

    The elapsed time is logged at DEBUG level when the body finishes,
    whether it completes normally or raises. Nothing is yielded to the
    ``with`` statement and exceptions propagate unchanged.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system wall clock is adjusted mid-measurement.
    start_time = time.perf_counter()
    try:
        yield
    finally:
        execution_time = time.perf_counter() - start_time
        logger.debug(f"Execution time: {execution_time:.3f}s")
def timed_function(func: Callable) -> Callable:
    """Decorator that times a synchronous function and records the result.

    Every call is timed; the duration and a success flag are passed to the
    module-level ``performance_monitor`` and the duration is logged at
    DEBUG level. The wrapped function's return value and any exception it
    raises are passed through unchanged.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper with the same signature and metadata as ``func``.
    """
    # Resolve the display name once; objects such as functools.partial have
    # no __name__ and would otherwise fail inside the finally block.
    func_name = getattr(func, "__name__", repr(func))

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Assume failure until the call has returned. This keeps `success`
        # bound in the finally block even when func raises a BaseException
        # that is not an Exception (e.g. KeyboardInterrupt), which would
        # otherwise surface as an UnboundLocalError masking the real error.
        success = False
        # perf_counter() is monotonic and meant for measuring intervals.
        start_time = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            success = True
            return result
        finally:
            execution_time = time.perf_counter() - start_time
            performance_monitor.record_request(execution_time, success)
            logger.debug(f"{func_name} execution time: {execution_time:.3f}s")

    return wrapper
def timed_async_function(func: Callable) -> Callable:
    """Decorator that times a coroutine function and records the result.

    This is deliberately a plain ``def``: a decorator runs at definition
    time and must return the wrapper itself. Declared as ``async def`` it
    would return a coroutine object, so the decorated name would no longer
    be callable.

    Every awaited call is timed; the duration and a success flag are passed
    to the module-level ``performance_monitor`` and the duration is logged
    at DEBUG level. The awaited result and any exception are passed through
    unchanged.

    Args:
        func: The coroutine function to wrap.

    Returns:
        A coroutine function with the same signature and metadata as
        ``func``.
    """
    # Resolve the display name once; objects such as functools.partial have
    # no __name__ and would otherwise fail inside the finally block.
    func_name = getattr(func, "__name__", repr(func))

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # Assume failure until the await has completed. This keeps `success`
        # bound in the finally block when the task is cancelled:
        # asyncio.CancelledError derives from BaseException, so an
        # `except Exception` clause would never see it.
        success = False
        # perf_counter() is monotonic and meant for measuring intervals.
        start_time = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
            success = True
            return result
        finally:
            execution_time = time.perf_counter() - start_time
            performance_monitor.record_request(execution_time, success)
            logger.debug(f"{func_name} execution time: {execution_time:.3f}s")

    return wrapper
class SimpleCache:
    """Simple in-memory cache for inference results.

    Entries expire ``ttl`` seconds after they were last written or read.
    When a new key would push the cache past ``max_size``, the least
    recently used entry is evicted. Hit and lookup counts are tracked so
    that ``stats()`` can report a hit ratio.
    """

    def __init__(self, max_size: int = 100, ttl: int = 3600):
        """
        Initialize cache.

        Args:
            max_size: Maximum number of items to cache. A value of zero or
                less disables storage entirely.
            ttl: Time to live in seconds
        """
        self.max_size = max_size
        self.ttl = ttl
        self.cache = {}
        # key -> wall-clock timestamp of the last set() or successful get()
        self.access_times = {}
        # Lookup counters backing the hit ratio reported by stats().
        self._hits = 0
        self._requests = 0

    def _remove(self, key: str):
        """Drop ``key`` from both the value store and the timestamp index."""
        self.cache.pop(key, None)
        self.access_times.pop(key, None)

    def _is_expired(self, key: str) -> bool:
        """Check if a cache entry is expired (or has no recorded timestamp)."""
        if key not in self.access_times:
            return True
        return time.time() - self.access_times[key] > self.ttl

    def _evict_expired(self):
        """Remove expired entries."""
        current_time = time.time()
        # Collect first: the dict must not change size while being iterated.
        expired_keys = [
            key for key, access_time in self.access_times.items()
            if current_time - access_time > self.ttl
        ]
        for key in expired_keys:
            self._remove(key)

    def _evict_lru(self):
        """Remove least recently used entry."""
        if not self.cache:
            return
        # Choose among stored keys so that every call shrinks the cache;
        # a key without a timestamp counts as the oldest.
        lru_key = min(
            self.cache,
            key=lambda k: self.access_times.get(k, float('-inf')),
        )
        self._remove(lru_key)

    def get(self, key: str) -> Optional[Any]:
        """Get item from cache.

        Returns:
            The cached value, or ``None`` when the key is absent or expired.
            A successful lookup refreshes the entry's timestamp.
        """
        self._requests += 1
        if key not in self.cache:
            return None
        if self._is_expired(key):
            # Drop the stale entry now so size() does not count it.
            self._remove(key)
            return None
        self._hits += 1
        self.access_times[key] = time.time()
        return self.cache[key]

    def set(self, key: str, value: Any):
        """Set item in cache, evicting expired and LRU entries as needed."""
        # Clean up expired entries
        self._evict_expired()

        # A non-positive capacity can never hold an entry; bail out instead
        # of looping forever trying to make room in an empty cache.
        if self.max_size <= 0:
            return

        # Overwriting an existing key does not grow the cache, so only make
        # room when the key is new.
        if key not in self.cache:
            while len(self.cache) >= self.max_size:
                self._evict_lru()

        self.cache[key] = value
        self.access_times[key] = time.time()

    def clear(self):
        """Clear all cache entries (lookup counters are left untouched)."""
        self.cache.clear()
        self.access_times.clear()

    def size(self) -> int:
        """Get current cache size."""
        return len(self.cache)

    def stats(self) -> Dict[str, Any]:
        """Get cache statistics.

        Returns:
            A dict with ``size``, ``max_size``, ``ttl`` and ``hit_ratio``
            (successful ``get`` calls divided by all ``get`` calls; 0.0
            before the first lookup).
        """
        return {
            "size": len(self.cache),
            "max_size": self.max_size,
            "ttl": self.ttl,
            # max(..., 1) avoids division by zero before any lookup.
            "hit_ratio": self._hits / max(self._requests, 1),
        }
# Global cache instance, created once at import time: holds at most 50
# entries with a time-to-live of 1800 seconds each.
inference_cache = SimpleCache(max_size=50, ttl=1800) # 30 minutes TTL
def get_system_info() -> Dict[str, Any]:
    """Get system information.

    Returns:
        A dict with ``cpu_count``, ``cpu_percent``, ``memory_total_gb``,
        ``memory_available_gb``, ``memory_percent`` and
        ``disk_usage_percent`` (for the ``/`` path). If any reading fails,
        the error is logged and ``{"error": <message>}`` is returned
        instead.
    """
    try:
        cpu_count = psutil.cpu_count()
        # NOTE(review): per the psutil docs a positive interval makes this
        # call block for that long (1 second here) - confirm that is
        # acceptable for every caller.
        cpu_percent = psutil.cpu_percent(interval=1)
        # Take one snapshot so the three memory figures describe the same
        # instant rather than three separate samples.
        memory = psutil.virtual_memory()
        return {
            "cpu_count": cpu_count,
            "cpu_percent": cpu_percent,
            "memory_total_gb": memory.total / (1024**3),
            "memory_available_gb": memory.available / (1024**3),
            "memory_percent": memory.percent,
            "disk_usage_percent": psutil.disk_usage('/').percent,
        }
    except Exception as e:
        # Best effort: report the failure to the caller instead of raising.
        logger.error(f"Failed to get system info: {str(e)}")
        return {"error": str(e)}
|