| | """Execution Agent - Handles code execution and computational tasks""" |
| | from typing import Dict, Any, List |
| | from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage |
| | from langchain_core.tools import tool |
| | from langchain_groq import ChatGroq |
| | from code_agent import run_agent |
| | from src.tracing import get_langfuse_callback_handler |
| |
|
| |
|
@tool
def run_python(input: str) -> str:
    """Execute Python code in a restricted sandbox (code-interpreter).

    Pass **any** coding or file-manipulation task here and the agent will
    compute the answer by running Python. The entire standard library is NOT
    available; heavy networking is disabled. Suitable for: math, data-frames,
    small file parsing, algorithmic questions.
    """
    # NOTE: the docstring above is left word-for-word on purpose: the ``@tool``
    # decorator exposes it to the model as the tool description, so editing it
    # changes runtime behavior. The parameter name ``input`` (which shadows
    # the builtin) is likewise part of the tool's call schema and must stay.
    sandbox_output = run_agent(input)
    return sandbox_output
| |
|
| |
|
def load_execution_prompt() -> str:
    """Return the system prompt for the execution agent.

    The prompt is read from ``./prompts/execution_prompt.txt`` (resolved
    against the current working directory) and stripped of surrounding
    whitespace.

    Returns:
        The stripped file contents, or a built-in default prompt when the
        file is missing, cannot be opened/read (any ``OSError``), or contains
        nothing but whitespace.
    """
    default_prompt = (
        "You are a specialized execution agent. Use the run_python tool to "
        "execute code and solve computational problems."
    )

    try:
        with open("./prompts/execution_prompt.txt", "r", encoding="utf-8") as f:
            prompt = f.read().strip()
    except OSError:
        # FileNotFoundError is the common case; PermissionError,
        # IsADirectoryError, etc. get the same fallback. Decoding errors are
        # not OSError and still propagate, so a corrupt file stays visible.
        return default_prompt

    # An empty prompt file would otherwise produce an empty system message.
    return prompt or default_prompt
| |
|
| |
|
def get_execution_tools() -> List:
    """Return the tools the execution agent may call.

    Currently this is a fresh single-element list holding ``run_python``.
    """
    available_tools = [run_python]
    return available_tools
| |
|
| |
|
def execute_tool_calls(tool_calls: list, tools: list) -> list:
    """Run each requested tool call and collect one ``ToolMessage`` per call.

    Args:
        tool_calls: Mappings that each provide the keys ``'name'``, ``'args'``
            and ``'id'``.
        tools: Tool objects exposing a ``name`` attribute and an ``invoke``
            method; they are looked up by ``name``.

    Returns:
        A list of ``ToolMessage`` objects in the same order as ``tool_calls``.
        Each carries the stringified tool result, an error description if the
        tool raised, or an "Unknown tool" notice if no tool has that name.
    """
    registry = {candidate.name: candidate for candidate in tools}
    replies = []

    for call in tool_calls:
        name = call['name']
        args = call['args']
        call_id = call['id']

        # Guard clause: a name we do not know is reported back, not raised.
        if name not in registry:
            replies.append(
                ToolMessage(content=f"Unknown tool: {name}", tool_call_id=call_id)
            )
            continue

        try:
            # Only the first 200 characters of the arguments are logged.
            print(f"Execution Agent: Executing {name} with args: {str(args)[:200]}...")
            output = registry[name].invoke(args)
            reply = ToolMessage(content=str(output), tool_call_id=call_id)
        except Exception as e:
            # A failing tool must not abort the remaining calls; the error
            # text is returned in place of a result.
            print(f"Error executing {name}: {e}")
            reply = ToolMessage(
                content=f"Error executing {name}: {e}",
                tool_call_id=call_id,
            )
        replies.append(reply)

    return replies
| |
|
| |
|
def needs_code_execution(query: str) -> bool:
    """Guess whether ``query`` describes a task that needs code execution.

    The check is a case-insensitive *substring* search for a fixed set of
    keywords, so a keyword embedded in a longer word also counts (e.g.
    "graph" inside "geography").

    Args:
        query: The user's request text.

    Returns:
        ``True`` if any keyword occurs in the lower-cased query, else ``False``.
    """
    keywords = (
        "calculate", "compute", "algorithm", "fibonacci", "math", "data",
        "programming", "code", "function", "sort", "csv", "json", "pandas",
        "plot", "graph", "analyze", "process", "file", "manipulation",
    )
    lowered = query.lower()
    for keyword in keywords:
        if keyword in lowered:
            return True
    return False
| |
|
| |
|
def execution_agent(state: Dict[str, Any]) -> Dict[str, Any]:
    """Run one execution-agent turn over the conversation stored in ``state``.

    Steps:
      1. Build a message list: the execution system prompt, an optional
         guidance message (when the latest human message looks like a coding
         task), then every non-system message from ``state["messages"]``.
      2. Ask the tool-enabled model for a reply.
      3. If the reply requests tool calls, execute them once, append the
         results, and ask the plain model (no tools bound) for the final reply.

    Args:
        state: Graph state; ``state["messages"]`` is read (defaults to ``[]``)
            and each message is expected to expose ``type`` and ``content``.

    Returns:
        A copy of ``state`` with ``"messages"`` replaced by the message list
        built here plus the final reply, ``"agent_response"`` set to that
        reply, and ``"current_step"`` set to ``"verification"``. If anything
        raises, the original messages plus an ``AIMessage`` describing the
        error are returned instead, with the same ``"current_step"``.
    """
    print("Execution Agent: Processing computational request")

    try:
        system_prompt = load_execution_prompt()

        base_llm = ChatGroq(model="qwen-qwq-32b", temperature=0.1)
        available_tools = get_execution_tools()
        tool_llm = base_llm.bind_tools(available_tools)

        # Tracing is optional: with no handler the callback list stays empty.
        handler = get_langfuse_callback_handler()
        callbacks = [handler] if handler else []

        history = state.get("messages", [])
        conversation = [SystemMessage(content=system_prompt)]

        # Most recent human message, or None when there is none.
        latest_query = next(
            (m.content for m in reversed(history) if m.type == "human"),
            None,
        )

        if latest_query and needs_code_execution(latest_query):
            # The guidance message is inserted ahead of the replayed history.
            conversation.append(
                HumanMessage(
                    content=(
                        f"Task requiring code execution: {latest_query}\n"
                        "\n"
                        "Please analyze this computational task and use the run_python tool to solve it step by step.\n"
                        "Break down complex problems into smaller steps and provide clear explanations."
                    )
                )
            )

        # Replay the conversation without earlier system messages; this
        # agent's own system prompt is already first in the list.
        conversation.extend(m for m in history if m.type != "system")

        first_reply = tool_llm.invoke(conversation, config={"callbacks": callbacks})
        answer = first_reply

        if first_reply.tool_calls:
            print(f"Execution Agent: LLM requested {len(first_reply.tool_calls)} tool calls")

            tool_results = execute_tool_calls(first_reply.tool_calls, available_tools)
            conversation.extend([first_reply] + tool_results)

            # The follow-up call uses the model without tools bound, so only
            # a single round of tool execution happens per turn.
            answer = base_llm.invoke(conversation, config={"callbacks": callbacks})

        return {
            **state,
            "messages": conversation + [answer],
            "agent_response": answer,
            "current_step": "verification",
        }

    except Exception as e:
        # Boundary handler: report the failure as an assistant message so the
        # caller still receives a well-formed state.
        print(f"Execution Agent Error: {e}")
        error_response = AIMessage(content=f"I encountered an error while processing your computational request: {e}")
        return {
            **state,
            "messages": state.get("messages", []) + [error_response],
            "agent_response": error_response,
            "current_step": "verification",
        }