Spaces:
Running
Running
| import logging | |
| import json | |
| import time | |
| from contextlib import contextmanager | |
| from contextvars import ContextVar | |
| from typing import Any, Dict, Optional, Union | |
| from . import components | |
| # Context variables for trace and span propagation | |
| # These allow us to keep track of trace_id and span context without passing it through every function call | |
| _trace_id_var: ContextVar[Optional[str]] = ContextVar("trace_id", default=None) | |
| _span_name_var: ContextVar[Optional[str]] = ContextVar("span_name", default=None) | |
| _component_var: ContextVar[Optional[str]] = ContextVar("component", default=None) | |
| def set_trace_id(trace_id: str): | |
| """Set the trace ID in the current context.""" | |
| _trace_id_var.set(trace_id) | |
| def get_trace_id() -> Optional[str]: | |
| """Get the trace ID from the current context.""" | |
| return _trace_id_var.get() | |
| def clear_trace_id(): | |
| """Clear the trace ID from the current context.""" | |
| _trace_id_var.set(None) | |
| class StructuredFormatter(logging.Formatter): | |
| """ | |
| Format logs as structured JSON records. | |
| Each record includes standard fields plus trace and span context. | |
| """ | |
| def format(self, record: logging.LogRecord) -> str: | |
| # Core fields | |
| log_records = { | |
| "timestamp": self.formatTime(record, self.datefmt), | |
| "level": record.levelname, | |
| "logger": record.name, | |
| "message": record.getMessage(), | |
| "trace_id": get_trace_id(), | |
| "span_name": _span_name_var.get(), | |
| "component": _component_var.get() or getattr(record, "component", "unknown"), | |
| "event": getattr(record, "event", "info"), | |
| } | |
| # Add duration if it was added to the record | |
| if hasattr(record, "duration_ms"): | |
| log_records["duration_ms"] = record.duration_ms | |
| # Add extra fields passed via extra={"fields": {...}} | |
| if hasattr(record, "fields") and isinstance(record.fields, dict): | |
| log_records.update(record.fields) | |
| return json.dumps(log_records) | |
| def setup_logging(level: int = logging.INFO): | |
| """ | |
| Configure the root logger to use the StructuredFormatter. | |
| This should be called as early as possible in the application lifecycle. | |
| """ | |
| import os | |
| root_logger = logging.getLogger() | |
| root_logger.setLevel(level) | |
| formatter = StructuredFormatter() | |
| # Configure console handler | |
| if not any( | |
| isinstance(h, logging.StreamHandler) and not isinstance(h, logging.FileHandler) | |
| for h in root_logger.handlers | |
| ): | |
| console_handler = logging.StreamHandler() | |
| console_handler.setFormatter(formatter) | |
| root_logger.addHandler(console_handler) | |
| # Configure file handler | |
| log_dir = "logs" | |
| if not os.path.exists(log_dir): | |
| os.makedirs(log_dir, exist_ok=True) | |
| log_file = os.path.join(log_dir, "app.log") | |
| file_handler = logging.FileHandler(log_file) | |
| file_handler.setFormatter(formatter) | |
| root_logger.addHandler(file_handler) | |
| # Update any existing handlers that might not have our formatter | |
| for handler in root_logger.handlers: | |
| handler.setFormatter(formatter) | |
| # Initialize logging on import | |
| setup_logging() | |
| def get_logger(name: str) -> logging.Logger: | |
| """Return a logger with the given name.""" | |
| return logging.getLogger(name) | |
| def bind_trace(trace_id: str): | |
| """Bind a trace ID to the current context.""" | |
| set_trace_id(trace_id) | |
| def bind_new_trace_id(): | |
| """Create a new UUID and bind it as trace_id to the current context.""" | |
| import uuid | |
| trace_id = str(uuid.uuid4()) | |
| set_trace_id(trace_id) | |
| return trace_id | |
| def ensure_trace(): | |
| """ | |
| Guarantee a trace_id exists in the current context. | |
| If none is active, create and bind a new one. | |
| """ | |
| trace_id = get_trace_id() | |
| if not trace_id: | |
| bind_new_trace_id() | |
| return get_trace_id() | |
| import os | |
| class SpanContext: | |
| """ | |
| Context manager for a span that supports structured metadata. | |
| """ | |
| def __init__(self, obs_module, name: str, component: str): | |
| self.obs = obs_module | |
| self.name = name | |
| self.component = component | |
| self.start_time = None | |
| self.fields = {} | |
| self.previous_span = None | |
| self.previous_component = None | |
| def set_field(self, key: str, value: Any): | |
| """Add a metadata field to be included in the span end log.""" | |
| self.fields[key] = value | |
| def __enter__(self): | |
| # Guard for duplicates | |
| active_span = _span_name_var.get() | |
| if active_span == self.name: | |
| dev_mode = os.getenv("DEV_MODE", "false").lower() == "true" | |
| if dev_mode: | |
| raise RuntimeError(f"Duplicate nested span detected: {self.name} in {self.component}") | |
| else: | |
| log_event( | |
| "warning", | |
| f"Duplicate nested span detected: {self.name}", | |
| event="duplicate_span_detected", | |
| component=self.component, | |
| span_name=self.name, | |
| ) | |
| # Store previous context | |
| self.previous_span = _span_name_var.get() | |
| self.previous_component = _component_var.get() | |
| _span_name_var.set(self.name) | |
| _component_var.set(self.component) | |
| log_event("info", f"Started span: {self.name}", event="start", component=self.component) | |
| self.start_time = time.time() | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| duration_ms = (time.time() - self.start_time) * 1000 | |
| # Merge exception info if any | |
| if exc_val: | |
| self.fields["error"] = str(exc_val) | |
| self.fields["error_type"] = exc_type.__name__ if exc_type else "Exception" | |
| # Pop and log | |
| self.obs._pop_span( | |
| span_name=self.name, | |
| component=self.component, | |
| duration_ms=duration_ms, | |
| extra_fields=self.fields, | |
| previous_span=self.previous_span, | |
| previous_component=self.previous_component | |
| ) | |
| def _classify_span(span_name: str, component: str) -> str: | |
| """Helper to categorize spans for structured analytics.""" | |
| if span_name.endswith(".llm"): | |
| return components.LLM | |
| if span_name.endswith(".run") and component == components.AGENT: | |
| return components.AGENT | |
| if component == components.SERVICE: | |
| return components.SERVICE | |
| if component == components.DOMAIN: | |
| return components.DOMAIN | |
| if component == components.ORCHESTRATOR: | |
| return components.ORCHESTRATOR | |
| return "internal" | |
| def _pop_span( | |
| span_name: str, | |
| component: str, | |
| duration_ms: Optional[float] = None, | |
| extra_fields: Optional[Dict[str, Any]] = None, | |
| previous_span: Optional[str] = None, | |
| previous_component: Optional[str] = None | |
| ): | |
| """ | |
| Internal logic to close a span, classify it, and emit the structured log. | |
| """ | |
| fields = extra_fields or {} | |
| # Feature extraction | |
| feature = span_name.split(".")[0] if "." in span_name else span_name | |
| # Classification | |
| span_type = _classify_span(span_name, component) | |
| log_event( | |
| "info", | |
| f"Ended span: {span_name}", | |
| event="span_end", | |
| component=component, | |
| duration_ms=duration_ms, | |
| span_type=span_type, | |
| feature=feature, | |
| **fields | |
| ) | |
| # Restore previous context if provided, otherwise clear if matching | |
| if previous_span is not None or previous_component is not None: | |
| _span_name_var.set(previous_span) | |
| _component_var.set(previous_component) | |
| elif _span_name_var.get() == span_name: | |
| _span_name_var.set(None) | |
| _component_var.set(None) | |
| def start_span(name: str, component: str) -> SpanContext: | |
| """ | |
| Start a new span context. | |
| Returns a SpanContext object that works as a context manager. | |
| """ | |
| import sys | |
| return SpanContext(sys.modules[__name__], name, component) | |
| def end_span(name: str, component: str, **fields): | |
| """ | |
| End the current span context and log the end event. | |
| When using start_span with 'with', this is called automatically. | |
| """ | |
| # Manual end still works and correctly pops the context | |
| _pop_span(span_name=name, component=component, extra_fields=fields) | |
| def log_event( | |
| level: str, message: str, event: str = "info", component: Optional[str] = None, **fields | |
| ): | |
| """ | |
| Log a structured event with optional extra fields. | |
| Args: | |
| level: Log level (e.g., "info", "error", "debug") | |
| message: The log message | |
| event: The type of event (start, end, info, error) | |
| component: Overwrite the current component | |
| **fields: Additional key-value pairs to include in the log | |
| """ | |
| # Use the logger name of the caller or a default | |
| logger = logging.getLogger("observability") | |
| lvl = getattr(logging, level.upper(), logging.INFO) | |
| extra = {"event": event, "fields": fields} | |
| if component: | |
| extra["component"] = component | |
| logger.log(lvl, message, extra=extra) | |