| from typing import List, Optional | |
| from pmcp.agents.agent_base import AgentBlueprint | |
| from langchain_core.tools import BaseTool | |
| from langchain_openai import ChatOpenAI | |
| from langchain_core.messages import HumanMessage | |
| from pmcp.models.state import PlanningState | |
| from loguru import logger | |
class ExecutorAgent:
    """Selects the current plan step and turns it into an instruction message.

    The sync and async entry points share a single implementation so they
    cannot drift apart.
    """

    def __init__(self, llm: ChatOpenAI, tools: Optional[List[BaseTool]] = None):
        """Create the underlying ``AgentBlueprint`` for the executor.

        Args:
            llm: Chat model handed to the blueprint.
            tools: Accepted for signature compatibility. NOTE(review): this
                value is not forwarded to ``AgentBlueprint`` nor stored on
                the instance — confirm whether that is intentional.
        """
        self.agent = AgentBlueprint(
            agent_name="EXECUTOR_AGENT",
            description="The agent that executes all the steps for the planner",
            llm=llm,
        )

    @staticmethod
    def _build_step_update(state: PlanningState) -> dict:
        """Build the result dict for the step at ``state.plan_step``.

        Args:
            state: Object exposing ``plan_step`` (an index) and
                ``plan.steps`` (an indexable sequence whose items expose
                ``agent`` and ``description``).

        Returns:
            A dict with three keys:
            ``"plan_step"`` — the incoming index plus one (also when no step
            is left, matching the historical behaviour);
            ``"messages"`` — a one-element list holding a ``HumanMessage``
            for the selected step, or an empty list when the index is past
            the end of the plan;
            ``"current_step"`` — the selected step, or ``None``.
        """
        plan_step_index = state.plan_step
        steps = state.plan.steps

        current_step = None
        messages = []
        if len(steps) > plan_step_index:
            current_step = steps[plan_step_index]
            messages = [
                HumanMessage(
                    content=f"The {current_step.agent} agent should perform the following action:\n{current_step.description}"
                )
            ]
            logger.info(f"The Executor is executing step: {current_step}")
        else:
            # Avoid the misleading "executing step: None" line when the plan
            # has already been fully consumed.
            logger.info(
                f"The Executor has no remaining step to execute "
                f"(plan_step={plan_step_index}, total steps={len(steps)})"
            )

        return {
            "plan_step": plan_step_index + 1,
            "messages": messages,
            "current_step": current_step,
        }

    def call_executor_agent(self, state: PlanningState) -> dict:
        """Synchronous entry point; see ``_build_step_update`` for the result."""
        return self._build_step_update(state)

    async def acall_executor_agent(self, state: PlanningState) -> dict:
        """Asynchronous entry point with the same result as the sync variant.

        The work is pure in-memory bookkeeping, so nothing is awaited.
        """
        return self._build_step_update(state)