rag_template / src /monitoring.py
Guilherme Favaron
Sync: Complete project update (Phase 6) - API, Metadata, Eval, Docs
a686b1b
"""
Sistema de observabilidade com Prometheus e OpenTelemetry.
Metricas:
- Contadores de requests
- Histogramas de latencia
- Gauges de estado do sistema
- Metricas customizadas do RAG
"""
from typing import Dict, Any, Optional
import time
from functools import wraps
from contextlib import contextmanager
try:
from prometheus_client import (
Counter,
Histogram,
Gauge,
generate_latest,
CONTENT_TYPE_LATEST,
CollectorRegistry
)
PROMETHEUS_AVAILABLE = True
except ImportError:
PROMETHEUS_AVAILABLE = False
print("Aviso: prometheus_client nao instalado. Metricas desabilitadas.")
try:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
OTEL_AVAILABLE = True
except ImportError:
OTEL_AVAILABLE = False
print("Aviso: opentelemetry nao instalado. Traces desabilitados.")
class MetricsCollector:
    """Prometheus metrics collector for the RAG service.

    Holds request/query/ingestion/error counters, latency histograms and
    state gauges. When ``PROMETHEUS_AVAILABLE`` is false the instance is
    created with ``enabled = False`` and every public method is a no-op
    (``get_metrics`` returns an empty string).
    """

    def __init__(self, registry: Optional['CollectorRegistry'] = None):
        """Create all metrics and register them.

        Args:
            registry: Prometheus registry to register the metrics in. When
                omitted, a dedicated ``CollectorRegistry`` is created for
                this collector so that ``get_metrics`` has something to
                export. Pass an explicit registry to share one with other
                collectors.
        """
        self.enabled = PROMETHEUS_AVAILABLE
        if not self.enabled:
            # Keep the attribute defined on the disabled path as well.
            self.registry = registry
            return

        # prometheus_client interprets ``registry=None`` as "do not register
        # this metric", and ``generate_latest`` needs a real registry to
        # collect from, so never keep None here.
        if registry is None:
            registry = CollectorRegistry()
        self.registry = registry

        # Counters
        self.requests_total = Counter(
            'rag_requests_total',
            'Total de requests ao sistema',
            ['endpoint', 'method'],
            registry=registry
        )
        self.queries_total = Counter(
            'rag_queries_total',
            'Total de queries RAG',
            registry=registry
        )
        self.documents_ingested_total = Counter(
            'rag_documents_ingested_total',
            'Total de documentos ingeridos',
            registry=registry
        )
        self.chunks_created_total = Counter(
            'rag_chunks_created_total',
            'Total de chunks criados',
            registry=registry
        )
        self.errors_total = Counter(
            'rag_errors_total',
            'Total de erros',
            ['error_type'],
            registry=registry
        )

        # Latency histograms (bucket bounds are in seconds)
        self.query_latency = Histogram(
            'rag_query_latency_seconds',
            'Latencia de queries RAG',
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
            registry=registry
        )
        self.embedding_latency = Histogram(
            'rag_embedding_latency_seconds',
            'Latencia de geracao de embeddings',
            buckets=[0.05, 0.1, 0.25, 0.5, 1.0],
            registry=registry
        )
        self.generation_latency = Histogram(
            'rag_generation_latency_seconds',
            'Latencia de geracao de respostas',
            buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 20.0],
            registry=registry
        )
        self.db_query_latency = Histogram(
            'rag_db_query_latency_seconds',
            'Latencia de queries ao banco',
            buckets=[0.01, 0.05, 0.1, 0.25, 0.5],
            registry=registry
        )

        # Gauges
        self.active_connections = Gauge(
            'rag_active_connections',
            'Conexoes ativas ao banco',
            registry=registry
        )
        self.cache_size = Gauge(
            'rag_cache_size_bytes',
            'Tamanho do cache de embeddings',
            registry=registry
        )
        self.documents_count = Gauge(
            'rag_documents_count',
            'Numero total de documentos',
            registry=registry
        )
        self.chunks_count = Gauge(
            'rag_chunks_count',
            'Numero total de chunks',
            registry=registry
        )

    def increment_requests(self, endpoint: str, method: str):
        """Increment the request counter for the given endpoint/method pair."""
        if self.enabled:
            self.requests_total.labels(endpoint=endpoint, method=method).inc()

    def increment_queries(self):
        """Increment the RAG query counter by one."""
        if self.enabled:
            self.queries_total.inc()

    def increment_documents_ingested(self, count: int = 1):
        """Add ``count`` to the ingested-documents counter."""
        if self.enabled:
            self.documents_ingested_total.inc(count)

    def increment_chunks_created(self, count: int):
        """Add ``count`` to the created-chunks counter."""
        if self.enabled:
            self.chunks_created_total.inc(count)

    def increment_errors(self, error_type: str):
        """Increment the error counter labelled with ``error_type``."""
        if self.enabled:
            self.errors_total.labels(error_type=error_type).inc()

    @contextmanager
    def _observe_latency(self, histogram_attr: str):
        """Time the ``with`` body and record it in the named histogram.

        The observation is made in a ``finally`` clause, so the duration is
        recorded even when the body raises. The histogram attribute is looked
        up lazily because it does not exist on a disabled collector.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments while the body runs.
        start = time.perf_counter()
        try:
            yield
        finally:
            if self.enabled:
                getattr(self, histogram_attr).observe(
                    time.perf_counter() - start
                )

    def measure_query_latency(self):
        """Return a context manager that records RAG query latency."""
        return self._observe_latency('query_latency')

    def measure_embedding_latency(self):
        """Return a context manager that records embedding latency."""
        return self._observe_latency('embedding_latency')

    def measure_generation_latency(self):
        """Return a context manager that records answer-generation latency."""
        return self._observe_latency('generation_latency')

    def measure_db_query_latency(self):
        """Return a context manager that records database query latency."""
        return self._observe_latency('db_query_latency')

    def set_active_connections(self, count: int):
        """Set the active-connections gauge."""
        if self.enabled:
            self.active_connections.set(count)

    def set_cache_size(self, size_bytes: int):
        """Set the cache-size gauge (value in bytes)."""
        if self.enabled:
            self.cache_size.set(size_bytes)

    def set_documents_count(self, count: int):
        """Set the documents-count gauge."""
        if self.enabled:
            self.documents_count.set(count)

    def set_chunks_count(self, count: int):
        """Set the chunks-count gauge."""
        if self.enabled:
            self.chunks_count.set(count)

    def get_metrics(self) -> str:
        """Render the collector's registry in Prometheus text format.

        Returns:
            The exposition text decoded as UTF-8, or an empty string when
            the collector is disabled.
        """
        if not self.enabled:
            return ""
        return generate_latest(self.registry).decode('utf-8')
class TracingManager:
    """OpenTelemetry tracing manager.

    When ``OTEL_AVAILABLE`` is false the instance is created with
    ``enabled = False`` and ``trace_operation`` runs its body without
    creating a span.
    """

    def __init__(
        self,
        service_name: str = "rag-template",
        otlp_endpoint: Optional[str] = None
    ):
        """Set up the tracer provider and obtain a tracer.

        Args:
            service_name: Value stored under the ``service.name`` resource key.
            otlp_endpoint: OTLP endpoint; a batch span processor with an OTLP
                exporter is attached only when this is truthy.
        """
        self.enabled = OTEL_AVAILABLE
        if not self.enabled:
            return

        self.service_name = service_name

        tracer_provider = TracerProvider(
            resource=Resource.create({"service.name": service_name})
        )

        # Spans are exported only when an endpoint was supplied.
        if otlp_endpoint:
            tracer_provider.add_span_processor(
                BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
            )

        # Install the provider globally, then fetch the tracer through the
        # global API.
        trace.set_tracer_provider(tracer_provider)
        self.tracer = trace.get_tracer(__name__)

    @contextmanager
    def trace_operation(self, operation_name: str, attributes: Optional[Dict[str, Any]] = None):
        """Run the ``with`` body inside a span named ``operation_name``.

        Args:
            operation_name: Name given to the span.
            attributes: Optional mapping whose items are set as span
                attributes before the body runs.

        Yields:
            The active span, or ``None`` when tracing is disabled.
        """
        if self.enabled:
            with self.tracer.start_as_current_span(operation_name) as active_span:
                for attr_key, attr_value in (attributes or {}).items():
                    active_span.set_attribute(attr_key, attr_value)
                yield active_span
        else:
            yield
# Global instances: module-level singletons that start as None and are
# filled in on first use by get_metrics_collector() / get_tracing_manager().
_metrics_collector = None
_tracing_manager = None
def get_metrics_collector(registry: Optional['CollectorRegistry'] = None) -> MetricsCollector:
    """Return the module-wide metrics collector, creating it on first call.

    Args:
        registry: Prometheus registry handed to ``MetricsCollector`` when the
            instance is first created; ignored on later calls because the
            existing instance is returned as-is.

    Returns:
        The shared ``MetricsCollector`` instance.
    """
    global _metrics_collector
    if _metrics_collector is not None:
        return _metrics_collector
    _metrics_collector = MetricsCollector(registry=registry)
    return _metrics_collector
def get_tracing_manager(
    service_name: str = "rag-template",
    otlp_endpoint: Optional[str] = None
) -> TracingManager:
    """Return the module-wide tracing manager, creating it on first call.

    Args:
        service_name: Service name passed to ``TracingManager`` on creation.
        otlp_endpoint: OTLP endpoint passed to ``TracingManager`` on creation.

    Both arguments are used only when the instance does not exist yet; later
    calls return the existing instance unchanged.

    Returns:
        The shared ``TracingManager`` instance.
    """
    global _tracing_manager
    if _tracing_manager is not None:
        return _tracing_manager
    _tracing_manager = TracingManager(
        service_name=service_name,
        otlp_endpoint=otlp_endpoint
    )
    return _tracing_manager
# Decoradores
def track_query_latency(func):
    """Decorate ``func`` to record query latency and count queries.

    The call is timed with the shared collector's query-latency context
    manager. The query counter is incremented only after ``func`` returns
    without raising, and still inside the timed section.
    """
    def timed_query(*args, **kwargs):
        collector = get_metrics_collector()
        with collector.measure_query_latency():
            outcome = func(*args, **kwargs)
            collector.increment_queries()
        return outcome

    return wraps(func)(timed_query)
def track_embedding_latency(func):
    """Decorate ``func`` so each call is timed as an embedding operation."""
    def timed_embedding(*args, **kwargs):
        collector = get_metrics_collector()
        with collector.measure_embedding_latency():
            outcome = func(*args, **kwargs)
        return outcome

    return wraps(func)(timed_embedding)
def track_generation_latency(func):
    """Decorate ``func`` so each call is timed as an answer generation."""
    def timed_generation(*args, **kwargs):
        collector = get_metrics_collector()
        with collector.measure_generation_latency():
            outcome = func(*args, **kwargs)
        return outcome

    return wraps(func)(timed_generation)
def trace_operation(operation_name: str):
    """Build a decorator that runs the wrapped function inside a span.

    Args:
        operation_name: Name passed to the shared tracing manager's
            ``trace_operation`` for every call of the decorated function.

    Returns:
        A decorator that preserves the wrapped function's metadata.
    """
    def decorator(func):
        def traced(*args, **kwargs):
            # The manager is resolved per call, not at decoration time.
            with get_tracing_manager().trace_operation(operation_name):
                return func(*args, **kwargs)

        return wraps(func)(traced)

    return decorator