import json
import logging
import sys
import time
import traceback
import uuid
from contextvars import ContextVar
from datetime import datetime, timezone
from functools import wraps
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from pathlib import Path
from typing import Any, Dict, Optional, Union

from ..core.config import settings
# Create logs directory if it doesn't exist.
# NOTE(review): path is relative to the process CWD and this runs at
# import time -- confirm the service is always started from the app root.
logs_dir = Path("logs")
logs_dir.mkdir(exist_ok=True)

# Per-context correlation id: bound by with_correlation_id() for the
# lifetime of a coroutine and read by StructuredFormatter.format();
# defaults to the empty string when nothing is bound.
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')
class StructuredFormatter(logging.Formatter):
    """Formatter that renders each log record as a single JSON object.

    Every entry carries timestamp, level, logger name, message,
    correlation id, and the configured hostname/version fields. Optional
    per-record attributes (``duration``, ``request_id``, ``extra_fields``)
    and exception details are merged in when present.
    """

    def __init__(self):
        super().__init__()
        # Static fields stamped onto every record.
        self.default_fields = {
            'hostname': settings.PROJECT_NAME,
            'version': settings.VERSION
        }

    def format(self, record: logging.LogRecord) -> str:
        """Return *record* serialized as a JSON string."""
        message = {
            # Timezone-aware UTC timestamp; datetime.utcfromtimestamp()
            # is deprecated since Python 3.12.
            'timestamp': datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'correlation_id': correlation_id.get(),
            **self.default_fields
        }
        if hasattr(record, 'duration'):
            message['duration'] = f"{record.duration:.3f}s"
        if hasattr(record, 'request_id'):
            message['request_id'] = record.request_id
        if record.exc_info:
            message['exception'] = {
                'type': record.exc_info[0].__name__,
                'message': str(record.exc_info[1]),
                'stacktrace': traceback.format_exception(*record.exc_info)
            }
        # Merge caller-supplied structured context; may overwrite defaults.
        if hasattr(record, 'extra_fields'):
            message.update(record.extra_fields)
        # default=str is deliberate: a non-JSON-serializable extra field
        # (datetime, ObjectId, ...) must not crash the logging pipeline --
        # it is stringified instead.
        return json.dumps(message, default=str)
class CustomLogger(logging.Logger):
    """Logger subclass that additionally tracks simple in-process metrics."""

    def __init__(self, name: str):
        super().__init__(name)
        # Running counters; exposed module-wide via get_metrics().
        self.metrics: Dict[str, Dict[str, Union[int, float]]] = {
            'requests': {'count': 0, 'total_duration': 0},
            'errors': {'count': 0},
            'database': {'operations': 0, 'failures': 0},
        }

    def log_with_context(self, level: int, msg: str, extra_fields: Optional[Dict[str, Any]] = None, **kwargs):
        """Emit *msg* at *level*, attaching *extra_fields* for the JSON formatter."""
        if extra_fields:
            kwargs = {**kwargs, 'extra': {'extra_fields': extra_fields}}
        self.log(level, msg, **kwargs)

    def start_operation(self, operation_name: str) -> float:
        """Return a start timestamp for timing *operation_name*."""
        return time.time()

    def end_operation(self, start_time: float, operation_name: str, success: bool = True):
        """Log how long *operation_name* took and return the elapsed seconds."""
        elapsed = time.time() - start_time
        fields = {
            'operation': operation_name,
            'duration': elapsed,
            'success': success,
        }
        self.info(f"Operation completed: {operation_name}", extra={'extra_fields': fields})
        return elapsed
class HealthCheckFilter(logging.Filter):
    """Guarantee every record carries a boolean ``health_check`` attribute.

    Never drops records (always returns True); it only back-fills the
    attribute so format strings referencing %(health_check)s cannot fail.
    """

    def filter(self, record):
        if not hasattr(record, 'health_check'):
            record.health_check = False
        return True
def setup_logger(name: str) -> CustomLogger:
    """Create (or return) the fully configured logger for *name*.

    Handlers attached:
      - console (stdout, human-readable)
      - ``logs/<name>.log``: JSON lines, rotated at midnight, 30 days kept
      - ``logs/<name>_error.log``: JSON, ERROR+, size-rotated at 10MB
      - ``logs/<name>_health_checks.log``: plain text, health-check
        records only, rotated at midnight

    Idempotent: if the named logger already has handlers (e.g. this module
    is imported more than once), it is returned unchanged so handlers are
    not stacked and log lines are not duplicated.
    """
    # Use custom logger class so getLogger() builds a CustomLogger.
    logging.setLoggerClass(CustomLogger)
    logger = logging.getLogger(name)
    if logger.handlers:
        # Already configured -- avoid duplicate handlers on re-setup.
        return logger
    logger.setLevel(logging.INFO)

    # Structured JSON formatter shared by the file handlers.
    json_formatter = StructuredFormatter()

    # Console handler with standard human-readable formatting.
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    ))
    logger.addHandler(console_handler)

    # File handler with JSON formatting and daily rotation.
    daily_handler = TimedRotatingFileHandler(
        logs_dir / f"{name}.log",
        when="midnight",
        interval=1,
        backupCount=30,  # Keep a month of logs
        encoding="utf-8"
    )
    daily_handler.setFormatter(json_formatter)
    logger.addHandler(daily_handler)

    # Error file handler for error-level logs.
    error_handler = RotatingFileHandler(
        logs_dir / f"{name}_error.log",
        maxBytes=10485760,  # 10MB
        backupCount=10,
        encoding="utf-8"  # match the other file handlers
    )
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(json_formatter)
    logger.addHandler(error_handler)

    # Back-fill health_check=False on every record so the health format
    # string below can never raise on a missing attribute.
    logger.addFilter(HealthCheckFilter())

    # Separate file handler that only receives health-check records.
    health_handler = TimedRotatingFileHandler(
        logs_dir / f"{name}_health_checks.log",
        when='midnight',
        interval=1,
        backupCount=30,
        encoding="utf-8"  # match the other file handlers
    )
    health_handler.setLevel(logging.INFO)
    health_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s - Health: %(health_check)s'
    ))
    # Only records explicitly flagged health_check=True reach this file.
    health_handler.addFilter(lambda record: getattr(record, 'health_check', False))
    logger.addHandler(health_handler)

    return logger
# Module-wide application logger; every helper function below logs
# through it and updates its .metrics counters.
logger = setup_logger("admin_dashboard")
def with_correlation_id():
    """Decorator factory: bind a fresh UUID4 correlation id for the
    duration of the wrapped coroutine.

    The id is stored in the module-level ``correlation_id`` ContextVar
    (picked up by StructuredFormatter) and the previous value is restored
    when the coroutine finishes. Only supports async callables, since the
    wrapper awaits *func*.
    """
    def decorator(func):
        @wraps(func)  # preserve the wrapped coroutine's name/doc/signature
        async def wrapper(*args, **kwargs):
            token = correlation_id.set(str(uuid.uuid4()))
            try:
                return await func(*args, **kwargs)
            finally:
                # Restore the previous correlation id even on error.
                correlation_id.reset(token)
        return wrapper
    return decorator
def log_api_request(method: str, path: str, status_code: int, duration: float):
    """Log one API request and update the request metrics.

    Args:
        method: HTTP method of the request.
        path: Request path.
        status_code: Response status code.
        duration: Handling time in seconds.
    """
    logger.info(
        "API Request",
        extra={'extra_fields': {
            'method': method,
            'path': path,
            'status_code': status_code,
            'duration': duration,
            # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+).
            'timestamp': datetime.now(timezone.utc).isoformat()
        }}
    )
    # Update metrics
    logger.metrics['requests']['count'] += 1
    logger.metrics['requests']['total_duration'] += duration
def log_error(error: Exception, context: Optional[Dict] = None):
    """Log *error* with its traceback and optional context, and bump the
    error counter.

    Args:
        error: The exception to record.
        context: Optional structured context merged into the JSON entry.
    """
    logger.error(
        f"Error occurred: {str(error)}",
        # Pass the exception instance so *its* traceback is logged even
        # when called outside an active except block (exc_info=True only
        # captures the current exception, which may be absent).
        exc_info=error,
        extra={'extra_fields': {
            'error_type': type(error).__name__,
            'context': context or {},
            # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+).
            'timestamp': datetime.now(timezone.utc).isoformat()
        }}
    )
    logger.metrics['errors']['count'] += 1
def log_database_operation(operation: str, collection: str, success: bool, duration: Optional[float] = None):
    """Log a database operation and update the database metrics.

    Args:
        operation: Operation type (e.g. "insert", "find").
        collection: Target collection name.
        success: Whether the operation succeeded.
        duration: Optional elapsed time in seconds.
    """
    logger.info(
        "Database Operation",
        extra={'extra_fields': {
            'operation_type': operation,
            'collection': collection,
            'success': success,
            'duration': duration,
            # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+).
            'timestamp': datetime.now(timezone.utc).isoformat()
        }}
    )
    logger.metrics['database']['operations'] += 1
    if not success:
        logger.metrics['database']['failures'] += 1
def log_health_check(component: str, status: str, details: Optional[dict] = None):
    """Log a health-check result for *component*.

    The record is flagged with ``health_check=True`` so the dedicated
    health-check file handler in setup_logger() picks it up.

    Args:
        component: Name of the checked subsystem.
        status: Human-readable status string.
        details: Optional extra data appended to the message.
    """
    msg = f"Health Check - {component}: {status}"
    if details:
        msg += f" - Details: {details}"
    logger.info(msg, extra={'health_check': True})
def log_maintenance_activity(activity: str, result: str, details: Optional[dict] = None):
    """Log a maintenance activity and its outcome at INFO level.

    Args:
        activity: Name of the maintenance task.
        result: Outcome description.
        details: Optional extra data appended to the message.
    """
    msg = f"Maintenance - {activity}: {result}"
    if details:
        msg += f" - Details: {details}"
    logger.info(msg)
def get_metrics() -> Dict[str, Dict[str, Union[int, float]]]:
    """Expose the module logger's live metrics counters (not a copy)."""
    current = logger.metrics
    return current
def reset_metrics():
    """Replace the module logger's metrics with freshly zeroed counters."""
    zeroed: Dict[str, Dict[str, Union[int, float]]] = {
        'requests': {'count': 0, 'total_duration': 0},
        'errors': {'count': 0},
        'database': {'operations': 0, 'failures': 0},
    }
    logger.metrics = zeroed