zenith-backend / core /logging /advanced_logging.py
teoat's picture
Upload folder using huggingface_hub
4ae946d verified
"""
Advanced Logging and Tracing System
Implements structured logging, distributed tracing, and correlation IDs
"""
import json
import logging
import time
from contextvars import ContextVar
from typing import Any, Dict, Optional, Union
from fastapi import Request, Response
from fastapi.middleware.base import BaseHTTPMiddleware
# Context variables for request tracking.
# Each one defaults to None, which the readers in this module treat as "not set".
# request_id_var: identifier generated for the current request
request_id_var: ContextVar[Optional[str]] = ContextVar('request_id', default=None)
# user_id_var: stringified ID of the user associated with the current context
user_id_var: ContextVar[Optional[str]] = ContextVar('user_id', default=None)
# correlation_id_var: identifier used to correlate log entries across requests
correlation_id_var: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None)
class StructuredLogger:
    """Logger wrapper that emits structured records enriched with request context.

    Every logging method merges the current request, correlation and user IDs
    (read from the module's context variables) with the caller's keyword
    arguments and hands the result to the wrapped ``logging.Logger`` as
    ``extra``. Keyword names that collide with built-in ``LogRecord``
    attributes are renamed with an ``extra_`` prefix instead of raising.
    """

    # ``Logger.makeRecord`` raises KeyError when an ``extra`` key matches an
    # existing LogRecord attribute (or "message"/"asctime"). The reserved names
    # are collected once from a throwaway record so the set matches the
    # running Python version.
    _RESERVED_FIELDS = frozenset(
        logging.LogRecord("", logging.NOTSET, "", 0, "", (), None).__dict__
    ) | {"message", "asctime"}
    _RESERVED_PREFIX = "extra_"

    def __init__(self, name: str, level: int = logging.INFO):
        """Configure the named logger with a JSON handler.

        Args:
            name: Name passed to ``logging.getLogger``.
            level: Threshold applied to the logger and to each handler.
        """
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)

        # Remove existing handlers to avoid duplicates when the same logger
        # name is wrapped more than once.
        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)

        # Add structured JSON handler
        handler = StructuredJSONHandler()
        handler.setLevel(level)
        formatter = StructuredJSONFormatter()
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

        # Add a plain-text console handler for development; it is attached
        # only when the root logger already has handlers configured.
        if logging.getLogger().hasHandlers():
            console_handler = logging.StreamHandler()
            console_handler.setLevel(level)
            console_formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            console_handler.setFormatter(console_formatter)
            self.logger.addHandler(console_handler)

    @classmethod
    def _sanitize_extra(cls, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *fields* that is safe to pass as ``extra``.

        Keys that would overwrite a LogRecord attribute are renamed with the
        ``extra_`` prefix; all other keys are kept unchanged.
        """
        safe: Dict[str, Any] = {}
        for key, value in fields.items():
            if key in cls._RESERVED_FIELDS:
                key = f"{cls._RESERVED_PREFIX}{key}"
            safe[key] = value
        return safe

    def _build_extra(self, *field_groups: Dict[str, Any]) -> Dict[str, Any]:
        """Merge context data with *field_groups* (later groups win) and sanitize."""
        merged = self._get_context_data()
        for group in field_groups:
            merged.update(group)
        return self._sanitize_extra(merged)

    def _get_context_data(self) -> Dict[str, Any]:
        """Return the request, correlation and user IDs that are currently set."""
        context: Dict[str, Any] = {}

        # Only truthy values are included, so unset (None) IDs are omitted.
        request_id = request_id_var.get()
        if request_id:
            context['request_id'] = request_id

        correlation_id = correlation_id_var.get()
        if correlation_id:
            context['correlation_id'] = correlation_id

        user_id = user_id_var.get()
        if user_id:
            context['user_id'] = user_id

        return context

    def debug(self, message: str, **kwargs):
        """Log a debug message; keyword arguments become structured fields."""
        self.logger.debug(message, extra=self._build_extra(kwargs))

    def info(self, message: str, **kwargs):
        """Log an info message; keyword arguments become structured fields."""
        self.logger.info(message, extra=self._build_extra(kwargs))

    def warning(self, message: str, **kwargs):
        """Log a warning message; keyword arguments become structured fields."""
        self.logger.warning(message, extra=self._build_extra(kwargs))

    def error(self, message: str, exc_info=None, **kwargs):
        """Log an error message, optionally with exception information.

        Args:
            message: Text of the log entry.
            exc_info: Forwarded unchanged to ``Logger.error``.
            **kwargs: Additional structured fields.
        """
        self.logger.error(message, exc_info=exc_info, extra=self._build_extra(kwargs))

    def critical(self, message: str, exc_info=None, **kwargs):
        """Log a critical message, optionally with exception information.

        Args:
            message: Text of the log entry.
            exc_info: Forwarded unchanged to ``Logger.critical``.
            **kwargs: Additional structured fields.
        """
        self.logger.critical(message, exc_info=exc_info, extra=self._build_extra(kwargs))

    def log_performance(self, operation: str, duration: float, **kwargs):
        """Log how long *operation* took.

        Args:
            operation: Name of the measured operation.
            duration: Elapsed time in seconds.
            **kwargs: Additional structured fields; they override the
                built-in ones on key collision.
        """
        self.logger.info(
            f"Performance: {operation} took {duration:.3f}s",
            extra=self._build_extra(
                {
                    'operation': operation,
                    'duration_seconds': duration,
                    'performance_metric': True,
                },
                kwargs,
            ),
        )

    def log_api_request(self, method: str, path: str, status_code: int, duration: float, **kwargs):
        """Log the outcome of an API request.

        Args:
            method: HTTP method.
            path: Request path.
            status_code: Response status code.
            duration: Elapsed time in seconds.
            **kwargs: Additional structured fields; they override the
                built-in ones on key collision.
        """
        self.logger.info(
            f"API Request: {method} {path} -> {status_code} ({duration:.3f}s)",
            extra=self._build_extra(
                {
                    'method': method,
                    'path': path,
                    'status_code': status_code,
                    'duration_seconds': duration,
                    'api_request': True,
                },
                kwargs,
            ),
        )

    def log_database_query(self, query: str, duration: float, row_count: Optional[int] = None, **kwargs):
        """Log a database query and its timing.

        The message shows at most the first 100 characters of *query*; the
        full query text is stored in the ``query`` field.

        Args:
            query: Query text.
            duration: Elapsed time in seconds.
            row_count: Number of rows, appended to the message when not None.
            **kwargs: Additional structured fields; they override the
                built-in ones on key collision.
        """
        message = f"DB Query: {query[:100]}{'...' if len(query) > 100 else ''}"
        if row_count is not None:
            message += f" -> {row_count} rows"

        self.logger.info(
            f"{message} ({duration:.3f}s)",
            extra=self._build_extra(
                {
                    'query': query,
                    'duration_seconds': duration,
                    'row_count': row_count,
                    'database_query': True,
                },
                kwargs,
            ),
        )

    def log_security_event(self, event_type: str, severity: str = "info", **kwargs):
        """Log a security-related event.

        The record is always emitted at WARNING level; *severity* is stored
        as a structured field only.

        Args:
            event_type: Name of the event.
            severity: Severity label recorded in the ``severity`` field.
            **kwargs: Additional structured fields; they override the
                built-in ones on key collision.
        """
        self.logger.warning(
            f"Security Event: {event_type}",
            extra=self._build_extra(
                {
                    'event_type': event_type,
                    'severity': severity,
                    'security_event': True,
                },
                kwargs,
            ),
        )
class StructuredJSONFormatter(logging.Formatter):
    """Formatter that renders a log record as a single JSON object.

    The output always contains the base fields (timestamp, level, logger,
    message, module, function, line) followed by every non-standard attribute
    found on the record, which is where fields passed via ``extra`` end up.
    """

    # Attributes that the logging machinery itself puts on a LogRecord; they
    # are either already represented by the base fields or not useful as
    # structured data, so they are not copied as extra fields.
    _STANDARD_ATTRS = frozenset({
        'name', 'msg', 'args', 'levelname', 'levelno',
        'pathname', 'filename', 'module', 'exc_info',
        'exc_text', 'stack_info', 'lineno', 'funcName',
        'created', 'msecs', 'relativeCreated', 'thread',
        'threadName', 'processName', 'process', 'message',
        'asctime', 'taskName',
    })

    # Types emitted as-is so numbers, booleans and None keep their JSON type
    # instead of being turned into strings.
    _JSON_NATIVE_TYPES = (str, int, float, bool, dict, list, tuple)

    @classmethod
    def _coerce(cls, value: Any) -> Any:
        """Return *value* unchanged when JSON can represent it, else ``str(value)``."""
        if isinstance(value, float) and (
            value != value or value in (float('inf'), float('-inf'))
        ):
            # NaN and infinities have no valid JSON representation.
            return str(value)
        if value is None or isinstance(value, cls._JSON_NATIVE_TYPES):
            return value
        return str(value)

    def format(self, record: logging.LogRecord) -> str:
        """Serialize *record* to a JSON string.

        Args:
            record: The log record to render.

        Returns:
            A JSON document with the base fields, the record's extra fields,
            and an ``exception`` field when ``record.exc_info`` is set.
        """
        # Create the base log entry
        log_entry = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }

        # Add any extra fields from the record
        for key, value in record.__dict__.items():
            if key in self._STANDARD_ATTRS:
                continue
            log_entry[key] = self._coerce(value)

        # Add exception info if present
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)

        # default=str covers non-serializable objects nested inside containers.
        return json.dumps(log_entry, default=str)
class StructuredJSONHandler(logging.Handler):
    """Logging handler that prints each formatted record to standard output."""

    def __init__(self, level=logging.NOTSET):
        """Create the handler with a StructuredJSONFormatter installed by default."""
        super().__init__(level)
        self.setFormatter(StructuredJSONFormatter())

    def emit(self, record: logging.LogRecord) -> None:
        """Format *record* and print it; failures go to ``handleError``."""
        try:
            # In production, this would go to a log aggregation system
            print(self.format(record))
        except Exception:
            self.handleError(record)
class RequestTrackingMiddleware(BaseHTTPMiddleware):
    """Middleware that assigns request/correlation IDs and logs each request.

    For every request it sets the module's context variables, logs the start
    and the outcome of the request, adds ``X-Correlation-ID`` and
    ``X-Request-ID`` headers to the response, and restores the context
    variables afterwards.

    NOTE(review): this file imports ``BaseHTTPMiddleware`` from
    ``fastapi.middleware.base``; the class is documented under
    ``starlette.middleware.base`` - confirm that the import path resolves.
    """

    def __init__(self, app):
        """Wrap *app* and create the ``request_tracking`` logger."""
        super().__init__(app)
        self.logger = StructuredLogger("request_tracking")

    @staticmethod
    def _response_size(response) -> int:
        """Return the response body size in bytes, or 0 when it is unknown.

        A ``content_length`` attribute is honoured when the response object
        has one; otherwise the ``Content-Length`` header is used.
        """
        size = getattr(response, 'content_length', None)
        if size is None:
            size = response.headers.get('content-length')
        try:
            return int(size)
        except (TypeError, ValueError):
            return 0

    async def dispatch(self, request: Request, call_next) -> Response:
        """Track a single request through the rest of the application.

        Args:
            request: The incoming request.
            call_next: Callable that forwards the request and returns the response.

        Returns:
            The downstream response with the tracking headers added.

        Raises:
            Exception: Anything raised downstream is logged and re-raised.
        """
        # uuid is imported locally because it is not among the module-level imports.
        import uuid

        # Generate request ID (first 8 characters of a random UUID)
        request_id = str(uuid.uuid4())[:8]

        # Set context variables
        request_id_token = request_id_var.set(request_id)

        # Extract or generate correlation ID; a missing or empty header falls
        # back to the request ID so the correlation ID is never blank.
        correlation_id = request.headers.get('X-Correlation-ID') or request_id
        correlation_token = correlation_id_var.set(correlation_id)

        # Extract user ID if available (from auth middleware)
        user_id = getattr(request.state, 'user_id', None)
        user_id_token = user_id_var.set(str(user_id)) if user_id else None

        # perf_counter is monotonic, so durations are not distorted by
        # wall-clock adjustments.
        start_time = time.perf_counter()

        try:
            # Log request start
            self.logger.info(
                f"Request started: {request.method} {request.url.path}",
                method=request.method,
                path=request.url.path,
                user_agent=request.headers.get('User-Agent'),
                ip=request.client.host if request.client else None,
                correlation_id=correlation_id
            )

            # Process request
            response = await call_next(request)

            # Calculate duration
            duration = time.perf_counter() - start_time

            # Log request completion
            self.logger.log_api_request(
                method=request.method,
                path=request.url.path,
                status_code=response.status_code,
                duration=duration,
                response_size=self._response_size(response)
            )

            # Add correlation ID to response headers
            response.headers['X-Correlation-ID'] = correlation_id
            response.headers['X-Request-ID'] = request_id

            return response

        except Exception as e:
            # Calculate duration for failed requests
            duration = time.perf_counter() - start_time

            # Log error
            self.logger.error(
                f"Request failed: {request.method} {request.url.path} - {str(e)}",
                method=request.method,
                path=request.url.path,
                duration=duration,
                exception=str(e)
            )
            raise

        finally:
            # Clean up context variables
            request_id_var.reset(request_id_token)
            correlation_id_var.reset(correlation_token)
            if user_id_token is not None:
                user_id_var.reset(user_id_token)
class PerformanceMonitoringMiddleware(BaseHTTPMiddleware):
    """Middleware that records request timings and warns about slow requests."""

    def __init__(self, app, slow_request_threshold: float = 1.0):
        """Wrap *app* and create the ``performance_monitoring`` logger.

        Args:
            app: The application to wrap.
            slow_request_threshold: Duration in seconds above which a request
                is reported as slow.
        """
        super().__init__(app)
        self.slow_request_threshold = slow_request_threshold
        self.logger = StructuredLogger("performance_monitoring")

    async def dispatch(self, request: Request, call_next) -> Response:
        """Time the downstream handling of *request* and log the result.

        An exception raised by ``call_next`` propagates unchanged and no
        timing entry is logged for that request.

        Args:
            request: The incoming request.
            call_next: Callable that forwards the request and returns the response.

        Returns:
            The downstream response, unmodified.
        """
        # perf_counter is monotonic, so a wall-clock adjustment during the
        # request cannot produce a negative or inflated duration.
        start_time = time.perf_counter()

        # Track database queries during request
        # This would be enhanced with actual database monitoring
        # For now, we'll track basic request performance
        response = await call_next(request)

        total_duration = time.perf_counter() - start_time

        # Log slow requests
        if total_duration > self.slow_request_threshold:
            self.logger.warning(
                f"Slow request detected: {request.method} {request.url.path} ({total_duration:.3f}s)",
                method=request.method,
                path=request.url.path,
                duration=total_duration,
                status_code=response.status_code,
                slow_request=True
            )

        # Log performance metrics for all requests; the operation name is
        # built from the lower-cased method and the path with '/' replaced by '_'.
        self.logger.log_performance(
            operation=f"http_{request.method.lower()}_{request.url.path.replace('/', '_')}",
            duration=total_duration,
            method=request.method,
            path=request.url.path,
            status_code=response.status_code
        )

        return response
class DatabaseQueryLogger:
    """Logs database query metrics and warns about slow queries."""

    def __init__(self, slow_query_threshold: float = 0.5):
        """Create the ``database_queries`` logger.

        Args:
            slow_query_threshold: Duration in seconds above which a query is
                reported as slow. Defaults to 0.5 (500ms).
        """
        self.slow_query_threshold = slow_query_threshold
        self.logger = StructuredLogger("database_queries")

    def log_query(self, query: str, duration: float, row_count: Optional[int] = None):
        """Log a database query and, when it is slow, an additional warning.

        Args:
            query: Query text.
            duration: Elapsed time in seconds.
            row_count: Number of rows, if known.
        """
        self.logger.log_database_query(
            query=query,
            duration=duration,
            row_count=row_count
        )

        # Alert on slow queries; the warning carries at most the first 200
        # characters of the query text.
        if duration > self.slow_query_threshold:
            self.logger.warning(
                f"Slow database query detected: {duration:.3f}s",
                query=query[:200],
                duration=duration,
                slow_query=True
            )
# Global instances
# Both are created when this module is imported and are used by the
# module-level convenience functions.
# structured_logger wraps the logger named "app".
structured_logger = StructuredLogger("app")
# db_query_logger records query metrics and slow-query warnings.
db_query_logger = DatabaseQueryLogger()
def get_request_id() -> Optional[str]:
    """Return the request ID held by ``request_id_var`` for the current context."""
    current_request_id = request_id_var.get()
    return current_request_id
def get_correlation_id() -> Optional[str]:
    """Return the correlation ID held by ``correlation_id_var`` for the current context."""
    current_correlation_id = correlation_id_var.get()
    return current_correlation_id
def get_user_id() -> Optional[str]:
    """Return the user ID held by ``user_id_var`` for the current context."""
    current_user_id = user_id_var.get()
    return current_user_id
def set_user_context(user_id: Union[str, int]) -> None:
    """Store *user_id*, converted to a string, in ``user_id_var``.

    The token returned by ``ContextVar.set`` is discarded, so this function
    offers no way to restore the previous value.
    """
    user_id_text = str(user_id)
    user_id_var.set(user_id_text)
# Convenience functions for common logging patterns
def log_api_request(method: str, path: str, status_code: int, duration: float, **kwargs):
    """Record an API request through the shared ``structured_logger``.

    All arguments, including any extra keyword fields, are handed to
    ``structured_logger.log_api_request`` unchanged.
    """
    structured_logger.log_api_request(
        method=method,
        path=path,
        status_code=status_code,
        duration=duration,
        **kwargs,
    )
def log_performance(operation: str, duration: float, **kwargs):
    """Record a performance metric through the shared ``structured_logger``.

    All arguments, including any extra keyword fields, are handed to
    ``structured_logger.log_performance`` unchanged.
    """
    structured_logger.log_performance(
        operation=operation,
        duration=duration,
        **kwargs,
    )
def log_security_event(event_type: str, severity: str = "info", **kwargs):
    """Record a security event through the shared ``structured_logger``.

    All arguments, including any extra keyword fields, are handed to
    ``structured_logger.log_security_event`` unchanged.
    """
    structured_logger.log_security_event(
        event_type=event_type,
        severity=severity,
        **kwargs,
    )
def log_database_query(query: str, duration: float, row_count: Optional[int] = None, **kwargs):
    """Record a database query with both module-level loggers.

    The query is first passed to ``db_query_logger.log_query`` (without the
    extra keyword fields) and then to
    ``structured_logger.log_database_query`` (with them).

    NOTE(review): both calls log the query, so each invocation produces two
    query entries - confirm that this duplication is intended.
    """
    db_query_logger.log_query(query=query, duration=duration, row_count=row_count)
    structured_logger.log_database_query(
        query=query,
        duration=duration,
        row_count=row_count,
        **kwargs,
    )