fciannella's picture
Working with service run on 7860
53ea588
raw
history blame
7.03 kB
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD 2-Clause License
"""Tracing utilities."""
import asyncio
import contextlib
import enum
import functools
import inspect
from collections.abc import Callable
import opentelemetry.trace
from opentelemetry import metrics, trace
class Traceable:
    """Mixin that equips an object with an OpenTelemetry tracer, meter and anchor span.

    At construction time the object remembers the id of the span that is current at that
    moment and creates one span of its own. That span is ended straight away; it is kept
    only as a reference that spans created later can be parented to or linked with.
    """

    def __init__(self, name: str, **kwargs):
        """Set up the tracing state of the object.

        Args:
            name (str): Name of the span created for this object.
            **kwargs: Forwarded unchanged to the next ``__init__`` in the MRO.
        """
        super().__init__(**kwargs)
        self._tracer = trace.get_tracer("traceable")
        self._meter = metrics.get_meter("meter")

        # Id of whichever span is current while the object is being built.
        ambient_span = trace.get_current_span()
        self._parent_span_id = ambient_span.get_span_context().span_id

        # The object's own span is closed immediately after being started.
        own_span = self._tracer.start_span(name)
        self._span = own_span
        own_span.end()

    @property
    def meter(self):
        """Meter obtained from ``metrics.get_meter`` when the object was initialized.

        Returns:
            The meter instance stored on this object.
        """
        return self._meter
class AttachmentStrategy(enum.Enum):
    """Placement policy for the span created by the @traced annotation.

    Members:
        CHILD: The new span is started under the span that is current at call time. If that
            span is the one that was current when the object was constructed, the new span
            is started under the object's own span instead.
        LINK: The new span is started under the object's own span and carries a link to the
            span that is current at call time.
        NONE: The new span is always started under the object's own span, regardless of the
            span that is current at call time.
    """

    # Values are assigned automatically; only member identity is meaningful.
    CHILD = enum.auto()
    LINK = enum.auto()
    NONE = enum.auto()
@contextlib.contextmanager
def __traced_context_manager(
    self: Traceable, func: Callable, name: str | None, attachment_strategy: AttachmentStrategy
):
    """Run the body of a ``with`` block inside a span placed per ``attachment_strategy``.

    Args:
        self: The object whose method is being traced; must be a ``Traceable``.
        func: The traced callable; its ``__name__`` is the span name when ``name`` is None.
        name: Explicit span name, or None to fall back to ``func.__name__``.
        attachment_strategy: Where the new span is attached (see ``AttachmentStrategy``).

    Raises:
        RuntimeError: If ``self`` is not an instance of ``Traceable``.
    """
    if not isinstance(self, Traceable):
        raise RuntimeError("@traced annotation can only be used in classes inheriting from Traceable")

    stack = contextlib.ExitStack()

    def enter_class_span():
        # Make the object's own span current without ending it when the block exits.
        stack.enter_context(trace.use_span(span=self._span, end_on_exit=False))  # type: ignore

    def enter_new_span(**span_options):
        # The span name is resolved only at the moment the span is actually started.
        span_name = name if name is not None else func.__name__
        stack.enter_context(self._tracer.start_as_current_span(span_name, **span_options))  # type: ignore

    try:
        ambient_context = trace.get_current_span().get_span_context()
        # True when the current span is the one that was current at construction time.
        under_construction_span = ambient_context.span_id == self._parent_span_id

        if attachment_strategy == AttachmentStrategy.CHILD and not under_construction_span:
            # Some other span is current: nest directly under it.
            enter_new_span()
        elif attachment_strategy == AttachmentStrategy.LINK:
            link_target = self._span.get_span_context() if under_construction_span else ambient_context
            link = trace.Link(link_target)
            enter_class_span()
            enter_new_span(links=[link])
        elif attachment_strategy in (AttachmentStrategy.NONE, AttachmentStrategy.CHILD):
            # NONE, or CHILD while still under the construction-time span.
            enter_class_span()
            enter_new_span()
        yield
    finally:
        # close() unwinds every entered context as a normal, exception-free exit.
        stack.close()
def __traced_decorator(func, name, attachment_strategy: AttachmentStrategy):
    """Wrap an async method so that every call runs inside a traced span.

    Coroutine functions and async generator functions are supported; each gets a dedicated
    wrapper that enters ``__traced_context_manager`` around the wrapped call.

    Args:
        func: The coroutine function or async generator function to wrap.
        name: Span name, or None to use ``func.__name__``.
        attachment_strategy: How the span is attached (see ``AttachmentStrategy``).

    Returns:
        The wrapping coroutine function or async generator function.

    Raises:
        ValueError: If ``func`` is neither a coroutine function nor an async generator function.
    """

    @functools.wraps(func)
    async def coroutine_wrapper(self: Traceable, *args, **kwargs):
        exception = None
        with __traced_context_manager(self, func, name, attachment_strategy):
            try:
                return await func(self, *args, **kwargs)
            except asyncio.CancelledError as e:
                # Held back and re-raised after the tracing block has been left, so the
                # block itself is exited without an in-flight cancellation.
                exception = e
        if exception is not None:
            raise exception

    @functools.wraps(func)
    async def generator_wrapper(self: Traceable, *args, **kwargs):
        exception = None
        with __traced_context_manager(self, func, name, attachment_strategy):
            try:
                # aclosing() closes the wrapped generator as soon as this wrapper stops
                # iterating it (consumer broke out early, aclose(), error). Without it the
                # wrapped generator's cleanup is deferred to garbage collection and runs
                # outside the tracing block.
                async with contextlib.aclosing(func(self, *args, **kwargs)) as stream:
                    async for v in stream:
                        yield v
            except asyncio.CancelledError as e:
                # Same deferral as in coroutine_wrapper.
                exception = e
        if exception is not None:
            raise exception

    if inspect.iscoroutinefunction(func):
        return coroutine_wrapper
    if inspect.isasyncgenfunction(func):
        return generator_wrapper
    raise ValueError("@traced annotation can only be used on async or async generator functions")
def traced(
    func: Callable | None = None,
    *,
    name: str | None = None,
    attachment_strategy: AttachmentStrategy = AttachmentStrategy.CHILD,
):
    """Method decorator that runs an async function or async generator inside a span.

    Usable both bare (``@traced``) and with keyword options
    (``@traced(name=..., attachment_strategy=...)``).

    Args:
        func: The async function or async generator function to trace. Left as None when
            the decorator is written with keyword options.
        name: Name for the span. Defaults to the name of the function.
        attachment_strategy (AttachmentStrategy): The attachment strategy to use (Possible
            values are CHILD, LINK, NONE).

    Returns:
        The wrapped function when ``func`` is given, otherwise a decorator awaiting it.

    Raises:
        ValueError: At decoration time, if the function is neither a coroutine function
            nor an async generator function.
        RuntimeError: When the wrapped method is invoked on an object that does not
            inherit from Traceable.

    Example:
        @traceable
        class MyClass:
            @traced
            async def my_function(self):
                pass
    """
    decorate = functools.partial(__traced_decorator, name=name, attachment_strategy=attachment_strategy)
    return decorate if func is None else decorate(func)
def traceable(cls):
    """Class decorator that makes a class traceable for OpenTelemetry instrumentation.

    A new class inheriting from both ``cls`` and ``Traceable`` is created and returned. Its
    constructor first runs ``cls.__init__`` with the caller's arguments and then
    ``Traceable.__init__``, naming the instance's span after ``self.name`` when the instance
    has such an attribute and after ``cls.__name__`` otherwise. Methods of the class can then
    be instrumented with the @traced decorator.

    NOTE(review): Traceable follows ``cls`` in the new class's MRO, so a cooperative
    ``super().__init__()`` call at the end of ``cls``'s own ``__init__`` chain would reach
    ``Traceable.__init__`` without its required ``name`` — confirm decorated classes do not
    rely on that.

    Args:
        cls: The class to make traceable.

    Returns:
        TracedClass: A new class that inherits from both the input class and Traceable. It
            keeps the name, qualified name, module and docstring of ``cls``.

    Example:
        @traceable
        class MyClass:
            @traced
            async def my_method(self):
                pass
    """

    # updated=() stops functools.wraps from merging cls.__dict__ into the new class.
    @functools.wraps(cls, updated=())
    class TracedClass(cls, Traceable):
        def __init__(self, *args, **kwargs):
            # Initialize the wrapped class first so that a ``name`` attribute it sets is visible.
            cls.__init__(self, *args, **kwargs)
            Traceable.__init__(self, getattr(self, "name", cls.__name__))

    return TracedClass