""" Enhanced Async Utilities for Safe Async/Sync Integration FIXED VERSION: Consolidated from app.py with nest_asyncio compatibility and better error handling """ import asyncio import functools import time import logging from typing import Any, Callable, Coroutine, Optional, TypeVar, Union from contextlib import contextmanager logger = logging.getLogger(__name__) T = TypeVar('T') class AsyncRunner: """ Enhanced async runner with nest_asyncio compatibility and robust error handling Consolidated from app.py local implementation """ @staticmethod def run_async(coro: Coroutine[Any, Any, T], timeout: Optional[float] = 30.0) -> Union[T, dict]: """ FIXED: Run async coroutine in sync context with nest_asyncio compatibility Args: coro: Async coroutine to run timeout: Maximum time to wait for coroutine completion (seconds) Returns: Result of the coroutine or error dictionary if failed """ start_time = time.time() try: # Try to get running loop first (nest_asyncio compatible) try: loop = asyncio.get_running_loop() logger.debug("✅ Running in existing async context (nest_asyncio detected)") # In a running loop, we need to schedule as a task # This handles the "event loop already running" case task = asyncio.create_task(coro) # For sync context, we need to run until complete # Use asyncio.run_coroutine_threadsafe for thread safety import concurrent.futures future = asyncio.run_coroutine_threadsafe(coro, loop) try: result = future.result(timeout=timeout) logger.debug(f"✅ Async execution completed in {time.time() - start_time:.2f}s") return result except concurrent.futures.TimeoutError: logger.error(f"❌ Async execution timed out after {timeout}s") future.cancel() return { "error": f"Async execution timed out after {timeout}s", "status": "failed", "timeout": True, "boundary_note": "Execution boundary reached - timeout" } except RuntimeError: # No running loop, create one logger.debug("🔄 Creating new event loop for async execution") loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: result = loop.run_until_complete(asyncio.wait_for(coro, timeout=timeout)) logger.debug(f"✅ Async execution completed in {time.time() - start_time:.2f}s") return result except asyncio.TimeoutError: logger.error(f"❌ Async execution timed out after {timeout}s") return { "error": f"Async execution timed out after {timeout}s", "status": "failed", "timeout": True, "boundary_note": "Execution boundary reached - timeout" } finally: # Clean up the loop if not loop.is_closed(): loop.close() except Exception as e: logger.error(f"❌ Async execution failed: {e}", exc_info=True) return { "error": str(e), "status": "failed", "execution_time": time.time() - start_time, "boundary_note": "Execution boundary reached", "error_type": type(e).__name__ } @staticmethod def async_to_sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., Union[T, dict]]: """ FIXED: Decorator to convert async function to sync with enhanced error handling Usage: @AsyncRunner.async_to_sync async def my_async_function(): ... # Can now be called synchronously result = my_async_function() """ @functools.wraps(async_func) def wrapper(*args, **kwargs) -> Union[T, dict]: try: # Create the coroutine coro = async_func(*args, **kwargs) # Run it with timeout return AsyncRunner.run_async(coro) except Exception as e: logger.error(f"❌ Async to sync conversion failed: {e}", exc_info=True) return { "error": str(e), "status": "failed", "boundary_context": "OSS advisory only - execution requires Enterprise", "error_type": type(e).__name__ } return wrapper @staticmethod def is_async_context() -> bool: """ Check if we're currently in an async context Returns: True if in async context, False otherwise """ try: asyncio.get_running_loop() return True except RuntimeError: return False # Convenience function for the decorator def async_to_sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., Union[T, dict]]: """ Convenience decorator to convert async function to sync Usage: @async_to_sync async def my_async_function(): ... # Can now be called synchronously result = my_async_function() """ return AsyncRunner.async_to_sync(async_func) @contextmanager def safe_event_loop(): """ Context manager for safe event loop handling with cleanup Usage: with safe_event_loop() as loop: result = loop.run_until_complete(async_function()) """ loop = None try: # Try to get existing loop try: loop = asyncio.get_running_loop() logger.debug("Using existing event loop") yield loop return except RuntimeError: pass # Create new loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) logger.debug("Created new event loop") yield loop finally: # Cleanup if loop and not loop.is_closed(): loop.close() logger.debug("Closed event loop") class AsyncCircuitBreaker: """ Circuit breaker pattern for async operations to prevent cascading failures """ def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.failure_count = 0 self.last_failure_time = 0 self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN def can_execute(self) -> bool: """Check if circuit breaker allows execution""" if self.state == "OPEN": # Check if recovery timeout has passed if time.time() - self.last_failure_time > self.recovery_timeout: self.state = "HALF_OPEN" logger.info("Circuit breaker moving to HALF_OPEN state") return True return False return True def record_success(self): """Record successful execution""" if self.state == "HALF_OPEN": self.state = "CLOSED" logger.info("Circuit breaker reset to CLOSED state") self.failure_count = 0 def record_failure(self): """Record failed execution""" self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = "OPEN" logger.warning(f"Circuit breaker OPENED after {self.failure_count} failures") @AsyncRunner.async_to_sync async def execute(self, coro: Coroutine) -> Any: """Execute async operation with circuit breaker protection""" if not self.can_execute(): raise Exception("Circuit breaker is OPEN - operation blocked") try: result = await coro self.record_success() return result except Exception as e: self.record_failure() raise e