Spaces:
Running
Running
| # ============================================================ | |
| # app/core/observability.py - OpenTelemetry Setup | |
| # ============================================================ | |
| import os | |
| import logging | |
| import time | |
| import uuid | |
| from typing import Optional, Dict, Any | |
| from functools import wraps | |
| from contextlib import contextmanager | |
| import asyncio | |
| from opentelemetry import trace, metrics | |
| from opentelemetry.sdk.trace import TracerProvider | |
| from opentelemetry.sdk.trace.export import BatchSpanProcessor | |
| from opentelemetry.sdk.resources import Resource | |
| from opentelemetry.sdk.metrics import MeterProvider | |
| from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader | |
| from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | |
| from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter | |
| from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor | |
| from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor | |
| from opentelemetry.instrumentation.requests import RequestsInstrumentor | |
| from opentelemetry.instrumentation.redis import RedisInstrumentor | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| from starlette.requests import Request | |
| logger = logging.getLogger(__name__) | |
| # ============================================================ | |
| # Helper Functions | |
| # ============================================================ | |
| def _parse_otlp_headers(headers_str: str) -> Dict[str, str]: | |
| """Parse OTEL_EXPORTER_OTLP_HEADERS (format: key1=val1,key2=val2)""" | |
| headers = {} | |
| try: | |
| if not headers_str: | |
| return headers | |
| for pair in headers_str.split(","): | |
| if "=" in pair: | |
| key, val = pair.split("=", 1) | |
| headers[key.strip()] = val.strip() | |
| except Exception as e: | |
| logger.warning(f"Failed to parse OTLP headers: {e}") | |
| return headers | |
| # ============================================================ | |
| # Configuration | |
| # ============================================================ | |
class ObservabilityConfig:
    """Observability settings, read from environment variables.

    All values are evaluated once, when this class body executes.
    """

    # Master switch: only the literal "true" (any casing) enables telemetry.
    ENABLED: bool = os.environ.get("OTEL_ENABLED", "true").lower() == "true"

    # Service identity attached to exported telemetry.
    ENVIRONMENT: str = os.environ.get("ENVIRONMENT", "development")
    SERVICE_NAME: str = os.environ.get("OTEL_SERVICE_NAME", "lojiz-aida")
    SERVICE_VERSION: str = os.environ.get("OTEL_SERVICE_VERSION", "1.0.0")

    # Exporters: endpoint is None when the variable is unset; headers are
    # parsed from the "key1=val1,key2=val2" form.
    OTLP_ENDPOINT: Optional[str] = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
    OTLP_HEADERS: Dict[str, str] = _parse_otlp_headers(
        os.environ.get("OTEL_EXPORTER_OTLP_HEADERS", "")
    )

    # Sampling: a non-numeric value raises ValueError when the class is defined.
    TRACE_SAMPLE_RATE: float = float(
        os.environ.get("OTEL_TRACE_SAMPLE_RATE", "0.1")
    )

    # Console export for development.
    CONSOLE_EXPORT: bool = (
        os.environ.get("OTEL_CONSOLE_EXPORT", "false").lower() == "true"
    )
| # ============================================================ | |
| # Initialize Tracing | |
| # ============================================================ | |
def init_tracing():
    """Initialize OpenTelemetry tracing and install the global tracer provider.

    Does nothing (beyond logging) when ``ObservabilityConfig.ENABLED`` is
    false. Otherwise builds a ``TracerProvider`` that:

    * carries the service name, version and deployment environment,
    * samples root spans at ``ObservabilityConfig.TRACE_SAMPLE_RATE`` and
      follows the parent's sampling decision for child spans,
    * exports over OTLP/HTTP when ``OTLP_ENDPOINT`` is set,
    * also prints spans to the console when ``CONSOLE_EXPORT`` is set or the
      environment is ``"development"``.

    A failure to build the OTLP exporter is logged and skipped.

    Raises:
        Exception: any other error during setup is logged and re-raised.
    """
    if not ObservabilityConfig.ENABLED:
        logger.info("⚠️ OpenTelemetry disabled (OTEL_ENABLED=false)")
        return

    try:
        from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

        resource = Resource.create({
            "service.name": ObservabilityConfig.SERVICE_NAME,
            "service.version": ObservabilityConfig.SERVICE_VERSION,
            "deployment.environment": ObservabilityConfig.ENVIRONMENT,
        })

        # Apply the configured sample rate; previously it was parsed but
        # never used, so every trace was recorded. The ratio sampler rejects
        # values outside [0, 1], hence the clamp.
        sample_rate = min(1.0, max(0.0, ObservabilityConfig.TRACE_SAMPLE_RATE))
        tracer_provider = TracerProvider(
            resource=resource,
            sampler=ParentBased(TraceIdRatioBased(sample_rate)),
        )

        # Add exporter if configured
        if ObservabilityConfig.OTLP_ENDPOINT:
            try:
                # OTEL_EXPORTER_OTLP_ENDPOINT is a base URL. The HTTP exporter
                # only appends the per-signal path when it reads the env var
                # itself; an explicit ``endpoint=`` is used verbatim, so the
                # path has to be added here.
                endpoint = ObservabilityConfig.OTLP_ENDPOINT.rstrip("/")
                for suffix in ("/v1/traces", "/v1/metrics", "/v1/logs"):
                    if endpoint.endswith(suffix):
                        endpoint = endpoint[: -len(suffix)]
                        break
                endpoint = f"{endpoint}/v1/traces"

                exporter = OTLPSpanExporter(
                    endpoint=endpoint,
                    headers=ObservabilityConfig.OTLP_HEADERS,
                )
                tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
                logger.info("✅ OTLP exporter configured: %s", endpoint)
            except Exception as e:
                logger.warning("⚠️ OTLP exporter failed: %s", e)

        # Console export for development
        if ObservabilityConfig.CONSOLE_EXPORT or ObservabilityConfig.ENVIRONMENT == "development":
            from opentelemetry.sdk.trace.export import SimpleSpanProcessor, ConsoleSpanExporter

            tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
            logger.info("✅ Console span exporter enabled")

        # Set global tracer provider
        trace.set_tracer_provider(tracer_provider)
        logger.info("✅ OpenTelemetry tracing initialized")
    except Exception as e:
        logger.error("❌ Failed to initialize tracing: %s", e)
        raise
| # ============================================================ | |
| # Initialize Metrics | |
| # ============================================================ | |
def init_metrics():
    """Initialize OpenTelemetry metrics and install the global meter provider.

    Does nothing when ``ObservabilityConfig.ENABLED`` is false. When
    ``OTLP_ENDPOINT`` is set, metrics are exported over OTLP/HTTP every
    5 seconds; otherwise the provider is installed with no readers.

    Errors are logged and not re-raised, so a metrics problem does not
    propagate to the caller.
    """
    if not ObservabilityConfig.ENABLED:
        return

    try:
        # Same resource attributes as tracing, so both signals can be
        # filtered by environment.
        resource = Resource.create({
            "service.name": ObservabilityConfig.SERVICE_NAME,
            "service.version": ObservabilityConfig.SERVICE_VERSION,
            "deployment.environment": ObservabilityConfig.ENVIRONMENT,
        })

        metric_readers = []

        # Add OTLP exporter if configured
        if ObservabilityConfig.OTLP_ENDPOINT:
            try:
                # An explicit ``endpoint=`` is used verbatim by the HTTP
                # exporter, so the metrics path must be added to the base
                # URL from OTEL_EXPORTER_OTLP_ENDPOINT.
                endpoint = ObservabilityConfig.OTLP_ENDPOINT.rstrip("/")
                for suffix in ("/v1/traces", "/v1/metrics", "/v1/logs"):
                    if endpoint.endswith(suffix):
                        endpoint = endpoint[: -len(suffix)]
                        break
                endpoint = f"{endpoint}/v1/metrics"

                exporter = OTLPMetricExporter(
                    endpoint=endpoint,
                    headers=ObservabilityConfig.OTLP_HEADERS,
                )
                # The keyword is ``export_interval_millis``; the previous
                # ``interval_millis`` raised TypeError, which was swallowed
                # below and left metrics without any exporter.
                metric_readers.append(
                    PeriodicExportingMetricReader(exporter, export_interval_millis=5000)
                )
                logger.info("✅ OTLP metrics exporter configured")
            except Exception as e:
                logger.warning("⚠️ OTLP metrics exporter failed: %s", e)

        meter_provider = MeterProvider(
            resource=resource,
            metric_readers=metric_readers,
        )
        metrics.set_meter_provider(meter_provider)
        logger.info("✅ OpenTelemetry metrics initialized")
    except Exception as e:
        # Deliberately best-effort; keep the traceback because nothing is re-raised.
        logger.error("❌ Failed to initialize metrics: %s", e, exc_info=True)
| # ============================================================ | |
| # Instrumentation | |
| # ============================================================ | |
def instrument_fastapi(app):
    """Auto-instrument a FastAPI application.

    Does nothing when ``ObservabilityConfig.ENABLED`` is false. Requests
    whose URL matches ``docs``, ``openapi.json`` or ``health`` are excluded
    from tracing. Instrumentation failures are logged as warnings and not
    raised.

    Args:
        app: The FastAPI application instance to instrument.
    """
    if not ObservabilityConfig.ENABLED:
        return

    try:
        FastAPIInstrumentor.instrument_app(
            app,
            excluded_urls="docs,openapi.json,health",
        )
        logger.info("✅ FastAPI instrumented")
    except Exception as e:
        logger.warning("⚠️ FastAPI instrumentation failed: %s", e)
def instrument_libraries():
    """Auto-instrument the httpx, requests and redis client libraries.

    Does nothing when ``ObservabilityConfig.ENABLED`` is false. Each library
    is instrumented independently, so a failure in one is logged as a
    warning and does not stop the others from being instrumented.
    """
    if not ObservabilityConfig.ENABLED:
        return

    instrumentors = (
        ("httpx", HTTPXClientInstrumentor),
        ("requests", RequestsInstrumentor),
        ("redis", RedisInstrumentor),
    )

    failed = []
    for library, instrumentor_cls in instrumentors:
        try:
            instrumentor_cls().instrument()
        except Exception as e:
            failed.append(library)
            logger.warning("⚠️ Library instrumentation failed (%s): %s", library, e)

    # Only report overall success when every library was instrumented.
    if not failed:
        logger.info("✅ External libraries instrumented")
| # ============================================================ | |
| # Custom Tracer & Metrics | |
| # ============================================================ | |
def get_tracer(name: str = __name__):
    """Return the tracer registered under *name*.

    Args:
        name: Instrumentation name passed to ``trace.get_tracer``; defaults
            to this module's name.
    """
    tracer = trace.get_tracer(name)
    return tracer
def get_meter(name: str = __name__):
    """Return the meter registered under *name*.

    Args:
        name: Instrumentation name passed to ``metrics.get_meter``; defaults
            to this module's name.
    """
    meter = metrics.get_meter(name)
    return meter
| # ============================================================ | |
| # Span Context Manager | |
| # ============================================================ | |
@contextmanager
def trace_operation(operation_name: str, attributes: Optional[Dict[str, Any]] = None):
    """Context manager that runs its body inside a new current span.

    Without the ``@contextmanager`` decorator this function returned a plain
    generator, which cannot be used in a ``with`` statement.

    Args:
        operation_name: Name given to the span.
        attributes: Optional span attributes. Entries whose value is ``None``
            are skipped; an attribute that cannot be set is logged at debug
            level and skipped.

    Yields:
        The started span.

    Raises:
        Exception: whatever the ``with`` body raises is propagated unchanged.
    """
    tracer = get_tracer(__name__)
    # start_as_current_span records an exception raised in the body and marks
    # the span as errored by default, so recording it again here would add a
    # duplicate exception event.
    with tracer.start_as_current_span(operation_name) as span:
        for key, value in (attributes or {}).items():
            if value is None:
                continue
            try:
                span.set_attribute(key, value)
            except Exception:
                # Best effort: a bad attribute must not break the traced operation.
                logger.debug("Skipping invalid span attribute %r", key, exc_info=True)
        yield span
| # ============================================================ | |
| # Decorators for tracing functions | |
| # ============================================================ | |
def trace_function(func):
    """Decorator that runs *func* inside a span named ``<module>.<name>``.

    Coroutine functions get an ``async`` wrapper; all other callables get a
    synchronous one. Both wrappers use ``functools.wraps`` so the decorated
    function keeps its name, docstring and ``__wrapped__`` reference, which
    signature-inspecting frameworks rely on.

    Args:
        func: The function or coroutine function to trace.

    Returns:
        The wrapped callable, with the same call signature as *func*.
    """
    span_name = f"{func.__module__}.{func.__name__}"

    if asyncio.iscoroutinefunction(func):

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            with trace_operation(span_name):
                return await func(*args, **kwargs)

        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        with trace_operation(span_name):
            return func(*args, **kwargs)

    return sync_wrapper
| # ============================================================ | |
| # Middleware for Request Tracing | |
| # ============================================================ | |
class RequestContextMiddleware(BaseHTTPMiddleware):
    """Attach request context to the current span and echo a request ID.

    For each request the middleware:

    * takes the ``x-request-id`` header, or generates a UUID4 when the
      header is missing or empty,
    * stores it on ``request.state.request_id``,
    * sets request and response attributes on the current span when that
      span is recording,
    * returns the ID in the ``x-request-id`` response header.
    """

    async def dispatch(self, request: Request, call_next):
        """Process one request; returns the downstream response."""
        # An empty header value is treated like a missing one, so an empty
        # ID is never propagated.
        request_id = request.headers.get("x-request-id") or str(uuid.uuid4())

        # Get current span and set request attributes
        current_span = trace.get_current_span()
        if current_span and current_span.is_recording():
            current_span.set_attributes({
                "http.request.id": request_id,
                "http.request.method": request.method,
                "http.request.path": request.url.path,
                "http.request.query": str(request.url.query),
            })

        # Store in request state for use downstream
        request.state.request_id = request_id

        # Use a monotonic clock: time.time() can jump when the system clock
        # is adjusted, which would give wrong or negative durations.
        start_time = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start_time

        # Update span with response
        if current_span and current_span.is_recording():
            current_span.set_attributes({
                "http.response.status_code": response.status_code,
                "http.duration_ms": int(duration * 1000),
            })

        # Add request ID to response headers
        response.headers["x-request-id"] = request_id
        return response
| # ============================================================ | |
| # Token Counter for LLM usage tracking | |
| # ============================================================ | |
class TokenUsageTracker:
    """Record LLM token usage and cost on two counters.

    ``llm.tokens.used`` receives token counts tagged with the model and a
    ``type`` of ``total``, ``prompt`` or ``completion``; ``llm.cost``
    receives the cost tagged with the model.
    """

    def __init__(self):
        """Create both counters on the meter for this module."""
        meter = get_meter(__name__)
        self.meter = meter
        self.token_counter = meter.create_counter(
            name="llm.tokens.used",
            unit="1",
            description="Total tokens used in LLM calls"
        )
        self.cost_counter = meter.create_counter(
            name="llm.cost",
            unit="USD",
            description="Total cost of LLM calls"
        )

    def record_tokens(self, model: str, prompt_tokens: int, completion_tokens: int, cost: float = 0.0):
        """Add one LLM call's usage to the counters.

        Args:
            model: Model identifier used as the ``model`` attribute.
            prompt_tokens: Tokens in the prompt.
            completion_tokens: Tokens in the completion.
            cost: Cost of the call; recorded only when greater than zero.
        """
        # Same order as the three separate calls: total, prompt, completion.
        breakdown = (
            ("total", prompt_tokens + completion_tokens),
            ("prompt", prompt_tokens),
            ("completion", completion_tokens),
        )
        for usage_type, amount in breakdown:
            self.token_counter.add(amount, {"model": model, "type": usage_type})

        if cost > 0:
            self.cost_counter.add(cost, {"model": model})
| # ============================================================ | |
| # Export for global use | |
| # ============================================================ | |
# Module-level cache for the shared tracker; filled on first use.
_token_tracker = None


def get_token_tracker() -> TokenUsageTracker:
    """Return the shared ``TokenUsageTracker``, creating it on the first call."""
    global _token_tracker
    if _token_tracker is not None:
        return _token_tracker
    _token_tracker = TokenUsageTracker()
    return _token_tracker