Amir Mahla commited on
Commit
1f6a70d
·
1 Parent(s): ea7380e

FIX pre-commit

Browse files
cua2-core/src/cua2_core/app.py CHANGED
@@ -22,7 +22,8 @@ async def lifespan(app: FastAPI):
22
  raise ValueError("HF_TOKEN is not set")
23
 
24
  num_workers = int(os.getenv("NUM_WORKERS", "1"))
25
- max_sandboxes = int(600 / num_workers)
 
26
 
27
  websocket_manager = WebSocketManager()
28
 
 
22
  raise ValueError("HF_TOKEN is not set")
23
 
24
  num_workers = int(os.getenv("NUM_WORKERS", "1"))
25
+ # max_sandboxes = int(600 / num_workers)
26
+ max_sandboxes = 600
27
 
28
  websocket_manager = WebSocketManager()
29
 
cua2-core/src/cua2_core/services/agent_service.py CHANGED
@@ -184,16 +184,37 @@ class AgentService:
184
 
185
  model = get_model(self.active_tasks[message_id].model_id)
186
 
187
- max_attempts = 20
188
- for _ in range(max_attempts):
 
189
  response = await self.sandbox_service.acquire_sandbox(message_id)
190
  if response.sandbox is not None and response.state == "ready":
191
  sandbox = response.sandbox
192
  break
193
  elif response.state == "max_sandboxes_reached":
194
- raise Exception("No sandbox available: pool limit reached")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
195
  await asyncio.sleep(2)
196
  if sandbox is None:
 
 
 
197
  raise Exception("No sandbox available: pool limit reached")
198
 
199
  data_dir = self.active_tasks[message_id].trace_path
 
184
 
185
  model = get_model(self.active_tasks[message_id].model_id)
186
 
187
+ # Wait for sandbox to be ready (it was already acquired in create_id_and_sandbox)
188
+ max_attempts = 60 # Increased timeout to 2 minutes (60 * 2s)
189
+ for attempt in range(max_attempts):
190
  response = await self.sandbox_service.acquire_sandbox(message_id)
191
  if response.sandbox is not None and response.state == "ready":
192
  sandbox = response.sandbox
193
  break
194
  elif response.state == "max_sandboxes_reached":
195
+ # Trigger cleanup of stuck and expired sandboxes before giving up
196
+ logger.warning(
197
+ f"Sandbox pool limit reached for {message_id}, attempting cleanup of stuck/expired sandboxes"
198
+ )
199
+ await self.sandbox_service.cleanup_stuck_creating_sandboxes()
200
+ await self.sandbox_service.cleanup_expired_ready_sandboxes()
201
+ # Try one more time after cleanup
202
+ response = await self.sandbox_service.acquire_sandbox(message_id)
203
+ if response.sandbox is not None and response.state == "ready":
204
+ sandbox = response.sandbox
205
+ break
206
+ elif response.state == "max_sandboxes_reached":
207
+ raise Exception("No sandbox available: pool limit reached")
208
+ # Log progress every 10 attempts
209
+ if attempt > 0 and attempt % 10 == 0:
210
+ logger.info(
211
+ f"Waiting for sandbox for {message_id}, attempt {attempt}/{max_attempts}, state: {response.state}"
212
+ )
213
  await asyncio.sleep(2)
214
  if sandbox is None:
215
+ # Final cleanup attempt before raising error
216
+ await self.sandbox_service.cleanup_stuck_creating_sandboxes()
217
+ await self.sandbox_service.cleanup_expired_ready_sandboxes()
218
  raise Exception("No sandbox available: pool limit reached")
219
 
220
  data_dir = self.active_tasks[message_id].trace_path
cua2-core/src/cua2_core/services/sandbox_service.py CHANGED
@@ -11,7 +11,7 @@ SANDBOX_METADATA: dict[str, dict[str, Any]] = {}
11
  SANDBOX_TIMEOUT = 500
12
  SANDBOX_CREATION_TIMEOUT = 200
13
  SANDBOX_CREATION_MAX_TIME = (
14
- 300 # Maximum time a sandbox can be in "creating" state (5 minutes)
15
  )
16
  WIDTH = 1280
17
  HEIGHT = 960
@@ -60,6 +60,7 @@ class SandboxService:
60
  time.sleep(3)
61
  return desktop
62
 
 
63
  try:
64
  desktop = await asyncio.to_thread(create_and_setup_sandbox)
65
  print(f"Sandbox ID for session {session_hash} is {desktop.sandbox_id}.")
@@ -74,6 +75,7 @@ class SandboxService:
74
  ):
75
  self.sandboxes[session_hash] = desktop
76
  self.sandbox_metadata[session_hash]["state"] = "ready"
 
77
  else:
78
  # Sandbox was released while creating, kill it immediately
79
  print(
@@ -86,17 +88,51 @@ class SandboxService:
86
 
87
  except Exception as e:
88
  print(f"Error creating sandbox for session {session_hash}: {str(e)}")
89
- # Clean up metadata on failure
90
  async with self.sandbox_lock:
91
  if session_hash in self.sandbox_metadata:
 
 
 
 
92
  del self.sandbox_metadata[session_hash]
 
 
 
 
 
 
 
 
 
 
93
 
94
  async def _periodic_cleanup(self):
95
- """Background task to periodically clean up stuck creating sandboxes"""
96
  while True:
97
  try:
98
- await asyncio.sleep(60) # Run every minute
99
- await self.cleanup_stuck_creating_sandboxes()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
100
  except asyncio.CancelledError:
101
  break
102
  except Exception as e:
@@ -105,7 +141,11 @@ class SandboxService:
105
  def start_periodic_cleanup(self):
106
  """Start the periodic cleanup task"""
107
  if self._cleanup_task is None or self._cleanup_task.done():
108
- self._cleanup_task = asyncio.create_task(self._periodic_cleanup())
 
 
 
 
109
 
110
  def stop_periodic_cleanup(self):
111
  """Stop the periodic cleanup task"""
@@ -159,8 +199,30 @@ class SandboxService:
159
  for meta in self.sandbox_metadata.values()
160
  if meta.get("state") == "creating"
161
  )
 
 
162
  # Check capacity BEFORE adding this session_hash to metadata
163
- if len(self.sandboxes) + creating_count >= self.max_sandboxes:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
164
  return SandboxResponse(sandbox=None, state="max_sandboxes_reached")
165
 
166
  # Mark that we're creating this sandbox
@@ -178,6 +240,14 @@ class SandboxService:
178
  asyncio.create_task(
179
  self._create_sandbox_background(session_hash, expired_sandbox)
180
  )
 
 
 
 
 
 
 
 
181
 
182
  # Check state after starting background task (it might complete very quickly)
183
  async with self.sandbox_lock:
@@ -257,6 +327,42 @@ class SandboxService:
257
 
258
  return len(stuck_sandboxes_to_kill)
259
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
260
  async def cleanup_sandboxes(self):
261
  sandboxes_to_kill = []
262
 
 
11
  SANDBOX_TIMEOUT = 500
12
  SANDBOX_CREATION_TIMEOUT = 200
13
  SANDBOX_CREATION_MAX_TIME = (
14
+ 120 # Maximum time a sandbox can be in "creating" state (2 minutes, reduced from 5)
15
  )
16
  WIDTH = 1280
17
  HEIGHT = 960
 
60
  time.sleep(3)
61
  return desktop
62
 
63
+ desktop = None
64
  try:
65
  desktop = await asyncio.to_thread(create_and_setup_sandbox)
66
  print(f"Sandbox ID for session {session_hash} is {desktop.sandbox_id}.")
 
75
  ):
76
  self.sandboxes[session_hash] = desktop
77
  self.sandbox_metadata[session_hash]["state"] = "ready"
78
+ print(f"Sandbox {session_hash} is now ready")
79
  else:
80
  # Sandbox was released while creating, kill it immediately
81
  print(
 
88
 
89
  except Exception as e:
90
  print(f"Error creating sandbox for session {session_hash}: {str(e)}")
91
+ # Clean up metadata on failure - CRITICAL to prevent leaks
92
  async with self.sandbox_lock:
93
  if session_hash in self.sandbox_metadata:
94
+ state = self.sandbox_metadata[session_hash].get("state")
95
+ print(
96
+ f"Cleaning up failed sandbox creation for {session_hash} (state was: {state})"
97
+ )
98
  del self.sandbox_metadata[session_hash]
99
+ # Also remove from sandboxes dict if it somehow got added
100
+ if session_hash in self.sandboxes:
101
+ del self.sandboxes[session_hash]
102
+ # Kill the sandbox if it was partially created
103
+ if desktop is not None:
104
+ try:
105
+ await asyncio.to_thread(desktop.kill)
106
+ print(f"Killed partially created sandbox for {session_hash}")
107
+ except Exception as kill_error:
108
+ print(f"Error killing partially created sandbox: {str(kill_error)}")
109
 
110
  async def _periodic_cleanup(self):
111
+ """Background task to periodically clean up stuck creating sandboxes and expired ready sandboxes"""
112
  while True:
113
  try:
114
+ await asyncio.sleep(30) # Run every 30 seconds (more aggressive)
115
+ cleaned_creating = await self.cleanup_stuck_creating_sandboxes()
116
+ cleaned_expired = await self.cleanup_expired_ready_sandboxes()
117
+ if cleaned_creating > 0 or cleaned_expired > 0:
118
+ print(
119
+ f"Periodic cleanup: removed {cleaned_creating} stuck creating + "
120
+ f"{cleaned_expired} expired ready = {cleaned_creating + cleaned_expired} total"
121
+ )
122
+ # Log sandbox pool state periodically for debugging
123
+ async with self.sandbox_lock:
124
+ ready_count = len(self.sandboxes)
125
+ creating_count = sum(
126
+ 1
127
+ for meta in self.sandbox_metadata.values()
128
+ if meta.get("state") == "creating"
129
+ )
130
+ total_count = ready_count + creating_count
131
+ if total_count > 0:
132
+ print(
133
+ f"Sandbox pool state: {ready_count} ready, {creating_count} creating, "
134
+ f"{total_count}/{self.max_sandboxes} total"
135
+ )
136
  except asyncio.CancelledError:
137
  break
138
  except Exception as e:
 
141
  def start_periodic_cleanup(self):
142
  """Start the periodic cleanup task"""
143
  if self._cleanup_task is None or self._cleanup_task.done():
144
+ try:
145
+ self._cleanup_task = asyncio.create_task(self._periodic_cleanup())
146
+ except RuntimeError as e:
147
+ # If called outside event loop, log but don't crash
148
+ print(f"Warning: Cannot start periodic cleanup (no event loop): {e}")
149
 
150
  def stop_periodic_cleanup(self):
151
  """Stop the periodic cleanup task"""
 
199
  for meta in self.sandbox_metadata.values()
200
  if meta.get("state") == "creating"
201
  )
202
+ ready_count = len(self.sandboxes)
203
+ total_count = ready_count + creating_count
204
  # Check capacity BEFORE adding this session_hash to metadata
205
+ if total_count >= self.max_sandboxes:
206
+ print(
207
+ f"Sandbox pool at capacity: {ready_count} ready + {creating_count} creating = "
208
+ f"{total_count}/{self.max_sandboxes}"
209
+ )
210
+ # CRITICAL: If we have an expired sandbox but can't create a new one,
211
+ # we must still kill the expired sandbox to prevent leaks
212
+ if expired_sandbox:
213
+ print(
214
+ f"Killing expired sandbox for {session_hash} even though pool is at capacity"
215
+ )
216
+
217
+ async def kill_expired():
218
+ try:
219
+ await asyncio.to_thread(expired_sandbox.kill)
220
+ except Exception as e:
221
+ print(
222
+ f"Error killing expired sandbox for {session_hash}: {str(e)}"
223
+ )
224
+
225
+ asyncio.create_task(kill_expired())
226
  return SandboxResponse(sandbox=None, state="max_sandboxes_reached")
227
 
228
  # Mark that we're creating this sandbox
 
240
  asyncio.create_task(
241
  self._create_sandbox_background(session_hash, expired_sandbox)
242
  )
243
+ elif expired_sandbox:
244
+ # If we're not creating but have an expired sandbox, kill it
245
+ # This shouldn't normally happen, but handle it defensively
246
+ print(f"Killing expired sandbox for {session_hash} (not creating new one)")
247
+ try:
248
+ await asyncio.to_thread(expired_sandbox.kill)
249
+ except Exception as e:
250
+ print(f"Error killing expired sandbox: {str(e)}")
251
 
252
  # Check state after starting background task (it might complete very quickly)
253
  async with self.sandbox_lock:
 
327
 
328
  return len(stuck_sandboxes_to_kill)
329
 
330
+ async def cleanup_expired_ready_sandboxes(self):
331
+ """Clean up ready sandboxes that have expired (not accessed for too long)"""
332
+ current_time = datetime.now()
333
+ expired_sandboxes_to_kill = []
334
+
335
+ async with self.sandbox_lock:
336
+ for session_hash, metadata in list(self.sandbox_metadata.items()):
337
+ if metadata.get("state") == "ready" and session_hash in self.sandboxes:
338
+ created_at = metadata.get("created_at")
339
+ if (
340
+ created_at
341
+ and (current_time - created_at).total_seconds()
342
+ >= SANDBOX_CREATION_TIMEOUT
343
+ ):
344
+ print(
345
+ f"Cleaning up expired ready sandbox for session {session_hash} "
346
+ f"(age: {(current_time - created_at).total_seconds():.1f}s)"
347
+ )
348
+ expired_sandboxes_to_kill.append(
349
+ (session_hash, self.sandboxes[session_hash])
350
+ )
351
+ del self.sandboxes[session_hash]
352
+ del self.sandbox_metadata[session_hash]
353
+
354
+ # Kill expired sandboxes outside of lock
355
+ for session_hash, sandbox in expired_sandboxes_to_kill:
356
+ try:
357
+ await asyncio.to_thread(sandbox.kill)
358
+ print(f"Killed expired ready sandbox for session {session_hash}")
359
+ except Exception as e:
360
+ print(
361
+ f"Error killing expired ready sandbox for session {session_hash}: {str(e)}"
362
+ )
363
+
364
+ return len(expired_sandboxes_to_kill)
365
+
366
  async def cleanup_sandboxes(self):
367
  sandboxes_to_kill = []
368