| """ |
| Async utilities for safe async/sync integration |
| """ |
| import asyncio |
| import functools |
| from typing import Any, Callable, Coroutine |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class AsyncRunner: |
| """Safely run async functions in sync context""" |
| |
| @staticmethod |
| def run_async(coro: Coroutine) -> Any: |
| """ |
| Run async coroutine in sync context safely |
| |
| Args: |
| coro: Async coroutine to run |
| |
| Returns: |
| Result of the coroutine |
| """ |
| try: |
| |
| loop = asyncio.get_running_loop() |
| |
| logger.debug("Running in existing async context") |
| future = asyncio.create_task(coro) |
| return future |
| except RuntimeError: |
| |
| logger.debug("Creating new event loop for async execution") |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| try: |
| return loop.run_until_complete(coro) |
| finally: |
| loop.close() |
| |
| @staticmethod |
| def sync_wrapper(async_func: Callable) -> Callable: |
| """ |
| Decorator to make async function callable from sync context |
| |
| Args: |
| async_func: Async function to wrap |
| |
| Returns: |
| Sync-compatible function |
| """ |
| @functools.wraps(async_func) |
| def wrapper(*args, **kwargs) -> Any: |
| return AsyncRunner.run_async(async_func(*args, **kwargs)) |
| return wrapper |
|
|
|
|
| def async_to_sync(async_func: Callable) -> Callable: |
| """ |
| Convenience decorator to convert async function to sync |
| |
| Usage: |
| @async_to_sync |
| async def my_async_function(): |
| ... |
| |
| # Can now be called synchronously |
| result = my_async_function() |
| """ |
| return AsyncRunner.sync_wrapper(async_func) |
|
|
|
|
| class SafeEventLoop: |
| """Context manager for safe event loop handling""" |
| |
| def __init__(self, create_new: bool = False): |
| self.create_new = create_new |
| self.loop = None |
| self.original_loop = None |
| |
| def __enter__(self): |
| if self.create_new: |
| self.original_loop = asyncio.get_event_loop_policy().get_event_loop() |
| self.loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(self.loop) |
| else: |
| try: |
| self.loop = asyncio.get_running_loop() |
| except RuntimeError: |
| self.loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(self.loop) |
| return self.loop |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| if self.create_new and self.original_loop: |
| asyncio.set_event_loop(self.original_loop) |
| if not self.loop.is_closed(): |
| self.loop.close() |