| """ |
| Enhanced Async Utilities for Safe Async/Sync Integration |
| FIXED VERSION: Consolidated from app.py with nest_asyncio compatibility and better error handling |
| """ |
| import asyncio |
| import functools |
| import time |
| import logging |
| from typing import Any, Callable, Coroutine, Optional, TypeVar, Union |
| from contextlib import contextmanager |
|
|
| logger = logging.getLogger(__name__) |
| T = TypeVar('T') |
|
|
| class AsyncRunner: |
| """ |
| Enhanced async runner with nest_asyncio compatibility and robust error handling |
| Consolidated from app.py local implementation |
| """ |
| |
| @staticmethod |
| def run_async(coro: Coroutine[Any, Any, T], timeout: Optional[float] = 30.0) -> Union[T, dict]: |
| """ |
| FIXED: Run async coroutine in sync context with nest_asyncio compatibility |
| |
| Args: |
| coro: Async coroutine to run |
| timeout: Maximum time to wait for coroutine completion (seconds) |
| |
| Returns: |
| Result of the coroutine or error dictionary if failed |
| """ |
| start_time = time.time() |
| |
| try: |
| |
| try: |
| loop = asyncio.get_running_loop() |
| logger.debug("β
Running in existing async context (nest_asyncio detected)") |
| |
| |
| |
| task = asyncio.create_task(coro) |
| |
| |
| |
| import concurrent.futures |
| future = asyncio.run_coroutine_threadsafe(coro, loop) |
| |
| try: |
| result = future.result(timeout=timeout) |
| logger.debug(f"β
Async execution completed in {time.time() - start_time:.2f}s") |
| return result |
| except concurrent.futures.TimeoutError: |
| logger.error(f"β Async execution timed out after {timeout}s") |
| future.cancel() |
| return { |
| "error": f"Async execution timed out after {timeout}s", |
| "status": "failed", |
| "timeout": True, |
| "boundary_note": "Execution boundary reached - timeout" |
| } |
| |
| except RuntimeError: |
| |
| logger.debug("π Creating new event loop for async execution") |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| |
| try: |
| result = loop.run_until_complete(asyncio.wait_for(coro, timeout=timeout)) |
| logger.debug(f"β
Async execution completed in {time.time() - start_time:.2f}s") |
| return result |
| except asyncio.TimeoutError: |
| logger.error(f"β Async execution timed out after {timeout}s") |
| return { |
| "error": f"Async execution timed out after {timeout}s", |
| "status": "failed", |
| "timeout": True, |
| "boundary_note": "Execution boundary reached - timeout" |
| } |
| finally: |
| |
| if not loop.is_closed(): |
| loop.close() |
| |
| except Exception as e: |
| logger.error(f"β Async execution failed: {e}", exc_info=True) |
| return { |
| "error": str(e), |
| "status": "failed", |
| "execution_time": time.time() - start_time, |
| "boundary_note": "Execution boundary reached", |
| "error_type": type(e).__name__ |
| } |
| |
| @staticmethod |
| def async_to_sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., Union[T, dict]]: |
| """ |
| FIXED: Decorator to convert async function to sync with enhanced error handling |
| |
| Usage: |
| @AsyncRunner.async_to_sync |
| async def my_async_function(): |
| ... |
| |
| # Can now be called synchronously |
| result = my_async_function() |
| """ |
| @functools.wraps(async_func) |
| def wrapper(*args, **kwargs) -> Union[T, dict]: |
| try: |
| |
| coro = async_func(*args, **kwargs) |
| |
| |
| return AsyncRunner.run_async(coro) |
| |
| except Exception as e: |
| logger.error(f"β Async to sync conversion failed: {e}", exc_info=True) |
| return { |
| "error": str(e), |
| "status": "failed", |
| "boundary_context": "OSS advisory only - execution requires Enterprise", |
| "error_type": type(e).__name__ |
| } |
| return wrapper |
| |
| @staticmethod |
| def is_async_context() -> bool: |
| """ |
| Check if we're currently in an async context |
| |
| Returns: |
| True if in async context, False otherwise |
| """ |
| try: |
| asyncio.get_running_loop() |
| return True |
| except RuntimeError: |
| return False |
|
|
| |
| def async_to_sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., Union[T, dict]]: |
| """ |
| Convenience decorator to convert async function to sync |
| |
| Usage: |
| @async_to_sync |
| async def my_async_function(): |
| ... |
| |
| # Can now be called synchronously |
| result = my_async_function() |
| """ |
| return AsyncRunner.async_to_sync(async_func) |
|
|
|
|
| @contextmanager |
| def safe_event_loop(): |
| """ |
| Context manager for safe event loop handling with cleanup |
| |
| Usage: |
| with safe_event_loop() as loop: |
| result = loop.run_until_complete(async_function()) |
| """ |
| loop = None |
| try: |
| |
| try: |
| loop = asyncio.get_running_loop() |
| logger.debug("Using existing event loop") |
| yield loop |
| return |
| except RuntimeError: |
| pass |
| |
| |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| logger.debug("Created new event loop") |
| yield loop |
| |
| finally: |
| |
| if loop and not loop.is_closed(): |
| loop.close() |
| logger.debug("Closed event loop") |
|
|
|
|
| class AsyncCircuitBreaker: |
| """ |
| Circuit breaker pattern for async operations to prevent cascading failures |
| """ |
| |
| def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0): |
| self.failure_threshold = failure_threshold |
| self.recovery_timeout = recovery_timeout |
| self.failure_count = 0 |
| self.last_failure_time = 0 |
| self.state = "CLOSED" |
| |
| def can_execute(self) -> bool: |
| """Check if circuit breaker allows execution""" |
| if self.state == "OPEN": |
| |
| if time.time() - self.last_failure_time > self.recovery_timeout: |
| self.state = "HALF_OPEN" |
| logger.info("Circuit breaker moving to HALF_OPEN state") |
| return True |
| return False |
| return True |
| |
| def record_success(self): |
| """Record successful execution""" |
| if self.state == "HALF_OPEN": |
| self.state = "CLOSED" |
| logger.info("Circuit breaker reset to CLOSED state") |
| self.failure_count = 0 |
| |
| def record_failure(self): |
| """Record failed execution""" |
| self.failure_count += 1 |
| self.last_failure_time = time.time() |
| |
| if self.failure_count >= self.failure_threshold: |
| self.state = "OPEN" |
| logger.warning(f"Circuit breaker OPENED after {self.failure_count} failures") |
| |
| @AsyncRunner.async_to_sync |
| async def execute(self, coro: Coroutine) -> Any: |
| """Execute async operation with circuit breaker protection""" |
| if not self.can_execute(): |
| raise Exception("Circuit breaker is OPEN - operation blocked") |
| |
| try: |
| result = await coro |
| self.record_success() |
| return result |
| except Exception as e: |
| self.record_failure() |
| raise e |