File size: 6,671 Bytes
fe36046 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
"""Execution Agent - Handles code execution and computational tasks"""
from typing import Dict, Any, List
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
from langchain_groq import ChatGroq
from code_agent import run_agent # Import our existing code execution engine
from src.tracing import get_langfuse_callback_handler
@tool
def run_python(input: str) -> str:
    """Execute Python code in a restricted sandbox (code-interpreter).
    Pass **any** coding or file-manipulation task here and the agent will
    compute the answer by running Python. The entire standard library is NOT
    available; heavy networking is disabled. Suitable for: math, data-frames,
    small file parsing, algorithmic questions.
    """
    # NOTE: the docstring above doubles as the tool description surfaced to
    # the LLM (langchain's @tool derives the tool schema from it) — edit it
    # with care, since changing it changes model behavior.
    # `input` shadows the builtin, but it is part of the tool's argument
    # schema, so renaming it would change the tool-call interface.
    # Delegates to the project's code-execution engine (code_agent.run_agent).
    return run_agent(input)
def load_execution_prompt() -> str:
    """Return the system prompt for the execution agent.

    Attempts to read ./prompts/execution_prompt.txt (UTF-8, whitespace
    stripped); if the file is missing, falls back to a minimal built-in
    prompt so the agent can still run.
    """
    prompt_path = "./prompts/execution_prompt.txt"
    try:
        handle = open(prompt_path, "r", encoding="utf-8")
    except FileNotFoundError:
        # No prompt file on disk — use the built-in default.
        return (
            "You are a specialized execution agent. Use the run_python tool "
            "to execute code and solve computational problems."
        )
    with handle:
        return handle.read().strip()
def get_execution_tools() -> List:
    """Return the tools exposed to the execution agent.

    Currently a single tool: the run_python sandbox executor.
    """
    available_tools: List = [run_python]
    return available_tools
def execute_tool_calls(tool_calls: list, tools: list) -> list:
    """Execute a batch of LLM-requested tool calls and collect the results.

    Args:
        tool_calls: Tool-call dicts from the LLM response; each is expected
            to carry 'name', 'args', and 'id' keys.
        tools: Tool objects that may be invoked, matched by their ``.name``.

    Returns:
        A list of ToolMessage objects, one per tool call and in the same
        order. Failures and unknown tool names are reported back as
        error-text ToolMessages instead of raising, so the LLM conversation
        can continue.
    """
    # Build the name -> tool lookup once instead of scanning per call.
    # (Do not name the loop variable `tool`: that would shadow the `tool`
    # decorator imported at module level.)
    tool_map = {t.name: t for t in tools}
    tool_messages = []
    for tool_call in tool_calls:
        tool_name = tool_call['name']
        tool_args = tool_call['args']
        tool_call_id = tool_call['id']
        # Guard clause: the LLM asked for a tool we do not provide.
        if tool_name not in tool_map:
            tool_messages.append(
                ToolMessage(
                    content=f"Unknown tool: {tool_name}",
                    tool_call_id=tool_call_id
                )
            )
            continue
        try:
            print(f"Execution Agent: Executing {tool_name} with args: {str(tool_args)[:200]}...")
            result = tool_map[tool_name].invoke(tool_args)
            tool_messages.append(
                ToolMessage(
                    content=str(result),
                    tool_call_id=tool_call_id
                )
            )
        except Exception as e:
            # Surface the failure to the LLM rather than aborting the run.
            print(f"Error executing {tool_name}: {e}")
            tool_messages.append(
                ToolMessage(
                    content=f"Error executing {tool_name}: {e}",
                    tool_call_id=tool_call_id
                )
            )
    return tool_messages
def needs_code_execution(query: str) -> bool:
    """Heuristic: does this query likely require running code?

    Performs a case-insensitive substring scan of the query against a
    fixed keyword list; returns True on the first match.
    """
    keywords = (
        "calculate", "compute", "algorithm", "fibonacci", "math", "data",
        "programming", "code", "function", "sort", "csv", "json", "pandas",
        "plot", "graph", "analyze", "process", "file", "manipulation",
    )
    lowered = query.lower()
    for keyword in keywords:
        if keyword in lowered:
            return True
    return False
def execution_agent(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execution agent that handles computational and code execution tasks.

    Reads the conversation from state["messages"], prepends the execution
    system prompt, optionally injects extra guidance when the latest human
    message looks computational, then performs one LLM call with tools
    bound and (if the LLM requested tools) one tool-execution round plus a
    final LLM call.

    Returns:
        A new state dict with updated "messages", the final AI message in
        "agent_response", and "current_step" set to "verification" (on
        errors too, so the graph can proceed to the next node).
    """
    print("Execution Agent: Processing computational request")
    try:
        # Get execution prompt (file-based, with a built-in fallback).
        execution_prompt = load_execution_prompt()
        # Initialize LLM with tools
        llm = ChatGroq(model="qwen-qwq-32b", temperature=0.1)  # Lower temp for consistent code
        tools = get_execution_tools()
        llm_with_tools = llm.bind_tools(tools)
        # Get callback handler for tracing (Langfuse); empty list disables tracing.
        callback_handler = get_langfuse_callback_handler()
        callbacks = [callback_handler] if callback_handler else []
        # Build messages
        messages = state.get("messages", [])
        # Add execution system prompt as the first message.
        execution_messages = [SystemMessage(content=execution_prompt)]
        # Find the most recent human message to analyze for code-execution intent.
        user_query = None
        for msg in reversed(messages):
            if msg.type == "human":
                user_query = msg.content
                break
        # If this clearly needs code execution, provide guidance
        # NOTE(review): the guidance message embeds user_query, and the
        # original human message is appended again below — the query appears
        # twice in the prompt. Presumably intentional emphasis; confirm.
        if user_query and needs_code_execution(user_query):
            guidance_msg = HumanMessage(
                content=f"""Task requiring code execution: {user_query}
Please analyze this computational task and use the run_python tool to solve it step by step.
Break down complex problems into smaller steps and provide clear explanations."""
            )
            execution_messages.append(guidance_msg)
        # Add original messages (excluding system messages to avoid duplicates)
        for msg in messages:
            if msg.type != "system":
                execution_messages.append(msg)
        # Get initial response from LLM (tools bound, so it may emit tool calls).
        response = llm_with_tools.invoke(execution_messages, config={"callbacks": callbacks})
        # Check if the LLM wants to use tools
        if response.tool_calls:
            print(f"Execution Agent: LLM requested {len(response.tool_calls)} tool calls")
            # Execute the tool calls
            tool_messages = execute_tool_calls(response.tool_calls, tools)
            # Add the assistant tool-call message and tool results to the conversation.
            execution_messages.extend([response] + tool_messages)
            # Get final response after tool execution.
            # NOTE(review): this uses `llm` (no tools bound) — looks deliberate,
            # to force a plain-text answer rather than another tool call; confirm.
            final_response = llm.invoke(execution_messages, config={"callbacks": callbacks})
            return {
                **state,
                "messages": execution_messages + [final_response],
                "agent_response": final_response,
                "current_step": "verification"
            }
        else:
            # Direct response without tools
            return {
                **state,
                "messages": execution_messages + [response],
                "agent_response": response,
                "current_step": "verification"
            }
    except Exception as e:
        # Boundary handler: convert any failure into an AI error message so
        # the graph still advances to the verification step.
        print(f"Execution Agent Error: {e}")
        error_response = AIMessage(content=f"I encountered an error while processing your computational request: {e}")
        return {
            **state,
            "messages": state.get("messages", []) + [error_response],
            "agent_response": error_response,
            "current_step": "verification"
        }