Spaces:
Paused
Paused
| """ | |
| Advanced Logging and Tracing System | |
| Implements structured JSON logging, request/correlation ID propagation, and request/query performance logging | |
| """ | |
| import json | |
| import logging | |
| import time | |
| from contextvars import ContextVar | |
| from typing import Any, Dict, Optional, Union | |
| from fastapi import Request, Response | |
| from fastapi.middleware.base import BaseHTTPMiddleware | |
# Context variables for request tracking.
# Each one defaults to None until a value is set for the current context.
request_id_var: ContextVar[Optional[str]] = ContextVar('request_id', default=None)
user_id_var: ContextVar[Optional[str]] = ContextVar('user_id', default=None)
correlation_id_var: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None)
class StructuredLogger:
    """Wrapper around a stdlib logger that attaches structured fields to every record.

    Each logging method merges the current request context (request ID,
    correlation ID and user ID, read from the module's context variables) with
    the caller-supplied keyword arguments and hands the result to the
    underlying logger as ``extra`` fields.

    Attributes:
        logger: The configured ``logging.Logger`` that records are sent to.
    """

    # ``logging`` raises KeyError when ``extra`` contains a key that already
    # exists on the LogRecord (or "message"/"asctime"). The set is derived from
    # a throwaway record so it tracks whatever the running Python version uses.
    _RESERVED_FIELDS = frozenset(
        logging.LogRecord("", logging.NOTSET, "", 0, "", (), None).__dict__
    ) | {"message", "asctime"}

    # Frames to skip so that module/function/line describe the code that called
    # the public method: _emit itself plus the public wrapper method.
    # NOTE: the ``stacklevel`` argument requires Python 3.8+.
    _CALLER_STACKLEVEL = 3

    def __init__(self, name: str, level: int = logging.INFO):
        """Configure the named logger with a JSON handler.

        Args:
            name: Name passed to ``logging.getLogger``.
            level: Level applied to the logger and to the handlers added here.
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)

        # Remove existing handlers to avoid duplicates when the same logger
        # name is wrapped more than once.
        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)

        # Add structured JSON handler
        handler = StructuredJSONHandler()
        handler.setLevel(level)
        formatter = StructuredJSONFormatter()
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

        # Add a plain-text console handler, but only when the root logger
        # already has handlers configured.
        # NOTE(review): propagation is left untouched here, so records may also
        # reach the root handlers -- confirm the duplicate output is intended.
        if logging.getLogger().hasHandlers():
            console_handler = logging.StreamHandler()
            console_handler.setLevel(level)
            console_formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            console_handler.setFormatter(console_formatter)
            self.logger.addHandler(console_handler)

    def _get_context_data(self) -> Dict[str, Any]:
        """Return the request context fields that are currently set.

        Returns:
            A dict with any of ``request_id``, ``correlation_id`` and
            ``user_id`` whose context variable holds a truthy value.
        """
        context = {}

        request_id = request_id_var.get()
        if request_id:
            context['request_id'] = request_id

        correlation_id = correlation_id_var.get()
        if correlation_id:
            context['correlation_id'] = correlation_id

        user_id = user_id_var.get()
        if user_id:
            context['user_id'] = user_id

        return context

    def _emit(self, level: int, message: str, fields: Dict[str, Any], exc_info=None) -> None:
        """Send one record to the underlying logger.

        Args:
            level: Numeric logging level.
            message: Log message.
            fields: Structured fields; they override context fields of the
                same name.
            exc_info: Forwarded unchanged to the underlying logger.
        """
        merged = {**self._get_context_data(), **fields}
        # Rename fields that would collide with LogRecord attributes instead of
        # letting ``logging`` raise KeyError on them.
        extra = {
            (f"extra_{key}" if key in self._RESERVED_FIELDS else key): value
            for key, value in merged.items()
        }
        self.logger.log(
            level,
            message,
            exc_info=exc_info,
            extra=extra,
            stacklevel=self._CALLER_STACKLEVEL,
        )

    def debug(self, message: str, **kwargs):
        """Log debug message with structured data"""
        self._emit(logging.DEBUG, message, kwargs)

    def info(self, message: str, **kwargs):
        """Log info message with structured data"""
        self._emit(logging.INFO, message, kwargs)

    def warning(self, message: str, **kwargs):
        """Log warning message with structured data"""
        self._emit(logging.WARNING, message, kwargs)

    def error(self, message: str, exc_info=None, **kwargs):
        """Log error message with structured data; ``exc_info`` is forwarded to logging."""
        self._emit(logging.ERROR, message, kwargs, exc_info=exc_info)

    def critical(self, message: str, exc_info=None, **kwargs):
        """Log critical message with structured data; ``exc_info`` is forwarded to logging."""
        self._emit(logging.CRITICAL, message, kwargs, exc_info=exc_info)

    def log_performance(self, operation: str, duration: float, **kwargs):
        """Log performance metrics at INFO level.

        Args:
            operation: Name of the measured operation.
            duration: Elapsed time in seconds.
            **kwargs: Additional structured fields.
        """
        self._emit(
            logging.INFO,
            f"Performance: {operation} took {duration:.3f}s",
            {
                'operation': operation,
                'duration_seconds': duration,
                'performance_metric': True,
                **kwargs,
            },
        )

    def log_api_request(self, method: str, path: str, status_code: int, duration: float, **kwargs):
        """Log API request metrics at INFO level.

        Args:
            method: HTTP method.
            path: Request path.
            status_code: Response status code.
            duration: Elapsed time in seconds.
            **kwargs: Additional structured fields.
        """
        self._emit(
            logging.INFO,
            f"API Request: {method} {path} -> {status_code} ({duration:.3f}s)",
            {
                'method': method,
                'path': path,
                'status_code': status_code,
                'duration_seconds': duration,
                'api_request': True,
                **kwargs,
            },
        )

    def log_database_query(self, query: str, duration: float, row_count: Optional[int] = None, **kwargs):
        """Log database query metrics at INFO level.

        The message carries at most the first 100 characters of the query; the
        full query text is attached as the ``query`` field.

        Args:
            query: Query text.
            duration: Elapsed time in seconds.
            row_count: Number of rows, appended to the message when not None.
            **kwargs: Additional structured fields.
        """
        message = f"DB Query: {query[:100]}{'...' if len(query) > 100 else ''}"
        if row_count is not None:
            message += f" -> {row_count} rows"

        self._emit(
            logging.INFO,
            f"{message} ({duration:.3f}s)",
            {
                'query': query,
                'duration_seconds': duration,
                'row_count': row_count,
                'database_query': True,
                **kwargs,
            },
        )

    def log_security_event(self, event_type: str, severity: str = "info", **kwargs):
        """Log security-related events.

        The record is always emitted at WARNING level; ``severity`` is only
        attached as a structured field.

        Args:
            event_type: Identifier of the event.
            severity: Severity label stored in the ``severity`` field.
            **kwargs: Additional structured fields.
        """
        self._emit(
            logging.WARNING,
            f"Security Event: {event_type}",
            {
                'event_type': event_type,
                'severity': severity,
                'security_event': True,
                **kwargs,
            },
        )
class StructuredJSONFormatter(logging.Formatter):
    """Formatter that renders a log record as a single JSON object.

    The object always contains ``timestamp``, ``level``, ``logger``,
    ``message``, ``module``, ``function`` and ``line``. Any other attribute
    found on the record (typically the ``extra`` fields) is added under its own
    key, and ``exception`` holds the formatted traceback when the record
    carries exception info.
    """

    # Record attributes that are either already mapped into the base entry or
    # are logging internals that should not be emitted as extra fields.
    # "taskName" (added to records by newer Python versions) and "asctime"
    # (set when another formatter has processed the record) are included so
    # they do not leak into the output.
    _STANDARD_ATTRS = frozenset({
        'name', 'msg', 'args', 'levelname', 'levelno',
        'pathname', 'filename', 'module', 'exc_info',
        'exc_text', 'stack_info', 'lineno', 'funcName',
        'created', 'msecs', 'relativeCreated', 'thread',
        'threadName', 'processName', 'process', 'message',
        'taskName', 'asctime',
    })

    # Values of these types are passed to json.dumps unchanged so numbers,
    # booleans and nulls keep their JSON type instead of becoming strings.
    _JSON_NATIVE_TYPES = (str, int, float, bool, type(None), dict, list, tuple)

    def format(self, record: logging.LogRecord) -> str:
        """Serialize *record* to a JSON string.

        Args:
            record: The log record to render.

        Returns:
            A JSON document; values json cannot encode are converted with
            ``str``.
        """
        # Create the base log entry
        log_entry = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }

        # Add any extra fields from the record; an extra field named like a
        # base key replaces the base value.
        for key, value in record.__dict__.items():
            if key in self._STANDARD_ATTRS:
                continue
            if isinstance(value, self._JSON_NATIVE_TYPES):
                log_entry[key] = value
            else:
                # Convert non-serializable objects to strings
                log_entry[key] = str(value)

        # Add exception info if present
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)

        # default=str covers non-serializable values nested inside containers.
        return json.dumps(log_entry, default=str)
class StructuredJSONHandler(logging.Handler):
    """Logging handler that prints each formatted record to standard output."""

    def __init__(self, level=logging.NOTSET):
        """Create the handler and install a StructuredJSONFormatter on it."""
        super().__init__(level)
        self.setFormatter(StructuredJSONFormatter())

    def emit(self, record: logging.LogRecord) -> None:
        """Format *record* and print it; failures go to ``handleError``."""
        try:
            # In production, this would go to a log aggregation system
            print(self.format(record))
        except Exception:
            self.handleError(record)
class RequestTrackingMiddleware(BaseHTTPMiddleware):
    """Middleware for request tracking and correlation IDs.

    For each request it binds a request ID, a correlation ID and (when
    ``request.state.user_id`` is set) a user ID to the module's context
    variables, logs the start and the outcome of the request, and echoes the
    IDs back in the ``X-Correlation-ID`` and ``X-Request-ID`` response headers.
    """

    def __init__(self, app):
        """Wrap *app* and create the ``request_tracking`` logger."""
        super().__init__(app)
        self.logger = StructuredLogger("request_tracking")

    async def dispatch(self, request: Request, call_next) -> Response:
        """Process one request with tracking context bound.

        Args:
            request: The incoming request.
            call_next: Callable that forwards the request down the stack.

        Returns:
            The downstream response with the tracking headers added.

        Raises:
            Exception: Whatever the downstream handler raises, after the
                failure has been logged.
        """
        # Imported locally: uuid is not part of the module's import block.
        import uuid

        # Generate request ID (first 8 characters of a random UUID)
        request_id = str(uuid.uuid4())[:8]
        request_id_token = request_id_var.set(request_id)

        # Use the caller's correlation ID when present; a missing or empty
        # header falls back to the request ID.
        correlation_id = request.headers.get('X-Correlation-ID') or request_id
        correlation_token = correlation_id_var.set(correlation_id)

        # Extract user ID if available (from auth middleware)
        user_id = getattr(request.state, 'user_id', None)
        user_id_token = user_id_var.set(str(user_id)) if user_id else None

        # perf_counter is monotonic, so the measured duration cannot be skewed
        # by system clock adjustments.
        start_time = time.perf_counter()

        try:
            # Log request start
            self.logger.info(
                f"Request started: {request.method} {request.url.path}",
                method=request.method,
                path=request.url.path,
                user_agent=request.headers.get('User-Agent'),
                ip=request.client.host if request.client else None,
                correlation_id=correlation_id
            )

            # Process request
            response = await call_next(request)

            duration = time.perf_counter() - start_time

            # The body size is taken from the Content-Length header; 0 is
            # logged when the header is absent or not a plain number.
            content_length = response.headers.get('content-length')
            if content_length and content_length.isdigit():
                response_size = int(content_length)
            else:
                response_size = 0

            # Log request completion
            self.logger.log_api_request(
                method=request.method,
                path=request.url.path,
                status_code=response.status_code,
                duration=duration,
                response_size=response_size
            )

            # Add correlation ID to response headers
            response.headers['X-Correlation-ID'] = correlation_id
            response.headers['X-Request-ID'] = request_id

            return response

        except Exception as e:
            # Calculate duration for failed requests
            duration = time.perf_counter() - start_time

            # Log error together with the active traceback, then re-raise so
            # the normal error handling still runs.
            self.logger.error(
                f"Request failed: {request.method} {request.url.path} - {str(e)}",
                exc_info=True,
                method=request.method,
                path=request.url.path,
                duration=duration,
                exception=str(e)
            )
            raise

        finally:
            # Clean up context variables
            request_id_var.reset(request_id_token)
            correlation_id_var.reset(correlation_token)
            if user_id_token is not None:
                user_id_var.reset(user_id_token)
class PerformanceMonitoringMiddleware(BaseHTTPMiddleware):
    """Middleware for performance monitoring and slow request detection.

    Every request is timed; a performance record is logged for each one and an
    additional warning is logged when the duration exceeds the configured
    threshold.
    """

    def __init__(self, app, slow_request_threshold: float = 1.0):
        """Wrap *app*.

        Args:
            app: The application to wrap.
            slow_request_threshold: Duration in seconds above which a request
                is reported as slow.
        """
        super().__init__(app)
        self.slow_request_threshold = slow_request_threshold
        self.logger = StructuredLogger("performance_monitoring")

    async def dispatch(self, request: Request, call_next) -> Response:
        """Time the downstream handling of *request* and log the result.

        Nothing is logged when the downstream handler raises; the exception
        propagates unchanged.

        Args:
            request: The incoming request.
            call_next: Callable that forwards the request down the stack.

        Returns:
            The downstream response, unmodified.
        """
        # perf_counter is monotonic, so the measured duration cannot be skewed
        # by system clock adjustments.
        start_time = time.perf_counter()

        # Track database queries during request
        # This would be enhanced with actual database monitoring
        # For now, we'll track basic request performance
        response = await call_next(request)

        total_duration = time.perf_counter() - start_time
        method = request.method
        path = request.url.path

        # Log slow requests
        if total_duration > self.slow_request_threshold:
            self.logger.warning(
                f"Slow request detected: {method} {path} ({total_duration:.3f}s)",
                method=method,
                path=path,
                duration=total_duration,
                status_code=response.status_code,
                slow_request=True
            )

        # Log performance metrics for all requests; the operation name is the
        # lower-cased method plus the path with "/" replaced by "_".
        self.logger.log_performance(
            operation=f"http_{method.lower()}_{path.replace('/', '_')}",
            duration=total_duration,
            method=method,
            path=path,
            status_code=response.status_code
        )

        return response
class DatabaseQueryLogger:
    """Logger for database query performance monitoring.

    Attributes:
        slow_query_threshold: Duration in seconds above which a query is
            additionally reported with a warning.
        logger: The ``database_queries`` structured logger.
    """

    def __init__(self, slow_query_threshold: float = 0.5):
        """Create the logger.

        Args:
            slow_query_threshold: Slow-query threshold in seconds; defaults to
                500 ms.
        """
        self.slow_query_threshold = slow_query_threshold
        self.logger = StructuredLogger("database_queries")

    def log_query(self, query: str, duration: float, row_count: Optional[int] = None):
        """Log a database query with performance metrics.

        Args:
            query: Query text.
            duration: Elapsed time in seconds.
            row_count: Number of rows, if known.
        """
        self.logger.log_database_query(
            query=query,
            duration=duration,
            row_count=row_count
        )

        # Alert on slow queries; only the first 200 characters of the query
        # are attached to the warning.
        if duration > self.slow_query_threshold:
            self.logger.warning(
                f"Slow database query detected: {duration:.3f}s",
                query=query[:200],
                duration=duration,
                slow_query=True
            )
# Global instances
# Both are created when the module is imported.
# "app" logger used by the module-level convenience functions.
structured_logger = StructuredLogger("app")
# Logger for database query metrics and slow-query warnings.
db_query_logger = DatabaseQueryLogger()
def get_request_id() -> Optional[str]:
    """Return the request ID bound to the current context, or None when unset."""
    current_request_id = request_id_var.get()
    return current_request_id
def get_correlation_id() -> Optional[str]:
    """Return the correlation ID bound to the current context, or None when unset."""
    current_correlation_id = correlation_id_var.get()
    return current_correlation_id
def get_user_id() -> Optional[str]:
    """Return the user ID bound to the current context, or None when unset."""
    current_user_id = user_id_var.get()
    return current_user_id
def set_user_context(user_id: Union[str, int]) -> None:
    """Bind *user_id*, converted to a string, to the current logging context.

    The token returned by the context variable is discarded, so this function
    offers no way to restore the previous value.
    """
    user_id_text = str(user_id)
    user_id_var.set(user_id_text)
| # Convenience functions for common logging patterns | |
def log_api_request(method: str, path: str, status_code: int, duration: float, **kwargs):
    """Record an API request on the shared ``structured_logger``.

    All arguments, including any extra keyword fields, are forwarded to
    ``StructuredLogger.log_api_request``.
    """
    structured_logger.log_api_request(
        method=method,
        path=path,
        status_code=status_code,
        duration=duration,
        **kwargs,
    )
def log_performance(operation: str, duration: float, **kwargs):
    """Record a performance metric on the shared ``structured_logger``.

    All arguments, including any extra keyword fields, are forwarded to
    ``StructuredLogger.log_performance``.
    """
    structured_logger.log_performance(
        operation=operation,
        duration=duration,
        **kwargs,
    )
def log_security_event(event_type: str, severity: str = "info", **kwargs):
    """Record a security event on the shared ``structured_logger``.

    All arguments, including any extra keyword fields, are forwarded to
    ``StructuredLogger.log_security_event``.
    """
    structured_logger.log_security_event(
        event_type=event_type,
        severity=severity,
        **kwargs,
    )
def log_database_query(query: str, duration: float, row_count: Optional[int] = None, **kwargs):
    """Record a database query on both shared loggers.

    The query is first passed to ``db_query_logger.log_query`` (without the
    extra keyword fields) and then to ``structured_logger.log_database_query``
    (with them).

    NOTE(review): this records the same query through two loggers -- confirm
    the duplication is intended.
    """
    db_query_logger.log_query(query=query, duration=duration, row_count=row_count)
    structured_logger.log_database_query(
        query=query,
        duration=duration,
        row_count=row_count,
        **kwargs,
    )