Spaces:
Build error
Build error
| """ | |
| Monitoring decorators for metrics collection | |
| This module provides decorators for automatically collecting metrics | |
| from various parts of the AI Agent system. | |
| """ | |
| import asyncio | |
| import functools | |
| import logging | |
| import time | |
| from typing import Any, Callable, Dict, Optional | |
| from contextlib import asynccontextmanager | |
| from src.infrastructure.monitoring.metrics import ( | |
| record_agent_registration, record_task_execution, record_task_duration, | |
| record_error, AGENT_TASK_EXECUTIONS, AGENT_TASK_DURATION, ERRORS_TOTAL | |
| ) | |
# Module-level logger named after this module (``__name__``) so logging
# configuration can target it specifically.
logger: logging.Logger = logging.getLogger(__name__)
def async_metrics(func: Callable) -> Callable:
    """
    Decorator that times an async function and records its failures.

    On success the elapsed time is only logged at DEBUG level (no metric is
    recorded). On failure an error metric is recorded via ``record_error``
    with category ``"error"``, the failure is logged at ERROR level, and the
    original exception is re-raised unchanged.

    Args:
        func: The async function to decorate

    Returns:
        Decorated coroutine function. ``functools.wraps`` preserves the
        wrapped function's ``__name__``, ``__doc__`` and ``__wrapped__``.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        function_name = func.__name__
        # perf_counter is monotonic: durations are immune to wall-clock jumps.
        start_time = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            execution_time = time.perf_counter() - start_time
            record_error(type(e).__name__, function_name, "error")
            logger.error(f"Function {function_name} failed after {execution_time:.3f}s: {e}")
            raise
        execution_time = time.perf_counter() - start_time
        logger.debug(f"Function {function_name} completed successfully in {execution_time:.3f}s")
        return result

    return wrapper
def agent_metrics(agent_name: str) -> Callable:
    """
    Decorator factory for agent-specific metrics collection.

    The decorated async function's outcome is recorded with
    ``record_task_execution`` (status ``"success"`` or ``"error"``). On
    success its duration is also recorded via ``record_task_duration``. On
    failure an ``"agent_error"`` is recorded via ``record_error`` and the
    exception is re-raised.

    Args:
        agent_name: Name of the agent for metrics labeling

    Returns:
        Decorator function; the produced wrapper preserves the wrapped
        function's metadata via ``functools.wraps``.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Best-effort task-type label. The first positional argument is
            # checked first (for a bound method that is ``self``), then a
            # ``task`` keyword argument.
            task_type = "unknown"
            if args and hasattr(args[0], 'task_type'):
                task_type = args[0].task_type
            elif 'task' in kwargs and hasattr(kwargs['task'], 'task_type'):
                task_type = kwargs['task'].task_type

            start_time = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                execution_time = time.perf_counter() - start_time
                record_task_execution(agent_name, task_type, "error")
                record_error(type(e).__name__, agent_name, "agent_error")
                logger.error(f"Agent {agent_name} failed {task_type} task after {execution_time:.3f}s: {e}")
                raise

            # Success metrics sit outside the try so a failure while recording
            # them is not double-counted as a task error.
            execution_time = time.perf_counter() - start_time
            record_task_execution(agent_name, task_type, "success")
            record_task_duration(agent_name, task_type, execution_time)
            logger.debug(f"Agent {agent_name} completed {task_type} task in {execution_time:.3f}s")
            return result

        return wrapper

    return decorator
def tool_metrics(tool_name: str) -> Callable:
    """
    Decorator factory for tool-specific metrics collection.

    Each call of the decorated async function is recorded with
    ``record_task_execution`` under task type ``"tool_execution"`` (status
    ``"success"`` or ``"error"``). On success its duration is recorded via
    ``record_task_duration``. On failure a ``"tool_error"`` is recorded via
    ``record_error`` and the exception is re-raised.

    Args:
        tool_name: Name of the tool for metrics labeling

    Returns:
        Decorator function; the produced wrapper preserves the wrapped
        function's metadata via ``functools.wraps``.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                execution_time = time.perf_counter() - start_time
                record_task_execution(tool_name, "tool_execution", "error")
                record_error(type(e).__name__, tool_name, "tool_error")
                logger.error(f"Tool {tool_name} failed after {execution_time:.3f}s: {e}")
                raise

            # Kept outside the try so a metrics-backend failure is not also
            # counted as a tool error.
            execution_time = time.perf_counter() - start_time
            record_task_execution(tool_name, "tool_execution", "success")
            record_task_duration(tool_name, "tool_execution", execution_time)
            logger.debug(f"Tool {tool_name} executed successfully in {execution_time:.3f}s")
            return result

        return wrapper

    return decorator
def performance_metrics(operation_name: str, slow_threshold: float = 1.0) -> Callable:
    """
    Decorator factory for latency (execution-time) metrics.

    The duration of each successful call is recorded via
    ``record_task_duration`` under task type ``"performance"``. Calls that
    take longer than ``slow_threshold`` seconds are logged at WARNING level,
    faster ones at DEBUG. Failures are recorded via ``record_error``
    (category ``"performance_error"``) and re-raised. Only timing is
    measured here; memory and throughput are not collected.

    Args:
        operation_name: Name of the operation for metrics labeling
        slow_threshold: Duration in seconds above which a call is logged as
            slow. Defaults to 1.0.

    Returns:
        Decorator function; the produced wrapper preserves the wrapped
        function's metadata via ``functools.wraps``.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                execution_time = time.perf_counter() - start_time
                record_error(type(e).__name__, operation_name, "performance_error")
                logger.error(f"Performance operation {operation_name} failed after {execution_time:.3f}s: {e}")
                raise

            execution_time = time.perf_counter() - start_time
            record_task_duration(operation_name, "performance", execution_time)
            if execution_time > slow_threshold:
                logger.warning(f"Slow operation {operation_name}: {execution_time:.3f}s")
            else:
                logger.debug(f"Operation {operation_name} completed in {execution_time:.3f}s")
            return result

        return wrapper

    return decorator
@asynccontextmanager
async def metrics_context(operation_name: str, **labels):
    """
    Async context manager that records metrics for a block of code.

    Usage::

        async with metrics_context("sync_catalog"):
            ...

    If the block finishes normally, a ``"success"`` execution and its
    duration are recorded under task type ``"context"``. If it raises an
    ``Exception``, an ``"error"`` execution and a ``"context_error"`` are
    recorded and the exception is re-raised.

    Without the ``@asynccontextmanager`` decorator this function is a bare
    async generator, and ``async with`` on it raises ``TypeError``.

    Args:
        operation_name: Name of the operation used as the metric label.
        **labels: Extra labels. Currently accepted but not forwarded to the
            metric recorders.

    Yields:
        None.
    """
    start_time = time.perf_counter()
    try:
        yield
    except Exception as e:
        execution_time = time.perf_counter() - start_time
        record_task_execution(operation_name, "context", "error")
        record_error(type(e).__name__, operation_name, "context_error")
        logger.error(f"Context {operation_name} failed after {execution_time:.3f}s: {e}")
        raise
    else:
        # Success metrics live in ``else`` so a failure while recording them
        # is not misreported as an error inside the managed block.
        execution_time = time.perf_counter() - start_time
        record_task_execution(operation_name, "context", "success")
        record_task_duration(operation_name, "context", execution_time)
        logger.debug(f"Context {operation_name} completed successfully in {execution_time:.3f}s")
def error_tracking(func: Callable) -> Callable:
    """
    Decorator for error tracking and reporting.

    Successful results of the decorated async function are returned
    untouched. On an ``Exception``, a ``"function_error"`` is recorded via
    ``record_error``. The failure is then logged at ERROR level with an
    ``error_context`` dict passed through ``extra``. That dict holds the
    function name, error type and message, positional-argument count and
    keyword-argument names; argument values are deliberately not logged.
    Finally the exception is re-raised.

    Args:
        func: The async function to decorate

    Returns:
        Decorated coroutine function; metadata is preserved via
        ``functools.wraps``.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        function_name = func.__name__
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            error_context = {
                "function": function_name,
                "error_type": type(e).__name__,
                "error_message": str(e),
                "args_count": len(args),
                "kwargs_keys": list(kwargs.keys()),
            }
            record_error(type(e).__name__, function_name, "function_error")
            logger.error(f"Error in {function_name}: {e}", extra={"error_context": error_context})
            raise

    return wrapper
def throughput_metrics(operation_name: str, window_seconds: float = 60.0) -> Callable:
    """
    Decorator factory for throughput (call-rate) metrics.

    Calls to the decorated async function are counted in a fixed window;
    failed calls are counted too. The window restarts once more than
    ``window_seconds`` have passed since it began. After each successful
    call the rate (calls per second within the current window) is logged at
    DEBUG level. Failures are recorded via ``record_error`` (category
    ``"throughput_error"``) and re-raised. The counter belongs to the
    decorated function and is shared by all of its callers.

    Args:
        operation_name: Name of the operation
        window_seconds: Length of the counting window in seconds. Defaults
            to 60.0.

    Returns:
        Decorator function; the produced wrapper preserves the wrapped
        function's metadata via ``functools.wraps``.
    """
    # Floor for the elapsed time used as the rate's denominator. It prevents
    # division by (nearly) zero at the start of a window. It also prevents a
    # negative/zero denominator when another coroutine resets the window
    # while this call is awaiting.
    min_elapsed = 1.0

    def decorator(func: Callable) -> Callable:
        operation_count = 0
        # Monotonic clock: window bookkeeping must not jump with wall-clock changes.
        window_start = time.monotonic()

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal operation_count, window_start
            now = time.monotonic()
            if now - window_start > window_seconds:
                operation_count = 0
                window_start = now
            operation_count += 1

            try:
                result = await func(*args, **kwargs)
            except Exception as e:
                record_error(type(e).__name__, operation_name, "throughput_error")
                logger.error(f"Throughput operation {operation_name} failed: {e}")
                raise

            throughput = operation_count / max(now - window_start, min_elapsed)
            logger.debug(f"Operation {operation_name} throughput: {throughput:.2f} ops/sec")
            return result

        return wrapper

    return decorator
def resource_metrics(func: Callable) -> Callable:
    """
    Decorator that logs resource usage of an async function.

    After a successful call it logs at DEBUG level three values: the change
    in this process's resident set size (MiB), the process CPU utilisation
    percentage over the call as reported by ``psutil``, and the wall time.
    Failures are recorded via ``record_error`` (category
    ``"resource_error"``) and re-raised.

    Resource tracking is best-effort. If ``psutil`` cannot be imported, the
    function still runs and only its duration is logged; the missing
    dependency does not break the call.

    Args:
        func: The async function to decorate

    Returns:
        Decorated coroutine function; metadata is preserved via
        ``functools.wraps``.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        import os
        try:
            import psutil
        except ImportError:
            psutil = None

        process = psutil.Process(os.getpid()) if psutil is not None else None
        initial_memory = 0.0
        if process is not None:
            initial_memory = process.memory_info().rss / 1024 / 1024  # MiB
            # On a fresh Process object the first cpu_percent() call only
            # primes psutil's counters (it returns a meaningless 0.0). The
            # next call reports utilisation since this point.
            process.cpu_percent()

        start_time = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            execution_time = time.perf_counter() - start_time
            record_error(type(e).__name__, func.__name__, "resource_error")
            logger.error(f"Resource operation {func.__name__} failed after {execution_time:.3f}s: {e}")
            raise

        execution_time = time.perf_counter() - start_time
        if process is None:
            logger.debug(f"Resource usage for {func.__name__} unavailable (psutil not installed), "
                         f"Time: {execution_time:.3f}s")
        else:
            memory_delta = process.memory_info().rss / 1024 / 1024 - initial_memory
            cpu_percent = process.cpu_percent()
            logger.debug(f"Resource usage for {func.__name__}: "
                         f"Memory: {memory_delta:+.2f}MB, CPU: {cpu_percent:.2f}%, "
                         f"Time: {execution_time:.3f}s")
        return result

    return wrapper
# Convenience lookup: map a short metric-type key to one of the decorators above.
def get_metrics_decorator(metric_type: str, **kwargs) -> Callable:
    """
    Look up one of this module's metrics decorators by a short type key.

    Args:
        metric_type: One of ``"async"``, ``"agent"``, ``"tool"``,
            ``"performance"``, ``"error"``, ``"throughput"`` or
            ``"resource"``.
        **kwargs: When non-empty, the selected decorator is called with
            these keyword arguments and that result is returned (intended
            for the parametrised factories, e.g. ``agent_name=...``). When
            empty, the decorator itself is returned.

    Returns:
        The selected decorator, or the result of calling it with ``**kwargs``.

    Raises:
        ValueError: If ``metric_type`` is not one of the known keys.
    """
    registry = {
        "async": async_metrics,
        "agent": agent_metrics,
        "tool": tool_metrics,
        "performance": performance_metrics,
        "error": error_tracking,
        "throughput": throughput_metrics,
        "resource": resource_metrics,
    }
    chosen = registry.get(metric_type)
    if chosen is None:
        raise ValueError(f"Unknown metric type: {metric_type}")
    return chosen(**kwargs) if kwargs else chosen