Spaces:
Sleeping
Sleeping
| """ | |
| God Agent Orchestrator β Central Brain (Manus-style) | |
| Routes user intent β correct agent β merges results | |
| """ | |
| import asyncio | |
| import json | |
| import time | |
| import uuid | |
| from typing import Any, Dict, List, Optional | |
| import structlog | |
| log = structlog.get_logger() | |
| SYSTEM_PROMPT = """You are GOD AGENT β an elite autonomous AI operating system combining: | |
| - Manus-style orchestration and planning | |
| - Devin-style autonomous coding and debugging | |
| - Genspark-style repo engineering | |
| You coordinate multiple specialized agents: | |
| - ChatAgent: Conversation and clarification | |
| - PlannerAgent: Break goals into executable task graphs | |
| - CodingAgent: Generate, edit, refactor, fix code | |
| - DebugAgent: Detect and auto-fix errors (self-healing) | |
| - MemoryAgent: Persistent long-term memory | |
| - ConnectorAgent: GitHub/HF/Vercel/n8n integrations | |
| - DeployAgent: Automated deployments | |
| - WorkflowAgent: n8n workflow generation | |
| - SandboxAgent: VS Code sandbox execution | |
| You respond in Burmese or English based on user preference. | |
| Always think step-by-step. Be autonomous, decisive, and thorough. | |
| """ | |
| class GodAgentOrchestrator: | |
| """ | |
| Central orchestrator β routes tasks to specialized agents, | |
| merges results, manages agent collaboration. | |
| """ | |
| def __init__(self, ws_manager=None, ai_router=None): | |
| self.ws = ws_manager | |
| self.ai_router = ai_router | |
| self._agents: Dict[str, Any] = {} | |
| self._active_tasks: Dict[str, Dict] = {} | |
| def register_agent(self, name: str, agent): | |
| self._agents[name] = agent | |
| log.info("Agent registered", agent=name) | |
| def get_agent(self, name: str): | |
| return self._agents.get(name) | |
| # βββ Intent Classification ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def classify_intent(self, user_message: str) -> Dict: | |
| """Classify user intent to route to correct agent(s).""" | |
| classify_prompt = f"""Classify this user request and identify which agents are needed. | |
| User message: "{user_message}" | |
| Respond ONLY with JSON: | |
| {{ | |
| "primary_agent": "chat|planner|coding|debug|connector|deploy|workflow|sandbox|memory", | |
| "secondary_agents": [], | |
| "intent": "brief description", | |
| "requires_planning": true/false, | |
| "is_code_task": true/false, | |
| "is_deployment": true/false, | |
| "is_workflow": true/false, | |
| "language": "en|my", | |
| "complexity": "simple|moderate|complex" | |
| }}""" | |
| if self.ai_router: | |
| messages = [ | |
| {"role": "system", "content": "You are an intent classifier. Return only valid JSON."}, | |
| {"role": "user", "content": classify_prompt}, | |
| ] | |
| raw = await self.ai_router.complete(messages, temperature=0.1, max_tokens=300) | |
| try: | |
| start = raw.find("{") | |
| end = raw.rfind("}") + 1 | |
| if start >= 0 and end > start: | |
| return json.loads(raw[start:end]) | |
| except Exception: | |
| pass | |
| # Fallback heuristic classification | |
| msg_lower = user_message.lower() | |
| is_code = any(k in msg_lower for k in ["code", "build", "create", "write", "fix", "debug", "api", "function", "class", "script", "app"]) | |
| is_deploy = any(k in msg_lower for k in ["deploy", "vercel", "github", "push", "publish", "release"]) | |
| is_workflow = any(k in msg_lower for k in ["workflow", "n8n", "automate", "trigger", "pipeline", "schedule"]) | |
| is_memory = any(k in msg_lower for k in ["remember", "recall", "memory", "history", "previous"]) | |
| primary = "coding" if is_code else ("deploy" if is_deploy else ("workflow" if is_workflow else ("memory" if is_memory else "chat"))) | |
| return { | |
| "primary_agent": primary, | |
| "secondary_agents": [], | |
| "intent": user_message[:80], | |
| "requires_planning": is_code or is_deploy, | |
| "is_code_task": is_code, | |
| "is_deployment": is_deploy, | |
| "is_workflow": is_workflow, | |
| "language": "my" if any(c > "\u1000" for c in user_message) else "en", | |
| "complexity": "complex" if len(user_message) > 200 else "moderate", | |
| } | |
| # βββ Main Orchestration βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def orchestrate( | |
| self, | |
| user_message: str, | |
| session_id: str = "", | |
| task_id: str = "", | |
| context: Dict = {}, | |
| ) -> str: | |
| """Main orchestration β classify β route β execute β merge.""" | |
| exec_id = task_id or f"orch_{uuid.uuid4().hex[:8]}" | |
| await self._emit(session_id, task_id, "orchestrator_start", { | |
| "message": user_message[:100], | |
| "session_id": session_id, | |
| }) | |
| # 1. Classify intent | |
| intent = await self.classify_intent(user_message) | |
| log.info("Intent classified", **intent) | |
| await self._emit(session_id, task_id, "intent_classified", { | |
| "primary_agent": intent["primary_agent"], | |
| "complexity": intent["complexity"], | |
| "language": intent["language"], | |
| }) | |
| # 2. Build execution plan for complex tasks | |
| if intent.get("requires_planning") and intent.get("complexity") == "complex": | |
| planner = self._agents.get("planner") | |
| if planner: | |
| await self._emit(session_id, task_id, "agent_called", {"agent": "PlannerAgent"}) | |
| plan = await planner.run(user_message, context=context, session_id=session_id, task_id=exec_id) | |
| context["plan"] = plan | |
| # 3. Route to primary agent | |
| primary_name = intent["primary_agent"] | |
| primary_agent = self._agents.get(primary_name) or self._agents.get("chat") | |
| if not primary_agent: | |
| return f"Agent '{primary_name}' not available." | |
| await self._emit(session_id, task_id, "agent_called", { | |
| "agent": primary_name, | |
| "intent": intent["intent"], | |
| }) | |
| # 4. Execute primary agent | |
| result = await primary_agent.run( | |
| user_message, | |
| context={**context, "intent": intent}, | |
| session_id=session_id, | |
| task_id=exec_id, | |
| ) | |
| # 5. Run secondary agents in parallel if needed | |
| secondary_results = [] | |
| if intent.get("secondary_agents"): | |
| tasks = [] | |
| for agent_name in intent["secondary_agents"]: | |
| agent = self._agents.get(agent_name) | |
| if agent: | |
| tasks.append(agent.run(user_message, context=context, session_id=session_id, task_id=exec_id)) | |
| if tasks: | |
| secondary_results = await asyncio.gather(*tasks, return_exceptions=True) | |
| # 6. Save to memory | |
| memory_agent = self._agents.get("memory") | |
| if memory_agent: | |
| asyncio.create_task(memory_agent.save_interaction( | |
| user_message=user_message, | |
| assistant_response=result, | |
| session_id=session_id, | |
| intent=intent, | |
| )) | |
| await self._emit(session_id, task_id, "orchestrator_complete", { | |
| "primary_agent": primary_name, | |
| "result_length": len(result), | |
| }) | |
| return result | |
| # βββ Self-Healing Loop ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def self_heal( | |
| self, | |
| error: str, | |
| original_task: str, | |
| task_id: str = "", | |
| session_id: str = "", | |
| max_retries: int = 3, | |
| ) -> str: | |
| """Self-healing retry loop β automatically fix errors.""" | |
| debug_agent = self._agents.get("debug") | |
| if not debug_agent: | |
| return f"Cannot self-heal: DebugAgent not available. Error: {error}" | |
| for attempt in range(1, max_retries + 1): | |
| await self._emit(session_id, task_id, "self_heal_attempt", { | |
| "attempt": attempt, | |
| "max": max_retries, | |
| "error": error[:200], | |
| }) | |
| fix = await debug_agent.run( | |
| f"Fix this error: {error}\n\nOriginal task: {original_task}", | |
| context={"attempt": attempt}, | |
| session_id=session_id, | |
| task_id=task_id, | |
| ) | |
| if fix and "β" not in fix[:10]: | |
| await self._emit(session_id, task_id, "self_heal_success", { | |
| "attempt": attempt, | |
| "fix": fix[:200], | |
| }) | |
| return fix | |
| return f"β Self-healing failed after {max_retries} attempts. Last error: {error}" | |
| # βββ Emit Helper βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def _emit(self, session_id: str, task_id: str, event: str, data: Dict): | |
| if not self.ws: | |
| return | |
| if task_id: | |
| await self.ws.emit(task_id, event, data, session_id=session_id) | |
| if session_id: | |
| await self.ws.emit_chat(session_id, event, data) | |
| # βββ Agent Status ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_status(self) -> Dict: | |
| return { | |
| "agents": list(self._agents.keys()), | |
| "active_tasks": len(self._active_tasks), | |
| "total_agents": len(self._agents), | |
| } | |