"""Development-only tracing setup that prints durations of selected spans."""

import os

from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
    SpanExporter,
    SpanExportResult,
)


class FilteredConsoleExporter(SpanExporter):
    """Span exporter that prints the duration of a fixed set of span names.

    Spans whose name is not listed in ``WATCHED_SPANS`` are ignored, as are
    spans that are missing a start or end timestamp.
    """

    # Names of the spans whose duration is printed; everything else is dropped.
    WATCHED_SPANS = {
        "call_llm",
        "vector_store",
        "ChampService",
        "convert_messages_langchain",
        "invoke",
        "retrieving documents",
        "PromptSanitizer",
        "sanitize docs_content",
        "sanitize retrieval_query",
        "sanitize_document",
    }

    def export(self, spans):
        """Print one ``[name] duration: X.XXms`` line per watched span.

        Args:
            spans: Iterable of finished spans exposing ``name``,
                ``start_time`` and ``end_time``.

        Returns:
            ``SpanExportResult.SUCCESS``, unconditionally.
        """
        for span in spans:
            if span.name not in self.WATCHED_SPANS:
                continue
            if span.end_time is None or span.start_time is None:
                continue
            # Dividing by 1e6 yields milliseconds when the timestamps are in
            # nanoseconds (assumed here; the printed unit is "ms").
            elapsed_ms = (span.end_time - span.start_time) / 1e6
            print(f"[{span.name}] duration: {elapsed_ms:.2f}ms")
        return SpanExportResult.SUCCESS

    def shutdown(self):
        """Do nothing; this exporter holds no resources to release."""


def setup_telemetry(app):
    """Install tracing on ``app`` when the ``ENV`` variable resolves to "dev".

    ``ENV`` is read case-insensitively and defaults to ``"dev"`` when unset.
    For any other value the function returns without touching anything.
    Otherwise it registers a global tracer provider that feeds a
    ``FilteredConsoleExporter`` through a ``BatchSpanProcessor``, then
    instruments the given application and the HTTPX client.

    Args:
        app: Application object passed to ``FastAPIInstrumentor.instrument_app``.
    """
    environment = os.getenv("ENV", "dev").lower()
    if environment != "dev":
        return

    tracer_provider = TracerProvider()
    exporter = FilteredConsoleExporter()
    tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(tracer_provider)

    FastAPIInstrumentor.instrument_app(app)
    HTTPXClientInstrumentor().instrument()