"""Execution Agent - Handles code execution and computational tasks"""

from typing import Dict, Any, List

from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
from langchain_groq import ChatGroq

from code_agent import run_agent  # Import our existing code execution engine
from src.tracing import get_langfuse_callback_handler

# Substrings whose presence in a (lower-cased) query suggests that the request
# needs code execution. Kept at module level so the collection is built once
# instead of on every call to needs_code_execution().
_CODE_INDICATORS = (
    "calculate", "compute", "algorithm", "fibonacci", "math", "data",
    "programming", "code", "function", "sort", "csv", "json", "pandas",
    "plot", "graph", "analyze", "process", "file", "manipulation",
)


# NOTE: the docstring below is passed to the @tool decorator, so its wording
# is part of the tool definition and is deliberately left unchanged. The
# parameter name `input` (which shadows the builtin) is likewise part of the
# tool's argument schema and must not be renamed.
@tool
def run_python(input: str) -> str:
    """Execute Python code in a restricted sandbox (code-interpreter).

    Pass **any** coding or file-manipulation task here and the agent will
    compute the answer by running Python. The entire standard library is NOT
    available; heavy networking is disabled. Suitable for: math, data-frames,
    small file parsing, algorithmic questions.
    """
    return run_agent(input)


def load_execution_prompt() -> str:
    """Return the system prompt for the execution agent.

    Reads ``./prompts/execution_prompt.txt`` (relative to the current working
    directory) as UTF-8 and strips surrounding whitespace.

    Returns:
        The file content, or a short built-in fallback prompt when the file
        does not exist. Other I/O errors are not handled here and propagate
        to the caller.
    """
    try:
        with open("./prompts/execution_prompt.txt", "r", encoding="utf-8") as f:
            return f.read().strip()
    except FileNotFoundError:
        return (
            "You are a specialized execution agent. \n"
            "Use the run_python tool to execute code and solve computational problems."
        )


def get_execution_tools() -> List:
    """Return the list of tools available to the execution agent."""
    return [run_python]


def execute_tool_calls(tool_calls: list, tools: list) -> list:
    """Run every requested tool call and collect the results as messages.

    Args:
        tool_calls: Tool-call dicts; each is indexed by ``'name'``, ``'args'``
            and ``'id'``.
        tools: Tool objects exposing ``.name`` and ``.invoke(args)``.

    Returns:
        One ``ToolMessage`` per entry of ``tool_calls``, in the same order.
        A failing tool or an unknown tool name yields a message describing
        the problem instead of raising, so every call id gets an answer.
    """
    # Map tool names to tool objects for direct lookup.
    tools_by_name = {candidate.name: candidate for candidate in tools}

    tool_messages = []
    for call in tool_calls:
        tool_name = call['name']
        tool_args = call['args']
        tool_call_id = call['id']

        selected = tools_by_name.get(tool_name)
        if selected is None:
            tool_messages.append(
                ToolMessage(
                    content=f"Unknown tool: {tool_name}",
                    tool_call_id=tool_call_id,
                )
            )
            continue

        try:
            # Only the first 200 characters of the arguments are printed.
            print(f"Execution Agent: Executing {tool_name} with args: {str(tool_args)[:200]}...")
            content = str(selected.invoke(tool_args))
        except Exception as e:
            # Report the failure as the tool result rather than aborting the
            # remaining tool calls.
            print(f"Error executing {tool_name}: {e}")
            content = f"Error executing {tool_name}: {e}"

        tool_messages.append(
            ToolMessage(content=content, tool_call_id=tool_call_id)
        )

    return tool_messages


def needs_code_execution(query: str) -> bool:
    """Heuristically decide whether a query requires code execution.

    The check is a case-insensitive substring match against a fixed list of
    indicator words.

    Args:
        query: The user's query text.

    Returns:
        True when any indicator occurs in the query. False otherwise,
        including when ``query`` is not a string (message content may be a
        list of content blocks), so callers do not fail on ``.lower()``.
    """
    if not isinstance(query, str):
        return False
    lowered = query.lower()
    return any(indicator in lowered for indicator in _CODE_INDICATORS)


def execution_agent(state: Dict[str, Any]) -> Dict[str, Any]:
    """Execution agent that handles computational and code execution tasks.

    Builds a conversation consisting of the execution system prompt, an
    optional guidance message, and the non-system messages found in
    ``state["messages"]``; asks the tool-enabled model for a response; runs
    any requested tool calls once; and then asks the model for a final
    answer.

    Args:
        state: Workflow state. Only the ``"messages"`` key is read (defaults
            to an empty list).

    Returns:
        A copy of ``state`` with ``"messages"`` replaced by the conversation
        built here plus the answer, ``"agent_response"`` set to the answer
        and ``"current_step"`` set to ``"verification"``. If anything raises,
        the original messages plus an error ``AIMessage`` are returned
        instead, with the same ``"current_step"``.
    """
    print("Execution Agent: Processing computational request")

    try:
        # Get execution prompt
        execution_prompt = load_execution_prompt()

        # Initialize LLM with tools
        llm = ChatGroq(model="qwen-qwq-32b", temperature=0.1)  # Lower temp for consistent code
        tools = get_execution_tools()
        llm_with_tools = llm.bind_tools(tools)

        # Get callback handler for tracing
        callback_handler = get_langfuse_callback_handler()
        callbacks = [callback_handler] if callback_handler else []
        invoke_config = {"callbacks": callbacks}

        messages = state.get("messages", [])

        # The execution system prompt always comes first.
        execution_messages = [SystemMessage(content=execution_prompt)]

        # Most recent human message, used only for the heuristic below.
        user_query = next(
            (msg.content for msg in reversed(messages) if msg.type == "human"),
            None,
        )

        # If this clearly needs code execution, provide guidance
        if user_query and needs_code_execution(user_query):
            execution_messages.append(
                HumanMessage(
                    content=(
                        f"Task requiring code execution: {user_query} "
                        "Please analyze this computational task and use the run_python tool to solve it step by step. "
                        "Break down complex problems into smaller steps and provide clear explanations."
                    )
                )
            )

        # Add original messages (excluding system messages to avoid duplicates)
        execution_messages.extend(msg for msg in messages if msg.type != "system")

        # Get initial response from LLM
        response = llm_with_tools.invoke(execution_messages, config=invoke_config)

        if response.tool_calls:
            print(f"Execution Agent: LLM requested {len(response.tool_calls)} tool calls")

            tool_messages = execute_tool_calls(response.tool_calls, tools)

            # Add the response and tool messages to conversation
            execution_messages.extend([response] + tool_messages)

            # The follow-up call goes through `llm`, which has no tools bound,
            # so only a single round of tool calls is performed.
            answer = llm.invoke(execution_messages, config=invoke_config)
        else:
            # Direct response without tools
            answer = response

        return {
            **state,
            "messages": execution_messages + [answer],
            "agent_response": answer,
            "current_step": "verification",
        }

    except Exception as e:
        # Top-level boundary: report the failure as an assistant message so
        # the workflow can continue to the next step.
        print(f"Execution Agent Error: {e}")
        error_response = AIMessage(
            content=f"I encountered an error while processing your computational request: {e}"
        )
        return {
            **state,
            "messages": state.get("messages", []) + [error_response],
            "agent_response": error_response,
            "current_step": "verification",
        }