Amir Mahla committed on
Commit
2d6254f
·
1 Parent(s): ebff663

NEW workflow

Browse files
cua2-core/src/cua2_core/services/agent_service.py CHANGED
@@ -127,10 +127,32 @@ class AgentService:
127
  trace.steps = []
128
  trace.traceMetadata = AgentTraceMetadata(traceId=trace_id)
129
 
 
130
  async with self._lock:
131
  if self.task_websockets[trace_id] != websocket:
132
- raise WebSocketException("WebSocket mismatch")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
133
 
 
 
134
  active_task = ActiveTask(
135
  message_id=trace_id,
136
  instruction=trace.instruction,
@@ -184,83 +206,67 @@ class AgentService:
184
 
185
  model = get_model(self.active_tasks[message_id].model_id)
186
 
187
- # Wait for sandbox to be ready (it was already acquired in create_id_and_sandbox)
188
- max_attempts = 60 # Increased timeout to 2 minutes (60 * 2s)
189
- last_state = None
190
  for attempt in range(max_attempts):
191
  response = await self.sandbox_service.acquire_sandbox(message_id)
192
- last_state = response.state
 
 
 
 
 
 
 
 
 
 
193
  if response.sandbox is not None and response.state == "ready":
194
  sandbox = response.sandbox
195
  break
196
- elif response.state == "max_sandboxes_reached":
197
- # Trigger cleanup of stuck and expired sandboxes before giving up
 
 
 
 
 
198
  logger.warning(
199
- f"Sandbox pool limit reached for {message_id}, attempting cleanup of stuck/expired sandboxes"
200
- )
201
- cleaned_creating = (
202
- await self.sandbox_service.cleanup_stuck_creating_sandboxes()
203
  )
204
- cleaned_expired = (
205
- await self.sandbox_service.cleanup_expired_ready_sandboxes()
206
- )
207
- logger.info(
208
- f"Cleanup completed: removed {cleaned_creating} stuck creating + {cleaned_expired} expired ready sandboxes"
209
- )
210
- # Try one more time after cleanup
211
- response = await self.sandbox_service.acquire_sandbox(message_id)
212
- last_state = response.state
213
- if response.sandbox is not None and response.state == "ready":
214
- sandbox = response.sandbox
215
- break
216
- elif response.state == "max_sandboxes_reached":
217
- (
218
- available_count,
219
- non_available_count,
220
- ) = await self.sandbox_service.get_sandbox_counts()
221
- raise Exception(
222
- f"No sandbox available: pool limit reached (available: {available_count}, non-available: {non_available_count}, max: {self.max_sandboxes})"
223
- )
224
  # Log progress every 10 attempts
225
  if attempt > 0 and attempt % 10 == 0:
226
  logger.info(
227
  f"Waiting for sandbox for {message_id}, attempt {attempt}/{max_attempts}, state: {response.state}"
228
  )
 
229
  await asyncio.sleep(2)
230
 
231
- # If sandbox is still None after all attempts, do final cleanup and check
232
  if sandbox is None:
233
- logger.warning(
234
- f"Sandbox for {message_id} still not ready after {max_attempts} attempts (last state: {last_state}), performing final cleanup"
235
- )
236
- # Final cleanup attempt before raising error - be more aggressive
237
- cleaned_creating = (
238
- await self.sandbox_service.cleanup_stuck_creating_sandboxes()
239
- )
240
- cleaned_expired = (
241
- await self.sandbox_service.cleanup_expired_ready_sandboxes()
242
- )
243
- logger.info(
244
- f"Final cleanup: removed {cleaned_creating} stuck creating + {cleaned_expired} expired ready sandboxes"
245
- )
246
-
247
- # Try one last time after cleanup
248
  (
249
  available_count,
250
- non_available_count,
251
  ) = await self.sandbox_service.get_sandbox_counts()
252
- # Provide more detailed error message
253
- error_msg = (
254
- f"No sandbox available for {message_id}: "
255
- f"available: {available_count}, non-available: {non_available_count}, "
256
- f"max: {self.max_sandboxes}, last_state: {last_state}"
257
- )
258
- if non_available_count > 0:
259
- error_msg += (
260
- f". There are {non_available_count} sandbox(s) stuck in 'creating' state "
261
- f"that may need manual cleanup or the cleanup threshold may be too high."
262
  )
263
- raise Exception(error_msg)
 
 
 
264
 
265
  data_dir = self.active_tasks[message_id].trace_path
266
  user_content = self.active_tasks[message_id].instruction
@@ -317,9 +323,14 @@ class AgentService:
317
  f"Error processing task: {traceback.format_exc()}", exc_info=True
318
  )
319
  final_state = "error"
320
- await self.websocket_manager.send_agent_error(
321
- error="Error processing task", websocket=websocket
322
- )
 
 
 
 
 
323
 
324
  finally:
325
  # Send completion event
 
127
  trace.steps = []
128
  trace.traceMetadata = AgentTraceMetadata(traceId=trace_id)
129
 
130
+ trace_id_to_release = None
131
  async with self._lock:
132
  if self.task_websockets[trace_id] != websocket:
133
+ # Release sandbox before raising exception to prevent leak
134
+ # Do this outside the lock to avoid deadlock
135
+ trace_id_to_release = trace_id
136
+ # Remove from task_websockets since we're rejecting this
137
+ if trace_id in self.task_websockets:
138
+ del self.task_websockets[trace_id]
139
+
140
+ # Release sandbox outside of lock if there was a mismatch
141
+ if trace_id_to_release:
142
+ try:
143
+ await self.sandbox_service.release_sandbox(trace_id_to_release)
144
+ logger.info(
145
+ f"Released sandbox for {trace_id_to_release} due to WebSocket mismatch"
146
+ )
147
+ except Exception as e:
148
+ logger.error(
149
+ f"Error releasing sandbox for {trace_id_to_release}: {e}",
150
+ exc_info=True,
151
+ )
152
+ raise WebSocketException("WebSocket mismatch")
153
 
154
+ # Continue with normal processing if no mismatch
155
+ async with self._lock:
156
  active_task = ActiveTask(
157
  message_id=trace_id,
158
  instruction=trace.instruction,
 
206
 
207
  model = get_model(self.active_tasks[message_id].model_id)
208
 
209
+ # Wait for sandbox to be ready
210
+ max_attempts = 60 # 2 minutes timeout (60 * 2s)
211
+ sandbox = None
212
  for attempt in range(max_attempts):
213
  response = await self.sandbox_service.acquire_sandbox(message_id)
214
+
215
+ # Check for creation errors
216
+ if response.error:
217
+ logger.error(
218
+ f"Sandbox creation failed for {message_id}: {response.error}",
219
+ exc_info=False,
220
+ )
221
+ # Continue retrying - might succeed on next attempt
222
+ await asyncio.sleep(2)
223
+ continue
224
+
225
  if response.sandbox is not None and response.state == "ready":
226
  sandbox = response.sandbox
227
  break
228
+
229
+ if response.state == "max_sandboxes_reached":
230
+ # Service handles cleanup automatically, but log the state
231
+ (
232
+ available_count,
233
+ pending_count,
234
+ ) = await self.sandbox_service.get_sandbox_counts()
235
  logger.warning(
236
+ f"Sandbox pool at capacity for {message_id}: "
237
+ f"{available_count} ready, {pending_count} pending, max: {self.max_sandboxes}"
 
 
238
  )
239
+ # Wait a bit and retry - cleanup may free up space
240
+ await asyncio.sleep(2)
241
+ continue
242
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
243
  # Log progress every 10 attempts
244
  if attempt > 0 and attempt % 10 == 0:
245
  logger.info(
246
  f"Waiting for sandbox for {message_id}, attempt {attempt}/{max_attempts}, state: {response.state}"
247
  )
248
+
249
  await asyncio.sleep(2)
250
 
251
+ # Check if we got a sandbox
252
  if sandbox is None:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
253
  (
254
  available_count,
255
+ pending_count,
256
  ) = await self.sandbox_service.get_sandbox_counts()
257
+ # Check for any final errors
258
+ final_response = await self.sandbox_service.acquire_sandbox(message_id)
259
+ error_info = ""
260
+ if final_response.error:
261
+ error_info = f" Last creation error: {final_response.error}"
262
+ logger.error(
263
+ f"Sandbox creation failed for {message_id} after {max_attempts} attempts: {final_response.error}",
264
+ exc_info=False,
 
 
265
  )
266
+ raise Exception(
267
+ f"No sandbox available for {message_id} after {max_attempts} attempts: "
268
+ f"{available_count} ready, {pending_count} pending, max: {self.max_sandboxes}.{error_info}"
269
+ )
270
 
271
  data_dir = self.active_tasks[message_id].trace_path
272
  user_content = self.active_tasks[message_id].instruction
 
323
  f"Error processing task: {traceback.format_exc()}", exc_info=True
324
  )
325
  final_state = "error"
326
+ if (
327
+ not websocket_exception
328
+ and websocket
329
+ and websocket.client_state == WebSocketState.CONNECTED
330
+ ):
331
+ await self.websocket_manager.send_agent_error(
332
+ error="Error processing task", websocket=websocket
333
+ )
334
 
335
  finally:
336
  # Send completion event
cua2-core/src/cua2_core/services/sandbox_service.py CHANGED
@@ -2,19 +2,13 @@ import asyncio
2
  import os
3
  import time
4
  from datetime import datetime
5
- from typing import Any, Literal
6
 
7
  from e2b_desktop import Sandbox
8
  from pydantic import BaseModel
9
 
10
- SANDBOX_METADATA: dict[str, dict[str, Any]] = {}
11
  SANDBOX_TIMEOUT = 500
12
- SANDBOX_READY_TIMEOUT = 200
13
- SANDBOX_CREATION_MAX_TIME = (
14
- 90 # Maximum time a sandbox can be in "creating" state (90 seconds)
15
- # Reduced from 120 to be more aggressive and clean up stuck sandboxes
16
- # before the agent_service loop times out (which waits 60 attempts * 2s = 120s)
17
- )
18
  WIDTH = 1280
19
  HEIGHT = 960
20
 
@@ -24,116 +18,242 @@ class SandboxResponse(BaseModel):
24
 
25
  sandbox: Sandbox | None
26
  state: Literal["creating", "ready", "max_sandboxes_reached"]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
 
28
 
29
  class SandboxService:
 
 
 
 
 
 
 
 
 
 
30
  def __init__(self, max_sandboxes: int = 50):
31
  if not os.getenv("E2B_API_KEY"):
32
  raise ValueError("E2B_API_KEY is not set")
33
  self.max_sandboxes = max_sandboxes
34
- self.sandboxes: dict[str, Sandbox] = {}
35
- self.sandbox_metadata: dict[str, dict[str, Any]] = {}
36
- self.sandbox_lock = asyncio.Lock()
 
 
 
37
  self._cleanup_task: asyncio.Task | None = None
38
 
39
- async def _create_sandbox_background(
40
- self, session_hash: str, expired_sandbox: Sandbox | None
41
- ):
42
- """Background task to create and setup a sandbox."""
43
- # Kill expired sandbox first
44
- if expired_sandbox:
45
- try:
46
- print(f"Closing expired sandbox for session {session_hash}")
47
- await asyncio.to_thread(expired_sandbox.kill)
48
- except Exception as e:
49
- print(f"Error closing expired sandbox: {str(e)}")
50
-
51
- def create_and_setup_sandbox():
52
- desktop = Sandbox.create(
53
- api_key=os.getenv("E2B_API_KEY"),
54
- resolution=(WIDTH, HEIGHT),
55
- dpi=96,
56
- timeout=SANDBOX_TIMEOUT,
57
- template="k0wmnzir0zuzye6dndlw",
58
- )
59
- desktop.stream.start(require_auth=True)
60
- setup_cmd = """sudo mkdir -p /usr/lib/firefox-esr/distribution && echo '{"policies":{"OverrideFirstRunPage":"","OverridePostUpdatePage":"","DisableProfileImport":true,"DontCheckDefaultBrowser":true}}' | sudo tee /usr/lib/firefox-esr/distribution/policies.json > /dev/null"""
61
- desktop.commands.run(setup_cmd)
62
- time.sleep(3)
63
- return desktop
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65
  desktop = None
66
  try:
67
- desktop = await asyncio.to_thread(create_and_setup_sandbox)
68
- print(f"Sandbox ID for session {session_hash} is {desktop.sandbox_id}.")
69
-
70
- # Update sandbox state under lock
71
- async with self.sandbox_lock:
72
- # Double-check metadata still exists and is in "creating" state
73
- # (it might have been released while we were creating)
74
- if (
75
- session_hash in self.sandbox_metadata
76
- and self.sandbox_metadata[session_hash].get("state") == "creating"
77
- ):
78
- self.sandboxes[session_hash] = desktop
79
- self.sandbox_metadata[session_hash]["state"] = "ready"
80
- print(f"Sandbox {session_hash} is now ready")
81
- else:
82
- # Sandbox was released while creating, kill it immediately
83
  print(
84
- f"Sandbox {session_hash} was released during creation, killing it"
85
  )
86
- try:
87
- await asyncio.to_thread(desktop.kill)
88
- except Exception as kill_error:
89
- print(f"Error killing orphaned sandbox: {str(kill_error)}")
90
 
91
- except Exception as e:
92
- print(f"Error creating sandbox for session {session_hash}: {str(e)}")
93
- # Clean up metadata on failure - CRITICAL to prevent leaks
94
- async with self.sandbox_lock:
95
- if session_hash in self.sandbox_metadata:
96
- state = self.sandbox_metadata[session_hash].get("state")
97
  print(
98
- f"Cleaning up failed sandbox creation for {session_hash} (state was: {state})"
99
  )
100
- del self.sandbox_metadata[session_hash]
101
- # Also remove from sandboxes dict if it somehow got added
102
- if session_hash in self.sandboxes:
103
- del self.sandboxes[session_hash]
104
- # Kill the sandbox if it was partially created
105
- if desktop is not None:
106
- try:
107
- await asyncio.to_thread(desktop.kill)
108
- print(f"Killed partially created sandbox for {session_hash}")
109
- except Exception as kill_error:
110
- print(f"Error killing partially created sandbox: {str(kill_error)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
111
 
112
  async def _periodic_cleanup(self):
113
- """Background task to periodically clean up stuck creating sandboxes and expired ready sandboxes"""
114
  while True:
115
  try:
116
- await asyncio.sleep(30) # Run every 30 seconds (more aggressive)
117
- cleaned_creating = await self.cleanup_stuck_creating_sandboxes()
118
- cleaned_expired = await self.cleanup_expired_ready_sandboxes()
119
- if cleaned_creating > 0 or cleaned_expired > 0:
120
- print(
121
- f"Periodic cleanup: removed {cleaned_creating} stuck creating + "
122
- f"{cleaned_expired} expired ready = {cleaned_creating + cleaned_expired} total"
123
- )
124
- # Log sandbox pool state periodically for debugging
125
- async with self.sandbox_lock:
126
  ready_count = len(self.sandboxes)
127
- creating_count = sum(
128
- 1
129
- for meta in self.sandbox_metadata.values()
130
- if meta.get("state") == "creating"
131
- )
132
- total_count = ready_count + creating_count
133
- if total_count > 0:
134
  print(
135
- f"Sandbox pool state: {ready_count} ready, {creating_count} creating, "
136
- f"{total_count}/{self.max_sandboxes} total"
137
  )
138
  except asyncio.CancelledError:
139
  break
@@ -146,7 +266,6 @@ class SandboxService:
146
  try:
147
  self._cleanup_task = asyncio.create_task(self._periodic_cleanup())
148
  except RuntimeError as e:
149
- # If called outside event loop, log but don't crash
150
  print(f"Warning: Cannot start periodic cleanup (no event loop): {e}")
151
 
152
  def stop_periodic_cleanup(self):
@@ -154,314 +273,14 @@ class SandboxService:
154
  if self._cleanup_task and not self._cleanup_task.done():
155
  self._cleanup_task.cancel()
156
 
157
- async def acquire_sandbox(self, session_hash: str) -> SandboxResponse:
158
- current_time = datetime.now()
159
- should_create = False
160
- expired_sandbox = None
161
-
162
- # Quick check under lock - only check state and mark creation
163
- async with self.sandbox_lock:
164
- # Check if sandbox exists and is ready
165
- if (
166
- session_hash in self.sandboxes
167
- and session_hash in self.sandbox_metadata
168
- and self.sandbox_metadata[session_hash].get("state") == "ready"
169
- and (
170
- current_time - self.sandbox_metadata[session_hash]["created_at"]
171
- ).total_seconds()
172
- < SANDBOX_READY_TIMEOUT
173
- ):
174
- print(f"Reusing Sandbox for session {session_hash}")
175
- self.sandbox_metadata[session_hash]["last_accessed"] = current_time
176
- return SandboxResponse(
177
- sandbox=self.sandboxes[session_hash], state="ready"
178
- )
179
-
180
- # Check if sandbox is already being created
181
- if (
182
- session_hash in self.sandbox_metadata
183
- and self.sandbox_metadata[session_hash].get("state") == "creating"
184
- ):
185
- # Check if this sandbox has been stuck in "creating" state for too long
186
- created_at = self.sandbox_metadata[session_hash].get("created_at")
187
- if created_at:
188
- stuck_duration = (current_time - created_at).total_seconds()
189
- if stuck_duration > SANDBOX_CREATION_MAX_TIME:
190
- # This sandbox is stuck - clean it up immediately
191
- print(
192
- f"Sandbox for session {session_hash} has been stuck in 'creating' state "
193
- f"for {stuck_duration:.1f}s (threshold: {SANDBOX_CREATION_MAX_TIME}s) - cleaning up"
194
- )
195
- # Remove from metadata and sandboxes dict
196
- if session_hash in self.sandboxes:
197
- # Schedule kill outside of lock with error handling
198
- stuck_sandbox = self.sandboxes[session_hash]
199
- del self.sandboxes[session_hash]
200
-
201
- async def kill_stuck():
202
- try:
203
- await asyncio.to_thread(stuck_sandbox.kill)
204
- except Exception as e:
205
- print(
206
- f"Error killing stuck sandbox for {session_hash}: {str(e)}"
207
- )
208
-
209
- asyncio.create_task(kill_stuck())
210
- del self.sandbox_metadata[session_hash]
211
- # Fall through to create a new sandbox
212
- else:
213
- print(
214
- f"Sandbox for session {session_hash} is already being created"
215
- )
216
- return SandboxResponse(sandbox=None, state="creating")
217
- else:
218
- # Missing created_at - corrupted metadata, clean it up
219
- print(
220
- f"WARNING: Sandbox {session_hash} in 'creating' state has no 'created_at' - cleaning up"
221
- )
222
- if session_hash in self.sandboxes:
223
- stuck_sandbox = self.sandboxes[session_hash]
224
- del self.sandboxes[session_hash]
225
-
226
- async def kill_stuck():
227
- try:
228
- await asyncio.to_thread(stuck_sandbox.kill)
229
- except Exception as e:
230
- print(
231
- f"Error killing stuck sandbox for {session_hash}: {str(e)}"
232
- )
233
-
234
- asyncio.create_task(kill_stuck())
235
- del self.sandbox_metadata[session_hash]
236
- # Fall through to create a new sandbox
237
-
238
- # Mark expired sandbox for cleanup (remove from dict within lock)
239
- if session_hash in self.sandboxes:
240
- print(f"Marking expired sandbox for session {session_hash} for cleanup")
241
- expired_sandbox = self.sandboxes[session_hash]
242
- del self.sandboxes[session_hash]
243
- if session_hash in self.sandbox_metadata:
244
- del self.sandbox_metadata[session_hash]
245
-
246
- # Check if we have capacity
247
- # Count both ready sandboxes and sandboxes in "creating" state
248
- # We count BEFORE adding this one to ensure we don't exceed the limit
249
- creating_count = sum(
250
- 1
251
- for meta in self.sandbox_metadata.values()
252
- if meta.get("state") == "creating"
253
- )
254
- ready_count = len(self.sandboxes)
255
- total_count = ready_count + creating_count
256
- # Check capacity BEFORE adding this session_hash to metadata
257
- if total_count >= self.max_sandboxes:
258
- print(
259
- f"Sandbox pool at capacity: {ready_count} ready + {creating_count} creating = "
260
- f"{total_count}/{self.max_sandboxes}"
261
- )
262
- # CRITICAL: If we have an expired sandbox but can't create a new one,
263
- # we must still kill the expired sandbox to prevent leaks
264
- if expired_sandbox:
265
- print(
266
- f"Killing expired sandbox for {session_hash} even though pool is at capacity"
267
- )
268
-
269
- async def kill_expired():
270
- try:
271
- await asyncio.to_thread(expired_sandbox.kill)
272
- except Exception as e:
273
- print(
274
- f"Error killing expired sandbox for {session_hash}: {str(e)}"
275
- )
276
-
277
- asyncio.create_task(kill_expired())
278
- return SandboxResponse(sandbox=None, state="max_sandboxes_reached")
279
-
280
- # Mark that we're creating this sandbox
281
- # This happens atomically within the lock, so no race condition
282
- print(f"Creating new sandbox for session {session_hash}")
283
- self.sandbox_metadata[session_hash] = {
284
- "state": "creating",
285
- "created_at": current_time,
286
- "last_accessed": current_time,
287
- }
288
- should_create = True
289
-
290
- # Start sandbox creation in background without waiting
291
- if should_create:
292
- asyncio.create_task(
293
- self._create_sandbox_background(session_hash, expired_sandbox)
294
- )
295
- elif expired_sandbox:
296
- # If we're not creating but have an expired sandbox, kill it
297
- # This shouldn't normally happen, but handle it defensively
298
- print(f"Killing expired sandbox for {session_hash} (not creating new one)")
299
- try:
300
- await asyncio.to_thread(expired_sandbox.kill)
301
- except Exception as e:
302
- print(f"Error killing expired sandbox: {str(e)}")
303
-
304
- # Check state after starting background task (it might complete very quickly)
305
- async with self.sandbox_lock:
306
- if session_hash in self.sandbox_metadata:
307
- state = self.sandbox_metadata[session_hash].get("state")
308
- if state == "creating":
309
- return SandboxResponse(sandbox=None, state="creating")
310
- if state == "ready" and session_hash in self.sandboxes:
311
- return SandboxResponse(
312
- sandbox=self.sandboxes[session_hash], state="ready"
313
- )
314
-
315
- # If metadata doesn't exist, it means creation failed immediately
316
- # Return "creating" anyway as the caller will retry
317
- return SandboxResponse(sandbox=None, state="creating")
318
-
319
- async def release_sandbox(self, session_hash: str):
320
- sandbox_to_kill = None
321
-
322
- # Remove from dictionaries under lock
323
- async with self.sandbox_lock:
324
- if session_hash in self.sandboxes:
325
- print(f"Releasing sandbox for session {session_hash}")
326
- sandbox_to_kill = self.sandboxes[session_hash]
327
- del self.sandboxes[session_hash]
328
- # Always clean up metadata, even if sandbox is still in "creating" state
329
- if session_hash in self.sandbox_metadata:
330
- state = self.sandbox_metadata[session_hash].get("state")
331
- if state == "creating":
332
- print(
333
- f"Cleaning up stuck 'creating' sandbox for session {session_hash}"
334
- )
335
- del self.sandbox_metadata[session_hash]
336
-
337
- # Kill sandbox outside of lock
338
- if sandbox_to_kill:
339
- try:
340
- await asyncio.to_thread(sandbox_to_kill.kill)
341
- except Exception as e:
342
- print(f"Error killing sandbox for session {session_hash}: {str(e)}")
343
-
344
- async def cleanup_stuck_creating_sandboxes(self):
345
- """Clean up sandboxes that have been stuck in 'creating' state for too long"""
346
- current_time = datetime.now()
347
- stuck_sandboxes_to_kill = []
348
-
349
- async with self.sandbox_lock:
350
- for session_hash, metadata in list(self.sandbox_metadata.items()):
351
- if metadata.get("state") == "creating":
352
- created_at = metadata.get("created_at")
353
- # Clean up if:
354
- # 1. created_at exists and is older than threshold, OR
355
- # 2. created_at is missing (corrupted metadata - should never happen but handle it)
356
- should_cleanup = False
357
- stuck_duration = 0.0
358
-
359
- if created_at:
360
- stuck_duration = (current_time - created_at).total_seconds()
361
- if stuck_duration > SANDBOX_CREATION_MAX_TIME:
362
- should_cleanup = True
363
- else:
364
- # Missing created_at is a bug, but clean it up anyway
365
- print(
366
- f"WARNING: Sandbox {session_hash} in 'creating' state has no 'created_at' timestamp - cleaning up"
367
- )
368
- should_cleanup = True
369
- stuck_duration = float("inf")
370
-
371
- if should_cleanup:
372
- print(
373
- f"Cleaning up stuck 'creating' sandbox for session {session_hash} "
374
- f"(stuck for {stuck_duration:.1f}s)"
375
- )
376
- # Collect sandbox to kill if it exists
377
- if session_hash in self.sandboxes:
378
- stuck_sandboxes_to_kill.append(
379
- (session_hash, self.sandboxes[session_hash])
380
- )
381
- del self.sandboxes[session_hash]
382
- del self.sandbox_metadata[session_hash]
383
-
384
- # Kill stuck sandboxes outside of lock
385
- for session_hash, sandbox in stuck_sandboxes_to_kill:
386
- try:
387
- await asyncio.to_thread(sandbox.kill)
388
- print(f"Killed stuck sandbox for session {session_hash}")
389
- except Exception as e:
390
- print(
391
- f"Error killing stuck sandbox for session {session_hash}: {str(e)}"
392
- )
393
-
394
- return len(stuck_sandboxes_to_kill)
395
-
396
- async def cleanup_expired_ready_sandboxes(self):
397
- """Clean up ready sandboxes that have expired (not accessed for too long)"""
398
- current_time = datetime.now()
399
- expired_sandboxes_to_kill = []
400
-
401
- async with self.sandbox_lock:
402
- for session_hash, metadata in list(self.sandbox_metadata.items()):
403
- if metadata.get("state") == "ready" and session_hash in self.sandboxes:
404
- created_at = metadata.get("created_at")
405
- if (
406
- created_at
407
- and (current_time - created_at).total_seconds()
408
- >= SANDBOX_READY_TIMEOUT
409
- ):
410
- print(
411
- f"Cleaning up expired ready sandbox for session {session_hash} "
412
- f"(age: {(current_time - created_at).total_seconds():.1f}s)"
413
- )
414
- expired_sandboxes_to_kill.append(
415
- (session_hash, self.sandboxes[session_hash])
416
- )
417
- del self.sandboxes[session_hash]
418
- del self.sandbox_metadata[session_hash]
419
-
420
- # Kill expired sandboxes outside of lock
421
- for session_hash, sandbox in expired_sandboxes_to_kill:
422
- try:
423
- await asyncio.to_thread(sandbox.kill)
424
- print(f"Killed expired ready sandbox for session {session_hash}")
425
- except Exception as e:
426
- print(
427
- f"Error killing expired ready sandbox for session {session_hash}: {str(e)}"
428
- )
429
-
430
- return len(expired_sandboxes_to_kill)
431
-
432
- async def get_sandbox_counts(self) -> tuple[int, int]:
433
- """
434
- Get the count of available (ready) and non-available (creating) sandboxes.
435
-
436
- Returns:
437
- Tuple of (available_count, non_available_count)
438
- """
439
- async with self.sandbox_lock:
440
- ready_count = len(self.sandboxes)
441
- creating_count = sum(
442
- 1
443
- for meta in self.sandbox_metadata.values()
444
- if meta.get("state") == "creating"
445
- )
446
- return (ready_count, creating_count)
447
-
448
  async def cleanup_sandboxes(self):
449
- sandboxes_to_kill = []
 
 
 
450
 
451
- # Collect sandboxes under lock
452
- async with self.sandbox_lock:
453
- for session_hash in list(self.sandboxes.keys()):
454
- sandboxes_to_kill.append((session_hash, self.sandboxes[session_hash]))
455
- del self.sandboxes[session_hash]
456
- if session_hash in self.sandbox_metadata:
457
- del self.sandbox_metadata[session_hash]
458
-
459
- # Kill all sandboxes outside of lock
460
- for session_hash, sandbox in sandboxes_to_kill:
461
- try:
462
- await asyncio.to_thread(sandbox.kill)
463
- except Exception as e:
464
- print(f"Error killing sandbox for session {session_hash}: {str(e)}")
465
 
466
 
467
  if __name__ == "__main__":
 
2
  import os
3
  import time
4
  from datetime import datetime
5
+ from typing import Literal
6
 
7
  from e2b_desktop import Sandbox
8
  from pydantic import BaseModel
9
 
 
10
  SANDBOX_TIMEOUT = 500
11
+ SANDBOX_READY_TIMEOUT = 200 # Seconds before a sandbox expires
 
 
 
 
 
12
  WIDTH = 1280
13
  HEIGHT = 960
14
 
 
18
 
19
  sandbox: Sandbox | None
20
  state: Literal["creating", "ready", "max_sandboxes_reached"]
21
+ error: str | None = None # Error message if creation failed
22
+
23
+
24
class SandboxEntry:
    """Pair a ready sandbox with the timestamps used to expire it."""

    def __init__(self, sandbox: Sandbox):
        # Read the clock once so created_at and last_accessed start out
        # identical instead of differing by the gap between two now() calls.
        now = datetime.now()
        self.sandbox = sandbox
        self.created_at = now
        self.last_accessed = now

    def is_expired(self) -> bool:
        """Return True once the entry is at least SANDBOX_READY_TIMEOUT seconds old.

        Age is measured from created_at, not last_accessed, so calling
        update_access() does not extend the entry's lifetime.
        """
        age = (datetime.now() - self.created_at).total_seconds()
        return age >= SANDBOX_READY_TIMEOUT

    def update_access(self):
        """Record the current time as the last access time."""
        self.last_accessed = datetime.now()
40
 
41
 
42
  class SandboxService:
43
+ """
44
+ Simplified sandbox service for production use.
45
+
46
+ Key simplifications:
47
+ - Non-blocking sandbox creation (background tasks)
48
+ - Simple expiration-based cleanup
49
+ - Minimal state tracking (pending vs ready)
50
+ - Straightforward locking
51
+ """
52
+
53
  def __init__(self, max_sandboxes: int = 50):
54
  if not os.getenv("E2B_API_KEY"):
55
  raise ValueError("E2B_API_KEY is not set")
56
  self.max_sandboxes = max_sandboxes
57
+ self.sandboxes: dict[str, SandboxEntry] = {} # Ready sandboxes
58
+ self.pending: set[str] = set() # Session hashes currently being created
59
+ self.creation_errors: dict[
60
+ str, str
61
+ ] = {} # Track creation errors by session_hash
62
+ self.lock = asyncio.Lock()
63
  self._cleanup_task: asyncio.Task | None = None
64
 
65
+ def _create_and_setup_sandbox(self) -> Sandbox:
66
+ """Create and setup a sandbox (synchronous operation)"""
67
+ desktop = Sandbox.create(
68
+ api_key=os.getenv("E2B_API_KEY"),
69
+ resolution=(WIDTH, HEIGHT),
70
+ dpi=96,
71
+ timeout=SANDBOX_TIMEOUT,
72
+ template="k0wmnzir0zuzye6dndlw",
73
+ )
74
+ desktop.stream.start(require_auth=True)
75
+ setup_cmd = """sudo mkdir -p /usr/lib/firefox-esr/distribution && echo '{"policies":{"OverrideFirstRunPage":"","OverridePostUpdatePage":"","DisableProfileImport":true,"DontCheckDefaultBrowser":true}}' | sudo tee /usr/lib/firefox-esr/distribution/policies.json > /dev/null"""
76
+ desktop.commands.run(setup_cmd)
77
+ time.sleep(3)
78
+ return desktop
79
+
80
+ async def acquire_sandbox(self, session_hash: str) -> SandboxResponse:
81
+ """
82
+ Acquire a sandbox for a session.
83
+ Returns immediately - either with ready sandbox, or "creating" if one is being created.
84
+ """
85
+ async with self.lock:
86
+ # Check if we have a valid sandbox for this session
87
+ if session_hash in self.sandboxes:
88
+ entry = self.sandboxes[session_hash]
89
+ if not entry.is_expired():
90
+ entry.update_access()
91
+ print(f"Reusing sandbox for session {session_hash}")
92
+ return SandboxResponse(sandbox=entry.sandbox, state="ready")
93
+ else:
94
+ # Expired - remove it
95
+ print(f"Removing expired sandbox for session {session_hash}")
96
+ old_entry = self.sandboxes.pop(session_hash)
97
+ # Schedule cleanup outside lock
98
+ asyncio.create_task(
99
+ self._kill_sandbox_safe(old_entry.sandbox, session_hash)
100
+ )
101
+
102
+ # Check if already being created
103
+ if session_hash in self.pending:
104
+ print(f"Sandbox for session {session_hash} is already being created")
105
+ # Check if there was a creation error
106
+ if session_hash in self.creation_errors:
107
+ error_msg = self.creation_errors.pop(session_hash)
108
+ return SandboxResponse(
109
+ sandbox=None, state="creating", error=error_msg
110
+ )
111
+ return SandboxResponse(sandbox=None, state="creating")
112
 
113
+ # Check if there's a previous creation error (shouldn't happen, but handle it)
114
+ if session_hash in self.creation_errors:
115
+ error_msg = self.creation_errors.pop(session_hash)
116
+ return SandboxResponse(sandbox=None, state="creating", error=error_msg)
117
+
118
+ # Check capacity (count both ready and pending)
119
+ total_count = len(self.sandboxes) + len(self.pending)
120
+ if total_count >= self.max_sandboxes:
121
+ print(
122
+ f"Sandbox pool at capacity: {len(self.sandboxes)} ready + {len(self.pending)} pending = {total_count}/{self.max_sandboxes}"
123
+ )
124
+ # Try to clean up expired sandboxes first
125
+ await self._cleanup_expired_internal()
126
+ # Recheck capacity after cleanup
127
+ total_count = len(self.sandboxes) + len(self.pending)
128
+ if total_count >= self.max_sandboxes:
129
+ return SandboxResponse(sandbox=None, state="max_sandboxes_reached")
130
+
131
+ # Mark as pending and start creation in background
132
+ self.pending.add(session_hash)
133
+ print(f"Starting creation of sandbox for session {session_hash}")
134
+
135
+ # Start creation in background (non-blocking)
136
+ asyncio.create_task(self._create_sandbox_background(session_hash))
137
+ return SandboxResponse(sandbox=None, state="creating")
138
+
139
+ async def _create_sandbox_background(self, session_hash: str):
140
+ """Background task to create a sandbox"""
141
  desktop = None
142
  try:
143
+ desktop = await asyncio.to_thread(self._create_and_setup_sandbox)
144
+ print(
145
+ f"Sandbox created for session {session_hash}, ID: {desktop.sandbox_id}"
146
+ )
147
+
148
+ async with self.lock:
149
+ # Check if session was released while creating (removed from pending)
150
+ was_released = session_hash not in self.pending
151
+ self.pending.discard(session_hash)
152
+
153
+ if was_released:
154
+ # Session was released - kill the sandbox
 
 
 
 
155
  print(
156
+ f"Session {session_hash} was released during creation, killing sandbox"
157
  )
158
+ asyncio.create_task(self._kill_sandbox_safe(desktop, session_hash))
159
+ return
 
 
160
 
161
+ # Check capacity before adding
162
+ if len(self.sandboxes) >= self.max_sandboxes:
 
 
 
 
163
  print(
164
+ f"Pool at capacity, killing newly created sandbox for {session_hash}"
165
  )
166
+ asyncio.create_task(self._kill_sandbox_safe(desktop, session_hash))
167
+ return
168
+
169
+ # Add to pool
170
+ self.sandboxes[session_hash] = SandboxEntry(desktop)
171
+ print(f"Sandbox {session_hash} is now ready")
172
+
173
+ except Exception as e:
174
+ error_msg = str(e)
175
+ import traceback
176
+
177
+ error_details = traceback.format_exc()
178
+ print(f"Error creating sandbox for session {session_hash}: {error_msg}")
179
+ print(f"Full traceback: {error_details}")
180
+
181
+ async with self.lock:
182
+ self.pending.discard(session_hash)
183
+ # Store error so agent service can retrieve it
184
+ self.creation_errors[session_hash] = error_msg
185
+ if desktop:
186
+ asyncio.create_task(self._kill_sandbox_safe(desktop, session_hash))
187
+
188
async def release_sandbox(self, session_hash: str):
    """Forget everything tracked for ``session_hash`` and kill its sandbox.

    Under ``self.lock`` the session is removed from the ready pool, from the
    pending set and from the stored creation errors. If a ready sandbox was
    found it is killed after the lock has been released.

    Args:
        session_hash: Key of the session whose sandbox should be released.
    """
    async with self.lock:
        entry = self.sandboxes.pop(session_hash, None)
        # Dropping the session from `pending` is how an in-flight creation
        # learns that its result is no longer wanted.
        self.pending.discard(session_hash)
        # Any stored creation error is obsolete once the session is released.
        self.creation_errors.pop(session_hash, None)

    sandbox = entry.sandbox if entry is not None else None
    if not sandbox:
        return

    await self._kill_sandbox_safe(sandbox, session_hash)
    print(f"Released sandbox for session {session_hash}")
203
+
204
async def _kill_sandbox_safe(self, sandbox: Sandbox, session_hash: str):
    """Kill ``sandbox`` without letting a failure propagate.

    ``sandbox.kill`` is run in a worker thread through ``asyncio.to_thread``.
    Any ``Exception`` it raises is printed and swallowed.

    Args:
        sandbox: Object exposing a ``kill()`` method.
        session_hash: Session label used only in the error message.
    """
    try:
        # The attribute lookup stays inside the try so that a missing
        # `kill` attribute is reported the same way as a failing kill.
        await asyncio.to_thread(sandbox.kill)
    except Exception as e:
        print(f"Error killing sandbox for session {session_hash}: {str(e)}")
210
+
211
+ async def _cleanup_expired_internal(self) -> int:
212
+ """Internal cleanup of expired sandboxes (must be called with lock held)"""
213
+ expired = []
214
+ for session_hash, entry in list(self.sandboxes.items()):
215
+ if entry.is_expired():
216
+ expired.append((session_hash, entry.sandbox))
217
+ del self.sandboxes[session_hash]
218
+
219
+ # Kill expired sandboxes outside lock
220
+ for session_hash, sandbox in expired:
221
+ await self._kill_sandbox_safe(sandbox, session_hash)
222
+ print(f"Cleaned up expired sandbox for session {session_hash}")
223
+
224
+ return len(expired)
225
+
226
async def cleanup_expired_ready_sandboxes(self) -> int:
    """Run the expired-sandbox cleanup while holding ``self.lock``.

    Returns:
        Whatever ``_cleanup_expired_internal`` returns (the number of
        expired sandboxes it removed).
    """
    async with self.lock:
        removed = await self._cleanup_expired_internal()
    return removed
230
+
231
async def get_sandbox_counts(self) -> tuple[int, int]:
    """Report how many sandboxes are ready and how many are pending.

    Both sizes are read under ``self.lock`` so they describe the same
    moment in time.

    Returns:
        ``(available_count, non_available_count)`` where the first item is
        ``len(self.sandboxes)`` and the second is ``len(self.pending)``.
    """
    async with self.lock:
        return len(self.sandboxes), len(self.pending)
240
 
241
  async def _periodic_cleanup(self):
242
+ """Periodic cleanup task"""
243
  while True:
244
  try:
245
+ await asyncio.sleep(60) # Run every minute
246
+ async with self.lock:
247
+ cleaned = await self._cleanup_expired_internal()
248
+ if cleaned > 0:
249
+ print(f"Periodic cleanup: removed {cleaned} expired sandboxes")
250
+ # Log pool state
 
 
 
 
251
  ready_count = len(self.sandboxes)
252
+ pending_count = len(self.pending)
253
+ total = ready_count + pending_count
254
+ if total > 0:
 
 
 
 
255
  print(
256
+ f"Sandbox pool: {ready_count} ready, {pending_count} pending, {total}/{self.max_sandboxes} total"
 
257
  )
258
  except asyncio.CancelledError:
259
  break
 
266
  try:
267
  self._cleanup_task = asyncio.create_task(self._periodic_cleanup())
268
  except RuntimeError as e:
 
269
  print(f"Warning: Cannot start periodic cleanup (no event loop): {e}")
270
 
271
  def stop_periodic_cleanup(self):
 
273
  if self._cleanup_task and not self._cleanup_task.done():
274
  self._cleanup_task.cancel()
275
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
276
async def cleanup_sandboxes(self):
    """Clean up all sandboxes tracked by the pool.

    Under ``self.lock`` the ready pool, the pending set and the stored
    creation errors are all emptied. Emptying ``self.pending`` matters:
    a creation still in flight checks that set when it finishes and kills
    its own sandbox if the session is gone, so nothing is added back to the
    pool after this call. The ready sandboxes are then killed concurrently,
    after the lock has been released.
    """
    async with self.lock:
        sandboxes_to_kill = [entry.sandbox for entry in self.sandboxes.values()]
        self.sandboxes.clear()
        self.pending.clear()
        self.creation_errors.clear()

    if sandboxes_to_kill:
        # `_kill_sandbox_safe` swallows its own errors, so one failing kill
        # does not prevent the others from running.
        await asyncio.gather(
            *(
                self._kill_sandbox_safe(sandbox, "cleanup")
                for sandbox in sandboxes_to_kill
            )
        )
 
 
 
 
 
 
 
 
 
 
 
 
284
 
285
 
286
  if __name__ == "__main__":