Spaces:
Sleeping
Sleeping
| """ | |
| Comprehensive Agent Logging System | |
| This module provides detailed logging for AI agents, tracking decision-making processes, | |
| performance metrics, and debugging information for reliable AI systems. | |
| """ | |
| import logging | |
| import json | |
| import time | |
| import uuid | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any, Union, Callable | |
| from dataclasses import dataclass, asdict | |
| from enum import Enum | |
| from contextlib import contextmanager | |
| import functools | |
| import traceback | |
| class LogLevel(str, Enum): | |
| """Custom log levels for agent operations.""" | |
| TRACE = "TRACE" # Detailed execution flow | |
| DEBUG = "DEBUG" # Debug information | |
| INFO = "INFO" # General information | |
| WARNING = "WARNING" # Warning messages | |
| ERROR = "ERROR" # Error conditions | |
| CRITICAL = "CRITICAL" # Critical failures | |
| class AgentOperation(str, Enum): | |
| """Types of agent operations to log.""" | |
| SEARCH = "search" | |
| FILTER = "filter" | |
| RANK = "rank" | |
| VALIDATE = "validate" | |
| COORDINATE = "coordinate" | |
| API_CALL = "api_call" | |
| DECISION = "decision" | |
| ERROR_HANDLING = "error_handling" | |
| class LogEntry: | |
| """Structured log entry for agent operations.""" | |
| id: str | |
| timestamp: datetime | |
| agent_type: str | |
| operation: AgentOperation | |
| level: LogLevel | |
| message: str | |
| data: Dict[str, Any] = None | |
| duration_ms: Optional[float] = None | |
| success: Optional[bool] = None | |
| error: Optional[str] = None | |
| context: Dict[str, Any] = None | |
| class PerformanceMetrics: | |
| """Performance metrics for agent operations.""" | |
| operation: AgentOperation | |
| total_calls: int = 0 | |
| successful_calls: int = 0 | |
| failed_calls: int = 0 | |
| average_duration_ms: float = 0.0 | |
| min_duration_ms: float = float('inf') | |
| max_duration_ms: float = 0.0 | |
| success_rate: float = 0.0 | |
| error_rate: float = 0.0 | |
| class AgentLogger: | |
| """Comprehensive logger for AI agent operations.""" | |
| def __init__(self, agent_type: str, log_level: LogLevel = LogLevel.INFO): | |
| self.agent_type = agent_type | |
| self.log_level = log_level | |
| self.session_id = str(uuid.uuid4()) | |
| self.start_time = datetime.now() | |
| # Setup logging | |
| self.logger = logging.getLogger(f"agent.{agent_type}") | |
| self.logger.setLevel(getattr(logging, log_level.value)) | |
| # Performance tracking | |
| self.metrics: Dict[AgentOperation, PerformanceMetrics] = {} | |
| self.log_entries: List[LogEntry] = [] | |
| # Setup formatters | |
| self._setup_formatters() | |
| def _setup_formatters(self): | |
| """Setup log formatters for different output types.""" | |
| # Console formatter | |
| console_formatter = logging.Formatter( | |
| '%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| # File formatter (JSON) | |
| self.file_formatter = logging.Formatter( | |
| '%(message)s' # JSON will be the message | |
| ) | |
| # Add console handler if not already present | |
| if not self.logger.handlers: | |
| console_handler = logging.StreamHandler() | |
| console_handler.setFormatter(console_formatter) | |
| self.logger.addHandler(console_handler) | |
| def _should_log(self, level: LogLevel) -> bool: | |
| """Check if we should log at this level.""" | |
| level_values = { | |
| LogLevel.TRACE: 0, | |
| LogLevel.DEBUG: 1, | |
| LogLevel.INFO: 2, | |
| LogLevel.WARNING: 3, | |
| LogLevel.ERROR: 4, | |
| LogLevel.CRITICAL: 5 | |
| } | |
| return level_values[level] >= level_values[self.log_level] | |
| def _create_log_entry(self, operation: AgentOperation, level: LogLevel, | |
| message: str, data: Dict[str, Any] = None, | |
| duration_ms: float = None, success: bool = None, | |
| error: str = None, context: Dict[str, Any] = None) -> LogEntry: | |
| """Create a structured log entry.""" | |
| return LogEntry( | |
| id=str(uuid.uuid4()), | |
| timestamp=datetime.now(), | |
| agent_type=self.agent_type, | |
| operation=operation, | |
| level=level, | |
| message=message, | |
| data=data or {}, | |
| duration_ms=duration_ms, | |
| success=success, | |
| error=error, | |
| context=context or {} | |
| ) | |
| def _log_entry(self, log_entry: LogEntry): | |
| """Internal method to process and store log entries.""" | |
| # Store in memory | |
| self.log_entries.append(log_entry) | |
| # Update metrics | |
| self._update_metrics(log_entry) | |
| # Output to logger | |
| if self._should_log(log_entry.level): | |
| # Create JSON output for structured logging | |
| log_data = asdict(log_entry) | |
| log_data['timestamp'] = log_entry.timestamp.isoformat() | |
| log_data['operation'] = log_entry.operation.value | |
| log_data['level'] = log_entry.level.value | |
| self.logger.log( | |
| getattr(logging, log_entry.level.value), | |
| json.dumps(log_data, default=str) | |
| ) | |
| def _update_metrics(self, log_entry: LogEntry): | |
| """Update performance metrics based on log entry.""" | |
| operation = log_entry.operation | |
| if operation not in self.metrics: | |
| self.metrics[operation] = PerformanceMetrics(operation=operation) | |
| metrics = self.metrics[operation] | |
| metrics.total_calls += 1 | |
| if log_entry.success is not None: | |
| if log_entry.success: | |
| metrics.successful_calls += 1 | |
| else: | |
| metrics.failed_calls += 1 | |
| if log_entry.duration_ms is not None: | |
| # Update duration statistics | |
| if metrics.total_calls == 1: | |
| metrics.average_duration_ms = log_entry.duration_ms | |
| metrics.min_duration_ms = log_entry.duration_ms | |
| metrics.max_duration_ms = log_entry.duration_ms | |
| else: | |
| # Running average | |
| metrics.average_duration_ms = ( | |
| (metrics.average_duration_ms * (metrics.total_calls - 1) + log_entry.duration_ms) | |
| / metrics.total_calls | |
| ) | |
| metrics.min_duration_ms = min(metrics.min_duration_ms, log_entry.duration_ms) | |
| metrics.max_duration_ms = max(metrics.max_duration_ms, log_entry.duration_ms) | |
| # Update rates | |
| if metrics.total_calls > 0: | |
| metrics.success_rate = metrics.successful_calls / metrics.total_calls | |
| metrics.error_rate = metrics.failed_calls / metrics.total_calls | |
| # Public logging methods | |
| def trace(self, operation: AgentOperation, message: str, data: Dict[str, Any] = None): | |
| """Log trace-level information.""" | |
| log_entry = self._create_log_entry(operation, LogLevel.TRACE, message, data) | |
| self._log_entry(log_entry) | |
| def debug(self, operation: AgentOperation, message: str, data: Dict[str, Any] = None): | |
| """Log debug-level information.""" | |
| log_entry = self._create_log_entry(operation, LogLevel.DEBUG, message, data) | |
| self._log_entry(log_entry) | |
| def info(self, operation: AgentOperation, message: str, data: Dict[str, Any] = None): | |
| """Log info-level information.""" | |
| log_entry = self._create_log_entry(operation, LogLevel.INFO, message, data) | |
| self._log_entry(log_entry) | |
| def warning(self, operation: AgentOperation, message: str, data: Dict[str, Any] = None): | |
| """Log warning-level information.""" | |
| log_entry = self._create_log_entry(operation, LogLevel.WARNING, message, data) | |
| self._log_entry(log_entry) | |
| def error(self, operation: AgentOperation, message: str, error: str = None, data: Dict[str, Any] = None): | |
| """Log error-level information.""" | |
| log_entry = self._create_log_entry( | |
| operation, LogLevel.ERROR, message, data, | |
| success=False, error=error | |
| ) | |
| self._log_entry(log_entry) | |
| def critical(self, operation: AgentOperation, message: str, error: str = None, data: Dict[str, Any] = None): | |
| """Log critical-level information.""" | |
| log_entry = self._create_log_entry( | |
| operation, LogLevel.CRITICAL, message, data, | |
| success=False, error=error | |
| ) | |
| self._log_entry(log_entry) | |
| def operation_timer(self, operation: AgentOperation, message: str, data: Dict[str, Any] = None): | |
| """Context manager for timing operations.""" | |
| start_time = time.time() | |
| self.info(operation, f"Starting: {message}", data) | |
| try: | |
| yield | |
| duration_ms = (time.time() - start_time) * 1000 | |
| log_entry = self._create_log_entry( | |
| operation, LogLevel.INFO, f"Completed: {message}", | |
| data, duration_ms, True | |
| ) | |
| self._log_entry(log_entry) | |
| except Exception as e: | |
| duration_ms = (time.time() - start_time) * 1000 | |
| self.error(operation, f"Failed: {message}", str(e), data) | |
| raise | |
| def log_decision(self, decision: str, reasoning: str, alternatives: List[str] = None, | |
| confidence: float = None, data: Dict[str, Any] = None): | |
| """Log agent decision-making process.""" | |
| decision_data = { | |
| "decision": decision, | |
| "reasoning": reasoning, | |
| "alternatives": alternatives or [], | |
| "confidence": confidence, | |
| **(data or {}) | |
| } | |
| self.info(AgentOperation.DECISION, f"Decision made: {decision}", decision_data) | |
| def log_api_call(self, api_name: str, endpoint: str, method: str, | |
| request_data: Dict[str, Any] = None, response_data: Dict[str, Any] = None, | |
| status_code: int = None, duration_ms: float = None): | |
| """Log API calls with detailed information.""" | |
| api_data = { | |
| "api_name": api_name, | |
| "endpoint": endpoint, | |
| "method": method, | |
| "request_data": request_data, | |
| "response_data": response_data, | |
| "status_code": status_code | |
| } | |
| success = status_code and 200 <= status_code < 300 | |
| log_entry = self._create_log_entry( | |
| AgentOperation.API_CALL, | |
| LogLevel.INFO if success else LogLevel.ERROR, | |
| f"API call to {api_name}: {method} {endpoint}", | |
| api_data, | |
| duration_ms, | |
| success | |
| ) | |
| self._log_entry(log_entry) | |
| def log_validation(self, validation_type: str, result: str, details: Dict[str, Any] = None): | |
| """Log validation operations.""" | |
| validation_data = { | |
| "validation_type": validation_type, | |
| "result": result, | |
| "details": details or {} | |
| } | |
| success = result.lower() in ["approved", "valid", "passed"] | |
| level = LogLevel.INFO if success else LogLevel.WARNING | |
| self._log_entry(self._create_log_entry( | |
| AgentOperation.VALIDATE, | |
| level, | |
| f"Validation {validation_type}: {result}", | |
| validation_data, | |
| success=success | |
| )) | |
| def get_metrics_summary(self) -> Dict[str, Any]: | |
| """Get summary of all performance metrics.""" | |
| total_operations = sum(metrics.total_calls for metrics in self.metrics.values()) | |
| total_successful = sum(metrics.successful_calls for metrics in self.metrics.values()) | |
| total_failed = sum(metrics.failed_calls for metrics in self.metrics.values()) | |
| session_duration = (datetime.now() - self.start_time).total_seconds() | |
| return { | |
| "session_id": self.session_id, | |
| "agent_type": self.agent_type, | |
| "session_duration_seconds": session_duration, | |
| "total_operations": total_operations, | |
| "total_successful": total_successful, | |
| "total_failed": total_failed, | |
| "overall_success_rate": total_successful / total_operations if total_operations > 0 else 0, | |
| "overall_error_rate": total_failed / total_operations if total_operations > 0 else 0, | |
| "operations": { | |
| operation.value: { | |
| "total_calls": metrics.total_calls, | |
| "successful_calls": metrics.successful_calls, | |
| "failed_calls": metrics.failed_calls, | |
| "success_rate": metrics.success_rate, | |
| "error_rate": metrics.error_rate, | |
| "average_duration_ms": metrics.average_duration_ms, | |
| "min_duration_ms": metrics.min_duration_ms if metrics.min_duration_ms != float('inf') else 0, | |
| "max_duration_ms": metrics.max_duration_ms | |
| } | |
| for operation, metrics in self.metrics.items() | |
| } | |
| } | |
| def export_logs(self, format: str = "json") -> Union[str, Dict[str, Any]]: | |
| """Export logs in specified format.""" | |
| if format == "json": | |
| return { | |
| "session_id": self.session_id, | |
| "agent_type": self.agent_type, | |
| "start_time": self.start_time.isoformat(), | |
| "end_time": datetime.now().isoformat(), | |
| "metrics": self.get_metrics_summary(), | |
| "log_entries": [ | |
| { | |
| **asdict(entry), | |
| "timestamp": entry.timestamp.isoformat(), | |
| "operation": entry.operation.value, | |
| "level": entry.level.value | |
| } | |
| for entry in self.log_entries | |
| ] | |
| } | |
| else: | |
| # Return as formatted string | |
| lines = [] | |
| for entry in self.log_entries: | |
| lines.append( | |
| f"{entry.timestamp.isoformat()} [{entry.level.value}] " | |
| f"{entry.agent_type}:{entry.operation.value} - {entry.message}" | |
| ) | |
| return "\n".join(lines) | |
| # Decorator for automatic logging | |
| def log_operation(operation: AgentOperation, message: str = None, log_args: bool = False, log_result: bool = False): | |
| """Decorator to automatically log function calls.""" | |
| def decorator(func): | |
| def wrapper(*args, **kwargs): | |
| # Try to find logger in the instance | |
| logger = None | |
| if args and hasattr(args[0], 'logger'): | |
| logger = args[0].logger | |
| if not logger: | |
| return func(*args, **kwargs) | |
| func_name = func.__name__ | |
| log_message = message or f"Calling {func_name}" | |
| # Log function arguments if requested | |
| log_data = {} | |
| if log_args: | |
| log_data["args"] = str(args)[:200] # Truncate long arguments | |
| log_data["kwargs"] = {k: str(v)[:100] for k, v in kwargs.items()} | |
| with logger.operation_timer(operation, log_message, log_data): | |
| result = func(*args, **kwargs) | |
| # Log result if requested | |
| if log_result: | |
| logger.debug(operation, f"Result from {func_name}", {"result": str(result)[:200]}) | |
| return result | |
| return wrapper | |
| return decorator | |
| # Example usage and testing | |
| if __name__ == "__main__": | |
| print("๐ Agent Logger Testing") | |
| print("=" * 30) | |
| # Create logger | |
| logger = AgentLogger("flight_agent", LogLevel.DEBUG) | |
| # Test various logging operations | |
| logger.info(AgentOperation.SEARCH, "Starting flight search", {"query": "NYC to LAX"}) | |
| with logger.operation_timer(AgentOperation.API_CALL, "Calling flight API"): | |
| time.sleep(0.1) # Simulate API call | |
| logger.log_decision( | |
| "Select United Airlines flight", | |
| "Best price-to-duration ratio", | |
| ["American Airlines", "Delta"], | |
| 0.85 | |
| ) | |
| logger.log_api_call( | |
| "FlightAPI", | |
| "/search", | |
| "POST", | |
| {"origin": "NYC", "destination": "LAX"}, | |
| {"flights": 15}, | |
| 200, | |
| 150.5 | |
| ) | |
| logger.log_validation("price_check", "approved", {"max_price": 500, "actual_price": 350}) | |
| # Get metrics | |
| metrics = logger.get_metrics_summary() | |
| print(f"\n๐ Performance Metrics:") | |
| print(f" Total Operations: {metrics['total_operations']}") | |
| print(f" Success Rate: {metrics['overall_success_rate']:.2%}") | |
| # Export logs | |
| logs_json = logger.export_logs("json") | |
| print(f"\n๐ Log Export: {len(logs_json['log_entries'])} entries") | |
| print("\nโ Agent logging system working correctly!") | |