bibibi12345 commited on
Commit
1c56302
·
1 Parent(s): fd6cc91
Files changed (1) hide show
  1. app.py +85 -24
app.py CHANGED
@@ -47,6 +47,7 @@ def serve_static(filename):
47
 
48
  async def process_fal_request(request_id, model_endpoint, fal_arguments, api_key):
49
  """Process FAL API request asynchronously"""
 
50
  try:
51
  print(f"[DEBUG] Starting async processing for request {request_id}")
52
  print(f"[DEBUG] Model endpoint: {model_endpoint}")
@@ -72,24 +73,47 @@ async def process_fal_request(request_id, model_endpoint, fal_arguments, api_key
72
  'result': None
73
  }
74
 
75
- # Collect logs asynchronously
76
  print(f"[DEBUG] Starting to collect events...")
77
  event_count = 0
78
- async for event in handler.iter_events(with_logs=True):
79
- event_count += 1
80
- print(f"[DEBUG] Event #{event_count}: {type(event).__name__}")
81
-
82
- if hasattr(event, 'logs') and event.logs:
83
- for log in event.logs:
84
- log_msg = log.get("message", "")
85
- print(f"[DEBUG] Log: {log_msg}")
86
- active_requests[request_id]['logs'].append(log_msg)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87
 
88
  print(f"[DEBUG] Total events collected: {event_count}")
89
 
90
- # Get the final result
91
  print(f"[DEBUG] Getting final result...")
92
- result = await handler.get()
 
 
 
 
93
 
94
  print(f"[DEBUG] Result received: {json.dumps(result, indent=2)[:500] if result else 'None'}...")
95
 
@@ -105,6 +129,11 @@ async def process_fal_request(request_id, model_endpoint, fal_arguments, api_key
105
  print(f"[DEBUG] Async processing completed for request {request_id}")
106
  return result
107
 
 
 
 
 
 
108
  except Exception as e:
109
  print(f"[ERROR] Exception in process_fal_request: {str(e)}")
110
  import traceback
@@ -117,17 +146,23 @@ def run_async_task(request_id, model_endpoint, fal_arguments, api_key):
117
  """Run async task with proper event loop management"""
118
  print(f"[DEBUG run_async_task] Starting for request {request_id}")
119
 
120
- # Always create a fresh event loop for each task to avoid conflicts
121
  loop = None
122
  try:
123
  # Create a new event loop for this thread
124
  loop = asyncio.new_event_loop()
125
  asyncio.set_event_loop(loop)
126
 
 
 
 
127
  # Run the async task in the new loop
128
  result = loop.run_until_complete(process_fal_request(request_id, model_endpoint, fal_arguments, api_key))
129
  print(f"[DEBUG run_async_task] Completed with result keys: {result.keys() if result else 'None'}")
130
 
 
 
 
131
  except Exception as e:
132
  print(f"[ERROR run_async_task] Failed: {str(e)}")
133
  import traceback
@@ -137,22 +172,48 @@ def run_async_task(request_id, model_endpoint, fal_arguments, api_key):
137
  active_requests[request_id]['status'] = 'error'
138
  active_requests[request_id]['error'] = str(e)
139
  finally:
140
- # Always close and cleanup the event loop
141
  if loop:
142
  try:
143
- # Cancel any remaining tasks
144
- pending = asyncio.all_tasks(loop)
145
- for task in pending:
146
- task.cancel()
147
- # Run until all tasks are cancelled
148
- if pending:
149
- loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
150
  except Exception as cleanup_error:
151
  print(f"[WARNING] Error during loop cleanup: {cleanup_error}")
152
  finally:
153
- loop.close()
154
- # Clear the event loop from the thread
155
- asyncio.set_event_loop(None)
 
 
 
 
 
 
 
 
 
 
 
156
 
157
  @app.route('/api/generate', methods=['POST'])
158
  def generate():
 
47
 
48
  async def process_fal_request(request_id, model_endpoint, fal_arguments, api_key):
49
  """Process FAL API request asynchronously"""
50
+ handler = None
51
  try:
52
  print(f"[DEBUG] Starting async processing for request {request_id}")
53
  print(f"[DEBUG] Model endpoint: {model_endpoint}")
 
73
  'result': None
74
  }
75
 
76
+ # Collect logs asynchronously with better error handling
77
  print(f"[DEBUG] Starting to collect events...")
78
  event_count = 0
79
+ max_events = 1000 # Prevent infinite loops
80
+
81
+ try:
82
+ async for event in handler.iter_events(with_logs=True):
83
+ event_count += 1
84
+ print(f"[DEBUG] Event #{event_count}: {type(event).__name__}")
85
+
86
+ if hasattr(event, 'logs') and event.logs:
87
+ for log in event.logs:
88
+ log_msg = log.get("message", "")
89
+ print(f"[DEBUG] Log: {log_msg}")
90
+ active_requests[request_id]['logs'].append(log_msg)
91
+
92
+ # Safety check to prevent infinite loops
93
+ if event_count >= max_events:
94
+ print(f"[WARNING] Max events ({max_events}) reached, breaking loop")
95
+ break
96
+ except asyncio.CancelledError:
97
+ print(f"[WARNING] Event iteration cancelled for request {request_id}")
98
+ raise
99
+ except RuntimeError as e:
100
+ if "Event loop is closed" in str(e):
101
+ print(f"[WARNING] Event loop closed during iteration, attempting to continue...")
102
+ else:
103
+ print(f"[WARNING] Runtime error during event iteration: {e}")
104
+ except Exception as iter_error:
105
+ print(f"[WARNING] Error during event iteration: {iter_error}")
106
+ # Continue to try to get the result even if event iteration fails
107
 
108
  print(f"[DEBUG] Total events collected: {event_count}")
109
 
110
+ # Get the final result with timeout to prevent hanging
111
  print(f"[DEBUG] Getting final result...")
112
+ try:
113
+ result = await asyncio.wait_for(handler.get(), timeout=60) # 60 second timeout
114
+ except asyncio.TimeoutError:
115
+ print(f"[ERROR] Timeout waiting for result")
116
+ raise Exception("Timeout waiting for FAL API result")
117
 
118
  print(f"[DEBUG] Result received: {json.dumps(result, indent=2)[:500] if result else 'None'}...")
119
 
 
129
  print(f"[DEBUG] Async processing completed for request {request_id}")
130
  return result
131
 
132
+ except asyncio.CancelledError:
133
+ print(f"[WARNING] Task cancelled for request {request_id}")
134
+ active_requests[request_id]['status'] = 'error'
135
+ active_requests[request_id]['error'] = 'Task was cancelled'
136
+ raise
137
  except Exception as e:
138
  print(f"[ERROR] Exception in process_fal_request: {str(e)}")
139
  import traceback
 
146
  """Run async task with proper event loop management"""
147
  print(f"[DEBUG run_async_task] Starting for request {request_id}")
148
 
149
+ # Create and run event loop in a way that prevents premature closure
150
  loop = None
151
  try:
152
  # Create a new event loop for this thread
153
  loop = asyncio.new_event_loop()
154
  asyncio.set_event_loop(loop)
155
 
156
+ # Enable debug mode to get better error messages
157
+ loop.set_debug(False) # Set to True for more verbose debugging
158
+
159
  # Run the async task in the new loop
160
  result = loop.run_until_complete(process_fal_request(request_id, model_endpoint, fal_arguments, api_key))
161
  print(f"[DEBUG run_async_task] Completed with result keys: {result.keys() if result else 'None'}")
162
 
163
+ # Ensure all async generators are closed properly
164
+ loop.run_until_complete(asyncio.sleep(0)) # Process any remaining callbacks
165
+
166
  except Exception as e:
167
  print(f"[ERROR run_async_task] Failed: {str(e)}")
168
  import traceback
 
172
  active_requests[request_id]['status'] = 'error'
173
  active_requests[request_id]['error'] = str(e)
174
  finally:
175
+ # More careful cleanup to avoid closing loop while still in use
176
  if loop:
177
  try:
178
+ # Give time for any final async operations to complete
179
+ if not loop.is_closed():
180
+ # Run a small delay to let any final callbacks execute
181
+ loop.run_until_complete(asyncio.sleep(0.5))
182
+
183
+ # Get all tasks that are still pending
184
+ pending = asyncio.all_tasks(loop)
185
+
186
+ # Cancel them gracefully
187
+ for task in pending:
188
+ task.cancel()
189
+
190
+ # Wait for all tasks to be cancelled, with a timeout
191
+ if pending:
192
+ loop.run_until_complete(
193
+ asyncio.wait_for(
194
+ asyncio.gather(*pending, return_exceptions=True),
195
+ timeout=5.0
196
+ )
197
+ )
198
+ except asyncio.TimeoutError:
199
+ print(f"[WARNING] Timeout during task cleanup")
200
  except Exception as cleanup_error:
201
  print(f"[WARNING] Error during loop cleanup: {cleanup_error}")
202
  finally:
203
+ # Close the loop only after everything is done
204
+ try:
205
+ if not loop.is_closed():
206
+ # Stop the loop first to ensure it's not running
207
+ loop.stop()
208
+ # Give it a moment to stop
209
+ loop.run_until_complete(asyncio.sleep(0))
210
+ # Now close it
211
+ loop.close()
212
+ except Exception as close_error:
213
+ print(f"[WARNING] Error closing loop: {close_error}")
214
+ finally:
215
+ # Clear the event loop from the thread
216
+ asyncio.set_event_loop(None)
217
 
218
  @app.route('/api/generate', methods=['POST'])
219
  def generate():