Spaces:
Running
Running
| """Observability setup using Arize Phoenix for LLM tracing. | |
| Provides OpenTelemetry-compatible distributed tracing for LLM calls, | |
| retrieval operations, and LangGraph execution. Gracefully degrades | |
| when Phoenix is not installed or configured. | |
| Usage: | |
| Call setup_tracing() once at application startup (e.g., in app/main.py). | |
| All trace_* functions will automatically emit spans when tracing is enabled. | |
| """ | |
| from __future__ import annotations | |
| from config.settings import settings | |
| from utils.logging import get_logger | |
# Module logger; the functions below call it with an event name plus keyword fields.
_log = get_logger(__name__)
# Module-level state
# Reserved tracer handle. Nothing in this module assigns it after import.
_tracer = None
# Set to True by setup_tracing() once Phoenix registration succeeds. Every
# trace_* helper, get_trace_url() and is_tracing_enabled() read this flag.
_phoenix_configured = False
# Project name used for Phoenix; starts as settings.app_name and is
# re-assigned by setup_tracing() when tracing is enabled.
_phoenix_project_name: str = settings.app_name
def setup_tracing() -> bool:
    """Initialize Phoenix tracing if ``settings.phoenix_endpoint`` is set.

    This function is safe to call unconditionally at startup, and safe to
    call more than once: after a successful initialization, later calls
    return True without registering or instrumenting again. Tracing
    failures are logged and reported through the return value; they are
    not raised to the caller.

    Returns:
        True if tracing is enabled (by this call or an earlier one),
        False if tracing is disabled, unavailable, or failed to start.
    """
    global _phoenix_configured, _phoenix_project_name

    # BYOK mode mandates: no third-party telemetry sees a request. Phoenix
    # spans capture LLM prompts and completions, which would include the
    # visitor's keys-in-context and any private text they uploaded. Hard
    # disable in BYOK regardless of phoenix_endpoint configuration.
    if settings.byok_mode:
        _log.info("phoenix_tracing_disabled", reason="BYOK mode forbids external telemetry")
        return False

    if not settings.phoenix_endpoint:
        _log.info("phoenix_tracing_disabled", reason="No phoenix_endpoint configured")
        return False

    # Idempotence guard: avoid calling register() and the instrumentors a
    # second time when tracing is already active in this process.
    if _phoenix_configured:
        _log.debug("phoenix_tracing_already_enabled", project=_phoenix_project_name)
        return True

    # Only the import itself decides whether the package is installed. Keeping
    # it in its own try block means an ImportError raised later (inside
    # register() or an instrumentor) is reported as an init error instead of
    # being mislabelled as "not installed".
    try:
        from phoenix.otel import register
    except ImportError:
        _log.warning(
            "phoenix_import_failed",
            msg=(
                "arize-phoenix not installed; tracing unavailable. "
                "Install with: pip install 'arize-phoenix-otel'"
            ),
        )
        return False

    try:
        tracer_provider = register(
            project_name=settings.app_name,
            endpoint=settings.phoenix_endpoint,
        )

        # Attempt to instrument LLM and retrieval calls
        _instrument_providers()

        _phoenix_configured = True
        _phoenix_project_name = settings.app_name
        _log.info(
            "phoenix_tracing_enabled",
            endpoint=settings.phoenix_endpoint,
            project=settings.app_name,
            tracer_provider=str(tracer_provider),
        )
        return True
    except Exception as exc:
        # Startup boundary: log and carry on without tracing.
        _log.error(
            "phoenix_tracing_init_error",
            error=str(exc),
            endpoint=settings.phoenix_endpoint,
        )
        return False
def _instrument_providers() -> None:
    """Turn on OpenInference auto-instrumentation for each supported provider.

    Providers are attempted in a fixed order (LangChain/LangGraph first,
    then OpenAI-compatible clients). Each attempt is independent: a missing
    package or a failing instrumentor is logged at debug level and the next
    provider is still attempted.
    """

    def _enable_langchain() -> None:
        # Imported lazily so a missing optional package only skips this provider.
        from openinference.instrumentation.langchain import LangChainInstrumentor

        LangChainInstrumentor().instrument()

    def _enable_openai() -> None:
        # Imported lazily so a missing optional package only skips this provider.
        from openinference.instrumentation.openai import OpenAIInstrumentor

        OpenAIInstrumentor().instrument()

    # (activator, success event, skipped event, skipped reason, error event)
    attempts = (
        (
            _enable_langchain,
            "instrumented_langchain",
            "langchain_instrumentation_skipped",
            "openinference-instrumentation-langchain not installed",
            "langchain_instrumentation_error",
        ),
        (
            _enable_openai,
            "instrumented_openai",
            "openai_instrumentation_skipped",
            "openinference-instrumentation-openai not installed",
            "openai_instrumentation_error",
        ),
    )

    for activate, ok_event, skipped_event, skipped_reason, error_event in attempts:
        try:
            activate()
            _log.info(ok_event)
        except ImportError:
            _log.debug(skipped_event, reason=skipped_reason)
        except Exception as problem:
            _log.debug(error_event, reason=str(problem))
def trace_llm_call(
    provider: str,
    model: str,
    prompt: str,
    response: str,
    latency_ms: float,
    tokens: dict[str, int] | None = None,
) -> None:
    """Emit an ``llm_call`` span describing one completed LLM request.

    Intended as an explicit trace point for callers that are not covered by
    auto-instrumentation. Does nothing unless tracing has been enabled.
    Only the lengths of the prompt and response are attached to the span,
    not their text. Any failure while emitting the span is logged at debug
    level and suppressed.

    Args:
        provider: LLM provider name (e.g., "ollama", "groq").
        model: Identifier of the model that produced the response.
        prompt: Input prompt text; only its length is recorded.
        response: Generated text; only its length is recorded.
        latency_ms: Time the call took, in milliseconds.
        tokens: Optional usage counts; each entry is recorded as
            ``llm.tokens.<key>`` (e.g. "prompt_tokens", "total_tokens").
    """
    if _phoenix_configured:
        try:
            from opentelemetry import trace as otel_trace

            llm_tracer = otel_trace.get_tracer("secureagentrag.llm")
            with llm_tracer.start_as_current_span("llm_call") as llm_span:
                record = llm_span.set_attribute
                record("llm.provider", provider)
                record("llm.model", model)
                record("llm.prompt_length", len(prompt))
                record("llm.response_length", len(response))
                record("llm.latency_ms", latency_ms)
                # A missing or empty usage dict contributes no attributes.
                for usage_key, usage_count in (tokens or {}).items():
                    record(f"llm.tokens.{usage_key}", usage_count)
        except Exception as failure:
            _log.debug("trace_llm_call_failed", error=str(failure))
def trace_retrieval(
    query: str,
    num_results: int,
    latency_ms: float,
    method: str = "hybrid",
) -> None:
    """Emit a ``retrieval`` span describing one completed search.

    Does nothing unless tracing has been enabled. Only the length of the
    query is attached to the span, not its text. Any failure while emitting
    the span is logged at debug level and suppressed.

    Args:
        query: Search query string; only its length is recorded.
        num_results: How many results the search returned.
        latency_ms: Time the retrieval took, in milliseconds.
        method: Retrieval strategy label ("hybrid", "dense", "bm25").
    """
    if _phoenix_configured:
        try:
            from opentelemetry import trace as otel_trace

            retrieval_tracer = otel_trace.get_tracer("secureagentrag.retrieval")
            with retrieval_tracer.start_as_current_span("retrieval") as retrieval_span:
                # Built inside the span so a bad ``query`` still fails within it.
                fields = (
                    ("retrieval.query_length", len(query)),
                    ("retrieval.num_results", num_results),
                    ("retrieval.latency_ms", latency_ms),
                    ("retrieval.method", method),
                )
                for attr_name, attr_value in fields:
                    retrieval_span.set_attribute(attr_name, attr_value)
        except Exception as failure:
            _log.debug("trace_retrieval_failed", error=str(failure))
def trace_graph_execution(
    query: str,
    nodes_executed: list[str],
    total_latency_ms: float,
    final_confidence: float,
    retries: int = 0,
) -> None:
    """Emit a ``graph_execution`` span summarising one pipeline run.

    Does nothing unless tracing has been enabled. Only the length of the
    query is attached to the span, not its text. Any failure while emitting
    the span is logged at debug level and suppressed.

    Args:
        query: Original user query; only its length is recorded.
        nodes_executed: Names of the graph nodes that ran, recorded as a
            single comma-separated string.
        total_latency_ms: End-to-end pipeline time in milliseconds.
        final_confidence: Confidence score of the final answer.
        retries: Count of corrective retrieval retries performed.
    """
    if _phoenix_configured:
        try:
            from opentelemetry import trace as otel_trace

            graph_tracer = otel_trace.get_tracer("secureagentrag.graph")
            with graph_tracer.start_as_current_span("graph_execution") as run_span:
                run_span.set_attribute("graph.query_length", len(query))
                node_path = ",".join(nodes_executed)
                run_span.set_attribute("graph.nodes_executed", node_path)
                run_span.set_attribute("graph.total_latency_ms", total_latency_ms)
                run_span.set_attribute("graph.confidence", final_confidence)
                run_span.set_attribute("graph.retries", retries)
        except Exception as failure:
            _log.debug("trace_graph_execution_failed", error=str(failure))
def get_trace_url() -> str | None:
    """Return the Phoenix dashboard URL if tracing is configured.

    The URL is derived from ``settings.phoenix_endpoint``:

    * trailing slashes are removed;
    * a trailing ``/v1/traces`` collector path is removed, since that path
      addresses the trace collector rather than the UI;
    * the collector port ``4317`` is rewritten to the UI port ``6006``.
      Only the exact port is rewritten, so e.g. ``:43170`` is left alone.

    Any other endpoint is returned as-is (minus trailing slashes), on the
    assumption that the UI is served from the same address.

    Returns:
        Phoenix UI URL string, or None if Phoenix is not configured.
    """
    # Local import, matching this module's habit of importing at point of use.
    import re

    if not _phoenix_configured or not settings.phoenix_endpoint:
        return None

    # Phoenix UI typically runs on the same host
    endpoint = settings.phoenix_endpoint.rstrip("/")

    # Drop the HTTP collector path so the result points at the UI root.
    collector_path = "/v1/traces"
    if endpoint.endswith(collector_path):
        endpoint = endpoint[: -len(collector_path)].rstrip("/")

    # Swap the collector port for the UI port. The negative lookahead keeps
    # longer port numbers that merely start with 4317 from being rewritten.
    return re.sub(r":4317(?!\d)", ":6006", endpoint)
def is_tracing_enabled() -> bool:
    """Report whether Phoenix tracing has been switched on in this process.

    Returns:
        The current value of the module's "configured" flag: True once
        tracing has been set up successfully, False otherwise.
    """
    enabled: bool = _phoenix_configured
    return enabled