"""
Agent loop: orchestrates LLM <-> E2B sandbox for execution tasks.

Phase 1 design — kept deliberately simple and robust:

1. Ask the LLM to produce a SINGLE python code block (or, for shell-only
   tasks, a single shell block) to satisfy the user's request, given recent
   context.
2. Extract the code block(s).
3. Run the chosen block in a fresh E2B sandbox, streaming stdout/stderr to
   the caller.
4. Close the sandbox.
5. Show the LLM the real output and ask for a final natural-language reply.
6. Stream that reply.

Anything more elaborate (multi-step planner, tool-calling, retry-on-error) is
intentionally OUT of Phase 1; the additive Phase-2 section at the bottom of
this module provides it.
"""

from __future__ import annotations

import asyncio
import json
import logging
import re
import time
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, List, Optional

from . import llm_router
from .executor import E2BExecutor, ExecEvent

logger = logging.getLogger(__name__)

# ----------------------------------------------------------------------------
# Prompts
# ----------------------------------------------------------------------------

CODER_SYSTEM = """You are a code executor agent running inside a real Linux sandbox (E2B).
The user will ask you to do something that requires running real code.

Reply with ONE single fenced code block — Python preferred — that, when executed,
accomplishes the task.

Strict rules:
- Output ONLY the code block. No prose before or after.
- Prefer Python. Use ```python fences.
- If the task is shell-only (mkdir, ls, install a package), you may use one ```bash block instead.
- Print clear progress messages so the user can see what happened.
- Always print a final confirmation line.
- Keep total output under ~200 lines.
"""

REPLY_SYSTEM = """You are a helpful assistant. The user asked for a task that required
running real code. Below is the user's request, the code that ran, and the REAL
execution output. Write a short, friendly natural-language reply summarising what
was done and quoting any important values from the real output.
Do NOT fabricate. Do NOT re-run anything. Keep it concise (3-6 sentences)."""

CHAT_SYSTEM = """You are a concise, helpful assistant. Reply in the same language as the
user when natural. Keep answers focused."""

# ----------------------------------------------------------------------------
# Code extraction
# ----------------------------------------------------------------------------

# Matches ```lang\n...``` fences; group 1 is the (possibly empty) language tag,
# group 2 the body. DOTALL lets the body span multiple lines.
_FENCE_RE = re.compile(
    r"```([a-zA-Z0-9_+\-]*)\s*\n(.*?)```", re.DOTALL
)


@dataclass
class CodeBlock:
    """One fenced code block extracted from an LLM reply."""

    language: str  # lower-cased fence tag; "python" when the fence had none
    code: str      # fence body with trailing whitespace stripped


def extract_code_blocks(text: str) -> List[CodeBlock]:
    """Return every fenced code block in *text*, in order of appearance.

    Fences without a language tag are labelled ``"python"``. ``None`` or an
    empty string yields an empty list.
    """
    blocks: List[CodeBlock] = []
    for m in _FENCE_RE.finditer(text or ""):
        lang = (m.group(1) or "").lower().strip()
        code = m.group(2).rstrip()
        blocks.append(CodeBlock(language=lang or "python", code=code))
    return blocks


def pick_runnable(blocks: List[CodeBlock]) -> Optional[CodeBlock]:
    """Choose the block to execute: first python, else first shell, else first.

    Returns ``None`` when *blocks* is empty.
    """
    for b in blocks:
        if b.language in ("python", "py"):
            return b
    for b in blocks:
        if b.language in ("bash", "sh", "shell"):
            return b
    return blocks[0] if blocks else None


# ----------------------------------------------------------------------------
# Shared streaming helpers
# ----------------------------------------------------------------------------

def _relay_llm_chunk(chunk: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Translate one ``llm_router.stream_complete`` chunk into a client event.

    ``delta`` → ``assistant_delta``, ``done`` → ``assistant_done``,
    ``error`` → ``error``. Any other chunk type returns ``None`` (not forwarded).
    """
    kind = chunk.get("type")
    if kind == "delta":
        return {"type": "assistant_delta", "content": chunk["content"]}
    if kind == "done":
        return {"type": "assistant_done",
                "provider": chunk.get("provider"), "model": chunk.get("model")}
    if kind == "error":
        return {"type": "error", "error": chunk["error"]}
    return None


async def _close_executor(executor: E2BExecutor) -> bool:
    """Close *executor*, logging instead of raising on failure.

    Returns True when ``close()`` completed without an exception. Safe to call
    from a ``finally`` block of an async generator because it never yields.
    """
    try:
        await executor.close()
    except Exception as e:
        logger.warning("sandbox close error: %s", e)
        return False
    return True


# ----------------------------------------------------------------------------
# Streaming agent
# ----------------------------------------------------------------------------

async def stream_chat_only(
    messages: List[Dict[str, str]],
) -> AsyncIterator[Dict]:
    """Plain chat: stream an LLM reply with no sandbox involved.

    Emits ``phase`` (``chat``), then ``assistant_delta`` / ``assistant_done`` /
    ``error`` events relayed from ``llm_router.stream_complete``.
    """
    full_messages = [{"role": "system", "content": CHAT_SYSTEM}, *messages]
    yield {"type": "phase", "phase": "chat"}
    async for chunk in llm_router.stream_complete(full_messages, temperature=0.4, max_tokens=1024):
        event = _relay_llm_chunk(chunk)
        if event is not None:
            yield event


async def stream_execute(
    messages: List[Dict[str, str]],
    *,
    sandbox_timeout: int = 300,
) -> AsyncIterator[Dict]:
    """Execution task: spin up E2B, run code, reply with real results.

    Args:
        messages: Chat history; the last ``user`` message is quoted back to
            the LLM when the final reply is written.
        sandbox_timeout: Passed to ``E2BExecutor(timeout=...)``.

    Event order: ``phase``(planning) → ``plan`` → ``code`` →
    ``phase``(sandbox_starting) → ``sandbox_started`` → ``phase``(executing) →
    ``stdout``/``stderr``/``exec_error``/``exec_result``… → ``sandbox_closed``
    → ``phase``(summarising) → ``assistant_delta``… → ``assistant_done``.
    If the LLM returns no code block, its raw text is streamed as the reply.
    On an LLM or sandbox failure an ``error`` event is emitted and the
    stream ends.
    """
    # --- step 1: ask LLM for code -------------------------------------------
    yield {"type": "phase", "phase": "planning"}
    code_messages = [{"role": "system", "content": CODER_SYSTEM}, *messages]
    try:
        coder_resp = await llm_router.complete(code_messages, temperature=0.2, max_tokens=1500)
    except Exception as e:
        yield {"type": "error", "error": f"LLM failed: {e}"}
        return

    raw = coder_resp["content"]
    yield {"type": "plan", "content": raw, "provider": coder_resp.get("provider")}

    chosen = pick_runnable(extract_code_blocks(raw))
    if chosen is None:
        # No code block → degrade to chat reply
        yield {"type": "assistant_delta", "content": raw}
        yield {"type": "assistant_done"}
        return

    yield {"type": "code", "language": chosen.language, "code": chosen.code}

    # --- step 2: launch sandbox & run ---------------------------------------
    yield {"type": "phase", "phase": "sandbox_starting"}
    executor: Optional[E2BExecutor] = None
    stdout_buf: List[str] = []
    stderr_buf: List[str] = []
    error_text: Optional[str] = None
    result_text: str = ""
    exit_code: Optional[int] = None
    sandbox_error: Optional[str] = None
    closed = False

    try:
        executor = E2BExecutor(timeout=sandbox_timeout)
        await executor.start()
        yield {"type": "sandbox_started", "sandbox_id": executor.sandbox_id}

        runner = (
            executor.run_python(chosen.code)
            if chosen.language in ("python", "py")
            else executor.run_shell(chosen.code)
        )
        yield {"type": "phase", "phase": "executing"}
        async for ev in runner:
            if ev.type == "stdout":
                stdout_buf.append(ev.data)
                yield {"type": "stdout", "content": ev.data}
            elif ev.type == "stderr":
                stderr_buf.append(ev.data)
                yield {"type": "stderr", "content": ev.data}
            elif ev.type == "error":
                error_text = ev.data
                yield {"type": "exec_error", "content": ev.data, "meta": ev.meta}
            elif ev.type == "result":
                result_text = ev.data
                exit_code = ev.meta.get("exit_code") if ev.meta else None
                yield {"type": "exec_result", "content": ev.data, "meta": ev.meta}
    except Exception as e:
        logger.exception("sandbox error")
        sandbox_error = f"Sandbox error: {e}"
    finally:
        # Await only — never yield here: yielding in ``finally`` raises
        # RuntimeError when the consumer closes this generator early.
        # Closing exactly once here also avoids the error path closing twice.
        if executor is not None:
            closed = await _close_executor(executor)

    if sandbox_error is not None:
        yield {"type": "error", "error": sandbox_error}
    if closed:
        yield {"type": "sandbox_closed"}
    if sandbox_error is not None:
        return

    # --- step 3: ask LLM for final reply with real outputs ------------------
    yield {"type": "phase", "phase": "summarising"}
    user_request = next((m["content"] for m in reversed(messages) if m.get("role") == "user"), "")
    summary_user = (
        f"USER REQUEST:\n{user_request}\n\n"
        f"CODE EXECUTED ({chosen.language}):\n```\n{chosen.code}\n```\n\n"
        f"STDOUT:\n{''.join(stdout_buf) or '(empty)'}\n\n"
        f"STDERR:\n{''.join(stderr_buf) or '(empty)'}\n\n"
        f"RESULT:\n{result_text or '(none)'}\n\n"
        f"ERROR:\n{error_text or '(none)'}\n\n"
        f"EXIT_CODE: {exit_code}"
    )
    reply_messages = [
        {"role": "system", "content": REPLY_SYSTEM},
        {"role": "user", "content": summary_user},
    ]
    async for chunk in llm_router.stream_complete(reply_messages, temperature=0.4, max_tokens=600):
        event = _relay_llm_chunk(chunk)
        if event is not None:
            yield event


# ============================================================================
# Phase 2: Multi-step planner-driven agent loop
# ----------------------------------------------------------------------------
# This block is ADDITIVE. The Phase-1 ``stream_chat_only`` and
# ``stream_execute`` functions above remain the canonical entry points for
# the existing /api/chat/stream and /api/execute endpoints — they MUST NOT
# change behaviour. The new ``stream_agent_plan`` is wired to a brand-new
# /api/agent/stream endpoint in app.py.
# ============================================================================

from . import browser as _browser  # noqa: E402
from . import planner as _planner  # noqa: E402
from . import retry as _retry  # noqa: E402
from . import tasks as _tasks  # noqa: E402
from .tasks import TaskState  # noqa: E402

SUMMARY_SYSTEM = """You are summarising a completed autonomous run.
You will see the user's request, the plan, and the real outputs of each executed step.
Write a concise (3-7 sentence) reply that:
- States what was accomplished (or, if it failed, exactly where and why)
- Quotes any concrete values from the real outputs (paths, numbers, URLs)
- Does NOT invent results. Do NOT re-run anything.
- Speaks in the user's language."""

_TRUNCATION_MARKER = "\n…[truncated]…\n"


def _truncate(text: str, n: int = 1200) -> str:
    """Shorten *text* to roughly *n* characters, keeping its head and tail.

    Text of length <= *n* is returned unchanged; ``None``/empty gives ``""``.
    Longer text keeps the first ``n // 2`` and last ``n - n // 2`` characters
    joined by a truncation marker (so the result is *n* plus the marker).
    A non-positive *n* keeps nothing but the marker.
    """
    if not text:
        return ""
    if len(text) <= n:
        return text
    n = max(n, 0)
    head = n // 2
    tail = n - head
    # Guard tail == 0: text[-0:] would be the WHOLE string, not an empty one.
    return text[:head] + _TRUNCATION_MARKER + (text[-tail:] if tail else "")


async def _emit(task_id: Optional[str], event: Dict[str, Any]):
    """Persist event to SQLite (best-effort) and pass it through.

    The event's ``type`` becomes the stored kind and the remaining keys the
    payload. Persistence errors are logged at debug level and ignored.
    NOTE(review): not referenced by ``stream_agent_plan`` below.
    """
    if task_id:
        try:
            kind = event.get("type", "event")
            payload = {k: v for k, v in event.items() if k != "type"}
            await _tasks.append_event(task_id, kind, payload, step_idx=event.get("step_idx"))
        except Exception as e:
            logger.debug("persist event failed (ignored): %s", e)
    return event


async def _run_python_or_shell_step(
    executor: E2BExecutor,
    step: Dict[str, Any],
    code: str,
    step_idx: int,
    task_id: Optional[str],
) -> Dict[str, Any]:
    """Placeholder documenting intent; always raises ``NotImplementedError``.

    Python/shell step streaming is implemented inline in
    ``stream_agent_plan`` so its events can be yielded directly to the caller.
    """
    raise NotImplementedError("inline in stream_agent_plan")


def _extract_actions_json(raw: str) -> str:
    """Return the JSON text of a browser action list found in *raw*, or ``""``.

    Preference order: a non-empty ```json fence, then any other fence whose
    body starts with ``[``, then the first ``[ {...} ]`` span anywhere in *raw*.
    """
    blocks = extract_code_blocks(raw)
    for b in blocks:
        if b.language == "json" and b.code.strip():
            return b.code
    # extract_code_blocks() labels untagged fences "python", so an untagged
    # JSON fence can only be recognised by its content.
    for b in blocks:
        if b.code.lstrip().startswith("["):
            return b.code
    m = re.search(r"\[\s*\{.*\}\s*\]", raw or "", re.DOTALL)
    return m.group(0) if m else ""


async def _browser_bootstrap_events(executor: E2BExecutor) -> AsyncIterator[Dict[str, Any]]:
    """Run ``browser.ensure_bootstrap`` and relay its output as ``browser_log``.

    Each stdout/stderr event becomes ``{"type": "browser_log", "content": ...,
    "stream": "stdout" | "stderr"}``. Exceptions from the bootstrap propagate.
    """
    async for ev in _browser.ensure_bootstrap(executor):
        if ev.type in ("stdout", "stderr"):
            yield {"type": "browser_log", "content": ev.data, "stream": ev.type}


async def stream_agent_plan(
    messages: List[Dict[str, str]],
    *,
    sandbox_timeout: int = 600,
    max_retries_per_step: int = 2,
    enable_browser: bool = True,
    task_id: Optional[str] = None,
) -> AsyncIterator[Dict[str, Any]]:
    """Multi-step autonomous agent run.

    Lifecycle: queued → planning → executing (with optional retrying) →
    completed | failed.

    Args:
        messages: Chat history; the last ``user`` message is the request and
            ``messages[:-1]`` is passed to the planner as history.
        sandbox_timeout: Passed to ``E2BExecutor(timeout=...)``.
        max_retries_per_step: Extra attempts allowed per step after the first.
            Values below 0 are treated as 0, so every step runs at least once.
        enable_browser: Gates only the up-front browser bootstrap; a
            ``browser`` step still bootstraps on demand.
        task_id: Existing task row to update. When ``None`` a new row is
            created; if that fails the run continues without persistence.

    The sandbox is always closed once started, including when the consumer
    stops iterating early or an exception escapes the step loop.

    SSE events emitted (all carry a ``ts`` field):
        task_started    { task_id }
        state_change    { state }
        plan            { summary, steps[], needs_browser, provider }
        sandbox_started { sandbox_id }
        browser_log     { content, stream }
        step_started    { step_idx, title, kind }
        code            { step_idx, language, code }
        stdout / stderr { step_idx, content }
        exec_error      { step_idx, content, meta }
        exec_result     { step_idx, content, meta }
        browser_*       (forwarded from browser.run_actions)
        step_retry      { step_idx, attempt, reason, next_in_s }
        step_completed  { step_idx, ok[, reason] }
        sandbox_closed  {}
        assistant_delta { content }   ← final summary streaming
        assistant_done  { provider, model }
        task_completed  { final_state[, task_id] }
        error           { error }
    """
    user_message = next(
        (m["content"] for m in reversed(messages) if m.get("role") == "user"), ""
    )

    def stamp(d: Dict[str, Any]) -> Dict[str, Any]:
        d.setdefault("ts", time.time())
        return d

    # ---------- task row ----------
    if task_id is None:
        try:
            t = await _tasks.create_task(user_message, metadata={"source": "agent_stream"})
            task_id = t.id
        except Exception as e:
            logger.warning("task DB unavailable: %s", e)
            task_id = None
    yield stamp({"type": "task_started", "task_id": task_id})

    # ---------- planning ----------
    if task_id:
        await _tasks.update_state(task_id, TaskState.PLANNING)
    yield stamp({"type": "state_change", "state": TaskState.PLANNING})
    try:
        plan = await _planner.make_plan(user_message, history=messages[:-1])
    except Exception as e:
        logger.exception("planner failure")
        if task_id:
            await _tasks.update_state(task_id, TaskState.FAILED, error=str(e))
        yield stamp({"type": "error", "error": f"planner: {e}"})
        yield stamp({"type": "task_completed", "final_state": TaskState.FAILED})
        return

    steps = plan["steps"]
    if task_id:
        try:
            await _tasks.set_steps(task_id, steps)
        except Exception as e:
            logger.debug("set_steps failed (ignored): %s", e)
    yield stamp({"type": "plan", **plan})

    # A single "reason" step (or nothing to execute at all) → plain chat;
    # there is no point starting a sandbox.
    if not steps or (len(steps) == 1 and steps[0]["kind"] == "reason"):
        if task_id:
            await _tasks.update_state(task_id, TaskState.THINKING)
        yield stamp({"type": "state_change", "state": TaskState.THINKING})
        full_messages = [{"role": "system", "content": CHAT_SYSTEM}, *messages]
        async for chunk in llm_router.stream_complete(
            full_messages, temperature=0.4, max_tokens=1024
        ):
            event = _relay_llm_chunk(chunk)
            if event is not None:
                yield stamp(event)
        if task_id:
            await _tasks.update_state(task_id, TaskState.COMPLETED)
        yield stamp({"type": "task_completed", "final_state": TaskState.COMPLETED})
        return

    # ---------- sandbox start ----------
    if task_id:
        await _tasks.update_state(task_id, TaskState.EXECUTING)
    yield stamp({"type": "state_change", "state": TaskState.EXECUTING})

    executor: Optional[E2BExecutor] = None
    try:
        executor = E2BExecutor(timeout=sandbox_timeout)
        await executor.start()
    except Exception as e:
        logger.exception("sandbox start failed")
        # A partially started sandbox may still exist; release it.
        if executor is not None:
            await _close_executor(executor)
        if task_id:
            await _tasks.update_state(task_id, TaskState.FAILED, error=str(e))
        yield stamp({"type": "error", "error": f"sandbox: {e}"})
        yield stamp({"type": "task_completed", "final_state": TaskState.FAILED})
        return

    prior_results: List[Dict[str, Any]] = []
    overall_ok = True
    closed = False
    # Every step gets at least one attempt, even for a negative retry budget.
    max_attempts = max(1, max_retries_per_step + 1)

    try:
        yield stamp({"type": "sandbox_started", "sandbox_id": executor.sandbox_id})
        if task_id:
            await _tasks.update_state(task_id, TaskState.EXECUTING, sandbox_id=executor.sandbox_id)

        # ---------- bootstrap browser once if needed ----------
        browser_ready = False
        if enable_browser and plan.get("needs_browser"):
            yield stamp({"type": "state_change", "state": "browser_bootstrapping"})
            try:
                async for log_ev in _browser_bootstrap_events(executor):
                    yield stamp(log_ev)
                browser_ready = True
            except Exception as e:
                logger.warning("browser bootstrap failed: %s", e)
                yield stamp({"type": "browser_log",
                             "content": f"bootstrap failed: {e}", "stream": "stderr"})

        # ---------- execute steps ----------
        for step_idx, step in enumerate(steps):
            kind = step["kind"]
            if task_id:
                await _tasks.update_step(task_id, step_idx, state=TaskState.EXECUTING)
            yield stamp({"type": "step_started", "step_idx": step_idx,
                         "title": step["title"], "kind": kind})

            last_error_blob: Optional[str] = None
            step_ok = False
            stdout_buf: List[str] = []
            stderr_buf: List[str] = []
            error_text: Optional[str] = None

            for attempt in range(1, max_attempts + 1):
                if task_id:
                    await _tasks.update_step(task_id, step_idx, attempts_delta=1)

                # Fresh buffers per attempt; only the final attempt's output
                # is recorded in prior_results.
                stdout_buf = []
                stderr_buf = []
                error_text = None
                exit_code: Optional[int] = None

                try:
                    if kind == "browser":
                        if not browser_ready:
                            try:
                                async for log_ev in _browser_bootstrap_events(executor):
                                    yield stamp(log_ev)
                                browser_ready = True
                            except Exception as e:
                                raise RuntimeError(f"browser bootstrap failed: {e}") from e

                        # Ask LLM for a Playwright action list
                        actions_msg = await _planner.code_for_step(
                            plan["summary"], step, prior_results, feedback=last_error_blob,
                        )
                        json_text = _extract_actions_json(actions_msg["content"])
                        if not json_text:
                            raise RuntimeError("planner returned no actions JSON")
                        actions = json.loads(json_text)
                        yield stamp({"type": "code", "step_idx": step_idx,
                                     "language": "browser",
                                     "code": json.dumps(actions, indent=2)})

                        async for bev in _browser.run_actions(executor, actions):
                            bev["step_idx"] = step_idx
                            bev_type = bev["type"]
                            if bev_type == "browser_step":
                                stdout_buf.append(json.dumps(bev, ensure_ascii=False))
                            elif bev_type == "browser_error":
                                error_text = bev.get("error")
                                stderr_buf.append(error_text or "")
                            elif bev_type == "browser_done":
                                if bev.get("error") and not error_text:
                                    error_text = bev["error"]
                            yield stamp(bev)

                    else:  # python / shell
                        coder_resp = await _planner.code_for_step(
                            plan["summary"], step, prior_results, feedback=last_error_blob,
                        )
                        raw = coder_resp["content"]
                        chosen = pick_runnable(extract_code_blocks(raw))
                        if chosen is None:
                            # Degenerate: no code → treat content as the result.
                            stdout_buf.append(raw)
                            yield stamp({"type": "stdout", "step_idx": step_idx,
                                         "content": raw[:2000]})
                        else:
                            # The fence language picks the runner, regardless
                            # of the kind the planner declared for the step.
                            lang = chosen.language
                            yield stamp({"type": "code", "step_idx": step_idx,
                                         "language": lang, "code": chosen.code})
                            runner = (
                                executor.run_python(chosen.code)
                                if lang in ("python", "py")
                                else executor.run_shell(chosen.code)
                            )
                            async for ev in runner:
                                if ev.type == "stdout":
                                    stdout_buf.append(ev.data)
                                    yield stamp({"type": "stdout", "step_idx": step_idx,
                                                 "content": ev.data})
                                elif ev.type == "stderr":
                                    stderr_buf.append(ev.data)
                                    yield stamp({"type": "stderr", "step_idx": step_idx,
                                                 "content": ev.data})
                                elif ev.type == "error":
                                    error_text = ev.data
                                    yield stamp({"type": "exec_error", "step_idx": step_idx,
                                                 "content": ev.data, "meta": ev.meta})
                                elif ev.type == "result":
                                    exit_code = (ev.meta or {}).get("exit_code")
                                    yield stamp({"type": "exec_result", "step_idx": step_idx,
                                                 "content": ev.data, "meta": ev.meta})
                except Exception as e:
                    error_text = str(e)
                    yield stamp({"type": "exec_error", "step_idx": step_idx,
                                 "content": error_text})

                # ---- decide retry ----
                stdout_text = "".join(stdout_buf)
                stderr_text = "".join(stderr_buf)
                decision = _retry.decide(
                    attempt=attempt,
                    max_attempts=max_attempts,
                    stdout=stdout_text,
                    stderr=stderr_text,
                    error=error_text,
                    exit_code=exit_code,
                )
                if not _retry.is_failure(stderr_text, error_text, exit_code, stdout_text):
                    step_ok = True
                    if task_id:
                        await _tasks.update_step(task_id, step_idx, state=TaskState.COMPLETED,
                                                 result=_truncate(stdout_text, 2000))
                    yield stamp({"type": "step_completed", "step_idx": step_idx, "ok": True})
                    break

                # ``attempt < max_attempts`` ensures the final attempt always
                # ends in a terminal step_completed event below.
                if decision.should_retry and attempt < max_attempts:
                    last_error_blob = decision.feedback
                    if task_id:
                        await _tasks.update_state(task_id, TaskState.RETRYING)
                        await _tasks.update_step(task_id, step_idx, state=TaskState.RETRYING,
                                                 error=_truncate(decision.feedback, 1500))
                    yield stamp({"type": "step_retry", "step_idx": step_idx,
                                 "attempt": attempt, "reason": decision.reason,
                                 "next_in_s": decision.delay_seconds})
                    if decision.delay_seconds > 0:
                        await asyncio.sleep(decision.delay_seconds)
                    if task_id:
                        await _tasks.update_state(task_id, TaskState.EXECUTING)
                    continue

                # No retry → mark failed
                if task_id:
                    await _tasks.update_step(task_id, step_idx, state=TaskState.FAILED,
                                             error=_truncate(error_text or decision.reason, 1500))
                yield stamp({"type": "step_completed", "step_idx": step_idx,
                             "ok": False, "reason": decision.reason})
                overall_ok = False
                break

            prior_results.append({
                "idx": step_idx,
                "title": step["title"],
                "kind": kind,
                "state": "completed" if step_ok else "failed",
                "stdout": "".join(stdout_buf),
                "stderr": "".join(stderr_buf),
                "error": error_text,
            })
            if not step_ok:
                overall_ok = False
                break  # don't continue plan if a step failed terminally
    finally:
        # ---------- sandbox cleanup ----------
        # Runs on success, on an escaping exception, and when the consumer
        # closes the generator early. Await only; never yield from here.
        closed = await _close_executor(executor)

    if closed:
        yield stamp({"type": "sandbox_closed"})

    # ---------- final summary ----------
    yield stamp({"type": "state_change", "state": "summarising"})
    summary_blob_lines = [
        f"USER REQUEST:\n{user_message}\n",
        f"PLAN: {plan['summary']}\n",
    ]
    for r in prior_results:
        summary_blob_lines.append(
            f"\n[step {r['idx']}] {r['title']} ({r['kind']}, {r['state']})\n"
            f"  stdout: {_truncate(r['stdout'], 600)}\n"
            f"  stderr: {_truncate(r['stderr'], 400)}\n"
            f"  error: {_truncate(r.get('error') or '', 400)}\n"
        )

    final_chunks: List[str] = []
    try:
        async for chunk in llm_router.stream_complete(
            [
                {"role": "system", "content": SUMMARY_SYSTEM},
                {"role": "user", "content": "".join(summary_blob_lines)},
            ],
            temperature=0.4,
            max_tokens=700,
        ):
            event = _relay_llm_chunk(chunk)
            if event is None:
                continue
            if event["type"] == "assistant_delta":
                final_chunks.append(event["content"])
            yield stamp(event)
    except Exception as e:
        logger.warning("summary stream failed: %s", e)
        yield stamp({"type": "error", "error": f"summary: {e}"})

    final_state = TaskState.COMPLETED if overall_ok else TaskState.FAILED
    if task_id:
        await _tasks.update_state(
            task_id,
            final_state,
            final_reply="".join(final_chunks)[:8000],
            error=None if overall_ok else "one or more steps failed",
        )
    yield stamp({"type": "task_completed", "final_state": final_state, "task_id": task_id})