import os
from dataclasses import dataclass
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast

import litellm
from litellm._logging import verbose_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.types.services import ServiceLoggerPayload
from litellm.types.utils import (
    ChatCompletionMessageToolCall,
    Function,
    StandardCallbackDynamicParams,
    StandardLoggingPayload,
)

if TYPE_CHECKING:
    from opentelemetry.sdk.trace.export import SpanExporter as _SpanExporter
    from opentelemetry.trace import Span as _Span

    from litellm.proxy._types import (
        ManagementEndpointLoggingPayload as _ManagementEndpointLoggingPayload,
    )
    from litellm.proxy.proxy_server import UserAPIKeyAuth as _UserAPIKeyAuth

    Span = Union[_Span, Any]
    SpanExporter = Union[_SpanExporter, Any]
    UserAPIKeyAuth = Union[_UserAPIKeyAuth, Any]
    ManagementEndpointLoggingPayload = Union[_ManagementEndpointLoggingPayload, Any]
else:
    Span = Any
    SpanExporter = Any
    UserAPIKeyAuth = Any
    ManagementEndpointLoggingPayload = Any

LITELLM_TRACER_NAME = os.getenv("OTEL_TRACER_NAME", "litellm")
LITELLM_RESOURCE: Dict[Any, Any] = {
    "service.name": os.getenv("OTEL_SERVICE_NAME", "litellm"),
    "deployment.environment": os.getenv("OTEL_ENVIRONMENT_NAME", "production"),
    "model_id": os.getenv("OTEL_SERVICE_NAME", "litellm"),
}
RAW_REQUEST_SPAN_NAME = "raw_gen_ai_request"
LITELLM_REQUEST_SPAN_NAME = "litellm_request"


@dataclass
class OpenTelemetryConfig:
    exporter: Union[str, SpanExporter] = "console"
    endpoint: Optional[str] = None
    headers: Optional[str] = None

    @classmethod
    def from_env(cls):
        """
        OTEL_HEADERS=x-honeycomb-team=B85YgLm9****
        OTEL_EXPORTER="otlp_http"
        OTEL_ENDPOINT="https://api.honeycomb.io/v1/traces"

        OTEL_HEADERS gets sent as headers = {"x-honeycomb-team": "B85YgLm96******"}
        """
        from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
            InMemorySpanExporter,
        )

        if os.getenv("OTEL_EXPORTER") == "in_memory":
            return cls(exporter=InMemorySpanExporter())
        return cls(
            exporter=os.getenv("OTEL_EXPORTER", "console"),
            endpoint=os.getenv("OTEL_ENDPOINT"),
            headers=os.getenv(
                "OTEL_HEADERS"
            ),  # example: OTEL_HEADERS=x-honeycomb-team=B85YgLm96***
        )
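
# Illustrative usage sketch (not part of the original module): configuring the
# exporter via environment variables, matching the values documented in
# OpenTelemetryConfig.from_env() above.
#
#   os.environ["OTEL_EXPORTER"] = "otlp_http"
#   os.environ["OTEL_ENDPOINT"] = "https://api.honeycomb.io/v1/traces"
#   os.environ["OTEL_HEADERS"] = "x-honeycomb-team=<your-api-key>"
#   config = OpenTelemetryConfig.from_env()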


class OpenTelemetry(CustomLogger):
    def __init__(
        self,
        config: Optional[OpenTelemetryConfig] = None,
        callback_name: Optional[str] = None,
        **kwargs,
    ):
        from opentelemetry import trace
        from opentelemetry.sdk.resources import Resource
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.trace import SpanKind

        if config is None:
            config = OpenTelemetryConfig.from_env()

        self.config = config
        self.OTEL_EXPORTER = self.config.exporter
        self.OTEL_ENDPOINT = self.config.endpoint
        self.OTEL_HEADERS = self.config.headers
        provider = TracerProvider(resource=Resource(attributes=LITELLM_RESOURCE))
        provider.add_span_processor(self._get_span_processor())
        self.callback_name = callback_name
        trace.set_tracer_provider(provider)
        self.tracer = trace.get_tracer(LITELLM_TRACER_NAME)

        self.span_kind = SpanKind

        _debug_otel = str(os.getenv("DEBUG_OTEL", "False")).lower()

        if _debug_otel == "true":
            # Set up logging
            import logging

            logging.basicConfig(level=logging.DEBUG)
            logging.getLogger(__name__)

            # Enable OpenTelemetry logging
            otel_exporter_logger = logging.getLogger("opentelemetry.sdk.trace.export")
            otel_exporter_logger.setLevel(logging.DEBUG)

        # init CustomLogger params
        super().__init__(**kwargs)
        self._init_otel_logger_on_litellm_proxy()
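
    # Illustrative usage sketch (not part of the original module), assuming
    # litellm.callbacks accepts CustomLogger instances:
    #
    #   otel_logger = OpenTelemetry(config=OpenTelemetryConfig.from_env())
    #   litellm.callbacks = [otel_logger]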

    def _init_otel_logger_on_litellm_proxy(self):
        """
        Initializes OpenTelemetry for litellm proxy server

        - Adds Otel as a service callback
        - Sets `proxy_server.open_telemetry_logger` to self
        """
        from litellm.proxy import proxy_server

        # Add Otel as a service callback
        if "otel" not in litellm.service_callback:
            litellm.service_callback.append("otel")
        setattr(proxy_server, "open_telemetry_logger", self)

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        self._handle_success(kwargs, response_obj, start_time, end_time)

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        self._handle_failure(kwargs, response_obj, start_time, end_time)

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        self._handle_success(kwargs, response_obj, start_time, end_time)

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        self._handle_failure(kwargs, response_obj, start_time, end_time)

    async def async_service_success_hook(
        self,
        payload: ServiceLoggerPayload,
        parent_otel_span: Optional[Span] = None,
        start_time: Optional[Union[datetime, float]] = None,
        end_time: Optional[Union[datetime, float]] = None,
        event_metadata: Optional[dict] = None,
    ):
        from opentelemetry import trace
        from opentelemetry.trace import Status, StatusCode

        _start_time_ns = 0
        _end_time_ns = 0

        if isinstance(start_time, float):
            _start_time_ns = int(start_time * 1e9)
        else:
            _start_time_ns = self._to_ns(start_time)

        if isinstance(end_time, float):
            _end_time_ns = int(end_time * 1e9)
        else:
            _end_time_ns = self._to_ns(end_time)

        if parent_otel_span is not None:
            _span_name = payload.service
            service_logging_span = self.tracer.start_span(
                name=_span_name,
                context=trace.set_span_in_context(parent_otel_span),
                start_time=_start_time_ns,
            )
            self.safe_set_attribute(
                span=service_logging_span,
                key="call_type",
                value=payload.call_type,
            )
            self.safe_set_attribute(
                span=service_logging_span,
                key="service",
                value=payload.service.value,
            )

            if event_metadata:
                for key, value in event_metadata.items():
                    if value is None:
                        value = "None"
                    if isinstance(value, dict):
                        try:
                            value = str(value)
                        except Exception:
                            value = "litellm logging error - could_not_json_serialize"
                    self.safe_set_attribute(
                        span=service_logging_span,
                        key=key,
                        value=value,
                    )
            service_logging_span.set_status(Status(StatusCode.OK))
            service_logging_span.end(end_time=_end_time_ns)

    async def async_service_failure_hook(
        self,
        payload: ServiceLoggerPayload,
        error: Optional[str] = "",
        parent_otel_span: Optional[Span] = None,
        start_time: Optional[Union[datetime, float]] = None,
        end_time: Optional[Union[float, datetime]] = None,
        event_metadata: Optional[dict] = None,
    ):
        from opentelemetry import trace
        from opentelemetry.trace import Status, StatusCode

        _start_time_ns = 0
        _end_time_ns = 0

        if isinstance(start_time, float):
            _start_time_ns = int(start_time * 1e9)
        else:
            _start_time_ns = self._to_ns(start_time)

        if isinstance(end_time, float):
            _end_time_ns = int(end_time * 1e9)
        else:
            _end_time_ns = self._to_ns(end_time)

        if parent_otel_span is not None:
            _span_name = payload.service
            service_logging_span = self.tracer.start_span(
                name=_span_name,
                context=trace.set_span_in_context(parent_otel_span),
                start_time=_start_time_ns,
            )
            self.safe_set_attribute(
                span=service_logging_span,
                key="call_type",
                value=payload.call_type,
            )
            self.safe_set_attribute(
                span=service_logging_span,
                key="service",
                value=payload.service.value,
            )
            if error:
                self.safe_set_attribute(
                    span=service_logging_span,
                    key="error",
                    value=error,
                )
            if event_metadata:
                for key, value in event_metadata.items():
                    if isinstance(value, dict):
                        try:
                            value = str(value)
                        except Exception:
                            value = "litellm logging error - could_not_json_serialize"
                    self.safe_set_attribute(
                        span=service_logging_span,
                        key=key,
                        value=value,
                    )
            service_logging_span.set_status(Status(StatusCode.ERROR))
            service_logging_span.end(end_time=_end_time_ns)

    async def async_post_call_failure_hook(
        self,
        request_data: dict,
        original_exception: Exception,
        user_api_key_dict: UserAPIKeyAuth,
    ):
        from opentelemetry import trace
        from opentelemetry.trace import Status, StatusCode

        parent_otel_span = user_api_key_dict.parent_otel_span
        if parent_otel_span is not None:
            parent_otel_span.set_status(Status(StatusCode.ERROR))
            _span_name = "Failed Proxy Server Request"

            # Exception logging child span
            exception_logging_span = self.tracer.start_span(
                name=_span_name,
                context=trace.set_span_in_context(parent_otel_span),
            )
            self.safe_set_attribute(
                span=exception_logging_span,
                key="exception",
                value=str(original_exception),
            )
            exception_logging_span.set_status(Status(StatusCode.ERROR))
            exception_logging_span.end(end_time=self._to_ns(datetime.now()))

            # End parent OTEL span
            parent_otel_span.end(end_time=self._to_ns(datetime.now()))

    def _handle_success(self, kwargs, response_obj, start_time, end_time):
        from opentelemetry import trace
        from opentelemetry.trace import Status, StatusCode

        verbose_logger.debug(
            "OpenTelemetry Logger: Logging kwargs: %s, OTEL config settings=%s",
            kwargs,
            self.config,
        )
        _parent_context, parent_otel_span = self._get_span_context(kwargs)
        self._add_dynamic_span_processor_if_needed(kwargs)

        # Span 1: Request sent to the litellm SDK
        span = self.tracer.start_span(
            name=self._get_span_name(kwargs),
            start_time=self._to_ns(start_time),
            context=_parent_context,
        )
        span.set_status(Status(StatusCode.OK))
        self.set_attributes(span, kwargs, response_obj)

        if litellm.turn_off_message_logging is True:
            pass
        elif self.message_logging is not True:
            pass
        else:
            # Span 2: Raw request / response to the LLM
            raw_request_span = self.tracer.start_span(
                name=RAW_REQUEST_SPAN_NAME,
                start_time=self._to_ns(start_time),
                context=trace.set_span_in_context(span),
            )
            raw_request_span.set_status(Status(StatusCode.OK))
            self.set_raw_request_attributes(raw_request_span, kwargs, response_obj)
            raw_request_span.end(end_time=self._to_ns(end_time))

        span.end(end_time=self._to_ns(end_time))

        if parent_otel_span is not None:
            parent_otel_span.end(end_time=self._to_ns(datetime.now()))

    def _add_dynamic_span_processor_if_needed(self, kwargs):
        """
        Helper method to add a span processor with dynamic headers if needed.

        This allows for per-request configuration of telemetry exporters by
        extracting headers from standard_callback_dynamic_params.
        """
        from opentelemetry import trace

        standard_callback_dynamic_params: Optional[
            StandardCallbackDynamicParams
        ] = kwargs.get("standard_callback_dynamic_params")
        if not standard_callback_dynamic_params:
            return

        # Extract headers from dynamic params
        dynamic_headers = {}

        # Handle Arize headers
        if standard_callback_dynamic_params.get("arize_space_key"):
            dynamic_headers["space_key"] = standard_callback_dynamic_params.get(
                "arize_space_key"
            )
        if standard_callback_dynamic_params.get("arize_api_key"):
            dynamic_headers["api_key"] = standard_callback_dynamic_params.get(
                "arize_api_key"
            )

        # Only create a span processor if we have headers to use
        if len(dynamic_headers) > 0:
            from opentelemetry.sdk.trace import TracerProvider

            provider = trace.get_tracer_provider()
            if isinstance(provider, TracerProvider):
                span_processor = self._get_span_processor(
                    dynamic_headers=dynamic_headers
                )
                provider.add_span_processor(span_processor)

    def _handle_failure(self, kwargs, response_obj, start_time, end_time):
        from opentelemetry.trace import Status, StatusCode

        verbose_logger.debug(
            "OpenTelemetry Logger: Failure Handler - Logging kwargs: %s, OTEL config settings=%s",
            kwargs,
            self.config,
        )
        _parent_context, parent_otel_span = self._get_span_context(kwargs)

        # Span 1: Request sent to the litellm SDK
        span = self.tracer.start_span(
            name=self._get_span_name(kwargs),
            start_time=self._to_ns(start_time),
            context=_parent_context,
        )
        span.set_status(Status(StatusCode.ERROR))
        self.set_attributes(span, kwargs, response_obj)
        span.end(end_time=self._to_ns(end_time))

        if parent_otel_span is not None:
            parent_otel_span.end(end_time=self._to_ns(datetime.now()))

    def set_tools_attributes(self, span: Span, tools):
        import json

        from litellm.proxy._types import SpanAttributes

        if not tools:
            return

        try:
            for i, tool in enumerate(tools):
                function = tool.get("function")
                if not function:
                    continue

                prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}"
                self.safe_set_attribute(
                    span=span,
                    key=f"{prefix}.name",
                    value=function.get("name"),
                )
                self.safe_set_attribute(
                    span=span,
                    key=f"{prefix}.description",
                    value=function.get("description"),
                )
                self.safe_set_attribute(
                    span=span,
                    key=f"{prefix}.parameters",
                    value=json.dumps(function.get("parameters")),
                )
        except Exception as e:
            verbose_logger.error(
                "OpenTelemetry: Error setting tools attributes: %s", str(e)
            )

    def cast_as_primitive_value_type(self, value) -> Union[str, bool, int, float]:
        """
        Casts the value to a primitive OTEL type if it is not already a primitive type.

        OTEL supports - str, bool, int, float

        If it's not a primitive type, then it's converted to a string
        """
        if value is None:
            return ""
        if isinstance(value, (str, bool, int, float)):
            return value
        try:
            return str(value)
        except Exception:
            return ""

    @staticmethod
    def _tool_calls_kv_pair(
        tool_calls: List[ChatCompletionMessageToolCall],
    ) -> Dict[str, Any]:
        from litellm.proxy._types import SpanAttributes

        kv_pairs: Dict[str, Any] = {}
        for idx, tool_call in enumerate(tool_calls):
            _function = tool_call.get("function")
            if not _function:
                continue

            keys = Function.__annotations__.keys()
            for key in keys:
                _value = _function.get(key)
                if _value:
                    kv_pairs[
                        f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.function_call.{key}"
                    ] = _value

        return kv_pairs
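
    # Illustrative sketch (not part of the original module): for a tool call
    # whose function name is "get_weather" with arguments '{"city": "Paris"}',
    # _tool_calls_kv_pair would emit keys along the lines of
    #   <LLM_COMPLETIONS>.0.function_call.name      -> "get_weather"
    #   <LLM_COMPLETIONS>.0.function_call.arguments -> '{"city": "Paris"}'
    # where <LLM_COMPLETIONS> is whatever SpanAttributes.LLM_COMPLETIONS
    # resolves to (commonly "gen_ai.completion").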

    def set_attributes(  # noqa: PLR0915
        self, span: Span, kwargs, response_obj: Optional[Any]
    ):
        try:
            if self.callback_name == "arize_phoenix":
                from litellm.integrations.arize.arize_phoenix import ArizePhoenixLogger

                ArizePhoenixLogger.set_arize_phoenix_attributes(
                    span, kwargs, response_obj
                )
                return
            elif self.callback_name == "langtrace":
                from litellm.integrations.langtrace import LangtraceAttributes

                LangtraceAttributes().set_langtrace_attributes(
                    span, kwargs, response_obj
                )
                return

            from litellm.proxy._types import SpanAttributes

            optional_params = kwargs.get("optional_params", {})
            litellm_params = kwargs.get("litellm_params", {}) or {}
            standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
                "standard_logging_object"
            )
            if standard_logging_payload is None:
                raise ValueError("standard_logging_object not found in kwargs")

            # https://github.com/open-telemetry/semantic-conventions/blob/main/model/registry/gen-ai.yaml
            # Following conventions here: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/llm-spans.md

            #############################################
            ############ LLM CALL METADATA ##############
            #############################################

            metadata = standard_logging_payload["metadata"]
            for key, value in metadata.items():
                self.safe_set_attribute(
                    span=span, key="metadata.{}".format(key), value=value
                )

            #############################################
            ########## LLM Request Attributes ###########
            #############################################

            # The name of the LLM a request is being made to.
            if kwargs.get("model"):
                self.safe_set_attribute(
                    span=span,
                    key=SpanAttributes.LLM_REQUEST_MODEL,
                    value=kwargs.get("model"),
                )

            # The LLM request type.
            self.safe_set_attribute(
                span=span,
                key=SpanAttributes.LLM_REQUEST_TYPE,
                value=standard_logging_payload["call_type"],
            )

            # The Generative AI provider: Azure, OpenAI, etc.
            self.safe_set_attribute(
                span=span,
                key=SpanAttributes.LLM_SYSTEM,
                value=litellm_params.get("custom_llm_provider", "Unknown"),
            )

            # The maximum number of tokens the LLM generates for a request.
            if optional_params.get("max_tokens"):
                self.safe_set_attribute(
                    span=span,
                    key=SpanAttributes.LLM_REQUEST_MAX_TOKENS,
                    value=optional_params.get("max_tokens"),
                )

            # The temperature setting for the LLM request.
            if optional_params.get("temperature"):
                self.safe_set_attribute(
                    span=span,
                    key=SpanAttributes.LLM_REQUEST_TEMPERATURE,
                    value=optional_params.get("temperature"),
                )

            # The top_p sampling setting for the LLM request.
            if optional_params.get("top_p"):
                self.safe_set_attribute(
                    span=span,
                    key=SpanAttributes.LLM_REQUEST_TOP_P,
                    value=optional_params.get("top_p"),
                )

            # Whether the request streamed the response.
            self.safe_set_attribute(
                span=span,
                key=SpanAttributes.LLM_IS_STREAMING,
                value=str(optional_params.get("stream", False)),
            )

            # The user issuing the request, if provided.
            if optional_params.get("user"):
                self.safe_set_attribute(
                    span=span,
                    key=SpanAttributes.LLM_USER,
                    value=optional_params.get("user"),
                )

            # The unique identifier for the completion.
            if response_obj and response_obj.get("id"):
                self.safe_set_attribute(
                    span=span, key="gen_ai.response.id", value=response_obj.get("id")
                )

            # The model used to generate the response.
            if response_obj and response_obj.get("model"):
                self.safe_set_attribute(
                    span=span,
                    key=SpanAttributes.LLM_RESPONSE_MODEL,
                    value=response_obj.get("model"),
                )

            usage = response_obj and response_obj.get("usage")
            if usage:
                # The total number of tokens used for the request.
                self.safe_set_attribute(
                    span=span,
                    key=SpanAttributes.LLM_USAGE_TOTAL_TOKENS,
                    value=usage.get("total_tokens"),
                )

                # The number of tokens used in the LLM response (completion).
                self.safe_set_attribute(
                    span=span,
                    key=SpanAttributes.LLM_USAGE_COMPLETION_TOKENS,
                    value=usage.get("completion_tokens"),
                )

                # The number of tokens used in the LLM prompt.
                self.safe_set_attribute(
                    span=span,
                    key=SpanAttributes.LLM_USAGE_PROMPT_TOKENS,
                    value=usage.get("prompt_tokens"),
                )

            ########################################################################
            ########## LLM Request Messages / tools / content Attributes ##########
            ########################################################################

            if litellm.turn_off_message_logging is True:
                return
            if self.message_logging is not True:
                return

            if optional_params.get("tools"):
                tools = optional_params["tools"]
                self.set_tools_attributes(span, tools)

            if kwargs.get("messages"):
                for idx, prompt in enumerate(kwargs.get("messages")):
                    if prompt.get("role"):
                        self.safe_set_attribute(
                            span=span,
                            key=f"{SpanAttributes.LLM_PROMPTS}.{idx}.role",
                            value=prompt.get("role"),
                        )

                    if prompt.get("content"):
                        if not isinstance(prompt.get("content"), str):
                            prompt["content"] = str(prompt.get("content"))
                        self.safe_set_attribute(
                            span=span,
                            key=f"{SpanAttributes.LLM_PROMPTS}.{idx}.content",
                            value=prompt.get("content"),
                        )

            #############################################
            ########## LLM Response Attributes ##########
            #############################################

            if response_obj is not None:
                if response_obj.get("choices"):
                    for idx, choice in enumerate(response_obj.get("choices")):
                        if choice.get("finish_reason"):
                            self.safe_set_attribute(
                                span=span,
                                key=f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.finish_reason",
                                value=choice.get("finish_reason"),
                            )
                        if choice.get("message"):
                            if choice.get("message").get("role"):
                                self.safe_set_attribute(
                                    span=span,
                                    key=f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.role",
                                    value=choice.get("message").get("role"),
                                )
                            if choice.get("message").get("content"):
                                if not isinstance(
                                    choice.get("message").get("content"), str
                                ):
                                    choice["message"]["content"] = str(
                                        choice.get("message").get("content")
                                    )
                                self.safe_set_attribute(
                                    span=span,
                                    key=f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.content",
                                    value=choice.get("message").get("content"),
                                )

                            message = choice.get("message")
                            tool_calls = message.get("tool_calls")
                            if tool_calls:
                                kv_pairs = OpenTelemetry._tool_calls_kv_pair(tool_calls)
                                for key, value in kv_pairs.items():
                                    self.safe_set_attribute(
                                        span=span,
                                        key=key,
                                        value=value,
                                    )

        except Exception as e:
            verbose_logger.exception(
                "OpenTelemetry logging error in set_attributes %s", str(e)
            )

    def _cast_as_primitive_value_type(self, value) -> Union[str, bool, int, float]:
        """
        Casts the value to a primitive OTEL type if it is not already a primitive type.

        OTEL supports - str, bool, int, float

        If it's not a primitive type, then it's converted to a string
        """
        if value is None:
            return ""
        if isinstance(value, (str, bool, int, float)):
            return value
        try:
            return str(value)
        except Exception:
            return ""

    def safe_set_attribute(self, span: Span, key: str, value: Any):
        """
        Safely sets an attribute on the span, ensuring the value is a primitive type.
        """
        primitive_value = self._cast_as_primitive_value_type(value)
        span.set_attribute(key, primitive_value)

    def set_raw_request_attributes(self, span: Span, kwargs, response_obj):
        litellm_params = kwargs.get("litellm_params", {}) or {}
        custom_llm_provider = litellm_params.get("custom_llm_provider", "Unknown")

        _raw_response = kwargs.get("original_response")
        _additional_args = kwargs.get("additional_args", {}) or {}
        complete_input_dict = _additional_args.get("complete_input_dict")

        #############################################
        ########## LLM Request Attributes ###########
        #############################################

        # OTEL attributes for the raw request, e.g. to https://docs.anthropic.com/en/api/messages
        if complete_input_dict and isinstance(complete_input_dict, dict):
            for param, val in complete_input_dict.items():
                self.safe_set_attribute(
                    span=span, key=f"llm.{custom_llm_provider}.{param}", value=val
                )

        #############################################
        ########## LLM Response Attributes ##########
        #############################################

        if _raw_response and isinstance(_raw_response, str):
            # cast str -> dict
            import json

            try:
                _raw_response = json.loads(_raw_response)
                for param, val in _raw_response.items():
                    self.safe_set_attribute(
                        span=span,
                        key=f"llm.{custom_llm_provider}.{param}",
                        value=val,
                    )
            except json.JSONDecodeError:
                verbose_logger.debug(
                    "litellm.integrations.opentelemetry.py::set_raw_request_attributes() - raw_response not json string - {}".format(
                        _raw_response
                    )
                )
                self.safe_set_attribute(
                    span=span,
                    key=f"llm.{custom_llm_provider}.stringified_raw_response",
                    value=_raw_response,
                )

    def _to_ns(self, dt):
        return int(dt.timestamp() * 1e9)

    def _get_span_name(self, kwargs):
        return LITELLM_REQUEST_SPAN_NAME

    def get_traceparent_from_header(self, headers):
        if headers is None:
            return None
        _traceparent = headers.get("traceparent", None)
        if _traceparent is None:
            return None

        from opentelemetry.trace.propagation.tracecontext import (
            TraceContextTextMapPropagator,
        )

        propagator = TraceContextTextMapPropagator()
        carrier = {"traceparent": _traceparent}
        _parent_context = propagator.extract(carrier=carrier)

        return _parent_context

    def _get_span_context(self, kwargs):
        from opentelemetry import trace
        from opentelemetry.trace.propagation.tracecontext import (
            TraceContextTextMapPropagator,
        )

        litellm_params = kwargs.get("litellm_params", {}) or {}
        proxy_server_request = litellm_params.get("proxy_server_request", {}) or {}
        headers = proxy_server_request.get("headers", {}) or {}
        traceparent = headers.get("traceparent", None)
        _metadata = litellm_params.get("metadata", {}) or {}
        parent_otel_span = _metadata.get("litellm_parent_otel_span", None)

        """
        Two ways to set the parent in OpenTelemetry:
        - using the traceparent header
        - using the parent_otel_span in metadata["litellm_parent_otel_span"]
        """
        if parent_otel_span is not None:
            return trace.set_span_in_context(parent_otel_span), parent_otel_span

        if traceparent is None:
            return None, None
        else:
            carrier = {"traceparent": traceparent}
            return TraceContextTextMapPropagator().extract(carrier=carrier), None
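
    # Illustrative sketch (not part of the original module): a W3C Trace
    # Context `traceparent` header that the branch above would extract, in the
    # standard version-traceid-spanid-flags layout:
    #
    #   traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01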

    def _get_span_processor(self, dynamic_headers: Optional[dict] = None):
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
            OTLPSpanExporter as OTLPSpanExporterGRPC,
        )
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
            OTLPSpanExporter as OTLPSpanExporterHTTP,
        )
        from opentelemetry.sdk.trace.export import (
            BatchSpanProcessor,
            ConsoleSpanExporter,
            SimpleSpanProcessor,
            SpanExporter,
        )

        verbose_logger.debug(
            "OpenTelemetry Logger, initializing span processor \nself.OTEL_EXPORTER: %s\nself.OTEL_ENDPOINT: %s\nself.OTEL_HEADERS: %s",
            self.OTEL_EXPORTER,
            self.OTEL_ENDPOINT,
            self.OTEL_HEADERS,
        )
        _split_otel_headers = OpenTelemetry._get_headers_dictionary(
            headers=dynamic_headers or self.OTEL_HEADERS
        )

        if hasattr(
            self.OTEL_EXPORTER, "export"
        ):  # Check if it has the export method that SpanExporter requires
            verbose_logger.debug(
                "OpenTelemetry: initializing SpanExporter. Value of OTEL_EXPORTER: %s",
                self.OTEL_EXPORTER,
            )
            return SimpleSpanProcessor(cast(SpanExporter, self.OTEL_EXPORTER))

        if self.OTEL_EXPORTER == "console":
            verbose_logger.debug(
                "OpenTelemetry: initializing console exporter. Value of OTEL_EXPORTER: %s",
                self.OTEL_EXPORTER,
            )
            return BatchSpanProcessor(ConsoleSpanExporter())
        elif self.OTEL_EXPORTER == "otlp_http":
            verbose_logger.debug(
                "OpenTelemetry: initializing http exporter. Value of OTEL_EXPORTER: %s",
                self.OTEL_EXPORTER,
            )
            return BatchSpanProcessor(
                OTLPSpanExporterHTTP(
                    endpoint=self.OTEL_ENDPOINT, headers=_split_otel_headers
                ),
            )
        elif self.OTEL_EXPORTER == "otlp_grpc":
            verbose_logger.debug(
                "OpenTelemetry: initializing grpc exporter. Value of OTEL_EXPORTER: %s",
                self.OTEL_EXPORTER,
            )
            return BatchSpanProcessor(
                OTLPSpanExporterGRPC(
                    endpoint=self.OTEL_ENDPOINT, headers=_split_otel_headers
                ),
            )
        else:
            verbose_logger.debug(
                "OpenTelemetry: initializing console exporter. Value of OTEL_EXPORTER: %s",
                self.OTEL_EXPORTER,
            )
            return BatchSpanProcessor(ConsoleSpanExporter())

    @staticmethod
    def _get_headers_dictionary(headers: Optional[Union[str, dict]]) -> Dict[str, str]:
        """
        Convert a string or dictionary of headers into a dictionary of headers.
        """
        _split_otel_headers: Dict[str, str] = {}
        if headers:
            if isinstance(headers, str):
                # when passed HEADERS="x-honeycomb-team=B85YgLm96******"
                # split only on the first '=' occurrence
                parts = headers.split("=", 1)
                if len(parts) == 2:
                    _split_otel_headers = {parts[0]: parts[1]}
                else:
                    _split_otel_headers = {}
            elif isinstance(headers, dict):
                _split_otel_headers = headers
        return _split_otel_headers
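
    # Illustrative sketch (not part of the original module): expected behavior
    # of _get_headers_dictionary for the two supported input shapes.
    #
    #   OpenTelemetry._get_headers_dictionary("x-honeycomb-team=abc123")
    #   -> {"x-honeycomb-team": "abc123"}
    #
    #   OpenTelemetry._get_headers_dictionary({"api_key": "abc123"})
    #   -> {"api_key": "abc123"}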

    async def async_management_endpoint_success_hook(
        self,
        logging_payload: ManagementEndpointLoggingPayload,
        parent_otel_span: Optional[Span] = None,
    ):
        from opentelemetry import trace
        from opentelemetry.trace import Status, StatusCode

        _start_time_ns = 0
        _end_time_ns = 0

        start_time = logging_payload.start_time
        end_time = logging_payload.end_time

        if isinstance(start_time, float):
            _start_time_ns = int(start_time * 1e9)
        else:
            _start_time_ns = self._to_ns(start_time)

        if isinstance(end_time, float):
            _end_time_ns = int(end_time * 1e9)
        else:
            _end_time_ns = self._to_ns(end_time)

        if parent_otel_span is not None:
            _span_name = logging_payload.route
            management_endpoint_span = self.tracer.start_span(
                name=_span_name,
                context=trace.set_span_in_context(parent_otel_span),
                start_time=_start_time_ns,
            )

            _request_data = logging_payload.request_data
            if _request_data is not None:
                for key, value in _request_data.items():
                    self.safe_set_attribute(
                        span=management_endpoint_span,
                        key=f"request.{key}",
                        value=value,
                    )

            _response = logging_payload.response
            if _response is not None:
                for key, value in _response.items():
                    self.safe_set_attribute(
                        span=management_endpoint_span,
                        key=f"response.{key}",
                        value=value,
                    )

            management_endpoint_span.set_status(Status(StatusCode.OK))
            management_endpoint_span.end(end_time=_end_time_ns)

    async def async_management_endpoint_failure_hook(
        self,
        logging_payload: ManagementEndpointLoggingPayload,
        parent_otel_span: Optional[Span] = None,
    ):
        from opentelemetry import trace
        from opentelemetry.trace import Status, StatusCode

        _start_time_ns = 0
        _end_time_ns = 0

        start_time = logging_payload.start_time
        end_time = logging_payload.end_time

        if isinstance(start_time, float):
            _start_time_ns = int(start_time * 1e9)
        else:
            _start_time_ns = self._to_ns(start_time)

        if isinstance(end_time, float):
            _end_time_ns = int(end_time * 1e9)
        else:
            _end_time_ns = self._to_ns(end_time)

        if parent_otel_span is not None:
            _span_name = logging_payload.route
            management_endpoint_span = self.tracer.start_span(
                name=_span_name,
                context=trace.set_span_in_context(parent_otel_span),
                start_time=_start_time_ns,
            )

            _request_data = logging_payload.request_data
            if _request_data is not None:
                for key, value in _request_data.items():
                    self.safe_set_attribute(
                        span=management_endpoint_span,
                        key=f"request.{key}",
                        value=value,
                    )

            _exception = logging_payload.exception
            self.safe_set_attribute(
                span=management_endpoint_span,
                key="exception",
                value=str(_exception),
            )
            management_endpoint_span.set_status(Status(StatusCode.ERROR))
            management_endpoint_span.end(end_time=_end_time_ns)

    def create_litellm_proxy_request_started_span(
        self,
        start_time: datetime,
        headers: dict,
    ) -> Optional[Span]:
        """
        Create a span for the received proxy server request.
        """
        return self.tracer.start_span(
            name="Received Proxy Server Request",
            start_time=self._to_ns(start_time),
            context=self.get_traceparent_from_header(headers=headers),
            kind=self.span_kind.SERVER,
        )