Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import asyncio | |
| import functools | |
| from typing import TypeVar, Callable, Awaitable | |
| from typing_extensions import ParamSpec | |
| import anyio | |
| import sniffio | |
| import anyio.to_thread | |
| T_Retval = TypeVar("T_Retval") | |
| T_ParamSpec = ParamSpec("T_ParamSpec") | |
| async def to_thread( | |
| func: Callable[T_ParamSpec, T_Retval], /, *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs | |
| ) -> T_Retval: | |
| if sniffio.current_async_library() == "asyncio": | |
| return await asyncio.to_thread(func, *args, **kwargs) | |
| return await anyio.to_thread.run_sync( | |
| functools.partial(func, *args, **kwargs), | |
| ) | |
| # inspired by `asyncer`, https://github.com/tiangolo/asyncer | |
| def asyncify(function: Callable[T_ParamSpec, T_Retval]) -> Callable[T_ParamSpec, Awaitable[T_Retval]]: | |
| """ | |
| Take a blocking function and create an async one that receives the same | |
| positional and keyword arguments. | |
| Usage: | |
| ```python | |
| def blocking_func(arg1, arg2, kwarg1=None): | |
| # blocking code | |
| return result | |
| result = asyncify(blocking_function)(arg1, arg2, kwarg1=value1) | |
| ``` | |
| ## Arguments | |
| `function`: a blocking regular callable (e.g. a function) | |
| ## Return | |
| An async function that takes the same positional and keyword arguments as the | |
| original one, that when called runs the same original function in a thread worker | |
| and returns the result. | |
| """ | |
| async def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval: | |
| return await to_thread(function, *args, **kwargs) | |
| return wrapper | |