""" Monitoring decorators for metrics collection This module provides decorators for automatically collecting metrics from various parts of the AI Agent system. """ import asyncio import functools import logging import time from typing import Any, Callable, Dict, Optional from contextlib import asynccontextmanager from src.infrastructure.monitoring.metrics import ( record_agent_registration, record_task_execution, record_task_duration, record_error, AGENT_TASK_EXECUTIONS, AGENT_TASK_DURATION, ERRORS_TOTAL ) logger = logging.getLogger(__name__) def async_metrics(func: Callable) -> Callable: """ Decorator for async metrics collection. This decorator automatically records execution time and success/failure metrics for async functions. Args: func: The async function to decorate Returns: Decorated function with metrics collection """ @functools.wraps(func) async def wrapper(*args, **kwargs): start_time = time.time() function_name = func.__name__ try: result = await func(*args, **kwargs) execution_time = time.time() - start_time # Record success metrics logger.debug(f"Function {function_name} completed successfully in {execution_time:.3f}s") return result except Exception as e: execution_time = time.time() - start_time # Record error metrics record_error(type(e).__name__, function_name, "error") logger.error(f"Function {function_name} failed after {execution_time:.3f}s: {e}") raise return wrapper def agent_metrics(agent_name: str) -> Callable: """ Decorator for agent-specific metrics collection. This decorator records agent-specific metrics including task execution, duration, and error rates. Args: agent_name: Name of the agent for metrics labeling Returns: Decorator function """ def decorator(func: Callable) -> Callable: @functools.wraps(func) async def wrapper(*args, **kwargs): start_time = time.time() # Extract task information if available task_type = "unknown" if args and hasattr(args[0], 'task_type'): task_type = args[0].task_type elif 'task' in kwargs and hasattr(kwargs['task'], 'task_type'): task_type = kwargs['task'].task_type try: result = await func(*args, **kwargs) execution_time = time.time() - start_time # Record agent task execution metrics record_task_execution(agent_name, task_type, "success") record_task_duration(agent_name, task_type, execution_time) logger.debug(f"Agent {agent_name} completed {task_type} task in {execution_time:.3f}s") return result except Exception as e: execution_time = time.time() - start_time # Record error metrics record_task_execution(agent_name, task_type, "error") record_error(type(e).__name__, agent_name, "agent_error") logger.error(f"Agent {agent_name} failed {task_type} task after {execution_time:.3f}s: {e}") raise return wrapper return decorator def tool_metrics(tool_name: str) -> Callable: """ Decorator for tool-specific metrics collection. This decorator records tool execution metrics including usage frequency, execution time, and error rates. Args: tool_name: Name of the tool for metrics labeling Returns: Decorator function """ def decorator(func: Callable) -> Callable: @functools.wraps(func) async def wrapper(*args, **kwargs): start_time = time.time() try: result = await func(*args, **kwargs) execution_time = time.time() - start_time # Record tool execution metrics record_task_execution(tool_name, "tool_execution", "success") record_task_duration(tool_name, "tool_execution", execution_time) logger.debug(f"Tool {tool_name} executed successfully in {execution_time:.3f}s") return result except Exception as e: execution_time = time.time() - start_time # Record error metrics record_task_execution(tool_name, "tool_execution", "error") record_error(type(e).__name__, tool_name, "tool_error") logger.error(f"Tool {tool_name} failed after {execution_time:.3f}s: {e}") raise return wrapper return decorator def performance_metrics(operation_name: str) -> Callable: """ Decorator for performance metrics collection. This decorator records detailed performance metrics including execution time, memory usage, and throughput. Args: operation_name: Name of the operation for metrics labeling Returns: Decorator function """ def decorator(func: Callable) -> Callable: @functools.wraps(func) async def wrapper(*args, **kwargs): start_time = time.time() try: result = await func(*args, **kwargs) execution_time = time.time() - start_time # Record performance metrics record_task_duration(operation_name, "performance", execution_time) # Log performance information if execution_time > 1.0: # Log slow operations logger.warning(f"Slow operation {operation_name}: {execution_time:.3f}s") else: logger.debug(f"Operation {operation_name} completed in {execution_time:.3f}s") return result except Exception as e: execution_time = time.time() - start_time # Record error metrics record_error(type(e).__name__, operation_name, "performance_error") logger.error(f"Performance operation {operation_name} failed after {execution_time:.3f}s: {e}") raise return wrapper return decorator @asynccontextmanager async def metrics_context(operation_name: str, **labels): """ Context manager for metrics collection. This context manager automatically records metrics for operations that span multiple function calls or have complex execution patterns. Args: operation_name: Name of the operation **labels: Additional labels for metrics Yields: Context for metrics collection """ start_time = time.time() try: yield execution_time = time.time() - start_time # Record success metrics record_task_execution(operation_name, "context", "success") record_task_duration(operation_name, "context", execution_time) logger.debug(f"Context {operation_name} completed successfully in {execution_time:.3f}s") except Exception as e: execution_time = time.time() - start_time # Record error metrics record_task_execution(operation_name, "context", "error") record_error(type(e).__name__, operation_name, "context_error") logger.error(f"Context {operation_name} failed after {execution_time:.3f}s: {e}") raise def error_tracking(func: Callable) -> Callable: """ Decorator for error tracking and reporting. This decorator specifically focuses on error collection and reporting, providing detailed error context for debugging. Args: func: The function to decorate Returns: Decorated function with error tracking """ @functools.wraps(func) async def wrapper(*args, **kwargs): function_name = func.__name__ try: return await func(*args, **kwargs) except Exception as e: # Record detailed error information error_context = { "function": function_name, "error_type": type(e).__name__, "error_message": str(e), "args_count": len(args), "kwargs_keys": list(kwargs.keys()) } record_error(type(e).__name__, function_name, "function_error") logger.error(f"Error in {function_name}: {e}", extra={"error_context": error_context}) raise return wrapper def throughput_metrics(operation_name: str) -> Callable: """ Decorator for throughput metrics collection. This decorator tracks the rate of operations and provides throughput statistics. Args: operation_name: Name of the operation Returns: Decorator function """ def decorator(func: Callable) -> Callable: # Track operation counts operation_count = 0 last_reset_time = time.time() @functools.wraps(func) async def wrapper(*args, **kwargs): nonlocal operation_count, last_reset_time current_time = time.time() # Reset counter every minute if current_time - last_reset_time > 60: operation_count = 0 last_reset_time = current_time operation_count += 1 start_time = time.time() try: result = await func(*args, **kwargs) execution_time = time.time() - start_time # Record throughput metrics throughput = operation_count / (current_time - last_reset_time + 1) logger.debug(f"Operation {operation_name} throughput: {throughput:.2f} ops/sec") return result except Exception as e: execution_time = time.time() - start_time # Record error in throughput context record_error(type(e).__name__, operation_name, "throughput_error") logger.error(f"Throughput operation {operation_name} failed: {e}") raise return wrapper return decorator def resource_metrics(func: Callable) -> Callable: """ Decorator for resource usage metrics collection. This decorator tracks resource usage including memory and CPU consumption during function execution. Args: func: The function to decorate Returns: Decorated function with resource tracking """ @functools.wraps(func) async def wrapper(*args, **kwargs): import psutil import os process = psutil.Process(os.getpid()) # Record initial resource usage initial_memory = process.memory_info().rss / 1024 / 1024 # MB initial_cpu = process.cpu_percent() start_time = time.time() try: result = await func(*args, **kwargs) execution_time = time.time() - start_time # Record final resource usage final_memory = process.memory_info().rss / 1024 / 1024 # MB final_cpu = process.cpu_percent() memory_delta = final_memory - initial_memory cpu_delta = final_cpu - initial_cpu logger.debug(f"Resource usage for {func.__name__}: " f"Memory: {memory_delta:+.2f}MB, CPU: {cpu_delta:+.2f}%, " f"Time: {execution_time:.3f}s") return result except Exception as e: execution_time = time.time() - start_time # Record error with resource context record_error(type(e).__name__, func.__name__, "resource_error") logger.error(f"Resource operation {func.__name__} failed after {execution_time:.3f}s: {e}") raise return wrapper # Convenience function for getting metrics decorator def get_metrics_decorator(metric_type: str, **kwargs) -> Callable: """ Get a metrics decorator by type. Args: metric_type: Type of metrics decorator **kwargs: Additional arguments for the decorator Returns: Metrics decorator function """ decorators = { "async": async_metrics, "agent": agent_metrics, "tool": tool_metrics, "performance": performance_metrics, "error": error_tracking, "throughput": throughput_metrics, "resource": resource_metrics } if metric_type not in decorators: raise ValueError(f"Unknown metric type: {metric_type}") decorator = decorators[metric_type] if kwargs: return decorator(**kwargs) else: return decorator