File size: 8,764 Bytes
eccf061
dbb6695
 
eccf061
 
 
dbb6695
eccf061
dbb6695
 
eccf061
 
dbb6695
eccf061
 
dbb6695
 
 
 
eccf061
 
dbb6695
eccf061
dbb6695
eccf061
 
 
dbb6695
eccf061
 
dbb6695
eccf061
dbb6695
 
eccf061
dbb6695
eccf061
dbb6695
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eccf061
 
dbb6695
eccf061
dbb6695
eccf061
dbb6695
 
 
 
 
 
 
eccf061
 
dbb6695
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eccf061
dbb6695
 
 
 
 
 
 
 
 
 
 
 
 
 
eccf061
dbb6695
 
eccf061
 
 
 
 
 
 
 
 
 
 
dbb6695
eccf061
 
dbb6695
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eccf061
dbb6695
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eccf061
dbb6695
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eccf061
dbb6695
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
"""
Enhanced Async Utilities for Safe Async/Sync Integration
FIXED VERSION: Consolidated from app.py with nest_asyncio compatibility and better error handling
"""
import asyncio
import functools
import time
import logging
from typing import Any, Callable, Coroutine, Optional, TypeVar, Union
from contextlib import contextmanager

logger = logging.getLogger(__name__)
T = TypeVar('T')

class AsyncRunner:
    """
    Enhanced async runner with nest_asyncio compatibility and robust error handling
    Consolidated from app.py local implementation
    """
    
    @staticmethod
    def run_async(coro: Coroutine[Any, Any, T], timeout: Optional[float] = 30.0) -> Union[T, dict]:
        """
        FIXED: Run async coroutine in sync context with nest_asyncio compatibility
        
        Args:
            coro: Async coroutine to run
            timeout: Maximum time to wait for coroutine completion (seconds)
            
        Returns:
            Result of the coroutine or error dictionary if failed
        """
        start_time = time.time()
        
        try:
            # Try to get running loop first (nest_asyncio compatible)
            try:
                loop = asyncio.get_running_loop()
                logger.debug("βœ… Running in existing async context (nest_asyncio detected)")
                
                # In a running loop, we need to schedule as a task
                # This handles the "event loop already running" case
                task = asyncio.create_task(coro)
                
                # For sync context, we need to run until complete
                # Use asyncio.run_coroutine_threadsafe for thread safety
                import concurrent.futures
                future = asyncio.run_coroutine_threadsafe(coro, loop)
                
                try:
                    result = future.result(timeout=timeout)
                    logger.debug(f"βœ… Async execution completed in {time.time() - start_time:.2f}s")
                    return result
                except concurrent.futures.TimeoutError:
                    logger.error(f"❌ Async execution timed out after {timeout}s")
                    future.cancel()
                    return {
                        "error": f"Async execution timed out after {timeout}s",
                        "status": "failed",
                        "timeout": True,
                        "boundary_note": "Execution boundary reached - timeout"
                    }
                    
            except RuntimeError:
                # No running loop, create one
                logger.debug("πŸ”„ Creating new event loop for async execution")
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                
                try:
                    result = loop.run_until_complete(asyncio.wait_for(coro, timeout=timeout))
                    logger.debug(f"βœ… Async execution completed in {time.time() - start_time:.2f}s")
                    return result
                except asyncio.TimeoutError:
                    logger.error(f"❌ Async execution timed out after {timeout}s")
                    return {
                        "error": f"Async execution timed out after {timeout}s",
                        "status": "failed",
                        "timeout": True,
                        "boundary_note": "Execution boundary reached - timeout"
                    }
                finally:
                    # Clean up the loop
                    if not loop.is_closed():
                        loop.close()
            
        except Exception as e:
            logger.error(f"❌ Async execution failed: {e}", exc_info=True)
            return {
                "error": str(e),
                "status": "failed",
                "execution_time": time.time() - start_time,
                "boundary_note": "Execution boundary reached",
                "error_type": type(e).__name__
            }
    
    @staticmethod
    def async_to_sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., Union[T, dict]]:
        """
        FIXED: Decorator to convert async function to sync with enhanced error handling
        
        Usage:
            @AsyncRunner.async_to_sync
            async def my_async_function():
                ...
                
            # Can now be called synchronously
            result = my_async_function()
        """
        @functools.wraps(async_func)
        def wrapper(*args, **kwargs) -> Union[T, dict]:
            try:
                # Create the coroutine
                coro = async_func(*args, **kwargs)
                
                # Run it with timeout
                return AsyncRunner.run_async(coro)
                
            except Exception as e:
                logger.error(f"❌ Async to sync conversion failed: {e}", exc_info=True)
                return {
                    "error": str(e),
                    "status": "failed",
                    "boundary_context": "OSS advisory only - execution requires Enterprise",
                    "error_type": type(e).__name__
                }
        return wrapper
    
    @staticmethod
    def is_async_context() -> bool:
        """
        Check if we're currently in an async context
        
        Returns:
            True if in async context, False otherwise
        """
        try:
            asyncio.get_running_loop()
            return True
        except RuntimeError:
            return False

# Convenience function for the decorator
def async_to_sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., Union[T, dict]]:
    """
    Convenience decorator to convert async function to sync
    
    Usage:
        @async_to_sync
        async def my_async_function():
            ...
            
        # Can now be called synchronously
        result = my_async_function()
    """
    return AsyncRunner.async_to_sync(async_func)


@contextmanager
def safe_event_loop():
    """
    Context manager for safe event loop handling with cleanup
    
    Usage:
        with safe_event_loop() as loop:
            result = loop.run_until_complete(async_function())
    """
    loop = None
    try:
        # Try to get existing loop
        try:
            loop = asyncio.get_running_loop()
            logger.debug("Using existing event loop")
            yield loop
            return
        except RuntimeError:
            pass
        
        # Create new loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        logger.debug("Created new event loop")
        yield loop
        
    finally:
        # Cleanup
        if loop and not loop.is_closed():
            loop.close()
            logger.debug("Closed event loop")


class AsyncCircuitBreaker:
    """
    Circuit breaker pattern for async operations to prevent cascading failures
    """
    
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
        
    def can_execute(self) -> bool:
        """Check if circuit breaker allows execution"""
        if self.state == "OPEN":
            # Check if recovery timeout has passed
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "HALF_OPEN"
                logger.info("Circuit breaker moving to HALF_OPEN state")
                return True
            return False
        return True
    
    def record_success(self):
        """Record successful execution"""
        if self.state == "HALF_OPEN":
            self.state = "CLOSED"
            logger.info("Circuit breaker reset to CLOSED state")
        self.failure_count = 0
    
    def record_failure(self):
        """Record failed execution"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
            logger.warning(f"Circuit breaker OPENED after {self.failure_count} failures")
    
    @AsyncRunner.async_to_sync
    async def execute(self, coro: Coroutine) -> Any:
        """Execute async operation with circuit breaker protection"""
        if not self.can_execute():
            raise Exception("Circuit breaker is OPEN - operation blocked")
        
        try:
            result = await coro
            self.record_success()
            return result
        except Exception as e:
            self.record_failure()
            raise e