Amir Mahla commited on
Commit
8f218a6
·
1 Parent(s): 0d44ddc

FIX sandbox limit reached to often

Browse files
cua2-core/src/cua2_core/app.py CHANGED
@@ -29,6 +29,9 @@ async def lifespan(app: FastAPI):
29
 
30
  agent_service = AgentService(websocket_manager, sandbox_service, num_workers)
31
 
 
 
 
32
  # Store services in app state for access in routes
33
  app.state.websocket_manager = websocket_manager
34
  app.state.sandbox_service = sandbox_service
@@ -39,6 +42,7 @@ async def lifespan(app: FastAPI):
39
  yield
40
 
41
  print("Shutting down services...")
 
42
  await agent_service.cleanup()
43
  await sandbox_service.cleanup_sandboxes()
44
  print("Services shut down successfully")
 
29
 
30
  agent_service = AgentService(websocket_manager, sandbox_service, num_workers)
31
 
32
+ # Start periodic cleanup of stuck sandboxes
33
+ sandbox_service.start_periodic_cleanup()
34
+
35
  # Store services in app state for access in routes
36
  app.state.websocket_manager = websocket_manager
37
  app.state.sandbox_service = sandbox_service
 
42
  yield
43
 
44
  print("Shutting down services...")
45
+ sandbox_service.stop_periodic_cleanup()
46
  await agent_service.cleanup()
47
  await sandbox_service.cleanup_sandboxes()
48
  print("Services shut down successfully")
cua2-core/src/cua2_core/services/sandbox_service.py CHANGED
@@ -10,6 +10,9 @@ from pydantic import BaseModel
10
  SANDBOX_METADATA: dict[str, dict[str, Any]] = {}
11
  SANDBOX_TIMEOUT = 500
12
  SANDBOX_CREATION_TIMEOUT = 200
 
 
 
13
  WIDTH = 1280
14
  HEIGHT = 960
15
 
@@ -29,6 +32,7 @@ class SandboxService:
29
  self.sandboxes: dict[str, Sandbox] = {}
30
  self.sandbox_metadata: dict[str, dict[str, Any]] = {}
31
  self.sandbox_lock = asyncio.Lock()
 
32
 
33
  async def _create_sandbox_background(
34
  self, session_hash: str, expired_sandbox: Sandbox | None
@@ -60,12 +64,25 @@ class SandboxService:
60
  desktop = await asyncio.to_thread(create_and_setup_sandbox)
61
  print(f"Sandbox ID for session {session_hash} is {desktop.sandbox_id}.")
62
 
63
- # Log sandbox creation
64
-
65
  # Update sandbox state under lock
66
  async with self.sandbox_lock:
67
- self.sandboxes[session_hash] = desktop
68
- self.sandbox_metadata[session_hash]["state"] = "ready"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
 
70
  except Exception as e:
71
  print(f"Error creating sandbox for session {session_hash}: {str(e)}")
@@ -74,6 +91,27 @@ class SandboxService:
74
  if session_hash in self.sandbox_metadata:
75
  del self.sandbox_metadata[session_hash]
76
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
77
  async def acquire_sandbox(self, session_hash: str) -> SandboxResponse:
78
  current_time = datetime.now()
79
  should_create = False
@@ -114,10 +152,19 @@ class SandboxService:
114
  del self.sandbox_metadata[session_hash]
115
 
116
  # Check if we have capacity
117
- if len(self.sandboxes) >= self.max_sandboxes:
 
 
 
 
 
 
 
 
118
  return SandboxResponse(sandbox=None, state="max_sandboxes_reached")
119
 
120
  # Mark that we're creating this sandbox
 
121
  print(f"Creating new sandbox for session {session_hash}")
122
  self.sandbox_metadata[session_hash] = {
123
  "state": "creating",
@@ -132,14 +179,19 @@ class SandboxService:
132
  self._create_sandbox_background(session_hash, expired_sandbox)
133
  )
134
 
 
135
  async with self.sandbox_lock:
136
- if self.sandbox_metadata[session_hash]["state"] == "creating":
137
- return SandboxResponse(sandbox=None, state="creating")
138
- if self.sandbox_metadata[session_hash]["state"] == "ready":
139
- return SandboxResponse(
140
- sandbox=self.sandboxes[session_hash], state="ready"
141
- )
 
 
142
 
 
 
143
  return SandboxResponse(sandbox=None, state="creating")
144
 
145
  async def release_sandbox(self, session_hash: str):
@@ -151,8 +203,14 @@ class SandboxService:
151
  print(f"Releasing sandbox for session {session_hash}")
152
  sandbox_to_kill = self.sandboxes[session_hash]
153
  del self.sandboxes[session_hash]
154
- if session_hash in self.sandbox_metadata:
155
- del self.sandbox_metadata[session_hash]
 
 
 
 
 
 
156
 
157
  # Kill sandbox outside of lock
158
  if sandbox_to_kill:
@@ -161,6 +219,44 @@ class SandboxService:
161
  except Exception as e:
162
  print(f"Error killing sandbox for session {session_hash}: {str(e)}")
163
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
164
  async def cleanup_sandboxes(self):
165
  sandboxes_to_kill = []
166
 
 
10
  SANDBOX_METADATA: dict[str, dict[str, Any]] = {}
11
  SANDBOX_TIMEOUT = 500
12
  SANDBOX_CREATION_TIMEOUT = 200
13
+ SANDBOX_CREATION_MAX_TIME = (
14
+ 300 # Maximum time a sandbox can be in "creating" state (5 minutes)
15
+ )
16
  WIDTH = 1280
17
  HEIGHT = 960
18
 
 
32
  self.sandboxes: dict[str, Sandbox] = {}
33
  self.sandbox_metadata: dict[str, dict[str, Any]] = {}
34
  self.sandbox_lock = asyncio.Lock()
35
+ self._cleanup_task: asyncio.Task | None = None
36
 
37
  async def _create_sandbox_background(
38
  self, session_hash: str, expired_sandbox: Sandbox | None
 
64
  desktop = await asyncio.to_thread(create_and_setup_sandbox)
65
  print(f"Sandbox ID for session {session_hash} is {desktop.sandbox_id}.")
66
 
 
 
67
  # Update sandbox state under lock
68
  async with self.sandbox_lock:
69
+ # Double-check metadata still exists and is in "creating" state
70
+ # (it might have been released while we were creating)
71
+ if (
72
+ session_hash in self.sandbox_metadata
73
+ and self.sandbox_metadata[session_hash].get("state") == "creating"
74
+ ):
75
+ self.sandboxes[session_hash] = desktop
76
+ self.sandbox_metadata[session_hash]["state"] = "ready"
77
+ else:
78
+ # Sandbox was released while creating, kill it immediately
79
+ print(
80
+ f"Sandbox {session_hash} was released during creation, killing it"
81
+ )
82
+ try:
83
+ await asyncio.to_thread(desktop.kill)
84
+ except Exception as kill_error:
85
+ print(f"Error killing orphaned sandbox: {str(kill_error)}")
86
 
87
  except Exception as e:
88
  print(f"Error creating sandbox for session {session_hash}: {str(e)}")
 
91
  if session_hash in self.sandbox_metadata:
92
  del self.sandbox_metadata[session_hash]
93
 
94
+ async def _periodic_cleanup(self):
95
+ """Background task to periodically clean up stuck creating sandboxes"""
96
+ while True:
97
+ try:
98
+ await asyncio.sleep(60) # Run every minute
99
+ await self.cleanup_stuck_creating_sandboxes()
100
+ except asyncio.CancelledError:
101
+ break
102
+ except Exception as e:
103
+ print(f"Error in periodic cleanup: {str(e)}")
104
+
105
+ def start_periodic_cleanup(self):
106
+ """Start the periodic cleanup task"""
107
+ if self._cleanup_task is None or self._cleanup_task.done():
108
+ self._cleanup_task = asyncio.create_task(self._periodic_cleanup())
109
+
110
+ def stop_periodic_cleanup(self):
111
+ """Stop the periodic cleanup task"""
112
+ if self._cleanup_task and not self._cleanup_task.done():
113
+ self._cleanup_task.cancel()
114
+
115
  async def acquire_sandbox(self, session_hash: str) -> SandboxResponse:
116
  current_time = datetime.now()
117
  should_create = False
 
152
  del self.sandbox_metadata[session_hash]
153
 
154
  # Check if we have capacity
155
+ # Count both ready sandboxes and sandboxes in "creating" state
156
+ # We count BEFORE adding this one to ensure we don't exceed the limit
157
+ creating_count = sum(
158
+ 1
159
+ for meta in self.sandbox_metadata.values()
160
+ if meta.get("state") == "creating"
161
+ )
162
+ # Check capacity BEFORE adding this session_hash to metadata
163
+ if len(self.sandboxes) + creating_count >= self.max_sandboxes:
164
  return SandboxResponse(sandbox=None, state="max_sandboxes_reached")
165
 
166
  # Mark that we're creating this sandbox
167
+ # This happens atomically within the lock, so no race condition
168
  print(f"Creating new sandbox for session {session_hash}")
169
  self.sandbox_metadata[session_hash] = {
170
  "state": "creating",
 
179
  self._create_sandbox_background(session_hash, expired_sandbox)
180
  )
181
 
182
+ # Check state after starting background task (it might complete very quickly)
183
  async with self.sandbox_lock:
184
+ if session_hash in self.sandbox_metadata:
185
+ state = self.sandbox_metadata[session_hash].get("state")
186
+ if state == "creating":
187
+ return SandboxResponse(sandbox=None, state="creating")
188
+ if state == "ready" and session_hash in self.sandboxes:
189
+ return SandboxResponse(
190
+ sandbox=self.sandboxes[session_hash], state="ready"
191
+ )
192
 
193
+ # If metadata doesn't exist, it means creation failed immediately
194
+ # Return "creating" anyway as the caller will retry
195
  return SandboxResponse(sandbox=None, state="creating")
196
 
197
  async def release_sandbox(self, session_hash: str):
 
203
  print(f"Releasing sandbox for session {session_hash}")
204
  sandbox_to_kill = self.sandboxes[session_hash]
205
  del self.sandboxes[session_hash]
206
+ # Always clean up metadata, even if sandbox is still in "creating" state
207
+ if session_hash in self.sandbox_metadata:
208
+ state = self.sandbox_metadata[session_hash].get("state")
209
+ if state == "creating":
210
+ print(
211
+ f"Cleaning up stuck 'creating' sandbox for session {session_hash}"
212
+ )
213
+ del self.sandbox_metadata[session_hash]
214
 
215
  # Kill sandbox outside of lock
216
  if sandbox_to_kill:
 
219
  except Exception as e:
220
  print(f"Error killing sandbox for session {session_hash}: {str(e)}")
221
 
222
+ async def cleanup_stuck_creating_sandboxes(self):
223
+ """Clean up sandboxes that have been stuck in 'creating' state for too long"""
224
+ current_time = datetime.now()
225
+ stuck_sandboxes_to_kill = []
226
+
227
+ async with self.sandbox_lock:
228
+ for session_hash, metadata in list(self.sandbox_metadata.items()):
229
+ if metadata.get("state") == "creating":
230
+ created_at = metadata.get("created_at")
231
+ if (
232
+ created_at
233
+ and (current_time - created_at).total_seconds()
234
+ > SANDBOX_CREATION_MAX_TIME
235
+ ):
236
+ print(
237
+ f"Cleaning up stuck 'creating' sandbox for session {session_hash} "
238
+ f"(stuck for {(current_time - created_at).total_seconds():.1f}s)"
239
+ )
240
+ # Collect sandbox to kill if it exists
241
+ if session_hash in self.sandboxes:
242
+ stuck_sandboxes_to_kill.append(
243
+ (session_hash, self.sandboxes[session_hash])
244
+ )
245
+ del self.sandboxes[session_hash]
246
+ del self.sandbox_metadata[session_hash]
247
+
248
+ # Kill stuck sandboxes outside of lock
249
+ for session_hash, sandbox in stuck_sandboxes_to_kill:
250
+ try:
251
+ await asyncio.to_thread(sandbox.kill)
252
+ print(f"Killed stuck sandbox for session {session_hash}")
253
+ except Exception as e:
254
+ print(
255
+ f"Error killing stuck sandbox for session {session_hash}: {str(e)}"
256
+ )
257
+
258
+ return len(stuck_sandboxes_to_kill)
259
+
260
  async def cleanup_sandboxes(self):
261
  sandboxes_to_kill = []
262