# MnemoCore — src/mnemocore/core/metrics.py
"""
Observability Metrics (Phase 3.5.4+)
====================================
Central definition of Prometheus metrics and utility decorators.
Includes OpenTelemetry tracing support.
"""
import time
import functools
import contextvars
from typing import Optional, Dict, Any, Callable
from prometheus_client import Counter, Histogram, Gauge
# OpenTelemetry imports (optional - gracefully degrade if not installed).
# When the SDK is missing, OTEL_AVAILABLE/tracer/propagator become
# falsy/None and every tracing code path below turns into a no-op.
try:
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
    from opentelemetry.context import Context
    OTEL_AVAILABLE = True
    # Module-level tracer/propagator shared by the decorators and helpers below.
    tracer = trace.get_tracer(__name__)
    propagator = TraceContextTextMapPropagator()
except ImportError:
    OTEL_AVAILABLE = False
    tracer = None
    propagator = None

# Context variable for trace ID propagation (survives across await points,
# so async handlers see the trace ID set by the request middleware).
_trace_id_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('trace_id', default=None)
# =============================================================================
# Prometheus Metrics Definitions
# =============================================================================

# --- API Metrics ---
API_REQUEST_COUNT = Counter(
    "haim_api_request_count",
    "Total API requests",
    ["method", "endpoint", "status"]
)
API_REQUEST_LATENCY = Histogram(
    "haim_api_request_latency_seconds",
    "API request latency",
    ["method", "endpoint"]
)

# --- Engine Metrics ---
ENGINE_MEMORY_COUNT = Gauge(
    "haim_engine_memory_total",
    "Total memories in the system",
    ["tier"]
)
ENGINE_STORE_LATENCY = Histogram(
    "haim_engine_store_seconds",
    "Time taken to store memory",
    ["tier"]
)
ENGINE_QUERY_LATENCY = Histogram(
    "haim_engine_query_seconds",
    "Time taken to query memories"
)

# --- New Metrics (Phase 4.1 Observability) ---
# NOTE(review): these mnemocore_* metrics overlap with the legacy haim_*
# engine metrics above — presumably kept in parallel for dashboard
# backward compatibility; confirm before consolidating.
STORE_DURATION_SECONDS = Histogram(
    "mnemocore_store_duration_seconds",
    "Duration of memory store operations",
    ["tier"],
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
QUERY_DURATION_SECONDS = Histogram(
    "mnemocore_query_duration_seconds",
    "Duration of memory query operations",
    buckets=[0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]
)
MEMORY_COUNT_TOTAL = Gauge(
    "mnemocore_memory_count_total",
    "Total number of memories by tier",
    ["tier"]
)
QUEUE_LENGTH = Gauge(
    "mnemocore_queue_length",
    "Current length of the processing queue"
)
ERROR_TOTAL = Counter(
    "mnemocore_error_total",
    "Total number of errors",
    ["error_type"]
)

# --- Storage Metrics (Redis/Qdrant) ---
STORAGE_OPERATION_COUNT = Counter(
    "haim_storage_ops_total",
    "Storage operations",
    ["backend", "operation", "status"]
)
STORAGE_LATENCY = Histogram(
    "haim_storage_latency_seconds",
    "Storage operation latency",
    ["backend", "operation"]
)

# --- Bus Metrics ---
BUS_EVENTS_PUBLISHED = Counter(
    "haim_bus_events_published",
    "Events published to bus",
    ["type"]
)
BUS_EVENTS_CONSUMED = Counter(
    "haim_bus_events_consumed",
    "Events consumed from bus",
    ["consumer", "type"]
)

# --- Dream Loop Metrics (Subconscious background processing) ---
DREAM_LOOP_TOTAL = Counter(
    "haim_dream_loop_total",
    "Total dream cycles completed",
    ["status"]  # success, error
)
DREAM_LOOP_ITERATION_SECONDS = Histogram(
    "haim_dream_iteration_seconds",
    "Time taken for each dream loop iteration",
    []  # No labels needed
)
DREAM_LOOP_INSIGHTS_GENERATED = Counter(
    "haim_dream_insights_generated_total",
    "Total insights generated by dream loop",
    ["type"]  # concept, parallel, meta
)
DREAM_LOOP_ACTIVE = Gauge(
    "haim_dream_loop_active",
    "Whether the dream loop is currently running (1=active, 0=stopped)"
)
# =============================================================================
# OpenTelemetry Configuration
# =============================================================================
def init_opentelemetry(service_name: str = "mnemocore", exporter: str = "console") -> Optional["TracerProvider"]:
    """
    Initialize OpenTelemetry tracing for this process.

    Args:
        service_name: Logical service name attached to exported spans.
        exporter: Exporter type ('console', 'otlp', or 'none').

    Returns:
        The configured TracerProvider, or None when the OpenTelemetry
        SDK is not installed.
    """
    if not OTEL_AVAILABLE:
        return None

    provider = TracerProvider(
        resource=Resource.create({"service.name": service_name})
    )

    if exporter == "otlp":
        try:
            from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
            provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
        except ImportError:
            # OTLP exporter package missing — degrade to console output.
            provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    elif exporter == "console":
        provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
    # Any other value (e.g. 'none') registers no span processor.

    trace.set_tracer_provider(provider)
    return provider
def get_trace_id() -> Optional[str]:
    """Return the trace ID stored in the current context, or None if unset."""
    return _trace_id_var.get()
def set_trace_id(trace_id: str) -> None:
    """Store *trace_id* in the context variable for later retrieval."""
    _trace_id_var.set(trace_id)
def extract_trace_context(headers: Dict[str, str]) -> Optional[str]:
    """
    Extract W3C trace context from incoming HTTP headers.

    Args:
        headers: Dictionary of HTTP headers (used as the propagator carrier).

    Returns:
        The 32-hex-digit trace ID when a valid context was found, None
        otherwise. On success the ID is also stored via set_trace_id().
    """
    if propagator is None or not OTEL_AVAILABLE:
        return None

    ctx = propagator.extract(headers)
    span_context = trace.get_current_span(ctx).get_span_context()
    if not span_context.is_valid:
        return None

    tid = format(span_context.trace_id, '032x')
    set_trace_id(tid)
    return tid
def inject_trace_context() -> Dict[str, str]:
    """
    Build HTTP headers carrying the current trace context.

    Returns:
        A dictionary populated by the W3C propagator, or an empty dict
        when OpenTelemetry is unavailable.
    """
    if propagator is None or not OTEL_AVAILABLE:
        return {}
    carrier: Dict[str, str] = {}
    propagator.inject(carrier)
    return carrier
# =============================================================================
# Decorators
# =============================================================================
def track_latency(metric: "Histogram", labels: Optional[Dict[str, str]] = None):
    """Decorator to track synchronous function execution time.

    Args:
        metric: Prometheus Histogram to record durations into.
        labels: Optional label values applied via ``metric.labels(**labels)``.

    Returns:
        A decorator whose wrapper observes the call duration (seconds)
        even when the wrapped function raises.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic: unlike time.time(), it cannot go
            # backwards on wall-clock adjustment and yield negative durations.
            start_time = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                duration = time.perf_counter() - start_time
                if labels:
                    metric.labels(**labels).observe(duration)
                else:
                    metric.observe(duration)
        return wrapper
    return decorator
def track_async_latency(metric: "Histogram", labels: Optional[Dict[str, str]] = None):
    """Decorator to track async function execution time.

    Args:
        metric: Prometheus Histogram to record durations into.
        labels: Optional label values applied via ``metric.labels(**labels)``.

    Returns:
        A decorator whose async wrapper observes the call duration
        (seconds) even when the awaited function raises.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Monotonic clock: immune to wall-clock steps that could make
            # time.time() deltas negative.
            start_time = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                duration = time.perf_counter() - start_time
                if labels:
                    metric.labels(**labels).observe(duration)
                else:
                    metric.observe(duration)
        return wrapper
    return decorator
def timer(metric: "Histogram", labels: Optional[Dict[str, str]] = None):
    """
    Timer decorator for async functions with OpenTelemetry span support.

    Records the call duration into *metric*, increments ERROR_TOTAL on
    exceptions, and (when OTEL is available) wraps the call in a span
    annotated with the duration and current trace ID.

    Usage:
        @timer(STORE_DURATION_SECONDS, labels={"tier": "hot"})
        async def store(...):
            ...

    Args:
        metric: Prometheus Histogram to record durations into.
        labels: Optional label values applied via ``metric.labels(**labels)``.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Monotonic clock avoids negative durations on wall-clock steps.
            start_time = time.perf_counter()
            # Create OTEL span if available (no-op otherwise).
            span = None
            if OTEL_AVAILABLE and tracer is not None:
                span = tracer.start_span(func.__name__)
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                # Count the failure by exception class name.
                ERROR_TOTAL.labels(error_type=type(e).__name__).inc()
                if span:
                    span.record_exception(e)
                raise
            finally:
                duration = time.perf_counter() - start_time
                # Record Prometheus metric on both success and failure.
                if labels:
                    metric.labels(**labels).observe(duration)
                else:
                    metric.observe(duration)
                # Annotate and close the OTEL span.
                if span:
                    span.set_attribute("duration_seconds", duration)
                    trace_id = get_trace_id()
                    if trace_id:
                        span.set_attribute("trace_id", trace_id)
                    span.end()
        return wrapper
    return decorator
def traced(name: Optional[str] = None):
    """
    Decorator that wraps an async function in an OpenTelemetry span.

    Falls through to a plain call when OpenTelemetry is not installed.
    Exceptions are recorded on the span and re-raised unchanged.

    Usage:
        @traced("my_operation")
        async def my_function(...):
            ...

    Args:
        name: Span name; defaults to the wrapped function's __name__.
    """
    def decorator(func: Callable) -> Callable:
        label = name if name is not None else func.__name__

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Transparent passthrough when tracing is unavailable.
            if tracer is None or not OTEL_AVAILABLE:
                return await func(*args, **kwargs)
            with tracer.start_as_current_span(label) as span:
                current_id = get_trace_id()
                if current_id:
                    span.set_attribute("trace_id", current_id)
                try:
                    return await func(*args, **kwargs)
                except Exception as exc:
                    span.record_exception(exc)
                    raise
        return wrapper
    return decorator
# =============================================================================
# Helper Functions
# =============================================================================
def update_memory_count(tier: str, count: int) -> None:
    """Update the memory count gauge for a specific tier.

    Writes the same value to both the newer mnemocore_* gauge and the
    legacy haim_* gauge so the two metric families stay in sync.
    """
    MEMORY_COUNT_TOTAL.labels(tier=tier).set(count)
    ENGINE_MEMORY_COUNT.labels(tier=tier).set(count)
def update_queue_length(length: int) -> None:
    """Set the processing-queue length gauge to *length*."""
    QUEUE_LENGTH.set(length)
def record_error(error_type: str) -> None:
    """Increment the error counter for the given error type label."""
    ERROR_TOTAL.labels(error_type=error_type).inc()