petter2025's picture
Create utils/async_runner.py
eccf061 verified
raw
history blame
2.97 kB
"""
Async utilities for safe async/sync integration
"""
import asyncio
import functools
from typing import Any, Callable, Coroutine
import logging
logger = logging.getLogger(__name__)
class AsyncRunner:
"""Safely run async functions in sync context"""
@staticmethod
def run_async(coro: Coroutine) -> Any:
"""
Run async coroutine in sync context safely
Args:
coro: Async coroutine to run
Returns:
Result of the coroutine
"""
try:
# Try to get existing event loop
loop = asyncio.get_running_loop()
# We're already in async context - create task
logger.debug("Running in existing async context")
future = asyncio.create_task(coro)
return future
except RuntimeError:
# No running loop, create one
logger.debug("Creating new event loop for async execution")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(coro)
finally:
loop.close()
@staticmethod
def sync_wrapper(async_func: Callable) -> Callable:
"""
Decorator to make async function callable from sync context
Args:
async_func: Async function to wrap
Returns:
Sync-compatible function
"""
@functools.wraps(async_func)
def wrapper(*args, **kwargs) -> Any:
return AsyncRunner.run_async(async_func(*args, **kwargs))
return wrapper
def async_to_sync(async_func: Callable) -> Callable:
"""
Convenience decorator to convert async function to sync
Usage:
@async_to_sync
async def my_async_function():
...
# Can now be called synchronously
result = my_async_function()
"""
return AsyncRunner.sync_wrapper(async_func)
class SafeEventLoop:
"""Context manager for safe event loop handling"""
def __init__(self, create_new: bool = False):
self.create_new = create_new
self.loop = None
self.original_loop = None
def __enter__(self):
if self.create_new:
self.original_loop = asyncio.get_event_loop_policy().get_event_loop()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
else:
try:
self.loop = asyncio.get_running_loop()
except RuntimeError:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
return self.loop
def __exit__(self, exc_type, exc_val, exc_tb):
if self.create_new and self.original_loop:
asyncio.set_event_loop(self.original_loop)
if not self.loop.is_closed():
self.loop.close()