# zenith-backend/core/monitoring/jaeger_tracing.py
# Uploaded by teoat via huggingface_hub ("Upload folder using huggingface_hub", commit 4ae946d, verified).
"""
Jaeger Distributed Tracing Integration
OpenTelemetry-based distributed tracing for microservices
"""
import functools
import os
import time
from contextlib import nullcontext
from typing import Optional

from opentelemetry import context as otel_context
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.propagate import extract, set_global_textmap
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode
# opentelemetry-api names this class TraceContextTextMapPropagator; the alias
# keeps the name this module has always used.
from opentelemetry.trace.propagation.tracecontext import (
    TraceContextTextMapPropagator as TraceContextPropagator,
)

from core.logging.advanced_logging import structured_logger
class JaegerTracer:
    """Jaeger distributed tracing integration built on the OpenTelemetry SDK.

    On construction, an SDK ``TracerProvider`` tagged with ``service_name`` is
    created and wired to a Jaeger agent exporter (Thrift) through a
    ``BatchSpanProcessor``. That provider is then installed as the global
    provider, and W3C trace context is installed as the global text-map
    propagator.

    The span helpers are null-safe:

    * ``start_span`` and ``create_child_span`` always return a context
      manager, which yields ``None`` when no span can be created.
    * The attribute/status/event helpers ignore a ``None`` span.
    """

    def __init__(self, service_name: str = "zenith-backend"):
        self.service_name = service_name
        self.tracer = None
        # The SDK provider owned by this instance (set in _setup_tracing).
        self.tracer_provider = None
        self.logger = structured_logger
        self._setup_tracing()

    def _setup_tracing(self):
        """Create the exporter/provider pipeline and install it globally.

        The Jaeger agent address comes from ``JAEGER_HOST`` (default
        ``localhost``) and ``JAEGER_PORT`` (default ``6831``).
        """
        jaeger_exporter = JaegerExporter(
            agent_host_name=os.getenv("JAEGER_HOST", "localhost"),
            agent_port=int(os.getenv("JAEGER_PORT", "6831")),
        )

        # Tag every span with service.name. Without an explicit resource the
        # SDK falls back to a generic "unknown_service" name in Jaeger.
        tracer_provider = TracerProvider(
            resource=Resource.create({SERVICE_NAME: self.service_name})
        )
        tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))

        # The global provider can only be installed once per process; later
        # calls are ignored with a warning. Keeping our own reference (and
        # taking the tracer from it) guarantees our spans reach this exporter
        # and avoids stacking duplicate exporters on someone else's provider.
        trace.set_tracer_provider(tracer_provider)
        self.tracer_provider = tracer_provider

        # The global propagator setter lives in opentelemetry.propagate;
        # opentelemetry.trace has no set_global_textmap.
        set_global_textmap(TraceContextPropagator())

        self.tracer = tracer_provider.get_tracer(__name__)
        self.logger.info(f"Jaeger tracing initialized for service: {self.service_name}")

    def start_span(self, name: str, kind: trace.SpanKind = trace.SpanKind.INTERNAL):
        """Start ``name`` as the current span.

        Returns a context manager meant for ``with ... as span``. When no
        tracer is configured it returns ``nullcontext()``, so the ``with``
        statement still works and binds ``span`` to ``None``.
        """
        if not self.tracer:
            return nullcontext()
        return self.tracer.start_as_current_span(name, kind=kind)

    def create_child_span(self, parent_span, name: str):
        """Start ``name`` as the current span, explicitly parented to ``parent_span``.

        ``start_as_current_span`` takes the parent through its ``context``
        argument (it has no ``parent`` keyword), so the parent span is wrapped
        in a context first. Returns ``nullcontext()`` when there is no tracer
        or no parent span.
        """
        if not self.tracer or not parent_span:
            return nullcontext()
        parent_context = trace.set_span_in_context(parent_span)
        return self.tracer.start_as_current_span(name, context=parent_context)

    def set_span_attributes(self, span, attributes: dict):
        """Set every key/value in ``attributes`` on ``span`` (no-op for ``None``/empty)."""
        if span and attributes:
            for key, value in attributes.items():
                span.set_attribute(key, value)

    def set_span_status(self, span, status_code: StatusCode, description: str = ""):
        """Set ``span``'s status to ``status_code`` with an optional description."""
        if span:
            span.set_status(Status(status_code, description))

    def record_exception(self, span, exception: Exception):
        """Record ``exception`` on ``span`` and mark the span as ERROR."""
        if span:
            span.record_exception(exception)
            span.set_status(Status(StatusCode.ERROR, str(exception)))

    def add_span_event(self, span, name: str, attributes: Optional[dict] = None):
        """Add a named event (with optional attributes) to ``span``."""
        if span:
            span.add_event(name, attributes or {})
class TracingMiddleware:
    """ASGI middleware (e.g. for FastAPI) that wraps each HTTP request in a SERVER span.

    The incoming W3C trace context (``traceparent``/``tracestate`` headers)
    becomes the span's parent, so the request joins the caller's distributed
    trace. Non-HTTP scopes (lifespan, websocket) are passed through untouched.
    """

    # Request header names never copied into span attributes.
    _SENSITIVE_HEADERS = frozenset({"authorization", "proxy-authorization", "cookie", "x-api-key"})
    # Maximum number of characters kept per header value.
    _MAX_HEADER_VALUE_LEN = 100

    def __init__(self, app, tracer: "JaegerTracer"):
        self.app = app
        self.tracer = tracer

    async def __call__(self, scope, receive, send):
        """ASGI entry point: trace HTTP requests, forward everything else."""
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        raw_headers = scope.get("headers") or []
        # HTTP header bytes are latin-1; decoding as UTF-8 can raise on valid
        # requests and break them inside the middleware.
        carrier = {
            name.decode("latin-1").lower(): value.decode("latin-1")
            for name, value in raw_headers
        }
        # Extract on top of the current context: with a traceparent header the
        # remote span becomes the parent; without one any enclosing span is kept.
        parent_context = extract(carrier, context=otel_context.get_current())
        token = otel_context.attach(parent_context)
        try:
            await self._traced_call(scope, receive, send, raw_headers)
        finally:
            otel_context.detach(token)

    async def _traced_call(self, scope, receive, send, raw_headers):
        """Run the downstream app inside a SERVER span and record the outcome."""
        with self.tracer.start_span("http_request", kind=trace.SpanKind.SERVER) as span:
            if span:
                self._set_request_attributes(span, scope, raw_headers)

            status_code = 200
            # perf_counter is monotonic, so the duration survives clock changes.
            request_start = time.perf_counter()

            async def capture_response(message):
                nonlocal status_code
                if message["type"] == "http.response.start":
                    status_code = message["status"]
                await send(message)

            try:
                await self.app(scope, receive, capture_response)
            except Exception as exc:
                if span:
                    span.record_exception(exc)
                    span.set_status(Status(StatusCode.ERROR, str(exc)))
                raise

            if span:
                span.set_attribute("http.status_code", status_code)
                span.set_attribute("http.response_time", time.perf_counter() - request_start)
                # For SERVER spans a 5xx is an error. A 4xx is the client's
                # fault, so its status stays unset per the OpenTelemetry HTTP
                # semantic conventions.
                if status_code >= 500:
                    span.set_status(Status(StatusCode.ERROR, f"HTTP {status_code}"))
                elif status_code < 400:
                    span.set_status(Status(StatusCode.OK))

    @classmethod
    def _set_request_attributes(cls, span, scope, raw_headers):
        """Copy method, URL, scheme, host and non-sensitive headers onto ``span``."""
        path = scope.get("path", "/")
        query_string = (scope.get("query_string") or b"").decode("latin-1")
        # ASGI allows scope["server"] to be None (e.g. unix sockets).
        server = scope.get("server") or ("localhost",)
        span.set_attribute("http.method", scope.get("method", "UNKNOWN"))
        span.set_attribute("http.url", f"{path}?{query_string}" if query_string else path)
        span.set_attribute("http.scheme", scope.get("scheme", "http"))
        span.set_attribute("http.host", server[0])
        span.set_attribute("http.request_headers", str(cls._safe_headers(raw_headers)))

    @classmethod
    def _safe_headers(cls, raw_headers) -> dict:
        """Return decoded, truncated request headers with sensitive ones removed."""
        safe = {}
        for raw_name, raw_value in raw_headers:
            name = raw_name.decode("latin-1").lower()
            if name not in cls._SENSITIVE_HEADERS:
                safe[name] = raw_value.decode("latin-1")[: cls._MAX_HEADER_VALUE_LEN]
        return safe
class DatabaseTracing:
    """Decorator factory that wraps async database calls in tracing spans."""

    # A parameter whose (lower-cased) name contains any of these is redacted.
    _SENSITIVE_KEY_PARTS = ("password", "token", "secret")

    def __init__(self, tracer: "JaegerTracer"):
        self.tracer = tracer

    @classmethod
    def _redact_parameters(cls, parameters: dict) -> dict:
        """Return parameters stringified and truncated to 100 chars, minus sensitive keys."""
        return {
            key: str(value)[:100]
            for key, value in parameters.items()
            if not any(part in str(key).lower() for part in cls._SENSITIVE_KEY_PARTS)
        }

    def trace_query(self, query: str, parameters: Optional[dict] = None):
        """Return a decorator that runs an async function inside a ``db.<func name>`` span.

        Args:
            query: SQL text recorded as ``db.query`` (truncated to 500 chars).
            parameters: optional bound parameters recorded as ``db.parameters``.
                Keys containing "password", "token" or "secret" (any case)
                are dropped, and values are truncated to 100 chars.

        The span also records ``db.operation``, ``db.duration`` (seconds) and
        ``db.success``. Exceptions are recorded on the span and re-raised.
        """
        # query/parameters are fixed at decoration time, so prepare them once
        # rather than on every call.
        query_repr = query[:500]
        params_repr = str(self._redact_parameters(parameters)) if parameters else None

        def decorator(func):
            operation_name = f"db.{func.__name__}"

            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                with self.tracer.start_span(operation_name) as span:
                    if span:
                        span.set_attribute("db.query", query_repr)
                        if params_repr is not None:
                            span.set_attribute("db.parameters", params_repr)
                        span.set_attribute("db.operation", func.__name__)
                    # Monotonic clock: durations cannot go negative.
                    start_time = time.perf_counter()
                    try:
                        result = await func(*args, **kwargs)
                    except Exception as exc:
                        if span:
                            span.set_attribute("db.duration", time.perf_counter() - start_time)
                            span.set_attribute("db.success", False)
                            span.record_exception(exc)
                        raise
                    if span:
                        span.set_attribute("db.duration", time.perf_counter() - start_time)
                        span.set_attribute("db.success", True)
                    return result

            return wrapper

        return decorator
class ExternalServiceTracing:
    """Emits spans that describe already-completed calls to external HTTP services."""

    def __init__(self, tracer: JaegerTracer):
        self.tracer = tracer

    async def trace_http_call(self, method: str, url: str, status_code: int, duration: float, service_name: str = "external"):
        """Record one external HTTP call as an ``http.<service_name>`` span.

        The caller supplies the already-measured ``duration``. Any status code
        of 400 or above marks the span as ERROR; anything lower marks it OK.
        """
        with self.tracer.start_span(f"http.{service_name}") as span:
            if not span:
                return
            call_attributes = {
                "http.method": method,
                "http.url": url,
                "http.status_code": status_code,
                "http.duration": duration,
                "service.name": service_name,
            }
            for attr_key, attr_value in call_attributes.items():
                span.set_attribute(attr_key, attr_value)
            failed = status_code >= 400
            outcome = Status(StatusCode.ERROR, f"HTTP {status_code}") if failed else Status(StatusCode.OK)
            span.set_status(outcome)
# Global tracer instance.
# NOTE: this is created eagerly at import time. Constructing JaegerTracer runs
# _setup_tracing(), which builds the Jaeger exporter from JAEGER_HOST/JAEGER_PORT
# and calls trace.set_tracer_provider(), so importing this module has
# process-wide tracing side effects.
jaeger_tracer: JaegerTracer = JaegerTracer()
# Shared helpers bound to the singleton above (must be created after it).
db_tracer: DatabaseTracing = DatabaseTracing(jaeger_tracer)
external_tracer: ExternalServiceTracing = ExternalServiceTracing(jaeger_tracer)
def get_tracer() -> JaegerTracer:
    """Return the module-wide JaegerTracer singleton.

    The instance is built once when this module is imported; every call hands
    back that same object.
    """
    shared = jaeger_tracer
    return shared
def trace_database_operation(operation_name: str):
    """Return a decorator that runs an async DB call inside a ``db.<operation_name>`` span.

    The span records:

    * ``db.operation``;
    * ``db.table``, taken from the call's ``table`` keyword argument and set
      to ``"unknown"`` when it is absent;
    * ``db.success``.

    An exception is recorded on the span and re-raised. The wrapped
    function's name and docstring are preserved.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            with jaeger_tracer.start_span(f"db.{operation_name}") as span:
                if span:
                    span.set_attribute("db.operation", operation_name)
                    # OpenTelemetry only accepts primitive attribute values and
                    # drops anything else, so coerce whatever was passed.
                    span.set_attribute("db.table", str(kwargs.get("table", "unknown")))
                try:
                    result = await func(*args, **kwargs)
                except Exception as exc:
                    if span:
                        span.set_attribute("db.success", False)
                        span.record_exception(exc)
                    raise
                if span:
                    span.set_attribute("db.success", True)
                return result

        return wrapper

    return decorator
def trace_external_call(service_name: str):
    """Return a decorator that runs an async call inside an ``external.<service_name>`` span.

    The span records ``external.service``, ``external.operation`` (the wrapped
    function's name), ``external.duration`` in seconds and
    ``external.success``. An exception is recorded on the span and re-raised.
    The wrapped function's name and docstring are preserved.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            with jaeger_tracer.start_span(f"external.{service_name}") as span:
                if span:
                    span.set_attribute("external.service", service_name)
                    span.set_attribute("external.operation", func.__name__)
                # perf_counter is monotonic, unlike time.time(), so a wall-clock
                # adjustment cannot produce a negative duration.
                start_time = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except Exception as exc:
                    if span:
                        span.set_attribute("external.duration", time.perf_counter() - start_time)
                        span.set_attribute("external.success", False)
                        span.record_exception(exc)
                    raise
                if span:
                    span.set_attribute("external.duration", time.perf_counter() - start_time)
                    span.set_attribute("external.success", True)
                return result

        return wrapper

    return decorator