""" Comprehensive Agent Logging System This module provides detailed logging for AI agents, tracking decision-making processes, performance metrics, and debugging information for reliable AI systems. """ import logging import json import time import uuid from datetime import datetime, timedelta from typing import Dict, List, Optional, Any, Union, Callable from dataclasses import dataclass, asdict from enum import Enum from contextlib import contextmanager import functools import traceback class LogLevel(str, Enum): """Custom log levels for agent operations.""" TRACE = "TRACE" # Detailed execution flow DEBUG = "DEBUG" # Debug information INFO = "INFO" # General information WARNING = "WARNING" # Warning messages ERROR = "ERROR" # Error conditions CRITICAL = "CRITICAL" # Critical failures class AgentOperation(str, Enum): """Types of agent operations to log.""" SEARCH = "search" FILTER = "filter" RANK = "rank" VALIDATE = "validate" COORDINATE = "coordinate" API_CALL = "api_call" DECISION = "decision" ERROR_HANDLING = "error_handling" @dataclass class LogEntry: """Structured log entry for agent operations.""" id: str timestamp: datetime agent_type: str operation: AgentOperation level: LogLevel message: str data: Dict[str, Any] = None duration_ms: Optional[float] = None success: Optional[bool] = None error: Optional[str] = None context: Dict[str, Any] = None @dataclass class PerformanceMetrics: """Performance metrics for agent operations.""" operation: AgentOperation total_calls: int = 0 successful_calls: int = 0 failed_calls: int = 0 average_duration_ms: float = 0.0 min_duration_ms: float = float('inf') max_duration_ms: float = 0.0 success_rate: float = 0.0 error_rate: float = 0.0 class AgentLogger: """Comprehensive logger for AI agent operations.""" def __init__(self, agent_type: str, log_level: LogLevel = LogLevel.INFO): self.agent_type = agent_type self.log_level = log_level self.session_id = str(uuid.uuid4()) self.start_time = datetime.now() # Setup logging self.logger = logging.getLogger(f"agent.{agent_type}") self.logger.setLevel(getattr(logging, log_level.value)) # Performance tracking self.metrics: Dict[AgentOperation, PerformanceMetrics] = {} self.log_entries: List[LogEntry] = [] # Setup formatters self._setup_formatters() def _setup_formatters(self): """Setup log formatters for different output types.""" # Console formatter console_formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # File formatter (JSON) self.file_formatter = logging.Formatter( '%(message)s' # JSON will be the message ) # Add console handler if not already present if not self.logger.handlers: console_handler = logging.StreamHandler() console_handler.setFormatter(console_formatter) self.logger.addHandler(console_handler) def _should_log(self, level: LogLevel) -> bool: """Check if we should log at this level.""" level_values = { LogLevel.TRACE: 0, LogLevel.DEBUG: 1, LogLevel.INFO: 2, LogLevel.WARNING: 3, LogLevel.ERROR: 4, LogLevel.CRITICAL: 5 } return level_values[level] >= level_values[self.log_level] def _create_log_entry(self, operation: AgentOperation, level: LogLevel, message: str, data: Dict[str, Any] = None, duration_ms: float = None, success: bool = None, error: str = None, context: Dict[str, Any] = None) -> LogEntry: """Create a structured log entry.""" return LogEntry( id=str(uuid.uuid4()), timestamp=datetime.now(), agent_type=self.agent_type, operation=operation, level=level, message=message, data=data or {}, duration_ms=duration_ms, success=success, error=error, context=context or {} ) def _log_entry(self, log_entry: LogEntry): """Internal method to process and store log entries.""" # Store in memory self.log_entries.append(log_entry) # Update metrics self._update_metrics(log_entry) # Output to logger if self._should_log(log_entry.level): # Create JSON output for structured logging log_data = asdict(log_entry) log_data['timestamp'] = log_entry.timestamp.isoformat() log_data['operation'] = log_entry.operation.value log_data['level'] = log_entry.level.value self.logger.log( getattr(logging, log_entry.level.value), json.dumps(log_data, default=str) ) def _update_metrics(self, log_entry: LogEntry): """Update performance metrics based on log entry.""" operation = log_entry.operation if operation not in self.metrics: self.metrics[operation] = PerformanceMetrics(operation=operation) metrics = self.metrics[operation] metrics.total_calls += 1 if log_entry.success is not None: if log_entry.success: metrics.successful_calls += 1 else: metrics.failed_calls += 1 if log_entry.duration_ms is not None: # Update duration statistics if metrics.total_calls == 1: metrics.average_duration_ms = log_entry.duration_ms metrics.min_duration_ms = log_entry.duration_ms metrics.max_duration_ms = log_entry.duration_ms else: # Running average metrics.average_duration_ms = ( (metrics.average_duration_ms * (metrics.total_calls - 1) + log_entry.duration_ms) / metrics.total_calls ) metrics.min_duration_ms = min(metrics.min_duration_ms, log_entry.duration_ms) metrics.max_duration_ms = max(metrics.max_duration_ms, log_entry.duration_ms) # Update rates if metrics.total_calls > 0: metrics.success_rate = metrics.successful_calls / metrics.total_calls metrics.error_rate = metrics.failed_calls / metrics.total_calls # Public logging methods def trace(self, operation: AgentOperation, message: str, data: Dict[str, Any] = None): """Log trace-level information.""" log_entry = self._create_log_entry(operation, LogLevel.TRACE, message, data) self._log_entry(log_entry) def debug(self, operation: AgentOperation, message: str, data: Dict[str, Any] = None): """Log debug-level information.""" log_entry = self._create_log_entry(operation, LogLevel.DEBUG, message, data) self._log_entry(log_entry) def info(self, operation: AgentOperation, message: str, data: Dict[str, Any] = None): """Log info-level information.""" log_entry = self._create_log_entry(operation, LogLevel.INFO, message, data) self._log_entry(log_entry) def warning(self, operation: AgentOperation, message: str, data: Dict[str, Any] = None): """Log warning-level information.""" log_entry = self._create_log_entry(operation, LogLevel.WARNING, message, data) self._log_entry(log_entry) def error(self, operation: AgentOperation, message: str, error: str = None, data: Dict[str, Any] = None): """Log error-level information.""" log_entry = self._create_log_entry( operation, LogLevel.ERROR, message, data, success=False, error=error ) self._log_entry(log_entry) def critical(self, operation: AgentOperation, message: str, error: str = None, data: Dict[str, Any] = None): """Log critical-level information.""" log_entry = self._create_log_entry( operation, LogLevel.CRITICAL, message, data, success=False, error=error ) self._log_entry(log_entry) @contextmanager def operation_timer(self, operation: AgentOperation, message: str, data: Dict[str, Any] = None): """Context manager for timing operations.""" start_time = time.time() self.info(operation, f"Starting: {message}", data) try: yield duration_ms = (time.time() - start_time) * 1000 log_entry = self._create_log_entry( operation, LogLevel.INFO, f"Completed: {message}", data, duration_ms, True ) self._log_entry(log_entry) except Exception as e: duration_ms = (time.time() - start_time) * 1000 self.error(operation, f"Failed: {message}", str(e), data) raise def log_decision(self, decision: str, reasoning: str, alternatives: List[str] = None, confidence: float = None, data: Dict[str, Any] = None): """Log agent decision-making process.""" decision_data = { "decision": decision, "reasoning": reasoning, "alternatives": alternatives or [], "confidence": confidence, **(data or {}) } self.info(AgentOperation.DECISION, f"Decision made: {decision}", decision_data) def log_api_call(self, api_name: str, endpoint: str, method: str, request_data: Dict[str, Any] = None, response_data: Dict[str, Any] = None, status_code: int = None, duration_ms: float = None): """Log API calls with detailed information.""" api_data = { "api_name": api_name, "endpoint": endpoint, "method": method, "request_data": request_data, "response_data": response_data, "status_code": status_code } success = status_code and 200 <= status_code < 300 log_entry = self._create_log_entry( AgentOperation.API_CALL, LogLevel.INFO if success else LogLevel.ERROR, f"API call to {api_name}: {method} {endpoint}", api_data, duration_ms, success ) self._log_entry(log_entry) def log_validation(self, validation_type: str, result: str, details: Dict[str, Any] = None): """Log validation operations.""" validation_data = { "validation_type": validation_type, "result": result, "details": details or {} } success = result.lower() in ["approved", "valid", "passed"] level = LogLevel.INFO if success else LogLevel.WARNING self._log_entry(self._create_log_entry( AgentOperation.VALIDATE, level, f"Validation {validation_type}: {result}", validation_data, success=success )) def get_metrics_summary(self) -> Dict[str, Any]: """Get summary of all performance metrics.""" total_operations = sum(metrics.total_calls for metrics in self.metrics.values()) total_successful = sum(metrics.successful_calls for metrics in self.metrics.values()) total_failed = sum(metrics.failed_calls for metrics in self.metrics.values()) session_duration = (datetime.now() - self.start_time).total_seconds() return { "session_id": self.session_id, "agent_type": self.agent_type, "session_duration_seconds": session_duration, "total_operations": total_operations, "total_successful": total_successful, "total_failed": total_failed, "overall_success_rate": total_successful / total_operations if total_operations > 0 else 0, "overall_error_rate": total_failed / total_operations if total_operations > 0 else 0, "operations": { operation.value: { "total_calls": metrics.total_calls, "successful_calls": metrics.successful_calls, "failed_calls": metrics.failed_calls, "success_rate": metrics.success_rate, "error_rate": metrics.error_rate, "average_duration_ms": metrics.average_duration_ms, "min_duration_ms": metrics.min_duration_ms if metrics.min_duration_ms != float('inf') else 0, "max_duration_ms": metrics.max_duration_ms } for operation, metrics in self.metrics.items() } } def export_logs(self, format: str = "json") -> Union[str, Dict[str, Any]]: """Export logs in specified format.""" if format == "json": return { "session_id": self.session_id, "agent_type": self.agent_type, "start_time": self.start_time.isoformat(), "end_time": datetime.now().isoformat(), "metrics": self.get_metrics_summary(), "log_entries": [ { **asdict(entry), "timestamp": entry.timestamp.isoformat(), "operation": entry.operation.value, "level": entry.level.value } for entry in self.log_entries ] } else: # Return as formatted string lines = [] for entry in self.log_entries: lines.append( f"{entry.timestamp.isoformat()} [{entry.level.value}] " f"{entry.agent_type}:{entry.operation.value} - {entry.message}" ) return "\n".join(lines) # Decorator for automatic logging def log_operation(operation: AgentOperation, message: str = None, log_args: bool = False, log_result: bool = False): """Decorator to automatically log function calls.""" def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): # Try to find logger in the instance logger = None if args and hasattr(args[0], 'logger'): logger = args[0].logger if not logger: return func(*args, **kwargs) func_name = func.__name__ log_message = message or f"Calling {func_name}" # Log function arguments if requested log_data = {} if log_args: log_data["args"] = str(args)[:200] # Truncate long arguments log_data["kwargs"] = {k: str(v)[:100] for k, v in kwargs.items()} with logger.operation_timer(operation, log_message, log_data): result = func(*args, **kwargs) # Log result if requested if log_result: logger.debug(operation, f"Result from {func_name}", {"result": str(result)[:200]}) return result return wrapper return decorator # Example usage and testing if __name__ == "__main__": print("šŸ“Š Agent Logger Testing") print("=" * 30) # Create logger logger = AgentLogger("flight_agent", LogLevel.DEBUG) # Test various logging operations logger.info(AgentOperation.SEARCH, "Starting flight search", {"query": "NYC to LAX"}) with logger.operation_timer(AgentOperation.API_CALL, "Calling flight API"): time.sleep(0.1) # Simulate API call logger.log_decision( "Select United Airlines flight", "Best price-to-duration ratio", ["American Airlines", "Delta"], 0.85 ) logger.log_api_call( "FlightAPI", "/search", "POST", {"origin": "NYC", "destination": "LAX"}, {"flights": 15}, 200, 150.5 ) logger.log_validation("price_check", "approved", {"max_price": 500, "actual_price": 350}) # Get metrics metrics = logger.get_metrics_summary() print(f"\nšŸ“ˆ Performance Metrics:") print(f" Total Operations: {metrics['total_operations']}") print(f" Success Rate: {metrics['overall_success_rate']:.2%}") # Export logs logs_json = logger.export_logs("json") print(f"\nšŸ“‹ Log Export: {len(logs_json['log_entries'])} entries") print("\nāœ… Agent logging system working correctly!")