"""
Multi-step task planner (Phase 2).
Asks the existing ``llm_router`` for a structured JSON plan that decomposes
the user's request into 1..N executable steps. The planner is *additive*: if
it fails, callers can fall back to the Phase-1 single-shot path.
Returned plan shape::
{
"summary": "What we're going to do, in one sentence.",
"needs_browser": false,
"steps": [
{
"title": "Create proof.txt",
"description": "Write current UNIX timestamp to /home/user/proof.txt",
"kind": "python" | "shell" | "browser" | "reason"
},
...
]
}
The router and key rotation are used unchanged, so all Phase-1 provider
failover / cooldown behaviour applies here too.
"""
from __future__ import annotations
import json
import logging
import re
from typing import Any, Dict, List, Optional
from . import llm_router
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# System prompt for the planning call. ``make_plan`` sends it as the first
# ("system") message. It asks the model for strict JSON with "summary",
# "needs_browser" and "steps"; the four step kinds listed here are the same
# ones accepted by ``_VALID_KINDS``.
PLANNER_SYSTEM = """You are a task planner for an autonomous agent that runs
real code in a Linux sandbox (E2B). Given the user's request, decompose it
into a small ordered list of CONCRETE executable steps.
Return STRICT JSON only. No prose, no markdown fences. Schema:
{
"summary": "<one sentence describing the overall goal>",
"needs_browser": true | false,
"steps": [
{
"title": "<short imperative title, <80 chars>",
"description": "<what this step does and what success looks like>",
"kind": "python" | "shell" | "browser" | "reason"
}
]
}
Rules:
- 1 to 6 steps. Prefer fewer. Combine trivially related actions into one step.
- A "python" step runs Python code in the sandbox.
- A "shell" step runs bash inside the sandbox.
- A "browser" step uses Playwright (only when navigating real web pages is
required). Set "needs_browser": true if ANY step uses kind="browser".
- A "reason" step is pure thinking (no execution); use sparingly, only when
the user explicitly asks for analysis/explanation between executions.
- Each step must be runnable on its own with information available so far.
- Do NOT include the code itself — only describe what should happen.
- If the request is a simple greeting or question (no execution needed), emit
a single step of kind "reason".
"""
# Regex to peel ```json fences off the model output if it ignores instructions
_FENCE_RE = re.compile(r"^```(?:json)?\s*|\s*```$", re.IGNORECASE | re.MULTILINE)
def _safe_json_load(text: str) -> Optional[Dict[str, Any]]:
if not text:
return None
cleaned = _FENCE_RE.sub("", text.strip()).strip()
# Some models prepend explanations; extract the first {...} block.
first = cleaned.find("{")
last = cleaned.rfind("}")
if first != -1 and last != -1 and last > first:
cleaned = cleaned[first:last + 1]
try:
return json.loads(cleaned)
except Exception:
return None
_VALID_KINDS = {"python", "shell", "browser", "reason"}
def _sanitize_plan(raw: Dict[str, Any], user_message: str) -> Dict[str, Any]:
steps_in = raw.get("steps") or []
if not isinstance(steps_in, list) or not steps_in:
steps_in = [{"title": user_message[:80] or "Handle request",
"description": user_message,
"kind": "python"}]
cleaned: List[Dict[str, str]] = []
for s in steps_in[:6]:
if not isinstance(s, dict):
continue
kind = str(s.get("kind", "python")).lower().strip()
if kind not in _VALID_KINDS:
kind = "python"
title = str(s.get("title", "")).strip()[:120] or f"Step {len(cleaned)+1}"
description = str(s.get("description", "")).strip()[:1000]
cleaned.append({"title": title, "description": description, "kind": kind})
if not cleaned:
cleaned.append({"title": user_message[:80] or "Handle request",
"description": user_message, "kind": "python"})
needs_browser = bool(raw.get("needs_browser")) or any(c["kind"] == "browser" for c in cleaned)
summary = str(raw.get("summary", "")).strip()[:400] or user_message[:200]
return {"summary": summary, "needs_browser": needs_browser, "steps": cleaned}
def _heuristic_plan(user_message: str) -> Dict[str, Any]:
"""Used when the planner LLM fails. Keeps behaviour Phase-1-equivalent."""
lower = user_message.lower()
kind = "python"
if any(tok in lower for tok in (" website", "browse ", "open url", "http://", "https://",
"scrape", "click ", "playwright")):
kind = "browser"
elif lower.lstrip().startswith(("ls ", "cd ", "cat ", "mkdir", "rm ", "cp ", "mv ",
"echo ", "grep ", "chmod ")):
kind = "shell"
return {
"summary": user_message[:200],
"needs_browser": kind == "browser",
"steps": [{
"title": user_message[:80] or "Handle request",
"description": user_message,
"kind": kind,
}],
}
async def make_plan(user_message: str,
                    history: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]:
    """Produce a sanitised execution plan.

    Falls back to a single-step heuristic plan if the LLM is unavailable or
    returns something unusable, so the agent never gets stuck when providers
    are down.

    Args:
        user_message: The request to plan for (truncated to 4000 chars in the
            prompt).
        history: Optional prior chat turns as ``{"role", "content"}`` dicts.
            Only the last six are considered, each capped at 2000 chars.
            Entries that are not dicts or carry no content are skipped.

    Returns:
        A plan dict (``summary``, ``needs_browser``, ``steps``) plus a
        ``provider`` key; ``model`` is added when the LLM plan was parsed.
    """
    messages: List[Dict[str, str]] = [{"role": "system", "content": PLANNER_SYSTEM}]
    # Keep the last few turns for context (token-friendly)
    for h in (history or [])[-6:]:
        if not isinstance(h, dict):
            continue
        content = h.get("content")
        if content is None or content == "":
            # str(None) would send the literal text "None" to the model.
            continue
        role = h.get("role", "user")
        if role not in ("user", "assistant", "system"):
            role = "user"
        messages.append({"role": role, "content": str(content)[:2000]})
    messages.append({"role": "user", "content": user_message[:4000]})

    try:
        resp = await llm_router.complete(messages, temperature=0.1, max_tokens=900)
    except Exception as e:
        logger.warning("planner LLM failed: %s — using heuristic", e)
        plan = _heuristic_plan(user_message)
        plan["provider"] = "heuristic"
        return plan

    if not isinstance(resp, dict):
        # Unexpected response shape: treat it like unparseable output rather
        # than crashing on ``.get`` below.
        resp = {}

    raw = _safe_json_load(resp.get("content", ""))
    if not raw:
        logger.info("planner could not parse JSON — using heuristic")
        plan = _heuristic_plan(user_message)
        # Records the responding provider when known, even though the plan
        # itself comes from the heuristic.
        plan["provider"] = resp.get("provider", "heuristic")
        return plan

    plan = _sanitize_plan(raw, user_message)
    plan["provider"] = resp.get("provider")
    plan["model"] = resp.get("model")
    return plan
# ---------------------------------------------------------------------------
# Code generation per step (still goes through the existing llm_router)
# ---------------------------------------------------------------------------
# System prompt for the per-step code-generation call; ``code_for_step`` sends
# it as the "system" message ahead of the step-specific user prompt.
STEP_CODER_SYSTEM = """You generate ONE fenced code block to satisfy ONE step
of a larger plan. The code will be executed in a real Linux sandbox (E2B).
You may use previously stored variables/files from prior steps in the same
sandbox session — assume cwd is /home/user.
Strict rules:
- Output ONLY a single fenced code block, no prose.
- Use ```python``` for kind=python, ```bash``` for kind=shell.
- Print a clear success/failure marker on the last line.
- Total code under 200 lines.
"""
async def code_for_step(plan_summary: str,
                        step: Dict[str, str],
                        prior_results: List[Dict[str, Any]],
                        feedback: Optional[str] = None) -> Dict[str, Any]:
    """Request the code implementing ``step`` from the router.

    The prompt carries the plan summary, a digest of up to the four most
    recent prior results (state plus the last 400 chars of stdout/stderr) and
    the current step's title, kind and description.

    When ``feedback`` is given (the traceback / error of a failed attempt),
    its last 2000 chars are appended with an instruction to fix the code.
    This lets the self-repair retry loop in retry.py work without knowing
    about llm_router internals.

    Returns whatever ``llm_router.complete`` returns, unmodified.
    """

    def _render(prior: Dict[str, Any]) -> str:
        # One digest entry per earlier step; output tails keep the prompt small.
        out_tail = (prior.get('stdout') or '')[-400:]
        err_tail = (prior.get('stderr') or '')[-400:]
        return (
            f"\n--- prior step #{prior.get('idx')} ({prior.get('kind')}): "
            f"{prior.get('title')}\n"
            f" state: {prior.get('state')}\n"
            f" stdout_tail: {out_tail}\n"
            f" stderr_tail: {err_tail}\n"
        )

    recent = "".join(_render(prior) for prior in prior_results[-4:])

    sections = [
        f"PLAN SUMMARY: {plan_summary}\n\n",
        f"PRIOR STEP RESULTS:{recent or ' (none)'}\n\n",
        f"CURRENT STEP:\n",
        f" title: {step.get('title')}\n",
        f" kind: {step.get('kind')}\n",
        f" description: {step.get('description')}\n",
    ]
    if feedback:
        sections.extend([
            "\nPREVIOUS ATTEMPT FAILED. Use this traceback / error to FIX the ",
            "code and try again:\n",
            f"```\n{feedback[-2000:]}\n```\n",
            "Return the corrected code only.",
        ])

    system_msg = {"role": "system", "content": STEP_CODER_SYSTEM}
    user_msg = {"role": "user", "content": "".join(sections)}
    return await llm_router.complete([system_msg, user_msg],
                                     temperature=0.2, max_tokens=1500)
|