Spaces:
Paused
Paused
| # /// zerodep | |
| # version = "0.3.0" | |
| # deps = [] | |
| # tier = "medium" | |
| # category = "devtools" | |
| # note = "Install/update via `zerodep add structlog`" | |
| # /// | |
| """Zero-dependency structured logging with pretty console output. | |
| Part of zerodep: https://github.com/Oaklight/zerodep | |
| Copyright (c) 2026 Peng Ding. MIT License. | |
| Structured logging library inspired by structlog, with a loguru-style | |
| colored console renderer. Provides bound loggers with context propagation, | |
| a processor pipeline, and multiple output renderers (console, JSON, key-value). | |
| Quick start (pretty console output, zero config):: | |
| from structlog import get_logger | |
| logger = get_logger() | |
| logger.info("server started", host="0.0.0.0", port=8080) | |
| Bound logger (context propagation):: | |
| log = get_logger().bind(request_id="abc-123") | |
| log.info("handling request") | |
| log = log.bind(user_id=42) | |
| log.info("authenticated") | |
| One-call setup with stdlib integration:: | |
| from structlog import setup_logging | |
| logger = setup_logging(level="DEBUG", renderer="json") | |
| logger.info("structured", key="value") | |
| Custom processor pipeline:: | |
| from structlog import configure, add_log_level, TimeStamper, JSONRenderer | |
| configure(processors=[add_log_level, TimeStamper(), JSONRenderer()]) | |
| """ | |
| from __future__ import annotations | |
| import dataclasses | |
| import datetime | |
| import json as _json | |
| import logging | |
| import os | |
| import sys | |
| import traceback | |
| from typing import IO, Any, Callable | |
| __all__ = [ | |
| # Type aliases | |
| "EventDict", | |
| "Processor", | |
| "LoggerFactory", | |
| # Exceptions | |
| "DropEvent", | |
| # Logger factories | |
| "PrintLogger", | |
| "PrintLoggerFactory", | |
| "StdlibLoggerFactory", | |
| # Processors | |
| "add_log_level", | |
| "add_logger_name", | |
| "TimeStamper", | |
| "format_exc_info", | |
| # Renderers | |
| "KeyValueRenderer", | |
| "JSONRenderer", | |
| "ConsoleRenderer", | |
| # Bound logger | |
| "BoundLogger", | |
| # Configuration | |
| "configure", | |
| "reset_defaults", | |
| "get_config", | |
| "get_logger", | |
| "wrap_logger", | |
| # Setup helper | |
| "setup_logging", | |
| # Utilities | |
| "truncate_string", | |
| "truncate_base64", | |
| ] | |
| # ββ Type Aliases βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| EventDict = dict[str, Any] | |
| """A log event dictionary passed through the processor pipeline.""" | |
| Processor = Callable[[Any, str, EventDict], EventDict | str] | |
| """Signature: (logger, method_name, event_dict) -> event_dict or rendered str.""" | |
| LoggerFactory = Callable[..., Any] | |
| """Callable that creates an underlying logger instance.""" | |
| # ββ Exceptions βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class DropEvent(Exception): | |
| """Raise inside a processor to silently discard the current log event.""" | |
| # ββ Level Mapping ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _NAME_TO_LEVEL: dict[str, int] = { | |
| "debug": logging.DEBUG, | |
| "info": logging.INFO, | |
| "warning": logging.WARNING, | |
| "warn": logging.WARNING, | |
| "error": logging.ERROR, | |
| "critical": logging.CRITICAL, | |
| "fatal": logging.CRITICAL, | |
| "exception": logging.ERROR, | |
| } | |
| _LEVEL_TO_NAME: dict[int, str] = { | |
| logging.DEBUG: "debug", | |
| logging.INFO: "info", | |
| logging.WARNING: "warning", | |
| logging.ERROR: "error", | |
| logging.CRITICAL: "critical", | |
| } | |
| # ββ ANSI Colors ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class _Colors: | |
| """ANSI escape codes for terminal colorization.""" | |
| RESET = "\033[0m" | |
| BOLD = "\033[1m" | |
| DIM = "\033[2m" | |
| ITALIC = "\033[3m" | |
| UNDERLINE = "\033[4m" | |
| STRIKETHROUGH = "\033[9m" | |
| BLACK = "\033[30m" | |
| RED = "\033[31m" | |
| GREEN = "\033[32m" | |
| YELLOW = "\033[33m" | |
| BLUE = "\033[34m" | |
| MAGENTA = "\033[35m" | |
| CYAN = "\033[36m" | |
| WHITE = "\033[37m" | |
| BRIGHT_BLACK = "\033[90m" | |
| BRIGHT_RED = "\033[91m" | |
| BRIGHT_GREEN = "\033[92m" | |
| BRIGHT_YELLOW = "\033[93m" | |
| BRIGHT_BLUE = "\033[94m" | |
| BRIGHT_MAGENTA = "\033[95m" | |
| BRIGHT_CYAN = "\033[96m" | |
| BRIGHT_WHITE = "\033[97m" | |
| _LEVEL_STYLES: dict[str, str] = { | |
| "debug": _Colors.BLUE, | |
| "info": _Colors.BRIGHT_WHITE, | |
| "warning": _Colors.YELLOW, | |
| "warn": _Colors.YELLOW, | |
| "error": _Colors.RED, | |
| "critical": _Colors.BRIGHT_RED + _Colors.BOLD, | |
| "fatal": _Colors.BRIGHT_RED + _Colors.BOLD, | |
| "exception": _Colors.RED, | |
| } | |
| _LEVEL_LABEL_STYLES: dict[str, str] = { | |
| "debug": _Colors.CYAN, | |
| "info": _Colors.GREEN, | |
| "warning": _Colors.YELLOW, | |
| "warn": _Colors.YELLOW, | |
| "error": _Colors.RED, | |
| "critical": _Colors.BRIGHT_RED + _Colors.BOLD, | |
| "fatal": _Colors.BRIGHT_RED + _Colors.BOLD, | |
| "exception": _Colors.RED, | |
| } | |
| _LEVEL_LABELS: dict[str, str] = { | |
| "debug": "DEBUG ", | |
| "info": "INFO ", | |
| "warning": "WARNING ", | |
| "warn": "WARNING ", | |
| "error": "ERROR ", | |
| "critical": "CRITICAL", | |
| "fatal": "FATAL ", | |
| "exception": "ERROR ", | |
| } | |
| def _supports_color(stream: Any = None) -> bool: | |
| """Check if the output stream supports ANSI color codes. | |
| Respects the ``FORCE_COLOR`` and ``NO_COLOR`` environment variables | |
| (see https://force-color.org/ and https://no-color.org/). | |
| Args: | |
| stream: The output stream to check. Defaults to ``sys.stderr``. | |
| Returns: | |
| True if the stream is a color-capable terminal. | |
| """ | |
| if os.environ.get("FORCE_COLOR"): | |
| return True | |
| if os.environ.get("NO_COLOR"): | |
| return False | |
| s = stream or sys.stderr | |
| if not hasattr(s, "isatty") or not s.isatty(): | |
| return False | |
| if os.environ.get("TERM", "") == "dumb": | |
| return False | |
| return True | |
| # ββ Logger Factories βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class PrintLogger: | |
| """Minimal logger that writes to a file handle via ``print()``. | |
| This is the default underlying logger. It has no filtering, no | |
| handler chain -- just ``print()`` to the configured stream. | |
| Args: | |
| file: Output stream. Defaults to ``sys.stderr``. | |
| """ | |
| def __init__(self, file: IO[str] | None = None) -> None: | |
| self._file = file or sys.stderr | |
| self._write = self._file.write | |
| self._flush = self._file.flush | |
| def msg(self, message: str) -> None: | |
| """Write *message* followed by a newline.""" | |
| self._write(message + "\n") | |
| self._flush() | |
| debug = info = warning = warn = error = critical = fatal = msg | |
| class PrintLoggerFactory: | |
| """Factory that creates :class:`PrintLogger` instances. | |
| Args: | |
| file: Output stream passed to each ``PrintLogger``. | |
| """ | |
| def __init__(self, file: IO[str] | None = None) -> None: | |
| self._file = file | |
| def __call__(self, *args: Any) -> PrintLogger: | |
| return PrintLogger(file=self._file) | |
| class StdlibLoggerFactory: | |
| """Factory that returns a :class:`logging.Logger` from the stdlib. | |
| Args: | |
| name: Logger name passed to ``logging.getLogger()``. | |
| If *None*, uses the root logger. | |
| """ | |
| def __init__(self, name: str | None = None) -> None: | |
| self._name = name | |
| def __call__(self, *args: Any) -> logging.Logger: | |
| if args: | |
| return logging.getLogger(args[0]) | |
| return logging.getLogger(self._name) | |
| # ββ Built-in Processors ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def add_log_level(logger: Any, method_name: str, event_dict: EventDict) -> EventDict: | |
| """Add ``level`` key derived from the log method name. | |
| Example: | |
| ``logger.info(...)`` -> ``event_dict["level"] = "info"`` | |
| """ | |
| event_dict["level"] = method_name | |
| return event_dict | |
| def add_logger_name(logger: Any, method_name: str, event_dict: EventDict) -> EventDict: | |
| """Add ``logger`` key from the underlying logger's name.""" | |
| name = getattr(logger, "name", None) or "" | |
| event_dict["logger"] = name | |
| return event_dict | |
| class TimeStamper: | |
| """Processor that adds a timestamp to the event dict. | |
| Args: | |
| fmt: Timestamp format. ``"iso"`` for ISO 8601, ``None`` for a | |
| UNIX float, or a :func:`~datetime.datetime.strftime` format | |
| string. Defaults to ``"iso"``. | |
| utc: If *True*, use UTC; otherwise local time. | |
| key: Dict key for the timestamp. Defaults to ``"timestamp"``. | |
| """ | |
| __slots__ = ("_fmt", "_utc", "_key") | |
| def __init__( | |
| self, | |
| fmt: str | None = "iso", | |
| utc: bool = True, | |
| key: str = "timestamp", | |
| ) -> None: | |
| self._fmt = fmt | |
| self._utc = utc | |
| self._key = key | |
| def __call__( | |
| self, logger: Any, method_name: str, event_dict: EventDict | |
| ) -> EventDict: | |
| now = ( | |
| datetime.datetime.now(datetime.timezone.utc) | |
| if self._utc | |
| else datetime.datetime.now() | |
| ) | |
| if self._fmt is None: | |
| event_dict[self._key] = now.timestamp() | |
| elif self._fmt == "iso": | |
| event_dict[self._key] = now.isoformat() | |
| else: | |
| event_dict[self._key] = now.strftime(self._fmt) | |
| return event_dict | |
| def format_exc_info(logger: Any, method_name: str, event_dict: EventDict) -> EventDict: | |
| """Replace ``exc_info`` with a formatted ``exception`` string. | |
| If ``exc_info`` is *True*, captures the current exception via | |
| :func:`sys.exc_info`. If it is an exception tuple, formats it | |
| directly. The ``exc_info`` key is removed and replaced with | |
| ``exception``. | |
| """ | |
| exc_info = event_dict.pop("exc_info", None) | |
| if exc_info is None or exc_info is False: | |
| return event_dict | |
| if exc_info is True: | |
| exc_info = sys.exc_info() | |
| if isinstance(exc_info, BaseException): | |
| exc_info = (type(exc_info), exc_info, exc_info.__traceback__) | |
| if isinstance(exc_info, tuple) and exc_info[0] is not None: | |
| event_dict["exception"] = "".join( | |
| traceback.format_exception(*exc_info) | |
| ).rstrip() | |
| return event_dict | |
| # ββ Renderers (final processors) βββββββββββββββββββββββββββββββββββββββββββββ | |
| class KeyValueRenderer: | |
| """Render the event dict as ``key=value`` pairs. | |
| Args: | |
| key_order: Keys to render first, in this order. | |
| sort_keys: Sort remaining keys alphabetically. | |
| drop_missing: Skip *key_order* keys that are absent from the dict. | |
| """ | |
| __slots__ = ("_key_order", "_sort_keys", "_drop_missing") | |
| def __init__( | |
| self, | |
| key_order: list[str] | None = None, | |
| sort_keys: bool = False, | |
| drop_missing: bool = True, | |
| ) -> None: | |
| self._key_order = key_order or [] | |
| self._sort_keys = sort_keys | |
| self._drop_missing = drop_missing | |
| def __call__(self, logger: Any, method_name: str, event_dict: EventDict) -> str: | |
| ordered: list[tuple[str, Any]] = [] | |
| remaining = dict(event_dict) | |
| for key in self._key_order: | |
| if key in remaining: | |
| ordered.append((key, remaining.pop(key))) | |
| elif not self._drop_missing: | |
| ordered.append((key, None)) | |
| rest = sorted(remaining.items()) if self._sort_keys else list(remaining.items()) | |
| ordered.extend(rest) | |
| parts = [ | |
| f"{k}={v!r}" if not isinstance(v, str) else f"{k}={v}" for k, v in ordered | |
| ] | |
| return " ".join(parts) | |
| class JSONRenderer: | |
| """Render the event dict as a JSON string. | |
| Args: | |
| serializer: JSON serialization function. Defaults to | |
| :func:`json.dumps`. | |
| **dumps_kw: Extra keyword arguments passed to *serializer*. | |
| """ | |
| __slots__ = ("_serializer", "_dumps_kw") | |
| def __init__( | |
| self, | |
| serializer: Callable[..., str] = _json.dumps, | |
| **dumps_kw: Any, | |
| ) -> None: | |
| self._serializer = serializer | |
| self._dumps_kw = dumps_kw | |
| if "default" not in self._dumps_kw: | |
| self._dumps_kw["default"] = _json_default | |
| def __call__(self, logger: Any, method_name: str, event_dict: EventDict) -> str: | |
| return self._serializer(event_dict, **self._dumps_kw) | |
| def _json_default(obj: Any) -> Any: | |
| """Fallback serializer for objects that ``json.dumps`` cannot handle.""" | |
| if isinstance(obj, datetime.datetime): | |
| return obj.isoformat() | |
| if isinstance(obj, datetime.date): | |
| return obj.isoformat() | |
| if isinstance(obj, set | frozenset): | |
| return sorted(obj, key=str) | |
| if isinstance(obj, bytes): | |
| return obj.decode("utf-8", errors="replace") | |
| return str(obj) | |
| class ConsoleRenderer: | |
| """Render the event dict as colorized, loguru-style console output. | |
| Output format:: | |
| 2026-03-27 14:30:00.123 | INFO | event message key=val key=val | |
| Args: | |
| colors: Enable ANSI color codes. *None* auto-detects terminal | |
| support. | |
| pad_event: Pad the event field to this width for alignment. | |
| level_styles: Override per-level ANSI color strings. | |
| """ | |
| __slots__ = ("_colors", "_pad_event", "_level_styles") | |
| def __init__( | |
| self, | |
| colors: bool | None = None, | |
| pad_event: int = 30, | |
| level_styles: dict[str, str] | None = None, | |
| ) -> None: | |
| self._colors = _supports_color() if colors is None else colors | |
| self._pad_event = pad_event | |
| self._level_styles = level_styles or _LEVEL_STYLES | |
| def __call__(self, logger: Any, method_name: str, event_dict: EventDict) -> str: | |
| # Extract well-known keys | |
| event = str(event_dict.pop("event", "")) | |
| level = event_dict.pop("level", method_name) | |
| timestamp = event_dict.pop("timestamp", None) | |
| event_dict.pop("logger", None) | |
| exception = event_dict.pop("exception", None) | |
| # Build timestamp string | |
| if timestamp is None: | |
| now = datetime.datetime.now() | |
| ms = now.microsecond // 1000 | |
| ts_str = now.strftime("%Y-%m-%d %H:%M:%S.") + f"{ms:03d}" | |
| elif isinstance(timestamp, str): | |
| # Try to format ISO timestamps more compactly | |
| ts_str = _compact_iso(timestamp) | |
| elif isinstance(timestamp, float): | |
| dt = datetime.datetime.fromtimestamp(timestamp) | |
| ts_str = dt.strftime("%Y-%m-%d %H:%M:%S.") + f"{dt.microsecond // 1000:03d}" | |
| else: | |
| ts_str = str(timestamp) | |
| # Build level label | |
| level_str = _LEVEL_LABELS.get(level, f"{level.upper():<8s}") | |
| # Build key=value pairs from remaining context | |
| kv_parts: list[str] = [] | |
| for k, v in event_dict.items(): | |
| kv_parts.append(f"{k}={v!r}" if not isinstance(v, str) else f"{k}={v}") | |
| kv_str = " ".join(kv_parts) | |
| # Pad event | |
| padded_event = event.ljust(self._pad_event) if kv_str else event | |
| # Assemble | |
| if self._colors: | |
| msg_color = self._level_styles.get(level, _Colors.WHITE) | |
| label_color = _LEVEL_LABEL_STYLES.get(level, _Colors.WHITE) | |
| line = ( | |
| f"{_Colors.GREEN}{ts_str}{_Colors.RESET} | " | |
| f"{label_color}{_Colors.BOLD}{level_str}{_Colors.RESET} | " | |
| f"{msg_color}{padded_event}{_Colors.RESET}" | |
| ) | |
| if kv_str: | |
| line += f" {_Colors.DIM}{kv_str}{_Colors.RESET}" | |
| if exception: | |
| line += f"\n{_Colors.RED}{exception}{_Colors.RESET}" | |
| else: | |
| line = f"{ts_str} | {level_str} | {padded_event}" | |
| if kv_str: | |
| line += f" {kv_str}" | |
| if exception: | |
| line += f"\n{exception}" | |
| return line | |
| def _compact_iso(ts: str) -> str: | |
| """Convert an ISO 8601 timestamp to ``YYYY-MM-DD HH:MM:SS.mmm`` format.""" | |
| try: | |
| dt = datetime.datetime.fromisoformat(ts) | |
| return dt.strftime("%Y-%m-%d %H:%M:%S.") + f"{dt.microsecond // 1000:03d}" | |
| except (ValueError, AttributeError): | |
| return ts | |
| # ββ BoundLogger ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class BoundLogger: | |
| """A logger that carries bound context through a processor pipeline. | |
| Do not instantiate directly; use :func:`get_logger` or | |
| :func:`wrap_logger` instead. | |
| Args: | |
| logger: The underlying logger instance (e.g. ``PrintLogger``). | |
| processors: Ordered list of processors to run on each log event. | |
| context: Initial context dictionary. | |
| """ | |
| __slots__ = ("_logger", "_processors", "_context") | |
| def __init__( | |
| self, | |
| logger: Any, | |
| processors: list[Processor], | |
| context: dict[str, Any], | |
| ) -> None: | |
| self._logger = logger | |
| self._processors = processors | |
| self._context = context | |
| def bind(self, **new_values: Any) -> BoundLogger: | |
| """Return a new logger with *new_values* merged into the context.""" | |
| new_ctx = {**self._context, **new_values} | |
| return BoundLogger(self._logger, self._processors, new_ctx) | |
| def unbind(self, *keys: str) -> BoundLogger: | |
| """Return a new logger with *keys* removed from the context.""" | |
| new_ctx = {k: v for k, v in self._context.items() if k not in keys} | |
| return BoundLogger(self._logger, self._processors, new_ctx) | |
| def new(self, **new_values: Any) -> BoundLogger: | |
| """Return a new logger with *new_values* replacing the context.""" | |
| return BoundLogger(self._logger, self._processors, dict(new_values)) | |
| # ββ Log methods ββ | |
| def debug(self, event: str | None = None, /, **kw: Any) -> None: | |
| self._process("debug", event, kw) | |
| def info(self, event: str | None = None, /, **kw: Any) -> None: | |
| self._process("info", event, kw) | |
| def warning(self, event: str | None = None, /, **kw: Any) -> None: | |
| self._process("warning", event, kw) | |
| def warn(self, event: str | None = None, /, **kw: Any) -> None: | |
| self._process("warning", event, kw) | |
| def error(self, event: str | None = None, /, **kw: Any) -> None: | |
| self._process("error", event, kw) | |
| def critical(self, event: str | None = None, /, **kw: Any) -> None: | |
| self._process("critical", event, kw) | |
| def fatal(self, event: str | None = None, /, **kw: Any) -> None: | |
| self._process("critical", event, kw) | |
| def exception(self, event: str | None = None, /, **kw: Any) -> None: | |
| kw.setdefault("exc_info", True) | |
| self._process("error", event, kw) | |
| def log(self, level: int, event: str | None = None, /, **kw: Any) -> None: | |
| method_name = _LEVEL_TO_NAME.get(level, "info") | |
| self._process(method_name, event, kw) | |
| # ββ Internal ββ | |
| def _process(self, method_name: str, event: str | None, kw: dict[str, Any]) -> None: | |
| """Merge context, run processors, and emit via the underlying logger.""" | |
| event_dict: EventDict = {**self._context, **kw} | |
| if event is not None: | |
| event_dict["event"] = event | |
| elif "event" not in event_dict: | |
| event_dict["event"] = "" | |
| try: | |
| for proc in self._processors: | |
| event_dict_or_str = proc(self._logger, method_name, event_dict) | |
| if isinstance(event_dict_or_str, str): | |
| # Final renderer returned a string -- emit and stop. | |
| _emit(self._logger, method_name, event_dict_or_str) | |
| return | |
| event_dict = event_dict_or_str | |
| except DropEvent: | |
| return | |
| # If no processor returned a string, emit the repr of event_dict. | |
| _emit(self._logger, method_name, str(event_dict)) | |
| def _emit(logger: Any, method_name: str, message: str) -> None: | |
| """Dispatch *message* to the underlying logger.""" | |
| if isinstance(logger, logging.Logger): | |
| level = _NAME_TO_LEVEL.get(method_name, logging.INFO) | |
| logger.log(level, "%s", message) | |
| else: | |
| func = getattr(logger, method_name, None) or getattr(logger, "msg", None) | |
| if func: | |
| func(message) | |
| # ββ Configuration ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class _Configuration: | |
| """Global structlog configuration.""" | |
| processors: list[Processor] | |
| wrapper_class: type[BoundLogger] | |
| context_class: type | |
| logger_factory: LoggerFactory | |
| cache_logger_on_first_use: bool | |
| def _make_defaults() -> _Configuration: | |
| return _Configuration( | |
| processors=[add_log_level, TimeStamper(), ConsoleRenderer()], | |
| wrapper_class=BoundLogger, | |
| context_class=dict, | |
| logger_factory=PrintLoggerFactory(), | |
| cache_logger_on_first_use=True, | |
| ) | |
| _config: _Configuration = _make_defaults() | |
| _logger_cache: dict[Any, BoundLogger] = {} | |
| def configure( | |
| processors: list[Processor] | None = None, | |
| wrapper_class: type[BoundLogger] | None = None, | |
| context_class: type[dict] | None = None, | |
| logger_factory: LoggerFactory | None = None, | |
| cache_logger_on_first_use: bool | None = None, | |
| ) -> None: | |
| """Override the global configuration. | |
| Only non-*None* arguments are changed. Call :func:`reset_defaults` to | |
| restore factory settings. | |
| Args: | |
| processors: Ordered processor list. | |
| wrapper_class: BoundLogger subclass to use. | |
| context_class: Dict-like class for context storage. | |
| logger_factory: Factory for the underlying logger. | |
| cache_logger_on_first_use: Cache loggers returned by | |
| :func:`get_logger`. | |
| """ | |
| global _config | |
| _logger_cache.clear() | |
| p = _config | |
| _config = _Configuration( | |
| processors=(processors if processors is not None else p.processors), | |
| wrapper_class=(wrapper_class if wrapper_class is not None else p.wrapper_class), | |
| context_class=(context_class if context_class is not None else p.context_class), | |
| logger_factory=( | |
| logger_factory if logger_factory is not None else p.logger_factory | |
| ), | |
| cache_logger_on_first_use=( | |
| cache_logger_on_first_use | |
| if cache_logger_on_first_use is not None | |
| else p.cache_logger_on_first_use | |
| ), | |
| ) | |
| def reset_defaults() -> None: | |
| """Restore the global configuration to factory defaults.""" | |
| global _config | |
| _logger_cache.clear() | |
| _config = _make_defaults() | |
| def get_config() -> _Configuration: | |
| """Return the current global configuration (read-only snapshot).""" | |
| return _config | |
| # ββ Logger Creation ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_logger(*args: Any, **initial_values: Any) -> BoundLogger: | |
| """Create a :class:`BoundLogger` using the global configuration. | |
| Positional arguments are forwarded to the logger factory (e.g. a | |
| logger name). Keyword arguments become the initial bound context. | |
| Returns: | |
| A configured :class:`BoundLogger`. | |
| """ | |
| cache_key = args if not initial_values else None | |
| if cache_key is not None and _config.cache_logger_on_first_use: | |
| cached = _logger_cache.get(cache_key) | |
| if cached is not None: | |
| return cached | |
| underlying = _config.logger_factory(*args) | |
| ctx = ( | |
| _config.context_class(initial_values) | |
| if initial_values | |
| else _config.context_class() | |
| ) | |
| bound = _config.wrapper_class( | |
| logger=underlying, | |
| processors=list(_config.processors), | |
| context=ctx, | |
| ) | |
| if cache_key is not None and _config.cache_logger_on_first_use: | |
| _logger_cache[cache_key] = bound | |
| return bound | |
| def wrap_logger( | |
| logger: Any, | |
| processors: list[Processor] | None = None, | |
| **initial_values: Any, | |
| ) -> BoundLogger: | |
| """Wrap an existing logger with a processor pipeline. | |
| Args: | |
| logger: Any object with ``debug``/``info``/... methods. | |
| processors: Processor list. Defaults to the global config. | |
| **initial_values: Initial bound context. | |
| Returns: | |
| A :class:`BoundLogger` wrapping *logger*. | |
| """ | |
| procs = processors if processors is not None else list(_config.processors) | |
| ctx = dict(initial_values) | |
| return BoundLogger(logger=logger, processors=procs, context=ctx) | |
| # ββ Convenience Setup ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _resolve_level(level: int | str) -> int: | |
| """Convert a level name or int to a logging level int.""" | |
| if isinstance(level, int): | |
| return level | |
| return _NAME_TO_LEVEL.get(level.lower(), logging.INFO) | |
| def setup_logging( | |
| level: int | str = logging.INFO, | |
| renderer: str = "console", | |
| colors: bool | None = None, | |
| processors: list[Processor] | None = None, | |
| logger_name: str | None = None, | |
| stream: IO[str] | None = None, | |
| ) -> BoundLogger: | |
| """One-call logging setup with stdlib integration. | |
| Configures both stdlib ``logging`` and the structlog processor | |
| pipeline in a single call. | |
| Args: | |
| level: Log level (name or int). Defaults to ``INFO``. | |
| renderer: Output renderer: ``"console"``, ``"json"``, or ``"kv"``. | |
| colors: Enable ANSI colors. *None* auto-detects. | |
| processors: Custom processor list. Overrides *renderer* if given. | |
| logger_name: stdlib logger name. | |
| stream: Output stream. Defaults to ``sys.stderr``. | |
| Returns: | |
| A ready-to-use :class:`BoundLogger`. | |
| """ | |
| resolved_level = _resolve_level(level) | |
| out = stream or sys.stderr | |
| if processors is None: | |
| base: list[Processor] = [add_log_level, TimeStamper()] | |
| if renderer == "json": | |
| base.append(JSONRenderer()) | |
| elif renderer == "kv": | |
| base.append(KeyValueRenderer(key_order=["event", "level", "timestamp"])) | |
| else: | |
| base.append(ConsoleRenderer(colors=colors)) | |
| processors = base | |
| # Configure stdlib logging to pass through rendered strings. | |
| stdlib_logger = logging.getLogger(logger_name) | |
| stdlib_logger.setLevel(resolved_level) | |
| stdlib_logger.propagate = False | |
| # Remove existing handlers to avoid duplicate output on repeated calls. | |
| stdlib_logger.handlers.clear() | |
| handler = logging.StreamHandler(out) | |
| handler.setLevel(resolved_level) | |
| handler.setFormatter(logging.Formatter("%(message)s")) | |
| stdlib_logger.addHandler(handler) | |
| configure( | |
| processors=processors, | |
| logger_factory=StdlibLoggerFactory(name=logger_name), | |
| ) | |
| return get_logger() | |
| # ββ Utilities ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def truncate_string(s: str, max_length: int, suffix: str = "...") -> str: | |
| """Truncate *s* to *max_length*, appending a count of remaining chars. | |
| Args: | |
| s: The string to truncate. | |
| max_length: Maximum number of characters to keep. | |
| suffix: Separator between the kept text and the count. | |
| Returns: | |
| The original string if short enough, otherwise truncated with a | |
| ``"...[N more chars]"`` suffix. | |
| """ | |
| if len(s) <= max_length: | |
| return s | |
| remaining = len(s) - max_length | |
| return f"{s[:max_length]}{suffix}[{remaining} more chars]" | |
| def truncate_base64(data_url: str, max_length: int = 100) -> str: | |
| """Truncate base64 data-URLs for cleaner logging. | |
| Args: | |
| data_url: A ``data:`` URL or any string. | |
| max_length: Maximum base64 payload chars to keep. | |
| Returns: | |
| The truncated URL, or the original string if not a data-URL. | |
| """ | |
| if not data_url.startswith("data:"): | |
| return data_url | |
| if ";base64," in data_url: | |
| header, base64_data = data_url.split(";base64,", 1) | |
| if len(base64_data) > max_length: | |
| remaining = len(base64_data) - max_length | |
| return ( | |
| f"{header};base64,{base64_data[:max_length]}...[{remaining} more chars]" | |
| ) | |
| return data_url | |