""" Async utilities for safe async/sync integration """ import asyncio import functools from typing import Any, Callable, Coroutine import logging logger = logging.getLogger(__name__) class AsyncRunner: """Safely run async functions in sync context""" @staticmethod def run_async(coro: Coroutine) -> Any: """ Run async coroutine in sync context safely Args: coro: Async coroutine to run Returns: Result of the coroutine """ try: # Try to get existing event loop loop = asyncio.get_running_loop() # We're already in async context - create task logger.debug("Running in existing async context") future = asyncio.create_task(coro) return future except RuntimeError: # No running loop, create one logger.debug("Creating new event loop for async execution") loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(coro) finally: loop.close() @staticmethod def sync_wrapper(async_func: Callable) -> Callable: """ Decorator to make async function callable from sync context Args: async_func: Async function to wrap Returns: Sync-compatible function """ @functools.wraps(async_func) def wrapper(*args, **kwargs) -> Any: return AsyncRunner.run_async(async_func(*args, **kwargs)) return wrapper def async_to_sync(async_func: Callable) -> Callable: """ Convenience decorator to convert async function to sync Usage: @async_to_sync async def my_async_function(): ... # Can now be called synchronously result = my_async_function() """ return AsyncRunner.sync_wrapper(async_func) class SafeEventLoop: """Context manager for safe event loop handling""" def __init__(self, create_new: bool = False): self.create_new = create_new self.loop = None self.original_loop = None def __enter__(self): if self.create_new: self.original_loop = asyncio.get_event_loop_policy().get_event_loop() self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) else: try: self.loop = asyncio.get_running_loop() except RuntimeError: self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) return self.loop def __exit__(self, exc_type, exc_val, exc_tb): if self.create_new and self.original_loop: asyncio.set_event_loop(self.original_loop) if not self.loop.is_closed(): self.loop.close()