Spaces:
Sleeping
Sleeping
| """Routes for remote execution via the local executive agent.""" | |
| import time | |
| from typing import Any | |
| import requests | |
| from fastapi import APIRouter, HTTPException | |
| from pydantic import BaseModel | |
| from agents.auto_heal_agent import AutoHealAgent | |
| from agents.memory_agent import MemoryAgent | |
| from agents.supervisor_agent import SupervisorAgent | |
| from agents.tool_selector_agent import ToolSelectorAgent | |
| from api.deps import get_executor_headers, get_executor_url, get_logger, is_remote_brain_only, load_config | |
| router = APIRouter() | |
| logger = get_logger("kapo.brain.execute") | |
| class ExecuteRequest(BaseModel): | |
| request_id: str | |
| plan: dict[str, Any] | list[dict[str, Any]] | |
| auth_token: str | None = None | |
| timestamp: float | None = None | |
| executor_url: str | None = None | |
| def _normalize_executor_url(url: str | None) -> str: | |
| if not url: | |
| return "" | |
| value = url.strip() | |
| if not value: | |
| return "" | |
| if "://" not in value: | |
| value = f"http://{value}" | |
| return value.rstrip("/") | |
| def _extract_steps(plan: dict[str, Any] | list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| if isinstance(plan, list): | |
| return [_normalize_step(step, index) for index, step in enumerate(plan, start=1)] | |
| if isinstance(plan, dict): | |
| steps = plan.get("steps") | |
| if isinstance(steps, list): | |
| return [_normalize_step(step, index) for index, step in enumerate(steps, start=1)] | |
| return [] | |
| def _normalize_step(step: dict[str, Any], index: int) -> dict[str, Any]: | |
| if not isinstance(step, dict): | |
| return {"id": f"step-{index}", "action": "execute", "input": str(step), "tool_hint": "python"} | |
| normalized = dict(step) | |
| if not str(normalized.get("id", "")).strip(): | |
| normalized["id"] = f"step-{index}" | |
| if not str(normalized.get("input", "")).strip(): | |
| for key in ("description", "title", "summary", "task", "prompt"): | |
| value = str(normalized.get(key, "")).strip() | |
| if value: | |
| normalized["input"] = value | |
| break | |
| if not str(normalized.get("action", "")).strip(): | |
| normalized["action"] = "execute" | |
| if not str(normalized.get("tool_hint", "")).strip(): | |
| normalized["tool_hint"] = "python" | |
| return normalized | |
| async def execute(req: ExecuteRequest): | |
| cfg = load_config() | |
| tool_selector = ToolSelectorAgent() | |
| supervisor = SupervisorAgent() | |
| auto_heal = AutoHealAgent() | |
| memory = MemoryAgent() | |
| base_exec_url = _normalize_executor_url(req.executor_url) or get_executor_url(cfg) | |
| exec_url = f"{base_exec_url}/execute" | |
| exec_headers = get_executor_headers(cfg) | |
| timeout = int(cfg.get("REQUEST_TIMEOUT_SEC", 20) or 20) | |
| retries = int(cfg.get("REQUEST_RETRIES", 2) or 2) | |
| try: | |
| results: list[dict[str, Any]] = [] | |
| for step in _extract_steps(req.plan): | |
| tool = tool_selector.select_tool(step) | |
| payload = { | |
| "request_id": req.request_id, | |
| "step_id": step.get("id"), | |
| "command": tool.get("command"), | |
| "files": tool.get("files", {}), | |
| "env": tool.get("env", {}), | |
| "timestamp": time.time(), | |
| } | |
| attempt = 0 | |
| last_err = None | |
| while attempt <= retries: | |
| try: | |
| response = requests.post(exec_url, json=payload, headers=exec_headers, timeout=timeout) | |
| if response.status_code == 200: | |
| item = response.json() | |
| item["selected_tool"] = tool.get("tool") | |
| results.append(item) | |
| last_err = None | |
| break | |
| last_err = response.text | |
| except Exception as exc: | |
| last_err = str(exc) | |
| attempt += 1 | |
| if last_err: | |
| results.append({"error": last_err, "step": step, "auto_heal": auto_heal.suggest(last_err, step)}) | |
| report = supervisor.review(results) | |
| snapshot = {"execution": results, "report": report} | |
| if not is_remote_brain_only(): | |
| memory.write_short_term(req.request_id, snapshot) | |
| return {"status": "ok", "results": results, "report": report, "timestamp": time.time()} | |
| except Exception as exc: | |
| logger.exception("Execution failed") | |
| raise HTTPException(status_code=500, detail=str(exc)) | |