""" Sistema de observabilidade com Prometheus e OpenTelemetry. Metricas: - Contadores de requests - Histogramas de latencia - Gauges de estado do sistema - Metricas customizadas do RAG """ from typing import Dict, Any, Optional import time from functools import wraps from contextlib import contextmanager try: from prometheus_client import ( Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST, CollectorRegistry ) PROMETHEUS_AVAILABLE = True except ImportError: PROMETHEUS_AVAILABLE = False print("Aviso: prometheus_client nao instalado. Metricas desabilitadas.") try: from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.resources import Resource from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace.export import BatchSpanProcessor OTEL_AVAILABLE = True except ImportError: OTEL_AVAILABLE = False print("Aviso: opentelemetry nao instalado. Traces desabilitados.") class MetricsCollector: """Coletor de metricas Prometheus.""" def __init__(self, registry: Optional['CollectorRegistry'] = None): """ Inicializa coletor. Args: registry: Registry do Prometheus (opcional) """ if not PROMETHEUS_AVAILABLE: self.enabled = False return self.enabled = True self.registry = registry # Contadores self.requests_total = Counter( 'rag_requests_total', 'Total de requests ao sistema', ['endpoint', 'method'], registry=registry ) self.queries_total = Counter( 'rag_queries_total', 'Total de queries RAG', registry=registry ) self.documents_ingested_total = Counter( 'rag_documents_ingested_total', 'Total de documentos ingeridos', registry=registry ) self.chunks_created_total = Counter( 'rag_chunks_created_total', 'Total de chunks criados', registry=registry ) self.errors_total = Counter( 'rag_errors_total', 'Total de erros', ['error_type'], registry=registry ) # Histogramas de latencia self.query_latency = Histogram( 'rag_query_latency_seconds', 'Latencia de queries RAG', buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0], registry=registry ) self.embedding_latency = Histogram( 'rag_embedding_latency_seconds', 'Latencia de geracao de embeddings', buckets=[0.05, 0.1, 0.25, 0.5, 1.0], registry=registry ) self.generation_latency = Histogram( 'rag_generation_latency_seconds', 'Latencia de geracao de respostas', buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 20.0], registry=registry ) self.db_query_latency = Histogram( 'rag_db_query_latency_seconds', 'Latencia de queries ao banco', buckets=[0.01, 0.05, 0.1, 0.25, 0.5], registry=registry ) # Gauges self.active_connections = Gauge( 'rag_active_connections', 'Conexoes ativas ao banco', registry=registry ) self.cache_size = Gauge( 'rag_cache_size_bytes', 'Tamanho do cache de embeddings', registry=registry ) self.documents_count = Gauge( 'rag_documents_count', 'Numero total de documentos', registry=registry ) self.chunks_count = Gauge( 'rag_chunks_count', 'Numero total de chunks', registry=registry ) def increment_requests(self, endpoint: str, method: str): """Incrementa contador de requests.""" if self.enabled: self.requests_total.labels(endpoint=endpoint, method=method).inc() def increment_queries(self): """Incrementa contador de queries.""" if self.enabled: self.queries_total.inc() def increment_documents_ingested(self, count: int = 1): """Incrementa contador de documentos ingeridos.""" if self.enabled: self.documents_ingested_total.inc(count) def increment_chunks_created(self, count: int): """Incrementa contador de chunks criados.""" if self.enabled: self.chunks_created_total.inc(count) def increment_errors(self, error_type: str): """Incrementa contador de erros.""" if self.enabled: self.errors_total.labels(error_type=error_type).inc() @contextmanager def measure_query_latency(self): """Context manager para medir latencia de query.""" start = time.time() try: yield finally: if self.enabled: self.query_latency.observe(time.time() - start) @contextmanager def measure_embedding_latency(self): """Context manager para medir latencia de embedding.""" start = time.time() try: yield finally: if self.enabled: self.embedding_latency.observe(time.time() - start) @contextmanager def measure_generation_latency(self): """Context manager para medir latencia de geracao.""" start = time.time() try: yield finally: if self.enabled: self.generation_latency.observe(time.time() - start) @contextmanager def measure_db_query_latency(self): """Context manager para medir latencia de query ao banco.""" start = time.time() try: yield finally: if self.enabled: self.db_query_latency.observe(time.time() - start) def set_active_connections(self, count: int): """Define numero de conexoes ativas.""" if self.enabled: self.active_connections.set(count) def set_cache_size(self, size_bytes: int): """Define tamanho do cache.""" if self.enabled: self.cache_size.set(size_bytes) def set_documents_count(self, count: int): """Define numero de documentos.""" if self.enabled: self.documents_count.set(count) def set_chunks_count(self, count: int): """Define numero de chunks.""" if self.enabled: self.chunks_count.set(count) def get_metrics(self) -> str: """ Retorna metricas em formato Prometheus. Returns: Metricas formatadas """ if not self.enabled: return "" return generate_latest(self.registry).decode('utf-8') class TracingManager: """Gerenciador de traces OpenTelemetry.""" def __init__( self, service_name: str = "rag-template", otlp_endpoint: Optional[str] = None ): """ Inicializa tracing. Args: service_name: Nome do servico otlp_endpoint: Endpoint OTLP (opcional) """ if not OTEL_AVAILABLE: self.enabled = False return self.enabled = True self.service_name = service_name # Configurar resource resource = Resource.create({"service.name": service_name}) # Configurar tracer provider provider = TracerProvider(resource=resource) # Adicionar exporter se endpoint fornecido if otlp_endpoint: exporter = OTLPSpanExporter(endpoint=otlp_endpoint) processor = BatchSpanProcessor(exporter) provider.add_span_processor(processor) # Configurar trace provider global trace.set_tracer_provider(provider) # Obter tracer self.tracer = trace.get_tracer(__name__) @contextmanager def trace_operation(self, operation_name: str, attributes: Optional[Dict[str, Any]] = None): """ Context manager para criar span. Args: operation_name: Nome da operacao attributes: Atributos do span (opcional) """ if not self.enabled: yield return with self.tracer.start_as_current_span(operation_name) as span: if attributes: for key, value in attributes.items(): span.set_attribute(key, value) yield span # Instancia global _metrics_collector = None _tracing_manager = None def get_metrics_collector(registry: Optional['CollectorRegistry'] = None) -> MetricsCollector: """ Obtem instancia global do coletor de metricas. Args: registry: Registry do Prometheus (opcional) Returns: Coletor de metricas """ global _metrics_collector if _metrics_collector is None: _metrics_collector = MetricsCollector(registry=registry) return _metrics_collector def get_tracing_manager( service_name: str = "rag-template", otlp_endpoint: Optional[str] = None ) -> TracingManager: """ Obtem instancia global do gerenciador de tracing. Args: service_name: Nome do servico otlp_endpoint: Endpoint OTLP (opcional) Returns: Gerenciador de tracing """ global _tracing_manager if _tracing_manager is None: _tracing_manager = TracingManager( service_name=service_name, otlp_endpoint=otlp_endpoint ) return _tracing_manager # Decoradores def track_query_latency(func): """Decorator para rastrear latencia de queries.""" @wraps(func) def wrapper(*args, **kwargs): metrics = get_metrics_collector() with metrics.measure_query_latency(): result = func(*args, **kwargs) metrics.increment_queries() return result return wrapper def track_embedding_latency(func): """Decorator para rastrear latencia de embeddings.""" @wraps(func) def wrapper(*args, **kwargs): metrics = get_metrics_collector() with metrics.measure_embedding_latency(): return func(*args, **kwargs) return wrapper def track_generation_latency(func): """Decorator para rastrear latencia de geracao.""" @wraps(func) def wrapper(*args, **kwargs): metrics = get_metrics_collector() with metrics.measure_generation_latency(): return func(*args, **kwargs) return wrapper def trace_operation(operation_name: str): """Decorator para criar span OpenTelemetry.""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): tracing = get_tracing_manager() with tracing.trace_operation(operation_name): return func(*args, **kwargs) return wrapper return decorator