| """
|
| Observability Metrics (Phase 3.5.4+)
|
| ====================================
|
| Central definition of Prometheus metrics and utility decorators.
|
| Includes OpenTelemetry tracing support.
|
| """
|
|
|
| import time
|
| import functools
|
| import contextvars
|
| from typing import Optional, Dict, Any, Callable
|
|
|
| from prometheus_client import Counter, Histogram, Gauge
|
|
|
|
|
# OpenTelemetry is an optional dependency: when it is not installed, tracing
# degrades to no-ops (OTEL_AVAILABLE=False, tracer/propagator=None) while the
# Prometheus metrics in this module keep working unchanged.
try:
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
    from opentelemetry.context import Context

    OTEL_AVAILABLE = True
    # Module-level tracer/propagator shared by the decorators below.
    tracer = trace.get_tracer(__name__)
    propagator = TraceContextTextMapPropagator()
except ImportError:
    OTEL_AVAILABLE = False
    tracer = None
    propagator = None


# Per-context trace ID used to correlate logs and spans across async
# boundaries (read by get_trace_id(), written by set_trace_id()).
_trace_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('trace_id', default=None)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- HTTP API metrics -------------------------------------------------------

# Request count by method / endpoint / HTTP status.
API_REQUEST_COUNT = Counter(
    "haim_api_request_count",
    "Total API requests",
    ["method", "endpoint", "status"]
)
# Request latency (default buckets) by method / endpoint.
API_REQUEST_LATENCY = Histogram(
    "haim_api_request_latency_seconds",
    "API request latency",
    ["method", "endpoint"]
)
|
|
|
|
|
# --- Engine metrics (haim_engine_* namespace) -------------------------------

# Current number of memories, labelled by storage tier.
# Kept in sync with MEMORY_COUNT_TOTAL via update_memory_count().
ENGINE_MEMORY_COUNT = Gauge(
    "haim_engine_memory_total",
    "Total memories in the system",
    ["tier"]
)
# Store latency per tier (default buckets).
ENGINE_STORE_LATENCY = Histogram(
    "haim_engine_store_seconds",
    "Time taken to store memory",
    ["tier"]
)
# Query latency, unlabelled (default buckets).
ENGINE_QUERY_LATENCY = Histogram(
    "haim_engine_query_seconds",
    "Time taken to query memories"
)
|
|
|
|
|
# --- Core metrics (mnemocore_* namespace) -----------------------------------

# Store duration per tier; fine-grained sub-second buckets (1 ms - 5 s).
STORE_DURATION_SECONDS = Histogram(
    "mnemocore_store_duration_seconds",
    "Duration of memory store operations",
    ["tier"],
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)

# Query duration, unlabelled; same bucket layout as STORE_DURATION_SECONDS.
QUERY_DURATION_SECONDS = Histogram(
    "mnemocore_query_duration_seconds",
    "Duration of memory query operations",
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)

# Total memory count per tier; updated together with ENGINE_MEMORY_COUNT
# by update_memory_count().
MEMORY_COUNT_TOTAL = Gauge(
    "mnemocore_memory_count_total",
    "Total number of memories by tier",
    ["tier"]
)

# Processing-queue backlog, set by update_queue_length().
QUEUE_LENGTH = Gauge(
    "mnemocore_queue_length",
    "Current length of the processing queue"
)

# Error counter keyed by exception class name; incremented by record_error()
# and by the timer() decorator's exception path.
ERROR_TOTAL = Counter(
    "mnemocore_error_total",
    "Total number of errors",
    ["error_type"]
)
|
|
|
|
|
# --- Storage backend metrics ------------------------------------------------

# Operation count by backend / operation / status.
STORAGE_OPERATION_COUNT = Counter(
    "haim_storage_ops_total",
    "Storage operations",
    ["backend", "operation", "status"]
)
# Operation latency (default buckets) by backend / operation.
STORAGE_LATENCY = Histogram(
    "haim_storage_latency_seconds",
    "Storage operation latency",
    ["backend", "operation"]
)
|
|
|
|
|
# --- Event bus metrics ------------------------------------------------------

# Events published, labelled by event type.
BUS_EVENTS_PUBLISHED = Counter(
    "haim_bus_events_published",
    "Events published to bus",
    ["type"]
)
# Events consumed, labelled by consumer name and event type.
BUS_EVENTS_CONSUMED = Counter(
    "haim_bus_events_consumed",
    "Events consumed from bus",
    ["consumer", "type"]
)
|
|
|
|
|
# --- Dream loop metrics -----------------------------------------------------

# Completed dream cycles, labelled by outcome status.
DREAM_LOOP_TOTAL = Counter(
    "haim_dream_loop_total",
    "Total dream cycles completed",
    ["status"]
)
# Per-iteration duration; explicit empty labelnames list (no labels).
DREAM_LOOP_ITERATION_SECONDS = Histogram(
    "haim_dream_iteration_seconds",
    "Time taken for each dream loop iteration",
    []
)
# Insights produced by the dream loop, labelled by insight type.
DREAM_LOOP_INSIGHTS_GENERATED = Counter(
    "haim_dream_insights_generated_total",
    "Total insights generated by dream loop",
    ["type"]
)
# Liveness flag for the dream loop (1 = running, 0 = stopped).
DREAM_LOOP_ACTIVE = Gauge(
    "haim_dream_loop_active",
    "Whether the dream loop is currently running (1=active, 0=stopped)"
)
|
|
|
|
|
|
|
|
|
|
|
|
|
def init_opentelemetry(service_name: str = "mnemocore", exporter: str = "console") -> Optional["TracerProvider"]:
    """
    Set up OpenTelemetry tracing and install the global tracer provider.

    Args:
        service_name: Value recorded as the ``service.name`` resource attribute.
        exporter: One of 'console', 'otlp', or 'none'. 'otlp' falls back to
            the console exporter when the OTLP exporter package is missing.

    Returns:
        The installed TracerProvider, or None when OpenTelemetry is not
        installed.
    """
    if not OTEL_AVAILABLE:
        return None

    provider = TracerProvider(
        resource=Resource.create({"service.name": service_name})
    )

    # Decide which span exporter (if any) to attach.
    span_exporter = None
    if exporter == "console":
        span_exporter = ConsoleSpanExporter()
    elif exporter == "otlp":
        try:
            from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
            span_exporter = OTLPSpanExporter()
        except ImportError:
            # OTLP exporter not installed: degrade to console output.
            span_exporter = ConsoleSpanExporter()

    if span_exporter is not None:
        provider.add_span_processor(BatchSpanProcessor(span_exporter))

    trace.set_tracer_provider(provider)
    return provider
|
|
|
|
|
def get_trace_id() -> Optional[str]:
    """Return the trace ID bound to the current context, or None if unset."""
    current = _trace_id_var.get()
    return current
|
|
|
|
|
def set_trace_id(trace_id: str) -> None:
    """Bind *trace_id* to the current context for later correlation."""
    _trace_id_var.set(trace_id)
|
|
|
|
|
def extract_trace_context(headers: Dict[str, str]) -> Optional[str]:
    """
    Pull a W3C trace context out of incoming HTTP headers.

    Args:
        headers: Dictionary of HTTP headers.

    Returns:
        The 32-hex-digit trace ID when a valid context was found (it is also
        stored via set_trace_id), otherwise None.
    """
    if not OTEL_AVAILABLE or propagator is None:
        return None

    extracted = propagator.extract(headers)
    span_context = trace.get_current_span(extracted).get_span_context()

    if not span_context.is_valid:
        return None

    hex_trace_id = format(span_context.trace_id, '032x')
    set_trace_id(hex_trace_id)
    return hex_trace_id
|
|
|
|
|
def inject_trace_context() -> Dict[str, str]:
    """
    Build HTTP headers carrying the current trace context.

    Returns:
        A dict of W3C trace-context headers; empty when OpenTelemetry is
        unavailable.
    """
    if not OTEL_AVAILABLE or propagator is None:
        return {}

    carrier: Dict[str, str] = {}
    propagator.inject(carrier)
    return carrier
|
|
|
|
|
|
|
|
|
|
|
|
|
def track_latency(metric: "Histogram", labels: Optional[Dict[str, str]] = None):
    """Decorator to track function execution time.

    Args:
        metric: Prometheus Histogram the duration is observed into.
        labels: Optional label values applied via ``metric.labels(...)``.

    Returns:
        A decorator wrapping the function; the duration is observed even
        when the function raises.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic — immune to wall-clock adjustments
            # that would corrupt a time.time()-based duration.
            start_time = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                duration = time.perf_counter() - start_time
                if labels:
                    metric.labels(**labels).observe(duration)
                else:
                    metric.observe(duration)
        return wrapper
    return decorator
|
|
|
|
|
def track_async_latency(metric: "Histogram", labels: Optional[Dict[str, str]] = None):
    """Decorator to track async function execution time.

    Args:
        metric: Prometheus Histogram the duration is observed into.
        labels: Optional label values applied via ``metric.labels(...)``.

    Returns:
        A decorator wrapping the coroutine function; the duration is
        observed even when the awaited call raises.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Monotonic clock: robust against system clock adjustments.
            start_time = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                duration = time.perf_counter() - start_time
                if labels:
                    metric.labels(**labels).observe(duration)
                else:
                    metric.observe(duration)
        return wrapper
    return decorator
|
|
|
|
|
def timer(metric: "Histogram", labels: Optional[Dict[str, str]] = None):
    """
    Timer decorator for async functions with OpenTelemetry span support.

    Observes the call duration into *metric* (even on failure), increments
    ERROR_TOTAL with the exception class name when the call raises, and —
    when OpenTelemetry is available — wraps the call in a span annotated
    with the duration and the current trace ID.

    Usage:
        @timer(STORE_DURATION_SECONDS, labels={"tier": "hot"})
        async def store(...):
            ...
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Monotonic clock: immune to wall-clock adjustments that would
            # corrupt a time.time()-based duration.
            start_time = time.perf_counter()

            # Start a span only when tracing is actually available.
            span = None
            if OTEL_AVAILABLE and tracer is not None:
                span = tracer.start_span(func.__name__)

            try:
                return await func(*args, **kwargs)
            except Exception as e:
                # Count every failure by exception class name.
                ERROR_TOTAL.labels(error_type=type(e).__name__).inc()
                if span:
                    span.record_exception(e)
                raise
            finally:
                duration = time.perf_counter() - start_time

                if labels:
                    metric.labels(**labels).observe(duration)
                else:
                    metric.observe(duration)

                # Annotate and close the span on both success and failure.
                if span:
                    span.set_attribute("duration_seconds", duration)
                    trace_id = get_trace_id()
                    if trace_id:
                        span.set_attribute("trace_id", trace_id)
                    span.end()

        return wrapper
    return decorator
|
|
|
|
|
def traced(name: Optional[str] = None):
    """
    Decorator that runs an async function inside an OpenTelemetry span.

    When OpenTelemetry is unavailable the function is awaited unchanged.
    Exceptions are recorded on the span and re-raised.

    Usage:
        @traced("my_operation")
        async def my_function(...):
            ...
    """
    def decorator(func: Callable) -> Callable:
        # Fall back to the function's own name when no span name was given.
        span_label = name or func.__name__

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            if not OTEL_AVAILABLE or tracer is None:
                return await func(*args, **kwargs)

            with tracer.start_as_current_span(span_label) as span:
                current_id = get_trace_id()
                if current_id:
                    span.set_attribute("trace_id", current_id)

                try:
                    return await func(*args, **kwargs)
                except Exception as exc:
                    span.record_exception(exc)
                    raise

        return wrapper
    return decorator
|
|
|
|
|
|
|
|
|
|
|
|
|
def update_memory_count(tier: str, count: int) -> None:
    """Set both memory-count gauges (mnemocore and engine) for *tier*."""
    for gauge in (MEMORY_COUNT_TOTAL, ENGINE_MEMORY_COUNT):
        gauge.labels(tier=tier).set(count)
|
|
|
|
|
def update_queue_length(length: int) -> None:
    """Record the current processing-queue backlog in the QUEUE_LENGTH gauge."""
    QUEUE_LENGTH.set(length)
|
|
|
|
|
def record_error(error_type: str) -> None:
    """Increment the ERROR_TOTAL counter for the given error type label."""
    ERROR_TOTAL.labels(error_type=error_type).inc()
|
|
|