| | from __future__ import annotations |
| |
|
| | import inspect |
| | import logging |
| | import types |
| | import typing |
| |
|
| | from ._models import Request |
| |
|
| |
|
| | class Trace: |
| | def __init__( |
| | self, |
| | name: str, |
| | logger: logging.Logger, |
| | request: Request | None = None, |
| | kwargs: dict[str, typing.Any] | None = None, |
| | ) -> None: |
| | self.name = name |
| | self.logger = logger |
| | self.trace_extension = ( |
| | None if request is None else request.extensions.get("trace") |
| | ) |
| | self.debug = self.logger.isEnabledFor(logging.DEBUG) |
| | self.kwargs = kwargs or {} |
| | self.return_value: typing.Any = None |
| | self.should_trace = self.debug or self.trace_extension is not None |
| | self.prefix = self.logger.name.split(".")[-1] |
| |
|
| | def trace(self, name: str, info: dict[str, typing.Any]) -> None: |
| | if self.trace_extension is not None: |
| | prefix_and_name = f"{self.prefix}.{name}" |
| | ret = self.trace_extension(prefix_and_name, info) |
| | if inspect.iscoroutine(ret): |
| | raise TypeError( |
| | "If you are using a synchronous interface, " |
| | "the callback of the `trace` extension should " |
| | "be a normal function instead of an asynchronous function." |
| | ) |
| |
|
| | if self.debug: |
| | if not info or "return_value" in info and info["return_value"] is None: |
| | message = name |
| | else: |
| | args = " ".join([f"{key}={value!r}" for key, value in info.items()]) |
| | message = f"{name} {args}" |
| | self.logger.debug(message) |
| |
|
| | def __enter__(self) -> Trace: |
| | if self.should_trace: |
| | info = self.kwargs |
| | self.trace(f"{self.name}.started", info) |
| | return self |
| |
|
| | def __exit__( |
| | self, |
| | exc_type: type[BaseException] | None = None, |
| | exc_value: BaseException | None = None, |
| | traceback: types.TracebackType | None = None, |
| | ) -> None: |
| | if self.should_trace: |
| | if exc_value is None: |
| | info = {"return_value": self.return_value} |
| | self.trace(f"{self.name}.complete", info) |
| | else: |
| | info = {"exception": exc_value} |
| | self.trace(f"{self.name}.failed", info) |
| |
|
| | async def atrace(self, name: str, info: dict[str, typing.Any]) -> None: |
| | if self.trace_extension is not None: |
| | prefix_and_name = f"{self.prefix}.{name}" |
| | coro = self.trace_extension(prefix_and_name, info) |
| | if not inspect.iscoroutine(coro): |
| | raise TypeError( |
| | "If you're using an asynchronous interface, " |
| | "the callback of the `trace` extension should " |
| | "be an asynchronous function rather than a normal function." |
| | ) |
| | await coro |
| |
|
| | if self.debug: |
| | if not info or "return_value" in info and info["return_value"] is None: |
| | message = name |
| | else: |
| | args = " ".join([f"{key}={value!r}" for key, value in info.items()]) |
| | message = f"{name} {args}" |
| | self.logger.debug(message) |
| |
|
| | async def __aenter__(self) -> Trace: |
| | if self.should_trace: |
| | info = self.kwargs |
| | await self.atrace(f"{self.name}.started", info) |
| | return self |
| |
|
| | async def __aexit__( |
| | self, |
| | exc_type: type[BaseException] | None = None, |
| | exc_value: BaseException | None = None, |
| | traceback: types.TracebackType | None = None, |
| | ) -> None: |
| | if self.should_trace: |
| | if exc_value is None: |
| | info = {"return_value": self.return_value} |
| | await self.atrace(f"{self.name}.complete", info) |
| | else: |
| | info = {"exception": exc_value} |
| | await self.atrace(f"{self.name}.failed", info) |
| |
|