"""
Implementation for async generators.
"""
| |
|
| | from __future__ import annotations |
| |
|
| | from asyncio import get_running_loop |
| | from contextlib import asynccontextmanager |
| | from queue import Empty, Full, Queue |
| | from typing import Any, AsyncGenerator, Callable, Iterable, TypeVar |
| |
|
| | from .utils import run_in_executor_with_context |
| |
|
# Names exported by `from <this module> import *`.
__all__ = [
    "aclosing",
    "generator_to_async_generator",
]
| |
|
# Any async generator type; lets `aclosing` hand back exactly the type it was
# given.
_T_Generator = TypeVar("_T_Generator", bound=AsyncGenerator[Any, None])


@asynccontextmanager
async def aclosing(
    thing: _T_Generator,
) -> AsyncGenerator[_T_Generator, None]:
    """
    Async context manager that closes an async generator on exit.

    Similar to `contextlib.aclosing`, in Python 3.10.

    :param thing: The async generator that is handed to the body of the
        ``async with`` block. Its ``aclose()`` is awaited when the block is
        left, whether it exits normally or with an exception.
    """
    try:
        yield thing
    finally:
        await thing.aclose()
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
# Default `maxsize` of the queue that `generator_to_async_generator` puts
# between the synchronous producer and the async consumer.
DEFAULT_BUFFER_SIZE: int = 1000

# Type of the items produced by the wrapped iterable.
_T = TypeVar("_T")
| |
|
| |
|
class _Done:
    """
    End-of-stream marker.

    `generator_to_async_generator` puts an instance on its queue after the
    last item; the consumer stops as soon as it receives one.
    """
| |
|
| |
|
async def generator_to_async_generator(
    get_iterable: Callable[[], Iterable[_T]],
    buffer_size: int = DEFAULT_BUFFER_SIZE,
) -> AsyncGenerator[_T, None]:
    """
    Turn a generator or iterable into an async generator.

    This works by running the generator in a background thread.

    :param get_iterable: Function that returns a generator or iterable when
        called.
    :param buffer_size: Size of the queue between the async consumer and the
        synchronous generator that produces items.
    """
    # Set to True once this async generator is being finalized. The producer
    # polls it to know when it should stop.
    quitting = False

    # Bounded queue: when it is full, the producer waits for the consumer
    # instead of buffering without limit.
    q: Queue[_T | _Done] = Queue(maxsize=buffer_size)
    loop = get_running_loop()

    def _put_unless_quitting(entry: _T | _Done) -> bool:
        """
        Put `entry` on the queue, retrying for as long as the queue is full.

        Return True once the entry is queued. Return False, without queueing,
        when the queue is still full after a one second wait and the consumer
        has quit in the meantime.
        """
        while True:
            try:
                # The timeout makes sure `quitting` is re-checked regularly
                # while the queue stays full.
                q.put(entry, timeout=1)
            except Full:
                if quitting:
                    return False
            else:
                return True

    def runner() -> None:
        """
        Consume the generator in background thread.
        When items are received, they'll be pushed to the queue.
        """
        try:
            for item in get_iterable():
                # Stop producing as soon as the consumer is gone.
                if quitting or not _put_unless_quitting(item):
                    return
        finally:
            # Signal end-of-stream (best effort once the consumer has quit).
            # Deliberately no `return` in this clause: returning from
            # `finally` would discard an exception raised by the iterable,
            # and is a SyntaxWarning as of Python 3.14 (PEP 765).
            _put_unless_quitting(_Done())

    # Start the producer.
    # NOTE(review): presumably this schedules `runner` on an executor thread
    # and returns an awaitable for its completion -- confirm in `.utils`.
    runner_f = run_in_executor_with_context(runner)

    try:
        while True:
            try:
                # Fast path: take an already queued entry without blocking.
                item = q.get_nowait()
            except Empty:
                # Nothing queued: do the blocking `get` in an executor so the
                # event loop keeps running.
                item = await loop.run_in_executor(None, q.get)

            if isinstance(item, _Done):
                break
            yield item
    finally:
        # Runs on normal exhaustion as well as when this async generator is
        # closed early: tell the producer to stop.
        quitting = True

        # Wait until the producer has finished.
        # NOTE(review): an exception raised by the iterable is expected to
        # surface here -- confirm against `run_in_executor_with_context`.
        await runner_f
| |
|