# Source commit f2bfacd (MukeshKapoor25):
#   feat(logging): implement production-grade structured logging with JSON and rotating files
"""
Production-grade logging configuration for Sync Worker Microservice.
Features:
- Structured JSON output for log aggregation (ELK, CloudWatch, Datadog, etc.)
- Rotating file handlers with size/backup limits
- Separate error log file
- Console handler with human-readable format for local dev
- Worker/entity context injection via LoggerAdapter
- Startup/shutdown structured events
"""
import logging
import logging.handlers
import sys
import os
import json
import traceback
from datetime import datetime, timezone
from typing import Optional, Any, Dict
from pathlib import Path
# ── Constants ────────────────────────────────────────────────────────────────
SERVICE_NAME = "sync-worker-ms"
# Directory holding app.log / error.log; a relative value is taken relative to the CWD.
LOG_DIR = Path(os.getenv("LOG_DIR", "logs"))
# Rotation threshold per log file (default 50 MiB) and how many rotated backups to keep.
LOG_MAX_BYTES = int(os.getenv("LOG_MAX_BYTES", str(50 * 1024 * 1024)))
LOG_BACKUP_COUNT = int(os.getenv("LOG_BACKUP_COUNT", "10"))
# ── JSON Formatter ────────────────────────────────────────────────────────────
class JSONFormatter(logging.Formatter):
    """
    Emit log records as single-line JSON objects.
    Every record includes:
      timestamp – ISO-8601 UTC
      level     – DEBUG / INFO / WARNING / ERROR / CRITICAL
      logger    – dotted module name
      message   – formatted log message
      service   – SERVICE_NAME constant
      pid       – process id
    Records at WARNING and above also carry ``caller`` ("<pathname>:<lineno>").
    Any key/value pairs passed via ``extra={}`` are merged at the top level;
    they are merged after the fields above and so win on a name clash.
    Exception info is serialised into ``exception.type``, ``exception.message``
    and ``exception.stacktrace`` (a list of traceback lines). Output requested
    with ``stack_info=True`` is emitted as ``stack``.
    """
    # LogRecord attributes that are NOT caller-supplied context and must not be
    # copied into the payload as "extra" fields.
    #   taskName      – present on every LogRecord from Python 3.12 (usually None)
    #   asctime       – written onto the shared record by text formatters (e.g. the
    #                   console formatter) when their handler runs first
    #   color_message – coloured duplicate of the message added by some
    #                   frameworks (e.g. uvicorn)
    RESERVED = frozenset(
        {
            "args", "created", "exc_info", "exc_text", "filename",
            "funcName", "levelname", "levelno", "lineno", "message",
            "module", "msecs", "msg", "name", "pathname", "process",
            "processName", "relativeCreated", "stack_info", "thread",
            "threadName", "taskName", "asctime", "color_message",
        }
    )

    @staticmethod
    def _json_safe(value: Any) -> Any:
        """Return *value* unchanged if it is a JSON scalar, else a best-effort string."""
        if value is None or isinstance(value, (str, int, float, bool)):
            return value
        try:
            return str(value)
        except Exception:
            return f"<unrepresentable {type(value).__name__}>"

    def format(self, record: logging.LogRecord) -> str:
        """Serialise *record* to one JSON line.

        If the payload cannot be JSON-encoded (for example because an ``extra``
        value contains a circular reference), every non-scalar value is
        degraded to its string form instead of losing the record.
        """
        # Base fields
        payload: Dict[str, Any] = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": SERVICE_NAME,
            "pid": record.process,
        }
        # Merge caller location for WARNING and above
        if record.levelno >= logging.WARNING:
            payload["caller"] = f"{record.pathname}:{record.lineno}"
        # Merge extra fields (skip stdlib internals and private attributes)
        for key, val in record.__dict__.items():
            if key not in self.RESERVED and not key.startswith("_"):
                payload[key] = val
        # Serialise exception
        if record.exc_info and record.exc_info[0] is not None:
            exc_type, exc_value, exc_tb = record.exc_info
            payload["exception"] = {
                "type": exc_type.__name__,
                "message": str(exc_value),
                "stacktrace": traceback.format_exception(exc_type, exc_value, exc_tb),
            }
        # stack_info is in RESERVED, so without this it would be dropped silently
        if record.stack_info:
            payload["stack"] = self.formatStack(record.stack_info)
        try:
            return json.dumps(payload, default=str)
        except Exception:
            # Fallback – never let the formatter lose the record. Re-dumping the
            # same payload would fail the same way (circular references, non-str
            # keys in nested dicts, a raising __str__), so flatten values first.
            return json.dumps({key: self._json_safe(val) for key, val in payload.items()})
# ── Human-readable formatter (console / local dev) ───────────────────────────
class ConsoleFormatter(logging.Formatter):
    """Coloured, human-readable formatter for local development.

    Renders ``<date time> <LEVEL> <logger> — <message>`` wrapped in an ANSI
    colour chosen by level, then appends any ``extra={}`` context as a grey
    dict on the same line.
    """
    GREY = "\x1b[38;5;240m"
    CYAN = "\x1b[36m"
    YELLOW = "\x1b[33m"
    RED = "\x1b[31m"
    BOLD_RED = "\x1b[1;31m"
    RESET = "\x1b[0m"
    LEVEL_COLOURS = {
        logging.DEBUG: GREY,
        logging.INFO: CYAN,
        logging.WARNING: YELLOW,
        logging.ERROR: RED,
        logging.CRITICAL: BOLD_RED,
    }
    FMT = "%(asctime)s %(levelname)-8s %(name)s — %(message)s"
    DATEFMT = "%Y-%m-%d %H:%M:%S"
    # Record attributes that are never caller context, checked in addition to
    # the JSON formatter's reserved set. asctime is written onto the record by
    # our own format() call; taskName exists on every record from Python 3.12.
    _NON_CONTEXT = frozenset({"color_message", "asctime", "taskName"})

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # Build one coloured Formatter per level up front instead of per record.
        self._by_level = {
            levelno: self._coloured(colour)
            for levelno, colour in self.LEVEL_COLOURS.items()
        }
        # Levels without a colour entry (custom levels) render uncoloured.
        self._uncoloured = self._coloured(self.RESET)

    @classmethod
    def _coloured(cls, colour: str) -> logging.Formatter:
        """Return a Formatter whose whole line is wrapped in *colour* … RESET."""
        return logging.Formatter(f"{colour}{cls.FMT}{cls.RESET}", datefmt=cls.DATEFMT)

    def format(self, record: logging.LogRecord) -> str:
        formatter = self._by_level.get(record.levelno, self._uncoloured)
        result = formatter.format(record)
        # Append extra context fields inline
        extras = {
            k: v for k, v in record.__dict__.items()
            if k not in JSONFormatter.RESERVED
            and k not in self._NON_CONTEXT
            and not k.startswith("_")
        }
        if extras:
            result += f" {self.GREY}{extras}{self.RESET}"
        return result
# ── Setup ─────────────────────────────────────────────────────────────────────
def _resolve_level(level: str) -> int:
    """Map a level name (case-insensitive) to its numeric value, defaulting to INFO.

    Only integer attributes of :mod:`logging` count as levels; a name such as
    ``"basic_format"`` resolves to a string and would make ``setLevel`` raise,
    so it falls back to ``logging.INFO`` like any unknown name.
    """
    candidate = getattr(logging, level.upper(), None)
    return candidate if isinstance(candidate, int) else logging.INFO


def _rotating_json_handler(path: Path, level: int) -> logging.handlers.RotatingFileHandler:
    """Build a UTF-8, size-rotated, JSON-formatted file handler for *path* at *level*."""
    handler = logging.handlers.RotatingFileHandler(
        filename=path,
        maxBytes=LOG_MAX_BYTES,
        backupCount=LOG_BACKUP_COUNT,
        encoding="utf-8",
    )
    handler.setLevel(level)
    handler.setFormatter(JSONFormatter())
    return handler


def setup_logging(level: str = "INFO") -> None:
    """
    Configure root logger with production-grade handlers.
    Handlers created:
      - stdout    – JSON (production) or coloured console (LOG_FORMAT=console)
      - app.log   – rotating JSON, all levels ≥ level
      - error.log – rotating JSON, ERROR and above only
    Handlers already attached to the root logger (from basicConfig or an
    earlier call to this function) are removed and closed first.
    Args:
        level: Minimum log level name (DEBUG / INFO / WARNING / ERROR),
            case-insensitive; unrecognised names fall back to INFO.
    Raises:
        OSError: if LOG_DIR cannot be created or a log file cannot be opened.
    """
    numeric_level = _resolve_level(level)
    log_format = os.getenv("LOG_FORMAT", "json").lower()
    # Ensure log directory exists before the file handlers open into it
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    root = logging.getLogger()
    root.setLevel(numeric_level)
    # Detach AND close previous handlers: only clearing the list would leave
    # earlier RotatingFileHandlers holding their files open on repeated calls.
    for stale in list(root.handlers):
        root.removeHandler(stale)
        stale.close()
    # ── stdout handler ────────────────────────────────────────────────────────
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(numeric_level)
    stdout_handler.setFormatter(
        ConsoleFormatter() if log_format == "console" else JSONFormatter()
    )
    root.addHandler(stdout_handler)
    # ── Rotating app log (all levels ≥ numeric_level) ─────────────────────────
    app_log_path = LOG_DIR / "app.log"
    root.addHandler(_rotating_json_handler(app_log_path, numeric_level))
    # ── Rotating error log (ERROR+) ───────────────────────────────────────────
    error_log_path = LOG_DIR / "error.log"
    root.addHandler(_rotating_json_handler(error_log_path, logging.ERROR))
    # Silence noisy third-party loggers
    for noisy in ("motor", "pymongo", "asyncpg", "sqlalchemy.engine", "aioredis"):
        logging.getLogger(noisy).setLevel(logging.WARNING)
    # Emit a structured startup event so log pipelines can detect restarts
    startup_logger = logging.getLogger(SERVICE_NAME)
    startup_logger.info(
        "Logging initialised",
        extra={
            "event": "logging_init",
            # Report the level actually applied (after any fallback to INFO)
            "log_level": logging.getLevelName(numeric_level),
            "log_format": log_format,
            "log_dir": str(LOG_DIR.resolve()),
            "app_log": str(app_log_path.resolve()),
            "error_log": str(error_log_path.resolve()),
        },
    )
# ── Logger factory ────────────────────────────────────────────────────────────
def get_logger(name: str) -> logging.Logger:
    """Fetch the process-wide stdlib logger registered under *name*.

    Thin wrapper over ``logging.getLogger``; repeated calls with the same name
    return the same Logger object.
    """
    named_logger = logging.getLogger(name)
    return named_logger
class _WorkerContextAdapter(logging.LoggerAdapter):
    """LoggerAdapter that merges its fixed context into each call's ``extra``.

    The stock ``LoggerAdapter.process`` *replaces* the caller's ``extra`` with
    the adapter's own dict (merging is only available opt-in, via
    ``merge_extra=True``, from Python 3.13). That would silently drop per-call
    fields such as ``entity_id``. Here both are combined, and on a key clash
    the per-call value wins (the same precedence as ``merge_extra``).
    """

    def process(self, msg: Any, kwargs: Any) -> Any:
        call_extra = kwargs.get("extra") or {}
        # Build a fresh dict so neither self.extra nor the caller's dict is mutated.
        kwargs["extra"] = {**(self.extra or {}), **call_extra}
        return msg, kwargs


def get_worker_logger(name: str, entity_type: str, worker_id: int) -> logging.LoggerAdapter:
    """
    Return a LoggerAdapter that automatically injects worker context into every
    log record, so you never have to repeat entity_type / worker_id in extra={}.
    Per-call ``extra`` fields are merged with (not replaced by) that context.
    Usage::
        logger = get_worker_logger(__name__, "catalogue", 3)
        logger.info("Processing item", extra={"entity_id": "abc-123"})
        # → {"entity_type": "catalogue", "worker_id": 3, "entity_id": "abc-123", ...}
    """
    base = logging.getLogger(name)
    return _WorkerContextAdapter(base, {"entity_type": entity_type, "worker_id": worker_id})
# ── Structured event helpers ──────────────────────────────────────────────────
def log_sync_start(logger: logging.Logger, entity_type: str, entity_id: str, operation: str, worker_id: int) -> None:
    """Log an INFO "Sync started" record tagged ``event="sync_start"``.

    Args:
        logger: Logger that receives the record.
        entity_type: Kind of entity being synced.
        entity_id: Identifier of the entity.
        operation: Name of the sync operation.
        worker_id: Id of the worker performing the sync.
    """
    context = dict(
        event="sync_start",
        entity_type=entity_type,
        entity_id=entity_id,
        operation=operation,
        worker_id=worker_id,
    )
    logger.info("Sync started", extra=context)
def log_sync_success(
    logger: logging.Logger,
    entity_type: str,
    entity_id: str,
    operation: str,
    worker_id: int,
    duration_ms: float,
) -> None:
    """Log an INFO "Sync succeeded" record tagged ``event="sync_success"``.

    ``duration_ms`` is rounded to two decimal places before being attached.
    """
    fields: Dict[str, Any] = {"event": "sync_success"}
    fields.update(
        entity_type=entity_type,
        entity_id=entity_id,
        operation=operation,
        worker_id=worker_id,
    )
    fields["duration_ms"] = round(duration_ms, 2)
    logger.info("Sync succeeded", extra=fields)
def log_sync_failure(
    logger: logging.Logger,
    entity_type: str,
    entity_id: str,
    operation: str,
    worker_id: int,
    duration_ms: float,
    error: str,
    attempt: int,
    max_attempts: int,
) -> None:
    """Log an ERROR "Sync failed" record tagged ``event="sync_failure"``.

    Carries the entity/worker identity, the rounded (2 dp) duration, the error
    text and the retry position (``attempt`` of ``max_attempts``).
    """
    identity = {
        "entity_type": entity_type,
        "entity_id": entity_id,
        "operation": operation,
        "worker_id": worker_id,
    }
    outcome = {
        "duration_ms": round(duration_ms, 2),
        "error": error,
        "attempt": attempt,
        "max_attempts": max_attempts,
    }
    logger.error("Sync failed", extra={"event": "sync_failure", **identity, **outcome})
def log_worker_heartbeat(
    logger: logging.Logger,
    entity_type: str,
    worker_id: int,
    iteration: int,
    queue_size: int,
) -> None:
    """Log an INFO "Worker heartbeat" record tagged ``event="worker_heartbeat"``.

    Attaches the worker's entity type and id, its loop ``iteration`` counter and
    the current ``queue_size`` as extra fields.
    """
    keys = ("event", "entity_type", "worker_id", "iteration", "queue_size")
    values = ("worker_heartbeat", entity_type, worker_id, iteration, queue_size)
    logger.info("Worker heartbeat", extra=dict(zip(keys, values)))
def log_service_lifecycle(logger: logging.Logger, event: str, **kwargs: Any) -> None:
    """Emit a structured lifecycle event (startup, shutdown, connection, etc.).

    The message is *event* with underscores turned into spaces and the first
    letter capitalised (``"db_connected"`` -> ``"Db connected"``). *event* and
    every keyword argument are attached as ``extra`` fields.

    ``Logger.makeRecord`` raises ``KeyError`` when an ``extra`` key clashes with
    a built-in LogRecord attribute (``name``, ``module``, ``filename``,
    ``message``, ...). Such keys are emitted with a ``ctx_`` prefix instead of
    crashing the caller, e.g. ``name="mongo"`` becomes ``ctx_name``.
    """
    # Attribute names carried by a freshly built record (via the active record
    # factory), plus the two keys logging rejects explicitly.
    probe = logging.getLogRecordFactory()("", logging.INFO, "", 0, "", (), None)
    reserved = set(vars(probe)) | {"message", "asctime"}
    extra: Dict[str, Any] = {"event": event}
    for key, value in kwargs.items():
        extra[f"ctx_{key}" if key in reserved else key] = value
    logger.info(event.replace("_", " ").capitalize(), extra=extra)