| from abc import ABCMeta, abstractmethod | |
| from types import TracebackType | |
| from typing import Optional, Type, TypeVar | |
# Type variable used so that ``__aenter__`` is typed as returning the same
# (sub)class it was called on.
T = TypeVar("T")


class AsyncResource(metaclass=ABCMeta):
    """
    Abstract base class for all closeable asynchronous resources.

    Works as an asynchronous context manager which returns the instance itself on enter, and calls
    :meth:`aclose` on exit.

    Subclasses must implement :meth:`aclose`; a class that does not override it cannot be
    instantiated.
    """

    async def __aenter__(self: T) -> T:
        """Enter the context and return the resource itself."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """
        Close the resource by awaiting :meth:`aclose`.

        The exception details are not inspected. ``None`` is returned, so an exception raised
        inside the ``async with`` block is not suppressed.

        :param exc_type: type of the exception raised in the block, or ``None``
        :param exc_val: the exception instance raised in the block, or ``None``
        :param exc_tb: traceback of the exception raised in the block, or ``None``
        """
        await self.aclose()

    # Declared abstract so that a subclass which forgets to implement closing fails at
    # instantiation time instead of silently "closing" via a no-op.
    @abstractmethod
    async def aclose(self) -> None:
        """Close the resource."""