lewtun HF Staff OpenAI Codex committed on
Commit
a436c02
·
2 Parent(s): ab0ff24 7636865

Deploy 2026-05-04

Browse files

Co-authored-by: OpenAI Codex <codex@openai.com>

agent/context_manager/manager.py CHANGED
@@ -79,6 +79,23 @@ _COMPACT_PROMPT = (
79
  "will be have to be filled in."
80
  )
81
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82
  # Used when seeding a brand-new session from prior browser-cached messages.
83
  # Here we're writing a note to *ourselves* — so preserve the tool-call trail,
84
  # files produced, and planned next steps in first person. Optimized for
@@ -240,8 +257,6 @@ class ContextManager:
240
  """Add a message to the history"""
241
  if token_count:
242
  self.running_context_usage = token_count
243
- if not getattr(message, "timestamp", None):
244
- message.timestamp = datetime.now().isoformat()
245
  self.items.append(message)
246
  if self.on_message_added:
247
  self.on_message_added(message)
@@ -314,7 +329,6 @@ class ContextManager:
314
  content="Tool was not executed (interrupted or error).",
315
  tool_call_id=tc.id,
316
  name=tc.function.name,
317
- timestamp=datetime.now().isoformat(),
318
  )
319
  )
320
 
@@ -374,6 +388,81 @@ class ContextManager:
374
  def needs_compaction(self) -> bool:
375
  return self.running_context_usage > self.compaction_threshold and bool(self.items)
376
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
377
  async def compact(
378
  self,
379
  model_name: str,
@@ -386,6 +475,13 @@ class ContextManager:
386
  ``session`` is optional — if passed, the underlying summarization
387
  LLM call is recorded via ``telemetry.record_llm_call(kind=
388
  "compaction")`` so its cost shows up in ``total_cost_usd``.
 
 
 
 
 
 
 
389
  """
390
  if not self.needs_compaction:
391
  return
@@ -409,12 +505,45 @@ class ContextManager:
409
  idx = len(self.items) - self.untouched_messages
410
  while idx > 1 and self.items[idx].role != "user":
411
  idx -= 1
 
 
 
 
 
 
 
 
 
412
 
413
  recent_messages = self.items[idx:]
414
  messages_to_summarize = self.items[first_user_idx + 1:idx]
415
 
416
- # improbable, messages would have to very long
 
 
 
 
 
 
 
 
 
 
 
 
 
417
  if not messages_to_summarize:
 
 
 
 
 
 
 
 
 
 
 
418
  return
419
 
420
  summary, completion_tokens = await summarize_messages(
@@ -430,7 +559,6 @@ class ContextManager:
430
  summarized_message = Message(
431
  role="assistant",
432
  content=summary,
433
- timestamp=datetime.now().isoformat(),
434
  )
435
 
436
  # Reconstruct: system + first user msg + summary + recent messages
@@ -439,16 +567,19 @@ class ContextManager:
439
  head.append(first_user_msg)
440
  self.items = head + [summarized_message] + recent_messages
441
 
442
- # Count the actual post-compact context — system prompt + first user
443
- # turn + summary + the preserved tail all contribute, not just the
444
- # summary. litellm.token_counter uses the model's real tokenizer.
445
- from litellm import token_counter
446
-
447
- try:
448
- self.running_context_usage = token_counter(
449
- model=model_name,
450
- messages=[m.model_dump() for m in self.items],
 
 
 
 
 
 
451
  )
452
- except Exception as e:
453
- logger.warning("token_counter failed post-compact (%s); falling back to rough estimate", e)
454
- self.running_context_usage = len(self.system_prompt) // 4 + completion_tokens
 
79
  "will be have to be filled in."
80
  )
81
 
82
+ # Per-message ceiling. If a single message in the "untouched" tail is larger
83
+ # than this, compaction can't recover even after summarizing the middle —
84
+ # producing the infinite compaction loop seen 2026-05-03 in pod logs (200k
85
+ # context shrinks to 200k+ because one tool output is 80k tokens). We replace
86
+ # such messages with a placeholder before compaction runs.
87
+ _MAX_TOKENS_PER_MESSAGE = 50_000
88
+
89
+
90
+ class CompactionFailedError(Exception):
91
+ """Raised when compaction can't reduce context below the threshold.
92
+
93
+ Typically means an individual preserved message (system, first user, or
94
+ untouched tail) exceeds what truncation can fix in one pass. The caller
95
+ must terminate the session — retrying produces an infinite loop that
96
+ burns Bedrock budget for nothing (~$3 per re-attempt on Opus).
97
+ """
98
+
99
  # Used when seeding a brand-new session from prior browser-cached messages.
100
  # Here we're writing a note to *ourselves* — so preserve the tool-call trail,
101
  # files produced, and planned next steps in first person. Optimized for
 
257
  """Add a message to the history"""
258
  if token_count:
259
  self.running_context_usage = token_count
 
 
260
  self.items.append(message)
261
  if self.on_message_added:
262
  self.on_message_added(message)
 
329
  content="Tool was not executed (interrupted or error).",
330
  tool_call_id=tc.id,
331
  name=tc.function.name,
 
332
  )
333
  )
334
 
 
388
  def needs_compaction(self) -> bool:
389
  return self.running_context_usage > self.compaction_threshold and bool(self.items)
390
 
391
+ def _truncate_oversized(
392
+ self, messages: list[Message], model_name: str
393
+ ) -> list[Message]:
394
+ """Replace any message > _MAX_TOKENS_PER_MESSAGE with a placeholder.
395
+
396
+ These are typically tool outputs (CSV dumps, file contents) sitting in
397
+ the untouched tail or first-user position that compaction can't shrink
398
+ — they pass through verbatim, keeping context above threshold and
399
+ triggering an infinite compaction retry loop.
400
+ """
401
+ from litellm import token_counter
402
+
403
+ out: list[Message] = []
404
+ for msg in messages:
405
+ # System messages are sacred — they're the agent's instructions.
406
+ # In edge cases (len(items) < untouched_messages), the slice math in
407
+ # compact() can let items[0] (the system message) leak into the
408
+ # recent_messages list. Defense-in-depth: never truncate it.
409
+ if msg.role == "system":
410
+ out.append(msg)
411
+ continue
412
+ try:
413
+ n = token_counter(model=model_name, messages=[msg.model_dump()])
414
+ except Exception:
415
+ # token_counter occasionally fails on edge-case content;
416
+ # don't drop the message, just keep it as-is.
417
+ out.append(msg)
418
+ continue
419
+ if n <= _MAX_TOKENS_PER_MESSAGE:
420
+ out.append(msg)
421
+ continue
422
+ placeholder = (
423
+ f"[truncated for compaction — original was {n} tokens, "
424
+ f"removed to keep context under {self.compaction_threshold} tokens]"
425
+ )
426
+ logger.warning(
427
+ "Truncating %s message: %d -> %d tokens for compaction",
428
+ msg.role, n, len(placeholder) // 4,
429
+ )
430
+ # Preserve all known assistant-side fields (tool_calls, thinking_blocks,
431
+ # reasoning_content, provider_specific_fields) even when content is
432
+ # replaced. Anthropic extended-thinking models reject the next request
433
+ # with "Invalid signature in thinking block" if thinking_blocks is
434
+ # dropped from a prior assistant message.
435
+ kept = {
436
+ k: getattr(msg, k, None)
437
+ for k in (
438
+ "tool_call_id",
439
+ "tool_calls",
440
+ "name",
441
+ "thinking_blocks",
442
+ "reasoning_content",
443
+ "provider_specific_fields",
444
+ )
445
+ if getattr(msg, k, None) is not None
446
+ }
447
+ out.append(Message(role=msg.role, content=placeholder, **kept))
448
+ return out
449
+
450
+ def _recompute_usage(self, model_name: str) -> None:
451
+ """Refresh ``running_context_usage`` from current items via real tokenizer."""
452
+ from litellm import token_counter
453
+
454
+ try:
455
+ self.running_context_usage = token_counter(
456
+ model=model_name,
457
+ messages=[m.model_dump() for m in self.items],
458
+ )
459
+ except Exception as e:
460
+ logger.warning("token_counter failed (%s); rough estimate", e)
461
+ # Rough fallback: 4 chars per token.
462
+ self.running_context_usage = sum(
463
+ len(getattr(m, "content", "") or "") for m in self.items
464
+ ) // 4
465
+
466
  async def compact(
467
  self,
468
  model_name: str,
 
475
  ``session`` is optional — if passed, the underlying summarization
476
  LLM call is recorded via ``telemetry.record_llm_call(kind=
477
  "compaction")`` so its cost shows up in ``total_cost_usd``.
478
+
479
+ Raises ``CompactionFailedError`` if the post-compact context is still
480
+ over the threshold. This happens when a preserved message (typically
481
+ a giant tool output stuck in the untouched tail) is too large for
482
+ truncation to fix. The caller must terminate the session — retrying
483
+ is what caused the 2026-05-03 infinite-compaction-loop pattern that
484
+ burned Bedrock budget invisibly.
485
  """
486
  if not self.needs_compaction:
487
  return
 
505
  idx = len(self.items) - self.untouched_messages
506
  while idx > 1 and self.items[idx].role != "user":
507
  idx -= 1
508
+ # The real invariant is "idx must be strictly after first_user_idx,
509
+ # otherwise recent_messages overlaps with the messages we put in
510
+ # head". The walk-back's `idx > 1` guard is necessary (no system in
511
+ # recent) but insufficient (first_user is also in head and would be
512
+ # duplicated). Anthropic API rejects two consecutive user messages
513
+ # with a 400 — bot review on PR #213 caught this on the second clamp
514
+ # iteration.
515
+ if idx <= first_user_idx:
516
+ idx = first_user_idx + 1
517
 
518
  recent_messages = self.items[idx:]
519
  messages_to_summarize = self.items[first_user_idx + 1:idx]
520
 
521
+ # Truncate any message that's larger than _MAX_TOKENS_PER_MESSAGE in
522
+ # the parts we PRESERVE through compaction (first_user + recent_tail).
523
+ # These are the only places where individual messages can defeat
524
+ # compaction by being intrinsically too large. Messages in
525
+ # ``messages_to_summarize`` are folded into the summary, so their size
526
+ # doesn't matter on its own.
527
+ if first_user_msg is not None:
528
+ truncated = self._truncate_oversized([first_user_msg], model_name)
529
+ first_user_msg = truncated[0]
530
+ recent_messages = self._truncate_oversized(recent_messages, model_name)
531
+
532
+ # If there's nothing to summarize but the preserved messages are now
533
+ # truncated and small, just rebuild and recompute. This is rare but
534
+ # avoids returning silently with the old (over-threshold) state.
535
  if not messages_to_summarize:
536
+ head = [system_msg] if system_msg else []
537
+ if first_user_msg:
538
+ head.append(first_user_msg)
539
+ self.items = head + recent_messages
540
+ self._recompute_usage(model_name)
541
+ if self.running_context_usage > self.compaction_threshold:
542
+ raise CompactionFailedError(
543
+ f"Nothing to summarize but context ({self.running_context_usage}) "
544
+ f"still over threshold ({self.compaction_threshold}) after truncation. "
545
+ f"System prompt or first user message likely exceeds the budget."
546
+ )
547
  return
548
 
549
  summary, completion_tokens = await summarize_messages(
 
559
  summarized_message = Message(
560
  role="assistant",
561
  content=summary,
 
562
  )
563
 
564
  # Reconstruct: system + first user msg + summary + recent messages
 
567
  head.append(first_user_msg)
568
  self.items = head + [summarized_message] + recent_messages
569
 
570
+ self._recompute_usage(model_name)
571
+
572
+ # Hard verify: if compaction didn't bring us below the threshold even
573
+ # after truncating oversized preserved messages, retrying just burns
574
+ # Bedrock budget on the same useless compaction call. Raise so the
575
+ # caller can terminate the session cleanly. Pre-2026-05-04, the
576
+ # caller looped indefinitely (~$3/Opus retry) until the pod was
577
+ # killed — invisible to the dataset because the session never
578
+ # finished cleanly.
579
+ if self.running_context_usage > self.compaction_threshold:
580
+ raise CompactionFailedError(
581
+ f"Compaction ineffective: {self.running_context_usage} tokens "
582
+ f"still over threshold {self.compaction_threshold} after summarize "
583
+ f"and truncation. Likely the system prompt + first user + summary "
584
+ f"+ truncated tail still exceeds budget."
585
  )
 
 
 
agent/core/agent_loop.py CHANGED
@@ -516,19 +516,56 @@ def _friendly_error_message(error: Exception) -> str | None:
516
 
517
 
518
  async def _compact_and_notify(session: Session) -> None:
519
- """Run compaction and send event if context was reduced."""
 
 
 
 
 
 
 
 
 
520
  cm = session.context_manager
521
  old_usage = cm.running_context_usage
522
  logger.debug(
523
  "Compaction check: usage=%d, max=%d, threshold=%d, needs_compact=%s",
524
  old_usage, cm.model_max_tokens, cm.compaction_threshold, cm.needs_compaction,
525
  )
526
- await cm.compact(
527
- model_name=session.config.model_name,
528
- tool_specs=session.tool_router.get_tool_specs_for_llm(),
529
- hf_token=session.hf_token,
530
- session=session,
531
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
532
  new_usage = cm.running_context_usage
533
  if new_usage != old_usage:
534
  logger.warning(
@@ -1035,8 +1072,15 @@ class Handlers:
1035
  if session.is_cancelled:
1036
  break
1037
 
1038
- # Compact before calling the LLM if context is near the limit
 
 
 
 
 
1039
  await _compact_and_notify(session)
 
 
1040
 
1041
  # Doom-loop detection: break out of repeated tool call patterns
1042
  doom_prompt = check_for_doom_loop(session.context_manager.items)
@@ -1421,7 +1465,7 @@ class Handlers:
1421
  iteration += 1
1422
 
1423
  except ContextWindowExceededError:
1424
- # Force compact and retry this iteration
1425
  cm = session.context_manager
1426
  logger.warning(
1427
  "ContextWindowExceededError at iteration %d — forcing compaction "
@@ -1430,6 +1474,12 @@ class Handlers:
1430
  )
1431
  cm.running_context_usage = cm.model_max_tokens + 1
1432
  await _compact_and_notify(session)
 
 
 
 
 
 
1433
  continue
1434
 
1435
  except Exception as e:
 
516
 
517
 
518
  async def _compact_and_notify(session: Session) -> None:
519
+ """Run compaction and send event if context was reduced.
520
+
521
+ Catches ``CompactionFailedError`` and ends the session cleanly instead
522
+ of letting the caller retry. Pre-2026-05-04 the caller looped on
523
+ ContextWindowExceededError → compact → re-trigger, burning Bedrock
524
+ budget at ~$3/Opus retry while the session never reached the upload
525
+ path (so the cost was invisible in the dataset).
526
+ """
527
+ from agent.context_manager.manager import CompactionFailedError
528
+
529
  cm = session.context_manager
530
  old_usage = cm.running_context_usage
531
  logger.debug(
532
  "Compaction check: usage=%d, max=%d, threshold=%d, needs_compact=%s",
533
  old_usage, cm.model_max_tokens, cm.compaction_threshold, cm.needs_compaction,
534
  )
535
+ try:
536
+ await cm.compact(
537
+ model_name=session.config.model_name,
538
+ tool_specs=session.tool_router.get_tool_specs_for_llm(),
539
+ hf_token=session.hf_token,
540
+ session=session,
541
+ )
542
+ except CompactionFailedError as e:
543
+ logger.error(
544
+ "Compaction failed for session %s: %s — terminating session",
545
+ session.session_id, e,
546
+ )
547
+ # Persist the failure event so the dataset has a record of WHY this
548
+ # session ended (and the cost it incurred up to that point) even if
549
+ # save_and_upload_detached has issues downstream.
550
+ await session.send_event(Event(
551
+ event_type="session_terminated",
552
+ data={
553
+ "reason": "compaction_failed",
554
+ "context_usage": cm.running_context_usage,
555
+ "context_threshold": cm.compaction_threshold,
556
+ "error": str(e)[:300],
557
+ "user_message": (
558
+ "Your conversation has grown too large to continue. "
559
+ "The work you've done is saved — start a new session to keep going."
560
+ ),
561
+ },
562
+ ))
563
+ # Stop the agent loop; the finally in _run_session will fire
564
+ # cleanup_sandbox + save_trajectory so the dataset captures
565
+ # everything that did happen.
566
+ session.is_running = False
567
+ return
568
+
569
  new_usage = cm.running_context_usage
570
  if new_usage != old_usage:
571
  logger.warning(
 
1072
  if session.is_cancelled:
1073
  break
1074
 
1075
+ # Compact before calling the LLM if context is near the limit.
1076
+ # When _compact_and_notify catches CompactionFailedError it sets
1077
+ # session.is_running = False; we MUST exit the loop here, otherwise
1078
+ # the LLM call below fires with an over-threshold context, hits
1079
+ # ContextWindowExceededError, and we end up looping again on the
1080
+ # except path — exactly the bug this PR is supposed to fix.
1081
  await _compact_and_notify(session)
1082
+ if not session.is_running:
1083
+ break
1084
 
1085
  # Doom-loop detection: break out of repeated tool call patterns
1086
  doom_prompt = check_for_doom_loop(session.context_manager.items)
 
1465
  iteration += 1
1466
 
1467
  except ContextWindowExceededError:
1468
+ # Force compact and retry this iteration.
1469
  cm = session.context_manager
1470
  logger.warning(
1471
  "ContextWindowExceededError at iteration %d — forcing compaction "
 
1474
  )
1475
  cm.running_context_usage = cm.model_max_tokens + 1
1476
  await _compact_and_notify(session)
1477
+ # Same guard as the top of the loop: if compaction couldn't
1478
+ # bring us under threshold, _compact_and_notify has already
1479
+ # emitted session_terminated and set is_running=False. Continue
1480
+ # would just re-call the LLM with the same too-big context.
1481
+ if not session.is_running:
1482
+ break
1483
  continue
1484
 
1485
  except Exception as e:
agent/tools/sandbox_client.py CHANGED
@@ -65,6 +65,7 @@ MAX_TIMEOUT = 1200
65
  WAIT_TIMEOUT = 600
66
  WAIT_INTERVAL = 5
67
  API_WAIT_TIMEOUT = 180
 
68
 
69
 
70
  def _is_transient_space_visibility_error(error: Exception) -> bool:
@@ -75,6 +76,59 @@ def _is_transient_space_visibility_error(error: Exception) -> bool:
75
  message = str(error)
76
  return "Repository Not Found" in message or "404 Client Error" in message
77
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78
  _DOCKERFILE = """\
79
  FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
80
 
@@ -627,10 +681,13 @@ class Sandbox:
627
  # Some template duplicates can initially inherit the template hardware.
628
  # Explicitly request the target tier so automatic CPU sandboxes never
629
  # silently come up on GPU hardware.
630
- api.request_space_hardware(
 
631
  space_id,
632
  hardware=hardware,
633
  sleep_time=sleep_time,
 
 
634
  )
635
  _log(f"Requested hardware: {hardware}")
636
 
 
65
  WAIT_TIMEOUT = 600
66
  WAIT_INTERVAL = 5
67
  API_WAIT_TIMEOUT = 180
68
+ HARDWARE_REQUEST_TIMEOUT = 60
69
 
70
 
71
  def _is_transient_space_visibility_error(error: Exception) -> bool:
 
76
  message = str(error)
77
  return "Repository Not Found" in message or "404 Client Error" in message
78
 
79
+
80
+ def _is_transient_space_management_error(error: Exception) -> bool:
81
+ """Return True when a just-created private Space is not manageable yet."""
82
+ response = getattr(error, "response", None)
83
+ if getattr(response, "status_code", None) in {401, 404}:
84
+ return True
85
+ message = str(error)
86
+ return (
87
+ "Repository Not Found" in message
88
+ or "401 Client Error" in message
89
+ or "404 Client Error" in message
90
+ )
91
+
92
+
93
+ def _request_space_hardware_with_retry(
94
+ api: HfApi,
95
+ space_id: str,
96
+ *,
97
+ hardware: str,
98
+ sleep_time: int | None,
99
+ log: Callable[[str], object],
100
+ check_cancel: Callable[[], object],
101
+ ) -> None:
102
+ """Request hardware, retrying while Hub permissions propagate for a new Space."""
103
+ deadline = time.time() + HARDWARE_REQUEST_TIMEOUT
104
+ attempt = 0
105
+ while True:
106
+ check_cancel()
107
+ try:
108
+ api.request_space_hardware(
109
+ space_id,
110
+ hardware=hardware,
111
+ sleep_time=sleep_time,
112
+ )
113
+ return
114
+ except Exception as e:
115
+ if not _is_transient_space_management_error(e):
116
+ raise
117
+
118
+ remaining = deadline - time.time()
119
+ if remaining <= 0:
120
+ raise
121
+
122
+ attempt += 1
123
+ status_code = getattr(getattr(e, "response", None), "status_code", None)
124
+ status = f"HTTP {status_code}" if status_code else type(e).__name__
125
+ log(
126
+ f" Hardware request not accepted yet ({status}); "
127
+ f"retrying ({attempt})..."
128
+ )
129
+ time.sleep(min(WAIT_INTERVAL, remaining))
130
+
131
+
132
  _DOCKERFILE = """\
133
  FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
134
 
 
681
  # Some template duplicates can initially inherit the template hardware.
682
  # Explicitly request the target tier so automatic CPU sandboxes never
683
  # silently come up on GPU hardware.
684
+ _request_space_hardware_with_retry(
685
+ api,
686
  space_id,
687
  hardware=hardware,
688
  sleep_time=sleep_time,
689
+ log=_log,
690
+ check_cancel=_check_cancel,
691
  )
692
  _log(f"Requested hardware: {hardware}")
693
 
agent/tools/sandbox_tool.py CHANGED
@@ -120,6 +120,49 @@ async def _seed_trackio_dashboard_safe(session: Any, space_id: str) -> None:
120
  _log(f"trackio dashboard seed failed: {e}")
121
 
122
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
123
  # ── Tool name mapping (short agent names → Sandbox client names) ──────
124
 
125
 
@@ -313,6 +356,7 @@ async def _create_sandbox_locked(
313
  session.sandbox = sb
314
  session.sandbox_hardware = hardware
315
  session.sandbox_preload_error = None
 
316
 
317
  # Telemetry: sandbox creation (infra consumption signal)
318
  from agent.core import telemetry
@@ -448,28 +492,38 @@ async def teardown_session_sandbox(session: Any) -> None:
448
  session.sandbox = None
449
  session.sandbox_hardware = None
450
 
451
- if not (sandbox and getattr(sandbox, "_owns_space", False)):
452
  return
453
 
454
- space_id = getattr(sandbox, "space_id", None)
455
- last_err: Exception | None = None
456
- for attempt in range(3):
457
- try:
458
- logger.info("Deleting sandbox %s (attempt %s/3)...", space_id, attempt + 1)
459
- await asyncio.to_thread(sandbox.delete)
460
- from agent.core import telemetry
461
- await telemetry.record_sandbox_destroy(session, sandbox)
462
  return
463
- except Exception as e:
464
- last_err = e
465
- if attempt < 2:
466
- await asyncio.sleep(2 ** attempt)
467
- logger.error(
468
- "Failed to delete sandbox %s after 3 attempts: %s. "
469
- "Orphan sweep script will pick it up.",
470
- space_id,
471
- last_err,
472
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
473
 
474
 
475
  # ── sandbox_create tool ──────────────────────────────────────────────
 
120
  _log(f"trackio dashboard seed failed: {e}")
121
 
122
 
123
+ async def _update_persisted_sandbox_fields(session: Any, **fields: Any) -> None:
124
+ """Best-effort update of sandbox metadata on the durable session record."""
125
+ store = getattr(session, "persistence_store", None)
126
+ session_id = getattr(session, "session_id", None)
127
+ if not (store and session_id and hasattr(store, "update_session_fields")):
128
+ return
129
+ try:
130
+ await store.update_session_fields(session_id, **fields)
131
+ except Exception as e:
132
+ logger.warning("Failed to persist sandbox metadata for %s: %s", session_id, e)
133
+
134
+
135
+ async def _persist_active_sandbox(
136
+ session: Any,
137
+ sandbox: Sandbox,
138
+ *,
139
+ hardware: str,
140
+ ) -> None:
141
+ space_id = getattr(sandbox, "space_id", None)
142
+ if not space_id:
143
+ return
144
+ owner = space_id.split("/", 1)[0] if "/" in space_id else None
145
+ await _update_persisted_sandbox_fields(
146
+ session,
147
+ sandbox_space_id=space_id,
148
+ sandbox_hardware=hardware,
149
+ sandbox_owner=owner,
150
+ sandbox_created_at=datetime.now(timezone.utc),
151
+ sandbox_status="active",
152
+ )
153
+
154
+
155
+ async def _clear_persisted_sandbox(session: Any) -> None:
156
+ await _update_persisted_sandbox_fields(
157
+ session,
158
+ sandbox_space_id=None,
159
+ sandbox_hardware=None,
160
+ sandbox_owner=None,
161
+ sandbox_created_at=None,
162
+ sandbox_status="destroyed",
163
+ )
164
+
165
+
166
  # ── Tool name mapping (short agent names → Sandbox client names) ──────
167
 
168
 
 
356
  session.sandbox = sb
357
  session.sandbox_hardware = hardware
358
  session.sandbox_preload_error = None
359
+ await _persist_active_sandbox(session, sb, hardware=hardware)
360
 
361
  # Telemetry: sandbox creation (infra consumption signal)
362
  from agent.core import telemetry
 
492
  session.sandbox = None
493
  session.sandbox_hardware = None
494
 
495
+ if not sandbox:
496
  return
497
 
498
+ try:
499
+ if not getattr(sandbox, "_owns_space", False):
 
 
 
 
 
 
500
  return
501
+
502
+ space_id = getattr(sandbox, "space_id", None)
503
+ last_err: Exception | None = None
504
+ for attempt in range(3):
505
+ try:
506
+ logger.info(
507
+ "Deleting sandbox %s (attempt %s/3)...",
508
+ space_id,
509
+ attempt + 1,
510
+ )
511
+ await asyncio.to_thread(sandbox.delete)
512
+ from agent.core import telemetry
513
+ await telemetry.record_sandbox_destroy(session, sandbox)
514
+ return
515
+ except Exception as e:
516
+ last_err = e
517
+ if attempt < 2:
518
+ await asyncio.sleep(2 ** attempt)
519
+ logger.error(
520
+ "Failed to delete sandbox %s after 3 attempts: %s. "
521
+ "Orphan — sweep script will pick it up.",
522
+ space_id,
523
+ last_err,
524
+ )
525
+ finally:
526
+ await _clear_persisted_sandbox(session)
527
 
528
 
529
  # ── sandbox_create tool ──────────────────────────────────────────────
backend/routes/agent.py CHANGED
@@ -213,6 +213,7 @@ async def _check_session_access(
213
  session_id: str,
214
  user: dict[str, Any],
215
  request: Request | None = None,
 
216
  ) -> AgentSession:
217
  """Verify and lazily load the user's session. Raises 403 or 404."""
218
  hf_token = resolve_hf_request_token(request) if request is not None else user.get("hf_token")
@@ -221,6 +222,7 @@ async def _check_session_access(
221
  user["user_id"],
222
  hf_token=hf_token,
223
  hf_username=user.get("username"),
 
224
  )
225
  if not agent_session:
226
  raise HTTPException(status_code=404, detail="Session not found")
@@ -605,7 +607,7 @@ async def teardown_session_sandbox(
605
  session_id: str, user: dict = Depends(get_current_user)
606
  ) -> dict:
607
  """Best-effort sandbox teardown that preserves durable chat history."""
608
- await _check_session_access(session_id, user)
609
  task = asyncio.create_task(session_manager.teardown_sandbox(session_id))
610
  _background_teardown_tasks.add(task)
611
  task.add_done_callback(_background_teardown_tasks.discard)
@@ -617,7 +619,7 @@ async def delete_session(
617
  session_id: str, user: dict = Depends(get_current_user)
618
  ) -> dict:
619
  """Delete a session. Only accessible by the session owner."""
620
- await _check_session_access(session_id, user)
621
  success = await session_manager.delete_session(session_id)
622
  if not success:
623
  raise HTTPException(status_code=404, detail="Session not found")
 
213
  session_id: str,
214
  user: dict[str, Any],
215
  request: Request | None = None,
216
+ preload_sandbox: bool = True,
217
  ) -> AgentSession:
218
  """Verify and lazily load the user's session. Raises 403 or 404."""
219
  hf_token = resolve_hf_request_token(request) if request is not None else user.get("hf_token")
 
222
  user["user_id"],
223
  hf_token=hf_token,
224
  hf_username=user.get("username"),
225
+ preload_sandbox=preload_sandbox,
226
  )
227
  if not agent_session:
228
  raise HTTPException(status_code=404, detail="Session not found")
 
607
  session_id: str, user: dict = Depends(get_current_user)
608
  ) -> dict:
609
  """Best-effort sandbox teardown that preserves durable chat history."""
610
+ await _check_session_access(session_id, user, preload_sandbox=False)
611
  task = asyncio.create_task(session_manager.teardown_sandbox(session_id))
612
  _background_teardown_tasks.add(task)
613
  task.add_done_callback(_background_teardown_tasks.discard)
 
619
  session_id: str, user: dict = Depends(get_current_user)
620
  ) -> dict:
621
  """Delete a session. Only accessible by the session owner."""
622
+ await _check_session_access(session_id, user, preload_sandbox=False)
623
  success = await session_manager.delete_session(session_id)
624
  if not success:
625
  raise HTTPException(status_code=404, detail="Session not found")
backend/session_manager.py CHANGED
@@ -3,6 +3,7 @@
3
  import asyncio
4
  import json
5
  import logging
 
6
  import uuid
7
  from dataclasses import dataclass, field
8
  from datetime import datetime
@@ -117,6 +118,8 @@ class SessionCapacityError(Exception):
117
  MAX_SESSIONS: int = 200
118
  MAX_SESSIONS_PER_USER: int = 10
119
  DEFAULT_YOLO_COST_CAP_USD: float = 5.0
 
 
120
 
121
 
122
  class SessionManager:
@@ -137,6 +140,7 @@ class SessionManager:
137
 
138
  async def close(self) -> None:
139
  """Flush and close shared background resources."""
 
140
  await self.messaging_gateway.close()
141
  if self.persistence_store is not None:
142
  await self.persistence_store.close()
@@ -372,6 +376,89 @@ class SessionManager:
372
  agent_session.hf_username = hf_username
373
  agent_session.session.hf_username = hf_username
374
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
375
  async def persist_session_snapshot(
376
  self,
377
  agent_session: AgentSession,
@@ -427,6 +514,7 @@ class SessionManager:
427
  user_id: str,
428
  hf_token: str | None = None,
429
  hf_username: str | None = None,
 
430
  ) -> AgentSession | None:
431
  """Return a live runtime session, lazily restoring it from Mongo."""
432
  async with self._lock:
@@ -463,6 +551,12 @@ class SessionManager:
463
  if user_id != "dev" and owner != "dev" and owner != user_id:
464
  return None
465
 
 
 
 
 
 
 
466
  from litellm import Message
467
 
468
  model = meta.get("model") or self.config.model_name
@@ -533,7 +627,8 @@ class SessionManager:
533
  hf_username=hf_username,
534
  )
535
  return started
536
- self._start_cpu_sandbox_preload(agent_session)
 
537
  logger.info("Restored session %s for user %s", session_id, owner or user_id)
538
  return agent_session
539
 
@@ -614,8 +709,8 @@ class SessionManager:
614
  event_queue=event_queue,
615
  tool_router=tool_router,
616
  )
617
- self._start_cpu_sandbox_preload(agent_session)
618
  await self.persist_session_snapshot(agent_session, runtime_state="idle")
 
619
 
620
  if is_pro is not None and user_id and user_id != "dev":
621
  await self._track_pro_status(agent_session, is_pro=is_pro)
@@ -725,6 +820,42 @@ class SessionManager:
725
 
726
  await teardown_session_sandbox(session)
727
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
728
  async def _run_session(
729
  self,
730
  session_id: str,
 
3
  import asyncio
4
  import json
5
  import logging
6
+ import os
7
  import uuid
8
  from dataclasses import dataclass, field
9
  from datetime import datetime
 
118
  MAX_SESSIONS: int = 200
119
  MAX_SESSIONS_PER_USER: int = 10
120
  DEFAULT_YOLO_COST_CAP_USD: float = 5.0
121
+ SANDBOX_SHUTDOWN_CLEANUP_CONCURRENCY: int = 10
122
+ SANDBOX_SHUTDOWN_CLEANUP_TIMEOUT_S: float = 60.0
123
 
124
 
125
  class SessionManager:
 
140
 
141
  async def close(self) -> None:
142
  """Flush and close shared background resources."""
143
+ await self._cleanup_all_sandboxes_on_close()
144
  await self.messaging_gateway.close()
145
  if self.persistence_store is not None:
146
  await self.persistence_store.close()
 
376
  agent_session.hf_username = hf_username
377
  agent_session.session.hf_username = hf_username
378
 
379
async def _clear_persisted_sandbox_metadata(self, session_id: str) -> None:
    """Null out the stored sandbox fields for ``session_id``.

    Writes ``None`` to the space id, hardware, owner and creation-time
    fields and sets ``sandbox_status`` to ``"destroyed"``. Best effort: any
    exception raised while talking to the store is logged as a warning and
    swallowed.
    """
    cleared_fields = {
        "sandbox_space_id": None,
        "sandbox_hardware": None,
        "sandbox_owner": None,
        "sandbox_created_at": None,
        "sandbox_status": "destroyed",
    }
    try:
        store = self._store()
        await store.update_session_fields(session_id, **cleared_fields)
    except Exception as exc:
        logger.warning("Failed to clear sandbox metadata for %s: %s", session_id, exc)
391
+
392
async def _cleanup_persisted_sandbox(
    self,
    session_id: str,
    metadata: dict[str, Any],
    *,
    hf_token: str | None,
) -> None:
    """Delete the sandbox Space recorded in ``metadata``, if there is one.

    Nothing happens when ``sandbox_space_id`` is missing, empty or not a
    string, or when ``sandbox_status`` is already ``"destroyed"``. Otherwise
    the caller's ``hf_token`` is tried first and the ``HF_ADMIN_TOKEN``
    environment variable second (empty and duplicate tokens are skipped).
    A successful delete, or an error carrying HTTP status 404, clears the
    persisted sandbox metadata; any other failure is only logged.
    """
    space_id = metadata.get("sandbox_space_id")
    nothing_to_delete = (
        not isinstance(space_id, str)
        or not space_id
        or metadata.get("sandbox_status") == "destroyed"
    )
    if nothing_to_delete:
        return

    # token -> label, in attempt order; setdefault keeps the first label
    # when the user and admin tokens are the same string.
    credentials: dict[str, str] = {}
    for label, candidate in (
        ("user", hf_token),
        ("admin", os.environ.get("HF_ADMIN_TOKEN")),
    ):
        if candidate:
            credentials.setdefault(candidate, label)

    if not credentials:
        logger.warning(
            "Cannot clean persisted sandbox %s for session %s: no HF token available",
            space_id,
            session_id,
        )
        return

    failure: Exception | None = None
    for credential, label in credentials.items():
        try:
            # Imported inside the try so an import failure is treated like
            # any other failed attempt.
            from huggingface_hub import HfApi

            client = HfApi(token=credential)
            await asyncio.to_thread(
                client.delete_repo,
                repo_id=space_id,
                repo_type="space",
            )
            logger.info(
                "Deleted persisted sandbox %s for session %s with %s token",
                space_id,
                session_id,
                label,
            )
            await self._clear_persisted_sandbox_metadata(session_id)
            return
        except Exception as exc:
            response = getattr(exc, "response", None)
            if getattr(response, "status_code", None) != 404:
                # Remember the error and fall through to the next token.
                failure = exc
                continue
            logger.info(
                "Persisted sandbox %s for session %s is already gone",
                space_id,
                session_id,
            )
            await self._clear_persisted_sandbox_metadata(session_id)
            return

    logger.warning(
        "Failed to delete persisted sandbox %s for session %s: %s",
        space_id,
        session_id,
        failure,
    )
461
+
462
  async def persist_session_snapshot(
463
  self,
464
  agent_session: AgentSession,
 
514
  user_id: str,
515
  hf_token: str | None = None,
516
  hf_username: str | None = None,
517
+ preload_sandbox: bool = True,
518
  ) -> AgentSession | None:
519
  """Return a live runtime session, lazily restoring it from Mongo."""
520
  async with self._lock:
 
551
  if user_id != "dev" and owner != "dev" and owner != user_id:
552
  return None
553
 
554
+ await self._cleanup_persisted_sandbox(
555
+ session_id,
556
+ meta,
557
+ hf_token=hf_token,
558
+ )
559
+
560
  from litellm import Message
561
 
562
  model = meta.get("model") or self.config.model_name
 
627
  hf_username=hf_username,
628
  )
629
  return started
630
+ if preload_sandbox:
631
+ self._start_cpu_sandbox_preload(agent_session)
632
  logger.info("Restored session %s for user %s", session_id, owner or user_id)
633
  return agent_session
634
 
 
709
  event_queue=event_queue,
710
  tool_router=tool_router,
711
  )
 
712
  await self.persist_session_snapshot(agent_session, runtime_state="idle")
713
+ self._start_cpu_sandbox_preload(agent_session)
714
 
715
  if is_pro is not None and user_id and user_id != "dev":
716
  await self._track_pro_status(agent_session, is_pro=is_pro)
 
820
 
821
  await teardown_session_sandbox(session)
822
 
823
async def _cleanup_all_sandboxes_on_close(self) -> None:
    """Tear down every live session's sandbox during graceful shutdown.

    Takes a snapshot of the current sessions under the manager lock, then
    runs ``_cleanup_sandbox`` for each one with at most
    ``SANDBOX_SHUTDOWN_CLEANUP_CONCURRENCY`` running at a time. The whole
    batch is bounded by ``SANDBOX_SHUTDOWN_CLEANUP_TIMEOUT_S``; per-session
    failures and an overall timeout are logged, never raised.
    """
    async with self._lock:
        snapshot = [*self.sessions.values()]
    if not snapshot:
        return

    limiter = asyncio.Semaphore(SANDBOX_SHUTDOWN_CLEANUP_CONCURRENCY)

    async def _cleanup_one(agent_session: AgentSession) -> None:
        async with limiter:
            try:
                await self._cleanup_sandbox(agent_session.session)
            except Exception as exc:
                logger.warning(
                    "Shutdown sandbox cleanup failed for %s: %s",
                    agent_session.session_id,
                    exc,
                )

    pending = []
    for entry in snapshot:
        pending.append(asyncio.create_task(_cleanup_one(entry)))

    batch = asyncio.gather(*pending, return_exceptions=True)
    try:
        await asyncio.wait_for(batch, timeout=SANDBOX_SHUTDOWN_CLEANUP_TIMEOUT_S)
    except asyncio.TimeoutError:
        logger.warning(
            "Timed out after %.0fs cleaning up sandboxes on shutdown; "
            "orphan sweeper will handle any stragglers",
            SANDBOX_SHUTDOWN_CLEANUP_TIMEOUT_S,
        )
858
+
859
  async def _run_session(
860
  self,
861
  session_id: str,
scripts/sweep_orphan_sandboxes.py CHANGED
@@ -49,17 +49,12 @@ something up to kill it.
49
  - Logs every action to stdout in JSON Lines for downstream auditing.
50
 
51
  ================================================================================
52
- Cron suggestion
53
  ================================================================================
54
 
55
- GitHub Actions, daily at 04:00 UTC:
56
 
57
- schedule:
58
- - cron: "0 4 * * *"
59
- env:
60
- HF_ADMIN_TOKEN: ${{ secrets.HF_ADMIN_TOKEN }}
61
- steps:
62
- - run: python scripts/sweep_orphan_sandboxes.py --apply --max-age-days 7
63
  """
64
 
65
  import argparse
 
49
  - Logs every action to stdout in JSON Lines for downstream auditing.
50
 
51
  ================================================================================
52
+ Manual usage
53
  ================================================================================
54
 
55
+ Run manually with an admin token when a backstop cleanup is needed:
56
 
57
+ HF_ADMIN_TOKEN=... python scripts/sweep_orphan_sandboxes.py --apply --max-age-days 7
 
 
 
 
 
58
  """
59
 
60
  import argparse
tests/unit/test_agent_model_gating.py CHANGED
@@ -1,5 +1,6 @@
1
  """Tests for gated model handling in backend/routes/agent.py."""
2
 
 
3
  import sys
4
  from pathlib import Path
5
  from types import SimpleNamespace
@@ -253,3 +254,58 @@ async def test_set_session_yolo_calls_manager_with_cap_presence(monkeypatch):
253
  },
254
  )
255
  ]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """Tests for gated model handling in backend/routes/agent.py."""
2
 
3
+ import asyncio
4
  import sys
5
  from pathlib import Path
6
  from types import SimpleNamespace
 
254
  },
255
  )
256
  ]
257
+
258
+
259
@pytest.mark.asyncio
async def test_delete_session_access_check_skips_sandbox_preload(monkeypatch):
    """The delete route must load the session with ``preload_sandbox=False``."""
    loaded = []
    deleted = []

    async def fake_ensure_session_loaded(session_id, user_id, **kwargs):
        loaded.append((session_id, user_id, kwargs))
        return SimpleNamespace(user_id=user_id)

    async def fake_delete_session(session_id):
        deleted.append(session_id)
        return True

    manager = agent.session_manager
    monkeypatch.setattr(manager, "ensure_session_loaded", fake_ensure_session_loaded)
    monkeypatch.setattr(manager, "delete_session", fake_delete_session)

    response = await agent.delete_session("s1", {"user_id": "u1"})

    assert response == {"status": "deleted", "session_id": "s1"}
    assert deleted == ["s1"]
    # Third element of the first recorded call is the kwargs dict.
    _, _, first_kwargs = loaded[0]
    assert first_kwargs["preload_sandbox"] is False
284
+
285
+
286
@pytest.mark.asyncio
async def test_teardown_session_access_check_skips_sandbox_preload(monkeypatch):
    """The teardown route must load the session with ``preload_sandbox=False``."""
    loaded = []
    torn_down = []

    async def fake_ensure_session_loaded(session_id, user_id, **kwargs):
        loaded.append((session_id, user_id, kwargs))
        return SimpleNamespace(user_id=user_id)

    async def fake_teardown_sandbox(session_id):
        torn_down.append(session_id)
        return True

    manager = agent.session_manager
    monkeypatch.setattr(manager, "ensure_session_loaded", fake_ensure_session_loaded)
    monkeypatch.setattr(manager, "teardown_sandbox", fake_teardown_sandbox)

    response = await agent.teardown_session_sandbox("s1", {"user_id": "u1"})
    # Yield to the event loop once before checking the recorded teardown call.
    await asyncio.sleep(0)

    assert response == {"status": "teardown_requested", "session_id": "s1"}
    assert torn_down == ["s1"]
    _, _, first_kwargs = loaded[0]
    assert first_kwargs["preload_sandbox"] is False
tests/unit/test_compaction_loop_break.py ADDED
@@ -0,0 +1,360 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Regression tests for the 2026-05-03 infinite-compaction-loop bug.
2
+
3
+ Pod logs from prod-114 showed sessions stuck retrying compaction every
4
+ few seconds because a single oversized tool output in the untouched tail
5
+ kept the post-compact context above the 90% threshold:
6
+
7
+ Context compacted: 200001 -> 215566 tokens
8
+ Context compacted: 215566 -> 215572 tokens
9
+ ContextWindowExceededError — forcing compaction
10
+ ... (continues for 5+ minutes)
11
+
12
+ These tests cover three fixes:
13
+
14
+ 1. ``_truncate_oversized`` replaces oversized message content with a
15
+ placeholder and preserves all extended-thinking metadata fields.
16
+ 2. ``compact()`` raises ``CompactionFailedError`` when the post-compact
17
+ context is still over threshold.
18
+ 3. ``_compact_and_notify`` catches the error, sets ``session.is_running
19
+ = False``, and emits a ``session_terminated`` event so callers can
20
+ exit the agent loop.
21
+
22
+ The P0 caught by PR #213 review (loop didn't actually exit on
23
+ ``is_running = False``) would have been caught by an end-to-end
24
+ behavioral test of #3 — that gap is closed by the
25
+ ``test_compact_and_notify_terminates_session`` case below.
26
+ """
27
+
28
+ from __future__ import annotations
29
+
30
+ from unittest.mock import AsyncMock, MagicMock, patch
31
+
32
+ import pytest
33
+ from litellm import Message
34
+
35
+ from agent.context_manager.manager import (
36
+ CompactionFailedError,
37
+ ContextManager,
38
+ _MAX_TOKENS_PER_MESSAGE,
39
+ )
40
+
41
+
42
+ # ── helpers ────────────────────────────────────────────────────────────
43
+
44
+
45
def _make_cm(
    *,
    model_max_tokens: int = 100_000,
    compact_size: int = 1_000,
    untouched_messages: int = 5,
) -> ContextManager:
    """Build a ``ContextManager`` without running ``__init__``.

    The instance starts with a single system message, zero running usage
    and no ``on_message_added`` callback.
    """
    manager = ContextManager.__new__(ContextManager)
    initial_state = {
        "system_prompt": "system",
        "model_max_tokens": model_max_tokens,
        "compact_size": compact_size,
        "running_context_usage": 0,
        "untouched_messages": untouched_messages,
        "items": [Message(role="system", content="system")],
        "on_message_added": None,
    }
    for attr, value in initial_state.items():
        setattr(manager, attr, value)
    return manager
60
+
61
+
62
def _msg(role: str, content: str | None = "x", **extra) -> Message:
    """Shorthand for constructing a ``Message`` with optional extra fields."""
    fields = {"role": role, "content": content, **extra}
    return Message(**fields)
64
+
65
+
66
+ # ── _truncate_oversized ────────────────────────────────────────────────
67
+
68
+
69
def test_truncate_oversized_skips_messages_below_threshold():
    """Messages counted below the per-message cap come back unchanged."""
    manager = _make_cm()
    original = [_msg("user", "small content")]
    model = "anthropic/claude-opus-4-6"
    with patch("litellm.token_counter", return_value=100):
        result = manager._truncate_oversized(original, model)
    assert result == original
75
+
76
+
77
def test_truncate_oversized_replaces_content_above_threshold():
    """Oversized content is swapped for a placeholder naming the token count."""
    manager = _make_cm()
    payload = "x" * (_MAX_TOKENS_PER_MESSAGE * 5)
    # The patched counter reports this size for every message in the test.
    reported_tokens = _MAX_TOKENS_PER_MESSAGE * 2
    with patch("litellm.token_counter", return_value=reported_tokens):
        result = manager._truncate_oversized(
            [_msg("user", payload)], "anthropic/claude-opus-4-6"
        )
    assert len(result) == 1
    replaced = result[0].content
    assert replaced != payload
    assert "[truncated for compaction" in replaced
    assert str(reported_tokens) in replaced
88
+
89
+
90
def test_truncate_oversized_preserves_thinking_blocks():
    """Truncation must keep extended-thinking metadata on the message.

    Anthropic extended-thinking models reject the next request with
    ``Invalid signature in thinking block`` when a prior assistant message
    loses its ``thinking_blocks``.
    """
    manager = _make_cm()
    blocks = [{"type": "thinking", "thinking": "...", "signature": "abc123"}]
    assistant = Message(role="assistant", content="x" * (_MAX_TOKENS_PER_MESSAGE * 5))
    assistant.thinking_blocks = blocks
    assistant.reasoning_content = "deep thought"
    with patch("litellm.token_counter", return_value=_MAX_TOKENS_PER_MESSAGE * 2):
        result = manager._truncate_oversized([assistant], "anthropic/claude-opus-4-6")
    truncated = result[0]
    assert getattr(truncated, "thinking_blocks", None) == blocks
    assert getattr(truncated, "reasoning_content", None) == "deep thought"
105
+
106
+
107
def test_truncate_oversized_never_touches_system_message():
    """A system message is left intact even when it counts as oversized.

    Found by the integration smoke test on PR #213: when ``items`` holds
    fewer than ``untouched_messages`` entries, the slicing in ``compact()``
    can let ``items[0]`` (the system message) end up in the
    ``recent_messages`` list handed to ``_truncate_oversized``, so the
    function has to guard against it explicitly.
    """
    manager = _make_cm()
    system_text = "x" * (_MAX_TOKENS_PER_MESSAGE * 5)
    with patch("litellm.token_counter", return_value=_MAX_TOKENS_PER_MESSAGE * 2):
        result = manager._truncate_oversized(
            [_msg("system", system_text)], "anthropic/claude-opus-4-6"
        )
    assert result[0].content == system_text, "system message must never be truncated"
122
+
123
+
124
def test_truncate_oversized_resilient_to_token_counter_failure():
    """A failing ``token_counter`` must not cause the message to be dropped.

    Keeping the message and letting compaction deal with it (or fail with
    ``CompactionFailedError``) is preferable to losing data.
    """
    manager = _make_cm()
    original = [_msg("user", "anything")]
    counter_error = Exception("counter blew up")
    with patch("litellm.token_counter", side_effect=counter_error):
        result = manager._truncate_oversized(original, "anthropic/claude-opus-4-6")
    assert result == original
134
+
135
+
136
+ # ── compact() raises CompactionFailedError ─────────────────────────────
137
+
138
+
139
@pytest.mark.asyncio
async def test_compact_raises_when_post_compact_still_over_threshold():
    """``compact()`` raises instead of looping when usage stays too high.

    The caller is expected to terminate the session on this error rather
    than retry a compaction that cannot help.
    """
    manager = _make_cm(model_max_tokens=100_000)
    conversation = [
        ("system", "system"),
        ("user", "task"),
        ("assistant", "x" * 1000),
        ("user", "follow-up 1"),
        ("assistant", "reply 1"),
        ("user", "follow-up 2"),
        ("assistant", "reply 2"),
    ]
    manager.items = [Message(role=role, content=text) for role, text in conversation]
    # Over the threshold from the start (the original test notes 90% of 100k).
    manager.running_context_usage = 95_000

    async def fake_summarize(*args, **kwargs):
        # Tiny summary: the summarizer itself is not what keeps usage high.
        return ("summary", 10)

    def fake_recompute(self, model_name):
        # Post-compaction usage is still over the threshold.
        self.running_context_usage = 95_000

    with (
        patch("agent.context_manager.manager.summarize_messages", side_effect=fake_summarize),
        patch.object(ContextManager, "_recompute_usage", fake_recompute),
        # Keeps _truncate_oversized away from the real token counter.
        patch("litellm.token_counter", return_value=100),
        pytest.raises(CompactionFailedError),
    ):
        await manager.compact(
            model_name="anthropic/claude-opus-4-6",
            tool_specs=None,
            hf_token=None,
            session=None,
        )
179
+
180
+
181
@pytest.mark.asyncio
async def test_compact_does_not_duplicate_system_when_idx_is_zero():
    """Regression for the second P0 found by bot review on PR #213.

    With ``len(items) == untouched_messages`` (five messages here), ``idx``
    starts at 0 and the ``while idx > 1`` walk-back never runs. Without
    clamping ``idx`` to at least 1, ``recent_messages`` would begin at the
    system message and the rebuilt history would repeat both the system
    message and the first user message, which the Anthropic API rejects.
    """
    manager = _make_cm(model_max_tokens=100_000, untouched_messages=5)
    # Exactly five entries == untouched_messages, so idx initialises to 0.
    # The "ok" reply would be the only message to summarize, but the idx
    # bug pulls it into the recent tail.
    conversation = [
        ("system", "system"),
        ("user", "task"),
        ("assistant", "ok"),
        ("user", "followup"),
        ("assistant", "reply"),
    ]
    manager.items = [Message(role=role, content=text) for role, text in conversation]
    manager.running_context_usage = 95_000

    async def fake_summarize(*args, **kwargs):
        return ("summary", 10)

    def fake_recompute(self, model_name):
        self.running_context_usage = 5_000

    with (
        patch("agent.context_manager.manager.summarize_messages", side_effect=fake_summarize),
        patch.object(ContextManager, "_recompute_usage", fake_recompute),
        patch("litellm.token_counter", return_value=100),
    ):
        await manager.compact(
            model_name="anthropic/claude-opus-4-6",
            tool_specs=None,
            hf_token=None,
            session=None,
        )

    cm = manager

    # Exactly one system message may survive the rebuild.
    system_count = len([m for m in cm.items if m.role == "system"])
    assert system_count == 1, (
        f"Expected exactly 1 system message, found {system_count}. "
        f"Roles: {[m.role for m in cm.items]}"
    )

    # The first user message ("task") must also appear exactly once:
    # clamping idx to 1 drops the system message but still overlaps the
    # first-user index, which would place "task" in both the head and the
    # recent tail.
    task_count = len(
        [m for m in cm.items if m.role == "user" and (m.content or "") == "task"]
    )
    assert task_count == 1, (
        f"Expected exactly 1 'task' user message, found {task_count}. "
        f"Roles+content: {[(m.role, (m.content or '')[:20]) for m in cm.items]}"
    )

    # Defense in depth: outside the system message, roles must alternate.
    non_system = [m for m in cm.items if m.role != "system"]
    for i, (previous, current) in enumerate(zip(non_system, non_system[1:]), start=1):
        assert current.role != previous.role, (
            f"Two consecutive {current.role} messages at non-system "
            f"position {i-1},{i} — Anthropic API rejects this. "
            f"Roles: {[m.role for m in cm.items]}"
        )
251
+
252
+
253
@pytest.mark.asyncio
async def test_compact_succeeds_when_post_compact_under_threshold():
    """Happy path: compaction that lowers usage completes without raising."""
    manager = _make_cm(model_max_tokens=100_000)
    conversation = [
        ("system", "system"),
        ("user", "task"),
        ("assistant", "x" * 1000),
        ("user", "follow-up"),
        ("assistant", "reply"),
        ("user", "follow-up 2"),
        ("assistant", "reply 2"),
    ]
    manager.items = [Message(role=role, content=text) for role, text in conversation]
    manager.running_context_usage = 95_000

    async def fake_summarize(*args, **kwargs):
        return ("summary", 10)

    def fake_recompute(self, model_name):
        # Well under the compaction threshold.
        self.running_context_usage = 5_000

    summarize_patch = patch(
        "agent.context_manager.manager.summarize_messages", side_effect=fake_summarize
    )
    recompute_patch = patch.object(ContextManager, "_recompute_usage", fake_recompute)
    counter_patch = patch("litellm.token_counter", return_value=100)
    with summarize_patch, recompute_patch, counter_patch:
        await manager.compact(
            model_name="anthropic/claude-opus-4-6",
            tool_specs=None,
            hf_token=None,
            session=None,
        )
    assert manager.running_context_usage == 5_000
286
+
287
+
288
+ # ── _compact_and_notify behavior on CompactionFailedError ──────────────
289
+
290
+
291
@pytest.mark.asyncio
async def test_compact_and_notify_terminates_session_on_failure():
    """A failed compaction stops the session and emits ``session_terminated``.

    PR #213's P0 bug class: setting ``is_running = False`` only helps if the
    agent loop checks it. This test pins that the flag IS cleared AND that
    a ``session_terminated`` event is sent, so an agent-loop test can then
    assert the actual loop exit.
    """
    from agent.core.agent_loop import _compact_and_notify

    context = MagicMock()
    context.items = []
    context.model_max_tokens = 100_000
    context.compaction_threshold = 90_000
    context.running_context_usage = 95_000
    context.needs_compaction = True
    context.compact = AsyncMock(side_effect=CompactionFailedError("ineffective"))

    session = MagicMock()
    session.session_id = "sess-123"
    session.is_running = True
    session.hf_token = None
    session.config.model_name = "anthropic/claude-opus-4-6"
    session.tool_router.get_tool_specs_for_llm.return_value = []
    session.send_event = AsyncMock()
    session.context_manager = context

    await _compact_and_notify(session)

    assert session.is_running is False, (
        "_compact_and_notify must set is_running=False so the agent loop "
        "can exit. P0 caught by bot review on PR #213 was that the loop "
        "didn't actually check this flag."
    )
    assert session.send_event.await_count == 1
    (emitted,) = session.send_event.await_args.args[:1]
    assert emitted.event_type == "session_terminated"
    assert emitted.data["reason"] == "compaction_failed"
    assert emitted.data["context_usage"] == 95_000
329
+
330
+
331
@pytest.mark.asyncio
async def test_compact_and_notify_passes_through_on_success():
    """Successful compaction leaves the session running with no termination event."""
    from agent.core.agent_loop import _compact_and_notify

    context = MagicMock()
    context.items = []
    context.model_max_tokens = 100_000
    context.compaction_threshold = 90_000
    # Usage stays at this value on the mock before and after compact(); the
    # original test relies on that so the "compacted" event is skipped too.
    context.running_context_usage = 5_000
    context.needs_compaction = False
    context.compact = AsyncMock(return_value=None)  # success

    session = MagicMock()
    session.session_id = "sess-456"
    session.is_running = True
    session.hf_token = None
    session.config.model_name = "anthropic/claude-opus-4-6"
    session.tool_router.get_tool_specs_for_llm.return_value = []
    session.send_event = AsyncMock()
    session.context_manager = context

    await _compact_and_notify(session)

    assert session.is_running is True
    # None of the emitted events (if any) may be a termination event.
    assert all(
        awaited.args[0].event_type != "session_terminated"
        for awaited in session.send_event.await_args_list
    )
tests/unit/test_dangling_tool_calls.py CHANGED
@@ -67,15 +67,6 @@ def test_no_orphan_means_no_stub():
67
  assert tool_msgs[0].content == "ok"
68
 
69
 
70
- def test_add_message_records_message_timestamp():
71
- cm = _make_cm()
72
- msg = Message(role="user", content="hello")
73
-
74
- cm.add_message(msg)
75
-
76
- assert getattr(cm.items[-1], "timestamp", None)
77
-
78
-
79
  def test_multiple_dangling_tool_calls_in_one_assistant_message_are_all_patched():
80
  cm = _make_cm()
81
  cm.items.extend([
 
67
  assert tool_msgs[0].content == "ok"
68
 
69
 
 
 
 
 
 
 
 
 
 
70
  def test_multiple_dangling_tool_calls_in_one_assistant_message_are_all_patched():
71
  cm = _make_cm()
72
  cm.items.extend([
tests/unit/test_sandbox_private_spaces.py CHANGED
@@ -3,6 +3,8 @@ import threading
3
  import time
4
  from types import SimpleNamespace
5
 
 
 
6
  from agent.core import telemetry
7
  from agent.tools import sandbox_client, sandbox_tool
8
  from agent.tools.sandbox_client import Sandbox
@@ -91,6 +93,101 @@ def test_sandbox_client_retries_transient_runtime_404(monkeypatch):
91
  assert runtime_calls == 2
92
 
93
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
94
  def test_sandbox_tool_forces_private_spaces(monkeypatch):
95
  captured_kwargs = {}
96
 
@@ -148,6 +245,7 @@ def test_orphan_sweep_preserves_spaces_without_last_modified():
148
 
149
  def test_ensure_sandbox_overrides_private_argument(monkeypatch):
150
  captured_kwargs = {}
 
151
 
152
  class FakeApi:
153
  def __init__(self, token=None):
@@ -158,14 +256,23 @@ def test_ensure_sandbox_overrides_private_argument(monkeypatch):
158
 
159
  class FakeSession:
160
  def __init__(self):
 
161
  self.hf_token = "hf-token"
162
  self.sandbox = None
163
  self.event_queue = SimpleNamespace(put_nowait=lambda event: None)
164
  self._cancelled = asyncio.Event()
 
 
 
 
 
165
 
166
  async def send_event(self, event):
167
  pass
168
 
 
 
 
169
  def fake_create(**kwargs):
170
  captured_kwargs.update(kwargs)
171
  return SimpleNamespace(
@@ -192,6 +299,11 @@ def test_ensure_sandbox_overrides_private_argument(monkeypatch):
192
  assert error is None
193
  assert sb is not None
194
  assert captured_kwargs["private"] is True
 
 
 
 
 
195
 
196
 
197
  def test_sandbox_creation_is_serialized_per_owner(monkeypatch):
@@ -356,6 +468,7 @@ def test_sandbox_create_replaces_auto_cpu_sandbox(monkeypatch):
356
 
357
  def test_teardown_cancels_preload_and_deletes_owned_sandbox(monkeypatch):
358
  deleted: list[str] = []
 
359
 
360
  async def fake_record_sandbox_destroy(*args, **kwargs):
361
  pass
@@ -369,6 +482,7 @@ def test_teardown_cancels_preload_and_deletes_owned_sandbox(monkeypatch):
369
  await asyncio.sleep(0)
370
 
371
  session = SimpleNamespace(
 
372
  sandbox=SimpleNamespace(
373
  space_id="alice/sandbox-12345678",
374
  _owns_space=True,
@@ -377,17 +491,28 @@ def test_teardown_cancels_preload_and_deletes_owned_sandbox(monkeypatch):
377
  sandbox_hardware="cpu-basic",
378
  sandbox_preload_task=asyncio.create_task(preload()),
379
  sandbox_preload_cancel_event=cancel_event,
 
 
 
 
 
380
  )
381
 
382
  await sandbox_tool.teardown_session_sandbox(session)
383
  return session, cancel_event
384
 
 
 
 
385
  session, cancel_event = asyncio.run(run())
386
 
387
  assert cancel_event.is_set()
388
  assert deleted == ["alice/sandbox-12345678"]
389
  assert session.sandbox is None
390
  assert session.sandbox_hardware is None
 
 
 
391
 
392
 
393
  def test_cancel_sandbox_preload_cancels_task_after_timeout(monkeypatch):
 
3
  import time
4
  from types import SimpleNamespace
5
 
6
+ import pytest
7
+
8
  from agent.core import telemetry
9
  from agent.tools import sandbox_client, sandbox_tool
10
  from agent.tools.sandbox_client import Sandbox
 
93
  assert runtime_calls == 2
94
 
95
 
96
def test_sandbox_client_retries_transient_hardware_401(monkeypatch):
    """``Sandbox.create`` retries a hardware request that first fails with 401."""
    hardware_attempts: list[tuple] = []
    messages: list[str] = []

    class FakeResponse:
        status_code = 401

    class FakeHardware401(Exception):
        response = FakeResponse()

        def __str__(self):
            return "401 Client Error: Repository Not Found"

    class FakeApi:
        def __init__(self, token=None):
            self.token = token

        def duplicate_space(self, **kwargs):
            pass

        def request_space_hardware(self, space_id, hardware, sleep_time=None):
            hardware_attempts.append((space_id, hardware))
            # Only the very first request is rejected.
            if len(hardware_attempts) == 1:
                raise FakeHardware401()
            return SimpleNamespace(stage="BUILDING", hardware=None)

        def add_space_secret(self, *args, **kwargs):
            pass

        def get_space_runtime(self, space_id):
            return SimpleNamespace(stage="RUNNING", hardware="cpu-basic")

    def skip_sleep(seconds):
        return None

    monkeypatch.setattr(sandbox_client, "HfApi", FakeApi)
    monkeypatch.setattr(sandbox_client.time, "sleep", skip_sleep)
    monkeypatch.setattr(
        Sandbox,
        "_setup_server",
        staticmethod(lambda *args, **kwargs: None),
    )
    monkeypatch.setattr(Sandbox, "_wait_for_api", lambda self, *args, **kwargs: None)

    created = Sandbox.create(owner="alice", token="hf-token", log=messages.append)

    assert created.space_id.startswith("alice/sandbox-")
    assert len(hardware_attempts) == 2
    expected_note = "Hardware request not accepted yet (HTTP 401)"
    assert any(expected_note in line for line in messages)
143
+
144
+
145
def test_sandbox_hardware_retry_reraises_after_timeout(monkeypatch):
    """After the retry window passes, the most recent error is re-raised."""
    messages: list[str] = []
    naps: list[float] = []

    class FakeResponse:
        status_code = 401

    class FakeHardware401(Exception):
        response = FakeResponse()

        def __str__(self):
            return "401 Client Error: Repository Not Found"

    first_error = FakeHardware401("first")
    timeout_error = FakeHardware401("timeout")
    # The first call raises first_error, every later call raises timeout_error.
    raised: list[Exception] = []

    class FakeApi:
        def request_space_hardware(self, space_id, hardware, sleep_time=None):
            error = timeout_error if raised else first_error
            raised.append(error)
            raise error

        # Any other attribute access is unexpected in this test.

    # Scripted clock readings; the last one jumps 61 seconds ahead.
    clock = iter([100.0, 100.0, 161.0])

    monkeypatch.setattr(sandbox_client.time, "time", lambda: next(clock))
    monkeypatch.setattr(sandbox_client.time, "sleep", naps.append)

    with pytest.raises(FakeHardware401) as excinfo:
        sandbox_client._request_space_hardware_with_retry(
            FakeApi(),
            "alice/sandbox-12345678",
            hardware="cpu-basic",
            sleep_time=None,
            log=messages.append,
            check_cancel=lambda: None,
        )

    assert excinfo.value is timeout_error
    assert len(raised) == 2
    assert naps == [sandbox_client.WAIT_INTERVAL]
    assert len(messages) == 1
189
+
190
+
191
  def test_sandbox_tool_forces_private_spaces(monkeypatch):
192
  captured_kwargs = {}
193
 
 
245
 
246
  def test_ensure_sandbox_overrides_private_argument(monkeypatch):
247
  captured_kwargs = {}
248
+ persisted: list[dict] = []
249
 
250
  class FakeApi:
251
  def __init__(self, token=None):
 
256
 
257
  class FakeSession:
258
  def __init__(self):
259
+ self.session_id = "s1"
260
  self.hf_token = "hf-token"
261
  self.sandbox = None
262
  self.event_queue = SimpleNamespace(put_nowait=lambda event: None)
263
  self._cancelled = asyncio.Event()
264
+ self.persistence_store = SimpleNamespace(
265
+ update_session_fields=lambda session_id, **fields: _record_metadata(
266
+ session_id, fields
267
+ )
268
+ )
269
 
270
  async def send_event(self, event):
271
  pass
272
 
273
+ async def _record_metadata(session_id, fields):
274
+ persisted.append({"session_id": session_id, **fields})
275
+
276
  def fake_create(**kwargs):
277
  captured_kwargs.update(kwargs)
278
  return SimpleNamespace(
 
299
  assert error is None
300
  assert sb is not None
301
  assert captured_kwargs["private"] is True
302
+ assert persisted[-1]["session_id"] == "s1"
303
+ assert persisted[-1]["sandbox_space_id"] == "alice/sandbox-12345678"
304
+ assert persisted[-1]["sandbox_hardware"] == "cpu-basic"
305
+ assert persisted[-1]["sandbox_owner"] == "alice"
306
+ assert persisted[-1]["sandbox_status"] == "active"
307
 
308
 
309
  def test_sandbox_creation_is_serialized_per_owner(monkeypatch):
 
468
 
469
  def test_teardown_cancels_preload_and_deletes_owned_sandbox(monkeypatch):
470
  deleted: list[str] = []
471
+ persisted: list[dict] = []
472
 
473
  async def fake_record_sandbox_destroy(*args, **kwargs):
474
  pass
 
482
  await asyncio.sleep(0)
483
 
484
  session = SimpleNamespace(
485
+ session_id="s1",
486
  sandbox=SimpleNamespace(
487
  space_id="alice/sandbox-12345678",
488
  _owns_space=True,
 
491
  sandbox_hardware="cpu-basic",
492
  sandbox_preload_task=asyncio.create_task(preload()),
493
  sandbox_preload_cancel_event=cancel_event,
494
+ persistence_store=SimpleNamespace(
495
+ update_session_fields=lambda session_id, **fields: _record_metadata(
496
+ session_id, fields
497
+ )
498
+ ),
499
  )
500
 
501
  await sandbox_tool.teardown_session_sandbox(session)
502
  return session, cancel_event
503
 
504
+ async def _record_metadata(session_id, fields):
505
+ persisted.append({"session_id": session_id, **fields})
506
+
507
  session, cancel_event = asyncio.run(run())
508
 
509
  assert cancel_event.is_set()
510
  assert deleted == ["alice/sandbox-12345678"]
511
  assert session.sandbox is None
512
  assert session.sandbox_hardware is None
513
+ assert persisted[-1]["session_id"] == "s1"
514
+ assert persisted[-1]["sandbox_space_id"] is None
515
+ assert persisted[-1]["sandbox_status"] == "destroyed"
516
 
517
 
518
  def test_cancel_sandbox_preload_cancels_task_after_timeout(monkeypatch):
tests/unit/test_session_manager_persistence.py CHANGED
@@ -4,6 +4,7 @@ from __future__ import annotations
4
 
5
  import asyncio
6
  import sys
 
7
  from datetime import datetime, UTC
8
  from pathlib import Path
9
  from types import SimpleNamespace
@@ -30,6 +31,10 @@ class FakeRuntimeSession:
30
  self.auto_approval_enabled = False
31
  self.auto_approval_cost_cap_usd = None
32
  self.auto_approval_estimated_spend_usd = 0.0
 
 
 
 
33
 
34
  def auto_approval_policy_summary(self):
35
  cap = self.auto_approval_cost_cap_usd
@@ -65,6 +70,7 @@ class RestoreStore(NoopSessionStore):
65
  self.messages = messages or []
66
  self.delay = delay
67
  self.load_calls = 0
 
68
 
69
  async def load_session(self, session_id: str, **_: Any) -> dict[str, Any] | None:
70
  self.load_calls += 1
@@ -75,6 +81,18 @@ class RestoreStore(NoopSessionStore):
75
  metadata.setdefault("_id", session_id)
76
  return {"metadata": metadata, "messages": self.messages}
77
 
 
 
 
 
 
 
 
 
 
 
 
 
78
 
79
  def _manager_with_store(store: NoopSessionStore) -> SessionManager:
80
  manager = object.__new__(SessionManager)
@@ -82,6 +100,7 @@ def _manager_with_store(store: NoopSessionStore) -> SessionManager:
82
  manager.sessions = {}
83
  manager._lock = asyncio.Lock()
84
  manager.persistence_store = store
 
85
  return manager
86
 
87
 
@@ -151,6 +170,87 @@ async def _cancel_runtime_tasks(manager: SessionManager) -> None:
151
  await asyncio.gather(*tasks, return_exceptions=True)
152
 
153
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
154
  @pytest.mark.asyncio
155
  async def test_existing_session_rejects_cross_user_token_overwrite():
156
  manager = _manager_with_store(NoopSessionStore())
@@ -253,6 +353,107 @@ async def test_lazy_restore_schedules_cpu_sandbox_preload():
253
  await _cancel_runtime_tasks(manager)
254
 
255
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
256
  @pytest.mark.asyncio
257
  async def test_lazy_restore_preserves_pending_approval_tool_calls():
258
  store = RestoreStore(
 
4
 
5
  import asyncio
6
  import sys
7
+ import threading
8
  from datetime import datetime, UTC
9
  from pathlib import Path
10
  from types import SimpleNamespace
 
31
  self.auto_approval_enabled = False
32
  self.auto_approval_cost_cap_usd = None
33
  self.auto_approval_estimated_spend_usd = 0.0
34
+ self.sandbox = None
35
+ self.sandbox_hardware = None
36
+ self.sandbox_preload_task = None
37
+ self.sandbox_preload_cancel_event = None
38
 
39
  def auto_approval_policy_summary(self):
40
  cap = self.auto_approval_cost_cap_usd
 
70
  self.messages = messages or []
71
  self.delay = delay
72
  self.load_calls = 0
73
+ self.updated_fields: list[tuple[str, dict[str, Any]]] = []
74
 
75
  async def load_session(self, session_id: str, **_: Any) -> dict[str, Any] | None:
76
  self.load_calls += 1
 
81
  metadata.setdefault("_id", session_id)
82
  return {"metadata": metadata, "messages": self.messages}
83
 
84
+ async def update_session_fields(self, session_id: str, **fields: Any) -> None:
85
+ self.updated_fields.append((session_id, fields))
86
+ self.metadata.update(fields)
87
+
88
+
89
+ class CloseableResource:
90
+ def __init__(self) -> None:
91
+ self.closed = False
92
+
93
+ async def close(self) -> None:
94
+ self.closed = True
95
+
96
 
97
  def _manager_with_store(store: NoopSessionStore) -> SessionManager:
98
  manager = object.__new__(SessionManager)
 
100
  manager.sessions = {}
101
  manager._lock = asyncio.Lock()
102
  manager.persistence_store = store
103
+ manager.messaging_gateway = CloseableResource()
104
  return manager
105
 
106
 
 
170
  await asyncio.gather(*tasks, return_exceptions=True)
171
 
172
 
173
+ @pytest.mark.asyncio
174
+ async def test_close_cancels_preload_and_deletes_owned_sandbox(monkeypatch):
175
+ deleted: list[str] = []
176
+
177
+ async def fake_record_sandbox_destroy(*args, **kwargs):
178
+ pass
179
+
180
+ monkeypatch.setattr(
181
+ "agent.core.telemetry.record_sandbox_destroy",
182
+ fake_record_sandbox_destroy,
183
+ )
184
+
185
+ store = NoopSessionStore()
186
+ manager = _manager_with_store(store)
187
+ gateway = CloseableResource()
188
+ persistence = CloseableResource()
189
+ manager.messaging_gateway = gateway # type: ignore[assignment]
190
+ manager.persistence_store = persistence # type: ignore[assignment]
191
+
192
+ cancel_event = asyncio.Event()
193
+ preload_cancel_event = threading.Event()
194
+
195
+ async def preload():
196
+ while not preload_cancel_event.is_set():
197
+ await asyncio.sleep(0)
198
+ cancel_event.set()
199
+
200
+ session = FakeRuntimeSession(hf_token="token")
201
+ session.session_id = "s1"
202
+ session.persistence_store = NoopSessionStore()
203
+ session.sandbox = SimpleNamespace(
204
+ space_id="owner/sandbox-12345678",
205
+ _owns_space=True,
206
+ delete=lambda: deleted.append("owner/sandbox-12345678"),
207
+ )
208
+ session.sandbox_hardware = "cpu-basic"
209
+ session.sandbox_preload_cancel_event = preload_cancel_event
210
+ session.sandbox_preload_task = asyncio.create_task(preload())
211
+ manager.sessions["s1"] = AgentSession(
212
+ session_id="s1",
213
+ session=session, # type: ignore[arg-type]
214
+ tool_router=object(), # type: ignore[arg-type]
215
+ submission_queue=asyncio.Queue(),
216
+ user_id="owner",
217
+ hf_token="token",
218
+ )
219
+
220
+ await manager.close()
221
+
222
+ assert preload_cancel_event.is_set()
223
+ assert cancel_event.is_set()
224
+ assert deleted == ["owner/sandbox-12345678"]
225
+ assert gateway.closed is True
226
+ assert persistence.closed is True
227
+
228
+
229
+ @pytest.mark.asyncio
230
+ async def test_close_closes_resources_when_sandbox_cleanup_fails():
231
+ manager = _manager_with_store(NoopSessionStore())
232
+ gateway = CloseableResource()
233
+ persistence = CloseableResource()
234
+ manager.messaging_gateway = gateway # type: ignore[assignment]
235
+ manager.persistence_store = persistence # type: ignore[assignment]
236
+ manager.sessions["s1"] = _runtime_agent_session("s1")
237
+ manager.sessions["s2"] = _runtime_agent_session("s2")
238
+ cleaned: list[str] = []
239
+
240
+ async def fake_cleanup(session):
241
+ cleaned.append(session.hf_token)
242
+ if session.hf_token == "owner-token":
243
+ raise RuntimeError("boom")
244
+
245
+ manager._cleanup_sandbox = fake_cleanup # type: ignore[method-assign]
246
+
247
+ await manager.close()
248
+
249
+ assert cleaned == ["owner-token", "owner-token"]
250
+ assert gateway.closed is True
251
+ assert persistence.closed is True
252
+
253
+
254
  @pytest.mark.asyncio
255
  async def test_existing_session_rejects_cross_user_token_overwrite():
256
  manager = _manager_with_store(NoopSessionStore())
 
353
  await _cancel_runtime_tasks(manager)
354
 
355
 
356
+ @pytest.mark.asyncio
357
+ async def test_lazy_restore_deletes_persisted_sandbox_before_preload(monkeypatch):
358
+ deleted: list[tuple[str, str, str]] = []
359
+
360
+ class FakeApi:
361
+ def __init__(self, token=None):
362
+ self.token = token
363
+
364
+ def delete_repo(self, repo_id, repo_type):
365
+ deleted.append((self.token, repo_id, repo_type))
366
+
367
+ monkeypatch.setattr("huggingface_hub.HfApi", FakeApi)
368
+
369
+ store = RestoreStore(
370
+ metadata={
371
+ "session_id": "persisted-session",
372
+ "user_id": "owner",
373
+ "model": "test-model",
374
+ "created_at": datetime.now(UTC),
375
+ "sandbox_space_id": "owner/sandbox-12345678",
376
+ "sandbox_hardware": "cpu-basic",
377
+ "sandbox_owner": "owner",
378
+ "sandbox_created_at": datetime.now(UTC),
379
+ "sandbox_status": "active",
380
+ }
381
+ )
382
+ manager = _manager_with_store(store)
383
+ stop = _install_fake_runtime(manager)
384
+ scheduled: list[str] = []
385
+
386
+ def fake_start_cpu_sandbox_preload(agent_session: AgentSession) -> None:
387
+ scheduled.append(agent_session.session_id)
388
+
389
+ manager._start_cpu_sandbox_preload = fake_start_cpu_sandbox_preload # type: ignore[method-assign]
390
+
391
+ try:
392
+ restored = await manager.ensure_session_loaded(
393
+ "persisted-session",
394
+ user_id="owner",
395
+ hf_token="user-token",
396
+ )
397
+
398
+ assert restored is not None
399
+ assert deleted == [("user-token", "owner/sandbox-12345678", "space")]
400
+ assert scheduled == ["persisted-session"]
401
+ assert store.metadata["sandbox_space_id"] is None
402
+ assert store.metadata["sandbox_status"] == "destroyed"
403
+ finally:
404
+ stop.set()
405
+ await _cancel_runtime_tasks(manager)
406
+
407
+
408
+ @pytest.mark.asyncio
409
+ async def test_lazy_restore_can_skip_cpu_sandbox_preload_after_cleanup(monkeypatch):
410
+ deleted: list[str] = []
411
+
412
+ class FakeApi:
413
+ def __init__(self, token=None):
414
+ self.token = token
415
+
416
+ def delete_repo(self, repo_id, repo_type):
417
+ deleted.append(repo_id)
418
+
419
+ monkeypatch.setattr("huggingface_hub.HfApi", FakeApi)
420
+
421
+ store = RestoreStore(
422
+ metadata={
423
+ "session_id": "persisted-session",
424
+ "user_id": "owner",
425
+ "model": "test-model",
426
+ "created_at": datetime.now(UTC),
427
+ "sandbox_space_id": "owner/sandbox-87654321",
428
+ "sandbox_status": "active",
429
+ }
430
+ )
431
+ manager = _manager_with_store(store)
432
+ stop = _install_fake_runtime(manager)
433
+ scheduled: list[str] = []
434
+
435
+ def fake_start_cpu_sandbox_preload(agent_session: AgentSession) -> None:
436
+ scheduled.append(agent_session.session_id)
437
+
438
+ manager._start_cpu_sandbox_preload = fake_start_cpu_sandbox_preload # type: ignore[method-assign]
439
+
440
+ try:
441
+ restored = await manager.ensure_session_loaded(
442
+ "persisted-session",
443
+ user_id="owner",
444
+ hf_token="user-token",
445
+ preload_sandbox=False,
446
+ )
447
+
448
+ assert restored is not None
449
+ assert deleted == ["owner/sandbox-87654321"]
450
+ assert scheduled == []
451
+ assert store.metadata["sandbox_space_id"] is None
452
+ finally:
453
+ stop.set()
454
+ await _cancel_runtime_tasks(manager)
455
+
456
+
457
  @pytest.mark.asyncio
458
  async def test_lazy_restore_preserves_pending_approval_tool_calls():
459
  store = RestoreStore(