Final_Assignment_Template / src /agents /execution_agent.py
Humanlearning's picture
multi agent architecture
fe36046
raw
history blame
6.67 kB
"""Execution Agent - Handles code execution and computational tasks"""
from typing import Dict, Any, List
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
from langchain_groq import ChatGroq
from code_agent import run_agent # Import our existing code execution engine
from src.tracing import get_langfuse_callback_handler
@tool
def run_python(input: str) -> str:
    """Execute Python code in a restricted sandbox (code-interpreter).
    Pass **any** coding or file-manipulation task here and the agent will
    compute the answer by running Python. The entire standard library is NOT
    available; heavy networking is disabled. Suitable for: math, data-frames,
    small file parsing, algorithmic questions.
    """
    # NOTE(review): with LangChain's @tool decorator this docstring presumably
    # doubles as the tool description shown to the model -- confirm before
    # rewording it.
    # The request text is forwarded unchanged to run_agent (imported from
    # code_agent) and whatever it returns is handed back to the caller.
    return run_agent(input)
def load_execution_prompt() -> str:
    """Load the system prompt for the execution agent.

    Reads ``./prompts/execution_prompt.txt`` (resolved against the current
    working directory) as UTF-8 and returns its content with surrounding
    whitespace stripped.

    Returns:
        The prompt text from the file, or a built-in default prompt when the
        file is missing, cannot be read (any ``OSError`` such as a permission
        problem or the path being a directory), or contains only whitespace.
    """
    fallback_prompt = """You are a specialized execution agent. Use the run_python tool to execute code and solve computational problems."""
    try:
        with open("./prompts/execution_prompt.txt", "r", encoding="utf-8") as f:
            prompt = f.read().strip()
    except OSError:
        # FileNotFoundError is an OSError subclass; other read failures are
        # treated the same way so prompt loading stays best-effort.
        return fallback_prompt
    # An empty file would otherwise produce an empty system message.
    return prompt or fallback_prompt
def get_execution_tools() -> List:
    """Return the tools the execution agent may call.

    Currently this is a single-element list holding the ``run_python`` tool.
    """
    available_tools = [run_python]
    return available_tools
def execute_tool_calls(tool_calls: list, tools: list) -> list:
    """Run each requested tool call and collect the outcomes.

    Args:
        tool_calls: Mappings with ``'name'``, ``'args'`` and ``'id'`` keys
            describing the calls to perform.
        tools: Tool objects exposing a ``name`` attribute and an
            ``invoke(args)`` method.

    Returns:
        One ``ToolMessage`` per call, in request order. The content is the
        stringified tool result, an error description when the tool raised,
        or an "Unknown tool" notice when no tool has the requested name.
    """
    # Index the tools by name so each call is dispatched with one lookup.
    registry = {}
    for candidate in tools:
        registry[candidate.name] = candidate

    outcomes = []
    for call in tool_calls:
        name = call['name']
        arguments = call['args']
        call_id = call['id']

        if name not in registry:
            content = f"Unknown tool: {name}"
        else:
            try:
                # Only the first 200 characters of the arguments are logged.
                print(f"Execution Agent: Executing {name} with args: {str(arguments)[:200]}...")
                content = str(registry[name].invoke(arguments))
            except Exception as e:
                # A failing tool is reported back as message content rather
                # than aborting the remaining calls.
                print(f"Error executing {name}: {e}")
                content = f"Error executing {name}: {e}"

        outcomes.append(ToolMessage(content=content, tool_call_id=call_id))
    return outcomes
def needs_code_execution(query: str) -> bool:
    """Guess whether a query calls for running code.

    The check is a case-insensitive substring match against a fixed keyword
    list, so a keyword embedded in a longer word (e.g. "file" in "profile")
    also counts as a hit.

    Args:
        query: The user's request text.

    Returns:
        True if any keyword occurs in the lower-cased query, else False.
    """
    keywords = (
        "calculate", "compute", "algorithm", "fibonacci", "math", "data",
        "programming", "code", "function", "sort", "csv", "json", "pandas",
        "plot", "graph", "analyze", "process", "file", "manipulation",
    )
    lowered = query.lower()
    for keyword in keywords:
        if keyword in lowered:
            return True
    return False
def execution_agent(state: Dict[str, Any]) -> Dict[str, Any]:
    """Handle a computational request by letting the LLM drive run_python.

    The agent builds a conversation from the execution system prompt, an
    optional guidance message (added when the latest human message looks like
    a coding task), and every non-system message from ``state["messages"]``.
    It asks the tool-enabled model for a response; if that response requests
    tool calls, they are executed once and the plain model is asked for a
    final answer that takes the tool results into account.

    Args:
        state: Graph state. ``state["messages"]`` is read as the conversation
            so far; each message is expected to expose ``type`` and
            ``content`` attributes.

    Returns:
        A copy of ``state`` with:
          * ``"messages"``: the conversation this agent built plus the final
            response (on failure: the incoming messages plus an error reply),
          * ``"agent_response"``: the final (or error) ``AIMessage``,
          * ``"current_step"``: always ``"verification"``.

        Exceptions raised while processing are caught and reported through an
        error ``AIMessage`` instead of propagating.
    """
    print("Execution Agent: Processing computational request")
    try:
        execution_prompt = load_execution_prompt()

        # Lower temperature for more consistent code generation.
        llm = ChatGroq(model="qwen-qwq-32b", temperature=0.1)
        tools = get_execution_tools()
        llm_with_tools = llm.bind_tools(tools)

        # Tracing is optional: no handler means no callbacks.
        callback_handler = get_langfuse_callback_handler()
        callbacks = [callback_handler] if callback_handler else []

        messages = state.get("messages", [])
        execution_messages = [SystemMessage(content=execution_prompt)]

        # The most recent human message is what the heuristic inspects.
        user_query = None
        for msg in reversed(messages):
            if msg.type == "human":
                user_query = msg.content
                break

        # Message content is not guaranteed to be a plain string (it may be a
        # list of content blocks); the keyword heuristic only works on text,
        # so non-string content simply skips the guidance message instead of
        # failing the whole request.
        if isinstance(user_query, str) and user_query and needs_code_execution(user_query):
            guidance_text = (
                f"Task requiring code execution: {user_query}\n"
                "Please analyze this computational task and use the run_python tool to solve it step by step.\n"
                "Break down complex problems into smaller steps and provide clear explanations."
            )
            execution_messages.append(HumanMessage(content=guidance_text))

        # Carry over the original conversation, dropping system messages so
        # the execution prompt stays the only system instruction.
        for msg in messages:
            if msg.type != "system":
                execution_messages.append(msg)

        response = llm_with_tools.invoke(execution_messages, config={"callbacks": callbacks})

        if response.tool_calls:
            print(f"Execution Agent: LLM requested {len(response.tool_calls)} tool calls")
            tool_messages = execute_tool_calls(response.tool_calls, tools)
            # The tool-requesting response must precede its tool results.
            execution_messages.extend([response] + tool_messages)
            # Final answer comes from the model without tools bound, so only
            # one round of tool calls is performed.
            final_response = llm.invoke(execution_messages, config={"callbacks": callbacks})
        else:
            # Direct response without tools.
            final_response = response

        return {
            **state,
            "messages": execution_messages + [final_response],
            "agent_response": final_response,
            "current_step": "verification"
        }
    except Exception as e:
        print(f"Execution Agent Error: {e}")
        error_response = AIMessage(content=f"I encountered an error while processing your computational request: {e}")
        # Coerce to a list so the handler itself cannot fail when "messages"
        # is None or a non-list sequence.
        prior_messages = list(state.get("messages") or [])
        return {
            **state,
            "messages": prior_messages + [error_response],
            "agent_response": error_response,
            "current_step": "verification"
        }