# repomind-api / telemetry / structured_logging.py
# SouravNath's picture
# Initial commit
# dc71cad
"""
telemetry/structured_logging.py
─────────────────────────────────
Structured JSON logging via structlog.
Every log event emitted through this module includes:
- timestamp (ISO 8601 UTC)
- level
- logger name
- event message
- structured key-value context
Usage:
from telemetry.structured_logging import get_logger
log = get_logger(__name__)
log.info("task_started", task_id="abc123", repo="django/django")
log.error("patch_failed", failure_category="syntax_error", attempt=2)
The structured format makes logs queryable in tools like:
- CloudWatch Logs Insights: fields @timestamp, @message | filter level="ERROR"
- Grafana Loki: {app="code-agent"} | json | failure_category="syntax_error"
- PostHog: track custom events from log stream
Fallback: if structlog is not installed, returns a standard logging.Logger
with a JSON formatter.
"""
from __future__ import annotations
import json
import logging
import sys
from datetime import datetime, timezone
from typing import Any
try:
import structlog
_STRUCTLOG_AVAILABLE = True
except ImportError:
_STRUCTLOG_AVAILABLE = False
def configure_logging(
    level: str = "INFO",
    json_output: bool = True,
    include_caller_info: bool = False,
) -> None:
    """
    Set up application-wide structured logging.

    Intended to run once during startup (e.g. in the FastAPI lifespan
    handler or in main()).

    Args:
        level: Log level name handed to the selected backend.
        json_output: Forwarded to the backend to select JSON rendering.
        include_caller_info: Forwarded to the structlog backend only; the
            stdlib fallback does not receive it.
    """
    if not _STRUCTLOG_AVAILABLE:
        # structlog could not be imported: use the stdlib-only setup.
        _configure_stdlib(level, json_output)
        return
    _configure_structlog(level, json_output, include_caller_info)
def _configure_structlog(level: str, json_output: bool, caller_info: bool) -> None:
    """
    Configure structlog to print one rendered event per line on stdout.

    Args:
        level: Level name (case-insensitive). Names that do not resolve to a
            numeric ``logging`` level fall back to INFO. Events below this
            level are dropped when the installed structlog provides
            ``make_filtering_bound_logger``.
        json_output: Render events as JSON when true, otherwise use the
            coloured console renderer.
        caller_info: When true, add the calling file name and line number
            to every event.
    """
    import structlog

    numeric_level = getattr(logging, level.upper(), logging.INFO)
    if not isinstance(numeric_level, int):
        # e.g. level="basicConfig" would resolve to a function, not a level.
        numeric_level = logging.INFO

    # Resolve stdout once, at configuration time, as PrintLoggerFactory did.
    stream = sys.stdout

    class NamedPrintLogger(structlog.PrintLogger):
        """PrintLogger that remembers the name passed to get_logger()."""

        def __init__(self, name: Any, file: Any) -> None:
            super().__init__(file)
            self.name = name

    def named_print_logger_factory(*args: Any) -> Any:
        # structlog calls the factory with the positional arguments given to
        # get_logger(). PrintLoggerFactory discards them and PrintLogger has
        # no ``name`` attribute, which makes add_logger_name raise
        # AttributeError on every event; keeping the name avoids that.
        return NamedPrintLogger(args[0] if args else None, stream)

    processors: list[Any] = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
    ]
    if caller_info:
        processors.append(
            structlog.processors.CallsiteParameterAdder(
                [
                    structlog.processors.CallsiteParameter.FILENAME,
                    structlog.processors.CallsiteParameter.LINENO,
                ]
            )
        )
    if json_output:
        processors.append(structlog.processors.JSONRenderer())
    else:
        processors.append(structlog.dev.ConsoleRenderer(colors=True))

    # logging.basicConfig() below only affects stdlib loggers, so the level
    # must also be enforced on the structlog side. Older structlog releases
    # lack the filtering wrapper; fall back to the unfiltered BoundLogger.
    make_filtering = getattr(structlog, "make_filtering_bound_logger", None)
    if make_filtering is not None:
        wrapper_class = make_filtering(numeric_level)
    else:
        wrapper_class = structlog.BoundLogger

    structlog.configure(
        processors=processors,
        wrapper_class=wrapper_class,
        context_class=dict,
        logger_factory=named_print_logger_factory,
        cache_logger_on_first_use=True,
    )
    logging.basicConfig(level=numeric_level)
def _configure_stdlib(level: str, json_output: bool) -> None:
    """
    Fallback logging setup used when structlog is not available.

    Installs a stdout handler on the root logger via ``logging.basicConfig``
    (which does nothing if the root logger already has handlers).

    Args:
        level: Level name (case-insensitive); unknown names fall back to INFO.
        json_output: When true, records are rendered as one JSON object with
            ``timestamp``, ``level``, ``logger``, ``event`` and any
            caller-supplied context; otherwise the handler keeps the default
            stdlib format.
    """
    # Attribute names every LogRecord carries. Anything else on a record was
    # supplied by the caller through ``extra=...`` and is structured context.
    standard_attrs = frozenset(
        logging.LogRecord("", 0, "", 0, "", (), None).__dict__
    ) | {"message", "asctime", "extra"}

    class JsonFormatter(logging.Formatter):
        """Render a LogRecord as a single-line JSON document."""

        def format(self, record: logging.LogRecord) -> str:
            # Use the time the record was created, not the time it happens
            # to be formatted (these differ with queued/buffered handlers).
            created = datetime.fromtimestamp(record.created, tz=timezone.utc)
            data = {
                "timestamp": created.isoformat(),
                "level": record.levelname,
                "logger": record.name,
                "event": record.getMessage(),
            }
            # logging merges ``extra={...}`` keys straight into the record's
            # __dict__, so collect every non-standard attribute. Core fields
            # set above are never overwritten by these.
            for key, value in record.__dict__.items():
                if key not in standard_attrs:
                    data.setdefault(key, value)
            # Also honour a nested ``extra`` mapping, i.e. callers logging
            # with ``extra={"extra": {...}}``.
            nested = getattr(record, "extra", None)
            if isinstance(nested, dict):
                data.update(nested)
            if record.exc_info:
                data["exception"] = self.formatException(record.exc_info)
            # default=str: a value that is not JSON-serialisable must not
            # make the log call itself fail.
            return json.dumps(data, default=str)

    handler = logging.StreamHandler(sys.stdout)
    if json_output:
        handler.setFormatter(JsonFormatter())
    logging.basicConfig(
        level=getattr(logging, level.upper(), logging.INFO),
        handlers=[handler],
    )
class _StdlibKwargsAdapter(logging.LoggerAdapter):
    """
    Let a stdlib logger accept structlog-style keyword context.

    ``logging.Logger.info("event", task_id="abc")`` raises TypeError because
    stdlib logging only accepts a fixed set of keyword arguments. This
    adapter moves every other keyword into ``extra={"extra": {...}}`` so the
    call succeeds and the fields end up on the record as ``record.extra``.
    """

    # Keyword arguments stdlib logging understands; passed through untouched.
    _LOGGING_KWARGS = frozenset({"exc_info", "stack_info", "stacklevel", "extra"})

    def process(self, msg: Any, kwargs: Any) -> Any:
        passthrough = {}
        fields = {}
        for key, value in kwargs.items():
            target = passthrough if key in self._LOGGING_KWARGS else fields
            target[key] = value
        if fields:
            # Nest the fields under a single "extra" attribute instead of
            # flattening them: logging raises KeyError when an extra key
            # collides with a built-in LogRecord attribute such as "name".
            extra = dict(passthrough.get("extra") or {})
            nested = dict(extra.get("extra") or {})
            nested.update(fields)
            extra["extra"] = nested
            passthrough["extra"] = extra
        return msg, passthrough

    def __getattr__(self, attr: str) -> Any:
        # Delegate anything LoggerAdapter lacks (addHandler, handlers, ...)
        # to the wrapped Logger so existing Logger-style callers keep working.
        if attr == "logger":
            # Not initialised yet (e.g. during copy); avoid infinite recursion.
            raise AttributeError(attr)
        return getattr(self.logger, attr)


def get_logger(name: str) -> Any:
    """
    Get a structured logger for the given name.

    Returns a structlog logger when structlog is available. Otherwise
    returns an adapter around ``logging.getLogger(name)`` that accepts the
    same ``log.info("event", key=value)`` call style and forwards all other
    attribute access to the underlying stdlib Logger.
    """
    if _STRUCTLOG_AVAILABLE:
        import structlog

        return structlog.get_logger(name)
    # An explicit (empty) ``extra`` is required by LoggerAdapter on older
    # Python versions.
    return _StdlibKwargsAdapter(logging.getLogger(name), {})
# ── Request context binder ─────────────────────────────────────────────────────
class RequestContext:
    """
    Bind per-request context to all log lines within a request/task.

    On exit the previous values of the bound keys are restored, so nested
    contexts that reuse a key do not erase the outer binding. When structlog
    is not available, entering and leaving the context does nothing.

    Usage:
        with RequestContext(task_id="abc", repo="django/django"):
            log.info("processing")  # automatically includes task_id, repo
    """

    def __init__(self, **kwargs):
        self._ctx = kwargs
        # One entry per active ``with`` on this instance, so the same
        # instance can be entered more than once.
        self._token_stack = []

    def __enter__(self):
        if _STRUCTLOG_AVAILABLE:
            import structlog

            # Newer structlog returns {key: contextvars.Token}; older
            # releases return None.
            tokens = structlog.contextvars.bind_contextvars(**self._ctx)
            self._token_stack.append(tokens)
        return self

    def __exit__(self, *args):
        if not _STRUCTLOG_AVAILABLE:
            return
        import structlog

        tokens = self._token_stack.pop() if self._token_stack else None
        reset = getattr(structlog.contextvars, "reset_contextvars", None)
        if tokens and reset is not None:
            try:
                # Restores whatever each key held before __enter__.
                reset(**tokens)
                return
            except ValueError:
                # Tokens were created in a different contextvars.Context;
                # fall through to plain removal.
                pass
        structlog.contextvars.unbind_contextvars(*self._ctx)