| """ |
| God Agent Orchestrator β Central Brain (Manus-style) |
| Routes user intent β correct agent β merges results |
| """ |
|
|
| import asyncio |
| import json |
| import time |
| import uuid |
| from typing import Any, Dict, List, Optional |
|
|
| import structlog |
|
|
| log = structlog.get_logger() |
|
|
| SYSTEM_PROMPT = """You are GOD AGENT β an elite autonomous AI operating system combining: |
| - Manus-style orchestration and planning |
| - Devin-style autonomous coding and debugging |
| - Genspark-style repo engineering |
| |
| You coordinate multiple specialized agents: |
| - ChatAgent: Conversation and clarification |
| - PlannerAgent: Break goals into executable task graphs |
| - CodingAgent: Generate, edit, refactor, fix code |
| - DebugAgent: Detect and auto-fix errors (self-healing) |
| - MemoryAgent: Persistent long-term memory |
| - ConnectorAgent: GitHub/HF/Vercel/n8n integrations |
| - DeployAgent: Automated deployments |
| - WorkflowAgent: n8n workflow generation |
| - SandboxAgent: VS Code sandbox execution |
| |
| You respond in Burmese or English based on user preference. |
| Always think step-by-step. Be autonomous, decisive, and thorough. |
| """ |
|
|
|
|
| class GodAgentOrchestrator: |
| """ |
| Central orchestrator β routes tasks to specialized agents, |
| merges results, manages agent collaboration. |
| """ |
|
|
| def __init__(self, ws_manager=None, ai_router=None): |
| self.ws = ws_manager |
| self.ai_router = ai_router |
| self._agents: Dict[str, Any] = {} |
| self._active_tasks: Dict[str, Dict] = {} |
|
|
| def register_agent(self, name: str, agent): |
| self._agents[name] = agent |
| log.info("Agent registered", agent=name) |
|
|
| def get_agent(self, name: str): |
| return self._agents.get(name) |
|
|
| |
|
|
| async def classify_intent(self, user_message: str) -> Dict: |
| """Classify user intent to route to correct agent(s).""" |
| classify_prompt = f"""Classify this user request and identify which agents are needed. |
| |
| User message: "{user_message}" |
| |
| Respond ONLY with JSON: |
| {{ |
| "primary_agent": "chat|planner|coding|debug|connector|deploy|workflow|sandbox|memory", |
| "secondary_agents": [], |
| "intent": "brief description", |
| "requires_planning": true/false, |
| "is_code_task": true/false, |
| "is_deployment": true/false, |
| "is_workflow": true/false, |
| "language": "en|my", |
| "complexity": "simple|moderate|complex" |
| }}""" |
|
|
| if self.ai_router: |
| messages = [ |
| {"role": "system", "content": "You are an intent classifier. Return only valid JSON."}, |
| {"role": "user", "content": classify_prompt}, |
| ] |
| raw = await self.ai_router.complete(messages, temperature=0.1, max_tokens=300) |
| try: |
| start = raw.find("{") |
| end = raw.rfind("}") + 1 |
| if start >= 0 and end > start: |
| return json.loads(raw[start:end]) |
| except Exception: |
| pass |
|
|
| |
| msg_lower = user_message.lower() |
| is_code = any(k in msg_lower for k in ["code", "build", "create", "write", "fix", "debug", "api", "function", "class", "script", "app"]) |
| is_deploy = any(k in msg_lower for k in ["deploy", "vercel", "github", "push", "publish", "release"]) |
| is_workflow = any(k in msg_lower for k in ["workflow", "n8n", "automate", "trigger", "pipeline", "schedule"]) |
| is_memory = any(k in msg_lower for k in ["remember", "recall", "memory", "history", "previous"]) |
|
|
| primary = "coding" if is_code else ("deploy" if is_deploy else ("workflow" if is_workflow else ("memory" if is_memory else "chat"))) |
|
|
| return { |
| "primary_agent": primary, |
| "secondary_agents": [], |
| "intent": user_message[:80], |
| "requires_planning": is_code or is_deploy, |
| "is_code_task": is_code, |
| "is_deployment": is_deploy, |
| "is_workflow": is_workflow, |
| "language": "my" if any(c > "\u1000" for c in user_message) else "en", |
| "complexity": "complex" if len(user_message) > 200 else "moderate", |
| } |
|
|
| |
|
|
| async def orchestrate( |
| self, |
| user_message: str, |
| session_id: str = "", |
| task_id: str = "", |
| context: Dict = {}, |
| ) -> str: |
| """Main orchestration β classify β route β execute β merge.""" |
| exec_id = task_id or f"orch_{uuid.uuid4().hex[:8]}" |
|
|
| await self._emit(session_id, task_id, "orchestrator_start", { |
| "message": user_message[:100], |
| "session_id": session_id, |
| }) |
|
|
| |
| intent = await self.classify_intent(user_message) |
| log.info("Intent classified", **intent) |
|
|
| await self._emit(session_id, task_id, "intent_classified", { |
| "primary_agent": intent["primary_agent"], |
| "complexity": intent["complexity"], |
| "language": intent["language"], |
| }) |
|
|
| |
| if intent.get("requires_planning") and intent.get("complexity") == "complex": |
| planner = self._agents.get("planner") |
| if planner: |
| await self._emit(session_id, task_id, "agent_called", {"agent": "PlannerAgent"}) |
| plan = await planner.run(user_message, context=context, session_id=session_id, task_id=exec_id) |
| context["plan"] = plan |
|
|
| |
| primary_name = intent["primary_agent"] |
| primary_agent = self._agents.get(primary_name) or self._agents.get("chat") |
|
|
| if not primary_agent: |
| return f"Agent '{primary_name}' not available." |
|
|
| await self._emit(session_id, task_id, "agent_called", { |
| "agent": primary_name, |
| "intent": intent["intent"], |
| }) |
|
|
| |
| result = await primary_agent.run( |
| user_message, |
| context={**context, "intent": intent}, |
| session_id=session_id, |
| task_id=exec_id, |
| ) |
|
|
| |
| secondary_results = [] |
| if intent.get("secondary_agents"): |
| tasks = [] |
| for agent_name in intent["secondary_agents"]: |
| agent = self._agents.get(agent_name) |
| if agent: |
| tasks.append(agent.run(user_message, context=context, session_id=session_id, task_id=exec_id)) |
| if tasks: |
| secondary_results = await asyncio.gather(*tasks, return_exceptions=True) |
|
|
| |
| memory_agent = self._agents.get("memory") |
| if memory_agent: |
| asyncio.create_task(memory_agent.save_interaction( |
| user_message=user_message, |
| assistant_response=result, |
| session_id=session_id, |
| intent=intent, |
| )) |
|
|
| await self._emit(session_id, task_id, "orchestrator_complete", { |
| "primary_agent": primary_name, |
| "result_length": len(result), |
| }) |
|
|
| return result |
|
|
| |
|
|
| async def self_heal( |
| self, |
| error: str, |
| original_task: str, |
| task_id: str = "", |
| session_id: str = "", |
| max_retries: int = 3, |
| ) -> str: |
| """Self-healing retry loop β automatically fix errors.""" |
| debug_agent = self._agents.get("debug") |
| if not debug_agent: |
| return f"Cannot self-heal: DebugAgent not available. Error: {error}" |
|
|
| for attempt in range(1, max_retries + 1): |
| await self._emit(session_id, task_id, "self_heal_attempt", { |
| "attempt": attempt, |
| "max": max_retries, |
| "error": error[:200], |
| }) |
| fix = await debug_agent.run( |
| f"Fix this error: {error}\n\nOriginal task: {original_task}", |
| context={"attempt": attempt}, |
| session_id=session_id, |
| task_id=task_id, |
| ) |
| if fix and "β" not in fix[:10]: |
| await self._emit(session_id, task_id, "self_heal_success", { |
| "attempt": attempt, |
| "fix": fix[:200], |
| }) |
| return fix |
|
|
| return f"β Self-healing failed after {max_retries} attempts. Last error: {error}" |
|
|
| |
|
|
| async def _emit(self, session_id: str, task_id: str, event: str, data: Dict): |
| if not self.ws: |
| return |
| if task_id: |
| await self.ws.emit(task_id, event, data, session_id=session_id) |
| if session_id: |
| await self.ws.emit_chat(session_id, event, data) |
|
|
| |
|
|
| def get_status(self) -> Dict: |
| return { |
| "agents": list(self._agents.keys()), |
| "active_tasks": len(self._active_tasks), |
| "total_agents": len(self._agents), |
| } |
|
|