petter2025 commited on
Commit
dbb6695
·
verified ·
1 Parent(s): cc15516

Update utils/async_runner.py

Browse files
Files changed (1) hide show
  1. utils/async_runner.py +199 -55
utils/async_runner.py CHANGED
@@ -1,62 +1,147 @@
1
  """
2
- Async utilities for safe async/sync integration
 
3
  """
4
  import asyncio
5
  import functools
6
- from typing import Any, Callable, Coroutine
7
  import logging
 
 
8
 
9
  logger = logging.getLogger(__name__)
 
10
 
11
  class AsyncRunner:
12
- """Safely run async functions in sync context"""
 
 
 
13
 
14
  @staticmethod
15
- def run_async(coro: Coroutine) -> Any:
16
  """
17
- Run async coroutine in sync context safely
18
 
19
  Args:
20
  coro: Async coroutine to run
 
21
 
22
  Returns:
23
- Result of the coroutine
24
  """
 
 
25
  try:
26
- # Try to get existing event loop
27
- loop = asyncio.get_running_loop()
28
- # We're already in async context - create task
29
- logger.debug("Running in existing async context")
30
- future = asyncio.create_task(coro)
31
- return future
32
- except RuntimeError:
33
- # No running loop, create one
34
- logger.debug("Creating new event loop for async execution")
35
- loop = asyncio.new_event_loop()
36
- asyncio.set_event_loop(loop)
37
  try:
38
- return loop.run_until_complete(coro)
39
- finally:
40
- loop.close()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
 
42
  @staticmethod
43
- def sync_wrapper(async_func: Callable) -> Callable:
44
  """
45
- Decorator to make async function callable from sync context
46
 
47
- Args:
48
- async_func: Async function to wrap
49
-
50
- Returns:
51
- Sync-compatible function
 
 
52
  """
53
  @functools.wraps(async_func)
54
- def wrapper(*args, **kwargs) -> Any:
55
- return AsyncRunner.run_async(async_func(*args, **kwargs))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
  return wrapper
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
 
58
-
59
- def async_to_sync(async_func: Callable) -> Callable:
60
  """
61
  Convenience decorator to convert async function to sync
62
 
@@ -68,32 +153,91 @@ def async_to_sync(async_func: Callable) -> Callable:
68
  # Can now be called synchronously
69
  result = my_async_function()
70
  """
71
- return AsyncRunner.sync_wrapper(async_func)
72
 
73
 
74
- class SafeEventLoop:
75
- """Context manager for safe event loop handling"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76
 
77
- def __init__(self, create_new: bool = False):
78
- self.create_new = create_new
79
- self.loop = None
80
- self.original_loop = None
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
82
- def __enter__(self):
83
- if self.create_new:
84
- self.original_loop = asyncio.get_event_loop_policy().get_event_loop()
85
- self.loop = asyncio.new_event_loop()
86
- asyncio.set_event_loop(self.loop)
87
- else:
88
- try:
89
- self.loop = asyncio.get_running_loop()
90
- except RuntimeError:
91
- self.loop = asyncio.new_event_loop()
92
- asyncio.set_event_loop(self.loop)
93
- return self.loop
 
 
 
94
 
95
- def __exit__(self, exc_type, exc_val, exc_tb):
96
- if self.create_new and self.original_loop:
97
- asyncio.set_event_loop(self.original_loop)
98
- if not self.loop.is_closed():
99
- self.loop.close()
 
 
 
 
 
 
 
 
 
1
  """
2
+ Enhanced Async Utilities for Safe Async/Sync Integration
3
+ FIXED VERSION: Consolidated from app.py with nest_asyncio compatibility and better error handling
4
  """
5
  import asyncio
6
  import functools
7
+ import time
8
  import logging
9
+ from typing import Any, Callable, Coroutine, Optional, TypeVar, Union
10
+ from contextlib import contextmanager
11
 
12
  logger = logging.getLogger(__name__)
13
+ T = TypeVar('T')
14
 
15
  class AsyncRunner:
16
+ """
17
+ Enhanced async runner with nest_asyncio compatibility and robust error handling
18
+ Consolidated from app.py local implementation
19
+ """
20
 
21
  @staticmethod
22
+ def run_async(coro: Coroutine[Any, Any, T], timeout: Optional[float] = 30.0) -> Union[T, dict]:
23
  """
24
+ FIXED: Run async coroutine in sync context with nest_asyncio compatibility
25
 
26
  Args:
27
  coro: Async coroutine to run
28
+ timeout: Maximum time to wait for coroutine completion (seconds)
29
 
30
  Returns:
31
+ Result of the coroutine or error dictionary if failed
32
  """
33
+ start_time = time.time()
34
+
35
  try:
36
+ # Try to get running loop first (nest_asyncio compatible)
 
 
 
 
 
 
 
 
 
 
37
  try:
38
+ loop = asyncio.get_running_loop()
39
+ logger.debug("✅ Running in existing async context (nest_asyncio detected)")
40
+
41
+ # In a running loop, we need to schedule as a task
42
+ # This handles the "event loop already running" case
43
+ task = asyncio.create_task(coro)
44
+
45
+ # For sync context, we need to run until complete
46
+ # Use asyncio.run_coroutine_threadsafe for thread safety
47
+ import concurrent.futures
48
+ future = asyncio.run_coroutine_threadsafe(coro, loop)
49
+
50
+ try:
51
+ result = future.result(timeout=timeout)
52
+ logger.debug(f"✅ Async execution completed in {time.time() - start_time:.2f}s")
53
+ return result
54
+ except concurrent.futures.TimeoutError:
55
+ logger.error(f"❌ Async execution timed out after {timeout}s")
56
+ future.cancel()
57
+ return {
58
+ "error": f"Async execution timed out after {timeout}s",
59
+ "status": "failed",
60
+ "timeout": True,
61
+ "boundary_note": "Execution boundary reached - timeout"
62
+ }
63
+
64
+ except RuntimeError:
65
+ # No running loop, create one
66
+ logger.debug("🔄 Creating new event loop for async execution")
67
+ loop = asyncio.new_event_loop()
68
+ asyncio.set_event_loop(loop)
69
+
70
+ try:
71
+ result = loop.run_until_complete(asyncio.wait_for(coro, timeout=timeout))
72
+ logger.debug(f"✅ Async execution completed in {time.time() - start_time:.2f}s")
73
+ return result
74
+ except asyncio.TimeoutError:
75
+ logger.error(f"❌ Async execution timed out after {timeout}s")
76
+ return {
77
+ "error": f"Async execution timed out after {timeout}s",
78
+ "status": "failed",
79
+ "timeout": True,
80
+ "boundary_note": "Execution boundary reached - timeout"
81
+ }
82
+ finally:
83
+ # Clean up the loop
84
+ if not loop.is_closed():
85
+ loop.close()
86
+
87
+ except Exception as e:
88
+ logger.error(f"❌ Async execution failed: {e}", exc_info=True)
89
+ return {
90
+ "error": str(e),
91
+ "status": "failed",
92
+ "execution_time": time.time() - start_time,
93
+ "boundary_note": "Execution boundary reached",
94
+ "error_type": type(e).__name__
95
+ }
96
 
97
  @staticmethod
98
+ def async_to_sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., Union[T, dict]]:
99
  """
100
+ FIXED: Decorator to convert async function to sync with enhanced error handling
101
 
102
+ Usage:
103
+ @AsyncRunner.async_to_sync
104
+ async def my_async_function():
105
+ ...
106
+
107
+ # Can now be called synchronously
108
+ result = my_async_function()
109
  """
110
  @functools.wraps(async_func)
111
+ def wrapper(*args, **kwargs) -> Union[T, dict]:
112
+ try:
113
+ # Create the coroutine
114
+ coro = async_func(*args, **kwargs)
115
+
116
+ # Run it with timeout
117
+ return AsyncRunner.run_async(coro)
118
+
119
+ except Exception as e:
120
+ logger.error(f"❌ Async to sync conversion failed: {e}", exc_info=True)
121
+ return {
122
+ "error": str(e),
123
+ "status": "failed",
124
+ "boundary_context": "OSS advisory only - execution requires Enterprise",
125
+ "error_type": type(e).__name__
126
+ }
127
  return wrapper
128
+
129
+ @staticmethod
130
+ def is_async_context() -> bool:
131
+ """
132
+ Check if we're currently in an async context
133
+
134
+ Returns:
135
+ True if in async context, False otherwise
136
+ """
137
+ try:
138
+ asyncio.get_running_loop()
139
+ return True
140
+ except RuntimeError:
141
+ return False
142
 
143
+ # Convenience function for the decorator
144
+ def async_to_sync(async_func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., Union[T, dict]]:
145
  """
146
  Convenience decorator to convert async function to sync
147
 
 
153
  # Can now be called synchronously
154
  result = my_async_function()
155
  """
156
+ return AsyncRunner.async_to_sync(async_func)
157
 
158
 
159
+ @contextmanager
160
+ def safe_event_loop():
161
+ """
162
+ Context manager for safe event loop handling with cleanup
163
+
164
+ Usage:
165
+ with safe_event_loop() as loop:
166
+ result = loop.run_until_complete(async_function())
167
+ """
168
+ loop = None
169
+ try:
170
+ # Try to get existing loop
171
+ try:
172
+ loop = asyncio.get_running_loop()
173
+ logger.debug("Using existing event loop")
174
+ yield loop
175
+ return
176
+ except RuntimeError:
177
+ pass
178
+
179
+ # Create new loop
180
+ loop = asyncio.new_event_loop()
181
+ asyncio.set_event_loop(loop)
182
+ logger.debug("Created new event loop")
183
+ yield loop
184
+
185
+ finally:
186
+ # Cleanup
187
+ if loop and not loop.is_closed():
188
+ loop.close()
189
+ logger.debug("Closed event loop")
190
+
191
+
192
+ class AsyncCircuitBreaker:
193
+ """
194
+ Circuit breaker pattern for async operations to prevent cascading failures
195
+ """
196
 
197
+ def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0):
198
+ self.failure_threshold = failure_threshold
199
+ self.recovery_timeout = recovery_timeout
200
+ self.failure_count = 0
201
+ self.last_failure_time = 0
202
+ self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
203
+
204
+ def can_execute(self) -> bool:
205
+ """Check if circuit breaker allows execution"""
206
+ if self.state == "OPEN":
207
+ # Check if recovery timeout has passed
208
+ if time.time() - self.last_failure_time > self.recovery_timeout:
209
+ self.state = "HALF_OPEN"
210
+ logger.info("Circuit breaker moving to HALF_OPEN state")
211
+ return True
212
+ return False
213
+ return True
214
 
215
+ def record_success(self):
216
+ """Record successful execution"""
217
+ if self.state == "HALF_OPEN":
218
+ self.state = "CLOSED"
219
+ logger.info("Circuit breaker reset to CLOSED state")
220
+ self.failure_count = 0
221
+
222
+ def record_failure(self):
223
+ """Record failed execution"""
224
+ self.failure_count += 1
225
+ self.last_failure_time = time.time()
226
+
227
+ if self.failure_count >= self.failure_threshold:
228
+ self.state = "OPEN"
229
+ logger.warning(f"Circuit breaker OPENED after {self.failure_count} failures")
230
 
231
+ @AsyncRunner.async_to_sync
232
+ async def execute(self, coro: Coroutine) -> Any:
233
+ """Execute async operation with circuit breaker protection"""
234
+ if not self.can_execute():
235
+ raise Exception("Circuit breaker is OPEN - operation blocked")
236
+
237
+ try:
238
+ result = await coro
239
+ self.record_success()
240
+ return result
241
+ except Exception as e:
242
+ self.record_failure()
243
+ raise e