File size: 5,114 Bytes
02117ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
PlannerAgent — Breaks goals into executable task graphs (Manus-style)
"""
import json
from typing import Dict, List
import structlog
from .base_agent import BaseAgent

# Module-level structured logger shared by everything in this file.
log = structlog.get_logger()

# System message sent as the first chat message by PlannerAgent.run; it sets
# the planner persona and demands JSON-only answers.
PLANNER_SYSTEM = """You are an elite software architect and project planner.
Break down user goals into detailed, executable task graphs.
Think like Devin/Manus — autonomous, thorough, production-minded.

Always respond with valid JSON task plans.
"""

# User-message template, filled in with str.format(goal=..., context=...).
# The doubled braces ({{ / }}) are str.format escapes that render as literal
# { / } so the model sees a plain JSON skeleton; only {goal} and {context}
# are substituted.
PLANNER_PROMPT = """Break this goal into a detailed execution plan.

Goal: {goal}
Context: {context}

Respond ONLY with valid JSON:
{{
  "title": "Plan title",
  "steps": [
    {{
      "id": "step_1",
      "name": "Step name",
      "description": "Detailed description",
      "agent": "coding|debug|connector|deploy|workflow|sandbox|memory",
      "tool": "code|shell|file|github|memory|search|test|none",
      "depends_on": [],
      "estimated_seconds": 15,
      "can_parallel": false
    }}
  ],
  "estimated_duration": 120,
  "tools_needed": ["code", "shell"],
  "agents_needed": ["coding", "debug"],
  "complexity": "simple|moderate|complex"
}}"""


class PlannerAgent(BaseAgent):
    """Agent that asks the LLM to break a goal into a JSON task plan.

    ``run`` returns the plan as a JSON string (falling back to a canned
    five-step plan when the model's answer cannot be parsed), and
    ``build_task_graph`` orders a plan's steps so that every step comes
    after the steps it depends on.
    """

    def __init__(self, ws_manager=None, ai_router=None):
        """Initialise the base agent under the name ``"PlannerAgent"``."""
        super().__init__("PlannerAgent", ws_manager, ai_router)

    async def run(self, task: str, context: "Dict | None" = None, **kwargs) -> str:
        """Produce an execution plan for *task* and return it as JSON text.

        Args:
            task: The goal to plan for.
            context: Optional mapping; only its ``"memory"`` entry is read and
                a 300-character JSON preview of it is put into the prompt.
                ``None`` (the default) is treated as an empty mapping.
            **kwargs: ``session_id`` and ``task_id`` are read (both default to
                ``""``) and forwarded to ``emit`` / ``llm``.

        Returns:
            A JSON string: the plan parsed from the LLM answer, or the
            fallback plan when that answer is not a usable JSON object.
        """
        # A ``{}`` default would be one dict shared by every call; normalise
        # ``None`` here instead.
        context = context or {}
        session_id = kwargs.get("session_id", "")
        task_id = kwargs.get("task_id", "")

        await self.emit(task_id, "agent_start", {"agent": "PlannerAgent", "goal": task[:80]}, session_id)

        memory = context.get("memory")
        if memory:
            # default=str: this is only a truncated prompt preview, so a
            # non-JSON-serializable memory entry must not abort planning.
            ctx_str = json.dumps(memory, default=str)[:300]
        else:
            ctx_str = "No prior context"
        prompt = PLANNER_PROMPT.format(goal=task, context=ctx_str)

        messages = [
            {"role": "system", "content": PLANNER_SYSTEM},
            {"role": "user", "content": prompt},
        ]

        raw = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.2, max_tokens=2000)

        # Only the parsing is guarded: a failure while emitting the event must
        # not be reported as a parse error nor replace a valid plan.
        try:
            plan = self._parse_plan(raw)
        except (ValueError, RecursionError) as e:
            log.warning("Plan parse failed", error=str(e))
            fallback = self._fallback_plan(task)
            await self.emit(task_id, "plan_ready", {"title": "Fallback Plan", "steps": len(fallback["steps"])}, session_id)
            return json.dumps(fallback)

        await self.emit(task_id, "plan_ready", {
            "title": plan.get("title", "Execution Plan"),
            "steps": len(plan.get("steps", [])),
            "estimated_duration": plan.get("estimated_duration", 60),
            "agents_needed": plan.get("agents_needed", []),
        }, session_id)

        return json.dumps(plan)

    @staticmethod
    def _parse_plan(raw) -> Dict:
        """Extract and validate the JSON plan object from an LLM answer.

        The text between the first ``{`` and the last ``}`` is parsed, which
        tolerates prose or code fences around the JSON.

        Raises:
            ValueError: If *raw* is not a string, is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError``), is not a JSON
                object, or has a ``"steps"`` value that is not a list.
        """
        if not isinstance(raw, str):
            raise ValueError(f"LLM response is not text: {type(raw).__name__}")

        start = raw.find("{")
        end = raw.rfind("}") + 1
        candidate = raw[start:end] if 0 <= start < end else raw

        plan = json.loads(candidate)
        if not isinstance(plan, dict):
            raise ValueError("plan is not a JSON object")
        if not isinstance(plan.get("steps", []), list):
            raise ValueError("plan 'steps' is not a list")
        return plan

    def _fallback_plan(self, goal: str) -> Dict:
        """Return a generic linear five-step plan used when parsing fails.

        The goal text is truncated to 50 characters in the title and to 60
        characters in the first step's description.
        """
        return {
            "title": f"Plan: {goal[:50]}",
            "steps": [
                {"id": "step_1", "name": "Analyze Requirements", "description": f"Analyze: {goal[:60]}", "agent": "coding", "tool": "none", "depends_on": [], "estimated_seconds": 10, "can_parallel": False},
                {"id": "step_2", "name": "Design Solution", "description": "Design the architecture", "agent": "coding", "tool": "code", "depends_on": ["step_1"], "estimated_seconds": 20, "can_parallel": False},
                {"id": "step_3", "name": "Implement", "description": "Write implementation code", "agent": "coding", "tool": "code", "depends_on": ["step_2"], "estimated_seconds": 30, "can_parallel": False},
                {"id": "step_4", "name": "Test & Debug", "description": "Test and fix errors", "agent": "debug", "tool": "test", "depends_on": ["step_3"], "estimated_seconds": 20, "can_parallel": False},
                {"id": "step_5", "name": "Document", "description": "Write documentation", "agent": "coding", "tool": "file", "depends_on": ["step_4"], "estimated_seconds": 10, "can_parallel": True},
            ],
            "estimated_duration": 90,
            "tools_needed": ["code", "test"],
            "agents_needed": ["coding", "debug"],
            "complexity": "moderate",
        }

    async def build_task_graph(self, plan_json: str) -> List[Dict]:
        """Build execution order respecting dependencies.

        Args:
            plan_json: A plan as JSON text, as returned by ``run``.

        Returns:
            The plan's step dicts ordered depth-first so that each step
            appears after the steps named in its ``depends_on``. Unknown
            dependency ids are ignored. On any failure (invalid JSON, a step
            without ``"id"``, ...) the error is logged and ``[]`` is returned.
        """
        try:
            plan = json.loads(plan_json)
            steps = plan.get("steps", [])

            # Index steps once for O(1) lookup; setdefault keeps the first
            # step when an id is duplicated.
            by_id = {}
            for step in steps:
                by_id.setdefault(step["id"], step)

            ordered = []
            visited = set()

            def visit(step_id):
                if step_id in visited:
                    return
                # Marked before recursing so a dependency cycle terminates
                # instead of recursing forever.
                visited.add(step_id)
                step = by_id.get(step_id)
                if step is None:
                    return
                # Tolerate "depends_on": null and a single bare id string.
                deps = step.get("depends_on") or []
                if isinstance(deps, str):
                    deps = [deps]
                for dep in deps:
                    visit(dep)
                ordered.append(step)

            for step in steps:
                visit(step["id"])
            return ordered
        except Exception as e:
            # Best-effort contract: callers get an empty graph, but the cause
            # is no longer discarded silently.
            log.warning("Task graph build failed", error=str(e))
            return []