petter2025's picture
Update utils/async_runner.py
dbb6695 verified
raw
history blame
8.76 kB
"""
Enhanced Async Utilities for Safe Async/Sync Integration
FIXED VERSION: Consolidated from app.py with nest_asyncio compatibility and better error handling
"""
import asyncio
import functools
import time
import logging
from typing import Any, Callable, Coroutine, Optional, TypeVar, Union
from contextlib import contextmanager
logger = logging.getLogger(__name__)
T = TypeVar('T')
class AsyncRunner:
"""
Enhanced async runner with nest_asyncio compatibility and robust error handling
Consolidated from app.py local implementation
"""
@staticmethod
def run_async(coro: Coroutine[Any, Any, T], timeout: Optional[float] = 30.0) -> Union[T, dict]:
"""
FIXED: Run async coroutine in sync context with nest_asyncio compatibility
Args:
coro: Async coroutine to run
timeout: Maximum time to wait for coroutine completion (seconds)
Returns:
Result of the coroutine or error dictionary if failed
"""
start_time = time.time()
try:
# Try to get running loop first (nest_asyncio compatible)
try:
loop = asyncio.get_running_loop()
logger.debug("βœ… Running in existing async context (nest_asyncio detected)")
# In a running loop, we need to schedule as a task
# This handles the "event loop already running" case
task = asyncio.create_task(coro)
# For sync context, we need to run until complete
# Use asyncio.run_coroutine_threadsafe for thread safety
import concurrent.futures
future = asyncio.run_coroutine_threadsafe(coro, loop)
try:
result = future.result(timeout=timeout)
logger.debug(f"βœ… Async execution completed in {time.time() - start_time:.2f}s")
return result
except concurrent.futures.TimeoutError:
logger.error(f"❌ Async execution timed out after {timeout}s")
future.cancel()
return {
"error": f"Async execution timed out after {timeout}s",
"status": "failed",
"timeout": True,
"boundary_note": "Execution boundary reached - timeout"
}
except RuntimeError:
# No running loop, create one
logger.debug("πŸ”„ Creating new event loop for async execution")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(asyncio.wait_for(coro, timeout=timeout))
logger.debug(f"βœ… Async execution completed in {time.time() - start_time:.2f}s")
return result
except asyncio.TimeoutError:
logger.error(f"❌ Async execution timed out after {timeout}s")
return {
"error": f"Async execution timed out after {timeout}s",
"status": "failed",
"timeout": True,
"boundary_note": "Execution boundary reached - timeout"
}
finally:
# Clean up the loop
if not loop.is_closed():
loop.close()
except Exception as e:
logger.error(f"❌ Async execution failed: {e}", exc_info=True)
return {
"error": str(e),
"status": "failed",
"execution_time": time.time() - start_time,
"boundary_note": "Execution boundary reached",
"error_type": type(e).__name__
}
@staticmethod
def async_to_sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., Union[T, dict]]:
"""
FIXED: Decorator to convert async function to sync with enhanced error handling
Usage:
@AsyncRunner.async_to_sync
async def my_async_function():
...
# Can now be called synchronously
result = my_async_function()
"""
@functools.wraps(async_func)
def wrapper(*args, **kwargs) -> Union[T, dict]:
try:
# Create the coroutine
coro = async_func(*args, **kwargs)
# Run it with timeout
return AsyncRunner.run_async(coro)
except Exception as e:
logger.error(f"❌ Async to sync conversion failed: {e}", exc_info=True)
return {
"error": str(e),
"status": "failed",
"boundary_context": "OSS advisory only - execution requires Enterprise",
"error_type": type(e).__name__
}
return wrapper
@staticmethod
def is_async_context() -> bool:
"""
Check if we're currently in an async context
Returns:
True if in async context, False otherwise
"""
try:
asyncio.get_running_loop()
return True
except RuntimeError:
return False
# Convenience function for the decorator
def async_to_sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., Union[T, dict]]:
"""
Convenience decorator to convert async function to sync
Usage:
@async_to_sync
async def my_async_function():
...
# Can now be called synchronously
result = my_async_function()
"""
return AsyncRunner.async_to_sync(async_func)
@contextmanager
def safe_event_loop():
"""
Context manager for safe event loop handling with cleanup
Usage:
with safe_event_loop() as loop:
result = loop.run_until_complete(async_function())
"""
loop = None
try:
# Try to get existing loop
try:
loop = asyncio.get_running_loop()
logger.debug("Using existing event loop")
yield loop
return
except RuntimeError:
pass
# Create new loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
logger.debug("Created new event loop")
yield loop
finally:
# Cleanup
if loop and not loop.is_closed():
loop.close()
logger.debug("Closed event loop")
class AsyncCircuitBreaker:
"""
Circuit breaker pattern for async operations to prevent cascading failures
"""
def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = 0
self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
def can_execute(self) -> bool:
"""Check if circuit breaker allows execution"""
if self.state == "OPEN":
# Check if recovery timeout has passed
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "HALF_OPEN"
logger.info("Circuit breaker moving to HALF_OPEN state")
return True
return False
return True
def record_success(self):
"""Record successful execution"""
if self.state == "HALF_OPEN":
self.state = "CLOSED"
logger.info("Circuit breaker reset to CLOSED state")
self.failure_count = 0
def record_failure(self):
"""Record failed execution"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
logger.warning(f"Circuit breaker OPENED after {self.failure_count} failures")
@AsyncRunner.async_to_sync
async def execute(self, coro: Coroutine) -> Any:
"""Execute async operation with circuit breaker protection"""
if not self.can_execute():
raise Exception("Circuit breaker is OPEN - operation blocked")
try:
result = await coro
self.record_success()
return result
except Exception as e:
self.record_failure()
raise e