Spaces:
Runtime error
Runtime error
| """ | |
| Production-grade logging configuration for Sync Worker Microservice. | |
| Features: | |
| - Structured JSON output for log aggregation (ELK, CloudWatch, Datadog, etc.) | |
| - Rotating file handlers with size/backup limits | |
| - Separate error log file | |
| - Console handler with human-readable format for local dev | |
| - Worker/entity context injection via LoggerAdapter | |
| - Startup/shutdown structured events | |
| """ | |
| import logging | |
| import logging.handlers | |
| import sys | |
| import os | |
| import json | |
| import traceback | |
| from datetime import datetime, timezone | |
| from typing import Optional, Any, Dict | |
| from pathlib import Path | |
# ── Constants ─────────────────────────────────────────────────────────────────
# Service identifier stamped onto every structured log record.
SERVICE_NAME = "sync-worker-ms"

# Directory for app.log / error.log; overridable through the LOG_DIR env var.
LOG_DIR = Path(os.getenv("LOG_DIR", "logs"))

# Rotation policy shared by the rotating file handlers (env-overridable).
_DEFAULT_LOG_MAX_BYTES = 50 * 1024 * 1024  # 50 MB
LOG_MAX_BYTES = int(os.getenv("LOG_MAX_BYTES", str(_DEFAULT_LOG_MAX_BYTES)))
LOG_BACKUP_COUNT = int(os.getenv("LOG_BACKUP_COUNT", "10"))
# ── JSON Formatter ────────────────────────────────────────────────────────────
class JSONFormatter(logging.Formatter):
    """
    Emit log records as single-line JSON objects.

    Every record includes:
        timestamp  - ISO-8601 UTC
        level      - DEBUG / INFO / WARNING / ERROR / CRITICAL
        logger     - dotted logger name
        message    - formatted log message
        service    - SERVICE_NAME constant
        pid        - process id

    WARNING and above additionally carry ``caller`` (``pathname:lineno``).
    Any key/value pairs passed via ``extra={}`` are merged at the top level
    (they are merged after the base fields, so they can override them).
    Exception info is serialised into ``exception.type``, ``exception.message``
    and ``exception.stacktrace`` (a list of strings); output requested with
    ``stack_info=True`` is emitted as ``stack_info``.

    If the payload cannot be JSON-encoded (e.g. an extra value with a circular
    reference), each offending value is replaced by its string form so that a
    log line is still produced instead of the formatter raising.
    """

    # LogRecord attributes that are logging internals rather than user context.
    # ``asctime`` is written onto the record by any time-using Formatter that
    # handled it first (e.g. a console handler); ``taskName`` exists on 3.12+.
    RESERVED = frozenset(
        {
            "args", "asctime", "created", "exc_info", "exc_text", "filename",
            "funcName", "levelname", "levelno", "lineno", "message",
            "module", "msecs", "msg", "name", "pathname", "process",
            "processName", "relativeCreated", "stack_info", "taskName",
            "thread", "threadName",
        }
    )

    def format(self, record: logging.LogRecord) -> str:
        """Serialise *record* to a single-line JSON string."""
        # Base fields
        payload: Dict[str, Any] = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": SERVICE_NAME,
            "pid": record.process,
        }

        # Caller location only for WARNING and above
        if record.levelno >= logging.WARNING:
            payload["caller"] = f"{record.pathname}:{record.lineno}"

        # Merge extra fields (skip stdlib internals and private attributes)
        for key, val in record.__dict__.items():
            if key not in self.RESERVED and not key.startswith("_"):
                payload[key] = val

        # Serialise exception
        if record.exc_info and record.exc_info[0] is not None:
            exc_type, exc_value, exc_tb = record.exc_info
            payload["exception"] = {
                "type": exc_type.__name__,
                "message": str(exc_value),
                "stacktrace": traceback.format_exception(exc_type, exc_value, exc_tb),
            }

        # stack_info is in RESERVED, so surface it explicitly when requested.
        if record.stack_info:
            payload["stack_info"] = self.formatStack(record.stack_info)

        try:
            return json.dumps(payload, default=str)
        except Exception:
            # Never let the formatter crash the app: retrying the same payload
            # would fail the same way, so stringify the values that cannot be
            # encoded and keep everything else intact.
            return json.dumps(
                {key: self._encodable(val) for key, val in payload.items()},
                default=str,
            )

    @staticmethod
    def _encodable(value: Any) -> Any:
        """Return *value* if it JSON-encodes cleanly, else a string stand-in."""
        try:
            json.dumps(value, default=str)
        except Exception:
            try:
                return str(value)
            except Exception:
                return f"<unserialisable {type(value).__name__}>"
        return value
# ── Human-readable formatter (console / local dev) ───────────────────────────
class ConsoleFormatter(logging.Formatter):
    """Coloured, human-readable formatter for local development.

    Lines look like ``<time> <LEVEL> <logger> — <message>``, wrapped in an
    ANSI colour chosen by level. Any ``extra={}`` context fields are appended
    in grey as a dict.
    """

    GREY = "\x1b[38;5;240m"
    CYAN = "\x1b[36m"
    YELLOW = "\x1b[33m"
    RED = "\x1b[31m"
    BOLD_RED = "\x1b[1;31m"
    RESET = "\x1b[0m"

    LEVEL_COLOURS = {
        logging.DEBUG: GREY,
        logging.INFO: CYAN,
        logging.WARNING: YELLOW,
        logging.ERROR: RED,
        logging.CRITICAL: BOLD_RED,
    }

    FMT = "%(asctime)s %(levelname)-8s %(name)s — %(message)s"
    DATEFMT = "%Y-%m-%d %H:%M:%S"

    # Hidden from the extras suffix on top of JSONFormatter.RESERVED:
    # ``asctime`` is written onto the record by the inner formatter used in
    # format() itself, ``taskName`` exists on Python 3.12+, and
    # ``color_message`` was already excluded before (presumably a coloured
    # duplicate of the message set by a third-party logger such as uvicorn).
    _HIDDEN = frozenset({"asctime", "taskName", "color_message"})

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # Build one inner formatter per colour up front instead of one per
        # record; RESET covers levels that are not in LEVEL_COLOURS.
        colours = set(self.LEVEL_COLOURS.values()) | {self.RESET}
        self._by_colour: Dict[str, logging.Formatter] = {
            colour: logging.Formatter(f"{colour}{self.FMT}{self.RESET}", datefmt=self.DATEFMT)
            for colour in colours
        }

    def format(self, record: logging.LogRecord) -> str:
        """Render *record* as one coloured line plus any extra context fields."""
        colour = self.LEVEL_COLOURS.get(record.levelno, self.RESET)
        result = self._by_colour[colour].format(record)
        # Append extra context fields inline
        extras = {
            key: val
            for key, val in record.__dict__.items()
            if key not in JSONFormatter.RESERVED
            and key not in self._HIDDEN
            and not key.startswith("_")
        }
        if extras:
            result += f" {self.GREY}{extras}{self.RESET}"
        return result
# ── Setup ─────────────────────────────────────────────────────────────────────
def _resolve_level(level: str) -> int:
    """Translate a level name (case-insensitive) into its numeric value.

    Only integer attributes of the ``logging`` module are accepted, so names
    such as "BASIC_FORMAT" (a string) or "ROOT" (a logger object) fall back to
    INFO instead of being handed to ``setLevel``.
    """
    candidate = getattr(logging, str(level).upper(), None)
    return candidate if isinstance(candidate, int) else logging.INFO


def _close_root_handlers(root: logging.Logger) -> None:
    """Detach and close every handler on *root*, releasing any open files."""
    for handler in list(root.handlers):
        root.removeHandler(handler)
        handler.close()


def _rotating_json_handler(path: Path, level: int) -> logging.handlers.RotatingFileHandler:
    """Create a size-rotated UTF-8 file handler at *path* using JSONFormatter."""
    handler = logging.handlers.RotatingFileHandler(
        filename=path,
        maxBytes=LOG_MAX_BYTES,
        backupCount=LOG_BACKUP_COUNT,
        encoding="utf-8",
    )
    handler.setLevel(level)
    handler.setFormatter(JSONFormatter())
    return handler


def setup_logging(level: str = "INFO") -> None:
    """
    Configure the root logger with production-grade handlers.

    Handlers created:
        - stdout    - JSON (default) or coloured console (LOG_FORMAT=console)
        - app.log   - rotating JSON, all levels >= level
        - error.log - rotating JSON, ERROR and above only

    Calling this again replaces and closes every handler previously attached
    to the root logger, so re-initialisation does not leak file handles. If
    the log directory or a log file cannot be created (OSError), logging
    continues on stdout and a WARNING event is emitted.

    Args:
        level: Minimum log level name (DEBUG / INFO / WARNING / ERROR /
            CRITICAL, case-insensitive). Unknown names fall back to INFO.
    """
    numeric_level = _resolve_level(level)
    log_format = os.getenv("LOG_FORMAT", "json").lower()

    root = logging.getLogger()
    root.setLevel(numeric_level)
    # Drop handlers from earlier basicConfig()/setup_logging() calls, closing
    # them so file handlers do not keep their files open.
    _close_root_handlers(root)

    # ── stdout handler ──
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(numeric_level)
    stdout_handler.setFormatter(ConsoleFormatter() if log_format == "console" else JSONFormatter())
    root.addHandler(stdout_handler)

    startup_logger = logging.getLogger(SERVICE_NAME)

    # ── Rotating file logs: app.log (all levels) and error.log (ERROR+) ──
    app_log_path = LOG_DIR / "app.log"
    error_log_path = LOG_DIR / "error.log"
    app_log: Optional[str] = None
    error_log: Optional[str] = None
    try:
        LOG_DIR.mkdir(parents=True, exist_ok=True)
        root.addHandler(_rotating_json_handler(app_log_path, numeric_level))
        app_log = str(app_log_path.resolve())
        root.addHandler(_rotating_json_handler(error_log_path, logging.ERROR))
        error_log = str(error_log_path.resolve())
    except OSError as exc:
        # e.g. a read-only filesystem: keep running with stdout logging only.
        startup_logger.warning(
            "File logging unavailable; logging to stdout only",
            extra={
                "event": "logging_file_unavailable",
                "log_dir": str(LOG_DIR),
                "error": str(exc),
            },
        )

    # Silence noisy third-party loggers
    for noisy in ("motor", "pymongo", "asyncpg", "sqlalchemy.engine", "aioredis"):
        logging.getLogger(noisy).setLevel(logging.WARNING)

    # Emit a structured startup event so log pipelines can detect restarts.
    # app_log / error_log are None when the corresponding file is unavailable.
    startup_logger.info(
        "Logging initialised",
        extra={
            "event": "logging_init",
            "log_level": logging.getLevelName(numeric_level),
            "log_format": log_format,
            "log_dir": str(LOG_DIR.resolve()),
            "app_log": app_log,
            "error_log": error_log,
        },
    )
# ── Logger factory ────────────────────────────────────────────────────────────
def get_logger(name: str) -> logging.Logger:
    """Fetch the stdlib logger registered under *name* (created on first use)."""
    named_logger = logging.getLogger(name)
    return named_logger
class _WorkerLoggerAdapter(logging.LoggerAdapter):
    """LoggerAdapter that merges its worker context into each call's ``extra``.

    The stock ``LoggerAdapter.process`` replaces a call-site ``extra`` with the
    adapter's own dict, silently dropping per-call fields. This version merges
    both; on a key clash the call-site value wins.
    """

    def process(self, msg: Any, kwargs: Dict[str, Any]) -> "tuple[Any, Dict[str, Any]]":
        kwargs["extra"] = {**self.extra, **(kwargs.get("extra") or {})}
        return msg, kwargs


def get_worker_logger(name: str, entity_type: str, worker_id: int) -> logging.LoggerAdapter:
    """
    Return a LoggerAdapter that automatically injects worker context into every
    log record, so you never have to repeat entity_type / worker_id in extra={}.

    Per-call ``extra`` fields are merged with that context (the per-call value
    wins if the same key is given in both places).

    Usage::

        logger = get_worker_logger(__name__, "catalogue", 3)
        logger.info("Processing item", extra={"entity_id": "abc-123"})
        # -> {"entity_type": "catalogue", "worker_id": 3, "entity_id": "abc-123", ...}
    """
    base = logging.getLogger(name)
    return _WorkerLoggerAdapter(base, {"entity_type": entity_type, "worker_id": worker_id})
# ── Structured event helpers ──────────────────────────────────────────────────
def log_sync_start(
    logger: logging.Logger,
    entity_type: str,
    entity_id: str,
    operation: str,
    worker_id: int,
) -> None:
    """Emit an INFO ``sync_start`` event describing one entity sync."""
    context = dict(
        event="sync_start",
        entity_type=entity_type,
        entity_id=entity_id,
        operation=operation,
        worker_id=worker_id,
    )
    logger.info("Sync started", extra=context)
def log_sync_success(
    logger: logging.Logger,
    entity_type: str,
    entity_id: str,
    operation: str,
    worker_id: int,
    duration_ms: float,
) -> None:
    """Emit an INFO ``sync_success`` event; the duration is rounded to 2 decimals."""
    rounded_ms = round(duration_ms, 2)
    fields = dict(
        event="sync_success",
        entity_type=entity_type,
        entity_id=entity_id,
        operation=operation,
        worker_id=worker_id,
        duration_ms=rounded_ms,
    )
    logger.info("Sync succeeded", extra=fields)
def log_sync_failure(
    logger: logging.Logger,
    entity_type: str,
    entity_id: str,
    operation: str,
    worker_id: int,
    duration_ms: float,
    error: str,
    attempt: int,
    max_attempts: int,
) -> None:
    """Emit an ERROR ``sync_failure`` event for one failed sync attempt.

    ``error`` is a caller-supplied description; no traceback is attached.
    The duration is rounded to 2 decimals.
    """
    fields = dict(
        event="sync_failure",
        entity_type=entity_type,
        entity_id=entity_id,
        operation=operation,
        worker_id=worker_id,
        duration_ms=round(duration_ms, 2),
        error=error,
        attempt=attempt,
        max_attempts=max_attempts,
    )
    logger.error("Sync failed", extra=fields)
def log_worker_heartbeat(
    logger: logging.Logger,
    entity_type: str,
    worker_id: int,
    iteration: int,
    queue_size: int,
) -> None:
    """Emit an INFO ``worker_heartbeat`` event with loop iteration and queue size."""
    heartbeat = dict(
        event="worker_heartbeat",
        entity_type=entity_type,
        worker_id=worker_id,
        iteration=iteration,
        queue_size=queue_size,
    )
    logger.info("Worker heartbeat", extra=heartbeat)
# Names `extra` may not overwrite: Logger.makeRecord raises KeyError for any
# existing LogRecord attribute and for "message" / "asctime".
_LOGRECORD_ATTRS = frozenset(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


def log_service_lifecycle(logger: logging.Logger, event: str, **kwargs: Any) -> None:
    """Emit a structured lifecycle event (startup, shutdown, connection, etc.).

    The message is *event* humanised ("db_connected" -> "Db connected"), and
    *event* plus every keyword argument become top-level record fields.
    Keyword names that clash with built-in LogRecord attributes (e.g. ``name``,
    ``module``, ``filename``) would make the logging call raise KeyError, so
    they are emitted with a trailing underscore instead (``name`` -> ``name_``).
    """
    context: Dict[str, Any] = {"event": event}
    for key, value in kwargs.items():
        context[f"{key}_" if key in _LOGRECORD_ATTRS else key] = value
    logger.info(event.replace("_", " ").capitalize(), extra=context)