| """ |
| PlannerAgent — Breaks goals into executable task graphs (Manus-style) |
| """ |
| import json |
| from typing import Dict, List |
| import structlog |
| from .base_agent import BaseAgent |
|
|
| log = structlog.get_logger() |
|
|
| PLANNER_SYSTEM = """You are an elite software architect and project planner. |
| Break down user goals into detailed, executable task graphs. |
| Think like Devin/Manus — autonomous, thorough, production-minded. |
| |
| Always respond with valid JSON task plans. |
| """ |
|
|
| PLANNER_PROMPT = """Break this goal into a detailed execution plan. |
| |
| Goal: {goal} |
| Context: {context} |
| |
| Respond ONLY with valid JSON: |
| {{ |
| "title": "Plan title", |
| "steps": [ |
| {{ |
| "id": "step_1", |
| "name": "Step name", |
| "description": "Detailed description", |
| "agent": "coding|debug|connector|deploy|workflow|sandbox|memory", |
| "tool": "code|shell|file|github|memory|search|test|none", |
| "depends_on": [], |
| "estimated_seconds": 15, |
| "can_parallel": false |
| }} |
| ], |
| "estimated_duration": 120, |
| "tools_needed": ["code", "shell"], |
| "agents_needed": ["coding", "debug"], |
| "complexity": "simple|moderate|complex" |
| }}""" |
|
|
|
|
| class PlannerAgent(BaseAgent): |
| def __init__(self, ws_manager=None, ai_router=None): |
| super().__init__("PlannerAgent", ws_manager, ai_router) |
|
|
| async def run(self, task: str, context: Dict = {}, **kwargs) -> str: |
| session_id = kwargs.get("session_id", "") |
| task_id = kwargs.get("task_id", "") |
|
|
| await self.emit(task_id, "agent_start", {"agent": "PlannerAgent", "goal": task[:80]}, session_id) |
|
|
| ctx_str = json.dumps(context.get("memory", []))[:300] if context.get("memory") else "No prior context" |
| prompt = PLANNER_PROMPT.format(goal=task, context=ctx_str) |
|
|
| messages = [ |
| {"role": "system", "content": PLANNER_SYSTEM}, |
| {"role": "user", "content": prompt}, |
| ] |
|
|
| raw = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2, max_tokens=2000) |
|
|
| try: |
| start = raw.find("{") |
| end = raw.rfind("}") + 1 |
| if start >= 0 and end > start: |
| plan = json.loads(raw[start:end]) |
| else: |
| plan = json.loads(raw) |
|
|
| await self.emit(task_id, "plan_ready", { |
| "title": plan.get("title", "Execution Plan"), |
| "steps": len(plan.get("steps", [])), |
| "estimated_duration": plan.get("estimated_duration", 60), |
| "agents_needed": plan.get("agents_needed", []), |
| }, session_id) |
|
|
| return json.dumps(plan) |
|
|
| except Exception as e: |
| log.warning("Plan parse failed", error=str(e)) |
| fallback = self._fallback_plan(task) |
| await self.emit(task_id, "plan_ready", {"title": "Fallback Plan", "steps": len(fallback["steps"])}, session_id) |
| return json.dumps(fallback) |
|
|
| def _fallback_plan(self, goal: str) -> Dict: |
| return { |
| "title": f"Plan: {goal[:50]}", |
| "steps": [ |
| {"id": "step_1", "name": "Analyze Requirements", "description": f"Analyze: {goal[:60]}", "agent": "coding", "tool": "none", "depends_on": [], "estimated_seconds": 10, "can_parallel": False}, |
| {"id": "step_2", "name": "Design Solution", "description": "Design the architecture", "agent": "coding", "tool": "code", "depends_on": ["step_1"], "estimated_seconds": 20, "can_parallel": False}, |
| {"id": "step_3", "name": "Implement", "description": "Write implementation code", "agent": "coding", "tool": "code", "depends_on": ["step_2"], "estimated_seconds": 30, "can_parallel": False}, |
| {"id": "step_4", "name": "Test & Debug", "description": "Test and fix errors", "agent": "debug", "tool": "test", "depends_on": ["step_3"], "estimated_seconds": 20, "can_parallel": False}, |
| {"id": "step_5", "name": "Document", "description": "Write documentation", "agent": "coding", "tool": "file", "depends_on": ["step_4"], "estimated_seconds": 10, "can_parallel": True}, |
| ], |
| "estimated_duration": 90, |
| "tools_needed": ["code", "test"], |
| "agents_needed": ["coding", "debug"], |
| "complexity": "moderate", |
| } |
|
|
| async def build_task_graph(self, plan_json: str) -> List[Dict]: |
| """Build execution order respecting dependencies.""" |
| try: |
| plan = json.loads(plan_json) |
| steps = plan.get("steps", []) |
| |
| ordered = [] |
| visited = set() |
| |
| def visit(step_id): |
| if step_id in visited: |
| return |
| visited.add(step_id) |
| step = next((s for s in steps if s["id"] == step_id), None) |
| if step: |
| for dep in step.get("depends_on", []): |
| visit(dep) |
| ordered.append(step) |
| |
| for step in steps: |
| visit(step["id"]) |
| return ordered |
| except Exception: |
| return [] |
|
|