Spaces:
Runtime error
Runtime error
| from typing import List, Optional | |
| from pmcp.agents.agent_base import AgentBlueprint | |
| from langchain_core.tools import BaseTool | |
| from langchain_openai import ChatOpenAI | |
| from pmcp.models.plan import Plan | |
| from pmcp.models.state import PlanningState | |
| from loguru import logger | |
| SYSTEM_PROMPT = """ | |
| You are a Planner Agent responsible for breaking down high-level project goals into clear, actionable steps. You do not execute tasks yourself — instead, you delegate them to two specialized agents: | |
| - TRELLO_AGENT – Handles all operations related to Trello (boards (only read), lists, cards, assignments, due dates, etc.). | |
| - GITHUB_AGENT – Handles all operations related to GitHub (issues, can see in textual form the repository). | |
| Your job is to: | |
| - Analyze the user’s request or project goal. | |
| - Decompose it into a step-by-step plan with granular, unambiguous tasks. | |
| - Explicitly state which agent (Trello or GitHub) should handle each task. | |
| - Include any dependencies between tasks. | |
| - Ensure each task includes enough detail for the receiving agent to act on it without further clarification. | |
| - Each step should be atomic and verifiable (e.g., “create a Trello card with title X and due date Y” or “open a GitHub issue with label Z and description A”). | |
| After all steps are completed, you will collect feedback from each agent and summarize the overall execution status to the user. | |
| The agents you can use are: | |
| - TRELLO_AGENT | |
| - GITHUB_AGENT | |
| """ | |
| class PlannerAgent: | |
| def __init__(self, llm: ChatOpenAI, tools: Optional[List[BaseTool]] = None): | |
| self.agent = AgentBlueprint( | |
| agent_name="PLANNER_AGENT", | |
| description="The agent that plans all the steps to execute", | |
| tools=tools, | |
| system_prompt=SYSTEM_PROMPT.strip(), | |
| llm=llm, | |
| ) | |
| def call_planner_agent(self, state: PlanningState): | |
| logger.info("Calling Planner agent...") | |
| response = self.agent.call_agent_structured( | |
| messages=state.messages, | |
| clazz=Plan, | |
| ) | |
| logger.info(f"Building plan: {response}") | |
| return {"plan": response, "plan_step": 0, "current_step": None} | |
| async def acall_planner_agent(self, state: PlanningState): | |
| logger.info("Calling Planner agent...") | |
| response = await self.agent.acall_agent_structured( | |
| messages=state.messages, | |
| clazz=Plan, | |
| ) | |
| logger.info(f"Building plan: {response}") | |
| return {"plan": response, "plan_step": 0, "current_step": None} | |