Spaces:
Sleeping
Sleeping
| import inspect | |
| import sys | |
| import aworld.trace as atrace | |
| from types import CodeType, FrameType | |
| from typing import Optional, TypedDict, Union | |
| from functools import lru_cache | |
| from pathlib import Path | |
# Attributes describing a source location. The keys contain dots, so the
# functional TypedDict syntax is required; total=False makes every key optional.
StackInfo = TypedDict(
    'StackInfo',
    {
        'code.filepath': str,
        'code.lineno': int,
        'code.function': str,
    },
    total=False,
)

# Absolute paths whose code is never treated as user code.
# Extended at runtime through add_non_user_code_prefix().
NON_USER_CODE_PREFIXES: tuple[str, ...] = ()
def add_non_user_code_prefix(path: Union[str, Path]) -> None:
    """Register a path whose code must never be reported as user code.

    The path is made absolute and appended to the module-level
    ``NON_USER_CODE_PREFIXES`` tuple. Registering a path that is already
    present is a no-op, so repeated calls do not grow the tuple.

    Args:
        path: File or directory to exclude; relative paths are interpreted
            against the current working directory at call time.
    """
    global NON_USER_CODE_PREFIXES
    prefix = str(Path(path).absolute())
    # Skip duplicates: is_user_code() tests file names against every entry,
    # so redundant entries only add work to each frame check.
    if prefix not in NON_USER_CODE_PREFIXES:
        NON_USER_CODE_PREFIXES += (prefix,)
# Import-time registration of locations that never count as user code.
# The directory holding the `inspect` module, i.e. the standard library.
add_non_user_code_prefix(Path(inspect.__file__).parent)
# The directory holding the imported `aworld.trace` module.
add_non_user_code_prefix(Path(atrace.__file__).parent)
def get_user_stack_info() -> StackInfo:
    """Describe the nearest calling frame that belongs to user code.

    What counts as user code is decided by `is_user_code`.

    Returns:
        The stack info of that frame, or an empty dict when the call stack
        contains no user-code frame.
    """
    user_frame, _ = get_user_frame_and_stacklevel()
    if not user_frame:
        return {}
    return get_stack_info_from_frame(user_frame)
def get_user_frame_and_stacklevel() -> tuple[Optional[FrameType], int]:
    """Find the nearest user-code frame and its distance from this call.

    The walk starts at this function's own frame (level 0) and moves outwards
    through the callers. The returned level can be passed as `stacklevel` to
    `warnings.warn`. What counts as user code is decided by `is_user_code`.

    Returns:
        `(frame, stacklevel)` for the first user-code frame found, or
        `(None, 0)` if the whole stack was walked without finding one.
    """
    depth = 0
    candidate = inspect.currentframe()
    while candidate is not None:
        if is_user_code(candidate.f_code):
            return candidate, depth
        depth += 1
        candidate = candidate.f_back
    return None, 0
def get_stack_info_from_frame(frame: FrameType) -> StackInfo:
    """Build the stack info for a live frame.

    Starts from the attributes of the frame's code object and replaces the
    line number with the line the frame is currently executing.

    Args:
        frame: The frame to describe.

    Returns:
        A new dict; the dict produced for the code object is not modified.
    """
    info = dict(get_code_object_info(frame.f_code))
    # The code object reports its first line; the frame knows the current one.
    info['code.lineno'] = frame.f_lineno
    return info
def get_code_object_info(code: CodeType) -> StackInfo:
    """Build the stack info for a code object.

    Args:
        code: The code object to describe.

    Returns:
        A dict with the file path, the first line number of the code object
        and, unless the code is module-level, the function name (the
        qualified name on Python 3.11+).
    """
    info = get_filepath_attribute(code.co_filename)
    if code.co_name != '<module>':  # pragma: no branch
        if sys.version_info >= (3, 11):
            function_name = code.co_qualname
        else:
            function_name = code.co_name
        info['code.function'] = function_name
    info['code.lineno'] = code.co_firstlineno
    return info
def get_filepath_attribute(file: str) -> StackInfo:
    """Build the `code.filepath` attribute for a source file name.

    An absolute path located under the current working directory is shortened
    to a path relative to it. Any other path (relative, outside the CWD, or
    when the CWD cannot be resolved) is kept as given.

    Args:
        file: File name as found in a code object's `co_filename`.

    Returns:
        A dict with the single key `code.filepath`.
    """
    path = Path(file)
    if path.is_absolute():
        try:
            path = path.relative_to(Path('.').resolve())
        except (ValueError, OSError):  # pragma: no cover
            # ValueError: the file is not within the CWD.
            # OSError: the CWD itself cannot be resolved (e.g. it was deleted).
            # Either way keep the absolute path instead of failing the caller.
            pass
    return {'code.filepath': str(path)}
def is_user_code(code: CodeType) -> bool:
    """Check if the code object is from user code.

    A code object is not user code if:
    - It is from a file located at or below one of `NON_USER_CODE_PREFIXES`.
      At import time this file registers the standard library directory and
      the directory of `aworld.trace`; more can be added with
      `add_non_user_code_prefix`.
    - It is from an unknown location (e.g. a dynamically generated code
      object), indicated by a filename starting with '<'.
    - It is a list/dict/set comprehension.
      These are artificial frames only created before Python 3.12,
      and they are always called directly from the enclosing function so it
      makes sense to skip them.
      On the other hand, generator expressions and lambdas might be called
      far away from where they are defined.

    Args:
        code: The code object to classify.

    Returns:
        True if the code object counts as user code, False otherwise.
    """
    filename = code.co_filename
    # Cheap checks first: synthetic filenames and comprehension frames need
    # no path handling at all.
    if filename.startswith('<'):
        return False
    if code.co_name in ('<listcomp>', '<dictcomp>', '<setcomp>'):
        return False
    path = Path(filename).absolute()
    # Compare whole path components rather than raw string prefixes, so that
    # a registered `/opt/lib` does not also exclude `/opt/library/x.py`.
    return not any(path.is_relative_to(prefix) for prefix in NON_USER_CODE_PREFIXES)