AiDebuggerClean / brain_server /agents /supervisor_agent.py
MrA7A3's picture
Initial modernized KAPO runtime upload
564b5ea verified
"""Supervisor Agent: ????? ??????? ????? ??????."""
import logging
from typing import Any, Dict, List
from api.deps import get_logger
from memory.episodic_db import EpisodicDB
logger = get_logger("kapo.agent.supervisor")
class SupervisorAgent:
def review(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
try:
success = all(r.get("exit_code", 0) == 0 for r in results if isinstance(r, dict))
return {"success": success, "results_count": len(results)}
except Exception as exc:
logger.exception("Supervisor failed")
return {"success": False, "error": str(exc)}
class AgentOS:
"""Self-Improving loop reading episodic DB and proposing prompt updates."""
def run(self) -> Dict[str, Any]:
try:
db = EpisodicDB()
recent = db.list_recent(limit=20)
success_rate = 0
if recent:
success_rate = sum(1 for r in recent if r.get("success")) / len(recent)
proposal = {
"summary": f"Recent success rate: {success_rate:.2f}",
"prompt_update": "?????? ????? ????? Planner ??? ?????? ??????.",
"sandbox_test": "Run simulated plan execution in sandbox before promotion.",
}
return proposal
except Exception as exc:
logger.exception("AgentOS failed")
return {"error": str(exc)}