Spaces:
Sleeping
Sleeping
| """Structured logging with correlation IDs and context management.""" | |
import json
import logging
import time
import traceback
import uuid
from contextvars import ContextVar
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, Optional, Union
# Context-local storage for the active correlation ID; reads return None
# until a value has been set in the current context.
correlation_id_context: ContextVar[Optional[str]] = ContextVar(
    'correlation_id',
    default=None,
)
class LogLevel(Enum):
    """Severity names used in the ``level`` field of structured log entries.

    Each member's value is the upper-case level name as a string.
    """

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
@dataclass
class LogContext:
    """Structured log context.

    Must be a dataclass: ``to_dict`` relies on ``dataclasses.asdict`` and
    instances are built with keyword arguments. Instances are intentionally
    mutable so fields such as ``correlation_id`` and ``operation`` can be
    reassigned after construction.
    """

    # Identifier tying together all log entries of one logical operation.
    correlation_id: str
    operation: Optional[str] = None
    component: Optional[str] = None
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    request_id: Optional[str] = None
    # Free-form additional key/value data.
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary, omitting fields whose value is None.

        Returns:
            Dict[str, Any]: Field name to value for every non-None field.
        """
        return {k: v for k, v in asdict(self).items() if v is not None}
class StructuredLogger:
    """Structured logger with correlation ID support.

    Wraps a standard ``logging.Logger``. Every call builds a dictionary of
    structured fields and attaches it to the log record under the
    ``structured_data`` attribute, where the formatters can pick it up.
    """

    def __init__(self, name: str, enable_json_logging: bool = True):
        """
        Initialize structured logger.

        Args:
            name: Logger name
            enable_json_logging: Whether to use JSON format
        """
        self.logger = logging.getLogger(name)
        self.enable_json_logging = enable_json_logging
        self._setup_formatter()

    def _setup_formatter(self) -> None:
        """Setup log formatter.

        Only handlers already attached to this logger at call time receive
        the formatter; handlers added later are not touched.
        """
        if self.enable_json_logging:
            formatter = JsonFormatter()
        else:
            formatter = ContextFormatter()
        # Apply formatter to all handlers
        for handler in self.logger.handlers:
            handler.setFormatter(formatter)

    def _get_log_data(self, message: str, level: str,
                      context: Optional[LogContext] = None,
                      extra: Optional[Dict[str, Any]] = None,
                      exception: Optional[Exception] = None) -> Dict[str, Any]:
        """
        Prepare log data structure.

        Args:
            message: Log message
            level: Log level
            context: Log context
            extra: Extra data
            exception: Exception if any

        Returns:
            Dict[str, Any]: Structured log data
        """
        # Resolve the correlation ID: context variable first, then the
        # supplied context, otherwise generate a fresh one.
        correlation_id = correlation_id_context.get()
        if not correlation_id and context:
            correlation_id = context.correlation_id
        if not correlation_id:
            correlation_id = str(uuid.uuid4())

        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # tzinfo is dropped again so the rendered text keeps the 'Z' suffix.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
        log_data: Dict[str, Any] = {
            'timestamp': utc_now.isoformat() + 'Z',
            'level': level,
            'message': message,
            'correlation_id': correlation_id,
            'logger_name': self.logger.name
        }

        # Context fields are merged last, so any non-None context field
        # (including its correlation_id) overrides the base entries above.
        if context:
            log_data.update(context.to_dict())

        # Add extra data
        if extra:
            log_data['extra'] = extra

        # Add exception information
        if exception is not None:
            # Format the traceback of the exception that was passed in;
            # traceback.format_exc() would describe whatever exception is
            # currently being handled, which may be none at all.
            formatted_traceback = ''.join(traceback.format_exception(
                type(exception), exception, exception.__traceback__))
            log_data['exception'] = {
                'type': type(exception).__name__,
                'message': str(exception),
                'module': getattr(exception, '__module__', None),
                'traceback': formatted_traceback
            }
        return log_data

    def debug(self, message: str, context: Optional[LogContext] = None,
              extra: Optional[Dict[str, Any]] = None) -> None:
        """Log debug message."""
        if self.logger.isEnabledFor(logging.DEBUG):
            log_data = self._get_log_data(message, LogLevel.DEBUG.value, context, extra)
            # Use 'structured_data' to avoid conflicts with LogRecord attributes
            self.logger.debug(message, extra={'structured_data': log_data})

    def info(self, message: str, context: Optional[LogContext] = None,
             extra: Optional[Dict[str, Any]] = None) -> None:
        """Log info message."""
        if self.logger.isEnabledFor(logging.INFO):
            log_data = self._get_log_data(message, LogLevel.INFO.value, context, extra)
            # Use 'structured_data' to avoid conflicts with LogRecord attributes
            self.logger.info(message, extra={'structured_data': log_data})

    def warning(self, message: str, context: Optional[LogContext] = None,
                extra: Optional[Dict[str, Any]] = None) -> None:
        """Log warning message."""
        if self.logger.isEnabledFor(logging.WARNING):
            log_data = self._get_log_data(message, LogLevel.WARNING.value, context, extra)
            # Use 'structured_data' to avoid conflicts with LogRecord attributes
            self.logger.warning(message, extra={'structured_data': log_data})

    def error(self, message: str, context: Optional[LogContext] = None,
              extra: Optional[Dict[str, Any]] = None,
              exception: Optional[Exception] = None) -> None:
        """Log error message, optionally with details of ``exception``."""
        if self.logger.isEnabledFor(logging.ERROR):
            log_data = self._get_log_data(message, LogLevel.ERROR.value, context, extra, exception)
            # Use 'structured_data' to avoid conflicts with LogRecord attributes
            self.logger.error(message, extra={'structured_data': log_data})

    def critical(self, message: str, context: Optional[LogContext] = None,
                 extra: Optional[Dict[str, Any]] = None,
                 exception: Optional[Exception] = None) -> None:
        """Log critical message, optionally with details of ``exception``."""
        if self.logger.isEnabledFor(logging.CRITICAL):
            log_data = self._get_log_data(message, LogLevel.CRITICAL.value, context, extra, exception)
            # Use 'structured_data' to avoid conflicts with LogRecord attributes
            self.logger.critical(message, extra={'structured_data': log_data})

    def log_operation_start(self, operation: str, context: Optional[LogContext] = None,
                            extra: Optional[Dict[str, Any]] = None) -> str:
        """
        Log operation start and return correlation ID.

        A new correlation ID is always generated. It is written to the
        supplied ``context`` (if any) and stored in the context variable.

        Args:
            operation: Operation name
            context: Log context; its correlation_id and operation are overwritten
            extra: Extra data

        Returns:
            str: Correlation ID for the operation
        """
        correlation_id = str(uuid.uuid4())
        if context:
            context.correlation_id = correlation_id
            context.operation = operation
        else:
            context = LogContext(correlation_id=correlation_id, operation=operation)
        # Set correlation ID in context
        correlation_id_context.set(correlation_id)
        self.info(f"Operation started: {operation}", context, extra)
        return correlation_id

    def log_operation_end(self, operation: str, correlation_id: str,
                          success: bool = True, duration: Optional[float] = None,
                          context: Optional[LogContext] = None,
                          extra: Optional[Dict[str, Any]] = None) -> None:
        """
        Log operation end.

        Logged at INFO when ``success`` is true, otherwise at ERROR.

        Args:
            operation: Operation name
            correlation_id: Correlation ID
            success: Whether operation succeeded
            duration: Operation duration in seconds
            context: Log context; its correlation_id and operation are overwritten
            extra: Extra data (copied, the caller's dict is not modified)
        """
        if context:
            context.correlation_id = correlation_id
            context.operation = operation
        else:
            context = LogContext(correlation_id=correlation_id, operation=operation)

        # Work on a copy so the caller's dict is never mutated.
        extra = dict(extra) if extra else {}
        # Add performance data
        extra['success'] = success
        if duration is not None:
            extra['duration_seconds'] = duration

        status = "completed successfully" if success else "failed"
        message = f"Operation {status}: {operation}"
        if success:
            self.info(message, context, extra)
        else:
            self.error(message, context, extra)

    def log_performance_metric(self, metric_name: str, value: Union[int, float],
                               unit: Optional[str] = None, context: Optional[LogContext] = None,
                               extra: Optional[Dict[str, Any]] = None) -> None:
        """
        Log performance metric.

        Args:
            metric_name: Name of the metric
            value: Metric value
            unit: Unit of measurement
            context: Log context
            extra: Extra data (copied, the caller's dict is not modified)
        """
        # Work on a copy so the caller's dict is never mutated.
        extra = dict(extra) if extra else {}
        extra['metric'] = {
            'name': metric_name,
            'value': value,
            'unit': unit,
            'timestamp': time.time()
        }
        message = f"Performance metric: {metric_name}={value}"
        if unit:
            message += f" {unit}"
        self.info(message, context, extra)
class JsonFormatter(logging.Formatter):
    """JSON log formatter.

    Serializes the record's ``structured_data`` attribute (if present)
    together with standard record fields into a single JSON object.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Format log record as JSON.

        Args:
            record: Log record to format

        Returns:
            str: JSON text, or a plain-text fallback if formatting fails
        """
        try:
            # Copy the structured data so the dict attached to the record
            # is not modified by formatting.
            log_data = dict(getattr(record, 'structured_data', None) or {})

            # Ensure basic fields are present
            if 'timestamp' not in log_data:
                # Timezone-aware replacement for the deprecated
                # datetime.utcnow(); tzinfo is dropped so the text keeps
                # the same shape with a trailing 'Z'.
                utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
                log_data['timestamp'] = utc_now.isoformat() + 'Z'
            if 'level' not in log_data:
                log_data['level'] = record.levelname
            if 'message' not in log_data:
                log_data['message'] = record.getMessage()
            if 'logger_name' not in log_data:
                log_data['logger_name'] = record.name

            # Add standard log record fields
            log_data.update({
                'module': record.module,
                'function': record.funcName,
                'line': record.lineno,
                'thread': record.thread,
                'process': record.process
            })
            # default=str stringifies any value json cannot serialize natively.
            return json.dumps(log_data, default=str, ensure_ascii=False)
        except Exception as e:
            # Best-effort fallback so a formatting problem does not lose the message
            return f"JSON formatting error: {e} | Original: {record.getMessage()}"
class ContextFormatter(logging.Formatter):
    """Context-aware log formatter.

    Renders plain-text lines that include the correlation ID and, when the
    record's ``structured_data`` carries one, an ``[operation]`` prefix on
    the message.
    """

    def __init__(self):
        """Initialize formatter."""
        super().__init__(
            fmt='%(asctime)s - %(name)s - %(levelname)s - [%(correlation_id)s] - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )

    def format(self, record: logging.LogRecord) -> str:
        """Format log record with context.

        Args:
            record: Log record to format

        Returns:
            str: Formatted line, or a plain-text fallback if formatting fails
        """
        # Remember the original message parts so the record can be restored
        # after the temporary operation prefix is applied.
        saved_msg, saved_args = record.msg, record.args
        try:
            structured_data = getattr(record, 'structured_data', None) or {}

            # Get correlation ID from context or record
            correlation_id = correlation_id_context.get()
            if not correlation_id:
                correlation_id = structured_data.get('correlation_id', 'unknown')
            # Add correlation ID to record
            record.correlation_id = correlation_id

            # Add context information if available.  The prefix has to go
            # into record.msg: the base class recomputes record.message
            # from getMessage(), so assigning record.message has no effect.
            if 'operation' in structured_data:
                record.msg = f"[{structured_data['operation']}] {record.getMessage()}"
                # Message is already fully interpolated; prevent a second
                # %-formatting pass.
                record.args = ()
            return super().format(record)
        except Exception as e:
            # Fallback formatting, reported against the original message
            record.msg, record.args = saved_msg, saved_args
            return f"Formatting error: {e} | Original: {record.getMessage()}"
        finally:
            # Leave the record as it was found
            record.msg, record.args = saved_msg, saved_args
def get_structured_logger(name: str, enable_json_logging: bool = True) -> StructuredLogger:
    """
    Build a StructuredLogger for the given logger name.

    Args:
        name: Logger name
        enable_json_logging: Whether to use JSON format

    Returns:
        StructuredLogger: Newly constructed structured logger
    """
    structured_logger = StructuredLogger(name, enable_json_logging)
    return structured_logger
def set_correlation_id(correlation_id: str) -> None:
    """
    Store a correlation ID in the context variable.

    Args:
        correlation_id: Correlation ID to set
    """
    # The token returned by ContextVar.set() is intentionally discarded.
    correlation_id_context.set(correlation_id)
    return None
def get_correlation_id() -> Optional[str]:
    """
    Read the correlation ID held by the context variable.

    Returns:
        Optional[str]: Current correlation ID, or None if none has been set
    """
    current_id = correlation_id_context.get()
    return current_id
def generate_correlation_id() -> str:
    """
    Create a fresh correlation ID.

    Returns:
        str: String form of a newly generated random (version 4) UUID
    """
    new_id = uuid.uuid4()
    return str(new_id)