copper-mind / app /async_bridge.py
ifieryarrows's picture
Sync from GitHub (tests passed)
8e48995 verified
"""
Utilities for safely running async callables from synchronous code.
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Awaitable, Callable, TypeVar
T = TypeVar("T")
def _run_coroutine(async_fn: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any) -> T:
"""Run an async callable in a fresh event loop."""
return asyncio.run(async_fn(*args, **kwargs))
def run_async_from_sync(
async_fn: Callable[..., Awaitable[T]],
*args: Any,
**kwargs: Any,
) -> T:
"""
Execute an async callable from sync code.
If no loop is running in the current thread, run directly with ``asyncio.run``.
If a loop is already running, execute in a dedicated worker thread with its own loop.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return _run_coroutine(async_fn, *args, **kwargs)
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(_run_coroutine, async_fn, *args, **kwargs)
return future.result()