# File: AiDebuggerClean/brain_server/api/routes_execute.py
# Uploaded by MrA7A3: "Initial modernized KAPO runtime upload" (commit 564b5ea, verified)
"""Routes for remote execution via the local executive agent."""
import time
from typing import Any
import requests
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from agents.auto_heal_agent import AutoHealAgent
from agents.memory_agent import MemoryAgent
from agents.supervisor_agent import SupervisorAgent
from agents.tool_selector_agent import ToolSelectorAgent
from api.deps import get_executor_headers, get_executor_url, get_logger, is_remote_brain_only, load_config
# Router on which this module registers its POST /execute endpoint.
router = APIRouter()
# Module-level logger obtained from the project's get_logger helper.
logger = get_logger("kapo.brain.execute")
class ExecuteRequest(BaseModel):
    """Request body accepted by the POST ``/execute`` route."""

    # Identifier forwarded to the executor with every step; also the key under
    # which the execution snapshot is written to short-term memory.
    request_id: str

    # Either a bare list of steps, or a dict carrying that list under "steps".
    plan: dict[str, Any] | list[dict[str, Any]]

    # Accepted in the payload; not read by the code visible in this module.
    auth_token: str | None = None
    timestamp: float | None = None

    # Optional override for the executor base URL taken from configuration.
    executor_url: str | None = None
def _normalize_executor_url(url: str | None) -> str:
if not url:
return ""
value = url.strip()
if not value:
return ""
if "://" not in value:
value = f"http://{value}"
return value.rstrip("/")
def _extract_steps(plan: dict[str, Any] | list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Pull the step list out of *plan* and normalize every entry.

    A list is treated as the steps themselves; a dict contributes the list
    stored under its ``"steps"`` key. Any other shape yields an empty list.
    Steps are numbered from 1 when passed to ``_normalize_step``.
    """
    if isinstance(plan, list):
        raw_steps = plan
    elif isinstance(plan, dict) and isinstance(plan.get("steps"), list):
        raw_steps = plan["steps"]
    else:
        return []

    normalized_steps = []
    for position, raw in enumerate(raw_steps, start=1):
        normalized_steps.append(_normalize_step(raw, position))
    return normalized_steps
def _normalize_step(step: dict[str, Any], index: int) -> dict[str, Any]:
if not isinstance(step, dict):
return {"id": f"step-{index}", "action": "execute", "input": str(step), "tool_hint": "python"}
normalized = dict(step)
if not str(normalized.get("id", "")).strip():
normalized["id"] = f"step-{index}"
if not str(normalized.get("input", "")).strip():
for key in ("description", "title", "summary", "task", "prompt"):
value = str(normalized.get(key, "")).strip()
if value:
normalized["input"] = value
break
if not str(normalized.get("action", "")).strip():
normalized["action"] = "execute"
if not str(normalized.get("tool_hint", "")).strip():
normalized["tool_hint"] = "python"
return normalized
def _post_step(
    exec_url: str,
    payload: dict[str, Any],
    headers: Any,
    timeout: int,
    retries: int,
) -> tuple[dict[str, Any] | None, str | None]:
    """POST one step payload to the executor, retrying on failure.

    Makes up to ``retries + 1`` attempts. This function blocks on network
    I/O, so async callers should run it in a worker thread.

    Returns:
        ``(item, None)`` for the first HTTP 200 response whose JSON body is an
        object, otherwise ``(None, error)`` where *error* is a non-empty
        description of the last failure.
    """
    last_err = "executor request was not attempted"
    for _ in range(retries + 1):
        try:
            response = requests.post(exec_url, json=payload, headers=headers, timeout=timeout)
            if response.status_code == 200:
                item = response.json()
                if isinstance(item, dict):
                    return item, None
                last_err = f"unexpected executor response type: {type(item).__name__}"
            else:
                # An empty error body must still register as a failure.
                last_err = response.text or f"HTTP {response.status_code}"
        except Exception as exc:  # best effort: the failure is recorded and the call retried
            last_err = str(exc) or repr(exc)
    return None, last_err


@router.post("/execute")
async def execute(req: ExecuteRequest):
    """Run every step of the submitted plan on the executor and review the outcome.

    For each normalized step a tool is selected, a payload is POSTed to
    ``<executor base URL>/execute`` (with retries), and either the executor's
    JSON reply (tagged with ``selected_tool``) or an error entry containing an
    auto-heal suggestion is collected. The collected results are passed to the
    supervisor for review and, unless ``is_remote_brain_only()`` is true,
    written to short-term memory under the request id.

    Returns:
        A dict with ``status``, ``results``, ``report`` and ``timestamp``.

    Raises:
        HTTPException: status 500 when anything outside the per-step HTTP
            call fails (tool selection, review, memory write, ...).
    """
    import asyncio  # local import: keeps this block self-contained

    cfg = load_config()
    tool_selector = ToolSelectorAgent()
    supervisor = SupervisorAgent()
    auto_heal = AutoHealAgent()
    memory = MemoryAgent()

    # NOTE(review): req.executor_url is caller-supplied and the configured
    # executor headers are sent to it unchanged. Confirm that this endpoint is
    # only reachable by trusted callers, or restrict the override.
    base_exec_url = _normalize_executor_url(req.executor_url) or get_executor_url(cfg)
    exec_url = f"{base_exec_url}/execute"
    exec_headers = get_executor_headers(cfg)
    timeout = int(cfg.get("REQUEST_TIMEOUT_SEC", 20) or 20)
    # NOTE(review): an integer 0 falls back to 2 retries because of the `or`;
    # kept as-is in case the config loader uses 0 for "unset".
    # Negative values are clamped so that at least one attempt is always made.
    retries = max(0, int(cfg.get("REQUEST_RETRIES", 2) or 2))

    try:
        results: list[dict[str, Any]] = []
        for step in _extract_steps(req.plan):
            tool = tool_selector.select_tool(step)
            payload = {
                "request_id": req.request_id,
                "step_id": step.get("id"),
                "command": tool.get("command"),
                "files": tool.get("files", {}),
                "env": tool.get("env", {}),
                "timestamp": time.time(),
            }
            # requests is synchronous; run it in a worker thread so the event
            # loop keeps serving other requests while the executor works.
            item, error = await asyncio.to_thread(
                _post_step, exec_url, payload, exec_headers, timeout, retries
            )
            if error is None:
                item["selected_tool"] = tool.get("tool")
                results.append(item)
            else:
                results.append(
                    {"error": error, "step": step, "auto_heal": auto_heal.suggest(error, step)}
                )

        report = supervisor.review(results)
        snapshot = {"execution": results, "report": report}
        if not is_remote_brain_only():
            memory.write_short_term(req.request_id, snapshot)
        return {"status": "ok", "results": results, "report": report, "timestamp": time.time()}
    except Exception as exc:
        logger.exception("Execution failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc