# Source snapshot: avfranco's HF Space deploy (minimal allow-list), commit 557ee65.
import logging
import json
import time
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any, Dict, Optional, Union
from . import components
# Context variables holding the active trace ID, span name and component.
# Code anywhere further down the call chain (including the log formatter)
# can read them without the values being passed through every function
# call. Each defaults to None, meaning "nothing active".
_trace_id_var: ContextVar[Optional[str]] = ContextVar(
    "trace_id", default=None
)
_span_name_var: ContextVar[Optional[str]] = ContextVar(
    "span_name", default=None
)
_component_var: ContextVar[Optional[str]] = ContextVar(
    "component", default=None
)
def set_trace_id(trace_id: str):
    """Make *trace_id* the active trace ID for the current context.

    The value lives in a ContextVar, so later calls made from the same
    context can read it via get_trace_id() without it being passed along.
    """
    _trace_id_var.set(trace_id)
def get_trace_id() -> Optional[str]:
    """Return the active trace ID, or None when no trace is bound."""
    current_trace = _trace_id_var.get()
    return current_trace
def clear_trace_id():
    """Unbind the active trace ID by resetting it to None in the current context."""
    _trace_id_var.set(None)
class StructuredFormatter(logging.Formatter):
    """
    Format log records as single-line JSON objects.

    Each record includes the core fields (timestamp, level, logger, message),
    the trace/span context held in this module's context variables, and any
    extra data attached as ``extra={"fields": {...}}``.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Serialize *record* to a JSON string.

        Key order: core fields, then ``duration_ms`` (if the record has that
        attribute), then ``exc_info``/``stack_info`` when present, then the
        record's ``fields`` dict, whose keys may overwrite any earlier key.
        """
        payload: Dict[str, Any] = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "trace_id": get_trace_id(),
            "span_name": _span_name_var.get(),
            # A component set on the record itself (e.g. log_event(component=...))
            # overrides the component of the enclosing span; previously the span's
            # component always won, silently discarding the explicit value.
            "component": (
                getattr(record, "component", None)
                or _component_var.get()
                or "unknown"
            ),
            "event": getattr(record, "event", "info"),
        }

        # Add duration if it was attached directly to the record.
        if hasattr(record, "duration_ms"):
            payload["duration_ms"] = record.duration_ms

        # Keep tracebacks from logger.exception(...) / exc_info=True, which were
        # previously dropped. Cache the text on the record like the stdlib
        # Formatter does, so multiple handlers do not re-render it.
        if record.exc_info and not record.exc_text:
            record.exc_text = self.formatException(record.exc_info)
        if record.exc_text:
            payload["exc_info"] = record.exc_text
        if record.stack_info:
            payload["stack_info"] = self.formatStack(record.stack_info)

        # Extra fields passed via extra={"fields": {...}}.
        fields = getattr(record, "fields", None)
        if isinstance(fields, dict):
            payload.update(fields)

        # default=str: one non-JSON-serializable field value (exception,
        # datetime, Path, ...) must not make the whole log line fail.
        return json.dumps(payload, default=str)
def setup_logging(level: int = logging.INFO):
    """
    Configure the root logger to use the StructuredFormatter.

    Ensures the root logger has one console ``StreamHandler`` and one
    ``FileHandler`` writing to ``logs/app.log`` (relative to the current
    working directory). It then applies a StructuredFormatter to every root
    handler. Repeated calls do not attach duplicate handlers.

    This should be called as early as possible in the application lifecycle.

    Args:
        level: Level set on the root logger.
    """
    import os

    root_logger = logging.getLogger()
    root_logger.setLevel(level)
    formatter = StructuredFormatter()

    # Console handler. FileHandler subclasses StreamHandler, so exclude it
    # when checking whether a console handler already exists.
    has_console = any(
        isinstance(h, logging.StreamHandler) and not isinstance(h, logging.FileHandler)
        for h in root_logger.handlers
    )
    if not has_console:
        root_logger.addHandler(logging.StreamHandler())

    # File handler. Previously a new handler for the same file was added on
    # every call, so each line was written to app.log once per call.
    log_dir = "logs"
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.abspath(os.path.join(log_dir, "app.log"))
    has_file = any(
        isinstance(h, logging.FileHandler) and h.baseFilename == log_file
        for h in root_logger.handlers
    )
    if not has_file:
        root_logger.addHandler(logging.FileHandler(log_file, encoding="utf-8"))

    # Apply the structured formatter to every root handler, including ones
    # installed by other code before this call.
    for handler in root_logger.handlers:
        handler.setFormatter(formatter)
# Configure structured logging as a side effect of importing this module,
# so records emitted afterwards go through the StructuredFormatter.
setup_logging(level=logging.INFO)
def get_logger(name: str) -> logging.Logger:
    """Look up the standard-library logger called *name*, creating it on first use."""
    named_logger = logging.getLogger(name)
    return named_logger
def bind_trace(trace_id: str):
    """Make *trace_id* the active trace for the current context (delegates to set_trace_id)."""
    return set_trace_id(trace_id)
def bind_new_trace_id():
    """Generate a random UUID4 string, bind it as the active trace ID, and return it."""
    from uuid import uuid4

    fresh_id = f"{uuid4()}"
    set_trace_id(fresh_id)
    return fresh_id
def ensure_trace():
    """
    Return the active trace ID, binding a new one first if none is set.

    A falsy trace ID (None or an empty string) counts as "not set".
    """
    existing = get_trace_id()
    if existing:
        return existing
    bind_new_trace_id()
    return get_trace_id()
import os
class SpanContext:
    """
    Context manager for a named span that carries structured metadata.

    Entering makes this span the active span/component for the current
    context and logs a ``start`` event. Exiting measures the elapsed time and
    delegates to ``obs_module._pop_span``, which logs the span end (including
    any fields added with :meth:`set_field`) and restores the previous span
    context. An exception raised in the ``with`` body is recorded as
    ``error``/``error_type`` fields and then propagates.
    """

    def __init__(self, obs_module, name: str, component: str):
        # Module object providing _pop_span (start_span passes this module).
        self.obs = obs_module
        self.name = name
        self.component = component
        # Wall-clock time.time() at span start (kept for callers reading it).
        self.start_time = None
        # Monotonic perf_counter() reading used to compute the duration.
        self._start_perf = None
        # Extra fields for the span-end log, populated via set_field().
        self.fields = {}
        # Span/component active before entering; restored on exit.
        self.previous_span = None
        self.previous_component = None

    def set_field(self, key: str, value: Any):
        """Add a metadata field to be included in the span end log."""
        self.fields[key] = value

    def __enter__(self):
        # Re-entering a span with the same name as the active one is treated
        # as a mistake: fatal when DEV_MODE=true, a warning log otherwise.
        if _span_name_var.get() == self.name:
            dev_mode = os.getenv("DEV_MODE", "false").lower() == "true"
            if dev_mode:
                raise RuntimeError(f"Duplicate nested span detected: {self.name} in {self.component}")
            log_event(
                "warning",
                f"Duplicate nested span detected: {self.name}",
                event="duplicate_span_detected",
                component=self.component,
                span_name=self.name,
            )

        # Remember the outer context so __exit__ can restore it.
        self.previous_span = _span_name_var.get()
        self.previous_component = _component_var.get()
        _span_name_var.set(self.name)
        _component_var.set(self.component)

        log_event("info", f"Started span: {self.name}", event="start", component=self.component)
        self.start_time = time.time()
        # perf_counter() is monotonic: unlike time.time() it cannot jump when
        # the wall clock is adjusted, so durations cannot go negative.
        self._start_perf = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        duration_ms = (time.perf_counter() - self._start_perf) * 1000

        # Test exc_type rather than the truthiness of exc_val, which an
        # exception class could override.
        if exc_type is not None:
            self.fields["error"] = str(exc_val)
            self.fields["error_type"] = exc_type.__name__

        # Log the span end and restore the outer context.
        self.obs._pop_span(
            span_name=self.name,
            component=self.component,
            duration_ms=duration_ms,
            extra_fields=self.fields,
            previous_span=self.previous_span,
            previous_component=self.previous_component,
        )
        # Implicit None return: exceptions from the body are not suppressed.
def _classify_span(span_name: str, component: str) -> str:
    """
    Map a span to the category reported as ``span_type`` in span-end logs.

    Rules, first match wins:
      * name ends with ".llm"                           -> components.LLM
      * name ends with ".run" and component is AGENT    -> components.AGENT
      * component is SERVICE, DOMAIN or ORCHESTRATOR    -> that constant
      * anything else                                   -> "internal"
    """
    if span_name.endswith(".llm"):
        return components.LLM
    is_agent_run = span_name.endswith(".run") and component == components.AGENT
    if is_agent_run:
        return components.AGENT
    for category in (components.SERVICE, components.DOMAIN, components.ORCHESTRATOR):
        if component == category:
            return category
    return "internal"
def _pop_span(
    span_name: str,
    component: str,
    duration_ms: Optional[float] = None,
    extra_fields: Optional[Dict[str, Any]] = None,
    previous_span: Optional[str] = None,
    previous_component: Optional[str] = None
):
    """
    Close a span: classify it, emit its ``span_end`` log, restore context.

    Args:
        span_name: Name of the span being closed. The text before the first
            "." is reported as ``feature``.
        component: Component the span belongs to.
        duration_ms: Measured duration, or None when the span was not timed.
        extra_fields: Additional fields for the span-end log. They override
            the derived ``duration_ms``/``span_type``/``feature`` values.
            Keys named ``level``, ``message``, ``event`` or ``component``
            clash with log_event's own parameters and are emitted as
            ``field_<name>`` instead. The dict itself is not modified.
        previous_span, previous_component: Context to restore afterwards.
            When both are None, the span context is cleared only if
            *span_name* is still the active span.
    """
    # log_event's named parameters; these cannot be forwarded via **fields.
    reserved = ("level", "message", "event", "component")

    # Derived values come first so caller-supplied fields can override them
    # (e.g. a manually measured duration_ms). Merging into one dict also fixes
    # the "got multiple values for keyword argument" TypeError previously
    # raised when an extra field was named duration_ms/span_type/feature.
    payload: Dict[str, Any] = {
        "duration_ms": duration_ms,
        "span_type": _classify_span(span_name, component),
        "feature": span_name.partition(".")[0],
        **(extra_fields or {}),
    }
    for key in reserved:
        if key in payload:
            payload[f"field_{key}"] = payload.pop(key)

    try:
        log_event(
            "info",
            f"Ended span: {span_name}",
            event="span_end",
            component=component,
            **payload
        )
    finally:
        # Restore the context even if logging fails, so a failed end log
        # cannot leave a stale span active for the rest of this context.
        if previous_span is not None or previous_component is not None:
            _span_name_var.set(previous_span)
            _component_var.set(previous_component)
        elif _span_name_var.get() == span_name:
            _span_name_var.set(None)
            _component_var.set(None)
def start_span(name: str, component: str) -> SpanContext:
    """
    Build a SpanContext for *name* in *component*.

    Nothing is logged or bound until the returned object is entered with
    ``with``; it is handed this module so it can call ``_pop_span`` on exit.
    """
    import sys

    this_module = sys.modules[__name__]
    span = SpanContext(this_module, name, component)
    return span
def end_span(name: str, component: str, **fields):
    """
    Manually end the span *name* and log its ``span_end`` event.

    Spans used as ``with start_span(...)`` blocks are closed by
    ``SpanContext.__exit__``, which calls ``_pop_span`` directly. This
    function is therefore only needed for spans ended by hand. No duration is
    measured here, and no previous context is supplied, so the span context is
    cleared only when *name* is the currently active span.

    Args:
        name: Name of the span to close.
        component: Component the span belongs to.
        **fields: Extra key/value pairs attached to the span-end log.
    """
    _pop_span(span_name=name, component=component, extra_fields=fields)
def log_event(
    level: Union[str, int], message: str, event: str = "info", component: Optional[str] = None, **fields
):
    """
    Log a structured event on the ``observability`` logger.

    Args:
        level: Level name such as "info", "warning", "error" or "debug"
            (case-insensitive), or a numeric ``logging`` level. Names that do
            not resolve to a numeric level fall back to INFO.
        message: The log message.
        event: Event type stored on the record (e.g. "start", "span_end", "info").
        component: Component stored on the record; omitted when falsy.
        **fields: Additional key/value pairs stored on the record as ``fields``.
    """
    # All events go through one fixed logger name.
    logger = logging.getLogger("observability")

    if isinstance(level, int):
        lvl = level
    else:
        lvl = getattr(logging, str(level).upper(), None)
        # getattr on the logging module can return non-level attributes
        # (e.g. "basic_format" -> logging.BASIC_FORMAT, a str), which
        # logger.log would reject with a TypeError; treat them as unknown.
        if not isinstance(lvl, int):
            lvl = logging.INFO

    extra = {"event": event, "fields": fields}
    if component:
        extra["component"] = component
    logger.log(lvl, message, extra=extra)