| | """ |
| | Observability module for production monitoring and debugging. |
| | Includes: |
| | - Structured logging with trace IDs |
| | - Performance monitoring hooks |
| | - Error tracking patterns |
| | - Request/Response logging middleware |
| | """ |
| |
|
import json
import logging
import os
import time
import uuid
from collections.abc import Callable
from contextvars import ContextVar
from dataclasses import asdict, dataclass, field
from datetime import UTC, datetime
from enum import Enum
from functools import wraps
from typing import Any

from dotenv import load_dotenv

load_dotenv()
| |
|
| | |
# Context variables carry trace/span IDs across async-task and thread
# boundaries without threading them through every call signature.
trace_id_var: ContextVar[str] = ContextVar("trace_id", default="")
span_id_var: ContextVar[str] = ContextVar("span_id", default="")
| |
|
| |
|
class LogLevel(Enum):
    """Log severity levels; values mirror stdlib ``logging`` level names,
    so ``getattr(logging, level.value)`` resolves to the numeric level."""

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"
| |
|
| |
|
@dataclass
class LogContext:
    """Structured log context attached to every log entry.

    ``environment`` is resolved per-instance via ``default_factory`` rather
    than once at import time, so changes to the ENVIRONMENT variable made
    after this module is imported are still picked up.
    """

    trace_id: str    # correlates all logs for one request/workflow
    span_id: str     # identifies a sub-operation within the trace
    timestamp: str   # ISO-8601 UTC timestamp string
    service: str = "specs-before-code-api"
    environment: str = field(
        default_factory=lambda: os.getenv("ENVIRONMENT", "development")
    )
    version: str = "1.0.0"
| |
|
| |
|
@dataclass
class LogEntry:
    """A single structured log entry, serialized to JSON by StructuredLogger."""

    level: str                          # LogLevel.value string
    message: str
    context: LogContext                 # trace/span/service metadata
    data: dict[str, Any] | None = None  # arbitrary structured payload
    error: str | None = None            # str(exception) when one was supplied
    stack_trace: str | None = None      # formatted traceback when error given
    duration_ms: float | None = None    # operation timing, if measured
| |
|
| |
|
class StructuredLogger:
    """
    Structured JSON logger with trace ID support.

    Produces logs in a format suitable for log aggregation systems
    like Datadog, CloudWatch, or ELK stack.
    """

    def __init__(self, name: str):
        self.name = name
        self.logger = logging.getLogger(name)
        self._setup_handler()

    def _setup_handler(self):
        """Setup JSON handler for console and file if not already configured."""
        if not self.logger.handlers:
            stream_handler = logging.StreamHandler()
            stream_handler.setFormatter(StructuredFormatter())
            self.logger.addHandler(stream_handler)

            # In development, also mirror logs into a local file for debugging.
            if os.getenv("ENVIRONMENT") == "development":
                log_dir = os.path.join(
                    os.path.dirname(os.path.abspath(__file__)), "../../logs"
                )
                os.makedirs(log_dir, exist_ok=True)
                file_handler = logging.FileHandler(
                    os.path.join(log_dir, "app.log"), encoding="utf-8"
                )
                file_handler.setFormatter(StructuredFormatter())
                self.logger.addHandler(file_handler)

            self.logger.setLevel(
                logging.DEBUG
                if os.getenv("ENVIRONMENT") == "development"
                else logging.INFO
            )

    def _get_context(self) -> LogContext:
        """Get current logging context with trace IDs.

        Falls back to freshly generated random IDs when none are set in the
        context; the fallbacks are not stored back into the contextvars.
        """
        return LogContext(
            trace_id=trace_id_var.get() or str(uuid.uuid4()),
            span_id=span_id_var.get() or str(uuid.uuid4())[:8],
            timestamp=datetime.now(UTC).isoformat(),
        )

    def _log(
        self,
        level: LogLevel,
        message: str,
        data: dict[str, Any] | None = None,
        error: Exception | None = None,
        duration_ms: float | None = None,
    ):
        """Build a LogEntry, strip None fields, and emit it as one JSON line.

        Args:
            level: severity; also selects the stdlib logging level.
            message: human-readable summary.
            data: structured payload merged into the entry.
            error: exception to serialize (message + stack trace).
            duration_ms: optional timing for the logged operation.
        """
        entry = LogEntry(
            level=level.value,
            message=message,
            context=self._get_context(),
            data=data,
            error=str(error) if error else None,
            stack_trace=self._get_stack_trace(error) if error else None,
            duration_ms=duration_ms,
        )

        log_dict = asdict(entry)
        log_dict["context"] = asdict(entry.context)

        # Drop None-valued fields (top level and context only) to keep
        # emitted lines compact.
        log_dict = {k: v for k, v in log_dict.items() if v is not None}
        if "context" in log_dict:
            log_dict["context"] = {
                k: v for k, v in log_dict["context"].items() if v is not None
            }

        # default=str: a logger must never raise just because the payload
        # contains a non-JSON-serializable value (datetime, UUID, Path, ...).
        self.logger.log(
            getattr(logging, level.value), json.dumps(log_dict, default=str)
        )

    def _get_stack_trace(self, error: Exception | None) -> str | None:
        """Return the formatted traceback for *error*, or None."""
        if error:
            import traceback

            return "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            )
        return None

    def debug(self, message: str, data: dict[str, Any] | None = None):
        self._log(LogLevel.DEBUG, message, data)

    def info(self, message: str, data: dict[str, Any] | None = None):
        self._log(LogLevel.INFO, message, data)

    def warning(
        self,
        message: str,
        data: dict[str, Any] | None = None,
        error: Exception | None = None,
    ):
        self._log(LogLevel.WARNING, message, data, error)

    def error(
        self,
        message: str,
        data: dict[str, Any] | None = None,
        error: Exception | None = None,
    ):
        self._log(LogLevel.ERROR, message, data, error)

    def critical(
        self,
        message: str,
        data: dict[str, Any] | None = None,
        error: Exception | None = None,
    ):
        self._log(LogLevel.CRITICAL, message, data, error)

    def log_request(
        self,
        method: str,
        path: str,
        status_code: int,
        duration_ms: float,
        user_id: str | None = None,
        extra: dict[str, Any] | None = None,
    ):
        """Log an HTTP request with timing at INFO level."""
        data: dict[str, Any] = {
            "http_method": method,
            "http_path": path,
            "http_status": status_code,
        }
        # Only include user_id when known: the None-filter in _log is
        # top-level only and would otherwise leak "user_id": null into logs.
        if user_id is not None:
            data["user_id"] = user_id
        # extra is applied last so it can override any of the keys above,
        # matching the original {**base, **extra} merge order.
        data.update(extra or {})
        self._log(
            LogLevel.INFO,
            f"{method} {path} {status_code}",
            data,
            duration_ms=duration_ms,
        )
| |
|
| |
|
| | class StructuredFormatter(logging.Formatter): |
| | """Custom formatter that passes through structured JSON logs.""" |
| |
|
| | def format(self, record: logging.LogRecord) -> str: |
| | |
| | try: |
| | json.loads(record.getMessage()) |
| | return record.getMessage() |
| | except (json.JSONDecodeError, TypeError): |
| | |
| | return json.dumps( |
| | { |
| | "level": record.levelname, |
| | "message": record.getMessage(), |
| | "logger": record.name, |
| | "timestamp": datetime.now(UTC).isoformat(), |
| | } |
| | ) |
| |
|
| |
|
| | |
# Registry of named logger instances (one StructuredLogger per name).
_loggers: dict[str, StructuredLogger] = {}


def get_logger(name: str = "specs-before-code") -> StructuredLogger:
    """Get or create a structured logger by name.

    Backed by a module-level registry so each module gets a stable logger
    instance; repeated calls with the same name return the same object and
    never duplicate handlers.
    """
    logger = _loggers.get(name)
    if logger is None:
        logger = StructuredLogger(name)
        _loggers[name] = logger
    return logger
| |
|
| |
|
def set_trace_id(trace_id: str) -> None:
    """Set the trace ID for the current context (contextvar-scoped)."""
    trace_id_var.set(trace_id)
| |
|
| |
|
def get_trace_id() -> str:
    """Get the current trace ID.

    NOTE(review): falls back to a fresh random UUID when none is set, and the
    fallback is NOT stored — repeated calls without a set trace ID therefore
    return different values. Use new_trace_id() for a sticky ID.
    """
    return trace_id_var.get() or str(uuid.uuid4())
| |
|
| |
|
def new_trace_id() -> str:
    """Generate a fresh trace ID, store it in the current context, return it."""
    generated = str(uuid.uuid4())
    trace_id_var.set(generated)
    return generated
| |
|
| |
|
def set_span_id(span_id: str) -> None:
    """Set the span ID for the current context (contextvar-scoped)."""
    span_id_var.set(span_id)
| |
|
| |
|
@dataclass
class PerformanceMetrics:
    """Container for one recorded performance measurement."""

    operation: str            # logical operation name, e.g. "llm_call"
    duration_ms: float        # wall-clock duration in milliseconds
    success: bool             # False when the operation raised
    metadata: dict[str, Any]  # extra context supplied by the caller
    timestamp: str = ""       # ISO-8601 UTC; auto-filled when left empty

    def __post_init__(self):
        # Stamp creation time when the caller did not supply one.
        if not self.timestamp:
            self.timestamp = datetime.now(UTC).isoformat()
| |
|
| |
|
class PerformanceMonitor:
    """
    Performance monitoring for tracking operation durations and patterns.

    Keeps a bounded in-memory list of metrics; not persisted and not
    synchronized for concurrent writers.
    """

    def __init__(self):
        self._metrics: list[PerformanceMetrics] = []
        self._max_metrics = 10000  # cap memory; oldest entries get dropped

    def record(
        self,
        operation: str,
        duration_ms: float,
        success: bool,
        metadata: dict[str, Any] | None = None,
    ):
        """Record a performance metric; warns immediately on slow (>5s) ops."""
        metric = PerformanceMetrics(
            operation=operation,
            duration_ms=duration_ms,
            success=success,
            metadata=metadata or {},
        )
        self._metrics.append(metric)

        # Trim to the most recent _max_metrics entries.
        if len(self._metrics) > self._max_metrics:
            self._metrics = self._metrics[-self._max_metrics :]

        # Surface slow operations right away instead of waiting for stats.
        if duration_ms > 5000:
            get_logger().warning(
                f"Slow operation detected: {operation}",
                data={"duration_ms": duration_ms, **metric.metadata},
            )

    def get_stats(
        self, operation: str | None = None, window_seconds: int = 300
    ) -> dict[str, Any]:
        """Get performance statistics for an operation.

        Args:
            operation: restrict to one operation name; None aggregates all.
            window_seconds: only metrics newer than now - window are counted.

        Returns:
            ``{"count": 0}`` when nothing matches; otherwise count, success
            rate, min/max/avg, and p50/p95/p99 duration figures.
        """
        cutoff = datetime.now(UTC).timestamp() - window_seconds

        filtered = [
            m
            for m in self._metrics
            if (operation is None or m.operation == operation)
            and datetime.fromisoformat(m.timestamp).timestamp() > cutoff
        ]

        if not filtered:
            return {"count": 0}

        # Sort once; the original re-sorted the list for each percentile.
        durations = sorted(m.duration_ms for m in filtered)
        n = len(durations)
        success_count = sum(1 for m in filtered if m.success)

        def _pct(q: float) -> float:
            # Matches the original index-based percentile: sorted[int(n*q)],
            # falling back to the single value when n == 1.
            return durations[int(n * q)] if n > 1 else durations[0]

        return {
            "operation": operation or "all",
            "count": n,
            "success_rate": success_count / n,
            "avg_duration_ms": sum(durations) / n,
            "min_duration_ms": durations[0],
            "max_duration_ms": durations[-1],
            "p50_duration_ms": durations[n // 2],
            "p95_duration_ms": _pct(0.95),
            "p99_duration_ms": _pct(0.99),
        }

    def get_all_stats(self, window_seconds: int = 300) -> dict[str, dict[str, Any]]:
        """Get statistics for every operation name seen so far."""
        operations = {m.operation for m in self._metrics}
        return {op: self.get_stats(op, window_seconds) for op in operations}
| |
|
| |
|
| | |
# Module-level singleton; created lazily by get_performance_monitor().
_performance_monitor: PerformanceMonitor | None = None


def get_performance_monitor() -> PerformanceMonitor:
    """Get or create the process-wide performance monitor singleton."""
    global _performance_monitor
    if _performance_monitor is None:
        _performance_monitor = PerformanceMonitor()
    return _performance_monitor
| |
|
| |
|
def timed(operation_name: str | None = None):
    """
    Decorator to time function execution and record metrics.

    Works on both sync and async functions; timing covers the full call
    including the exception path (recorded with success=False, re-raised).

    Args:
        operation_name: metric name; defaults to "module.function".

    Usage:
        @timed("llm_call")
        async def call_llm():
            ...
    """
    # Hoisted from below the wrappers: one local import per timed(...) call,
    # resolved before any decoration work happens.
    import asyncio

    def decorator(func: Callable) -> Callable:
        name = operation_name or f"{func.__module__}.{func.__name__}"

        def _record(start: float, success: bool, args: tuple, kwargs: dict):
            # Shared metric emission for both wrappers (was duplicated).
            duration_ms = (time.perf_counter() - start) * 1000
            get_performance_monitor().record(
                operation=name,
                duration_ms=duration_ms,
                success=success,
                metadata={
                    "args_count": len(args),
                    "kwargs_keys": list(kwargs.keys()),
                },
            )

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            success = True
            try:
                return await func(*args, **kwargs)
            except Exception:
                success = False
                raise
            finally:
                _record(start, success, args, kwargs)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            start = time.perf_counter()
            success = True
            try:
                return func(*args, **kwargs)
            except Exception:
                success = False
                raise
            finally:
                _record(start, success, args, kwargs)

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

    return decorator
| |
|
| |
|
class ErrorTracker:
    """
    Error tracking and aggregation for monitoring error patterns.

    Holds a bounded in-memory list of error records and mirrors each tracked
    error into the structured log stream.
    """

    def __init__(self):
        self._errors: list[dict[str, Any]] = []
        self._max_errors = 1000  # retention cap; oldest entries dropped

    def track(
        self,
        error: Exception,
        context: dict[str, Any] | None = None,
        severity: str = "error",
    ):
        """Track an error occurrence and log it."""
        entry = {
            "type": type(error).__name__,
            "message": str(error),
            "severity": severity,
            "trace_id": get_trace_id(),
            "context": context if context is not None else {},
            "timestamp": datetime.now(UTC).isoformat(),
        }
        self._errors.append(entry)

        # Drop the oldest entries once the retention cap is exceeded.
        overflow = len(self._errors) - self._max_errors
        if overflow > 0:
            del self._errors[:overflow]

        # Mirror every tracked error into the structured log stream.
        get_logger().error(
            f"Error tracked: {type(error).__name__}", data=entry, error=error
        )

    def get_error_summary(self, window_seconds: int = 3600) -> dict[str, Any]:
        """Summarize errors recorded within the last *window_seconds*."""
        threshold = datetime.now(UTC).timestamp() - window_seconds

        recent = [
            entry
            for entry in self._errors
            if datetime.fromisoformat(entry["timestamp"]).timestamp() > threshold
        ]

        # Tally occurrences per exception type name.
        counts: dict[str, int] = {}
        for entry in recent:
            error_type = entry["type"]
            counts[error_type] = counts.get(error_type, 0) + 1

        return {
            "total_errors": len(recent),
            "by_type": counts,
            "recent_errors": recent[-10:],
        }
| |
|
| |
|
| | |
# Module-level singleton; created lazily by get_error_tracker().
_error_tracker: ErrorTracker | None = None


def get_error_tracker() -> ErrorTracker:
    """Get or create the process-wide error tracker singleton."""
    global _error_tracker
    if _error_tracker is None:
        _error_tracker = ErrorTracker()
    return _error_tracker
| |
|
| |
|
def track_error(
    error: Exception, context: dict[str, Any] | None = None, severity: str = "error"
) -> None:
    """Convenience function: track an error on the global ErrorTracker."""
    get_error_tracker().track(error, context, severity)
| |
|