"""Jaeger distributed tracing integration.

OpenTelemetry-based distributed tracing for microservices: a thin tracer
wrapper, an ASGI middleware for inbound HTTP requests, and decorators for
database and external-service calls.
"""

import contextlib
import functools
import os
import time
from typing import Optional

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.propagate import extract, set_global_textmap
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextPropagator

from core.logging.advanced_logging import structured_logger

# Header / parameter names whose values must never be copied onto a span.
_SENSITIVE_HEADERS = frozenset({"authorization", "cookie", "x-api-key"})
_SENSITIVE_PARAMS = frozenset({"password", "token", "secret"})

# W3C Trace Context headers forwarded to the propagator.
_TRACE_CONTEXT_HEADERS = frozenset({b"traceparent", b"tracestate"})


class JaegerTracer:
    """Jaeger distributed tracing integration."""

    def __init__(self, service_name: str = "zenith-backend"):
        self.service_name = service_name
        self.tracer = None
        self.logger = structured_logger
        self._setup_tracing()

    def _setup_tracing(self):
        """Initialize Jaeger tracing.

        Reads ``JAEGER_HOST`` / ``JAEGER_PORT`` from the environment
        (defaults ``localhost`` / ``6831``), installs a tracer provider with a
        batching Jaeger exporter, and registers the W3C trace-context
        propagator globally.
        """
        jaeger_host = os.getenv("JAEGER_HOST", "localhost")
        jaeger_port = int(os.getenv("JAEGER_PORT", "6831"))
        jaeger_exporter = JaegerExporter(
            agent_host_name=jaeger_host,
            agent_port=jaeger_port,
        )

        # Attach the service name to the provider; without a resource the
        # exported spans do not carry ``self.service_name``.
        resource = Resource.create({SERVICE_NAME: self.service_name})
        trace.set_tracer_provider(TracerProvider(resource=resource))
        tracer_provider = trace.get_tracer_provider()
        tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))

        # The global propagator setter lives in ``opentelemetry.propagate``,
        # not in ``opentelemetry.trace``.
        set_global_textmap(TraceContextPropagator())

        self.tracer = trace.get_tracer(__name__)
        self.logger.info(f"Jaeger tracing initialized for service: {self.service_name}")

    def start_span(
        self,
        name: str,
        kind: trace.SpanKind = trace.SpanKind.INTERNAL,
        context=None,
    ):
        """Start a new span and make it the current one.

        Args:
            name: Span name.
            kind: Span kind; defaults to ``INTERNAL``.
            context: Optional parent context (for example one extracted from
                incoming request headers). ``None`` uses the current context.

        Returns:
            A context manager yielding the span. If no tracer is configured,
            a null context manager yielding ``None`` is returned so callers
            can still use ``with ... as span`` and guard on ``span``.
        """
        if not self.tracer:
            return contextlib.nullcontext()
        return self.tracer.start_as_current_span(name, context=context, kind=kind)

    def create_child_span(self, parent_span, name: str):
        """Start a span that is a child of ``parent_span``.

        Returns:
            A context manager yielding the child span, or a null context
            manager yielding ``None`` when there is no tracer or no parent.
        """
        if not self.tracer or not parent_span:
            return contextlib.nullcontext()
        # The parent is passed via a context object; ``start_as_current_span``
        # has no ``parent`` keyword.
        parent_context = trace.set_span_in_context(parent_span)
        return self.tracer.start_as_current_span(name, context=parent_context)

    def set_span_attributes(self, span, attributes: dict):
        """Set every key/value pair of ``attributes`` on ``span`` (no-op if falsy)."""
        if span:
            for key, value in attributes.items():
                span.set_attribute(key, value)

    def set_span_status(self, span, status_code: StatusCode, description: str = ""):
        """Set the status of ``span`` (no-op if ``span`` is falsy)."""
        if span:
            span.set_status(Status(status_code, description))

    def record_exception(self, span, exception: Exception):
        """Record ``exception`` on ``span`` and mark the span as errored."""
        if span:
            span.record_exception(exception)
            span.set_status(Status(StatusCode.ERROR, str(exception)))

    def add_span_event(self, span, name: str, attributes: Optional[dict] = None):
        """Add a named event, with optional attributes, to ``span``."""
        if span:
            span.add_event(name, attributes or {})


def _safe_request_headers(raw_headers) -> dict:
    """Return non-sensitive headers as ``{lowercase name: value[:100]}``."""
    safe_headers = {}
    for raw_name, raw_value in dict(raw_headers).items():
        # latin-1 maps every byte, so malformed headers cannot raise here.
        header_name = raw_name.decode("latin-1").lower()
        if header_name not in _SENSITIVE_HEADERS:
            safe_headers[header_name] = raw_value.decode("latin-1")[:100]
    return safe_headers


class TracingMiddleware:
    """ASGI (FastAPI) middleware that wraps each HTTP request in a span."""

    def __init__(self, app, tracer: JaegerTracer):
        self.app = app
        self.tracer = tracer

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        raw_headers = scope.get("headers", [])

        # Collect W3C trace-context headers so this request's span joins the
        # caller's trace instead of always starting a new one.
        carrier = {}
        for raw_name, raw_value in raw_headers:
            header_name = raw_name.lower()
            if header_name in _TRACE_CONTEXT_HEADERS:
                carrier[header_name.decode("latin-1")] = raw_value.decode("latin-1")
        # Only override the ambient context when the caller actually sent one.
        parent_context = extract(carrier) if carrier else None

        with self.tracer.start_span("http_request", context=parent_context) as span:
            if not span:
                # Tracing unavailable: serve the request untraced.
                await self.app(scope, receive, send)
                return

            method = scope.get("method", "UNKNOWN")
            path = scope.get("path", "/")
            query_string = scope.get("query_string", b"").decode("latin-1")
            # ASGI allows ``server`` to be present but None.
            server = scope.get("server") or ("localhost", None)

            span.set_attribute("http.method", method)
            span.set_attribute("http.url", f"{path}?{query_string}" if query_string else path)
            span.set_attribute("http.scheme", scope.get("scheme", "http"))
            span.set_attribute("http.host", server[0])
            span.set_attribute("http.request_headers", str(_safe_request_headers(raw_headers)))

            response_status = 200
            request_start = time.perf_counter()

            async def capture_response(message):
                """Remember the response status code, then forward the message."""
                nonlocal response_status
                if message["type"] == "http.response.start":
                    response_status = message["status"]
                await send(message)

            try:
                await self.app(scope, receive, capture_response)

                duration = time.perf_counter() - request_start
                span.set_attribute("http.status_code", response_status)
                span.set_attribute("http.response_time", duration)

                # Both 4xx and 5xx responses are reported as errors.
                if response_status >= 400:
                    span.set_status(Status(StatusCode.ERROR, f"HTTP {response_status}"))
                else:
                    span.set_status(Status(StatusCode.OK))
            except Exception as e:
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                raise


class DatabaseTracing:
    """Database operation tracing."""

    def __init__(self, tracer: JaegerTracer):
        self.tracer = tracer

    def trace_query(self, query: str, parameters: Optional[dict] = None):
        """Return a decorator that traces an async database call.

        Args:
            query: Query text; the first 500 characters are stored on the span.
            parameters: Optional query parameters. Keys named ``password``,
                ``token`` or ``secret`` (case-insensitive) are dropped and the
                remaining values are stringified and truncated to 100 chars.
        """

        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                operation_name = f"db.{func.__name__}"
                with self.tracer.start_span(operation_name) as span:
                    if span:
                        span.set_attribute("db.query", query[:500])
                        if parameters:
                            safe_params = {
                                key: str(value)[:100]
                                for key, value in parameters.items()
                                if key.lower() not in _SENSITIVE_PARAMS
                            }
                            span.set_attribute("db.parameters", str(safe_params))
                        span.set_attribute("db.operation", func.__name__)

                    start_time = time.perf_counter()
                    try:
                        result = await func(*args, **kwargs)
                    except Exception as e:
                        if span:
                            span.set_attribute("db.duration", time.perf_counter() - start_time)
                            span.set_attribute("db.success", False)
                            span.record_exception(e)
                        raise
                    if span:
                        span.set_attribute("db.duration", time.perf_counter() - start_time)
                        span.set_attribute("db.success", True)
                    return result

            return wrapper

        return decorator


class ExternalServiceTracing:
    """External service call tracing."""

    def __init__(self, tracer: JaegerTracer):
        self.tracer = tracer

    async def trace_http_call(
        self,
        method: str,
        url: str,
        status_code: int,
        duration: float,
        service_name: str = "external",
    ):
        """Record an already-completed external HTTP call as a span.

        The span is named ``http.<service_name>`` and is marked as an error
        when ``status_code`` is 400 or above.
        """
        span_name = f"http.{service_name}"
        with self.tracer.start_span(span_name) as span:
            if span:
                span.set_attribute("http.method", method)
                span.set_attribute("http.url", url)
                span.set_attribute("http.status_code", status_code)
                span.set_attribute("http.duration", duration)
                span.set_attribute("service.name", service_name)

                if status_code >= 400:
                    span.set_status(Status(StatusCode.ERROR, f"HTTP {status_code}"))
                else:
                    span.set_status(Status(StatusCode.OK))


# Global tracer instances
jaeger_tracer = JaegerTracer()
db_tracer = DatabaseTracing(jaeger_tracer)
external_tracer = ExternalServiceTracing(jaeger_tracer)


def get_tracer() -> JaegerTracer:
    """Get the global Jaeger tracer instance."""
    return jaeger_tracer


def trace_database_operation(operation_name: str):
    """Return a decorator that traces an async database operation.

    The span is named ``db.<operation_name>``. The ``table`` keyword argument
    of the wrapped call, if present, is recorded as ``db.table``.
    """

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            with jaeger_tracer.start_span(f"db.{operation_name}") as span:
                if span:
                    span.set_attribute("db.operation", operation_name)
                    span.set_attribute("db.table", kwargs.get('table', 'unknown'))

                try:
                    result = await func(*args, **kwargs)
                except Exception as e:
                    if span:
                        span.set_attribute("db.success", False)
                        span.record_exception(e)
                    raise
                if span:
                    span.set_attribute("db.success", True)
                return result

        return wrapper

    return decorator


def trace_external_call(service_name: str):
    """Return a decorator that traces an async call to an external service.

    The span is named ``external.<service_name>`` and records the wrapped
    function's name, its duration in seconds, and whether it succeeded.
    """

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            with jaeger_tracer.start_span(f"external.{service_name}") as span:
                if span:
                    span.set_attribute("external.service", service_name)
                    span.set_attribute("external.operation", func.__name__)

                start_time = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except Exception as e:
                    if span:
                        span.set_attribute("external.duration", time.perf_counter() - start_time)
                        span.set_attribute("external.success", False)
                        span.record_exception(e)
                    raise
                if span:
                    span.set_attribute("external.duration", time.perf_counter() - start_time)
                    span.set_attribute("external.success", True)
                return result

        return wrapper

    return decorator