Spaces:
Sleeping
Sleeping
| """ | |
| Performance monitoring utilities for tracking query execution times and database operations. | |
| """ | |
| import time | |
| import logging | |
| from functools import wraps | |
| from typing import Dict, Any, Optional | |
| from contextlib import asynccontextmanager | |
| logger = logging.getLogger(__name__) | |
class PerformanceMetrics:
    """Track and summarize query execution timings.

    Every recorded query is appended to ``query_times``. Queries whose
    execution time is strictly greater than ``slow_query_threshold`` seconds
    are additionally appended to ``slow_queries`` and logged as a warning.

    NOTE: both history lists grow for the lifetime of the instance; nothing
    in this class trims them.
    """

    def __init__(self, slow_query_threshold: float = 1.0):
        """Create an empty metrics store.

        Args:
            slow_query_threshold: Execution time in seconds above which a
                query is counted as slow. Defaults to 1.0.
        """
        self.slow_query_threshold = slow_query_threshold
        self.query_times = []    # one dict per recorded query, oldest first
        self.slow_queries = []   # subset of records that exceeded the threshold
        self.total_queries = 0
        self.total_time = 0.0

    def add_query_time(self, collection: str, pipeline_length: int, execution_time: float, query_type: str = "aggregation") -> None:
        """Record one query execution.

        Args:
            collection: Identifier of the collection that was queried.
            pipeline_length: Number of stages in the executed pipeline.
            execution_time: Elapsed time of the query in seconds.
            query_type: Free-form label for the kind of query.
        """
        record = {
            "collection": collection,
            "pipeline_length": pipeline_length,
            "execution_time": execution_time,
            "query_type": query_type,
            "timestamp": time.time(),
        }
        self.query_times.append(record)
        self.total_queries += 1
        self.total_time += execution_time

        if execution_time > self.slow_query_threshold:
            # Store a copy so the two history lists never share a mutable dict.
            self.slow_queries.append(dict(record))
            logger.warning(f"Slow query detected: {collection} took {execution_time:.3f}s")

    def get_average_time(self) -> float:
        """Return the mean execution time in seconds, or 0.0 if nothing was recorded."""
        return self.total_time / self.total_queries if self.total_queries > 0 else 0.0

    def get_slow_query_count(self) -> int:
        """Return how many recorded queries exceeded the slow-query threshold."""
        return len(self.slow_queries)

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Return aggregate totals plus the ten most recent query records.

        ``total_time`` and ``average_time`` are rounded to three decimals.
        """
        return {
            "total_queries": self.total_queries,
            "total_time": round(self.total_time, 3),
            "average_time": round(self.get_average_time(), 3),
            "slow_queries": self.get_slow_query_count(),
            # Slicing an empty list already yields [], so no guard is needed.
            "recent_queries": self.query_times[-10:],
        }
# Global performance metrics instance, created at import time and shared by
# the module-level helpers in this file that record and report query timings.
performance_metrics = PerformanceMetrics()
def monitor_query_performance(func):
    """Decorator that times an async query function and records the result.

    The wrapped coroutine function is awaited with its original arguments.
    On success the elapsed time is stored in the module-level
    ``performance_metrics`` and logged at INFO level. On failure the elapsed
    time and the error are logged at ERROR level and the exception is
    re-raised unchanged.

    The first positional argument is recorded as the collection ("unknown"
    when there are no positional arguments). The second positional argument,
    when it is a list, is treated as the pipeline and its length is recorded;
    otherwise the length is recorded as 0. Keyword arguments are not inspected.

    Args:
        func: Coroutine function to wrap.

    Returns:
        An async wrapper carrying ``func``'s name, docstring and other metadata.
    """

    @wraps(func)  # preserve __name__/__doc__ so introspection and logs stay accurate
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            execution_time = time.time() - start_time
            logger.error(f"Query failed after {execution_time:.3f}s: {str(e)}")
            raise
        else:
            # Bookkeeping lives outside the try body so that only a failure
            # of the query itself is reported as "Query failed".
            execution_time = time.time() - start_time

            # Extract collection and pipeline info from args
            collection = args[0] if args else "unknown"
            pipeline_length = len(args[1]) if len(args) > 1 and isinstance(args[1], list) else 0

            performance_metrics.add_query_time(
                collection=collection,
                pipeline_length=pipeline_length,
                execution_time=execution_time,
                query_type="aggregation",
            )
            logger.info(f"Query executed: {collection} in {execution_time:.3f}s (pipeline length: {pipeline_length})")
            return result

    return wrapper
@asynccontextmanager
async def performance_timer(operation_name: str):
    """Async context manager that logs how long the enclosed block took.

    Usage::

        async with performance_timer("load users"):
            ...

    The elapsed time is logged at INFO level when the block exits, including
    when it exits by raising; the exception still propagates to the caller.

    Args:
        operation_name: Label included in the log message.
    """
    start_time = time.time()
    try:
        yield
    finally:
        # Runs on both normal exit and error so every operation gets a timing line.
        execution_time = time.time() - start_time
        logger.info(f"Operation '{operation_name}' completed in {execution_time:.3f}s")
# Relative cost assigned to each pipeline stage type. Built once at import
# time instead of on every loop iteration.
_STAGE_COMPLEXITY_WEIGHTS = {
    "$match": 1,
    "$project": 1,
    "$sort": 2,
    "$group": 3,
    "$lookup": 4,
    "$facet": 5,
    "$unwind": 2,
    "$addFields": 1,
    "$limit": 1,
    "$skip": 1,
}
# Weight used for any stage type missing from the table above.
_DEFAULT_STAGE_WEIGHT = 2
# Scores strictly above this value trigger a warning.
_HIGH_COMPLEXITY_SCORE = 15


def log_pipeline_complexity(pipeline: list, collection: str, operation: str) -> int:
    """Score a pipeline's complexity, log the result, and return the score.

    Each stage is identified by its first key; an empty stage is counted as
    "unknown". The score is the sum of the per-stage weights, with unlisted
    stage types weighted ``_DEFAULT_STAGE_WEIGHT``.

    Args:
        pipeline: Sequence of stage mappings, one per pipeline stage.
        collection: Collection name, used only in log messages.
        operation: Operation label, used only in log messages.

    Returns:
        The total complexity score (0 for an empty pipeline).
    """
    complexity_score = 0
    stage_counts = {}

    for stage in pipeline:
        stage_type = next(iter(stage)) if stage else "unknown"
        stage_counts[stage_type] = stage_counts.get(stage_type, 0) + 1
        complexity_score += _STAGE_COMPLEXITY_WEIGHTS.get(stage_type, _DEFAULT_STAGE_WEIGHT)

    logger.info(f"Pipeline complexity for {operation} on {collection}: "
                f"score={complexity_score}, stages={len(pipeline)}, "
                f"breakdown={stage_counts}")

    # Warn about high complexity
    if complexity_score > _HIGH_COMPLEXITY_SCORE:
        logger.warning(f"High complexity pipeline detected: {operation} on {collection} "
                       f"(score: {complexity_score})")

    return complexity_score
def get_performance_report() -> Dict[str, Any]:
    """Assemble the metrics summary and the derived recommendations.

    Returns:
        A dict with two keys: ``"metrics"`` (the summary from the module-level
        ``performance_metrics``) and ``"recommendations"`` (a list of strings).
    """
    report: Dict[str, Any] = {}
    report["metrics"] = performance_metrics.get_metrics_summary()
    report["recommendations"] = _generate_recommendations()
    return report
def _generate_recommendations() -> list:
    """Derive tuning suggestions from the module-level metrics.

    A suggestion is included when its condition holds:
    average time above 0.5 s, at least one slow query, or more than 100
    recorded queries. The order of the returned list follows that order.

    Returns:
        A list of recommendation strings (empty when no condition holds).
    """
    mean_seconds = performance_metrics.get_average_time()
    slow_total = performance_metrics.get_slow_query_count()

    # Each entry pairs a trigger condition with the message it produces.
    candidates = (
        (mean_seconds > 0.5, "Consider adding indexes for frequently queried fields"),
        (slow_total > 0, f"Optimize {slow_total} slow queries detected"),
        (performance_metrics.total_queries > 100, "Consider implementing query result caching"),
    )
    return [message for applies, message in candidates if applies]