"""Supervisor Agent: ????? ??????? ????? ??????.""" import logging from typing import Any, Dict, List from api.deps import get_logger from memory.episodic_db import EpisodicDB logger = get_logger("kapo.agent.supervisor") class SupervisorAgent: def review(self, results: List[Dict[str, Any]]) -> Dict[str, Any]: try: success = all(r.get("exit_code", 0) == 0 for r in results if isinstance(r, dict)) return {"success": success, "results_count": len(results)} except Exception as exc: logger.exception("Supervisor failed") return {"success": False, "error": str(exc)} class AgentOS: """Self-Improving loop reading episodic DB and proposing prompt updates.""" def run(self) -> Dict[str, Any]: try: db = EpisodicDB() recent = db.list_recent(limit=20) success_rate = 0 if recent: success_rate = sum(1 for r in recent if r.get("success")) / len(recent) proposal = { "summary": f"Recent success rate: {success_rate:.2f}", "prompt_update": "?????? ????? ????? Planner ??? ?????? ??????.", "sandbox_test": "Run simulated plan execution in sandbox before promotion.", } return proposal except Exception as exc: logger.exception("AgentOS failed") return {"error": str(exc)}