"""Routes for remote execution via the local executive agent."""
import asyncio
import time
from typing import Any

import requests
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from agents.auto_heal_agent import AutoHealAgent
from agents.memory_agent import MemoryAgent
from agents.supervisor_agent import SupervisorAgent
from agents.tool_selector_agent import ToolSelectorAgent
from api.deps import (
    get_executor_headers,
    get_executor_url,
    get_logger,
    is_remote_brain_only,
    load_config,
)

router = APIRouter()
logger = get_logger("kapo.brain.execute")

# Step fields consulted, in order, when a step has no usable "input".
_INPUT_FALLBACK_KEYS = ("description", "title", "summary", "task", "prompt")


class ExecuteRequest(BaseModel):
    # Body of POST /execute.
    # request_id is forwarded to the executor with every step and is the key
    # used when the execution snapshot is written to short-term memory.
    request_id: str
    # Either a bare list of steps or a dict carrying them under "steps".
    plan: dict[str, Any] | list[dict[str, Any]]
    # auth_token and timestamp are accepted but not read by the handler below.
    auth_token: str | None = None
    timestamp: float | None = None
    # Optional per-request override of the configured executor base URL.
    executor_url: str | None = None


def _text(value: Any) -> str:
    """Return *value* as stripped text, treating ``None`` as empty.

    ``str(None)`` is the non-empty string ``"None"``, so a plain ``str()``
    conversion would make a null field look populated.
    """
    return "" if value is None else str(value).strip()


def _config_int(cfg: Any, key: str, default: int, *, minimum: int) -> int:
    """Read an integer setting from *cfg*, falling back to *default*.

    The default is used when the value is missing, blank, not parseable as an
    integer, or smaller than *minimum*. Unlike ``int(value or default)``, an
    explicit ``0`` is honoured whenever ``minimum`` allows it.
    """
    try:
        number = int(cfg.get(key))
    except (TypeError, ValueError):
        return default
    return number if number >= minimum else default


def _normalize_executor_url(url: str | None) -> str:
    """Return *url* trimmed, with a scheme and without a trailing slash.

    ``None`` and blank input yield ``""``. A value without ``://`` is
    prefixed with ``http://``.
    """
    value = (url or "").strip()
    if not value:
        return ""
    if "://" not in value:
        value = f"http://{value}"
    return value.rstrip("/")


def _extract_steps(plan: dict[str, Any] | list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the normalized steps of *plan*.

    A list is taken as the steps themselves; a dict contributes its ``"steps"``
    entry when that entry is a list. Anything else yields an empty list.
    Steps are numbered from 1 for the purpose of generated ids.
    """
    steps = plan.get("steps") if isinstance(plan, dict) else plan
    if not isinstance(steps, list):
        return []
    return [_normalize_step(step, index) for index, step in enumerate(steps, start=1)]


def _normalize_step(step: dict[str, Any], index: int) -> dict[str, Any]:
    """Return a copy of *step* with ``id``, ``input``, ``action`` and ``tool_hint`` filled in.

    A non-dict step becomes a new dict whose ``input`` is ``str(step)``.
    For a dict step, a field is filled only when it is missing, ``None`` or
    blank: ``id`` becomes ``step-<index>``, ``input`` is taken from the first
    non-blank fallback field, ``action`` becomes ``"execute"`` and
    ``tool_hint`` becomes ``"python"``. The caller's dict is not mutated.
    """
    fallback_id = f"step-{index}"
    if not isinstance(step, dict):
        return {"id": fallback_id, "action": "execute", "input": str(step), "tool_hint": "python"}

    normalized = dict(step)
    if not _text(normalized.get("id")):
        normalized["id"] = fallback_id
    if not _text(normalized.get("input")):
        for key in _INPUT_FALLBACK_KEYS:
            value = _text(normalized.get(key))
            if value:
                normalized["input"] = value
                break
    if not _text(normalized.get("action")):
        normalized["action"] = "execute"
    if not _text(normalized.get("tool_hint")):
        normalized["tool_hint"] = "python"
    return normalized


@router.post("/execute")
async def execute(req: ExecuteRequest):
    """Run every step of the plan on the executor and return the collected results.

    For each step a tool is selected, a payload is POSTed to
    ``<executor>/execute`` and the JSON reply is recorded with a
    ``selected_tool`` entry added. A step is attempted ``REQUEST_RETRIES + 1``
    times at most; if no attempt succeeds, an entry with ``error``, ``step``
    and ``auto_heal`` is recorded instead, so every step yields exactly one
    result. The results are then reviewed by the supervisor and, unless the
    process is in remote-brain-only mode, written to short-term memory.

    Returns:
        A dict with ``status``, ``results``, ``report`` and ``timestamp``.

    Raises:
        HTTPException: status 500 when anything outside the per-step retry
            loop fails (tool selection, review, memory write, ...).
    """
    cfg = load_config()
    tool_selector = ToolSelectorAgent()
    supervisor = SupervisorAgent()
    auto_heal = AutoHealAgent()
    memory = MemoryAgent()

    # NOTE(review): executor_url is taken from the request body and receives
    # the configured executor headers; confirm that callers of this route are
    # trusted, or restrict the override to an allow-list.
    base_exec_url = _normalize_executor_url(req.executor_url) or get_executor_url(cfg)
    exec_url = f"{base_exec_url}/execute"
    exec_headers = get_executor_headers(cfg)
    # A non-positive timeout is rejected by the HTTP client, so it falls back
    # to the default; zero retries is a legitimate setting and is kept.
    timeout = _config_int(cfg, "REQUEST_TIMEOUT_SEC", 20, minimum=1)
    retries = _config_int(cfg, "REQUEST_RETRIES", 2, minimum=0)

    try:
        results: list[dict[str, Any]] = []
        for step in _extract_steps(req.plan):
            tool = tool_selector.select_tool(step)
            payload = {
                "request_id": req.request_id,
                "step_id": step.get("id"),
                "command": tool.get("command"),
                "files": tool.get("files", {}),
                "env": tool.get("env", {}),
                "timestamp": time.time(),
            }

            # None means "succeeded"; any string, even one built from an empty
            # response body, means the step failed and must be reported.
            last_err: str | None = None
            for _attempt in range(retries + 1):
                try:
                    # requests is blocking; run it in a worker thread so the
                    # event loop keeps serving other requests meanwhile.
                    response = await asyncio.to_thread(
                        requests.post,
                        exec_url,
                        json=payload,
                        headers=exec_headers,
                        timeout=timeout,
                    )
                    if response.status_code == 200:
                        item = response.json()
                        item["selected_tool"] = tool.get("tool")
                        results.append(item)
                        last_err = None
                        break
                    last_err = response.text or f"HTTP {response.status_code}"
                except Exception as exc:
                    last_err = str(exc) or type(exc).__name__

            if last_err is not None:
                results.append(
                    {
                        "error": last_err,
                        "step": step,
                        "auto_heal": auto_heal.suggest(last_err, step),
                    }
                )

        report = supervisor.review(results)
        snapshot = {"execution": results, "report": report}
        if not is_remote_brain_only():
            memory.write_short_term(req.request_id, snapshot)
        return {"status": "ok", "results": results, "report": report, "timestamp": time.time()}
    except Exception as exc:
        logger.exception("Execution failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc