Spaces:
Sleeping
Sleeping
| """ | |
| Agent loop: orchestrates LLM <-> E2B sandbox for execution tasks. | |
| Phase 1 design — kept deliberately simple and robust: | |
| 1. Ask LLM to produce a SINGLE python code block (and optional shell block) | |
| to satisfy the user's request, given recent context. | |
| 2. Extract the code block(s). | |
| 3. Run them in a fresh E2B sandbox, streaming stdout/stderr to the caller. | |
| 4. Show the LLM the real output and ask for a final natural-language reply. | |
| 5. Stream that reply. | |
| 6. Close the sandbox. | |
| Anything more elaborate (multi-step planner, tool-calling, retry-on-error) is | |
| intentionally OUT of Phase 1. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| import re | |
| import time | |
| from dataclasses import dataclass | |
| from typing import Any, AsyncIterator, Dict, List, Optional | |
| from . import llm_router | |
| from .executor import E2BExecutor, ExecEvent | |
| logger = logging.getLogger(__name__) | |
| # ---------------------------------------------------------------------------- | |
| # Prompts | |
| # ---------------------------------------------------------------------------- | |
| CODER_SYSTEM = """You are a code executor agent running inside a real Linux | |
| sandbox (E2B). The user will ask you to do something that requires running | |
| real code. Reply with ONE single fenced code block — Python preferred — that, | |
| when executed, accomplishes the task. | |
| Strict rules: | |
| - Output ONLY the code block. No prose before or after. | |
| - Prefer Python. Use ```python fences. | |
| - If the task is shell-only (mkdir, ls, install a package), you may use one | |
| ```bash block instead. | |
| - Print clear progress messages so the user can see what happened. | |
| - Always print a final confirmation line. | |
| - Keep total output under ~200 lines. | |
| """ | |
| REPLY_SYSTEM = """You are a helpful assistant. The user asked for a task that | |
| required running real code. Below is the user's request, the code that ran, | |
| and the REAL execution output. Write a short, friendly natural-language reply | |
| summarising what was done and quoting any important values from the real | |
| output. Do NOT fabricate. Do NOT re-run anything. Keep it concise (3-6 | |
| sentences).""" | |
| CHAT_SYSTEM = """You are a concise, helpful assistant. Reply in the same | |
| language as the user when natural. Keep answers focused.""" | |
| # ---------------------------------------------------------------------------- | |
| # Code extraction | |
| # ---------------------------------------------------------------------------- | |
| _FENCE_RE = re.compile( | |
| r"```([a-zA-Z0-9_+\-]*)\s*\n(.*?)```", re.DOTALL | |
| ) | |
@dataclass
class CodeBlock:
    """One fenced code block pulled out of an LLM reply.

    Declared as a dataclass so that it can be built with keyword arguments
    (``CodeBlock(language=..., code=...)``), which is how
    ``extract_code_blocks`` constructs it.
    """

    # Fence tag as produced by extract_code_blocks: lower-cased, with
    # "python" substituted when the fence carries no tag.
    language: str
    # Text between the fences, trailing whitespace stripped.
    code: str
def extract_code_blocks(text: str) -> List[CodeBlock]:
    """Collect every triple-backtick fenced block in *text*, in order.

    The fence tag is lower-cased and stripped; a fence without a tag is
    reported as ``"python"``. Trailing whitespace is removed from the code.
    A falsy *text* (``None`` or ``""``) yields an empty list.
    """
    return [
        CodeBlock(
            language=(match.group(1) or "").lower().strip() or "python",
            code=match.group(2).rstrip(),
        )
        for match in _FENCE_RE.finditer(text or "")
    ]
def pick_runnable(blocks: List[CodeBlock]) -> Optional[CodeBlock]:
    """Choose the block to execute from *blocks*.

    Priority: the first Python block, otherwise the first shell block,
    otherwise whatever block comes first. Returns ``None`` when *blocks*
    is empty.
    """
    language_tiers = (("python", "py"), ("bash", "sh", "shell"))
    for tier in language_tiers:
        hit = next((blk for blk in blocks if blk.language in tier), None)
        if hit is not None:
            return hit
    # Nothing in a preferred language: fall back to the first block, if any.
    return blocks[0] if blocks else None
| # ---------------------------------------------------------------------------- | |
| # Streaming agent | |
| # ---------------------------------------------------------------------------- | |
async def stream_chat_only(
    messages: List[Dict[str, str]],
) -> AsyncIterator[Dict]:
    """Stream a plain chat reply without touching the sandbox.

    Prepends ``CHAT_SYSTEM`` to *messages*, emits a ``phase`` event, then
    translates router chunks into ``assistant_delta`` / ``assistant_done`` /
    ``error`` events. Chunks of any other type are dropped.
    """
    conversation = [{"role": "system", "content": CHAT_SYSTEM}]
    conversation.extend(messages)
    yield {"type": "phase", "phase": "chat"}

    reply_stream = llm_router.stream_complete(
        conversation, temperature=0.4, max_tokens=1024
    )
    async for piece in reply_stream:
        piece_type = piece["type"]
        if piece_type == "delta":
            yield {"type": "assistant_delta", "content": piece["content"]}
        elif piece_type == "done":
            yield {
                "type": "assistant_done",
                "provider": piece.get("provider"),
                "model": piece.get("model"),
            }
        elif piece_type == "error":
            yield {"type": "error", "error": piece["error"]}
async def stream_execute(
    messages: List[Dict[str, str]],
    *,
    sandbox_timeout: int = 300,
) -> AsyncIterator[Dict]:
    """Execution task: spin up E2B, run code, reply with real results.

    Event sequence on the happy path: ``phase(planning)`` → ``plan`` →
    ``code`` → ``phase(sandbox_starting)`` → ``sandbox_started`` →
    ``phase(executing)`` → stdout/stderr/exec_error/exec_result →
    ``sandbox_closed`` → ``phase(summarising)`` → ``assistant_delta``… →
    ``assistant_done``.

    Early exits:
      * LLM failure while asking for code → single ``error`` event.
      * Reply contains no fenced code block → the raw reply is emitted as
        ``assistant_delta`` followed by ``assistant_done``.
      * Any exception while starting/running the sandbox → ``error`` then
        ``sandbox_closed``; no summary is produced.

    Args:
        messages: Chat history; the most recent ``user`` entry is quoted in
            the summary prompt.
        sandbox_timeout: Passed to ``E2BExecutor(timeout=...)``.
    """
    # --- step 1: ask LLM for code -------------------------------------------
    yield {"type": "phase", "phase": "planning"}
    code_messages = [{"role": "system", "content": CODER_SYSTEM}, *messages]
    try:
        coder_resp = await llm_router.complete(code_messages, temperature=0.2, max_tokens=1500)
    except Exception as e:
        yield {"type": "error", "error": f"LLM failed: {e}"}
        return
    raw = coder_resp["content"]
    yield {"type": "plan", "content": raw, "provider": coder_resp.get("provider")}

    blocks = extract_code_blocks(raw)
    chosen = pick_runnable(blocks)
    if chosen is None:
        # No code block → degrade to chat reply
        yield {"type": "assistant_delta", "content": raw}
        yield {"type": "assistant_done"}
        return
    yield {"type": "code", "language": chosen.language, "code": chosen.code}

    # --- step 2: launch sandbox & run ---------------------------------------
    yield {"type": "phase", "phase": "sandbox_starting"}
    executor: Optional[E2BExecutor] = None
    stdout_buf: List[str] = []
    stderr_buf: List[str] = []
    error_text: Optional[str] = None
    result_text: str = ""
    exit_code: Optional[int] = None
    sandbox_failed = False
    try:
        try:
            executor = E2BExecutor(timeout=sandbox_timeout)
            await executor.start()
            yield {"type": "sandbox_started", "sandbox_id": executor.sandbox_id}
            runner = (
                executor.run_python(chosen.code)
                if chosen.language in ("python", "py")
                else executor.run_shell(chosen.code)
            )
            yield {"type": "phase", "phase": "executing"}
            async for ev in runner:
                if ev.type == "stdout":
                    stdout_buf.append(ev.data)
                    yield {"type": "stdout", "content": ev.data}
                elif ev.type == "stderr":
                    stderr_buf.append(ev.data)
                    yield {"type": "stderr", "content": ev.data}
                elif ev.type == "error":
                    error_text = ev.data
                    yield {"type": "exec_error", "content": ev.data, "meta": ev.meta}
                elif ev.type == "result":
                    result_text = ev.data
                    exit_code = ev.meta.get("exit_code") if ev.meta else None
                    yield {"type": "exec_result", "content": ev.data, "meta": ev.meta}
        except Exception as e:
            logger.exception("sandbox error")
            yield {"type": "error", "error": f"Sandbox error: {e}"}
            sandbox_failed = True
    finally:
        # Single close for every exit path (success, sandbox error, consumer
        # closing the stream, cancellation). No ``yield`` lives in this
        # ``finally``: yielding while the generator is being closed raises
        # RuntimeError in async generators.
        if executor:
            await executor.close()
    # Only reached when the generator was not torn down mid-run.
    yield {"type": "sandbox_closed"}
    if sandbox_failed:
        return

    # --- step 3: ask LLM for final reply with real outputs ------------------
    yield {"type": "phase", "phase": "summarising"}
    user_request = next((m["content"] for m in reversed(messages) if m.get("role") == "user"), "")
    summary_user = (
        f"USER REQUEST:\n{user_request}\n\n"
        f"CODE EXECUTED ({chosen.language}):\n```\n{chosen.code}\n```\n\n"
        f"STDOUT:\n{''.join(stdout_buf) or '(empty)'}\n\n"
        f"STDERR:\n{''.join(stderr_buf) or '(empty)'}\n\n"
        f"RESULT:\n{result_text or '(none)'}\n\n"
        f"ERROR:\n{error_text or '(none)'}\n\n"
        f"EXIT_CODE: {exit_code}"
    )
    reply_messages = [
        {"role": "system", "content": REPLY_SYSTEM},
        {"role": "user", "content": summary_user},
    ]
    async for chunk in llm_router.stream_complete(reply_messages, temperature=0.4, max_tokens=600):
        if chunk["type"] == "delta":
            yield {"type": "assistant_delta", "content": chunk["content"]}
        elif chunk["type"] == "done":
            yield {"type": "assistant_done", "provider": chunk.get("provider"), "model": chunk.get("model")}
        elif chunk["type"] == "error":
            yield {"type": "error", "error": chunk["error"]}
| # ============================================================================ | |
| # Phase 2: Multi-step planner-driven agent loop | |
| # ---------------------------------------------------------------------------- | |
| # This block is ADDITIVE. The Phase-1 ``stream_chat_only`` and | |
| # ``stream_execute`` functions above remain the canonical entry points for | |
| # the existing /api/chat/stream and /api/execute endpoints — they MUST NOT | |
| # change behaviour. The new ``stream_agent_plan`` is wired to a brand-new | |
| # /api/agent/stream endpoint in app.py. | |
| # ============================================================================ | |
| from . import browser as _browser | |
| from . import planner as _planner | |
| from . import retry as _retry | |
| from . import tasks as _tasks | |
| from .tasks import TaskState | |
| SUMMARY_SYSTEM = """You are summarising a completed autonomous run. You will | |
| see the user's request, the plan, and the real outputs of each executed | |
| step. Write a concise (3-7 sentence) reply that: | |
| - States what was accomplished (or, if it failed, exactly where and why) | |
| - Quotes any concrete values from the real outputs (paths, numbers, URLs) | |
| - Does NOT invent results. Do NOT re-run anything. | |
| - Speaks in the user's language.""" | |
| def _truncate(text: str, n: int = 1200) -> str: | |
| if not text: | |
| return "" | |
| if len(text) <= n: | |
| return text | |
| return text[: n // 2] + "\n…[truncated]…\n" + text[-n // 2 :] | |
async def _emit(task_id: Optional[str], event: Dict[str, Any]):
    """Record *event* against *task_id* (best-effort) and hand it back.

    The event's ``type`` becomes the stored kind (``"event"`` if absent) and
    every other key is stored as the payload. Persistence failures are logged
    at debug level and never propagate. With a falsy *task_id* nothing is
    stored. The same *event* object is always returned.
    """
    if not task_id:
        return event
    try:
        event_kind = event.get("type", "event")
        body = {key: value for key, value in event.items() if key != "type"}
        await _tasks.append_event(
            task_id,
            event_kind,
            body,
            step_idx=event.get("step_idx"),
        )
    except Exception as exc:
        logger.debug("persist event failed (ignored): %s", exc)
    return event
| async def _run_python_or_shell_step( | |
| executor: E2BExecutor, | |
| step: Dict[str, Any], | |
| code: str, | |
| step_idx: int, | |
| task_id: Optional[str], | |
| ) -> Dict[str, Any]: | |
| """Stream a single python/shell step. Returns a result dict, but events | |
| must be sent by the *caller* via the queue produced by this generator. | |
| Implementation: we wrap a queue-style async generator and let the caller | |
| iterate; this helper just collects buffers for the retry decision. | |
| """ | |
| # The actual streaming is implemented inline in stream_agent_plan; this | |
| # placeholder exists only to clarify intent. | |
| raise NotImplementedError("inline in stream_agent_plan") | |
async def stream_agent_plan(
    messages: List[Dict[str, str]],
    *,
    sandbox_timeout: int = 600,
    max_retries_per_step: int = 2,
    enable_browser: bool = True,
    task_id: Optional[str] = None,
) -> AsyncIterator[Dict[str, Any]]:
    """Multi-step autonomous agent run.

    Lifecycle: queued → planning → executing (with optional retrying) →
    completed | failed. SSE events emitted (all carry a ``ts`` field):

        task_started    { task_id }
        state_change    { state }
        plan            { summary, steps[], needs_browser, provider }
        step_started    { step_idx, title, kind }
        code            { step_idx, language, code }
        stdout / stderr { step_idx, content }
        exec_error      { step_idx, content, meta }
        exec_result     { step_idx, content, meta }
        browser_*       (forwarded from browser.run_actions)
        step_retry      { step_idx, attempt, reason }
        step_completed  { step_idx, ok }
        assistant_delta { content }   ← final summary streaming
        assistant_done  { provider, model }
        task_completed  { final_state }
        error           { error }

    Args:
        messages: Chat history. The most recent ``user`` entry is the
            request; ``messages[:-1]`` is handed to the planner as history.
        sandbox_timeout: Passed to ``E2BExecutor(timeout=...)``.
        max_retries_per_step: Extra attempts allowed per step after the
            first one. Values below zero are treated as zero.
        enable_browser: When true and the plan sets ``needs_browser``, the
            browser is bootstrapped once before the first step.
        task_id: Existing task row to update. When ``None`` a row is created
            best-effort; if that fails the run continues without persistence.

    The sandbox is closed on every exit path once it has started, including
    the consumer closing the stream early and unexpected exceptions.
    """
    user_message = next(
        (m["content"] for m in reversed(messages) if m.get("role") == "user"), ""
    )
    # Every step gets at least one attempt; a negative retry budget would
    # otherwise skip the attempt loop and leave its buffers undefined.
    max_attempts = max(1, max_retries_per_step + 1)

    def stamp(d: Dict[str, Any]) -> Dict[str, Any]:
        # Adds the timestamp in place; an existing ``ts`` is left alone.
        d.setdefault("ts", time.time())
        return d

    # ---------- task row ----------
    if task_id is None:
        try:
            t = await _tasks.create_task(user_message,
                                         metadata={"source": "agent_stream"})
            task_id = t.id
        except Exception as e:
            logger.warning("task DB unavailable: %s", e)
            task_id = None
    yield stamp({"type": "task_started", "task_id": task_id})

    # ---------- planning ----------
    if task_id:
        await _tasks.update_state(task_id, TaskState.PLANNING)
    yield stamp({"type": "state_change", "state": TaskState.PLANNING})
    try:
        plan = await _planner.make_plan(user_message, history=messages[:-1])
    except Exception as e:
        logger.exception("planner failure")
        if task_id:
            await _tasks.update_state(task_id, TaskState.FAILED, error=str(e))
        yield stamp({"type": "error", "error": f"planner: {e}"})
        yield stamp({"type": "task_completed", "final_state": TaskState.FAILED})
        return
    if task_id:
        try:
            await _tasks.set_steps(task_id, plan["steps"])
        except Exception as e:
            # Still best-effort, but no longer silent.
            logger.debug("persist plan steps failed (ignored): %s", e)
    yield stamp({"type": "plan", **plan})

    # If the plan is a single "reason" step → respond as plain chat.
    if len(plan["steps"]) == 1 and plan["steps"][0]["kind"] == "reason":
        if task_id:
            await _tasks.update_state(task_id, TaskState.THINKING)
        yield stamp({"type": "state_change", "state": TaskState.THINKING})
        full_messages = [{"role": "system", "content": CHAT_SYSTEM}, *messages]
        async for chunk in llm_router.stream_complete(
            full_messages, temperature=0.4, max_tokens=1024
        ):
            if chunk["type"] == "delta":
                yield stamp({"type": "assistant_delta", "content": chunk["content"]})
            elif chunk["type"] == "done":
                yield stamp({"type": "assistant_done",
                             "provider": chunk.get("provider"),
                             "model": chunk.get("model")})
            elif chunk["type"] == "error":
                yield stamp({"type": "error", "error": chunk["error"]})
        if task_id:
            await _tasks.update_state(task_id, TaskState.COMPLETED)
        yield stamp({"type": "task_completed", "final_state": TaskState.COMPLETED})
        return

    # ---------- sandbox start ----------
    if task_id:
        await _tasks.update_state(task_id, TaskState.EXECUTING)
    yield stamp({"type": "state_change", "state": TaskState.EXECUTING})
    executor: Optional[E2BExecutor] = None
    try:
        executor = E2BExecutor(timeout=sandbox_timeout)
        await executor.start()
    except Exception as e:
        logger.exception("sandbox start failed")
        if task_id:
            await _tasks.update_state(task_id, TaskState.FAILED, error=str(e))
        yield stamp({"type": "error", "error": f"sandbox: {e}"})
        yield stamp({"type": "task_completed", "final_state": TaskState.FAILED})
        return

    prior_results: List[Dict[str, Any]] = []
    overall_ok = True
    closed_cleanly = False
    # From here on the sandbox is live: everything up to cleanup sits inside
    # try/finally so that it is closed however this generator exits.
    try:
        yield stamp({"type": "sandbox_started", "sandbox_id": executor.sandbox_id})
        if task_id:
            await _tasks.update_state(task_id, TaskState.EXECUTING,
                                      sandbox_id=executor.sandbox_id)

        # ---------- bootstrap browser once if needed ----------
        browser_ready = False
        if enable_browser and plan.get("needs_browser"):
            yield stamp({"type": "state_change", "state": "browser_bootstrapping"})
            try:
                async for ev in _browser.ensure_bootstrap(executor):
                    if ev.type == "stdout":
                        yield stamp({"type": "browser_log", "content": ev.data})
                    elif ev.type == "stderr":
                        yield stamp({"type": "browser_log", "content": ev.data,
                                     "stream": "stderr"})
                browser_ready = True
            except Exception as e:
                # Not fatal here: a browser step retries the bootstrap itself.
                logger.warning("browser bootstrap failed: %s", e)
                yield stamp({"type": "browser_log",
                             "content": f"bootstrap failed: {e}", "stream": "stderr"})

        # ---------- execute steps ----------
        for step_idx, step in enumerate(plan["steps"]):
            kind = step["kind"]
            if task_id:
                await _tasks.update_step(task_id, step_idx, state=TaskState.EXECUTING)
            yield stamp({"type": "step_started", "step_idx": step_idx,
                         "title": step["title"], "kind": kind})
            last_error_blob: Optional[str] = None
            step_ok = False
            attempt = 0
            while attempt < max_attempts:
                attempt += 1
                if task_id:
                    await _tasks.update_step(task_id, step_idx, attempts_delta=1)
                # ---- ask for code (or browser actions) ----
                # Buffers are reset per attempt so the retry decision only
                # sees the output of the attempt that just ran.
                stdout_buf: List[str] = []
                stderr_buf: List[str] = []
                error_text: Optional[str] = None
                result_text: str = ""
                exit_code: Optional[int] = None
                try:
                    if kind == "browser":
                        if not browser_ready:
                            try:
                                async for ev in _browser.ensure_bootstrap(executor):
                                    if ev.type in ("stdout", "stderr"):
                                        yield stamp({"type": "browser_log",
                                                     "content": ev.data,
                                                     "stream": "stderr" if ev.type == "stderr" else "stdout"})
                                browser_ready = True
                            except Exception as e:
                                raise RuntimeError(f"browser bootstrap failed: {e}") from e
                        # Ask LLM for a Playwright action list
                        actions_msg = await _planner.code_for_step(
                            plan["summary"], step, prior_results,
                            feedback=last_error_blob,
                        )
                        actions_raw = actions_msg["content"]
                        # Extract JSON: prefer fenced block, else first [...] block.
                        blocks = extract_code_blocks(actions_raw)
                        json_text = ""
                        for b in blocks:
                            if b.language in ("json", ""):
                                json_text = b.code
                                break
                        if not json_text:
                            m = re.search(r"\[\s*\{.*\}\s*\]", actions_raw, re.DOTALL)
                            if m:
                                json_text = m.group(0)
                        if not json_text:
                            raise RuntimeError("planner returned no actions JSON")
                        # Malformed JSON raises here and is reported as an
                        # exec_error by the handler below.
                        actions = json.loads(json_text)
                        yield stamp({"type": "code", "step_idx": step_idx,
                                     "language": "browser", "code": json.dumps(actions, indent=2)})
                        async for bev in _browser.run_actions(executor, actions):
                            bev["step_idx"] = step_idx
                            if bev["type"] == "browser_step":
                                stdout_buf.append(json.dumps(bev, ensure_ascii=False))
                            if bev["type"] == "browser_error":
                                error_text = bev.get("error")
                                stderr_buf.append(error_text or "")
                            if bev["type"] == "browser_done":
                                if bev.get("error") and not error_text:
                                    error_text = bev["error"]
                            yield stamp(bev)
                    else:
                        # python / shell
                        coder_resp = await _planner.code_for_step(
                            plan["summary"], step, prior_results,
                            feedback=last_error_blob,
                        )
                        raw = coder_resp["content"]
                        blocks = extract_code_blocks(raw)
                        chosen = pick_runnable(blocks)
                        if chosen is None:
                            # Degenerate: no code → treat content as the result.
                            stdout_buf.append(raw)
                            yield stamp({"type": "stdout", "step_idx": step_idx,
                                         "content": raw[:2000]})
                        else:
                            # The fence tag decides the interpreter, even if
                            # it disagrees with the planner's step kind.
                            lang = chosen.language
                            yield stamp({"type": "code", "step_idx": step_idx,
                                         "language": lang, "code": chosen.code})
                            runner = (
                                executor.run_python(chosen.code)
                                if lang in ("python", "py")
                                else executor.run_shell(chosen.code)
                            )
                            async for ev in runner:
                                if ev.type == "stdout":
                                    stdout_buf.append(ev.data)
                                    yield stamp({"type": "stdout",
                                                 "step_idx": step_idx,
                                                 "content": ev.data})
                                elif ev.type == "stderr":
                                    stderr_buf.append(ev.data)
                                    yield stamp({"type": "stderr",
                                                 "step_idx": step_idx,
                                                 "content": ev.data})
                                elif ev.type == "error":
                                    error_text = ev.data
                                    yield stamp({"type": "exec_error",
                                                 "step_idx": step_idx,
                                                 "content": ev.data,
                                                 "meta": ev.meta})
                                elif ev.type == "result":
                                    result_text = ev.data
                                    exit_code = (ev.meta or {}).get("exit_code")
                                    yield stamp({"type": "exec_result",
                                                 "step_idx": step_idx,
                                                 "content": ev.data,
                                                 "meta": ev.meta})
                except Exception as e:
                    error_text = str(e)
                    yield stamp({"type": "exec_error", "step_idx": step_idx,
                                 "content": error_text})

                # ---- decide retry ----
                decision = _retry.decide(
                    attempt=attempt,
                    max_attempts=max_attempts,
                    stdout="".join(stdout_buf),
                    stderr="".join(stderr_buf),
                    error=error_text,
                    exit_code=exit_code,
                )
                if not _retry.is_failure("".join(stderr_buf), error_text, exit_code,
                                         "".join(stdout_buf)):
                    step_ok = True
                    if task_id:
                        await _tasks.update_step(task_id, step_idx,
                                                 state=TaskState.COMPLETED,
                                                 result=_truncate("".join(stdout_buf), 2000))
                    yield stamp({"type": "step_completed",
                                 "step_idx": step_idx, "ok": True})
                    break
                # Retry only while attempts remain; on the last attempt the
                # step must take the failure path so step_completed is sent.
                if decision.should_retry and attempt < max_attempts:
                    last_error_blob = decision.feedback
                    if task_id:
                        await _tasks.update_state(task_id, TaskState.RETRYING)
                        await _tasks.update_step(task_id, step_idx,
                                                 state=TaskState.RETRYING,
                                                 error=_truncate(decision.feedback, 1500))
                    yield stamp({"type": "step_retry",
                                 "step_idx": step_idx,
                                 "attempt": attempt,
                                 "reason": decision.reason,
                                 "next_in_s": decision.delay_seconds})
                    if decision.delay_seconds > 0:
                        await asyncio.sleep(decision.delay_seconds)
                    if task_id:
                        await _tasks.update_state(task_id, TaskState.EXECUTING)
                    continue
                # No retry → mark failed
                if task_id:
                    await _tasks.update_step(task_id, step_idx,
                                             state=TaskState.FAILED,
                                             error=_truncate(error_text or decision.reason, 1500))
                yield stamp({"type": "step_completed",
                             "step_idx": step_idx, "ok": False,
                             "reason": decision.reason})
                overall_ok = False
                break

            prior_results.append({
                "idx": step_idx,
                "title": step["title"],
                "kind": kind,
                "state": "completed" if step_ok else "failed",
                "stdout": "".join(stdout_buf),
                "stderr": "".join(stderr_buf),
                "error": error_text,
            })
            if not step_ok:
                # A step that did not succeed always fails the whole run.
                overall_ok = False
                break  # don't continue plan if a step failed terminally
    finally:
        # ---------- sandbox cleanup ----------
        # Runs on normal completion, on unexpected errors and when the
        # consumer closes the stream. No ``yield`` in here: yielding while an
        # async generator is being closed raises RuntimeError.
        try:
            if executor:
                await executor.close()
            closed_cleanly = True
        except Exception as e:
            logger.warning("sandbox close error: %s", e)
    if closed_cleanly:
        yield stamp({"type": "sandbox_closed"})

    # ---------- final summary ----------
    yield stamp({"type": "state_change", "state": "summarising"})
    summary_blob_lines = [
        f"USER REQUEST:\n{user_message}\n",
        f"PLAN: {plan['summary']}\n",
    ]
    for r in prior_results:
        summary_blob_lines.append(
            f"\n[step {r['idx']}] {r['title']} ({r['kind']}, {r['state']})\n"
            f" stdout: {_truncate(r['stdout'], 600)}\n"
            f" stderr: {_truncate(r['stderr'], 400)}\n"
            f" error: {_truncate(r.get('error') or '', 400)}\n"
        )
    final_chunks: List[str] = []
    try:
        async for chunk in llm_router.stream_complete(
            [
                {"role": "system", "content": SUMMARY_SYSTEM},
                {"role": "user", "content": "".join(summary_blob_lines)},
            ],
            temperature=0.4, max_tokens=700,
        ):
            if chunk["type"] == "delta":
                final_chunks.append(chunk["content"])
                yield stamp({"type": "assistant_delta", "content": chunk["content"]})
            elif chunk["type"] == "done":
                yield stamp({"type": "assistant_done",
                             "provider": chunk.get("provider"),
                             "model": chunk.get("model")})
            elif chunk["type"] == "error":
                yield stamp({"type": "error", "error": chunk["error"]})
    except Exception as e:
        # A failed summary does not change the task's final state.
        logger.warning("summary stream failed: %s", e)

    final_state = TaskState.COMPLETED if overall_ok else TaskState.FAILED
    if task_id:
        await _tasks.update_state(
            task_id, final_state,
            final_reply="".join(final_chunks)[:8000],
            error=None if overall_ok else "one or more steps failed",
        )
    yield stamp({"type": "task_completed", "final_state": final_state,
                 "task_id": task_id})