"""
Observability Metrics (Phase 3.5.4+)
====================================
Central definition of Prometheus metrics and utility decorators.
Includes OpenTelemetry tracing support.
"""

import time
import functools
import contextvars
from typing import Optional, Dict, Any, Callable

from prometheus_client import Counter, Histogram, Gauge

# OpenTelemetry imports (optional - gracefully degrade if not installed)
try:
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
    from opentelemetry.context import Context

    OTEL_AVAILABLE = True
    tracer = trace.get_tracer(__name__)
    propagator = TraceContextTextMapPropagator()
except ImportError:
    OTEL_AVAILABLE = False
    tracer = None
    propagator = None

# Context variable for trace ID propagation
_trace_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    'trace_id', default=None
)

# Shared bucket boundaries (seconds) for the Phase 4.1 duration histograms.
_DURATION_BUCKETS = (0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0)


# =============================================================================
# Prometheus Metrics Definitions
# =============================================================================

# --- API Metrics ---
API_REQUEST_COUNT = Counter(
    "haim_api_request_count",
    "Total API requests",
    ["method", "endpoint", "status"]
)
API_REQUEST_LATENCY = Histogram(
    "haim_api_request_latency_seconds",
    "API request latency",
    ["method", "endpoint"]
)

# --- Engine Metrics ---
ENGINE_MEMORY_COUNT = Gauge(
    "haim_engine_memory_total",
    "Total memories in the system",
    ["tier"]
)
ENGINE_STORE_LATENCY = Histogram(
    "haim_engine_store_seconds",
    "Time taken to store memory",
    ["tier"]
)
ENGINE_QUERY_LATENCY = Histogram(
    "haim_engine_query_seconds",
    "Time taken to query memories"
)

# --- New Metrics (Phase 4.1 Observability) ---
STORE_DURATION_SECONDS = Histogram(
    "mnemocore_store_duration_seconds",
    "Duration of memory store operations",
    ["tier"],
    buckets=_DURATION_BUCKETS
)
QUERY_DURATION_SECONDS = Histogram(
    "mnemocore_query_duration_seconds",
    "Duration of memory query operations",
    buckets=_DURATION_BUCKETS
)
MEMORY_COUNT_TOTAL = Gauge(
    "mnemocore_memory_count_total",
    "Total number of memories by tier",
    ["tier"]
)
QUEUE_LENGTH = Gauge(
    "mnemocore_queue_length",
    "Current length of the processing queue"
)
ERROR_TOTAL = Counter(
    "mnemocore_error_total",
    "Total number of errors",
    ["error_type"]
)

# --- Storage Metrics (Redis/Qdrant) ---
STORAGE_OPERATION_COUNT = Counter(
    "haim_storage_ops_total",
    "Storage operations",
    ["backend", "operation", "status"]
)
STORAGE_LATENCY = Histogram(
    "haim_storage_latency_seconds",
    "Storage operation latency",
    ["backend", "operation"]
)

# --- Bus Metrics ---
BUS_EVENTS_PUBLISHED = Counter(
    "haim_bus_events_published",
    "Events published to bus",
    ["type"]
)
BUS_EVENTS_CONSUMED = Counter(
    "haim_bus_events_consumed",
    "Events consumed from bus",
    ["consumer", "type"]
)

# --- Dream Loop Metrics (Subconscious background processing) ---
DREAM_LOOP_TOTAL = Counter(
    "haim_dream_loop_total",
    "Total dream cycles completed",
    ["status"]  # success, error
)
DREAM_LOOP_ITERATION_SECONDS = Histogram(
    "haim_dream_iteration_seconds",
    "Time taken for each dream loop iteration",
    []  # No labels needed
)
DREAM_LOOP_INSIGHTS_GENERATED = Counter(
    "haim_dream_insights_generated_total",
    "Total insights generated by dream loop",
    ["type"]  # concept, parallel, meta
)
DREAM_LOOP_ACTIVE = Gauge(
    "haim_dream_loop_active",
    "Whether the dream loop is currently running (1=active, 0=stopped)"
)


# =============================================================================
# OpenTelemetry Configuration
# =============================================================================

def init_opentelemetry(
    service_name: str = "mnemocore",
    exporter: str = "console",
) -> Optional["TracerProvider"]:
    """
    Initialize OpenTelemetry tracing and install the global tracer provider.

    Args:
        service_name: Value used for the ``service.name`` resource attribute.
        exporter: Exporter type. ``'console'`` prints spans to the console,
            ``'otlp'`` uses the OTLP gRPC exporter (falling back to the
            console exporter when that package is not installed). Any other
            value (e.g. ``'none'``) attaches no span processor.

    Returns:
        The configured TracerProvider if OTEL is available, None otherwise.
    """
    if not OTEL_AVAILABLE:
        return None

    provider = TracerProvider(resource=Resource.create({"service.name": service_name}))

    span_exporter = None
    if exporter == "console":
        span_exporter = ConsoleSpanExporter()
    elif exporter == "otlp":
        try:
            from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
        except ImportError:
            # Fallback to console if OTLP exporter not available
            span_exporter = ConsoleSpanExporter()
        else:
            span_exporter = OTLPSpanExporter()

    if span_exporter is not None:
        provider.add_span_processor(BatchSpanProcessor(span_exporter))

    trace.set_tracer_provider(provider)
    return provider


def get_trace_id() -> Optional[str]:
    """Return the trace ID stored in the current context, or None if unset."""
    return _trace_id_var.get()


def set_trace_id(trace_id: str) -> None:
    """Store ``trace_id`` in the current context."""
    _trace_id_var.set(trace_id)


def extract_trace_context(headers: Dict[str, str]) -> Optional[str]:
    """
    Extract trace context from HTTP headers.

    When a valid span context is found, its trace ID is also stored in the
    context variable so that ``get_trace_id()`` returns it afterwards.

    Args:
        headers: Dictionary of HTTP headers.

    Returns:
        The trace ID as a 32-character lowercase hex string if found,
        None otherwise (including when OpenTelemetry is not installed).
    """
    if not OTEL_AVAILABLE or propagator is None:
        return None

    ctx = propagator.extract(headers)
    span_ctx = trace.get_current_span(ctx).get_span_context()
    if not span_ctx.is_valid:
        return None

    trace_id = format(span_ctx.trace_id, '032x')
    set_trace_id(trace_id)
    return trace_id


def inject_trace_context() -> Dict[str, str]:
    """
    Inject the current trace context into a fresh header dictionary.

    Returns:
        Dictionary with trace headers; empty when OpenTelemetry is not
        installed.
    """
    if not OTEL_AVAILABLE or propagator is None:
        return {}

    headers: Dict[str, str] = {}
    propagator.inject(headers)
    return headers


# =============================================================================
# Decorators
# =============================================================================

def _observe(metric: Histogram, labels: Optional[Dict[str, str]], duration: float) -> None:
    """Record ``duration`` on ``metric``, applying ``labels`` when provided."""
    if labels:
        metric.labels(**labels).observe(duration)
    else:
        metric.observe(duration)


def track_latency(metric: Histogram, labels: Optional[Dict[str, str]] = None):
    """
    Decorator to track the execution time of a synchronous function.

    The duration is observed whether the function returns or raises.

    Args:
        metric: Histogram that receives the duration in seconds.
        labels: Optional label values applied to the metric.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so wall-clock adjustments cannot
            # produce negative or skewed durations.
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                _observe(metric, labels, time.perf_counter() - start)
        return wrapper
    return decorator


def track_async_latency(metric: Histogram, labels: Optional[Dict[str, str]] = None):
    """
    Decorator to track the execution time of an async function.

    The duration is observed whether the coroutine returns or raises.

    Args:
        metric: Histogram that receives the duration in seconds.
        labels: Optional label values applied to the metric.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                _observe(metric, labels, time.perf_counter() - start)
        return wrapper
    return decorator


def timer(metric: Histogram, labels: Optional[Dict[str, str]] = None):
    """
    Timer decorator for async functions with OpenTelemetry span support.

    Observes the call duration on ``metric``, increments ``ERROR_TOTAL``
    (labelled with the exception class name) when the call raises, and, when
    OpenTelemetry is available, wraps the call in a span named after the
    function.

    Args:
        metric: Histogram that receives the duration in seconds.
        labels: Optional label values applied to the metric.

    Usage:
        @timer(STORE_DURATION_SECONDS, labels={"tier": "hot"})
        async def store(...):
            ...
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()

            # Create OTEL span if available
            span = None
            if OTEL_AVAILABLE and tracer is not None:
                span = tracer.start_span(func.__name__)

            try:
                return await func(*args, **kwargs)
            except Exception as e:
                # Increment error counter
                ERROR_TOTAL.labels(error_type=type(e).__name__).inc()
                if span is not None:
                    span.record_exception(e)
                    # Mark the span as failed so it is reported as an error,
                    # matching what start_as_current_span does in `traced`.
                    span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
                raise
            finally:
                duration = time.perf_counter() - start

                # Record Prometheus metric
                _observe(metric, labels, duration)

                # End OTEL span
                if span is not None:
                    span.set_attribute("duration_seconds", duration)
                    trace_id = get_trace_id()
                    if trace_id:
                        span.set_attribute("trace_id", trace_id)
                    span.end()
        return wrapper
    return decorator


def traced(name: Optional[str] = None):
    """
    Decorator to create an OpenTelemetry span for an async function.

    When OpenTelemetry is not installed the function is awaited unchanged.

    Args:
        name: Span name; defaults to the decorated function's ``__name__``.

    Usage:
        @traced("my_operation")
        async def my_function(...):
            ...
    """
    def decorator(func: Callable) -> Callable:
        span_name = name or func.__name__

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            if not OTEL_AVAILABLE or tracer is None:
                return await func(*args, **kwargs)

            # start_as_current_span records an escaping exception and sets
            # the error status by default, so recording it again here would
            # duplicate the exception event on the span.
            with tracer.start_as_current_span(span_name) as span:
                trace_id = get_trace_id()
                if trace_id:
                    span.set_attribute("trace_id", trace_id)
                return await func(*args, **kwargs)
        return wrapper
    return decorator


# =============================================================================
# Helper Functions
# =============================================================================

def update_memory_count(tier: str, count: int) -> None:
    """Set both memory count gauges (mnemocore and haim) for ``tier``."""
    MEMORY_COUNT_TOTAL.labels(tier=tier).set(count)
    ENGINE_MEMORY_COUNT.labels(tier=tier).set(count)


def update_queue_length(length: int) -> None:
    """Update the queue length gauge."""
    QUEUE_LENGTH.set(length)


def record_error(error_type: str) -> None:
    """Increment the error counter for ``error_type``."""
    ERROR_TOTAL.labels(error_type=error_type).inc()