# BlakeL's picture
# Upload 115 files
# 3f9f85b verified
"""
Comprehensive Agent Logging System
This module provides detailed logging for AI agents, tracking decision-making processes,
performance metrics, and debugging information for reliable AI systems.
"""
import logging
import json
import time
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union, Callable
from dataclasses import dataclass, asdict
from enum import Enum
from contextlib import contextmanager
import functools
import traceback
class LogLevel(str, Enum):
    """Severity levels that an agent log entry can carry.

    Each member is also a ``str`` whose value equals its own name. Members
    are declared from least severe (``TRACE``) to most severe (``CRITICAL``).
    """

    # Detailed execution flow
    TRACE = "TRACE"
    # Debug information
    DEBUG = "DEBUG"
    # General information
    INFO = "INFO"
    # Warning messages
    WARNING = "WARNING"
    # Error conditions
    ERROR = "ERROR"
    # Critical failures
    CRITICAL = "CRITICAL"
class AgentOperation(str, Enum):
    """Categories of agent activity that a log entry can be filed under.

    Each member is also a ``str``; its value is the lowercase label written
    into exported logs and metric summaries.
    """

    SEARCH = "search"
    FILTER = "filter"
    RANK = "rank"
    VALIDATE = "validate"
    COORDINATE = "coordinate"
    API_CALL = "api_call"
    DECISION = "decision"
    ERROR_HANDLING = "error_handling"
@dataclass
class LogEntry:
    """Structured record of a single agent log event.

    The first six fields are required. The remaining fields are optional and
    default to ``None``; their annotations say so explicitly so that type
    checkers agree with the runtime defaults.
    """

    # Identifier string for this entry
    id: str
    # Moment the entry was created
    timestamp: datetime
    # Label of the agent that produced the entry
    agent_type: str
    # Category of activity being logged
    operation: AgentOperation
    # Severity of the entry
    level: LogLevel
    # Human-readable description
    message: str
    # Arbitrary structured payload attached to the entry
    data: Optional[Dict[str, Any]] = None
    # Elapsed time of the operation in milliseconds, when it was timed
    duration_ms: Optional[float] = None
    # Outcome flag; None means the entry does not describe an outcome
    success: Optional[bool] = None
    # Error description for failed operations
    error: Optional[str] = None
    # Additional contextual information
    context: Optional[Dict[str, Any]] = None
@dataclass
class PerformanceMetrics:
    """Running call counts and timing statistics for one operation type."""

    operation: AgentOperation

    # Call counters
    total_calls: int = 0
    successful_calls: int = 0
    failed_calls: int = 0

    # Duration statistics in milliseconds. The minimum starts at infinity so
    # that taking min() with the first observed duration yields that duration.
    average_duration_ms: float = 0.0
    min_duration_ms: float = float("inf")
    max_duration_ms: float = 0.0

    # Outcome ratios
    success_rate: float = 0.0
    error_rate: float = 0.0
class AgentLogger:
    """Structured logger for AI agent operations.

    Every logging call builds a ``LogEntry``, stores it in ``log_entries``,
    folds it into the per-operation ``PerformanceMetrics`` held in
    ``metrics`` and, when the entry's level is at or above ``log_level``,
    emits it as a JSON string through a standard ``logging`` logger named
    ``agent.<agent_type>``.
    """

    # The standard library defines no TRACE level. 5 sits below
    # logging.DEBUG (10), so TRACE records rank beneath DEBUG ones.
    _TRACE_LEVEL_NUM = 5

    # Relative severity used by _should_log (higher is more severe).
    _SEVERITY = {
        LogLevel.TRACE: 0,
        LogLevel.DEBUG: 1,
        LogLevel.INFO: 2,
        LogLevel.WARNING: 3,
        LogLevel.ERROR: 4,
        LogLevel.CRITICAL: 5,
    }

    def __init__(self, agent_type: str, log_level: LogLevel = LogLevel.INFO):
        """Create a logger for one agent.

        Args:
            agent_type: Label stored on every entry and used in the name of
                the underlying ``logging`` logger.
            log_level: Minimum level emitted to the underlying logger.
                Entries below it are still stored and counted in metrics.
        """
        self.agent_type = agent_type
        self.log_level = log_level
        self.session_id = str(uuid.uuid4())
        self.start_time = datetime.now()

        # Setup logging. Registering the name makes %(levelname)s render
        # "TRACE" instead of "Level 5".
        logging.addLevelName(self._TRACE_LEVEL_NUM, LogLevel.TRACE.value)
        self.logger = logging.getLogger(f"agent.{agent_type}")
        self.logger.setLevel(self._std_level(log_level))

        # Performance tracking
        self.metrics: Dict[AgentOperation, PerformanceMetrics] = {}
        self.log_entries: List[LogEntry] = []
        # Per-operation count of entries that carried a duration. The running
        # average must be taken over these, not over total_calls, because
        # many entries (e.g. "Starting: ..." lines) have no duration.
        self._timed_calls: Dict[AgentOperation, int] = {}

        # Setup formatters
        self._setup_formatters()

    @classmethod
    def _std_level(cls, level: LogLevel) -> int:
        """Translate a ``LogLevel`` into the numeric level ``logging`` uses.

        ``logging`` has no ``TRACE`` attribute, so that member is mapped to
        ``_TRACE_LEVEL_NUM``; every other member shares its name with a
        ``logging`` module constant.
        """
        if level is LogLevel.TRACE:
            return cls._TRACE_LEVEL_NUM
        return getattr(logging, level.value)

    def _setup_formatters(self):
        """Create the formatters and attach a console handler if none exists."""
        # Console formatter
        console_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        # File formatter (JSON). Kept on the instance; this class does not
        # attach it to any handler itself.
        self.file_formatter = logging.Formatter(
            '%(message)s'  # JSON will be the message
        )

        # Add console handler if not already present, so repeated
        # construction for the same agent_type does not duplicate output.
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(console_formatter)
            self.logger.addHandler(console_handler)

    def _should_log(self, level: LogLevel) -> bool:
        """Return True when ``level`` is at or above the configured level."""
        return self._SEVERITY[level] >= self._SEVERITY[self.log_level]

    def _create_log_entry(self, operation: AgentOperation, level: LogLevel,
                          message: str, data: Optional[Dict[str, Any]] = None,
                          duration_ms: Optional[float] = None,
                          success: Optional[bool] = None,
                          error: Optional[str] = None,
                          context: Optional[Dict[str, Any]] = None) -> LogEntry:
        """Build a ``LogEntry`` stamped with a fresh UUID and the current time.

        ``data`` and ``context`` are replaced by empty dicts when falsy.
        """
        return LogEntry(
            id=str(uuid.uuid4()),
            timestamp=datetime.now(),
            agent_type=self.agent_type,
            operation=operation,
            level=level,
            message=message,
            data=data or {},
            duration_ms=duration_ms,
            success=success,
            error=error,
            context=context or {}
        )

    def _log_entry(self, log_entry: LogEntry):
        """Store an entry, update metrics, and emit it if its level qualifies."""
        # Store in memory
        self.log_entries.append(log_entry)

        # Update metrics
        self._update_metrics(log_entry)

        # Output to logger
        if self._should_log(log_entry.level):
            # Create JSON output for structured logging
            log_data = asdict(log_entry)
            log_data['timestamp'] = log_entry.timestamp.isoformat()
            log_data['operation'] = log_entry.operation.value
            log_data['level'] = log_entry.level.value
            self.logger.log(
                self._std_level(log_entry.level),
                json.dumps(log_data, default=str)
            )

    def _update_metrics(self, log_entry: LogEntry):
        """Fold one entry into the metrics for its operation.

        ``total_calls`` counts every entry logged for the operation, including
        entries whose ``success`` is ``None``; ``success_rate`` and
        ``error_rate`` are therefore fractions of all entries. Duration
        statistics are computed only over entries that carry a duration.
        """
        operation = log_entry.operation
        if operation not in self.metrics:
            self.metrics[operation] = PerformanceMetrics(operation=operation)
            self._timed_calls[operation] = 0

        metrics = self.metrics[operation]
        metrics.total_calls += 1

        if log_entry.success is not None:
            if log_entry.success:
                metrics.successful_calls += 1
            else:
                metrics.failed_calls += 1

        if log_entry.duration_ms is not None:
            duration = log_entry.duration_ms
            timed = self._timed_calls.get(operation, 0) + 1
            self._timed_calls[operation] = timed
            if timed == 1:
                metrics.average_duration_ms = duration
                metrics.min_duration_ms = duration
                metrics.max_duration_ms = duration
            else:
                # Incremental mean over timed entries only
                metrics.average_duration_ms += (
                    (duration - metrics.average_duration_ms) / timed
                )
                metrics.min_duration_ms = min(metrics.min_duration_ms, duration)
                metrics.max_duration_ms = max(metrics.max_duration_ms, duration)

        # Update rates (total_calls is at least 1 here)
        metrics.success_rate = metrics.successful_calls / metrics.total_calls
        metrics.error_rate = metrics.failed_calls / metrics.total_calls

    # Public logging methods
    def trace(self, operation: AgentOperation, message: str,
              data: Optional[Dict[str, Any]] = None):
        """Log trace-level information."""
        self._log_entry(self._create_log_entry(operation, LogLevel.TRACE, message, data))

    def debug(self, operation: AgentOperation, message: str,
              data: Optional[Dict[str, Any]] = None):
        """Log debug-level information."""
        self._log_entry(self._create_log_entry(operation, LogLevel.DEBUG, message, data))

    def info(self, operation: AgentOperation, message: str,
             data: Optional[Dict[str, Any]] = None):
        """Log info-level information."""
        self._log_entry(self._create_log_entry(operation, LogLevel.INFO, message, data))

    def warning(self, operation: AgentOperation, message: str,
                data: Optional[Dict[str, Any]] = None):
        """Log warning-level information."""
        self._log_entry(self._create_log_entry(operation, LogLevel.WARNING, message, data))

    def error(self, operation: AgentOperation, message: str,
              error: Optional[str] = None, data: Optional[Dict[str, Any]] = None):
        """Log error-level information; the entry is recorded as a failure."""
        log_entry = self._create_log_entry(
            operation, LogLevel.ERROR, message, data,
            success=False, error=error
        )
        self._log_entry(log_entry)

    def critical(self, operation: AgentOperation, message: str,
                 error: Optional[str] = None, data: Optional[Dict[str, Any]] = None):
        """Log critical-level information; the entry is recorded as a failure."""
        log_entry = self._create_log_entry(
            operation, LogLevel.CRITICAL, message, data,
            success=False, error=error
        )
        self._log_entry(log_entry)

    @contextmanager
    def operation_timer(self, operation: AgentOperation, message: str,
                        data: Optional[Dict[str, Any]] = None):
        """Context manager that times the enclosed block and logs its outcome.

        Logs ``Starting: <message>`` on entry, then either
        ``Completed: <message>`` (INFO, success) or ``Failed: <message>``
        (ERROR, failure, with the exception text). Both outcome entries carry
        the elapsed time in milliseconds. Exceptions are re-raised.
        """
        # perf_counter is monotonic, so wall-clock adjustments cannot produce
        # negative or skewed durations.
        started = time.perf_counter()
        self.info(operation, f"Starting: {message}", data)
        try:
            yield
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - started) * 1000
            self._log_entry(self._create_log_entry(
                operation, LogLevel.ERROR, f"Failed: {message}", data,
                duration_ms=elapsed_ms, success=False, error=str(exc)
            ))
            raise
        else:
            # Kept outside the try body so a problem while logging the
            # completion is not misreported as a failure of the operation.
            elapsed_ms = (time.perf_counter() - started) * 1000
            self._log_entry(self._create_log_entry(
                operation, LogLevel.INFO, f"Completed: {message}",
                data, elapsed_ms, True
            ))

    def log_decision(self, decision: str, reasoning: str,
                     alternatives: Optional[List[str]] = None,
                     confidence: Optional[float] = None,
                     data: Optional[Dict[str, Any]] = None):
        """Log an agent decision together with its reasoning.

        Keys in ``data`` are merged last and so override the standard
        ``decision``/``reasoning``/``alternatives``/``confidence`` keys.
        """
        decision_data = {
            "decision": decision,
            "reasoning": reasoning,
            "alternatives": alternatives or [],
            "confidence": confidence,
            **(data or {})
        }
        self.info(AgentOperation.DECISION, f"Decision made: {decision}", decision_data)

    def log_api_call(self, api_name: str, endpoint: str, method: str,
                     request_data: Optional[Dict[str, Any]] = None,
                     response_data: Optional[Dict[str, Any]] = None,
                     status_code: Optional[int] = None,
                     duration_ms: Optional[float] = None):
        """Log an API call with its request, response and status.

        The call counts as successful only when ``status_code`` is in the
        2xx range; a missing status code is treated as a failure, matching
        the ERROR level such entries are logged at.
        """
        api_data = {
            "api_name": api_name,
            "endpoint": endpoint,
            "method": method,
            "request_data": request_data,
            "response_data": response_data,
            "status_code": status_code
        }
        # Always a real bool: `status_code and ...` would leak None or 0 into
        # the entry and keep such calls out of the failure count.
        success = status_code is not None and 200 <= status_code < 300
        log_entry = self._create_log_entry(
            AgentOperation.API_CALL,
            LogLevel.INFO if success else LogLevel.ERROR,
            f"API call to {api_name}: {method} {endpoint}",
            api_data,
            duration_ms,
            success
        )
        self._log_entry(log_entry)

    def log_validation(self, validation_type: str, result: str,
                       details: Optional[Dict[str, Any]] = None):
        """Log a validation outcome.

        ``result`` counts as success when, case-insensitively, it is one of
        ``approved``, ``valid`` or ``passed``; anything else is logged as a
        WARNING and recorded as a failure.
        """
        validation_data = {
            "validation_type": validation_type,
            "result": result,
            "details": details or {}
        }
        success = result.lower() in ["approved", "valid", "passed"]
        level = LogLevel.INFO if success else LogLevel.WARNING
        self._log_entry(self._create_log_entry(
            AgentOperation.VALIDATE,
            level,
            f"Validation {validation_type}: {result}",
            validation_data,
            success=success
        ))

    def get_metrics_summary(self) -> Dict[str, Any]:
        """Return session-wide totals plus a per-operation metrics breakdown."""
        total_operations = sum(metrics.total_calls for metrics in self.metrics.values())
        total_successful = sum(metrics.successful_calls for metrics in self.metrics.values())
        total_failed = sum(metrics.failed_calls for metrics in self.metrics.values())
        session_duration = (datetime.now() - self.start_time).total_seconds()
        return {
            "session_id": self.session_id,
            "agent_type": self.agent_type,
            "session_duration_seconds": session_duration,
            "total_operations": total_operations,
            "total_successful": total_successful,
            "total_failed": total_failed,
            "overall_success_rate": total_successful / total_operations if total_operations > 0 else 0,
            "overall_error_rate": total_failed / total_operations if total_operations > 0 else 0,
            "operations": {
                operation.value: {
                    "total_calls": metrics.total_calls,
                    "successful_calls": metrics.successful_calls,
                    "failed_calls": metrics.failed_calls,
                    "success_rate": metrics.success_rate,
                    "error_rate": metrics.error_rate,
                    "average_duration_ms": metrics.average_duration_ms,
                    # Report 0 rather than infinity when nothing was timed
                    "min_duration_ms": metrics.min_duration_ms if metrics.min_duration_ms != float('inf') else 0,
                    "max_duration_ms": metrics.max_duration_ms
                }
                for operation, metrics in self.metrics.items()
            }
        }

    def export_logs(self, format: str = "json") -> Union[str, Dict[str, Any]]:
        """Export the stored log entries.

        Args:
            format: ``"json"`` returns a dict (not a serialized string) with
                session information, the metrics summary and every entry;
                any other value returns one plain-text line per entry.
        """
        if format == "json":
            return {
                "session_id": self.session_id,
                "agent_type": self.agent_type,
                "start_time": self.start_time.isoformat(),
                "end_time": datetime.now().isoformat(),
                "metrics": self.get_metrics_summary(),
                "log_entries": [
                    {
                        **asdict(entry),
                        "timestamp": entry.timestamp.isoformat(),
                        "operation": entry.operation.value,
                        "level": entry.level.value
                    }
                    for entry in self.log_entries
                ]
            }

        # Return as formatted string
        return "\n".join(
            f"{entry.timestamp.isoformat()} [{entry.level.value}] "
            f"{entry.agent_type}:{entry.operation.value} - {entry.message}"
            for entry in self.log_entries
        )
# Decorator for automatic logging
def log_operation(operation: "AgentOperation", message: Optional[str] = None,
                  log_args: bool = False, log_result: bool = False):
    """Decorator that times and logs calls through the instance's agent logger.

    The wrapper looks for a ``logger`` attribute on the first positional
    argument (normally ``self``). When that attribute provides a callable
    ``operation_timer``, the call runs inside it; otherwise the wrapped
    function is called directly with no logging.

    Args:
        operation: Operation category passed to the logger.
        message: Log message; defaults to ``"Calling <function name>"``.
        log_args: When true, attach truncated string forms of the positional
            and keyword arguments to the log data.
        log_result: When true, log a truncated string form of the return
            value at debug level.

    Returns:
        A decorator that wraps the function and preserves its metadata.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Try to find logger in the instance
            agent_logger = getattr(args[0], 'logger', None) if args else None

            # A plain logging.Logger (or any object without operation_timer)
            # cannot time the call; run the function unlogged rather than
            # failing with AttributeError.
            if not agent_logger or not callable(getattr(agent_logger, 'operation_timer', None)):
                return func(*args, **kwargs)

            func_name = func.__name__
            log_message = message or f"Calling {func_name}"

            # Log function arguments if requested
            log_data = {}
            if log_args:
                log_data["args"] = str(args)[:200]  # Truncate long arguments
                log_data["kwargs"] = {k: str(v)[:100] for k, v in kwargs.items()}

            with agent_logger.operation_timer(operation, log_message, log_data):
                result = func(*args, **kwargs)
                # Log result if requested
                if log_result:
                    agent_logger.debug(operation, f"Result from {func_name}", {"result": str(result)[:200]})
                return result
        return wrapper
    return decorator
# Example usage and testing
if __name__ == "__main__":
    # Small smoke-test demo: exercises each logging entry point once and
    # prints a short summary. The emoji below were mis-encoded in the file
    # and are restored to the characters their bytes decode to.
    print("📊 Agent Logger Testing")
    print("=" * 30)

    # Create logger
    logger = AgentLogger("flight_agent", LogLevel.DEBUG)

    # Test various logging operations
    logger.info(AgentOperation.SEARCH, "Starting flight search", {"query": "NYC to LAX"})

    with logger.operation_timer(AgentOperation.API_CALL, "Calling flight API"):
        time.sleep(0.1)  # Simulate API call

    logger.log_decision(
        "Select United Airlines flight",
        "Best price-to-duration ratio",
        ["American Airlines", "Delta"],
        0.85
    )

    logger.log_api_call(
        "FlightAPI",
        "/search",
        "POST",
        {"origin": "NYC", "destination": "LAX"},
        {"flights": 15},
        200,
        150.5
    )

    logger.log_validation("price_check", "approved", {"max_price": 500, "actual_price": 350})

    # Get metrics
    metrics = logger.get_metrics_summary()
    print("\n📈 Performance Metrics:")
    print(f" Total Operations: {metrics['total_operations']}")
    print(f" Success Rate: {metrics['overall_success_rate']:.2%}")

    # Export logs
    logs_json = logger.export_logs("json")
    print(f"\n📋 Log Export: {len(logs_json['log_entries'])} entries")
    print("\n✅ Agent logging system working correctly!")