Spaces:
Sleeping
Sleeping
| """Supervisor Agent: ????? ??????? ????? ??????.""" | |
| import logging | |
| from typing import Any, Dict, List | |
| from api.deps import get_logger | |
| from memory.episodic_db import EpisodicDB | |
| logger = get_logger("kapo.agent.supervisor") | |
| class SupervisorAgent: | |
| def review(self, results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| try: | |
| success = all(r.get("exit_code", 0) == 0 for r in results if isinstance(r, dict)) | |
| return {"success": success, "results_count": len(results)} | |
| except Exception as exc: | |
| logger.exception("Supervisor failed") | |
| return {"success": False, "error": str(exc)} | |
| class AgentOS: | |
| """Self-Improving loop reading episodic DB and proposing prompt updates.""" | |
| def run(self) -> Dict[str, Any]: | |
| try: | |
| db = EpisodicDB() | |
| recent = db.list_recent(limit=20) | |
| success_rate = 0 | |
| if recent: | |
| success_rate = sum(1 for r in recent if r.get("success")) / len(recent) | |
| proposal = { | |
| "summary": f"Recent success rate: {success_rate:.2f}", | |
| "prompt_update": "?????? ????? ????? Planner ??? ?????? ??????.", | |
| "sandbox_test": "Run simulated plan execution in sandbox before promotion.", | |
| } | |
| return proposal | |
| except Exception as exc: | |
| logger.exception("AgentOS failed") | |
| return {"error": str(exc)} | |