Spaces:
Sleeping
Sleeping
| import inspect | |
| import contextlib | |
| import functools | |
| from typing import TYPE_CHECKING, Callable, Any, Union, Iterable | |
| from aworld.trace.base import ( | |
| AttributeValueType | |
| ) | |
| from aworld.trace.stack_info import get_filepath_attribute | |
| from aworld.trace.constants import ( | |
| ATTRIBUTES_MESSAGE_TEMPLATE_KEY | |
| ) | |
| if TYPE_CHECKING: | |
| from aworld.trace.context_manager import TraceManager, ContextSpan | |
| def trace_func(trace_manager: "TraceManager", | |
| msg_template: str = None, | |
| attributes: dict[str, AttributeValueType] = None, | |
| span_name: str = None, | |
| extract_args: Union[bool, Iterable[str]] = False): | |
| """A decorator that traces the execution of a function. | |
| Args: | |
| trace_manager: The trace manager to use. | |
| msg_template: The message template to use. | |
| attributes: The attributes to use. | |
| span_name: The span name to use. | |
| extract_args: Whether to extract arguments from the function call. | |
| Returns: | |
| The decorated function. | |
| """ | |
| def decorator(func: Callable) -> Callable: | |
| func_meta = get_function_meta(func, msg_template) | |
| func_meta.update(attributes or {}) | |
| final_span_name = span_name or func_meta.get(ATTRIBUTES_MESSAGE_TEMPLATE_KEY) or func.__name__ | |
| if inspect.isgeneratorfunction(func): | |
| def wrapper(*args, **kwargs): | |
| with open_func_span(trace_manager, func_meta, final_span_name, | |
| get_func_args(func, extract_args, *args, **kwargs)): | |
| for item in func(*args, **kwargs): | |
| yield item | |
| elif inspect.isasyncgenfunction(func): | |
| async def wrapper(*args, **kwargs): | |
| with open_func_span(trace_manager, func_meta, final_span_name, | |
| get_func_args(func, extract_args, *args, **kwargs)): | |
| async for item in func(*args, **kwargs): | |
| yield item | |
| elif inspect.iscoroutinefunction(func): | |
| async def wrapper(*args, **kwargs): | |
| with open_func_span(trace_manager, func_meta, final_span_name, | |
| get_func_args(func, extract_args, *args, **kwargs)): | |
| return await func(*args, **kwargs) | |
| else: | |
| def wrapper(*args, **kwargs): | |
| with open_func_span(trace_manager, func_meta, final_span_name, | |
| get_func_args(func, extract_args, *args, **kwargs)): | |
| return func(*args, **kwargs) | |
| wrapper = functools.wraps(func)(wrapper) # type: ignore | |
| return wrapper | |
| return decorator | |
| def open_func_span(trace_manager: "TraceManager", | |
| func_meta: dict[str, AttributeValueType], | |
| span_name: str, | |
| func_args: dict[str, AttributeValueType]): | |
| """Open a function span. | |
| Args: | |
| func_meta: The function meta information. | |
| span_name: The span name. | |
| Returns: | |
| The function span. | |
| """ | |
| func_meta.update(func_args) | |
| return trace_manager._create_auto_span(name=span_name, attributes=func_meta) | |
| def get_func_args(func: Callable, | |
| extract_args: Union[bool, Iterable[str]] = False, | |
| *args, | |
| **kwargs): | |
| """Get the arguments of a function. | |
| Args: | |
| func: The function to get the arguments of. | |
| extract_args: Whether to extract arguments from the function call. | |
| *args: The positional arguments. | |
| **kwargs: The keyword arguments. | |
| Returns: | |
| The arguments of the function. | |
| """ | |
| func_sig = inspect.signature(func) | |
| if func_sig.parameters: | |
| func_args = func_sig.bind(*args, **kwargs).arguments | |
| if extract_args is not False: | |
| if isinstance(extract_args, bool): | |
| extract_args = func_sig.parameters.keys() | |
| func_args = {k: v for k, v in func_args.items() if k in extract_args} | |
| return func_args | |
| return {} | |
| def get_function_meta(func: Any, | |
| msg_template: str = None) -> dict[str, AttributeValueType]: | |
| """Get the meta information of a function.\ | |
| Args: | |
| func: The function to get the meta information of. | |
| msg_template: The message template to use. | |
| Returns: | |
| The meta information of the function. | |
| """ | |
| func = inspect.unwrap(func) | |
| if not inspect.isfunction(func) and hasattr(func, '__call__'): | |
| func = func.__call__ | |
| func = inspect.unwrap(func) | |
| func_name = getattr(func, '__qualname__', getattr(func, '__name__', build_func_name(func))) | |
| if not msg_template: | |
| try: | |
| msg_template = f'Calling {inspect.getmodule(func).__name__}.{func_name}' # type: ignore | |
| except Exception: # pragma: no cover | |
| msg_template = f'Calling {func_name}' | |
| meta: dict[str, AttributeValueType] = { | |
| 'code.function': func_name, | |
| ATTRIBUTES_MESSAGE_TEMPLATE_KEY: msg_template, | |
| } | |
| with contextlib.suppress(Exception): | |
| meta['code.lineno'] = func.__code__.co_firstlineno | |
| with contextlib.suppress(Exception): | |
| # get code.filepath | |
| meta.update(get_filepath_attribute(inspect.getsourcefile(func))) | |
| func_sig = inspect.signature(func) | |
| if func_sig.parameters: | |
| meta['func.args'] = [str(param) for param in func_sig.parameters.values() | |
| if param.name != 'self'] | |
| return meta | |
| def build_func_name(func: Any) -> str: | |
| """Build the function name. | |
| Args: | |
| func: The function to build the name of. | |
| Returns: | |
| The function name. | |
| """ | |
| try: | |
| result = repr(func) | |
| except Exception: | |
| result = f'<{type(func).__name__} object>' | |
| return result | |