Spaces:
Sleeping
Sleeping
| """ | |
| Multi-step task planner (Phase 2). | |
| Asks the existing ``llm_router`` for a structured JSON plan that decomposes | |
| the user's request into 1..N executable steps. The planner is *additive*: if | |
| it fails, callers can fall back to the Phase-1 single-shot path. | |
| Returned plan shape:: | |
| { | |
| "summary": "What we're going to do, in one sentence.", | |
| "needs_browser": false, | |
| "steps": [ | |
| { | |
| "title": "Create proof.txt", | |
| "description": "Write current UNIX timestamp to /home/user/proof.txt", | |
| "kind": "python" | "shell" | "browser" | "reason" | |
| }, | |
| ... | |
| ] | |
| } | |
| The router and key rotation are used unchanged, so all Phase-1 provider | |
| failover / cooldown behaviour applies here too. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import re | |
| from typing import Any, Dict, List, Optional | |
| from . import llm_router | |
| logger = logging.getLogger(__name__) | |
# System prompt for the planning request. make_plan() sends it as the first
# ("system") message, ahead of the recent history and the user's request.
# It instructs the model to answer with strict JSON containing "summary",
# "needs_browser" and a "steps" list whose entries carry "title",
# "description" and "kind".
PLANNER_SYSTEM = """You are a task planner for an autonomous agent that runs
real code in a Linux sandbox (E2B). Given the user's request, decompose it
into a small ordered list of CONCRETE executable steps.
Return STRICT JSON only. No prose, no markdown fences. Schema:
{
"summary": "<one sentence describing the overall goal>",
"needs_browser": true | false,
"steps": [
{
"title": "<short imperative title, <80 chars>",
"description": "<what this step does and what success looks like>",
"kind": "python" | "shell" | "browser" | "reason"
}
]
}
Rules:
- 1 to 6 steps. Prefer fewer. Combine trivially related actions into one step.
- A "python" step runs Python code in the sandbox.
- A "shell" step runs bash inside the sandbox.
- A "browser" step uses Playwright (only when navigating real web pages is
required). Set "needs_browser": true if ANY step uses kind="browser".
- A "reason" step is pure thinking (no execution); use sparingly, only when
the user explicitly asks for analysis/explanation between executions.
- Each step must be runnable on its own with information available so far.
- Do NOT include the code itself — only describe what should happen.
- If the request is a simple greeting or question (no execution needed), emit
a single step of kind "reason".
"""
| # Regex to peel ```json fences off the model output if it ignores instructions | |
| _FENCE_RE = re.compile(r"^```(?:json)?\s*|\s*```$", re.IGNORECASE | re.MULTILINE) | |
| def _safe_json_load(text: str) -> Optional[Dict[str, Any]]: | |
| if not text: | |
| return None | |
| cleaned = _FENCE_RE.sub("", text.strip()).strip() | |
| # Some models prepend explanations; extract the first {...} block. | |
| first = cleaned.find("{") | |
| last = cleaned.rfind("}") | |
| if first != -1 and last != -1 and last > first: | |
| cleaned = cleaned[first:last + 1] | |
| try: | |
| return json.loads(cleaned) | |
| except Exception: | |
| return None | |
| _VALID_KINDS = {"python", "shell", "browser", "reason"} | |
| def _sanitize_plan(raw: Dict[str, Any], user_message: str) -> Dict[str, Any]: | |
| steps_in = raw.get("steps") or [] | |
| if not isinstance(steps_in, list) or not steps_in: | |
| steps_in = [{"title": user_message[:80] or "Handle request", | |
| "description": user_message, | |
| "kind": "python"}] | |
| cleaned: List[Dict[str, str]] = [] | |
| for s in steps_in[:6]: | |
| if not isinstance(s, dict): | |
| continue | |
| kind = str(s.get("kind", "python")).lower().strip() | |
| if kind not in _VALID_KINDS: | |
| kind = "python" | |
| title = str(s.get("title", "")).strip()[:120] or f"Step {len(cleaned)+1}" | |
| description = str(s.get("description", "")).strip()[:1000] | |
| cleaned.append({"title": title, "description": description, "kind": kind}) | |
| if not cleaned: | |
| cleaned.append({"title": user_message[:80] or "Handle request", | |
| "description": user_message, "kind": "python"}) | |
| needs_browser = bool(raw.get("needs_browser")) or any(c["kind"] == "browser" for c in cleaned) | |
| summary = str(raw.get("summary", "")).strip()[:400] or user_message[:200] | |
| return {"summary": summary, "needs_browser": needs_browser, "steps": cleaned} | |
| def _heuristic_plan(user_message: str) -> Dict[str, Any]: | |
| """Used when the planner LLM fails. Keeps behaviour Phase-1-equivalent.""" | |
| lower = user_message.lower() | |
| kind = "python" | |
| if any(tok in lower for tok in (" website", "browse ", "open url", "http://", "https://", | |
| "scrape", "click ", "playwright")): | |
| kind = "browser" | |
| elif lower.lstrip().startswith(("ls ", "cd ", "cat ", "mkdir", "rm ", "cp ", "mv ", | |
| "echo ", "grep ", "chmod ")): | |
| kind = "shell" | |
| return { | |
| "summary": user_message[:200], | |
| "needs_browser": kind == "browser", | |
| "steps": [{ | |
| "title": user_message[:80] or "Handle request", | |
| "description": user_message, | |
| "kind": kind, | |
| }], | |
| } | |
async def make_plan(user_message: str,
                    history: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]:
    """Produce a sanitised execution plan.

    Falls back to a single-step heuristic plan if the LLM is unavailable or
    its answer is not a usable JSON object, so the agent never gets stuck
    when providers are down or misbehave.

    Args:
        user_message: The request to plan for (first 4000 chars are sent).
        history: Optional prior turns as ``{"role", "content"}`` dicts. Only
            the last six are forwarded, each capped at 2000 chars; entries
            that are not dicts are skipped and unknown roles become "user".

    Returns:
        A plan dict with ``summary``, ``needs_browser`` and ``steps``, plus
        ``provider`` ("heuristic" when the router call raised, otherwise the
        provider reported by the router response) and, for successfully
        parsed plans, ``model``.
    """
    messages: List[Dict[str, str]] = [{"role": "system", "content": PLANNER_SYSTEM}]
    # Keep the last few turns for context (token-friendly)
    for h in (history or [])[-6:]:
        if not isinstance(h, dict):
            # A malformed history entry must not abort planning.
            continue
        role = h.get("role", "user")
        if role not in ("user", "assistant", "system"):
            role = "user"
        # "or ''" keeps a None content from being sent as the text "None".
        messages.append({"role": role, "content": str(h.get("content") or "")[:2000]})
    messages.append({"role": "user", "content": user_message[:4000]})

    try:
        resp = await llm_router.complete(messages, temperature=0.1, max_tokens=900)
    except Exception as e:
        # Deliberate catch-all boundary: any provider failure degrades to the
        # heuristic plan instead of propagating.
        logger.warning("planner LLM failed: %s — using heuristic", e)
        plan = _heuristic_plan(user_message)
        plan["provider"] = "heuristic"
        return plan

    raw = _safe_json_load(resp.get("content", ""))
    # Valid JSON that is not an object (list, number, string) is as unusable
    # as unparseable text; it must not reach _sanitize_plan, which calls .get().
    if not raw or not isinstance(raw, dict):
        logger.info("planner could not parse JSON — using heuristic")
        plan = _heuristic_plan(user_message)
        plan["provider"] = resp.get("provider", "heuristic")
        return plan

    plan = _sanitize_plan(raw, user_message)
    plan["provider"] = resp.get("provider")
    plan["model"] = resp.get("model")
    return plan
| # --------------------------------------------------------------------------- | |
| # Code generation per step (still goes through the existing llm_router) | |
| # --------------------------------------------------------------------------- | |
# System prompt for per-step code generation. code_for_step() sends it as the
# "system" message, followed by a user prompt describing the plan summary,
# recent step results and the step to implement.
STEP_CODER_SYSTEM = """You generate ONE fenced code block to satisfy ONE step
of a larger plan. The code will be executed in a real Linux sandbox (E2B).
You may use previously stored variables/files from prior steps in the same
sandbox session — assume cwd is /home/user.
Strict rules:
- Output ONLY a single fenced code block, no prose.
- Use ```python``` for kind=python, ```bash``` for kind=shell.
- Print a clear success/failure marker on the last line.
- Total code under 200 lines.
"""
async def code_for_step(plan_summary: str,
                        step: Dict[str, str],
                        prior_results: List[Dict[str, Any]],
                        feedback: Optional[str] = None) -> Dict[str, Any]:
    """Request the code implementing ``step`` from the LLM router.

    The user prompt contains the plan summary, a digest of up to the four
    most recent prior step results (state plus the last 400 chars of stdout
    and stderr), and the current step's title, kind and description.

    When ``feedback`` is given (the traceback / error text of a previous
    failing attempt), its last 2000 chars are appended with an instruction
    to return corrected code, so a retry loop can drive self-repair without
    knowing llm_router internals.

    Returns:
        Whatever ``llm_router.complete`` returns, unmodified.
    """
    prior_digest = "".join(
        f"\n--- prior step #{item.get('idx')} ({item.get('kind')}): "
        f"{item.get('title')}\n"
        f" state: {item.get('state')}\n"
        f" stdout_tail: {(item.get('stdout') or '')[-400:]}\n"
        f" stderr_tail: {(item.get('stderr') or '')[-400:]}\n"
        for item in prior_results[-4:]
    )

    prompt_parts = [
        f"PLAN SUMMARY: {plan_summary}\n\n",
        f"PRIOR STEP RESULTS:{prior_digest or ' (none)'}\n\n",
        "CURRENT STEP:\n",
        f" title: {step.get('title')}\n",
        f" kind: {step.get('kind')}\n",
        f" description: {step.get('description')}\n",
    ]
    if feedback:
        # Only the tail of the error is sent; that is where tracebacks end.
        prompt_parts.extend([
            "\nPREVIOUS ATTEMPT FAILED. Use this traceback / error to FIX the ",
            "code and try again:\n",
            f"```\n{feedback[-2000:]}\n```\n",
            "Return the corrected code only.",
        ])

    return await llm_router.complete(
        [
            {"role": "system", "content": STEP_CODER_SYSTEM},
            {"role": "user", "content": "".join(prompt_parts)},
        ],
        temperature=0.2,
        max_tokens=1500,
    )