Spaces:
Paused
Paused
File size: 13,786 Bytes
4ae946d
"""
Advanced Logging and Tracing System
Implements structured logging, distributed tracing, and correlation IDs
"""
import json
import logging
import time
from contextvars import ContextVar
from typing import Any, Dict, Optional, Union
from fastapi import Request, Response
from fastapi.middleware.base import BaseHTTPMiddleware
# Context variables for request tracking.
# Each one defaults to None. RequestTrackingMiddleware.dispatch sets them for
# the duration of a request and StructuredLogger._get_context_data reads them
# when a log record is built.
request_id_var: ContextVar[Optional[str]] = ContextVar('request_id', default=None)
user_id_var: ContextVar[Optional[str]] = ContextVar('user_id', default=None)
correlation_id_var: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None)
class StructuredLogger:
"""Enhanced logger with structured logging capabilities"""
def __init__(self, name: str, level: int = logging.INFO):
self.logger = logging.getLogger(name)
self.logger.setLevel(level)
# Remove existing handlers to avoid duplicates
for handler in self.logger.handlers[:]:
self.logger.removeHandler(handler)
# Add structured JSON handler
handler = StructuredJSONHandler()
handler.setLevel(level)
formatter = StructuredJSONFormatter()
handler.setFormatter(formatter)
self.logger.addHandler(handler)
# Add console handler for development
if logging.getLogger().hasHandlers():
console_handler = logging.StreamHandler()
console_handler.setLevel(level)
console_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
console_handler.setFormatter(console_formatter)
self.logger.addHandler(console_handler)
def _get_context_data(self) -> Dict[str, Any]:
"""Get current context data for logging"""
context = {}
# Add request tracking data
request_id = request_id_var.get()
if request_id:
context['request_id'] = request_id
correlation_id = correlation_id_var.get()
if correlation_id:
context['correlation_id'] = correlation_id
user_id = user_id_var.get()
if user_id:
context['user_id'] = user_id
return context
def debug(self, message: str, **kwargs):
"""Log debug message with structured data"""
self.logger.debug(message, extra={**self._get_context_data(), **kwargs})
def info(self, message: str, **kwargs):
"""Log info message with structured data"""
self.logger.info(message, extra={**self._get_context_data(), **kwargs})
def warning(self, message: str, **kwargs):
"""Log warning message with structured data"""
self.logger.warning(message, extra={**self._get_context_data(), **kwargs})
def error(self, message: str, exc_info=None, **kwargs):
"""Log error message with structured data"""
self.logger.error(message, exc_info=exc_info, extra={**self._get_context_data(), **kwargs})
def critical(self, message: str, exc_info=None, **kwargs):
"""Log critical message with structured data"""
self.logger.critical(message, exc_info=exc_info, extra={**self._get_context_data(), **kwargs})
def log_performance(self, operation: str, duration: float, **kwargs):
"""Log performance metrics"""
self.logger.info(
f"Performance: {operation} took {duration:.3f}s",
extra={
**self._get_context_data(),
'operation': operation,
'duration_seconds': duration,
'performance_metric': True,
**kwargs
}
)
def log_api_request(self, method: str, path: str, status_code: int, duration: float, **kwargs):
"""Log API request metrics"""
self.logger.info(
f"API Request: {method} {path} -> {status_code} ({duration:.3f}s)",
extra={
**self._get_context_data(),
'method': method,
'path': path,
'status_code': status_code,
'duration_seconds': duration,
'api_request': True,
**kwargs
}
)
def log_database_query(self, query: str, duration: float, row_count: Optional[int] = None, **kwargs):
"""Log database query metrics"""
message = f"DB Query: {query[:100]}{'...' if len(query) > 100 else ''}"
if row_count is not None:
message += f" -> {row_count} rows"
self.logger.info(
f"{message} ({duration:.3f}s)",
extra={
**self._get_context_data(),
'query': query,
'duration_seconds': duration,
'row_count': row_count,
'database_query': True,
**kwargs
}
)
def log_security_event(self, event_type: str, severity: str = "info", **kwargs):
"""Log security-related events"""
self.logger.warning(
f"Security Event: {event_type}",
extra={
**self._get_context_data(),
'event_type': event_type,
'severity': severity,
'security_event': True,
**kwargs
}
)
class StructuredJSONFormatter(logging.Formatter):
    """Render a ``LogRecord`` as a single JSON object string."""

    # Standard LogRecord attributes that are either already represented in
    # the base entry or are not emitted as structured fields.
    _STANDARD_ATTRS = frozenset({
        'name', 'msg', 'args', 'levelname', 'levelno',
        'pathname', 'filename', 'module', 'exc_info',
        'exc_text', 'stack_info', 'lineno', 'funcName',
        'created', 'msecs', 'relativeCreated', 'thread',
        'threadName', 'processName', 'process', 'message',
        # ``taskName`` exists on records since Python 3.12; ``asctime`` is
        # set when another Formatter has already formatted the record.
        'taskName', 'asctime',
    })

    # Types json.dumps encodes natively. Keeping them as-is preserves numeric,
    # boolean and null fields instead of turning them into strings.
    _JSON_NATIVE_TYPES = (str, int, float, bool, type(None), dict, list, tuple)

    def format(self, record: logging.LogRecord) -> str:
        """Return ``record`` serialized as JSON.

        The entry always contains timestamp, level, logger, message, module,
        function and line. Every other non-standard attribute of the record
        (i.e. values passed through ``extra``) is added under its own name;
        a field that reuses one of the base names replaces the base value.
        When the record carries exception info, the formatted traceback is
        stored under ``exception``.
        """
        # Create the base log entry
        log_entry = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }

        # Add any extra fields from the record
        for key, value in record.__dict__.items():
            if key in self._STANDARD_ATTRS:
                continue
            # Only values JSON cannot represent directly are stringified.
            if isinstance(value, self._JSON_NATIVE_TYPES):
                log_entry[key] = value
            else:
                log_entry[key] = str(value)

        # Add exception info if present
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)

        # default=str covers non-serializable objects nested in containers.
        return json.dumps(log_entry, default=str)
class StructuredJSONHandler(logging.Handler):
    """Logging handler that prints each formatted record to standard output."""

    def __init__(self, level=logging.NOTSET):
        """Initialise the base handler and attach a StructuredJSONFormatter."""
        logging.Handler.__init__(self, level)
        self.formatter = StructuredJSONFormatter()

    def emit(self, record: logging.LogRecord) -> None:
        """Format ``record`` and print it; failures go to ``handleError``."""
        try:
            rendered = self.format(record)
            # In production, this would go to a log aggregation system
            print(rendered)
        except Exception:
            self.handleError(record)
class RequestTrackingMiddleware(BaseHTTPMiddleware):
    """Middleware that assigns request/correlation IDs and logs each request.

    For every request it binds the request ID, correlation ID and (when
    available) user ID to the module's context variables, logs the start and
    the outcome of the request, and echoes both IDs in the response headers.
    """

    def __init__(self, app):
        super().__init__(app)
        self.logger = StructuredLogger("request_tracking")

    @staticmethod
    def _response_size(response: Response) -> int:
        """Return the response's Content-Length header as an int.

        Returns 0 when the header is absent or not a plain number.
        """
        raw_length = response.headers.get('content-length')
        if raw_length is not None and raw_length.isdigit():
            return int(raw_length)
        return 0

    async def dispatch(self, request: Request, call_next) -> Response:
        """Track one request and return the downstream response.

        Any exception raised downstream is logged with its traceback and
        re-raised. The context variables are always restored afterwards.
        """
        # Generate request ID (first 8 characters of a random UUID)
        import uuid
        request_id = str(uuid.uuid4())[:8]

        # Set context variables
        request_id_token = request_id_var.set(request_id)

        # Extract or generate correlation ID. An empty header value is
        # treated like a missing header so the ID is never blank.
        correlation_id = request.headers.get('X-Correlation-ID') or request_id
        correlation_token = correlation_id_var.set(correlation_id)

        # Extract user ID if available (from auth middleware). This is read
        # before the request is passed downstream, so it is only present if
        # something earlier in the chain already stored it on request.state.
        user_id = getattr(request.state, 'user_id', None)
        # Compare against None so a falsy but valid ID such as 0 is kept.
        if user_id is not None:
            user_id_token = user_id_var.set(str(user_id))
        else:
            user_id_token = None

        # perf_counter is monotonic, so durations cannot go negative when the
        # system clock is adjusted.
        start_time = time.perf_counter()
        try:
            # Log request start
            self.logger.info(
                f"Request started: {request.method} {request.url.path}",
                method=request.method,
                path=request.url.path,
                user_agent=request.headers.get('User-Agent'),
                ip=request.client.host if request.client else None,
                correlation_id=correlation_id
            )

            # Process request
            response = await call_next(request)

            # Calculate duration
            duration = time.perf_counter() - start_time

            # Log request completion
            self.logger.log_api_request(
                method=request.method,
                path=request.url.path,
                status_code=response.status_code,
                duration=duration,
                response_size=self._response_size(response)
            )

            # Add correlation ID to response headers
            response.headers['X-Correlation-ID'] = correlation_id
            response.headers['X-Request-ID'] = request_id
            return response
        except Exception as e:
            # Calculate duration for failed requests
            duration = time.perf_counter() - start_time

            # Log error together with the active exception's traceback
            self.logger.error(
                f"Request failed: {request.method} {request.url.path} - {str(e)}",
                exc_info=True,
                method=request.method,
                path=request.url.path,
                duration=duration,
                exception=str(e)
            )
            raise
        finally:
            # Clean up context variables
            request_id_var.reset(request_id_token)
            correlation_id_var.reset(correlation_token)
            if user_id_token is not None:
                user_id_var.reset(user_id_token)
class PerformanceMonitoringMiddleware(BaseHTTPMiddleware):
    """Middleware that times every request and flags slow ones.

    Args:
        app: The application passed on to ``BaseHTTPMiddleware``.
        slow_request_threshold: Duration in seconds above which a request is
            additionally logged as slow.
    """

    def __init__(self, app, slow_request_threshold: float = 1.0):
        super().__init__(app)
        self.slow_request_threshold = slow_request_threshold
        self.logger = StructuredLogger("performance_monitoring")

    async def dispatch(self, request: Request, call_next) -> Response:
        """Time the downstream call, log the measurement, return the response.

        Nothing is logged when the downstream call raises; the exception
        propagates unchanged.
        """
        # perf_counter is monotonic, so the measured duration is not affected
        # by system clock adjustments.
        start_time = time.perf_counter()

        # Track database queries during request
        # This would be enhanced with actual database monitoring
        # For now, we'll track basic request performance
        response = await call_next(request)
        total_duration = time.perf_counter() - start_time

        # Log slow requests
        if total_duration > self.slow_request_threshold:
            self.logger.warning(
                f"Slow request detected: {request.method} {request.url.path} ({total_duration:.3f}s)",
                method=request.method,
                path=request.url.path,
                duration=total_duration,
                status_code=response.status_code,
                slow_request=True
            )

        # Log performance metrics for all requests. The operation name is
        # built from the lower-cased method and the path with "/" replaced
        # by "_".
        self.logger.log_performance(
            operation=f"http_{request.method.lower()}_{request.url.path.replace('/', '_')}",
            duration=total_duration,
            method=request.method,
            path=request.url.path,
            status_code=response.status_code
        )
        return response
class DatabaseQueryLogger:
    """Logger for database query performance monitoring.

    Args:
        slow_query_threshold: Duration in seconds above which a query is
            additionally reported as slow. Defaults to 0.5 (500 ms).
    """

    # Default slow-query threshold in seconds (500 ms).
    DEFAULT_SLOW_QUERY_THRESHOLD = 0.5

    def __init__(self, slow_query_threshold: float = DEFAULT_SLOW_QUERY_THRESHOLD):
        self.slow_query_threshold = slow_query_threshold
        self.logger = StructuredLogger("database_queries")

    def log_query(self, query: str, duration: float, row_count: Optional[int] = None) -> None:
        """Log a database query with performance metrics.

        Args:
            query: The query text.
            duration: Execution time in seconds.
            row_count: Number of rows, if known.

        A second, warning-level record carrying the first 200 characters of
        the query is emitted when ``duration`` exceeds the configured
        threshold.
        """
        self.logger.log_database_query(
            query=query,
            duration=duration,
            row_count=row_count
        )

        # Alert on slow queries
        if duration > self.slow_query_threshold:
            self.logger.warning(
                f"Slow database query detected: {duration:.3f}s",
                query=query[:200],
                duration=duration,
                slow_query=True
            )
# Global instances.
# Both are created when this module is imported; each construction configures
# the handlers of the named logger (see StructuredLogger.__init__).
structured_logger = StructuredLogger("app")
db_query_logger = DatabaseQueryLogger()
def get_request_id() -> Optional[str]:
    """Return the request ID bound to the current context, or None if unset."""
    current_request_id = request_id_var.get()
    return current_request_id
def get_correlation_id() -> Optional[str]:
    """Return the correlation ID bound to the current context, or None if unset."""
    current_correlation_id = correlation_id_var.get()
    return current_correlation_id
def get_user_id() -> Optional[str]:
    """Return the user ID bound to the current context, or None if unset."""
    current_user_id = user_id_var.get()
    return current_user_id
def set_user_context(user_id: Union[str, int]) -> None:
    """Bind ``user_id``, converted to a string, to the current logging context."""
    user_id_text = str(user_id)
    user_id_var.set(user_id_text)
# Convenience functions for common logging patterns
def log_api_request(method: str, path: str, status_code: int, duration: float, **kwargs):
    """Record an API request through the module-level ``structured_logger``."""
    structured_logger.log_api_request(
        method=method,
        path=path,
        status_code=status_code,
        duration=duration,
        **kwargs,
    )
def log_performance(operation: str, duration: float, **kwargs):
    """Record a performance metric through the module-level ``structured_logger``."""
    structured_logger.log_performance(
        operation=operation,
        duration=duration,
        **kwargs,
    )
def log_security_event(event_type: str, severity: str = "info", **kwargs):
    """Record a security event through the module-level ``structured_logger``."""
    structured_logger.log_security_event(
        event_type=event_type,
        severity=severity,
        **kwargs,
    )
def log_database_query(query: str, duration: float, row_count: Optional[int] = None, **kwargs):
    """Record a database query through both module-level loggers.

    The query is first passed to ``db_query_logger`` (without ``kwargs``) and
    then to ``structured_logger`` (with ``kwargs``), so each call produces a
    record from both.
    """
    db_query_logger.log_query(query=query, duration=duration, row_count=row_count)
    structured_logger.log_database_query(
        query=query,
        duration=duration,
        row_count=row_count,
        **kwargs,
    )
|