Spaces:
Running
Running
Commit Β·
a017896
1
Parent(s): b94b18b
fix: handle interrupt during streaming tool calls without corrupting context
Browse filesWhen the user clicks interrupt while the LLM is streaming a tool call,
the accumulated deltas can have empty IDs and truncated arguments.
Previously these got added to the context and caused litellm API errors
on the next call.
- Add cancellation check inside the streaming loop so we stop early
- Drop tool_calls with empty IDs when building from stream deltas
- recover_malformed_tool_calls() now strips empty-ID tool_calls from
assistant messages as a safety net
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- agent/context_manager/manager.py +24 -7
- agent/core/agent_loop.py +13 -1
agent/context_manager/manager.py
CHANGED
|
@@ -167,11 +167,11 @@ class ContextManager:
|
|
| 167 |
def recover_malformed_tool_calls(self) -> set[str]:
|
| 168 |
"""Sanitize malformed tool_call arguments and inject error results.
|
| 169 |
|
| 170 |
-
|
| 171 |
-
|
| 172 |
-
|
| 173 |
-
|
| 174 |
-
|
| 175 |
|
| 176 |
This method is idempotent β safe to call from both the agent loop
|
| 177 |
(before tool execution) and from :meth:`get_messages` (safety net).
|
|
@@ -183,7 +183,6 @@ class ContextManager:
|
|
| 183 |
|
| 184 |
malformed_ids: set[str] = set()
|
| 185 |
|
| 186 |
-
# 1. Find and sanitize malformed arguments
|
| 187 |
for msg in self.items:
|
| 188 |
if getattr(msg, "role", None) != "assistant":
|
| 189 |
continue
|
|
@@ -191,6 +190,24 @@ class ContextManager:
|
|
| 191 |
if not tool_calls:
|
| 192 |
continue
|
| 193 |
self._normalize_tool_calls(msg)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 194 |
for tc in msg.tool_calls:
|
| 195 |
try:
|
| 196 |
json.loads(tc.function.arguments)
|
|
@@ -205,7 +222,7 @@ class ContextManager:
|
|
| 205 |
if not malformed_ids:
|
| 206 |
return malformed_ids
|
| 207 |
|
| 208 |
-
#
|
| 209 |
answered_ids = {
|
| 210 |
getattr(m, "tool_call_id", None)
|
| 211 |
for m in self.items
|
|
|
|
| 167 |
def recover_malformed_tool_calls(self) -> set[str]:
|
| 168 |
"""Sanitize malformed tool_call arguments and inject error results.
|
| 169 |
|
| 170 |
+
Handles two classes of corruption:
|
| 171 |
+
- **Empty/missing IDs**: Stripped from the assistant message entirely
|
| 172 |
+
(common when streaming is interrupted mid-tool-call).
|
| 173 |
+
- **Malformed JSON arguments**: Replaced with ``"{}"`` and an error
|
| 174 |
+
tool-result is injected asking the agent to retry.
|
| 175 |
|
| 176 |
This method is idempotent β safe to call from both the agent loop
|
| 177 |
(before tool execution) and from :meth:`get_messages` (safety net).
|
|
|
|
| 183 |
|
| 184 |
malformed_ids: set[str] = set()
|
| 185 |
|
|
|
|
| 186 |
for msg in self.items:
|
| 187 |
if getattr(msg, "role", None) != "assistant":
|
| 188 |
continue
|
|
|
|
| 190 |
if not tool_calls:
|
| 191 |
continue
|
| 192 |
self._normalize_tool_calls(msg)
|
| 193 |
+
|
| 194 |
+
# 1. Strip tool_calls with empty/missing IDs (cannot be repaired)
|
| 195 |
+
valid_tcs = []
|
| 196 |
+
for tc in msg.tool_calls:
|
| 197 |
+
if not getattr(tc, "id", None):
|
| 198 |
+
logger.warning(
|
| 199 |
+
"Stripping tool_call with empty ID (name=%s) β likely interrupted stream",
|
| 200 |
+
getattr(tc.function, "name", "?"),
|
| 201 |
+
)
|
| 202 |
+
continue
|
| 203 |
+
valid_tcs.append(tc)
|
| 204 |
+
if len(valid_tcs) != len(msg.tool_calls):
|
| 205 |
+
msg.tool_calls = valid_tcs or None
|
| 206 |
+
|
| 207 |
+
if not msg.tool_calls:
|
| 208 |
+
continue
|
| 209 |
+
|
| 210 |
+
# 2. Fix malformed JSON arguments
|
| 211 |
for tc in msg.tool_calls:
|
| 212 |
try:
|
| 213 |
json.loads(tc.function.arguments)
|
|
|
|
| 222 |
if not malformed_ids:
|
| 223 |
return malformed_ids
|
| 224 |
|
| 225 |
+
# 3. Inject error results for malformed calls that don't have one yet
|
| 226 |
answered_ids = {
|
| 227 |
getattr(m, "tool_call_id", None)
|
| 228 |
for m in self.items
|
agent/core/agent_loop.py
CHANGED
|
@@ -261,6 +261,11 @@ class Handlers:
|
|
| 261 |
token_count = 0
|
| 262 |
|
| 263 |
async for chunk in response:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 264 |
choice = chunk.choices[0] if chunk.choices else None
|
| 265 |
if not choice:
|
| 266 |
# Last chunk may carry only usage info
|
|
@@ -309,10 +314,17 @@ class Handlers:
|
|
| 309 |
# ββ Stream finished β reconstruct full message βββββββ
|
| 310 |
content = full_content or None
|
| 311 |
|
| 312 |
-
# Build tool_calls list from accumulated deltas
|
|
|
|
| 313 |
tool_calls: list[ToolCall] = []
|
| 314 |
for idx in sorted(tool_calls_acc.keys()):
|
| 315 |
tc_data = tool_calls_acc[idx]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 316 |
tool_calls.append(
|
| 317 |
ToolCall(
|
| 318 |
id=tc_data["id"],
|
|
|
|
| 261 |
token_count = 0
|
| 262 |
|
| 263 |
async for chunk in response:
|
| 264 |
+
# ββ Check cancellation during streaming ββ
|
| 265 |
+
if session.is_cancelled:
|
| 266 |
+
tool_calls_acc.clear()
|
| 267 |
+
break
|
| 268 |
+
|
| 269 |
choice = chunk.choices[0] if chunk.choices else None
|
| 270 |
if not choice:
|
| 271 |
# Last chunk may carry only usage info
|
|
|
|
| 314 |
# ββ Stream finished β reconstruct full message βββββββ
|
| 315 |
content = full_content or None
|
| 316 |
|
| 317 |
+
# Build tool_calls list from accumulated deltas,
|
| 318 |
+
# dropping any with empty IDs (from interrupted streams)
|
| 319 |
tool_calls: list[ToolCall] = []
|
| 320 |
for idx in sorted(tool_calls_acc.keys()):
|
| 321 |
tc_data = tool_calls_acc[idx]
|
| 322 |
+
if not tc_data["id"]:
|
| 323 |
+
logger.warning(
|
| 324 |
+
"Dropping tool_call with empty ID (name=%s) β likely interrupted stream",
|
| 325 |
+
tc_data["function"]["name"],
|
| 326 |
+
)
|
| 327 |
+
continue
|
| 328 |
tool_calls.append(
|
| 329 |
ToolCall(
|
| 330 |
id=tc_data["id"],
|