"""
Production-grade logging configuration for Sync Worker Microservice.

Features:
- Structured JSON output for log aggregation (ELK, CloudWatch, Datadog, etc.)
- Rotating file handlers with size/backup limits
- Separate error log file
- Console handler with human-readable format for local dev
- Worker/entity context injection via LoggerAdapter
- Startup/shutdown structured events
"""

import logging
import logging.handlers
import sys
import os
import json
import traceback
from datetime import datetime, timezone
from typing import Optional, Any, Dict
from pathlib import Path

# ── Constants ────────────────────────────────────────────────────────────────
SERVICE_NAME = "sync-worker-ms"
LOG_DIR = Path(os.getenv("LOG_DIR", "logs"))
LOG_MAX_BYTES = int(os.getenv("LOG_MAX_BYTES", str(50 * 1024 * 1024)))  # 50 MB
LOG_BACKUP_COUNT = int(os.getenv("LOG_BACKUP_COUNT", "10"))


# ── JSON Formatter ────────────────────────────────────────────────────────────
class JSONFormatter(logging.Formatter):
    """
    Emit log records as single-line JSON objects.

    Every record includes:
        timestamp – ISO-8601 UTC
        level     – DEBUG / INFO / WARNING / ERROR / CRITICAL
        logger    – dotted module name
        message   – formatted log message
        service   – SERVICE_NAME constant
        pid       – process id

    Records at WARNING and above also carry ``caller`` (``pathname:lineno``).

    Any key/value pairs passed via ``extra={}`` are merged at the top level.
    Exception info is serialised into ``exception.type``, ``exception.message``
    and ``exception.stacktrace``.
    """

    # Attributes the logging machinery itself puts on a LogRecord; anything
    # else found in ``record.__dict__`` is treated as a caller-supplied extra.
    RESERVED = frozenset(
        {
            "args",
            # Written onto the record by any Formatter whose format string uses
            # %(asctime)s; the same record object is then handed to the next
            # handler, so it must not be mistaken for an extra.
            "asctime",
            "created",
            "exc_info",
            "exc_text",
            "filename",
            "funcName",
            "levelname",
            "levelno",
            "lineno",
            "message",
            "module",
            "msecs",
            "msg",
            "name",
            "pathname",
            "process",
            "processName",
            "relativeCreated",
            "stack_info",
            # Standard LogRecord attribute since Python 3.12.
            "taskName",
            "thread",
            "threadName",
        }
    )

    @classmethod
    def extra_fields(cls, record: logging.LogRecord) -> Dict[str, Any]:
        """Return the caller-supplied ``extra`` attributes found on *record*.

        Stdlib-internal attributes (``RESERVED``) and names starting with an
        underscore are skipped.
        """
        return {
            key: val
            for key, val in record.__dict__.items()
            if key not in cls.RESERVED and not key.startswith("_")
        }

    @staticmethod
    def _json_safe(value: Any) -> Any:
        """Return *value* if it can be JSON-encoded, else a short placeholder."""
        try:
            json.dumps(value, default=str)
        except Exception:
            return f"<unserialisable {type(value).__name__}>"
        return value

    def format(self, record: logging.LogRecord) -> str:
        """Serialise *record* into a single-line JSON string.

        Args:
            record: The log record to serialise.

        Returns:
            A JSON object string. If a field cannot be encoded (circular
            reference, ``__str__`` that raises, ...), that field is replaced
            by a placeholder and a ``serialisation_error`` key is added.
        """
        # Base fields
        payload: Dict[str, Any] = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": SERVICE_NAME,
            "pid": record.process,
        }

        # Merge caller location for WARNING and above
        if record.levelno >= logging.WARNING:
            payload["caller"] = f"{record.pathname}:{record.lineno}"

        # Merge extra fields (skip stdlib internals)
        payload.update(self.extra_fields(record))

        # Serialise exception
        if record.exc_info and record.exc_info[0] is not None:
            exc_type, exc_value, exc_tb = record.exc_info
            payload["exception"] = {
                "type": exc_type.__name__,
                "message": str(exc_value),
                "stacktrace": traceback.format_exception(exc_type, exc_value, exc_tb),
            }

        try:
            return json.dumps(payload, default=str)
        except Exception as exc:
            # Fallback – never let the formatter crash the app. Re-dumping the
            # same payload would fail the same way, so neutralise each
            # offending field individually and keep everything else.
            safe = {str(key): self._json_safe(val) for key, val in payload.items()}
            safe["serialisation_error"] = f"{type(exc).__name__}: {exc}"
            return json.dumps(safe, default=str)


# ── Human-readable formatter (console / local dev) ───────────────────────────
class ConsoleFormatter(logging.Formatter):
    """Coloured, human-readable formatter for local development."""

    GREY = "\x1b[38;5;240m"
    CYAN = "\x1b[36m"
    YELLOW = "\x1b[33m"
    RED = "\x1b[31m"
    BOLD_RED = "\x1b[1;31m"
    RESET = "\x1b[0m"

    LEVEL_COLOURS = {
        logging.DEBUG: GREY,
        logging.INFO: CYAN,
        logging.WARNING: YELLOW,
        logging.ERROR: RED,
        logging.CRITICAL: BOLD_RED,
    }

    FMT = "%(asctime)s %(levelname)-8s %(name)s — %(message)s"
    DATEFMT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # One delegate formatter per known level, built once instead of on
        # every record.
        self._by_level = {
            levelno: self._coloured(colour)
            for levelno, colour in self.LEVEL_COLOURS.items()
        }
        # Levels without a colour mapping are wrapped in RESET only.
        self._fallback = self._coloured(self.RESET)

    def _coloured(self, colour: str) -> logging.Formatter:
        """Build a stdlib Formatter that wraps FMT in *colour* … RESET."""
        return logging.Formatter(f"{colour}{self.FMT}{self.RESET}", datefmt=self.DATEFMT)

    def format(self, record: logging.LogRecord) -> str:
        """Render *record* as a coloured line, appending any extras in grey."""
        delegate = self._by_level.get(record.levelno, self._fallback)
        result = delegate.format(record)

        # Append extra context fields inline
        extras = {
            k: v
            for k, v in JSONFormatter.extra_fields(record).items()
            if k not in ("color_message",)
        }
        if extras:
            result += f" {self.GREY}{extras}{self.RESET}"
        return result


# ── Setup ─────────────────────────────────────────────────────────────────────
def setup_logging(level: str = "INFO") -> None:
    """
    Configure root logger with production-grade handlers.

    Handlers created:
    - stdout     – JSON (production) or coloured console (LOG_FORMAT=console)
    - app.log    – rotating JSON, all levels ≥ level
    - error.log  – rotating JSON, ERROR and above only

    Handlers already attached to the root logger are removed and closed, so
    calling this function again does not leak open log files.

    Args:
        level: Minimum log level string (DEBUG / INFO / WARNING / ERROR).
            Unrecognised names fall back to INFO.
    """
    # Only accept real numeric level constants: getattr() alone would also
    # return non-level attributes such as logging.BASIC_FORMAT (a string).
    numeric_level = getattr(logging, level.upper(), None)
    if not isinstance(numeric_level, int):
        numeric_level = logging.INFO
    log_format = os.getenv("LOG_FORMAT", "json").lower()

    # Ensure log directory exists
    LOG_DIR.mkdir(parents=True, exist_ok=True)

    root = logging.getLogger()
    root.setLevel(numeric_level)

    # Remove any handlers added by earlier basicConfig / setup_logging calls,
    # closing them so their streams and file descriptors are released.
    for stale in list(root.handlers):
        root.removeHandler(stale)
        stale.close()

    # ── stdout handler ────────────────────────────────────────────────────────
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(numeric_level)
    if log_format == "console":
        stdout_handler.setFormatter(ConsoleFormatter())
    else:
        stdout_handler.setFormatter(JSONFormatter())
    root.addHandler(stdout_handler)

    # ── Rotating app log (all levels) ─────────────────────────────────────────
    app_log_path = LOG_DIR / "app.log"
    file_handler = logging.handlers.RotatingFileHandler(
        filename=app_log_path,
        maxBytes=LOG_MAX_BYTES,
        backupCount=LOG_BACKUP_COUNT,
        encoding="utf-8",
    )
    file_handler.setLevel(numeric_level)
    file_handler.setFormatter(JSONFormatter())
    root.addHandler(file_handler)

    # ── Rotating error log (ERROR+) ───────────────────────────────────────────
    error_log_path = LOG_DIR / "error.log"
    error_handler = logging.handlers.RotatingFileHandler(
        filename=error_log_path,
        maxBytes=LOG_MAX_BYTES,
        backupCount=LOG_BACKUP_COUNT,
        encoding="utf-8",
    )
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(JSONFormatter())
    root.addHandler(error_handler)

    # Silence noisy third-party loggers
    for noisy in ("motor", "pymongo", "asyncpg", "sqlalchemy.engine", "aioredis"):
        logging.getLogger(noisy).setLevel(logging.WARNING)

    # Emit a structured startup event so log pipelines can detect restarts
    startup_logger = logging.getLogger(SERVICE_NAME)
    startup_logger.info(
        "Logging initialised",
        extra={
            "event": "logging_init",
            # Report the level actually in effect, not the raw argument.
            "log_level": logging.getLevelName(numeric_level),
            "log_format": log_format,
            "log_dir": str(LOG_DIR.resolve()),
            "app_log": str(app_log_path.resolve()),
            "error_log": str(error_log_path.resolve()),
        },
    )


# ── Logger factory ────────────────────────────────────────────────────────────
def get_logger(name: str) -> logging.Logger:
    """Return a standard Logger for the given module name."""
    return logging.getLogger(name)


class _WorkerContextAdapter(logging.LoggerAdapter):
    """LoggerAdapter that merges its context with per-call ``extra`` dicts."""

    def process(self, msg: Any, kwargs: Any) -> Any:
        # The stock implementation assigns ``kwargs["extra"] = self.extra``,
        # discarding whatever the call site passed. Merge instead; on a key
        # collision the call-site value wins.
        call_extra = kwargs.get("extra") or {}
        kwargs["extra"] = {**self.extra, **call_extra}
        return msg, kwargs


def get_worker_logger(name: str, entity_type: str, worker_id: int) -> logging.LoggerAdapter:
    """
    Return a LoggerAdapter that automatically injects worker context into
    every log record, so you never have to repeat entity_type / worker_id
    in extra={}.

    Fields passed via ``extra={}`` at the call site are kept and merged with
    the worker context.

    Usage::

        logger = get_worker_logger(__name__, "catalogue", 3)
        logger.info("Processing item", extra={"entity_id": "abc-123"})
        # → {"entity_type": "catalogue", "worker_id": 3, "entity_id": "abc-123", ...}
    """
    base = logging.getLogger(name)
    return _WorkerContextAdapter(base, {"entity_type": entity_type, "worker_id": worker_id})


# ── Structured event helpers ──────────────────────────────────────────────────
def log_sync_start(
    logger: logging.Logger,
    entity_type: str,
    entity_id: str,
    operation: str,
    worker_id: int,
) -> None:
    """Emit an INFO ``sync_start`` event describing the sync about to run."""
    logger.info(
        "Sync started",
        extra={
            "event": "sync_start",
            "entity_type": entity_type,
            "entity_id": entity_id,
            "operation": operation,
            "worker_id": worker_id,
        },
    )


def log_sync_success(
    logger: logging.Logger,
    entity_type: str,
    entity_id: str,
    operation: str,
    worker_id: int,
    duration_ms: float,
) -> None:
    """Emit an INFO ``sync_success`` event; ``duration_ms`` is rounded to 2 dp."""
    logger.info(
        "Sync succeeded",
        extra={
            "event": "sync_success",
            "entity_type": entity_type,
            "entity_id": entity_id,
            "operation": operation,
            "worker_id": worker_id,
            "duration_ms": round(duration_ms, 2),
        },
    )


def log_sync_failure(
    logger: logging.Logger,
    entity_type: str,
    entity_id: str,
    operation: str,
    worker_id: int,
    duration_ms: float,
    error: str,
    attempt: int,
    max_attempts: int,
) -> None:
    """Emit an ERROR ``sync_failure`` event with the error text and attempt counters."""
    logger.error(
        "Sync failed",
        extra={
            "event": "sync_failure",
            "entity_type": entity_type,
            "entity_id": entity_id,
            "operation": operation,
            "worker_id": worker_id,
            "duration_ms": round(duration_ms, 2),
            "error": error,
            "attempt": attempt,
            "max_attempts": max_attempts,
        },
    )


def log_worker_heartbeat(
    logger: logging.Logger,
    entity_type: str,
    worker_id: int,
    iteration: int,
    queue_size: int,
) -> None:
    """Emit an INFO ``worker_heartbeat`` event with iteration and queue size."""
    logger.info(
        "Worker heartbeat",
        extra={
            "event": "worker_heartbeat",
            "entity_type": entity_type,
            "worker_id": worker_id,
            "iteration": iteration,
            "queue_size": queue_size,
        },
    )


def log_service_lifecycle(logger: logging.Logger, event: str, **kwargs: Any) -> None:
    """Emit a structured lifecycle event (startup, shutdown, connection, etc.).

    The message is ``event`` with underscores replaced by spaces and the first
    letter capitalised. ``kwargs`` are forwarded as ``extra`` fields, so their
    names must not clash with built-in LogRecord attributes (``name``,
    ``message``, ``module``, ...); the logging module raises ``KeyError`` for
    such keys.
    """
    logger.info(
        event.replace("_", " ").capitalize(),
        extra={"event": event, **kwargs},
    )