AiDebuggerClean / brain_server /api /routes_plan.py
MrA7A3's picture
Initial modernized KAPO runtime upload
564b5ea verified
"""Routes for planning."""
import time
import uuid
import logging
from typing import Dict, Any, List
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from api.deps import get_executor_headers, get_executor_url, get_logger, load_config
from agents.planner_agent import PlannerAgent
from agents.reasoning_agent import ReasoningAgent
from agents.memory_agent import MemoryAgent
from api.deps import is_remote_brain_only
import requests
# Router onto which this module's planning endpoint(s) are registered.
router = APIRouter()
# Module-level logger, obtained through the project's get_logger helper.
logger = get_logger("kapo.brain.plan")
class PlanRequest(BaseModel):
    """Request body accepted by the ``POST /plan`` endpoint."""

    # Caller-supplied identifier; the handler uses it for log correlation and
    # as the key under which the generated plan is stored.
    request_id: str
    # Free-form text handed to the planner and reasoning agents.
    user_input: str
    # Extra context passed to the planner together with ``user_input``.
    # NOTE(review): the ``{}`` literal relies on pydantic copying field
    # defaults per instance -- confirm against the pinned pydantic version.
    context: Dict[str, Any] = {}
    # Optional; not read by the /plan handler visible in this file.
    auth_token: str | None = None
    # Optional; not read by the /plan handler visible in this file.
    timestamp: float | None = None
class PlanResponse(BaseModel):
    """Response body returned by the ``POST /plan`` endpoint."""

    # Identifier of the generated plan (the /plan handler uses a UUID4 string).
    plan_id: str
    # Plan steps as returned by the planner agent.
    plan: List[Dict[str, Any]]
    # Additional information; the /plan handler fills in the "rationale" and
    # "timestamp" keys.
    metadata: Dict[str, Any]
def _store_plan_on_hub(request_id: str, payload: Dict[str, Any]) -> None:
    """POST a plan payload to the executor hub's ``/memory/store`` endpoint.

    Does nothing when no executor URL is configured. This function blocks
    (it uses ``requests``), so it must be run in a worker thread rather than
    directly on the event loop.

    Args:
        request_id: Identifier the payload is stored under.
        payload: JSON-serialisable data to store.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status returned by the hub.
    """
    cfg = load_config()
    hub = get_executor_url(cfg)
    if not hub:
        return
    response = requests.post(
        f"{hub}/memory/store",
        json={"request_id": request_id, "payload": payload},
        headers=get_executor_headers(cfg),
        timeout=(3, 4),  # (connect, read) timeouts in seconds
    )
    # Without this a 4xx/5xx answer from the hub would pass as a success.
    response.raise_for_status()


@router.post("/plan", response_model=PlanResponse)
async def plan(req: PlanRequest):
    """Build a plan for the user's input and persist it.

    Runs the planner agent on ``user_input``/``context``, asks the reasoning
    agent for a rationale, then stores both: in local short-term memory, or,
    in remote-brain-only mode, on the executor hub. The hub store is
    best-effort: a failure there is logged and does not fail the request.

    Args:
        req: The validated request body.

    Returns:
        A ``PlanResponse`` with a fresh UUID4 ``plan_id``, the plan steps, and
        metadata holding the rationale and the current timestamp.

    Raises:
        HTTPException: Status 500 with the error text as ``detail`` when
            planning, reasoning, or the local memory write fails.
    """
    # Local import keeps this block self-contained with respect to the
    # module's import section.
    import asyncio

    log_extra = {"request_id": req.request_id, "component": "plan"}
    try:
        logger.info("Plan requested", extra=log_extra)
        planner = PlannerAgent()
        reasoning = ReasoningAgent()
        memory = MemoryAgent()
        # NOTE(review): if these agent calls block (e.g. on model or network
        # I/O) they stall the event loop too -- confirm and offload if so.
        plan_steps = planner.run(req.user_input, req.context)
        rationale = reasoning.run(req.user_input, plan_steps)
        plan_id = str(uuid.uuid4())
        stored = {"plan": plan_steps, "rationale": rationale}
        if not is_remote_brain_only():
            memory.write_short_term(req.request_id, stored)
        else:
            try:
                # requests is synchronous; run it in a worker thread so a slow
                # or unreachable hub cannot block other requests.
                await asyncio.to_thread(_store_plan_on_hub, req.request_id, stored)
            except Exception:
                # Deliberately best-effort: the plan is still returned.
                logger.warning("Local hub store failed", exc_info=True, extra=log_extra)
        return PlanResponse(
            plan_id=plan_id,
            plan=plan_steps,
            metadata={"rationale": rationale, "timestamp": time.time()},
        )
    except Exception as exc:
        logger.exception("Plan failed")
        raise HTTPException(status_code=500, detail=str(exc)) from exc