Spaces:
Sleeping
Sleeping
| # monitoring.py - System monitoring and metrics | |
| import psutil | |
| import time | |
| import logging | |
| from datetime import datetime | |
| from typing import Dict, Any | |
| import asyncio | |
| import aiofiles | |
| import json | |
| class SystemMonitor: | |
| """System performance and health monitoring""" | |
| def __init__(self, log_file: str = "logs/metrics.log"): | |
| self.log_file = log_file | |
| self.logger = logging.getLogger("system_monitor") | |
| async def get_system_metrics(self) -> Dict[str, Any]: | |
| """Collect comprehensive system metrics""" | |
| # CPU metrics | |
| cpu_percent = psutil.cpu_percent(interval=1) | |
| cpu_count = psutil.cpu_count() | |
| # Memory metrics | |
| memory = psutil.virtual_memory() | |
| # Disk metrics | |
| disk = psutil.disk_usage('/') | |
| # Process metrics | |
| process = psutil.Process() | |
| process_memory = process.memory_info() | |
| metrics = { | |
| "timestamp": datetime.now().isoformat(), | |
| "system": { | |
| "cpu_percent": cpu_percent, | |
| "cpu_count": cpu_count, | |
| "memory_total": memory.total, | |
| "memory_available": memory.available, | |
| "memory_percent": memory.percent, | |
| "disk_total": disk.total, | |
| "disk_free": disk.free, | |
| "disk_percent": disk.percent | |
| }, | |
| "process": { | |
| "pid": process.pid, | |
| "memory_rss": process_memory.rss, | |
| "memory_vms": process_memory.vms, | |
| "cpu_percent": process.cpu_percent(), | |
| "num_threads": process.num_threads(), | |
| "create_time": process.create_time() | |
| } | |
| } | |
| return metrics | |
| async def log_metrics(self, metrics: Dict[str, Any]): | |
| """Log metrics to file""" | |
| async with aiofiles.open(self.log_file, 'a') as f: | |
| await f.write(json.dumps(metrics) + '\n') | |
| async def check_health(self) -> Dict[str, str]: | |
| """Perform health checks""" | |
| health_status = { | |
| "overall": "healthy", | |
| "components": {} | |
| } | |
| # Check CPU usage | |
| cpu_percent = psutil.cpu_percent(interval=1) | |
| if cpu_percent > 90: | |
| health_status["components"]["cpu"] = "critical" | |
| health_status["overall"] = "unhealthy" | |
| elif cpu_percent > 70: | |
| health_status["components"]["cpu"] = "warning" | |
| else: | |
| health_status["components"]["cpu"] = "healthy" | |
| # Check memory usage | |
| memory = psutil.virtual_memory() | |
| if memory.percent > 90: | |
| health_status["components"]["memory"] = "critical" | |
| health_status["overall"] = "unhealthy" | |
| elif memory.percent > 80: | |
| health_status["components"]["memory"] = "warning" | |
| else: | |
| health_status["components"]["memory"] = "healthy" | |
| # Check disk space | |
| disk = psutil.disk_usage('/') | |
| if disk.percent > 95: | |
| health_status["components"]["disk"] = "critical" | |
| health_status["overall"] = "unhealthy" | |
| elif disk.percent > 85: | |
| health_status["components"]["disk"] = "warning" | |
| else: | |
| health_status["components"]["disk"] = "healthy" | |
| return health_status | |
| class PerformanceProfiler: | |
| """Performance profiling for document processing""" | |
| def __init__(self): | |
| self.processing_times = [] | |
| self.error_rates = {} | |
| self.throughput_metrics = {} | |
| def record_processing_time(self, operation: str, duration: float, success: bool): | |
| """Record processing time and success rate""" | |
| timestamp = time.time() | |
| self.processing_times.append({ | |
| "operation": operation, | |
| "duration": duration, | |
| "success": success, | |
| "timestamp": timestamp | |
| }) | |
| # Update error rates | |
| if operation not in self.error_rates: | |
| self.error_rates[operation] = {"total": 0, "errors": 0} | |
| self.error_rates[operation]["total"] += 1 | |
| if not success: | |
| self.error_rates[operation]["errors"] += 1 | |
| def get_performance_summary(self) -> Dict[str, Any]: | |
| """Get performance summary""" | |
| if not self.processing_times: | |
| return {"message": "No performance data available"} | |
| # Calculate averages by operation | |
| operations = {} | |
| for record in self.processing_times: | |
| op = record["operation"] | |
| if op not in operations: | |
| operations[op] = [] | |
| operations[op].append(record["duration"]) | |
| summary = {} | |
| for op, times in operations.items(): | |
| avg_time = sum(times) / len(times) | |
| max_time = max(times) | |
| min_time = min(times) | |
| error_rate = 0 | |
| if op in self.error_rates: | |
| total = self.error_rates[op]["total"] | |
| errors = self.error_rates[op]["errors"] | |
| error_rate = (errors / total) * 100 if total > 0 else 0 | |
| summary[op] = { | |
| "avg_duration": round(avg_time, 2), | |
| "max_duration": round(max_time, 2), | |
| "min_duration": round(min_time, 2), | |
| "total_operations": len(times), | |
| "error_rate_percent": round(error_rate, 2) | |
| } | |
| return summary | |