File size: 4,464 Bytes
4f96544
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""Routes for remote execution via the local executive agent."""
import time
from typing import Any

import requests
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from agents.auto_heal_agent import AutoHealAgent
from agents.memory_agent import MemoryAgent
from agents.supervisor_agent import SupervisorAgent
from agents.tool_selector_agent import ToolSelectorAgent
from api.deps import get_executor_headers, get_executor_url, get_logger, is_remote_brain_only, load_config

router = APIRouter()
logger = get_logger("kapo.brain.execute")


class ExecuteRequest(BaseModel):
    # Request body accepted by the POST /execute route in this module.

    # Identifier copied into every payload sent to the executor; also the key
    # under which the execution snapshot is written to short-term memory.
    request_id: str
    # Either a bare list of steps, or a dict holding that list under "steps".
    # Any other shape yields no steps (see _extract_steps).
    plan: dict[str, Any] | list[dict[str, Any]]
    # Accepted in the body but not read by the /execute handler in this file.
    # NOTE(review): presumably consumed by middleware or other routes — confirm.
    auth_token: str | None = None
    # Accepted in the body but not read by the /execute handler in this file;
    # outgoing payloads carry a fresh time.time() value instead.
    timestamp: float | None = None
    # Optional per-request override of the configured executor base URL; it is
    # passed through _normalize_executor_url before use.
    executor_url: str | None = None


def _normalize_executor_url(url: str | None) -> str:
    if not url:
        return ""
    value = url.strip()
    if not value:
        return ""
    if "://" not in value:
        value = f"http://{value}"
    return value.rstrip("/")


def _extract_steps(plan: dict[str, Any] | list[dict[str, Any]]) -> list[dict[str, Any]]:
    if isinstance(plan, list):
        return [_normalize_step(step, index) for index, step in enumerate(plan, start=1)]
    if isinstance(plan, dict):
        steps = plan.get("steps")
        if isinstance(steps, list):
            return [_normalize_step(step, index) for index, step in enumerate(steps, start=1)]
    return []


def _normalize_step(step: dict[str, Any], index: int) -> dict[str, Any]:
    if not isinstance(step, dict):
        return {"id": f"step-{index}", "action": "execute", "input": str(step), "tool_hint": "python"}
    normalized = dict(step)
    if not str(normalized.get("id", "")).strip():
        normalized["id"] = f"step-{index}"
    if not str(normalized.get("input", "")).strip():
        for key in ("description", "title", "summary", "task", "prompt"):
            value = str(normalized.get(key, "")).strip()
            if value:
                normalized["input"] = value
                break
    if not str(normalized.get("action", "")).strip():
        normalized["action"] = "execute"
    if not str(normalized.get("tool_hint", "")).strip():
        normalized["tool_hint"] = "python"
    return normalized


@router.post("/execute")
async def execute(req: ExecuteRequest):
    """Send every step of the submitted plan to the executor and report the outcome.

    For each normalized step a tool is chosen via ``ToolSelectorAgent``, a
    payload is POSTed to ``<executor base URL>/execute`` and the JSON reply
    (tagged with ``"selected_tool"``) is collected. A step is attempted up to
    ``REQUEST_RETRIES + 1`` times; when all attempts fail, an entry holding
    the last error, the step and an auto-heal suggestion is collected instead,
    and processing continues with the next step.

    The collected results are reviewed by ``SupervisorAgent`` and, unless
    ``is_remote_brain_only()`` is truthy, written to short-term memory under
    ``req.request_id``.

    ``req.auth_token`` and ``req.timestamp`` are not read by this handler.

    Returns:
        ``{"status": "ok", "results": [...], "report": ..., "timestamp": ...}``.

    Raises:
        HTTPException: status 500 when anything inside the processing block
            raises (per-attempt request errors are handled by the retry loop
            and do not end up here).
    """
    # Function-scope import: needed only to move the blocking HTTP call off
    # the event loop below.
    import asyncio

    cfg = load_config()
    tool_selector = ToolSelectorAgent()
    supervisor = SupervisorAgent()
    auto_heal = AutoHealAgent()
    memory = MemoryAgent()

    # NOTE(review): executor_url comes from the request body, and the
    # configured executor headers are sent to whatever host it names. Confirm
    # this route is reachable only by trusted callers, or restrict the
    # override to an allow-list.
    base_exec_url = _normalize_executor_url(req.executor_url) or get_executor_url(cfg)
    exec_url = f"{base_exec_url}/execute"
    exec_headers = get_executor_headers(cfg)
    timeout = int(cfg.get("REQUEST_TIMEOUT_SEC", 20) or 20)
    retries = int(cfg.get("REQUEST_RETRIES", 2) or 2)

    try:
        results: list[dict[str, Any]] = []
        for step in _extract_steps(req.plan):
            tool = tool_selector.select_tool(step)
            payload = {
                "request_id": req.request_id,
                "step_id": step.get("id"),
                "command": tool.get("command"),
                "files": tool.get("files", {}),
                "env": tool.get("env", {}),
                "timestamp": time.time(),
            }

            # None means "no failure recorded"; any failure stores a
            # non-empty message so it can never be mistaken for success.
            last_err: str | None = None
            for _ in range(retries + 1):
                try:
                    # requests is synchronous; run it in a worker thread so
                    # other requests are not stalled while we wait.
                    response = await asyncio.to_thread(
                        requests.post,
                        exec_url,
                        json=payload,
                        headers=exec_headers,
                        timeout=timeout,
                    )
                    if response.status_code == 200:
                        item = response.json()
                        item["selected_tool"] = tool.get("tool")
                        results.append(item)
                        last_err = None
                        break
                    # An empty error body must still register as a failure.
                    last_err = response.text or f"HTTP {response.status_code}"
                except Exception as exc:
                    # Some exceptions stringify to ""; fall back to the type name.
                    last_err = str(exc) or type(exc).__name__

            if last_err is not None:
                results.append({"error": last_err, "step": step, "auto_heal": auto_heal.suggest(last_err, step)})

        report = supervisor.review(results)
        snapshot = {"execution": results, "report": report}
        if not is_remote_brain_only():
            memory.write_short_term(req.request_id, snapshot)

        return {"status": "ok", "results": results, "report": report, "timestamp": time.time()}
    except Exception as exc:
        logger.exception("Execution failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc