File size: 8,790 Bytes
f4ef3b8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
"""Observability setup using Arize Phoenix for LLM tracing.

Provides OpenTelemetry-compatible distributed tracing for LLM calls,
retrieval operations, and LangGraph execution. Gracefully degrades
when Phoenix is not installed or configured, and is always disabled
in BYOK mode so that no third-party telemetry sees a request.

Usage:
    Call setup_tracing() once at application startup (e.g., in app/main.py).
    All trace_* functions will automatically emit spans when tracing is enabled.
"""

from __future__ import annotations

from config.settings import settings
from utils.logging import get_logger

_log = get_logger(__name__)

# Module-level state
# NOTE(review): nothing in this module assigns or reads _tracer after this
# initialisation, so it stays None; kept for callers that may import it.
_tracer = None
# Set to True by setup_tracing() once Phoenix has been registered; checked by
# the trace_* helpers, get_trace_url() and is_tracing_enabled().
_phoenix_configured: bool = False
# Phoenix project name; initialised from settings.app_name and set again by
# setup_tracing() after a successful registration.
_phoenix_project_name: str = settings.app_name


def setup_tracing() -> bool:
    """Initialize Phoenix tracing if ``settings.phoenix_endpoint`` is set.

    This function is safe to call unconditionally at startup — it logs a
    message and returns immediately if Phoenix is not configured. Tracing
    failures never crash the application.

    Tracing is always disabled in BYOK mode, regardless of the endpoint.
    Once tracing has been enabled, further calls are no-ops that return
    True: registering again would try to install a second global tracer
    provider and instrument the LLM providers a second time.

    Returns:
        True if tracing is enabled (by this call or an earlier one),
        False otherwise.
    """
    global _phoenix_configured, _phoenix_project_name

    # BYOK mode mandates: no third-party telemetry sees a request. Phoenix
    # spans capture LLM prompts and completions, which would include the
    # visitor's keys-in-context and any private text they uploaded. Hard
    # disable in BYOK regardless of phoenix_endpoint configuration.
    if settings.byok_mode:
        _log.info("phoenix_tracing_disabled", reason="BYOK mode forbids external telemetry")
        return False

    if not settings.phoenix_endpoint:
        _log.info("phoenix_tracing_disabled", reason="No phoenix_endpoint configured")
        return False

    if _phoenix_configured:
        # Already registered by an earlier call; re-registering would attempt
        # to replace the global tracer provider and re-instrument providers.
        _log.debug("phoenix_tracing_already_enabled", endpoint=settings.phoenix_endpoint)
        return True

    try:
        from phoenix.otel import register

        tracer_provider = register(
            project_name=settings.app_name,
            endpoint=settings.phoenix_endpoint,
        )

        # Attempt to instrument LLM and retrieval calls
        _instrument_providers()

        _phoenix_configured = True
        _phoenix_project_name = settings.app_name
        _log.info(
            "phoenix_tracing_enabled",
            endpoint=settings.phoenix_endpoint,
            project=settings.app_name,
            tracer_provider=str(tracer_provider),
        )
        return True
    except ImportError:
        _log.warning(
            "phoenix_import_failed",
            msg=(
                "arize-phoenix not installed; tracing unavailable. "
                "Install with: pip install 'arize-phoenix-otel'"
            ),
        )
        return False
    except Exception as exc:
        _log.error(
            "phoenix_tracing_init_error",
            error=str(exc),
            endpoint=settings.phoenix_endpoint,
        )
        return False


def _instrument_providers() -> None:
    """Best-effort OpenTelemetry auto-instrumentation of LLM frameworks.

    Each supported OpenInference instrumentor is tried in turn: LangChain /
    LangGraph first, then OpenAI-compatible clients. A missing package or an
    instrumentation error is logged at debug level and that instrumentor is
    skipped. Failures are logged rather than raised, so ending up with only
    some instrumentors active is an acceptable outcome.
    """

    def _langchain_instrumentor():
        from openinference.instrumentation.langchain import LangChainInstrumentor

        return LangChainInstrumentor()

    def _openai_instrumentor():
        from openinference.instrumentation.openai import OpenAIInstrumentor

        return OpenAIInstrumentor()

    # (factory, success event, skipped event, skipped reason, error event)
    candidates = (
        (
            _langchain_instrumentor,
            "instrumented_langchain",
            "langchain_instrumentation_skipped",
            "openinference-instrumentation-langchain not installed",
            "langchain_instrumentation_error",
        ),
        (
            _openai_instrumentor,
            "instrumented_openai",
            "openai_instrumentation_skipped",
            "openinference-instrumentation-openai not installed",
            "openai_instrumentation_error",
        ),
    )

    for make_instrumentor, done_event, skip_event, skip_reason, error_event in candidates:
        # The lazy import, construction and instrument() call all sit inside
        # the try, so an ImportError from any of them counts as "skipped".
        try:
            make_instrumentor().instrument()
            _log.info(done_event)
        except ImportError:
            _log.debug(skip_event, reason=skip_reason)
        except Exception as exc:
            _log.debug(error_event, reason=str(exc))


def trace_llm_call(
    provider: str,
    model: str,
    prompt: str,
    response: str,
    latency_ms: float,
    tokens: dict[str, int] | None = None,
) -> None:
    """Record a manual trace span for an LLM call.

    Can be used as an explicit trace point when auto-instrumentation
    is unavailable or for custom tracking. Only the *lengths* of the
    prompt and response are recorded, never their text.

    The call has already finished when this is invoked, so the span's
    start is backdated by ``latency_ms``. Its duration in the trace
    viewer then matches the call latency rather than the near-zero time
    spent emitting the span.

    Does nothing unless ``setup_tracing()`` enabled tracing. Errors while
    emitting the span are logged at debug level and swallowed.

    Args:
        provider: LLM provider name (e.g., "ollama", "groq").
        model: Model identifier used for generation.
        prompt: The input prompt text.
        response: The generated response text.
        latency_ms: Response latency in milliseconds.
        tokens: Optional token usage dict with keys like
            "prompt_tokens", "completion_tokens", "total_tokens".
    """
    if not _phoenix_configured:
        return

    try:
        import time

        from opentelemetry import trace

        now_ns = time.time_ns()
        offset_ns = latency_ms * 1_000_000
        # Backdate only for a finite, positive latency (NaN/inf/negative would
        # otherwise break int() or produce a start after the end).
        start_ns = now_ns - int(offset_ns) if 0 < offset_ns < float("inf") else now_ns

        tracer = trace.get_tracer("secureagentrag.llm")
        with tracer.start_as_current_span("llm_call", start_time=start_ns) as span:
            span.set_attribute("llm.provider", provider)
            span.set_attribute("llm.model", model)
            span.set_attribute("llm.prompt_length", len(prompt))
            span.set_attribute("llm.response_length", len(response))
            span.set_attribute("llm.latency_ms", latency_ms)
            if tokens:
                for key, value in tokens.items():
                    span.set_attribute(f"llm.tokens.{key}", value)
    except Exception as exc:
        _log.debug("trace_llm_call_failed", error=str(exc))


def trace_retrieval(
    query: str,
    num_results: int,
    latency_ms: float,
    method: str = "hybrid",
) -> None:
    """Record a manual trace span for a retrieval operation.

    Only the query *length* is recorded, not its text. The span's start
    is backdated by ``latency_ms`` so its duration reflects the retrieval
    time rather than the instant at which the span was emitted.

    Does nothing unless ``setup_tracing()`` enabled tracing. Errors while
    emitting the span are logged at debug level and swallowed.

    Args:
        query: The search query string.
        num_results: Number of results returned.
        latency_ms: Retrieval latency in milliseconds.
        method: Retrieval method used ("hybrid", "dense", "bm25").
    """
    if not _phoenix_configured:
        return

    try:
        import time

        from opentelemetry import trace

        now_ns = time.time_ns()
        offset_ns = latency_ms * 1_000_000
        # Backdate only for a finite, positive latency.
        start_ns = now_ns - int(offset_ns) if 0 < offset_ns < float("inf") else now_ns

        tracer = trace.get_tracer("secureagentrag.retrieval")
        with tracer.start_as_current_span("retrieval", start_time=start_ns) as span:
            span.set_attribute("retrieval.query_length", len(query))
            span.set_attribute("retrieval.num_results", num_results)
            span.set_attribute("retrieval.latency_ms", latency_ms)
            span.set_attribute("retrieval.method", method)
    except Exception as exc:
        _log.debug("trace_retrieval_failed", error=str(exc))


def trace_graph_execution(
    query: str,
    nodes_executed: list[str],
    total_latency_ms: float,
    final_confidence: float,
    retries: int = 0,
) -> None:
    """Record a manual trace span for LangGraph pipeline execution.

    Only the query *length* is recorded, not its text. Executed node names
    are stored as one comma-joined string. The span's start is backdated by
    ``total_latency_ms`` so its duration reflects the pipeline run time.

    Does nothing unless ``setup_tracing()`` enabled tracing. Errors while
    emitting the span are logged at debug level and swallowed.

    Args:
        query: The original user query.
        nodes_executed: List of graph node names that were executed.
        total_latency_ms: Total pipeline execution time in milliseconds.
        final_confidence: Final confidence score of the generated answer.
        retries: Number of corrective retrieval retries performed.
    """
    if not _phoenix_configured:
        return

    try:
        import time

        from opentelemetry import trace

        now_ns = time.time_ns()
        offset_ns = total_latency_ms * 1_000_000
        # Backdate only for a finite, positive latency.
        start_ns = now_ns - int(offset_ns) if 0 < offset_ns < float("inf") else now_ns

        tracer = trace.get_tracer("secureagentrag.graph")
        with tracer.start_as_current_span("graph_execution", start_time=start_ns) as span:
            span.set_attribute("graph.query_length", len(query))
            span.set_attribute("graph.nodes_executed", ",".join(nodes_executed))
            span.set_attribute("graph.total_latency_ms", total_latency_ms)
            span.set_attribute("graph.confidence", final_confidence)
            span.set_attribute("graph.retries", retries)
    except Exception as exc:
        _log.debug("trace_graph_execution_failed", error=str(exc))


def get_trace_url() -> str | None:
    """Return the Phoenix dashboard URL if tracing is configured.

    ``settings.phoenix_endpoint`` is normally a collector address, not the
    UI. It is either the OTLP/gRPC port (4317) or the OTLP/HTTP route
    ``/v1/traces``. The UI is assumed to run on the same host, so that
    route is dropped and port 4317 is swapped for the UI port 6006. Any
    other endpoint is returned with trailing slashes removed.

    Returns:
        Phoenix UI URL string, or None if Phoenix is not configured.
    """
    if not _phoenix_configured or not settings.phoenix_endpoint:
        return None

    import re

    endpoint = settings.phoenix_endpoint.rstrip("/")

    # An OTLP/HTTP endpoint points at the collector route; the UI is at the root.
    otlp_http_path = "/v1/traces"
    if endpoint.endswith(otlp_http_path):
        endpoint = endpoint[: -len(otlp_http_path)]

    # Replace the gRPC collector port with the UI port. Anchor on the end of
    # the port so that e.g. ":43170" is not rewritten.
    return re.sub(r":4317(?=[/?#]|$)", ":6006", endpoint, count=1)


def is_tracing_enabled() -> bool:
    """Report whether setup_tracing() has successfully enabled Phoenix.

    Returns:
        ``True`` once Phoenix tracing has been registered; ``False`` while it
        is disabled, unconfigured, or failed to initialise.
    """
    return bool(_phoenix_configured)