"""
Advanced Logging and Tracing System
Implements structured logging, distributed tracing, and correlation IDs
"""

import json
import logging
import math
import time
import uuid
from contextvars import ContextVar
from typing import Any, Dict, Optional, Union

from fastapi import Request, Response

try:
    from fastapi.middleware.base import BaseHTTPMiddleware
except ImportError:
    # FastAPI does not ship a ``fastapi.middleware.base`` module; the class is
    # defined in Starlette, which FastAPI itself is built on.
    from starlette.middleware.base import BaseHTTPMiddleware

# Context variables for request tracking
request_id_var: ContextVar[Optional[str]] = ContextVar('request_id', default=None)
user_id_var: ContextVar[Optional[str]] = ContextVar('user_id', default=None)
correlation_id_var: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None)

# Attribute names that belong to ``logging.LogRecord`` itself. The formatter
# skips them when copying "extra" fields, and the logger must never pass them
# through ``extra=`` because ``logging`` raises ``KeyError`` on such a clash.
_RESERVED_RECORD_ATTRS = frozenset({
    'name', 'msg', 'args', 'levelname', 'levelno', 'pathname', 'filename',
    'module', 'exc_info', 'exc_text', 'stack_info', 'lineno', 'funcName',
    'created', 'msecs', 'relativeCreated', 'thread', 'threadName',
    'processName', 'process', 'message', 'asctime', 'taskName',
})

# Prefix applied to caller-supplied fields whose name is reserved.
_RESERVED_FIELD_PREFIX = 'extra_'


class StructuredLogger:
    """Enhanced logger with structured logging capabilities

    Wraps a standard ``logging.Logger`` and attaches the current request,
    correlation and user IDs (read from the module's context variables) plus
    any keyword arguments to every record as ``extra`` fields.
    """

    def __init__(self, name: str, level: int = logging.INFO):
        """Configure the named logger with a JSON handler.

        Args:
            name: Name passed to ``logging.getLogger``.
            level: Level applied to the logger and to every handler added here.
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)

        # Remove existing handlers to avoid duplicates
        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)

        # Add structured JSON handler
        handler = StructuredJSONHandler()
        handler.setLevel(level)
        formatter = StructuredJSONFormatter()
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

        # Add console handler for development
        # NOTE(review): this adds a second, plain-text handler only when the
        # root logger already has handlers, and propagation is left enabled --
        # confirm the condition is not inverted.
        if logging.getLogger().hasHandlers():
            console_handler = logging.StreamHandler()
            console_handler.setLevel(level)
            console_formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            console_handler.setFormatter(console_formatter)
            self.logger.addHandler(console_handler)

    def _get_context_data(self) -> Dict[str, Any]:
        """Get current context data for logging

        Returns:
            A dict holding ``request_id``, ``correlation_id`` and ``user_id``
            for whichever of those context variables currently has a truthy
            value.
        """
        context = {}

        # Add request tracking data
        request_id = request_id_var.get()
        if request_id:
            context['request_id'] = request_id

        correlation_id = correlation_id_var.get()
        if correlation_id:
            context['correlation_id'] = correlation_id

        user_id = user_id_var.get()
        if user_id:
            context['user_id'] = user_id

        return context

    def _build_extra(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Merge context data with *fields* into a dict safe for ``extra=``.

        *fields* override context data of the same name. A field whose name
        collides with a ``LogRecord`` attribute is emitted under an ``extra_``
        prefix instead, because ``logging`` raises ``KeyError`` when ``extra``
        tries to overwrite such an attribute.
        """
        extra = self._get_context_data()
        for key, value in fields.items():
            if key in _RESERVED_RECORD_ATTRS:
                key = f'{_RESERVED_FIELD_PREFIX}{key}'
            extra[key] = value
        return extra

    def debug(self, message: str, **kwargs):
        """Log debug message with structured data"""
        self.logger.debug(message, extra=self._build_extra(kwargs))

    def info(self, message: str, **kwargs):
        """Log info message with structured data"""
        self.logger.info(message, extra=self._build_extra(kwargs))

    def warning(self, message: str, **kwargs):
        """Log warning message with structured data"""
        self.logger.warning(message, extra=self._build_extra(kwargs))

    def error(self, message: str, exc_info=None, **kwargs):
        """Log error message with structured data

        ``exc_info`` is forwarded unchanged to ``logging.Logger.error``.
        """
        self.logger.error(message, exc_info=exc_info, extra=self._build_extra(kwargs))

    def critical(self, message: str, exc_info=None, **kwargs):
        """Log critical message with structured data

        ``exc_info`` is forwarded unchanged to ``logging.Logger.critical``.
        """
        self.logger.critical(message, exc_info=exc_info, extra=self._build_extra(kwargs))

    def log_performance(self, operation: str, duration: float, **kwargs):
        """Log performance metrics

        Args:
            operation: Label of the measured operation.
            duration: Elapsed time in seconds.
            **kwargs: Additional fields; they override the built-in ones.
        """
        self.logger.info(
            f"Performance: {operation} took {duration:.3f}s",
            extra=self._build_extra({
                'operation': operation,
                'duration_seconds': duration,
                'performance_metric': True,
                **kwargs,
            })
        )

    def log_api_request(self, method: str, path: str, status_code: int, duration: float, **kwargs):
        """Log API request metrics

        Args:
            method: HTTP method.
            path: Request path.
            status_code: Response status code.
            duration: Elapsed time in seconds.
            **kwargs: Additional fields; they override the built-in ones.
        """
        self.logger.info(
            f"API Request: {method} {path} -> {status_code} ({duration:.3f}s)",
            extra=self._build_extra({
                'method': method,
                'path': path,
                'status_code': status_code,
                'duration_seconds': duration,
                'api_request': True,
                **kwargs,
            })
        )

    def log_database_query(self, query: str, duration: float, row_count: Optional[int] = None, **kwargs):
        """Log database query metrics

        The message shows at most the first 100 characters of *query*; the
        full text is attached as the ``query`` field.

        Args:
            query: Query text.
            duration: Elapsed time in seconds.
            row_count: Number of rows, appended to the message when not None.
            **kwargs: Additional fields; they override the built-in ones.
        """
        message = f"DB Query: {query[:100]}{'...' if len(query) > 100 else ''}"
        if row_count is not None:
            message += f" -> {row_count} rows"

        self.logger.info(
            f"{message} ({duration:.3f}s)",
            extra=self._build_extra({
                'query': query,
                'duration_seconds': duration,
                'row_count': row_count,
                'database_query': True,
                **kwargs,
            })
        )

    def log_security_event(self, event_type: str, severity: str = "info", **kwargs):
        """Log security-related events

        NOTE(review): the record is always emitted at WARNING level;
        *severity* is only attached as a field and does not select the level.
        """
        self.logger.warning(
            f"Security Event: {event_type}",
            extra=self._build_extra({
                'event_type': event_type,
                'severity': severity,
                'security_event': True,
                **kwargs,
            })
        )


class StructuredJSONFormatter(logging.Formatter):
    """JSON formatter for structured logging"""

    @staticmethod
    def _to_json_value(value: Any) -> Any:
        """Return *value* unchanged when JSON can represent it, else ``str(value)``.

        Numbers, booleans, ``None`` and strings keep their type so that
        metrics such as durations and status codes stay numeric in the
        output. Non-finite floats are stringified because ``json.dumps``
        would otherwise emit the non-standard tokens ``NaN``/``Infinity``.
        """
        if isinstance(value, float):
            return value if math.isfinite(value) else str(value)
        if value is None or isinstance(value, (str, bool, int, dict, list, tuple)):
            return value
        return str(value)

    def format(self, record: logging.LogRecord) -> str:
        """Serialize *record* to a single JSON document.

        The entry starts with fixed base keys, then every non-standard
        attribute found on the record (i.e. the ``extra`` fields), then an
        ``exception`` key holding the formatted traceback when
        ``record.exc_info`` is set.
        """
        # Create the base log entry
        log_entry = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }

        # Add any extra fields from the record
        for key, value in record.__dict__.items():
            if key in _RESERVED_RECORD_ATTRS:
                continue
            log_entry[key] = self._to_json_value(value)

        # Add exception info if present
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)

        # default=str covers non-serializable objects nested in containers
        return json.dumps(log_entry, default=str)


class StructuredJSONHandler(logging.Handler):
    """Handler that outputs structured JSON logs"""

    def __init__(self, level=logging.NOTSET):
        """Create the handler with a ``StructuredJSONFormatter`` pre-installed."""
        super().__init__(level)
        self.formatter = StructuredJSONFormatter()

    def emit(self, record: logging.LogRecord) -> None:
        """Format *record* and print it; failures go to ``handleError``."""
        try:
            msg = self.format(record)
            print(msg)  # In production, this would go to a log aggregation system
        except Exception:
            self.handleError(record)


class RequestTrackingMiddleware(BaseHTTPMiddleware):
    """Middleware for request tracking and correlation IDs"""

    def __init__(self, app):
        super().__init__(app)
        self.logger = StructuredLogger("request_tracking")

    @staticmethod
    def _response_size(response: Response) -> Any:
        """Best-effort size of *response* for logging.

        Prefers a ``content_length`` attribute when the response object has
        one; otherwise parses the ``Content-Length`` header. Returns 0 when
        neither yields a value.
        """
        size = getattr(response, 'content_length', None)
        if size is not None:
            return size
        try:
            return int(response.headers.get('content-length'))
        except (TypeError, ValueError):
            # Header missing or not an integer
            return 0

    async def dispatch(self, request: Request, call_next) -> Response:
        """Track one request: set context IDs, log start/end, echo IDs back.

        The request ID is the first 8 characters of a fresh UUID4. The
        correlation ID is taken from the ``X-Correlation-ID`` request header
        and defaults to the request ID. Both are written to the response
        headers. Exceptions from downstream are logged and re-raised, and the
        context variables are always reset.
        """
        # Generate request ID
        request_id = str(uuid.uuid4())[:8]

        # Set context variables
        request_id_token = request_id_var.set(request_id)

        # Extract or generate correlation ID
        correlation_id = request.headers.get('X-Correlation-ID', request_id)
        correlation_token = correlation_id_var.set(correlation_id)

        # Extract user ID if available (from auth middleware)
        user_id = getattr(request.state, 'user_id', None)
        if user_id:
            user_id_token = user_id_var.set(str(user_id))
        else:
            user_id_token = None

        # perf_counter is monotonic, so wall-clock adjustments cannot skew
        # (or make negative) the measured duration.
        start_time = time.perf_counter()

        try:
            # Log request start
            self.logger.info(
                f"Request started: {request.method} {request.url.path}",
                method=request.method,
                path=request.url.path,
                user_agent=request.headers.get('User-Agent'),
                ip=request.client.host if request.client else None,
                correlation_id=correlation_id
            )

            # Process request
            response = await call_next(request)

            # Calculate duration
            duration = time.perf_counter() - start_time

            # Log request completion
            self.logger.log_api_request(
                method=request.method,
                path=request.url.path,
                status_code=response.status_code,
                duration=duration,
                response_size=self._response_size(response)
            )

            # Add correlation ID to response headers
            response.headers['X-Correlation-ID'] = correlation_id
            response.headers['X-Request-ID'] = request_id

            return response

        except Exception as e:
            # Calculate duration for failed requests
            duration = time.perf_counter() - start_time

            # Log error
            self.logger.error(
                f"Request failed: {request.method} {request.url.path} - {str(e)}",
                method=request.method,
                path=request.url.path,
                duration=duration,
                exception=str(e)
            )
            raise

        finally:
            # Clean up context variables
            request_id_var.reset(request_id_token)
            correlation_id_var.reset(correlation_token)
            if user_id_token is not None:
                user_id_var.reset(user_id_token)


class PerformanceMonitoringMiddleware(BaseHTTPMiddleware):
    """Middleware for performance monitoring and slow request detection"""

    def __init__(self, app, slow_request_threshold: float = 1.0):
        """
        Args:
            app: The wrapped ASGI application.
            slow_request_threshold: Duration in seconds above which a request
                is additionally logged as slow.
        """
        super().__init__(app)
        self.slow_request_threshold = slow_request_threshold
        self.logger = StructuredLogger("performance_monitoring")

    async def dispatch(self, request: Request, call_next) -> Response:
        """Time the downstream call, warn when slow, and log a metric."""
        # Monotonic clock: immune to wall-clock adjustments
        start_time = time.perf_counter()

        # Track database queries during request
        # This would be enhanced with actual database monitoring
        # For now, we'll track basic request performance

        response = await call_next(request)

        total_duration = time.perf_counter() - start_time

        # Log slow requests
        if total_duration > self.slow_request_threshold:
            self.logger.warning(
                f"Slow request detected: {request.method} {request.url.path} ({total_duration:.3f}s)",
                method=request.method,
                path=request.url.path,
                duration=total_duration,
                status_code=response.status_code,
                slow_request=True
            )

        # Log performance metrics for all requests
        self.logger.log_performance(
            operation=f"http_{request.method.lower()}_{request.url.path.replace('/', '_')}",
            duration=total_duration,
            method=request.method,
            path=request.url.path,
            status_code=response.status_code
        )

        return response


class DatabaseQueryLogger:
    """Logger for database query performance monitoring"""

    def __init__(self, slow_query_threshold: float = 0.5):
        """
        Args:
            slow_query_threshold: Duration in seconds above which a query is
                additionally logged as slow (default 0.5, i.e. 500ms).
        """
        self.slow_query_threshold = slow_query_threshold
        self.logger = StructuredLogger("database_queries")

    def log_query(self, query: str, duration: float, row_count: Optional[int] = None):
        """Log a database query with performance metrics

        Emits a second, WARNING-level record when *duration* exceeds the
        configured slow-query threshold; that record carries at most the
        first 200 characters of the query.
        """
        self.logger.log_database_query(
            query=query,
            duration=duration,
            row_count=row_count
        )

        # Alert on slow queries
        if duration > self.slow_query_threshold:
            self.logger.warning(
                f"Slow database query detected: {duration:.3f}s",
                query=query[:200],
                duration=duration,
                slow_query=True
            )


# Global instances
structured_logger = StructuredLogger("app")
db_query_logger = DatabaseQueryLogger()


def get_request_id() -> Optional[str]:
    """Get current request ID from context"""
    return request_id_var.get()


def get_correlation_id() -> Optional[str]:
    """Get current correlation ID from context"""
    return correlation_id_var.get()


def get_user_id() -> Optional[str]:
    """Get current user ID from context"""
    return user_id_var.get()


def set_user_context(user_id: Union[str, int]) -> None:
    """Set user context for logging

    Stores ``str(user_id)`` in the user-ID context variable. The value is not
    reset by this function.
    """
    user_id_var.set(str(user_id))


# Convenience functions for common logging patterns
def log_api_request(method: str, path: str, status_code: int, duration: float, **kwargs):
    """Log API request with structured data"""
    structured_logger.log_api_request(method, path, status_code, duration, **kwargs)


def log_performance(operation: str, duration: float, **kwargs):
    """Log performance metric"""
    structured_logger.log_performance(operation, duration, **kwargs)


def log_security_event(event_type: str, severity: str = "info", **kwargs):
    """Log security event"""
    structured_logger.log_security_event(event_type, severity, **kwargs)


def log_database_query(query: str, duration: float, row_count: Optional[int] = None, **kwargs):
    """Log database query

    NOTE(review): the query is recorded twice -- once through
    ``db_query_logger`` (which also performs the slow-query check and does
    not receive ``kwargs``) and once through ``structured_logger`` -- confirm
    the duplicate record is intended.
    """
    db_query_logger.log_query(query, duration, row_count)
    structured_logger.log_database_query(query, duration, row_count, **kwargs)