File size: 4,607 Bytes
dc71cad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""
telemetry/structured_logging.py
─────────────────────────────────
Structured JSON logging via structlog.

Every log event emitted through this module includes:
  - timestamp (ISO 8601 UTC)
  - level
  - logger name
  - event message
  - structured key-value context

Usage:
    from telemetry.structured_logging import get_logger
    log = get_logger(__name__)
    log.info("task_started", task_id="abc123", repo="django/django")
    log.error("patch_failed", failure_category="syntax_error", attempt=2)

The structured format makes logs queryable in tools like:
  - CloudWatch Logs Insights: fields @timestamp, @message | filter level="ERROR"
  - Grafana Loki: {app="code-agent"} | json | failure_category="syntax_error"
  - PostHog: track custom events from log stream

Fallback: if structlog is not installed, returns a standard logging.Logger
with a JSON formatter.
"""
from __future__ import annotations

import json
import logging
import sys
from datetime import datetime, timezone
from typing import Any


try:
    import structlog
    _STRUCTLOG_AVAILABLE = True
except ImportError:
    _STRUCTLOG_AVAILABLE = False


def configure_logging(
    level: str = "INFO",
    json_output: bool = True,
    include_caller_info: bool = False,
) -> None:
    """
    Configure structured logging for the application.
    Call once at startup (e.g. in FastAPI lifespan or main()).
    """
    if _STRUCTLOG_AVAILABLE:
        _configure_structlog(level, json_output, include_caller_info)
    else:
        _configure_stdlib(level, json_output)


def _configure_structlog(level: str, json_output: bool, caller_info: bool) -> None:
    import structlog

    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
    ]
    if caller_info:
        processors.append(structlog.processors.CallsiteParameterAdder(
            [structlog.processors.CallsiteParameter.FILENAME,
             structlog.processors.CallsiteParameter.LINENO]
        ))
    if json_output:
        processors.append(structlog.processors.JSONRenderer())
    else:
        processors.append(structlog.dev.ConsoleRenderer(colors=True))

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.BoundLogger,
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(sys.stdout),
        cache_logger_on_first_use=True,
    )
    logging.basicConfig(level=getattr(logging, level.upper(), logging.INFO))


def _configure_stdlib(level: str, json_output: bool) -> None:
    """Fallback when structlog is not available."""

    class JsonFormatter(logging.Formatter):
        def format(self, record: logging.LogRecord) -> str:
            data = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "level": record.levelname,
                "logger": record.name,
                "event": record.getMessage(),
            }
            if hasattr(record, "extra"):
                data.update(record.extra)
            return json.dumps(data)

    handler = logging.StreamHandler(sys.stdout)
    if json_output:
        handler.setFormatter(JsonFormatter())
    logging.basicConfig(
        level=getattr(logging, level.upper(), logging.INFO),
        handlers=[handler],
    )


def get_logger(name: str) -> Any:
    """
    Get a structured logger for the given name.
    Returns a structlog BoundLogger if available, else stdlib Logger.
    """
    if _STRUCTLOG_AVAILABLE:
        import structlog
        return structlog.get_logger(name)
    return logging.getLogger(name)


# ── Request context binder ─────────────────────────────────────────────────────

class RequestContext:
    """
    Bind per-request context to all log lines within a request/task.

    Usage:
        with RequestContext(task_id="abc", repo="django/django"):
            log.info("processing")   # automatically includes task_id, repo
    """
    def __init__(self, **kwargs):
        self._ctx = kwargs

    def __enter__(self):
        if _STRUCTLOG_AVAILABLE:
            import structlog
            structlog.contextvars.bind_contextvars(**self._ctx)
        return self

    def __exit__(self, *args):
        if _STRUCTLOG_AVAILABLE:
            import structlog
            structlog.contextvars.unbind_contextvars(*self._ctx.keys())