Spaces:
Paused
Paused
| import os | |
| from opentelemetry import trace | |
| from opentelemetry.sdk.trace import TracerProvider | |
| from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter | |
| from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor | |
| from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor | |
| from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult | |
class FilteredConsoleExporter(SpanExporter):
    """Span exporter that prints the duration of a fixed set of named spans.

    Spans whose name is not listed in ``WATCHED_SPANS``, or that are missing
    a start or end timestamp, are skipped. Output is written with ``print``;
    nothing else is done with the spans.
    """

    # Names of the spans whose timings are reported; all others are ignored.
    WATCHED_SPANS = {
        "call_llm",
        "vector_store",
        "ChampService",
        "convert_messages_langchain",
        "invoke",
        "retrieving documents",
        "PromptSanitizer",
        "sanitize docs_content",
        "sanitize retrieval_query",
        "sanitize_document",
    }

    def export(self, spans):
        """Print one ``[name] duration: N.NNms`` line per watched span.

        Args:
            spans: Iterable of span objects exposing ``name``, ``start_time``
                and ``end_time``.

        Returns:
            Always ``SpanExportResult.SUCCESS``.
        """
        for record in spans:
            if record.name not in self.WATCHED_SPANS:
                continue
            if record.end_time is None or record.start_time is None:
                continue
            # The timestamp difference is scaled by 1e6 and labelled "ms",
            # i.e. the timestamps are treated as nanoseconds.
            elapsed_ms = (record.end_time - record.start_time) / 1e6
            print(f"[{record.name}] duration: {elapsed_ms:.2f}ms")
        return SpanExportResult.SUCCESS

    def shutdown(self):
        """Do nothing; this exporter has no cleanup to perform."""
        return None
def setup_telemetry(app):
    """Enable console tracing for ``app`` when running in the dev environment.

    Reads the ``ENV`` environment variable (default ``"dev"``, compared
    case-insensitively). For any value other than ``"dev"`` the function
    returns immediately without configuring anything. Otherwise it installs
    a global ``TracerProvider`` that feeds a ``FilteredConsoleExporter``
    through a ``BatchSpanProcessor``, then instruments the given app and the
    HTTPX client.

    Args:
        app: The application object handed to
            ``FastAPIInstrumentor.instrument_app``.
    """
    environment = os.getenv("ENV", "dev").lower()
    if environment != "dev":
        return

    tracer_provider = TracerProvider()
    console_exporter = FilteredConsoleExporter()
    span_processor = BatchSpanProcessor(console_exporter)
    tracer_provider.add_span_processor(span_processor)
    trace.set_tracer_provider(tracer_provider)

    FastAPIInstrumentor.instrument_app(app)
    httpx_instrumentor = HTTPXClientInstrumentor()
    httpx_instrumentor.instrument()