Spaces:
Running
Running
| """ | |
| telemetry/structured_logging.py | |
| βββββββββββββββββββββββββββββββββ | |
| Structured JSON logging via structlog. | |
| Every log event emitted through this module includes: | |
| - timestamp (ISO 8601 UTC) | |
| - level | |
| - logger name | |
| - event message | |
| - structured key-value context | |
| Usage: | |
| from telemetry.structured_logging import get_logger | |
| log = get_logger(__name__) | |
| log.info("task_started", task_id="abc123", repo="django/django") | |
| log.error("patch_failed", failure_category="syntax_error", attempt=2) | |
| The structured format makes logs queryable in tools like: | |
| - CloudWatch Logs Insights: fields @timestamp, @message | filter level="ERROR" | |
| - Grafana Loki: {app="code-agent"} | json | failure_category="syntax_error" | |
| - PostHog: track custom events from log stream | |
| Fallback: if structlog is not installed, returns a standard logging.Logger | |
| with a JSON formatter. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import sys | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| try: | |
| import structlog | |
| _STRUCTLOG_AVAILABLE = True | |
| except ImportError: | |
| _STRUCTLOG_AVAILABLE = False | |
| def configure_logging( | |
| level: str = "INFO", | |
| json_output: bool = True, | |
| include_caller_info: bool = False, | |
| ) -> None: | |
| """ | |
| Configure structured logging for the application. | |
| Call once at startup (e.g. in FastAPI lifespan or main()). | |
| """ | |
| if _STRUCTLOG_AVAILABLE: | |
| _configure_structlog(level, json_output, include_caller_info) | |
| else: | |
| _configure_stdlib(level, json_output) | |
| def _configure_structlog(level: str, json_output: bool, caller_info: bool) -> None: | |
| import structlog | |
| processors = [ | |
| structlog.contextvars.merge_contextvars, | |
| structlog.stdlib.add_log_level, | |
| structlog.stdlib.add_logger_name, | |
| structlog.processors.TimeStamper(fmt="iso", utc=True), | |
| ] | |
| if caller_info: | |
| processors.append(structlog.processors.CallsiteParameterAdder( | |
| [structlog.processors.CallsiteParameter.FILENAME, | |
| structlog.processors.CallsiteParameter.LINENO] | |
| )) | |
| if json_output: | |
| processors.append(structlog.processors.JSONRenderer()) | |
| else: | |
| processors.append(structlog.dev.ConsoleRenderer(colors=True)) | |
| structlog.configure( | |
| processors=processors, | |
| wrapper_class=structlog.BoundLogger, | |
| context_class=dict, | |
| logger_factory=structlog.PrintLoggerFactory(sys.stdout), | |
| cache_logger_on_first_use=True, | |
| ) | |
| logging.basicConfig(level=getattr(logging, level.upper(), logging.INFO)) | |
| def _configure_stdlib(level: str, json_output: bool) -> None: | |
| """Fallback when structlog is not available.""" | |
| class JsonFormatter(logging.Formatter): | |
| def format(self, record: logging.LogRecord) -> str: | |
| data = { | |
| "timestamp": datetime.now(timezone.utc).isoformat(), | |
| "level": record.levelname, | |
| "logger": record.name, | |
| "event": record.getMessage(), | |
| } | |
| if hasattr(record, "extra"): | |
| data.update(record.extra) | |
| return json.dumps(data) | |
| handler = logging.StreamHandler(sys.stdout) | |
| if json_output: | |
| handler.setFormatter(JsonFormatter()) | |
| logging.basicConfig( | |
| level=getattr(logging, level.upper(), logging.INFO), | |
| handlers=[handler], | |
| ) | |
| def get_logger(name: str) -> Any: | |
| """ | |
| Get a structured logger for the given name. | |
| Returns a structlog BoundLogger if available, else stdlib Logger. | |
| """ | |
| if _STRUCTLOG_AVAILABLE: | |
| import structlog | |
| return structlog.get_logger(name) | |
| return logging.getLogger(name) | |
| # ββ Request context binder βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class RequestContext: | |
| """ | |
| Bind per-request context to all log lines within a request/task. | |
| Usage: | |
| with RequestContext(task_id="abc", repo="django/django"): | |
| log.info("processing") # automatically includes task_id, repo | |
| """ | |
| def __init__(self, **kwargs): | |
| self._ctx = kwargs | |
| def __enter__(self): | |
| if _STRUCTLOG_AVAILABLE: | |
| import structlog | |
| structlog.contextvars.bind_contextvars(**self._ctx) | |
| return self | |
| def __exit__(self, *args): | |
| if _STRUCTLOG_AVAILABLE: | |
| import structlog | |
| structlog.contextvars.unbind_contextvars(*self._ctx.keys()) | |