File size: 8,980 Bytes
d7b2379
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
"""
Multi-step task planner (Phase 2).

Asks the existing ``llm_router`` for a structured JSON plan that decomposes
the user's request into 1..N executable steps.  The planner is *additive*: if
it fails, callers can fall back to the Phase-1 single-shot path.

Returned plan shape::

    {
        "summary": "What we're going to do, in one sentence.",
        "needs_browser": false,
        "steps": [
            {
                "title": "Create proof.txt",
                "description": "Write current UNIX timestamp to /home/user/proof.txt",
                "kind": "python" | "shell" | "browser" | "reason"
            },
            ...
        ]
    }

The router and key rotation are used unchanged, so all Phase-1 provider
failover / cooldown behaviour applies here too.
"""

from __future__ import annotations

import json
import logging
import re
from typing import Any, Dict, List, Optional

from . import llm_router

logger = logging.getLogger(__name__)


# System prompt for the planning request: make_plan() sends it as the first
# message of every call.  It pins the JSON schema and the step rules; the
# model's reply is still re-validated by _sanitize_plan() (kind whitelist,
# at most 6 steps), so this text is guidance rather than a guarantee.
PLANNER_SYSTEM = """You are a task planner for an autonomous agent that runs
real code in a Linux sandbox (E2B). Given the user's request, decompose it
into a small ordered list of CONCRETE executable steps.

Return STRICT JSON only. No prose, no markdown fences. Schema:

{
  "summary": "<one sentence describing the overall goal>",
  "needs_browser": true | false,
  "steps": [
    {
      "title": "<short imperative title, <80 chars>",
      "description": "<what this step does and what success looks like>",
      "kind": "python" | "shell" | "browser" | "reason"
    }
  ]
}

Rules:
- 1 to 6 steps. Prefer fewer. Combine trivially related actions into one step.
- A "python" step runs Python code in the sandbox.
- A "shell" step runs bash inside the sandbox.
- A "browser" step uses Playwright (only when navigating real web pages is
  required). Set "needs_browser": true if ANY step uses kind="browser".
- A "reason" step is pure thinking (no execution); use sparingly, only when
  the user explicitly asks for analysis/explanation between executions.
- Each step must be runnable on its own with information available so far.
- Do NOT include the code itself — only describe what should happen.
- If the request is a simple greeting or question (no execution needed), emit
  a single step of kind "reason".
"""


# Regex to peel ```json fences off the model output if it ignores instructions
_FENCE_RE = re.compile(r"^```(?:json)?\s*|\s*```$", re.IGNORECASE | re.MULTILINE)


def _safe_json_load(text: str) -> Optional[Dict[str, Any]]:
    if not text:
        return None
    cleaned = _FENCE_RE.sub("", text.strip()).strip()
    # Some models prepend explanations; extract the first {...} block.
    first = cleaned.find("{")
    last = cleaned.rfind("}")
    if first != -1 and last != -1 and last > first:
        cleaned = cleaned[first:last + 1]
    try:
        return json.loads(cleaned)
    except Exception:
        return None


_VALID_KINDS = {"python", "shell", "browser", "reason"}


def _sanitize_plan(raw: Dict[str, Any], user_message: str) -> Dict[str, Any]:
    steps_in = raw.get("steps") or []
    if not isinstance(steps_in, list) or not steps_in:
        steps_in = [{"title": user_message[:80] or "Handle request",
                     "description": user_message,
                     "kind": "python"}]
    cleaned: List[Dict[str, str]] = []
    for s in steps_in[:6]:
        if not isinstance(s, dict):
            continue
        kind = str(s.get("kind", "python")).lower().strip()
        if kind not in _VALID_KINDS:
            kind = "python"
        title = str(s.get("title", "")).strip()[:120] or f"Step {len(cleaned)+1}"
        description = str(s.get("description", "")).strip()[:1000]
        cleaned.append({"title": title, "description": description, "kind": kind})
    if not cleaned:
        cleaned.append({"title": user_message[:80] or "Handle request",
                        "description": user_message, "kind": "python"})
    needs_browser = bool(raw.get("needs_browser")) or any(c["kind"] == "browser" for c in cleaned)
    summary = str(raw.get("summary", "")).strip()[:400] or user_message[:200]
    return {"summary": summary, "needs_browser": needs_browser, "steps": cleaned}


def _heuristic_plan(user_message: str) -> Dict[str, Any]:
    """Used when the planner LLM fails. Keeps behaviour Phase-1-equivalent."""
    lower = user_message.lower()
    kind = "python"
    if any(tok in lower for tok in (" website", "browse ", "open url", "http://", "https://",
                                    "scrape", "click ", "playwright")):
        kind = "browser"
    elif lower.lstrip().startswith(("ls ", "cd ", "cat ", "mkdir", "rm ", "cp ", "mv ",
                                    "echo ", "grep ", "chmod ")):
        kind = "shell"
    return {
        "summary": user_message[:200],
        "needs_browser": kind == "browser",
        "steps": [{
            "title": user_message[:80] or "Handle request",
            "description": user_message,
            "kind": kind,
        }],
    }


async def make_plan(user_message: str,
                    history: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]:
    """Produce a sanitised execution plan.

    Falls back to a single-step heuristic plan if the LLM is unavailable or
    its reply is not a JSON object, so the agent never gets stuck when
    providers are down or misbehave.

    Args:
        user_message: The request to plan for (truncated to 4000 chars in
            the prompt).
        history: Optional prior chat turns; only the last six are sent, each
            truncated to 2000 chars.  Unknown roles are sent as ``"user"``.

    Returns:
        The plan dict (``summary``, ``needs_browser``, ``steps``) plus a
        ``provider`` key, and a ``model`` key when the LLM plan was used.
    """
    history = history or []
    messages: List[Dict[str, str]] = [{"role": "system", "content": PLANNER_SYSTEM}]
    # Keep the last few turns for context (token-friendly)
    for h in history[-6:]:
        role = h.get("role", "user")
        if role not in ("user", "assistant", "system"):
            role = "user"
        # ``or ""`` keeps a None content from reaching the model as "None".
        messages.append({"role": role, "content": str(h.get("content") or "")[:2000]})
    messages.append({"role": "user", "content": user_message[:4000]})

    try:
        resp = await llm_router.complete(messages, temperature=0.1, max_tokens=900)
    except Exception as e:
        # Deliberately broad: any provider failure must degrade to the
        # heuristic plan instead of propagating.
        logger.warning("planner LLM failed: %s — using heuristic", e)
        plan = _heuristic_plan(user_message)
        plan["provider"] = "heuristic"
        return plan

    raw = _safe_json_load(resp.get("content", ""))
    # A JSON scalar or array parses fine but is not a plan; treat it like a
    # parse failure instead of letting _sanitize_plan fail on ``.get``.
    if not isinstance(raw, dict) or not raw:
        logger.info("planner could not parse JSON — using heuristic")
        plan = _heuristic_plan(user_message)
        plan["provider"] = resp.get("provider", "heuristic")
        return plan

    plan = _sanitize_plan(raw, user_message)
    plan["provider"] = resp.get("provider")
    plan["model"] = resp.get("model")
    return plan


# ---------------------------------------------------------------------------
# Code generation per step (still goes through the existing llm_router)
# ---------------------------------------------------------------------------

# System prompt for per-step code generation: code_for_step() sends it as the
# system message, followed by one user message describing the current step.
STEP_CODER_SYSTEM = """You generate ONE fenced code block to satisfy ONE step
of a larger plan. The code will be executed in a real Linux sandbox (E2B).

You may use previously stored variables/files from prior steps in the same
sandbox session — assume cwd is /home/user.

Strict rules:
- Output ONLY a single fenced code block, no prose.
- Use ```python``` for kind=python, ```bash``` for kind=shell.
- Print a clear success/failure marker on the last line.
- Total code under 200 lines.
"""


async def code_for_step(plan_summary: str,
                        step: Dict[str, str],
                        prior_results: List[Dict[str, Any]],
                        feedback: Optional[str] = None) -> Dict[str, Any]:
    """Request the code implementing ``step`` from the router.

    The prompt contains the plan summary, a digest of the last four prior
    step results (state plus the final 400 chars of stdout and stderr), and
    the current step's title, kind and description.

    ``feedback`` (when set) is the traceback / error from a previous failing
    attempt; its last 2000 chars are appended so the model can fix the code.
    This powers the self-repair retry loop in retry.py without it needing to
    know about llm_router internals.

    Returns the router's response unchanged.
    """
    prior_digests = [
        (
            f"\n--- prior step #{prior.get('idx')} ({prior.get('kind')}): "
            f"{prior.get('title')}\n"
            f"   state: {prior.get('state')}\n"
            f"   stdout_tail: {(prior.get('stdout') or '')[-400:]}\n"
            f"   stderr_tail: {(prior.get('stderr') or '')[-400:]}\n"
        )
        for prior in prior_results[-4:]
    ]
    prior_section = "".join(prior_digests) or " (none)"

    prompt_parts = [
        f"PLAN SUMMARY: {plan_summary}\n\n",
        f"PRIOR STEP RESULTS:{prior_section}\n\n",
        "CURRENT STEP:\n",
        f"  title: {step.get('title')}\n",
        f"  kind:  {step.get('kind')}\n",
        f"  description: {step.get('description')}\n",
    ]
    if feedback:
        # Only the tail of the error is kept; it is sent inside a fence.
        prompt_parts.extend([
            "\nPREVIOUS ATTEMPT FAILED. Use this traceback / error to FIX the "
            "code and try again:\n",
            f"```\n{feedback[-2000:]}\n```\n",
            "Return the corrected code only.",
        ])

    conversation = [
        {"role": "system", "content": STEP_CODER_SYSTEM},
        {"role": "user", "content": "".join(prompt_parts)},
    ]
    return await llm_router.complete(conversation, temperature=0.2, max_tokens=1500)