# openhands-backend / agent.py
# Phase2 Deploy
# feat(phase-2): multi-step agent, self-repair, persistent tasks, browser
# d7b2379
"""
Agent loop: orchestrates LLM <-> E2B sandbox for execution tasks.
Phase 1 design — kept deliberately simple and robust:
1. Ask LLM to produce a SINGLE python code block (and optional shell block)
to satisfy the user's request, given recent context.
2. Extract the code block(s).
3. Run them in a fresh E2B sandbox, streaming stdout/stderr to the caller.
4. Show the LLM the real output and ask for a final natural-language reply.
5. Stream that reply.
6. Close the sandbox.
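Anything more elaborate (multi-step planner, tool-calling, retry-on-error) is
intentionally OUT of Phase 1; the additive Phase 2 section at the bottom of
this module (``stream_agent_plan``) provides the multi-step planner and
per-step retries.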
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
import time
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, List, Optional
from . import llm_router
from .executor import E2BExecutor, ExecEvent
logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------
# Prompts
# ----------------------------------------------------------------------------
# System prompt for the code-generation call in ``stream_execute``: tells the
# model to answer with exactly one fenced code block (Python preferred, a
# single bash block allowed for shell-only tasks) and nothing else.
CODER_SYSTEM = """You are a code executor agent running inside a real Linux
sandbox (E2B). The user will ask you to do something that requires running
real code. Reply with ONE single fenced code block — Python preferred — that,
when executed, accomplishes the task.
Strict rules:
- Output ONLY the code block. No prose before or after.
- Prefer Python. Use ```python fences.
- If the task is shell-only (mkdir, ls, install a package), you may use one
```bash block instead.
- Print clear progress messages so the user can see what happened.
- Always print a final confirmation line.
- Keep total output under ~200 lines.
"""
# System prompt for the final reply in ``stream_execute``: the model is shown
# the request, the executed code and the captured output, and must summarise
# them without inventing results.
REPLY_SYSTEM = """You are a helpful assistant. The user asked for a task that
required running real code. Below is the user's request, the code that ran,
and the REAL execution output. Write a short, friendly natural-language reply
summarising what was done and quoting any important values from the real
output. Do NOT fabricate. Do NOT re-run anything. Keep it concise (3-6
sentences)."""
# System prompt for sandbox-free chat; used by ``stream_chat_only`` and by the
# single-"reason"-step shortcut in ``stream_agent_plan``.
CHAT_SYSTEM = """You are a concise, helpful assistant. Reply in the same
language as the user when natural. Keep answers focused."""
# ----------------------------------------------------------------------------
# Code extraction
# ----------------------------------------------------------------------------
# Matches one Markdown fenced code block. Group 1 is the (possibly empty)
# language tag that follows the opening fence; group 2 is the block body, taken
# lazily up to the next closing fence. DOTALL lets the body span several lines.
_FENCE_RE = re.compile(
    r"```([a-zA-Z0-9_+\-]*)\s*\n(.*?)```", re.DOTALL
)
@dataclass
class CodeBlock:
    """One fenced code block pulled out of an LLM response."""

    # Fence language tag as stored by the extractor (e.g. "python", "bash").
    language: str
    # Text found between the opening and closing fences.
    code: str
def extract_code_blocks(text: str) -> List[CodeBlock]:
    """Collect every fenced code block in *text*, in order of appearance.

    The language tag is lower-cased; a fence without a tag is reported as
    ``"python"``. Trailing whitespace is stripped from each body. ``None`` or
    empty input produces an empty list.
    """
    source = text or ""
    return [
        CodeBlock(
            language=(hit.group(1) or "").lower().strip() or "python",
            code=hit.group(2).rstrip(),
        )
        for hit in _FENCE_RE.finditer(source)
    ]
def pick_runnable(blocks: List[CodeBlock]) -> Optional[CodeBlock]:
    """Choose the block to execute from *blocks*.

    Priority: the first Python block, otherwise the first shell block,
    otherwise the very first block. Returns ``None`` when *blocks* is empty.
    """
    language_priority = (
        ("python", "py"),
        ("bash", "sh", "shell"),
    )
    for wanted in language_priority:
        hit = next((blk for blk in blocks if blk.language in wanted), None)
        if hit is not None:
            return hit
    # No recognised language: fall back to whatever came first.
    return blocks[0] if blocks else None
# ----------------------------------------------------------------------------
# Streaming agent
# ----------------------------------------------------------------------------
async def stream_chat_only(
    messages: List[Dict[str, str]],
) -> AsyncIterator[Dict]:
    """Stream a plain chat reply; no sandbox is involved.

    Prepends ``CHAT_SYSTEM`` to *messages*, emits a ``phase`` event, then
    forwards the router's stream as ``assistant_delta`` / ``assistant_done`` /
    ``error`` events. Router chunks of any other type are dropped.
    """
    prompt = [{"role": "system", "content": CHAT_SYSTEM}]
    prompt.extend(messages)
    yield {"type": "phase", "phase": "chat"}
    stream = llm_router.stream_complete(prompt, temperature=0.4, max_tokens=1024)
    async for part in stream:
        part_type = part["type"]
        if part_type == "delta":
            yield {"type": "assistant_delta", "content": part["content"]}
            continue
        if part_type == "done":
            yield {
                "type": "assistant_done",
                "provider": part.get("provider"),
                "model": part.get("model"),
            }
            continue
        if part_type == "error":
            yield {"type": "error", "error": part["error"]}
async def stream_execute(
    messages: List[Dict[str, str]],
    *,
    sandbox_timeout: int = 300,
) -> AsyncIterator[Dict]:
    """Execution task: spin up E2B, run code, reply with real results.

    Flow:
      1. Ask the LLM (``CODER_SYSTEM``) for a single code block.
      2. Run the chosen block in a fresh ``E2BExecutor`` sandbox, forwarding
         stdout/stderr/error/result events as they arrive.
      3. Ask the LLM (``REPLY_SYSTEM``) to summarise the real output and
         stream that reply.

    Args:
        messages: Chat history; the most recent ``user`` entry is treated as
            the request that the final reply summarises.
        sandbox_timeout: Passed to ``E2BExecutor(timeout=...)``.

    Yields:
        Event dicts keyed by ``type``: ``phase``, ``plan``, ``code``,
        ``sandbox_started``, ``stdout``, ``stderr``, ``exec_error``,
        ``exec_result``, ``sandbox_closed``, ``assistant_delta``,
        ``assistant_done`` and ``error``.
    """
    # --- step 1: ask LLM for code -------------------------------------------
    yield {"type": "phase", "phase": "planning"}
    code_messages = [{"role": "system", "content": CODER_SYSTEM}, *messages]
    try:
        coder_resp = await llm_router.complete(code_messages, temperature=0.2, max_tokens=1500)
    except Exception as e:
        yield {"type": "error", "error": f"LLM failed: {e}"}
        return
    raw = coder_resp["content"]
    yield {"type": "plan", "content": raw, "provider": coder_resp.get("provider")}
    blocks = extract_code_blocks(raw)
    chosen = pick_runnable(blocks)
    if chosen is None:
        # No code block → degrade to chat reply
        yield {"type": "assistant_delta", "content": raw}
        yield {"type": "assistant_done"}
        return
    yield {"type": "code", "language": chosen.language, "code": chosen.code}

    # --- step 2: launch sandbox & run ---------------------------------------
    yield {"type": "phase", "phase": "sandbox_starting"}
    executor: Optional[E2BExecutor] = None
    stdout_buf: List[str] = []
    stderr_buf: List[str] = []
    error_text: Optional[str] = None
    result_text: str = ""
    exit_code: Optional[int] = None
    sandbox_failed = False
    try:
        executor = E2BExecutor(timeout=sandbox_timeout)
        await executor.start()
        yield {"type": "sandbox_started", "sandbox_id": executor.sandbox_id}
        runner = (
            executor.run_python(chosen.code)
            if chosen.language in ("python", "py")
            else executor.run_shell(chosen.code)
        )
        yield {"type": "phase", "phase": "executing"}
        async for ev in runner:
            if ev.type == "stdout":
                stdout_buf.append(ev.data)
                yield {"type": "stdout", "content": ev.data}
            elif ev.type == "stderr":
                stderr_buf.append(ev.data)
                yield {"type": "stderr", "content": ev.data}
            elif ev.type == "error":
                error_text = ev.data
                yield {"type": "exec_error", "content": ev.data, "meta": ev.meta}
            elif ev.type == "result":
                result_text = ev.data
                exit_code = ev.meta.get("exit_code") if ev.meta else None
                yield {"type": "exec_result", "content": ev.data, "meta": ev.meta}
    except Exception as e:
        logger.exception("sandbox error")
        yield {"type": "error", "error": f"Sandbox error: {e}"}
        # Cleanup is left to ``finally`` so the sandbox is closed exactly once.
        sandbox_failed = True
    finally:
        # Only await here: yielding inside ``finally`` would raise
        # RuntimeError if the consumer closes this generator mid-run.
        if executor:
            await executor.close()
    if executor:
        yield {"type": "sandbox_closed"}
    if sandbox_failed:
        return

    # --- step 3: ask LLM for final reply with real outputs ------------------
    yield {"type": "phase", "phase": "summarising"}
    user_request = next((m["content"] for m in reversed(messages) if m.get("role") == "user"), "")
    summary_user = (
        f"USER REQUEST:\n{user_request}\n\n"
        f"CODE EXECUTED ({chosen.language}):\n```\n{chosen.code}\n```\n\n"
        f"STDOUT:\n{''.join(stdout_buf) or '(empty)'}\n\n"
        f"STDERR:\n{''.join(stderr_buf) or '(empty)'}\n\n"
        f"RESULT:\n{result_text or '(none)'}\n\n"
        f"ERROR:\n{error_text or '(none)'}\n\n"
        f"EXIT_CODE: {exit_code}"
    )
    reply_messages = [
        {"role": "system", "content": REPLY_SYSTEM},
        {"role": "user", "content": summary_user},
    ]
    async for chunk in llm_router.stream_complete(reply_messages, temperature=0.4, max_tokens=600):
        if chunk["type"] == "delta":
            yield {"type": "assistant_delta", "content": chunk["content"]}
        elif chunk["type"] == "done":
            yield {"type": "assistant_done", "provider": chunk.get("provider"), "model": chunk.get("model")}
        elif chunk["type"] == "error":
            yield {"type": "error", "error": chunk["error"]}
# ============================================================================
# Phase 2: Multi-step planner-driven agent loop
# ----------------------------------------------------------------------------
# This block is ADDITIVE. The Phase-1 ``stream_chat_only`` and
# ``stream_execute`` functions above remain the canonical entry points for
# the existing /api/chat/stream and /api/execute endpoints — they MUST NOT
# change behaviour. The new ``stream_agent_plan`` is wired to a brand-new
# /api/agent/stream endpoint in app.py.
# ============================================================================
from . import browser as _browser
from . import planner as _planner
from . import retry as _retry
from . import tasks as _tasks
from .tasks import TaskState
# System prompt for the closing summary in ``stream_agent_plan``: the model is
# given the request, the plan summary and each step's captured output, and must
# report what happened without inventing results.
SUMMARY_SYSTEM = """You are summarising a completed autonomous run. You will
see the user's request, the plan, and the real outputs of each executed
step. Write a concise (3-7 sentence) reply that:
- States what was accomplished (or, if it failed, exactly where and why)
- Quotes any concrete values from the real outputs (paths, numbers, URLs)
- Does NOT invent results. Do NOT re-run anything.
- Speaks in the user's language."""
def _truncate(text: str, n: int = 1200) -> str:
if not text:
return ""
if len(text) <= n:
return text
return text[: n // 2] + "\n…[truncated]…\n" + text[-n // 2 :]
async def _emit(task_id: Optional[str], event: Dict[str, Any]):
"""Persist event to SQLite (best-effort) and pass it through."""
if task_id:
try:
kind = event.get("type", "event")
payload = {k: v for k, v in event.items() if k != "type"}
await _tasks.append_event(task_id, kind, payload,
step_idx=event.get("step_idx"))
except Exception as e:
logger.debug("persist event failed (ignored): %s", e)
return event
async def _run_python_or_shell_step(
executor: E2BExecutor,
step: Dict[str, Any],
code: str,
step_idx: int,
task_id: Optional[str],
) -> Dict[str, Any]:
"""Stream a single python/shell step. Returns a result dict, but events
must be sent by the *caller* via the queue produced by this generator.
Implementation: we wrap a queue-style async generator and let the caller
iterate; this helper just collects buffers for the retry decision.
"""
# The actual streaming is implemented inline in stream_agent_plan; this
# placeholder exists only to clarify intent.
raise NotImplementedError("inline in stream_agent_plan")
def _extract_actions_json(actions_raw: str) -> str:
    """Return the JSON text of a browser action list, or ``""`` if none is found.

    A non-empty ``json``-fenced block wins; otherwise the first
    ``[ { ... } ]`` span anywhere in *actions_raw* is used. That fallback also
    covers fences without a language tag.
    """
    for block in extract_code_blocks(actions_raw):
        if block.language == "json" and block.code:
            return block.code
    m = re.search(r"\[\s*\{.*\}\s*\]", actions_raw, re.DOTALL)
    return m.group(0) if m else ""


async def stream_agent_plan(
    messages: List[Dict[str, str]],
    *,
    sandbox_timeout: int = 600,
    max_retries_per_step: int = 2,
    enable_browser: bool = True,
    task_id: Optional[str] = None,
) -> AsyncIterator[Dict[str, Any]]:
    """Multi-step autonomous agent run.

    Lifecycle: queued → planning → executing (with optional retrying) →
    completed | failed. SSE events emitted (all carry a ``ts`` field):

        task_started     { task_id }
        state_change     { state }
        plan             { summary, steps[], needs_browser, provider }
        step_started     { step_idx, title, kind }
        code             { step_idx, language, code }
        stdout / stderr  { step_idx, content }
        exec_error       { step_idx, content, meta }
        exec_result      { step_idx, content, meta }
        browser_*        (forwarded from browser.run_actions)
        step_retry       { step_idx, attempt, reason }
        step_completed   { step_idx, ok }
        assistant_delta  { content }   ← final summary streaming
        assistant_done   { provider, model }
        task_completed   { final_state }
        error            { error }

    Args:
        messages: Chat history. The most recent ``user`` entry is the request;
            ``messages[:-1]`` is handed to the planner as history.
        sandbox_timeout: Passed to ``E2BExecutor(timeout=...)``.
        max_retries_per_step: Extra attempts allowed per step after the first,
            so each step runs at most ``max_retries_per_step + 1`` times.
        enable_browser: When true and the plan sets ``needs_browser``, the
            browser is bootstrapped once before the first step. A ``browser``
            step still bootstraps lazily if that did not happen.
        task_id: Existing task row to update. When ``None`` a row is created;
            if that fails the run continues without persistence.

    The sandbox is closed on every exit path once it has been created,
    including consumer disconnects and unexpected exceptions during the steps.
    """
    user_message = next(
        (m["content"] for m in reversed(messages) if m.get("role") == "user"), ""
    )

    def stamp(d: Dict[str, Any]) -> Dict[str, Any]:
        # Add a timestamp unless the event already carries one.
        d.setdefault("ts", time.time())
        return d

    # ---------- task row ----------
    if task_id is None:
        try:
            t = await _tasks.create_task(user_message,
                                         metadata={"source": "agent_stream"})
            task_id = t.id
        except Exception as e:
            logger.warning("task DB unavailable: %s", e)
            task_id = None
    yield stamp({"type": "task_started", "task_id": task_id})

    # ---------- planning ----------
    if task_id:
        await _tasks.update_state(task_id, TaskState.PLANNING)
    yield stamp({"type": "state_change", "state": TaskState.PLANNING})
    try:
        plan = await _planner.make_plan(user_message, history=messages[:-1])
    except Exception as e:
        logger.exception("planner failure")
        if task_id:
            await _tasks.update_state(task_id, TaskState.FAILED, error=str(e))
        yield stamp({"type": "error", "error": f"planner: {e}"})
        yield stamp({"type": "task_completed", "final_state": TaskState.FAILED})
        return
    if task_id:
        try:
            await _tasks.set_steps(task_id, plan["steps"])
        except Exception as e:
            # Persisting the steps is best-effort; keep the run going.
            logger.debug("persist steps failed (ignored): %s", e)
    yield stamp({"type": "plan", **plan})

    # If the plan is a single "reason" step → respond as plain chat.
    if len(plan["steps"]) == 1 and plan["steps"][0]["kind"] == "reason":
        if task_id:
            await _tasks.update_state(task_id, TaskState.THINKING)
        yield stamp({"type": "state_change", "state": TaskState.THINKING})
        full_messages = [{"role": "system", "content": CHAT_SYSTEM}, *messages]
        async for chunk in llm_router.stream_complete(
            full_messages, temperature=0.4, max_tokens=1024
        ):
            if chunk["type"] == "delta":
                yield stamp({"type": "assistant_delta", "content": chunk["content"]})
            elif chunk["type"] == "done":
                yield stamp({"type": "assistant_done",
                             "provider": chunk.get("provider"),
                             "model": chunk.get("model")})
            elif chunk["type"] == "error":
                yield stamp({"type": "error", "error": chunk["error"]})
        if task_id:
            await _tasks.update_state(task_id, TaskState.COMPLETED)
        yield stamp({"type": "task_completed", "final_state": TaskState.COMPLETED})
        return

    # ---------- sandbox start ----------
    if task_id:
        await _tasks.update_state(task_id, TaskState.EXECUTING)
    yield stamp({"type": "state_change", "state": TaskState.EXECUTING})
    executor: Optional[E2BExecutor] = None
    try:
        executor = E2BExecutor(timeout=sandbox_timeout)
        await executor.start()
    except Exception as e:
        logger.exception("sandbox start failed")
        if executor:
            # Best-effort release of a sandbox that only partly came up.
            try:
                await executor.close()
            except Exception as close_err:
                logger.warning("sandbox close error: %s", close_err)
        if task_id:
            await _tasks.update_state(task_id, TaskState.FAILED, error=str(e))
        yield stamp({"type": "error", "error": f"sandbox: {e}"})
        yield stamp({"type": "task_completed", "final_state": TaskState.FAILED})
        return

    max_attempts = max_retries_per_step + 1
    prior_results: List[Dict[str, Any]] = []
    overall_ok = True
    sandbox_closed = False
    try:
        yield stamp({"type": "sandbox_started", "sandbox_id": executor.sandbox_id})
        if task_id:
            await _tasks.update_state(task_id, TaskState.EXECUTING,
                                      sandbox_id=executor.sandbox_id)

        # ---------- bootstrap browser once if needed ----------
        browser_ready = False
        if enable_browser and plan.get("needs_browser"):
            yield stamp({"type": "state_change", "state": "browser_bootstrapping"})
            try:
                async for ev in _browser.ensure_bootstrap(executor):
                    if ev.type == "stdout":
                        yield stamp({"type": "browser_log", "content": ev.data})
                    elif ev.type == "stderr":
                        yield stamp({"type": "browser_log", "content": ev.data,
                                     "stream": "stderr"})
                browser_ready = True
            except Exception as e:
                logger.warning("browser bootstrap failed: %s", e)
                yield stamp({"type": "browser_log",
                             "content": f"bootstrap failed: {e}", "stream": "stderr"})

        # ---------- execute steps ----------
        for step_idx, step in enumerate(plan["steps"]):
            kind = step["kind"]
            if task_id:
                await _tasks.update_step(task_id, step_idx, state=TaskState.EXECUTING)
            yield stamp({"type": "step_started", "step_idx": step_idx,
                         "title": step["title"], "kind": kind})
            last_error_blob: Optional[str] = None
            step_ok = False
            attempt = 0
            while attempt < max_attempts:
                attempt += 1
                if task_id:
                    await _tasks.update_step(task_id, step_idx, attempts_delta=1)
                # ---- ask for code (or browser actions) ----
                # Buffers are reset per attempt so the retry decision only
                # sees the output of the latest run.
                stdout_buf: List[str] = []
                stderr_buf: List[str] = []
                error_text: Optional[str] = None
                result_text: str = ""
                exit_code: Optional[int] = None
                try:
                    if kind == "browser":
                        if not browser_ready:
                            try:
                                async for ev in _browser.ensure_bootstrap(executor):
                                    if ev.type in ("stdout", "stderr"):
                                        yield stamp({
                                            "type": "browser_log",
                                            "content": ev.data,
                                            "stream": "stderr" if ev.type == "stderr" else "stdout",
                                        })
                                browser_ready = True
                            except Exception as e:
                                raise RuntimeError(f"browser bootstrap failed: {e}") from e
                        # Ask LLM for a Playwright action list
                        actions_msg = await _planner.code_for_step(
                            plan["summary"], step, prior_results,
                            feedback=last_error_blob,
                        )
                        json_text = _extract_actions_json(actions_msg["content"])
                        if not json_text:
                            raise RuntimeError("planner returned no actions JSON")
                        actions = json.loads(json_text)
                        yield stamp({"type": "code", "step_idx": step_idx,
                                     "language": "browser",
                                     "code": json.dumps(actions, indent=2)})
                        async for bev in _browser.run_actions(executor, actions):
                            bev["step_idx"] = step_idx
                            if bev["type"] == "browser_step":
                                stdout_buf.append(json.dumps(bev, ensure_ascii=False))
                            if bev["type"] == "browser_error":
                                error_text = bev.get("error")
                                stderr_buf.append(error_text or "")
                            if bev["type"] == "browser_done":
                                if bev.get("error") and not error_text:
                                    error_text = bev["error"]
                            yield stamp(bev)
                    else:
                        # python / shell
                        coder_resp = await _planner.code_for_step(
                            plan["summary"], step, prior_results,
                            feedback=last_error_blob,
                        )
                        raw = coder_resp["content"]
                        chosen = pick_runnable(extract_code_blocks(raw))
                        if chosen is None:
                            # Degenerate: no code → treat content as the result.
                            stdout_buf.append(raw)
                            yield stamp({"type": "stdout", "step_idx": step_idx,
                                         "content": raw[:2000]})
                        else:
                            # The fence language of the returned code picks
                            # the runner, whatever kind the planner gave the step.
                            lang = chosen.language
                            yield stamp({"type": "code", "step_idx": step_idx,
                                         "language": lang, "code": chosen.code})
                            runner = (
                                executor.run_python(chosen.code)
                                if lang in ("python", "py")
                                else executor.run_shell(chosen.code)
                            )
                            async for ev in runner:
                                if ev.type == "stdout":
                                    stdout_buf.append(ev.data)
                                    yield stamp({"type": "stdout",
                                                 "step_idx": step_idx,
                                                 "content": ev.data})
                                elif ev.type == "stderr":
                                    stderr_buf.append(ev.data)
                                    yield stamp({"type": "stderr",
                                                 "step_idx": step_idx,
                                                 "content": ev.data})
                                elif ev.type == "error":
                                    error_text = ev.data
                                    yield stamp({"type": "exec_error",
                                                 "step_idx": step_idx,
                                                 "content": ev.data,
                                                 "meta": ev.meta})
                                elif ev.type == "result":
                                    result_text = ev.data
                                    exit_code = (ev.meta or {}).get("exit_code")
                                    yield stamp({"type": "exec_result",
                                                 "step_idx": step_idx,
                                                 "content": ev.data,
                                                 "meta": ev.meta})
                except Exception as e:
                    # Any failure inside the attempt becomes retry feedback
                    # instead of aborting the whole run.
                    error_text = str(e)
                    yield stamp({"type": "exec_error", "step_idx": step_idx,
                                 "content": error_text})

                # ---- decide retry ----
                decision = _retry.decide(
                    attempt=attempt,
                    max_attempts=max_attempts,
                    stdout="".join(stdout_buf),
                    stderr="".join(stderr_buf),
                    error=error_text,
                    exit_code=exit_code,
                )
                if not _retry.is_failure("".join(stderr_buf), error_text, exit_code,
                                         "".join(stdout_buf)):
                    step_ok = True
                    if task_id:
                        await _tasks.update_step(task_id, step_idx,
                                                 state=TaskState.COMPLETED,
                                                 result=_truncate("".join(stdout_buf), 2000))
                    yield stamp({"type": "step_completed",
                                 "step_idx": step_idx, "ok": True})
                    break
                # Retry only while attempts remain, so an exhausted step always
                # reaches the failure branch below and is reported as failed.
                if decision.should_retry and attempt < max_attempts:
                    last_error_blob = decision.feedback
                    if task_id:
                        await _tasks.update_state(task_id, TaskState.RETRYING)
                        await _tasks.update_step(task_id, step_idx,
                                                 state=TaskState.RETRYING,
                                                 error=_truncate(decision.feedback, 1500))
                    yield stamp({"type": "step_retry",
                                 "step_idx": step_idx,
                                 "attempt": attempt,
                                 "reason": decision.reason,
                                 "next_in_s": decision.delay_seconds})
                    if decision.delay_seconds > 0:
                        await asyncio.sleep(decision.delay_seconds)
                    if task_id:
                        await _tasks.update_state(task_id, TaskState.EXECUTING)
                    continue
                # No retry → mark failed
                if task_id:
                    await _tasks.update_step(task_id, step_idx,
                                             state=TaskState.FAILED,
                                             error=_truncate(error_text or decision.reason, 1500))
                yield stamp({"type": "step_completed",
                             "step_idx": step_idx, "ok": False,
                             "reason": decision.reason})
                overall_ok = False
                break

            prior_results.append({
                "idx": step_idx,
                "title": step["title"],
                "kind": kind,
                "state": "completed" if step_ok else "failed",
                "stdout": "".join(stdout_buf),
                "stderr": "".join(stderr_buf),
                "error": error_text,
            })
            if not step_ok:
                # Don't continue the plan if a step failed terminally.
                overall_ok = False
                break
    finally:
        # ---------- sandbox cleanup ----------
        # Runs on success, on unexpected exceptions and when the consumer
        # closes the stream. Only await here: a yield inside ``finally`` would
        # raise RuntimeError while the generator is being closed.
        try:
            await executor.close()
            sandbox_closed = True
        except Exception as e:
            logger.warning("sandbox close error: %s", e)
    if sandbox_closed:
        yield stamp({"type": "sandbox_closed"})

    # ---------- final summary ----------
    yield stamp({"type": "state_change", "state": "summarising"})
    summary_blob_lines = [
        f"USER REQUEST:\n{user_message}\n",
        f"PLAN: {plan['summary']}\n",
    ]
    for r in prior_results:
        summary_blob_lines.append(
            f"\n[step {r['idx']}] {r['title']} ({r['kind']}, {r['state']})\n"
            f"  stdout: {_truncate(r['stdout'], 600)}\n"
            f"  stderr: {_truncate(r['stderr'], 400)}\n"
            f"  error:  {_truncate(r.get('error') or '', 400)}\n"
        )
    final_chunks: List[str] = []
    try:
        async for chunk in llm_router.stream_complete(
            [
                {"role": "system", "content": SUMMARY_SYSTEM},
                {"role": "user", "content": "".join(summary_blob_lines)},
            ],
            temperature=0.4, max_tokens=700,
        ):
            if chunk["type"] == "delta":
                final_chunks.append(chunk["content"])
                yield stamp({"type": "assistant_delta", "content": chunk["content"]})
            elif chunk["type"] == "done":
                yield stamp({"type": "assistant_done",
                             "provider": chunk.get("provider"),
                             "model": chunk.get("model")})
            elif chunk["type"] == "error":
                yield stamp({"type": "error", "error": chunk["error"]})
    except Exception as e:
        # A failed summary must not prevent the terminal task state below.
        logger.warning("summary stream failed: %s", e)

    final_state = TaskState.COMPLETED if overall_ok else TaskState.FAILED
    if task_id:
        await _tasks.update_state(
            task_id, final_state,
            final_reply="".join(final_chunks)[:8000],
            error=None if overall_ok else "one or more steps failed",
        )
    yield stamp({"type": "task_completed", "final_state": final_state,
                 "task_id": task_id})