File size: 4,464 Bytes
"""Routes for remote execution via the local executive agent."""
import asyncio
import time
from typing import Any

import requests
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from agents.auto_heal_agent import AutoHealAgent
from agents.memory_agent import MemoryAgent
from agents.supervisor_agent import SupervisorAgent
from agents.tool_selector_agent import ToolSelectorAgent
from api.deps import get_executor_headers, get_executor_url, get_logger, is_remote_brain_only, load_config
# Router on which this module's POST /execute handler is registered.
router = APIRouter()
# Module logger, obtained from the api.deps helper under the "kapo.brain.execute" name.
logger = get_logger("kapo.brain.execute")
class ExecuteRequest(BaseModel):
    """Request body accepted by the POST ``/execute`` route."""

    # Identifier of the run; forwarded to the executor with every step and used
    # as the key when the run snapshot is written to short-term memory.
    request_id: str
    # Either a list of step dicts, or a dict whose "steps" entry holds that list.
    plan: dict[str, Any] | list[dict[str, Any]]
    # Optional; not read by any code visible in this file.
    auth_token: str | None = None
    # Optional; not read by any code visible in this file (the handler stamps
    # outgoing payloads with its own time.time()).
    timestamp: float | None = None
    # Optional executor base URL; when non-blank it is used instead of the
    # URL taken from configuration.
    executor_url: str | None = None
def _normalize_executor_url(url: str | None) -> str:
if not url:
return ""
value = url.strip()
if not value:
return ""
if "://" not in value:
value = f"http://{value}"
return value.rstrip("/")
def _extract_steps(plan: dict[str, Any] | list[dict[str, Any]]) -> list[dict[str, Any]]:
if isinstance(plan, list):
return [_normalize_step(step, index) for index, step in enumerate(plan, start=1)]
if isinstance(plan, dict):
steps = plan.get("steps")
if isinstance(steps, list):
return [_normalize_step(step, index) for index, step in enumerate(steps, start=1)]
return []
def _normalize_step(step: dict[str, Any], index: int) -> dict[str, Any]:
if not isinstance(step, dict):
return {"id": f"step-{index}", "action": "execute", "input": str(step), "tool_hint": "python"}
normalized = dict(step)
if not str(normalized.get("id", "")).strip():
normalized["id"] = f"step-{index}"
if not str(normalized.get("input", "")).strip():
for key in ("description", "title", "summary", "task", "prompt"):
value = str(normalized.get(key, "")).strip()
if value:
normalized["input"] = value
break
if not str(normalized.get("action", "")).strip():
normalized["action"] = "execute"
if not str(normalized.get("tool_hint", "")).strip():
normalized["tool_hint"] = "python"
return normalized
def _post_step(
    exec_url: str,
    payload: dict[str, Any],
    headers: Any,
    timeout: int,
    retries: int,
    selected_tool: Any,
) -> tuple[dict[str, Any] | None, str | None]:
    """POST one step payload to the executor, retrying on any failure.

    Args:
        exec_url: Full URL of the executor's ``/execute`` endpoint.
        payload: JSON body to send.
        headers: Headers passed through to ``requests.post``.
        timeout: Per-attempt timeout in seconds.
        retries: Number of extra attempts after the first; negative values
            are treated as 0 so at least one attempt is always made.
        selected_tool: Value stored under ``"selected_tool"`` in the result.

    Returns:
        ``(item, None)`` for the first HTTP 200 response whose JSON body could
        be tagged with ``selected_tool``; otherwise ``(None, error)`` where
        ``error`` is a non-empty description of the last failure.
    """
    last_err = "executor request was not attempted"
    for _ in range(max(retries, 0) + 1):
        try:
            response = requests.post(exec_url, json=payload, headers=headers, timeout=timeout)
            if response.status_code == 200:
                item = response.json()
                item["selected_tool"] = selected_tool
                return item, None
            # An empty error body must still yield a truthy message, otherwise
            # the failure would be indistinguishable from "no error".
            last_err = response.text or f"executor returned HTTP {response.status_code}"
        except Exception as exc:
            # Some exceptions stringify to ""; fall back to the class name.
            last_err = str(exc) or type(exc).__name__
    return None, last_err


@router.post("/execute")
async def execute(req: ExecuteRequest):
    """Run every step of the submitted plan on the executor service.

    For each normalized step a tool is selected, a payload is POSTed to
    ``<executor>/execute`` (with retries), and either the executor's JSON
    result or an error entry with an auto-heal suggestion is recorded. The
    collected results are then reviewed by the supervisor and, unless the
    process is in remote-brain-only mode, written to short-term memory.

    Args:
        req: Request body; ``executor_url`` overrides the configured executor
            when it is non-blank.

    Returns:
        A dict with ``status``, ``results``, ``report`` and ``timestamp``.

    Raises:
        HTTPException: Status 500 when step processing, the supervisor review,
            or the memory write raises.
    """
    cfg = load_config()
    tool_selector = ToolSelectorAgent()
    supervisor = SupervisorAgent()
    auto_heal = AutoHealAgent()
    memory = MemoryAgent()

    # NOTE(review): a caller-supplied executor_url receives the headers built
    # from configuration below — confirm this route is only reachable by
    # trusted callers.
    base_exec_url = _normalize_executor_url(req.executor_url) or get_executor_url(cfg)
    exec_url = f"{base_exec_url}/execute"
    exec_headers = get_executor_headers(cfg)
    timeout = int(cfg.get("REQUEST_TIMEOUT_SEC", 20) or 20)
    # Only an unset/blank value falls back to the default, so a configured 0
    # really means "no retries"; negatives are clamped to 0.
    raw_retries = cfg.get("REQUEST_RETRIES", 2)
    retries = 2 if raw_retries is None or raw_retries == "" else max(int(raw_retries), 0)

    try:
        results: list[dict[str, Any]] = []
        for step in _extract_steps(req.plan):
            tool = tool_selector.select_tool(step)
            payload = {
                "request_id": req.request_id,
                "step_id": step.get("id"),
                "command": tool.get("command"),
                "files": tool.get("files", {}),
                "env": tool.get("env", {}),
                "timestamp": time.time(),
            }
            # requests is blocking; run it in a worker thread so the event
            # loop keeps serving other requests while the executor works.
            item, error = await asyncio.to_thread(
                _post_step, exec_url, payload, exec_headers, timeout, retries, tool.get("tool")
            )
            if error is None:
                results.append(item)
            else:
                results.append({"error": error, "step": step, "auto_heal": auto_heal.suggest(error, step)})

        report = supervisor.review(results)
        snapshot = {"execution": results, "report": report}
        if not is_remote_brain_only():
            memory.write_short_term(req.request_id, snapshot)
        return {"status": "ok", "results": results, "report": report, "timestamp": time.time()}
    except Exception as exc:
        logger.exception("Execution failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
|