Spaces:
Paused
Paused
| """ | |
| Jaeger Distributed Tracing Integration | |
| OpenTelemetry-based distributed tracing for microservices | |
| """ | |
import contextlib
import functools
import os
import time
from typing import Optional

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.propagate import set_global_textmap
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextPropagator

from core.logging.advanced_logging import structured_logger
class JaegerTracer:
    """Jaeger distributed tracing integration.

    Constructing an instance installs a process-global OpenTelemetry
    ``TracerProvider`` whose spans carry ``service.name = service_name``. Spans
    are batch-exported to a Jaeger agent at ``JAEGER_HOST``:``JAEGER_PORT``
    (defaults ``localhost``:``6831``). The W3C TraceContext propagator is set
    as the global text-map propagator.

    ``start_span`` and ``create_child_span`` always return a context manager.
    When no span can be created it yields ``None``, so call sites written as
    ``with tracer.start_span(...) as span: if span: ...`` never crash.
    """

    def __init__(self, service_name: str = "zenith-backend"):
        self.service_name = service_name
        self.tracer = None  # populated by _setup_tracing()
        self.logger = structured_logger
        self._setup_tracing()

    def _setup_tracing(self) -> None:
        """Initialize the Jaeger exporter, global tracer provider and propagator."""
        jaeger_host = os.getenv("JAEGER_HOST", "localhost")
        jaeger_port = int(os.getenv("JAEGER_PORT", "6831"))
        jaeger_exporter = JaegerExporter(
            agent_host_name=jaeger_host,
            agent_port=jaeger_port,
        )

        # Tag every span with the service name. Without a Resource, spans are
        # reported as "unknown_service" and self.service_name is never exported.
        resource = Resource.create({SERVICE_NAME: self.service_name})
        # set_tracer_provider only takes effect once per process. Re-read the
        # global so the processor lands on whichever provider is actually active.
        trace.set_tracer_provider(TracerProvider(resource=resource))
        tracer_provider = trace.get_tracer_provider()
        tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))

        # The global propagator setter lives in opentelemetry.propagate;
        # opentelemetry.trace has no set_global_textmap attribute.
        set_global_textmap(TraceContextPropagator())

        self.tracer = trace.get_tracer(__name__)
        self.logger.info(f"Jaeger tracing initialized for service: {self.service_name}")

    def start_span(self, name: str, kind: trace.SpanKind = trace.SpanKind.INTERNAL):
        """Start a span and make it current for the duration of a ``with`` block.

        Returns:
            A context manager yielding the new span, or yielding ``None`` when
            no tracer is configured.
        """
        if not self.tracer:
            return contextlib.nullcontext()
        return self.tracer.start_as_current_span(name, kind=kind)

    def create_child_span(
        self,
        parent_span,
        name: str,
        kind: trace.SpanKind = trace.SpanKind.INTERNAL,
    ):
        """Start a span explicitly parented to ``parent_span``, not the current span.

        Returns:
            A context manager yielding the child span, or yielding ``None`` when
            there is no tracer or no parent span.
        """
        if not self.tracer or not parent_span:
            return contextlib.nullcontext()
        # start_as_current_span has no ``parent`` argument; the parent is given
        # as a Context that carries the parent span.
        parent_context = trace.set_span_in_context(parent_span)
        return self.tracer.start_as_current_span(name, context=parent_context, kind=kind)

    def set_span_attributes(self, span, attributes: dict) -> None:
        """Set every key/value of ``attributes`` on ``span`` (no-op if span is falsy)."""
        if span:
            for key, value in attributes.items():
                span.set_attribute(key, value)

    def set_span_status(self, span, status_code: StatusCode, description: str = "") -> None:
        """Set ``span``'s status to ``status_code`` with an optional description."""
        if span:
            span.set_status(Status(status_code, description))

    def record_exception(self, span, exception: Exception) -> None:
        """Attach ``exception`` as a span event and mark the span as ERROR."""
        if span:
            span.record_exception(exception)
            span.set_status(Status(StatusCode.ERROR, str(exception)))

    def add_span_event(self, span, name: str, attributes: Optional[dict] = None) -> None:
        """Add a named event (with optional attributes) to ``span``."""
        if span:
            span.add_event(name, attributes or {})
class TracingMiddleware:
    """ASGI (e.g. FastAPI/Starlette) middleware that traces each HTTP request.

    Every ``http`` scope runs inside an ``http_request`` SERVER span. If the
    request carries W3C ``traceparent``/``tracestate`` headers, the span joins
    the caller's trace. Other scope types (and requests arriving while no
    tracer is configured) are passed straight through to the wrapped app.
    """

    # Lower-cased header names whose values must never be exported on spans.
    _SENSITIVE_HEADERS = frozenset(
        {"authorization", "proxy-authorization", "cookie", "set-cookie", "x-api-key"}
    )

    def __init__(self, app, tracer: JaegerTracer):
        self.app = app
        self.tracer = tracer

    async def __call__(self, scope, receive, send):
        """Handle one ASGI connection, tracing it when it is an HTTP request."""
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        otel_tracer = getattr(self.tracer, "tracer", None)
        if otel_tracer is None:
            # Tracing is unavailable; serve the request untraced.
            await self.app(scope, receive, send)
            return

        raw_headers = scope.get("headers") or []
        parent_context = self._extract_parent_context(raw_headers)

        # start_as_current_span records any escaping exception and marks the
        # span as ERROR on exit (default record_exception=True /
        # set_status_on_exception=True), so exceptions are not recorded here.
        with otel_tracer.start_as_current_span(
            "http_request", context=parent_context, kind=trace.SpanKind.SERVER
        ) as span:
            self._set_request_attributes(span, scope, raw_headers)

            # Stays 200 if the app never sends an http.response.start message.
            response_status = [200]

            async def capture_response(message):
                # Remember the status code, then forward the message unchanged.
                if message["type"] == "http.response.start":
                    response_status[0] = message["status"]
                await send(message)

            request_start = time.perf_counter()
            try:
                await self.app(scope, receive, capture_response)
            finally:
                span.set_attribute("http.response_time", time.perf_counter() - request_start)

            status = response_status[0]
            span.set_attribute("http.status_code", status)
            if status >= 500:
                span.set_status(Status(StatusCode.ERROR, f"HTTP {status}"))
            elif status < 400:
                span.set_status(Status(StatusCode.OK))
            # 4xx: status left unset. Per the OTel HTTP conventions, client
            # errors are not server-span failures.

    @staticmethod
    def _extract_parent_context(raw_headers):
        """Build an OTel Context from incoming W3C trace-context headers."""
        carrier = {}
        for header_name, header_value in raw_headers:
            # ASGI header names are lower-cased bytes: match exactly, not by prefix.
            if header_name in (b"traceparent", b"tracestate"):
                carrier[header_name.decode("latin-1")] = header_value.decode("latin-1")
        return TraceContextPropagator().extract(carrier)

    @classmethod
    def _set_request_attributes(cls, span, scope, raw_headers) -> None:
        """Copy request metadata from the ASGI ``scope`` onto ``span``."""
        method = scope.get("method", "UNKNOWN")
        path = scope.get("path", "/")
        # errors="replace": arbitrary client bytes must not crash the request.
        # NOTE(review): the query string may carry secrets (tokens) and is exported verbatim.
        query_string = (scope.get("query_string") or b"").decode("utf-8", errors="replace")
        span.set_attribute("http.method", method)
        span.set_attribute("http.url", f"{path}?{query_string}" if query_string else path)
        span.set_attribute("http.scheme", scope.get("scheme", "http"))
        # ASGI allows "server" to be None (e.g. Unix sockets); fall back then.
        server = scope.get("server") or ("localhost",)
        span.set_attribute("http.host", server[0])

        # Export request headers minus sensitive ones, values truncated to 100 chars.
        # For repeated headers the last occurrence wins.
        safe_headers = {}
        for header_name, header_value in raw_headers:
            name = header_name.decode("latin-1").lower()
            if name not in cls._SENSITIVE_HEADERS:
                safe_headers[name] = header_value.decode("utf-8", errors="replace")[:100]
        span.set_attribute("http.request_headers", str(safe_headers))
class DatabaseTracing:
    """Decorators that trace async database calls through a ``JaegerTracer``."""

    # Case-insensitive substrings that mark a query parameter as sensitive
    # (so ``api_token`` or ``user_password`` are redacted as well).
    _SENSITIVE_PARAM_MARKERS = ("password", "token", "secret")

    def __init__(self, tracer: JaegerTracer):
        self.tracer = tracer

    @classmethod
    def _redact_parameters(cls, parameters: dict) -> dict:
        """Drop sensitive keys and stringify/truncate the remaining values to 100 chars."""
        return {
            key: str(value)[:100]
            for key, value in parameters.items()
            if not any(marker in str(key).lower() for marker in cls._SENSITIVE_PARAM_MARKERS)
        }

    def trace_query(self, query: str, parameters: Optional[dict] = None):
        """Return a decorator that runs an async function inside a ``db.<func name>`` span.

        Args:
            query: Query text, recorded (first 500 chars) as ``db.query``.
            parameters: Optional bound parameters, recorded as ``db.parameters``
                after sensitive keys are removed.

        The wrapped coroutine's result is returned unchanged. ``db.duration``
        and ``db.success`` are recorded on both success and failure, and any
        exception is re-raised.
        """
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                span_cm = self.tracer.start_span(f"db.{func.__name__}") or contextlib.nullcontext()
                with span_cm as span:
                    if span:
                        span.set_attribute("db.query", query[:500])
                        if parameters:
                            span.set_attribute("db.parameters", str(self._redact_parameters(parameters)))
                        span.set_attribute("db.operation", func.__name__)
                    start_time = time.perf_counter()
                    try:
                        result = await func(*args, **kwargs)
                    except Exception:
                        if span:
                            span.set_attribute("db.duration", time.perf_counter() - start_time)
                            span.set_attribute("db.success", False)
                        # The span context manager records the exception on exit.
                        raise
                    if span:
                        span.set_attribute("db.duration", time.perf_counter() - start_time)
                        span.set_attribute("db.success", True)
                    return result
            return wrapper
        return decorator
class ExternalServiceTracing:
    """Record already-completed outbound HTTP calls as CLIENT spans."""

    def __init__(self, tracer: JaegerTracer):
        self.tracer = tracer

    async def trace_http_call(
        self,
        method: str,
        url: str,
        status_code: int,
        duration: float,
        service_name: str = "external",
    ):
        """Emit an ``http.<service_name>`` span describing a finished HTTP call.

        The span is created after the call has completed, so its own timing is
        near zero; the measured time is carried in the ``http.duration``
        attribute. Statuses >= 400 mark the span as ERROR.
        """
        span_cm = (
            self.tracer.start_span(f"http.{service_name}", kind=trace.SpanKind.CLIENT)
            or contextlib.nullcontext()
        )
        with span_cm as span:
            if not span:
                return
            span.set_attribute("http.method", method)
            # NOTE(review): url is exported verbatim; strip credentials/query secrets upstream.
            span.set_attribute("http.url", url)
            span.set_attribute("http.status_code", status_code)
            span.set_attribute("http.duration", duration)
            span.set_attribute("service.name", service_name)
            if status_code >= 400:
                span.set_status(Status(StatusCode.ERROR, f"HTTP {status_code}"))
            else:
                span.set_status(Status(StatusCode.OK))
# Process-wide singletons built at import time; the DB and external-call
# helpers share the single JaegerTracer created here.
jaeger_tracer = JaegerTracer()
db_tracer = DatabaseTracing(tracer=jaeger_tracer)
external_tracer = ExternalServiceTracing(tracer=jaeger_tracer)
def get_tracer() -> JaegerTracer:
    """Return the shared, module-level ``JaegerTracer`` singleton."""
    shared = jaeger_tracer
    return shared
def trace_database_operation(operation_name: str):
    """Decorator factory: run an async function inside a ``db.<operation_name>`` span.

    The span records ``db.operation`` and ``db.table`` (the call's ``table``
    keyword argument, or ``"unknown"``). ``db.success`` is set to True or
    False depending on the outcome. Exceptions are re-raised unchanged.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # start_span may be None if tracing is unavailable; degrade to no span.
            span_cm = jaeger_tracer.start_span(f"db.{operation_name}") or contextlib.nullcontext()
            with span_cm as span:
                if span:
                    span.set_attribute("db.operation", operation_name)
                    # Span attributes must be primitives; coerce whatever was passed.
                    span.set_attribute("db.table", str(kwargs.get("table", "unknown")))
                try:
                    result = await func(*args, **kwargs)
                except Exception:
                    if span:
                        span.set_attribute("db.success", False)
                    # The span context manager records the exception on exit.
                    raise
                if span:
                    span.set_attribute("db.success", True)
                return result
        return wrapper
    return decorator
def trace_external_call(service_name: str):
    """Decorator factory: run an async function inside an ``external.<service_name>`` CLIENT span.

    The span records ``external.service``, ``external.operation`` (the wrapped
    function's name), ``external.duration`` in seconds, and
    ``external.success``. Exceptions are re-raised unchanged.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # start_span may be None if tracing is unavailable; degrade to no span.
            span_cm = (
                jaeger_tracer.start_span(f"external.{service_name}", kind=trace.SpanKind.CLIENT)
                or contextlib.nullcontext()
            )
            with span_cm as span:
                if span:
                    span.set_attribute("external.service", service_name)
                    span.set_attribute("external.operation", func.__name__)
                # perf_counter is monotonic, unlike time.time().
                start_time = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except Exception:
                    if span:
                        span.set_attribute("external.duration", time.perf_counter() - start_time)
                        span.set_attribute("external.success", False)
                    # The span context manager records the exception on exit.
                    raise
                if span:
                    span.set_attribute("external.duration", time.perf_counter() - start_time)
                    span.set_attribute("external.success", True)
                return result
        return wrapper
    return decorator