File size: 6,671 Bytes
fe36046
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""Execution Agent - Handles code execution and computational tasks"""
from typing import Dict, Any, List
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
from langchain_groq import ChatGroq
from code_agent import run_agent  # Import our existing code execution engine
from src.tracing import get_langfuse_callback_handler


@tool
def run_python(input: str) -> str:
    """Execute Python code in a restricted sandbox (code-interpreter).

    Pass **any** coding or file-manipulation task here and the agent will
    compute the answer by running Python. The entire standard library is NOT
    available; heavy networking is disabled. Suitable for: math, data-frames,
    small file parsing, algorithmic questions.
    """
    return run_agent(input)


def load_execution_prompt() -> str:
    """Load the execution prompt from file"""
    try:
        with open("./prompts/execution_prompt.txt", "r", encoding="utf-8") as f:
            return f.read().strip()
    except FileNotFoundError:
        return """You are a specialized execution agent. Use the run_python tool to execute code and solve computational problems."""


def get_execution_tools() -> List:
    """Get list of tools available to the execution agent"""
    return [run_python]


def execute_tool_calls(tool_calls: list, tools: list) -> list:
    """Execute tool calls and return results"""
    tool_messages = []
    
    # Create a mapping of tool names to tool functions
    tool_map = {tool.name: tool for tool in tools}
    
    for tool_call in tool_calls:
        tool_name = tool_call['name']
        tool_args = tool_call['args']
        tool_call_id = tool_call['id']
        
        if tool_name in tool_map:
            try:
                print(f"Execution Agent: Executing {tool_name} with args: {str(tool_args)[:200]}...")
                result = tool_map[tool_name].invoke(tool_args)
                tool_messages.append(
                    ToolMessage(
                        content=str(result),
                        tool_call_id=tool_call_id
                    )
                )
            except Exception as e:
                print(f"Error executing {tool_name}: {e}")
                tool_messages.append(
                    ToolMessage(
                        content=f"Error executing {tool_name}: {e}",
                        tool_call_id=tool_call_id
                    )
                )
        else:
            tool_messages.append(
                ToolMessage(
                    content=f"Unknown tool: {tool_name}",
                    tool_call_id=tool_call_id
                )
            )
    
    return tool_messages


def needs_code_execution(query: str) -> bool:
    """Heuristic to determine if a query requires code execution"""
    code_indicators = [
        "calculate", "compute", "algorithm", "fibonacci", "math", "data",
        "programming", "code", "function", "sort", "csv", "json", "pandas",
        "plot", "graph", "analyze", "process", "file", "manipulation"
    ]
    query_lower = query.lower()
    return any(indicator in query_lower for indicator in code_indicators)


def execution_agent(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Execution agent that handles computational and code execution tasks
    """
    print("Execution Agent: Processing computational request")
    
    try:
        # Get execution prompt
        execution_prompt = load_execution_prompt()
        
        # Initialize LLM with tools
        llm = ChatGroq(model="qwen-qwq-32b", temperature=0.1)  # Lower temp for consistent code
        tools = get_execution_tools()
        llm_with_tools = llm.bind_tools(tools)
        
        # Get callback handler for tracing
        callback_handler = get_langfuse_callback_handler()
        callbacks = [callback_handler] if callback_handler else []
        
        # Build messages
        messages = state.get("messages", [])
        
        # Add execution system prompt
        execution_messages = [SystemMessage(content=execution_prompt)]
        
        # Get user query for analysis
        user_query = None
        for msg in reversed(messages):
            if msg.type == "human":
                user_query = msg.content
                break
        
        # If this clearly needs code execution, provide guidance
        if user_query and needs_code_execution(user_query):
            guidance_msg = HumanMessage(
                content=f"""Task requiring code execution: {user_query}

Please analyze this computational task and use the run_python tool to solve it step by step.
Break down complex problems into smaller steps and provide clear explanations."""
            )
            execution_messages.append(guidance_msg)
        
        # Add original messages (excluding system messages to avoid duplicates)
        for msg in messages:
            if msg.type != "system":
                execution_messages.append(msg)
        
        # Get initial response from LLM
        response = llm_with_tools.invoke(execution_messages, config={"callbacks": callbacks})
        
        # Check if the LLM wants to use tools
        if response.tool_calls:
            print(f"Execution Agent: LLM requested {len(response.tool_calls)} tool calls")
            
            # Execute the tool calls
            tool_messages = execute_tool_calls(response.tool_calls, tools)
            
            # Add the response and tool messages to conversation
            execution_messages.extend([response] + tool_messages)
            
            # Get final response after tool execution
            final_response = llm.invoke(execution_messages, config={"callbacks": callbacks})
            
            return {
                **state,
                "messages": execution_messages + [final_response],
                "agent_response": final_response,
                "current_step": "verification"
            }
        else:
            # Direct response without tools
            return {
                **state,
                "messages": execution_messages + [response],
                "agent_response": response,
                "current_step": "verification"
            }
        
    except Exception as e:
        print(f"Execution Agent Error: {e}")
        error_response = AIMessage(content=f"I encountered an error while processing your computational request: {e}")
        return {
            **state,
            "messages": state.get("messages", []) + [error_response],
            "agent_response": error_response,
            "current_step": "verification"
        }