Spaces:
Sleeping
Sleeping
| import asyncio | |
| import time | |
| from typing import Dict, List, Optional, Any | |
| from datetime import datetime, timezone, timedelta | |
| from dataclasses import dataclass, asdict | |
| from collections import defaultdict, deque | |
| import json | |
| import logging | |
| from enum import Enum | |
| import statistics | |
| from contextlib import asynccontextmanager | |
# Dedicated logger for performance metrics. Only the named logger is obtained
# here; no handlers or levels are set in this module.
metrics_logger = logging.getLogger("performance_metrics")
class MetricType(Enum):
    """Kinds of database performance metrics that can be recorded.

    Each member's value is the string used when a metric is serialized or
    turned into a counter/gauge key.
    """

    # Duration of a single query.
    QUERY_EXECUTION_TIME = "query_execution_time"
    # One sample per executed query.
    QUERY_COUNT = "query_count"
    # One sample per query flagged as slow.
    SLOW_QUERY_COUNT = "slow_query_count"
    # One sample per failed query.
    ERROR_COUNT = "error_count"
    # Connection-related samples.
    CONNECTION_COUNT = "connection_count"
    # Duration of a transaction.
    TRANSACTION_TIME = "transaction_time"
@dataclass
class PerformanceMetric:
    """Individual performance metric data point.

    The ``@dataclass`` decorator is required: the collector constructs
    instances with keyword arguments and ``to_dict`` relies on ``asdict``,
    both of which fail on a plain class.

    Attributes:
        metric_type: Kind of metric this sample belongs to.
        value: Recorded value.
        timestamp: Moment the sample was recorded.
        labels: Optional label mapping attached to the sample.
    """

    metric_type: MetricType
    value: float
    timestamp: datetime
    labels: Optional[Dict[str, str]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization.

        Returns:
            A dict of all fields in which ``metric_type`` is replaced by the
            enum member's value and ``timestamp`` by its ISO-8601 string.
        """
        data = asdict(self)
        # Enum members and datetimes are not JSON-serializable as-is.
        data['metric_type'] = self.metric_type.value
        data['timestamp'] = self.timestamp.isoformat()
        return data
@dataclass
class MetricSummary:
    """Summary statistics for a metric.

    The ``@dataclass`` decorator is required: instances are built with
    keyword arguments and ``to_dict`` relies on ``asdict``.

    Attributes:
        metric_type: Kind of metric being summarized.
        count: Number of samples in the summary.
        min_value: Smallest sample value.
        max_value: Largest sample value.
        avg_value: Arithmetic mean of the samples.
        median_value: Median of the samples.
        p95_value: 95th percentile of the samples.
        p99_value: 99th percentile of the samples.
        total_value: Sum of the samples.
        time_window: Label describing the summarized window.
    """

    metric_type: MetricType
    count: int
    min_value: float
    max_value: float
    avg_value: float
    median_value: float
    p95_value: float
    p99_value: float
    total_value: float
    time_window: str

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization.

        Returns:
            A dict of all fields in which ``metric_type`` is replaced by the
            enum member's value.
        """
        data = asdict(self)
        # Enum members are not JSON-serializable as-is.
        data['metric_type'] = self.metric_type.value
        return data
class PerformanceMetricsCollector:
    """Collects and analyzes database performance metrics.

    Samples are stored per metric type in bounded deques, with plain
    counters and gauges maintained alongside for quick access. A background
    asyncio task prunes samples older than the retention period; it is only
    started once an event loop is running.
    """

    def __init__(self,
                 max_metrics_per_type: int = 1000,
                 cleanup_interval: int = 300,  # 5 minutes
                 retention_hours: int = 24,
                 slow_query_threshold: float = 1.0):
        """
        Initialize metrics collector

        Args:
            max_metrics_per_type: Maximum metrics to keep per type
            cleanup_interval: Cleanup interval in seconds
            retention_hours: How long to retain metrics
            slow_query_threshold: Duration in seconds above which
                monitor_db_operation marks an operation as slow
        """
        self.max_metrics_per_type = max_metrics_per_type
        self.cleanup_interval = cleanup_interval
        self.retention_hours = retention_hours
        self.slow_query_threshold = slow_query_threshold

        # Store metrics in deques for efficient operations; maxlen makes the
        # deque drop its oldest sample once the per-type limit is reached.
        self.metrics: Dict[MetricType, deque] = defaultdict(
            lambda: deque(maxlen=max_metrics_per_type)
        )

        # Aggregated counters for quick access
        self.counters: Dict[str, int] = defaultdict(int)
        self.gauges: Dict[str, float] = defaultdict(float)

        # Last cleanup time
        self.last_cleanup = time.time()

        # Start background cleanup task (no-op when no loop is running yet)
        self._cleanup_task = None
        self._start_cleanup_task()

    def _start_cleanup_task(self):
        """Start the background cleanup task if an event loop is running.

        Does nothing when a cleanup task is already active or when called
        outside a running loop; monitor_db_operation retries the start.
        """
        if self._cleanup_task is not None and not self._cleanup_task.done():
            return
        try:
            # Probe for the loop BEFORE creating the coroutine object, so a
            # missing loop does not leave a never-awaited coroutine behind.
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No event loop running, cleanup task will be started later
            return
        self._cleanup_task = loop.create_task(self._periodic_cleanup())

    async def _periodic_cleanup(self):
        """Periodic cleanup of old metrics.

        Sleeps for ``cleanup_interval`` seconds between runs and exits when
        the task is cancelled; any other error is logged and the loop goes on.
        """
        while True:
            try:
                await asyncio.sleep(self.cleanup_interval)
                self._cleanup_old_metrics()
            except asyncio.CancelledError:
                break
            except Exception as e:
                metrics_logger.error(f"Error in periodic cleanup: {e}")

    def _cleanup_old_metrics(self):
        """Remove metrics older than retention period."""
        cutoff_time = datetime.now(timezone.utc) - timedelta(hours=self.retention_hours)
        for metric_deque in self.metrics.values():
            # Samples are appended in chronological order, so expired ones
            # are always at the left end of the deque.
            while metric_deque and metric_deque[0].timestamp < cutoff_time:
                metric_deque.popleft()
        self.last_cleanup = time.time()
        metrics_logger.info(f"Cleaned up metrics older than {cutoff_time}")

    def record_metric(self,
                      metric_type: MetricType,
                      value: float,
                      labels: Optional[Dict[str, str]] = None):
        """
        Record a performance metric

        Args:
            metric_type: Type of metric
            value: Metric value
            labels: Optional labels for the metric
        """
        metric = PerformanceMetric(
            metric_type=metric_type,
            value=value,
            timestamp=datetime.now(timezone.utc),
            labels=labels or {}
        )
        self.metrics[metric_type].append(metric)

        # Update counters and gauges
        counter_key = f"{metric_type.value}_count"
        self.counters[counter_key] += 1
        # Only the timing metrics expose their latest value as a gauge.
        if metric_type in (MetricType.QUERY_EXECUTION_TIME, MetricType.TRANSACTION_TIME):
            gauge_key = f"{metric_type.value}_latest"
            self.gauges[gauge_key] = value

    def record_query_execution(self, execution_time: float, query_type: str, is_slow: bool = False):
        """Record query execution metrics.

        Args:
            execution_time: Duration of the query
            query_type: Label stored under "query_type"
            is_slow: When true, a slow-query sample is recorded as well
        """
        labels = {"query_type": query_type}
        self.record_metric(MetricType.QUERY_EXECUTION_TIME, execution_time, labels)
        self.record_metric(MetricType.QUERY_COUNT, 1, labels)
        if is_slow:
            self.record_metric(MetricType.SLOW_QUERY_COUNT, 1, labels)

    def record_query_error(self, query_type: str, error_type: str):
        """Record query error metrics.

        Args:
            query_type: Label stored under "query_type"
            error_type: Label stored under "error_type"
        """
        labels = {"query_type": query_type, "error_type": error_type}
        self.record_metric(MetricType.ERROR_COUNT, 1, labels)

    def record_transaction_time(self, transaction_time: float, transaction_type: str = "default"):
        """Record transaction execution time.

        Args:
            transaction_time: Duration of the transaction
            transaction_type: Label stored under "transaction_type"
        """
        labels = {"transaction_type": transaction_type}
        self.record_metric(MetricType.TRANSACTION_TIME, transaction_time, labels)

    @asynccontextmanager
    async def monitor_db_operation(self, query_type: str, table_name: str = "unknown"):
        """Async context manager to monitor a DB operation.

        Records execution time and errors, marking slow operations. The
        duration is recorded whether or not the wrapped code raises; an
        exception is additionally recorded as an error and then re-raised.

        Args:
            query_type: e.g., "SELECT", "INSERT", "UPDATE", "DELETE".
            table_name: table/collection name for labeling.
        """
        # This body runs inside a running loop, so a cleanup task that could
        # not be started at construction time is started here.
        self._start_cleanup_task()
        start = time.perf_counter()
        try:
            yield
        except Exception as e:
            # Record error with query type; the table name only goes to the log
            self.record_query_error(query_type=query_type, error_type=type(e).__name__)
            metrics_logger.error(f"DB {query_type} error on {table_name}: {e}")
            raise
        finally:
            duration = time.perf_counter() - start
            # Mark as slow above the configured threshold (seconds)
            is_slow = duration > self.slow_query_threshold
            self.record_query_execution(execution_time=duration, query_type=query_type, is_slow=is_slow)
            metrics_logger.info(
                f"DB {query_type} on {table_name} took {duration:.3f}s" +
                (" (slow)" if is_slow else "")
            )

    def get_metric_summary(self,
                           metric_type: MetricType,
                           time_window_minutes: Optional[int] = None) -> Optional[MetricSummary]:
        """
        Get summary statistics for a metric type

        Args:
            metric_type: Type of metric to summarize
            time_window_minutes: Time window in minutes (None or 0 for all data)

        Returns:
            MetricSummary or None if no data
        """
        # .get() avoids creating an empty deque in the defaultdict on a read.
        stored = self.metrics.get(metric_type)
        if not stored:
            return None
        metrics_data = list(stored)

        # Filter by time window if specified
        if time_window_minutes:
            cutoff_time = datetime.now(timezone.utc) - timedelta(minutes=time_window_minutes)
            metrics_data = [m for m in metrics_data if m.timestamp >= cutoff_time]
            if not metrics_data:
                return None

        values = [m.value for m in metrics_data]
        return MetricSummary(
            metric_type=metric_type,
            count=len(values),
            min_value=min(values),
            max_value=max(values),
            avg_value=statistics.mean(values),
            median_value=statistics.median(values),
            p95_value=self._percentile(values, 95),
            p99_value=self._percentile(values, 99),
            total_value=sum(values),
            time_window=f"{time_window_minutes}min" if time_window_minutes else "all"
        )

    def _percentile(self, values: List[float], percentile: int) -> float:
        """Calculate percentile value using linear interpolation.

        Args:
            values: Samples, in any order
            percentile: Percentile to compute, 0-100

        Returns:
            The interpolated percentile, or 0.0 for an empty list
        """
        if not values:
            return 0.0
        sorted_values = sorted(values)
        # Fractional rank into the sorted samples.
        k = (len(sorted_values) - 1) * percentile / 100
        f = int(k)
        c = k - f
        if f == len(sorted_values) - 1:
            # Rank lands on the last element; there is no upper neighbour.
            return sorted_values[f]
        return sorted_values[f] * (1 - c) + sorted_values[f + 1] * c

    def get_all_summaries(self, time_window_minutes: Optional[int] = None) -> Dict[str, MetricSummary]:
        """Get summaries for all metric types that have data.

        Args:
            time_window_minutes: Time window in minutes (None or 0 for all data)

        Returns:
            Mapping of metric type value to its MetricSummary
        """
        summaries = {}
        for metric_type in MetricType:
            summary = self.get_metric_summary(metric_type, time_window_minutes)
            if summary:
                summaries[metric_type.value] = summary
        return summaries

    def get_counters(self) -> Dict[str, int]:
        """Get a copy of the current counter values."""
        return dict(self.counters)

    def get_gauges(self) -> Dict[str, float]:
        """Get a copy of the current gauge values."""
        return dict(self.gauges)

    def get_health_metrics(self) -> Dict[str, Any]:
        """Get health-related metrics.

        Returns:
            Dict with query count and average query time for the last five
            minutes, error and slow-query counts for the last hour, the total
            number of stored samples and the time of the last cleanup.
        """
        now = datetime.now(timezone.utc)
        last_5_min = now - timedelta(minutes=5)
        last_hour = now - timedelta(hours=1)

        def values_since(metric_type, cutoff):
            # .get() keeps this a pure read: indexing the defaultdict would
            # insert an empty deque for every type that has no samples yet.
            return [
                m.value for m in self.metrics.get(metric_type, ())
                if m.timestamp >= cutoff
            ]

        recent_queries = values_since(MetricType.QUERY_EXECUTION_TIME, last_5_min)
        recent_errors = values_since(MetricType.ERROR_COUNT, last_hour)
        recent_slow = values_since(MetricType.SLOW_QUERY_COUNT, last_hour)

        return {
            "queries_last_5min": len(recent_queries),
            "avg_query_time_last_5min": statistics.mean(recent_queries) if recent_queries else 0,
            "errors_last_hour": len(recent_errors),
            "slow_queries_last_hour": len(recent_slow),
            "total_metrics_stored": sum(len(samples) for samples in self.metrics.values()),
            "last_cleanup": self.last_cleanup
        }

    def export_metrics(self, format_type: str = "json") -> str:
        """
        Export metrics in specified format

        Args:
            format_type: Export format ("json" or "prometheus"), case-insensitive

        Returns:
            Formatted metrics string

        Raises:
            ValueError: If format_type is not a supported format
        """
        if format_type.lower() == "json":
            return self._export_json()
        elif format_type.lower() == "prometheus":
            return self._export_prometheus()
        else:
            raise ValueError(f"Unsupported format: {format_type}")

    def _export_json(self) -> str:
        """Export metrics as JSON; summaries cover the last 60 minutes."""
        export_data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "summaries": {k: v.to_dict() for k, v in self.get_all_summaries(60).items()},
            "counters": self.get_counters(),
            "gauges": self.get_gauges(),
            "health": self.get_health_metrics()
        }
        return json.dumps(export_data, indent=2)

    def _export_prometheus(self) -> str:
        """Export metrics in Prometheus format; summaries cover the last 60 minutes."""
        lines = []
        # Every line carries the same millisecond timestamp.
        timestamp = int(time.time() * 1000)

        # Export counters
        for name, value in self.get_counters().items():
            lines.append(f"db_{name} {value} {timestamp}")

        # Export gauges
        for name, value in self.get_gauges().items():
            lines.append(f"db_{name} {value} {timestamp}")

        # Export summaries
        for metric_type, summary in self.get_all_summaries(60).items():
            prefix = f"db_{metric_type}_summary"
            lines.extend([
                f"{prefix}_count {summary.count} {timestamp}",
                f"{prefix}_avg {summary.avg_value} {timestamp}",
                f"{prefix}_p95 {summary.p95_value} {timestamp}",
                f"{prefix}_p99 {summary.p99_value} {timestamp}"
            ])
        return "\n".join(lines)

    def log_performance_report(self, time_window_minutes: int = 60):
        """Log a performance report.

        Args:
            time_window_minutes: Window used for the summaries in the report
        """
        summaries = self.get_all_summaries(time_window_minutes)
        health = self.get_health_metrics()
        report = {
            "time_window_minutes": time_window_minutes,
            "summaries": {k: v.to_dict() for k, v in summaries.items()},
            "health_metrics": health
        }
        metrics_logger.info(f"Performance Report: {json.dumps(report, indent=2)}")

    def cleanup(self):
        """Cleanup resources by cancelling the background cleanup task, if any."""
        if self._cleanup_task and not self._cleanup_task.done():
            self._cleanup_task.cancel()
# Global metrics collector instance, created at import time with the default
# constructor arguments. The module-level convenience functions below all
# delegate to this object.
metrics_collector = PerformanceMetricsCollector()
| # Convenience functions | |
def record_query_execution(execution_time: float, query_type: str, is_slow: bool = False):
    """Forward a query execution sample to the global metrics collector.

    Args:
        execution_time: Duration of the query
        query_type: Kind of query that was executed
        is_slow: Whether the query should also be counted as slow
    """
    metrics_collector.record_query_execution(
        execution_time=execution_time,
        query_type=query_type,
        is_slow=is_slow,
    )
def record_query_error(query_type: str, error_type: str):
    """Forward a query error to the global metrics collector.

    Args:
        query_type: Kind of query that failed
        error_type: Name describing the error
    """
    metrics_collector.record_query_error(
        query_type=query_type,
        error_type=error_type,
    )
def record_transaction_time(transaction_time: float, transaction_type: str = "default"):
    """Forward a transaction duration to the global metrics collector.

    Args:
        transaction_time: Duration of the transaction
        transaction_type: Label for the kind of transaction
    """
    metrics_collector.record_transaction_time(
        transaction_time=transaction_time,
        transaction_type=transaction_type,
    )
@asynccontextmanager
async def monitor_db_operation(query_type: str, table_name: str = "unknown"):
    """Convenience async context manager that delegates to the global collector.

    Without the ``asynccontextmanager`` decorator this function would return a
    bare async generator, which cannot be used in an ``async with`` statement.

    Args:
        query_type: e.g., "SELECT", "INSERT", "UPDATE", "DELETE".
        table_name: table/collection name for labeling.
    """
    async with metrics_collector.monitor_db_operation(query_type=query_type, table_name=table_name):
        yield
def get_performance_summary(time_window_minutes: int = 60) -> Dict[str, Any]:
    """Build a serializable performance summary from the global collector.

    Args:
        time_window_minutes: Window passed to the collector's summaries

    Returns:
        Dict with the per-metric summaries (as plain dicts) under
        "summaries" and the collector's health metrics under "health".
    """
    all_summaries = metrics_collector.get_all_summaries(time_window_minutes)
    serialized = {name: summary.to_dict() for name, summary in all_summaries.items()}
    health = metrics_collector.get_health_metrics()
    return {"summaries": serialized, "health": health}
def log_performance_report(time_window_minutes: int = 60):
    """Ask the global metrics collector to log its performance report.

    Args:
        time_window_minutes: Window used for the summaries in the report
    """
    metrics_collector.log_performance_report(time_window_minutes=time_window_minutes)
# Export main components. Every public class, the global collector and every
# module-level convenience function is listed, including the
# monitor_db_operation context manager.
__all__ = [
    'PerformanceMetricsCollector',
    'PerformanceMetric',
    'MetricSummary',
    'MetricType',
    'metrics_collector',
    'record_query_execution',
    'record_query_error',
    'record_transaction_time',
    'monitor_db_operation',
    'get_performance_summary',
    'log_performance_report'
]