"""
Enhanced async utilities for safe async/sync integration.

Fixed version: consolidated from app.py, with nest_asyncio compatibility and
better error handling.
"""
import asyncio
import functools
import time
import logging
from typing import Any, Callable, Coroutine, Optional, TypeVar, Union
from contextlib import contextmanager
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Type variable standing for the result type of a wrapped coroutine.
T = TypeVar('T')
class AsyncRunner:
    """
    Helpers for driving coroutines from synchronous code.

    Works both when no event loop is running in the calling thread and when
    one already is (for example inside a notebook or an async web framework).
    Failures are reported by returning an error dictionary (containing at
    least ``"error"`` and ``"status": "failed"``) instead of raising, so
    callers must inspect the returned value.
    """

    @staticmethod
    def _timeout_result(timeout: Optional[float]) -> dict:
        """Build the error dictionary returned when execution times out."""
        return {
            "error": f"Async execution timed out after {timeout}s",
            "status": "failed",
            "timeout": True,
            "boundary_note": "Execution boundary reached - timeout",
        }

    @staticmethod
    def _run_in_fresh_loop(coro: Coroutine[Any, Any, T], timeout: Optional[float]) -> T:
        """
        Run ``coro`` to completion on a new event loop owned by this call.

        ``asyncio.run`` creates the loop, cancels leftover tasks, closes the
        loop and unregisters it, so no closed loop is left behind as the
        thread's current loop.

        Raises:
            asyncio.TimeoutError: if ``timeout`` seconds elapse first (the
                coroutine is cancelled by ``asyncio.wait_for``).
        """
        return asyncio.run(asyncio.wait_for(coro, timeout=timeout))

    @staticmethod
    def run_async(coro: Coroutine[Any, Any, T], timeout: Optional[float] = 30.0) -> Union[T, dict]:
        """
        Run an async coroutine from a sync context and return its result.

        Args:
            coro: Coroutine object to run. It is executed on a fresh event
                loop, so it must not depend on objects bound to another loop.
            timeout: Maximum time to wait for completion, in seconds.
                ``None`` waits indefinitely.

        Returns:
            The coroutine's result, or an error dictionary if it timed out
            or raised. Exceptions are logged and never propagated.
        """
        start_time = time.time()
        try:
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                loop_running = False
            else:
                loop_running = True

            try:
                if loop_running:
                    logger.debug("✅ Running in existing async context (nest_asyncio detected)")
                    # A loop is already running in THIS thread. Blocking here on a
                    # future scheduled onto that same loop can never complete,
                    # because the loop cannot advance while its thread is blocked.
                    # Run the coroutine on its own loop in a worker thread instead.
                    # The calling loop is paused until the worker finishes.
                    import concurrent.futures
                    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
                        result = pool.submit(
                            AsyncRunner._run_in_fresh_loop, coro, timeout
                        ).result()
                else:
                    logger.debug("🔄 Creating new event loop for async execution")
                    result = AsyncRunner._run_in_fresh_loop(coro, timeout)
            except asyncio.TimeoutError:
                logger.error(f"❌ Async execution timed out after {timeout}s")
                return AsyncRunner._timeout_result(timeout)

            logger.debug(f"✅ Async execution completed in {time.time() - start_time:.2f}s")
            return result
        except Exception as e:
            logger.error(f"❌ Async execution failed: {e}", exc_info=True)
            return {
                "error": str(e),
                "status": "failed",
                "execution_time": time.time() - start_time,
                "boundary_note": "Execution boundary reached",
                "error_type": type(e).__name__
            }

    @staticmethod
    def async_to_sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., Union[T, dict]]:
        """
        Decorator that turns an async function into a blocking sync function.

        The wrapper creates the coroutine and runs it through ``run_async``
        with the default timeout. Like ``run_async``, it returns an error
        dictionary instead of raising.

        Usage:
            @AsyncRunner.async_to_sync
            async def my_async_function():
                ...
            # Can now be called synchronously
            result = my_async_function()
        """
        @functools.wraps(async_func)
        def wrapper(*args, **kwargs) -> Union[T, dict]:
            try:
                # Create the coroutine
                coro = async_func(*args, **kwargs)
                # Run it with the default timeout
                return AsyncRunner.run_async(coro)
            except Exception as e:
                logger.error(f"❌ Async to sync conversion failed: {e}", exc_info=True)
                return {
                    "error": str(e),
                    "status": "failed",
                    "boundary_context": "OSS advisory only - execution requires Enterprise",
                    "error_type": type(e).__name__
                }
        return wrapper

    @staticmethod
    def is_async_context() -> bool:
        """
        Check whether an event loop is running in the current thread.

        Returns:
            True if in async context, False otherwise
        """
        try:
            asyncio.get_running_loop()
            return True
        except RuntimeError:
            return False
# Convenience function for the decorator
def async_to_sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., Union[T, dict]]:
    """
    Module-level shorthand for ``AsyncRunner.async_to_sync``.

    Example:
        @async_to_sync
        async def my_async_function():
            ...
        # The decorated name is now an ordinary blocking callable
        result = my_async_function()
    """
    sync_wrapper = AsyncRunner.async_to_sync(async_func)
    return sync_wrapper
@contextmanager
def safe_event_loop():
    """
    Context manager that yields an event loop and cleans up after itself.

    If a loop is already running in the current thread, that loop is yielded
    and left untouched on exit: it belongs to whoever started it, and closing
    a running loop raises ``RuntimeError``. Otherwise a new loop is created,
    registered as the current loop, and on exit closed and unregistered.

    Usage:
        with safe_event_loop() as loop:
            result = loop.run_until_complete(async_function())
    """
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is not None:
        # Borrowed loop: hand it out, never close it.
        logger.debug("Using existing event loop")
        yield running_loop
        return

    # No loop is running in this thread, so create one that this call owns.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    logger.debug("Created new event loop")
    try:
        yield loop
    finally:
        # Do not leave a closed loop registered as the thread's current loop.
        asyncio.set_event_loop(None)
        if not loop.is_closed():
            loop.close()
            logger.debug("Closed event loop")
class AsyncCircuitBreaker:
    """
    Circuit breaker pattern for async operations to prevent cascading failures.

    States:
        CLOSED: calls are allowed; failures are counted.
        OPEN: calls are rejected until ``recovery_timeout`` seconds have
            passed since the last recorded failure.
        HALF_OPEN: calls are allowed again; a success closes the breaker,
            while a failure re-opens it (the failure count is only reset
            on success, so it is still at or above the threshold).
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
        """
        Args:
            failure_threshold: Number of recorded failures that opens the breaker.
            recovery_timeout: Seconds to stay OPEN before allowing a trial call.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0  # time.time() of the most recent failure
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def can_execute(self) -> bool:
        """
        Check if circuit breaker allows execution.

        Side effect: moves an OPEN breaker to HALF_OPEN once the recovery
        timeout has elapsed.
        """
        if self.state != "OPEN":
            return True
        # Check if recovery timeout has passed
        if time.time() - self.last_failure_time > self.recovery_timeout:
            self.state = "HALF_OPEN"
            logger.info("Circuit breaker moving to HALF_OPEN state")
            return True
        return False

    def record_success(self):
        """Record successful execution and reset the failure count."""
        if self.state == "HALF_OPEN":
            self.state = "CLOSED"
            logger.info("Circuit breaker reset to CLOSED state")
        self.failure_count = 0

    def record_failure(self):
        """Record failed execution; open the breaker at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
            logger.warning(f"Circuit breaker OPENED after {self.failure_count} failures")

    @AsyncRunner.async_to_sync
    async def execute(self, coro: Coroutine) -> Any:
        """
        Execute async operation with circuit breaker protection.

        Because of the ``AsyncRunner.async_to_sync`` decorator this method is
        called synchronously, and any exception raised below (including the
        "breaker is OPEN" rejection) reaches the caller as an error
        dictionary rather than as a raised exception.

        Args:
            coro: Coroutine object to await. It is closed without being run
                when the breaker rejects the call.

        Returns:
            The result of ``coro``.
        """
        if not self.can_execute():
            # The coroutine will never be awaited. Close it explicitly so it
            # is not leaked with a "coroutine ... was never awaited" warning.
            close = getattr(coro, "close", None)
            if callable(close):
                close()
            raise Exception("Circuit breaker is OPEN - operation blocked")
        try:
            result = await coro
        except (Exception, asyncio.CancelledError):
            # A timeout in the runner arrives here as a cancellation.
            # CancelledError is not an Exception subclass on Python 3.8+,
            # so it is listed explicitly to count timeouts as failures.
            self.record_failure()
            raise
        self.record_success()
        return result