autonomous-coding-system / agents /orchestrator.py
God Agent OS CI
πŸš€ Deploy God Agent OS v11 - 2026-05-17 07:49
02117ee
"""
God Agent Orchestrator β€” Central Brain (Manus-style)
Routes user intent β†’ correct agent β†’ merges results
"""
import asyncio
import json
import time
import uuid
from typing import Any, Dict, List, Optional
import structlog
log = structlog.get_logger()
SYSTEM_PROMPT = """You are GOD AGENT β€” an elite autonomous AI operating system combining:
- Manus-style orchestration and planning
- Devin-style autonomous coding and debugging
- Genspark-style repo engineering
You coordinate multiple specialized agents:
- ChatAgent: Conversation and clarification
- PlannerAgent: Break goals into executable task graphs
- CodingAgent: Generate, edit, refactor, fix code
- DebugAgent: Detect and auto-fix errors (self-healing)
- MemoryAgent: Persistent long-term memory
- ConnectorAgent: GitHub/HF/Vercel/n8n integrations
- DeployAgent: Automated deployments
- WorkflowAgent: n8n workflow generation
- SandboxAgent: VS Code sandbox execution
You respond in Burmese or English based on user preference.
Always think step-by-step. Be autonomous, decisive, and thorough.
"""
class GodAgentOrchestrator:
"""
Central orchestrator β€” routes tasks to specialized agents,
merges results, manages agent collaboration.
"""
def __init__(self, ws_manager=None, ai_router=None):
self.ws = ws_manager
self.ai_router = ai_router
self._agents: Dict[str, Any] = {}
self._active_tasks: Dict[str, Dict] = {}
def register_agent(self, name: str, agent):
self._agents[name] = agent
log.info("Agent registered", agent=name)
def get_agent(self, name: str):
return self._agents.get(name)
# ─── Intent Classification ────────────────────────────────────────────────
async def classify_intent(self, user_message: str) -> Dict:
"""Classify user intent to route to correct agent(s)."""
classify_prompt = f"""Classify this user request and identify which agents are needed.
User message: "{user_message}"
Respond ONLY with JSON:
{{
"primary_agent": "chat|planner|coding|debug|connector|deploy|workflow|sandbox|memory",
"secondary_agents": [],
"intent": "brief description",
"requires_planning": true/false,
"is_code_task": true/false,
"is_deployment": true/false,
"is_workflow": true/false,
"language": "en|my",
"complexity": "simple|moderate|complex"
}}"""
if self.ai_router:
messages = [
{"role": "system", "content": "You are an intent classifier. Return only valid JSON."},
{"role": "user", "content": classify_prompt},
]
raw = await self.ai_router.complete(messages, temperature=0.1, max_tokens=300)
try:
start = raw.find("{")
end = raw.rfind("}") + 1
if start >= 0 and end > start:
return json.loads(raw[start:end])
except Exception:
pass
# Fallback heuristic classification
msg_lower = user_message.lower()
is_code = any(k in msg_lower for k in ["code", "build", "create", "write", "fix", "debug", "api", "function", "class", "script", "app"])
is_deploy = any(k in msg_lower for k in ["deploy", "vercel", "github", "push", "publish", "release"])
is_workflow = any(k in msg_lower for k in ["workflow", "n8n", "automate", "trigger", "pipeline", "schedule"])
is_memory = any(k in msg_lower for k in ["remember", "recall", "memory", "history", "previous"])
primary = "coding" if is_code else ("deploy" if is_deploy else ("workflow" if is_workflow else ("memory" if is_memory else "chat")))
return {
"primary_agent": primary,
"secondary_agents": [],
"intent": user_message[:80],
"requires_planning": is_code or is_deploy,
"is_code_task": is_code,
"is_deployment": is_deploy,
"is_workflow": is_workflow,
"language": "my" if any(c > "\u1000" for c in user_message) else "en",
"complexity": "complex" if len(user_message) > 200 else "moderate",
}
# ─── Main Orchestration ───────────────────────────────────────────────────
async def orchestrate(
self,
user_message: str,
session_id: str = "",
task_id: str = "",
context: Dict = {},
) -> str:
"""Main orchestration β€” classify β†’ route β†’ execute β†’ merge."""
exec_id = task_id or f"orch_{uuid.uuid4().hex[:8]}"
await self._emit(session_id, task_id, "orchestrator_start", {
"message": user_message[:100],
"session_id": session_id,
})
# 1. Classify intent
intent = await self.classify_intent(user_message)
log.info("Intent classified", **intent)
await self._emit(session_id, task_id, "intent_classified", {
"primary_agent": intent["primary_agent"],
"complexity": intent["complexity"],
"language": intent["language"],
})
# 2. Build execution plan for complex tasks
if intent.get("requires_planning") and intent.get("complexity") == "complex":
planner = self._agents.get("planner")
if planner:
await self._emit(session_id, task_id, "agent_called", {"agent": "PlannerAgent"})
plan = await planner.run(user_message, context=context, session_id=session_id, task_id=exec_id)
context["plan"] = plan
# 3. Route to primary agent
primary_name = intent["primary_agent"]
primary_agent = self._agents.get(primary_name) or self._agents.get("chat")
if not primary_agent:
return f"Agent '{primary_name}' not available."
await self._emit(session_id, task_id, "agent_called", {
"agent": primary_name,
"intent": intent["intent"],
})
# 4. Execute primary agent
result = await primary_agent.run(
user_message,
context={**context, "intent": intent},
session_id=session_id,
task_id=exec_id,
)
# 5. Run secondary agents in parallel if needed
secondary_results = []
if intent.get("secondary_agents"):
tasks = []
for agent_name in intent["secondary_agents"]:
agent = self._agents.get(agent_name)
if agent:
tasks.append(agent.run(user_message, context=context, session_id=session_id, task_id=exec_id))
if tasks:
secondary_results = await asyncio.gather(*tasks, return_exceptions=True)
# 6. Save to memory
memory_agent = self._agents.get("memory")
if memory_agent:
asyncio.create_task(memory_agent.save_interaction(
user_message=user_message,
assistant_response=result,
session_id=session_id,
intent=intent,
))
await self._emit(session_id, task_id, "orchestrator_complete", {
"primary_agent": primary_name,
"result_length": len(result),
})
return result
# ─── Self-Healing Loop ────────────────────────────────────────────────────
async def self_heal(
self,
error: str,
original_task: str,
task_id: str = "",
session_id: str = "",
max_retries: int = 3,
) -> str:
"""Self-healing retry loop β€” automatically fix errors."""
debug_agent = self._agents.get("debug")
if not debug_agent:
return f"Cannot self-heal: DebugAgent not available. Error: {error}"
for attempt in range(1, max_retries + 1):
await self._emit(session_id, task_id, "self_heal_attempt", {
"attempt": attempt,
"max": max_retries,
"error": error[:200],
})
fix = await debug_agent.run(
f"Fix this error: {error}\n\nOriginal task: {original_task}",
context={"attempt": attempt},
session_id=session_id,
task_id=task_id,
)
if fix and "❌" not in fix[:10]:
await self._emit(session_id, task_id, "self_heal_success", {
"attempt": attempt,
"fix": fix[:200],
})
return fix
return f"❌ Self-healing failed after {max_retries} attempts. Last error: {error}"
# ─── Emit Helper ─────────────────────────────────────────────────────────
async def _emit(self, session_id: str, task_id: str, event: str, data: Dict):
if not self.ws:
return
if task_id:
await self.ws.emit(task_id, event, data, session_id=session_id)
if session_id:
await self.ws.emit_chat(session_id, event, data)
# ─── Agent Status ────────────────────────────────────────────────────────
def get_status(self) -> Dict:
return {
"agents": list(self._agents.keys()),
"active_tasks": len(self._active_tasks),
"total_agents": len(self._agents),
}