File size: 5,087 Bytes
8666b1b
4863507
 
 
 
 
 
 
 
8666b1b
4863507
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8666b1b
4863507
8666b1b
 
 
 
 
4863507
 
 
 
 
 
 
 
 
 
 
 
8666b1b
4863507
 
 
 
 
8666b1b
4863507
 
 
 
 
 
 
 
 
 
 
8666b1b
 
 
 
 
 
 
 
 
 
 
 
4863507
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from sophos_models import Sophos
from sophos_tools import *

class Agent:
    def __init__(self, name: str, instructions: str, model: str, tools: list):
        self.name = name
        self.instructions = instructions
        self.model = model
        self.tools = {tool.__name__: tool for tool in tools if hasattr(tool, "_is_tool")}
        self.sophos = Sophos(model_path=model)

    def process_tool_calls(self, response_content: str):
        """Process tool calls from response content and return results"""
        if "```tool" not in response_content:
            return None
            
        print("🔧 TOOL DETECTED - Processing...")
        try:
            tool_block = response_content.split("```tool")[1].split("```")[0].strip()
            print(f"🔧 Tool block found: {tool_block}")
            
            tool_results = []
            tool_lines = [line.strip() for line in tool_block.split('\n') if line.strip()]
            
            for tool_line in tool_lines:
                if "(" in tool_line and ")" in tool_line:
                    tool_name = tool_line.split("(")[0].strip()
                    arg_part = tool_line.split("(", 1)[1].rsplit(")", 1)[0].strip()
                    arg = arg_part.strip('"').strip("'")
                    
                    if tool_name in self.tools:
                        print(f"🔧 CALLING TOOL: {tool_name}({arg})")
                        if arg:
                            tool_result = self.tools[tool_name](arg)
                        else:
                            tool_result = self.tools[tool_name]()
                        print(f"🔧 TOOL RESULT: {tool_result}")
                        tool_results.append(f"{tool_name}: {tool_result}")
                    else:
                        print(f"🔧 ERROR: Tool {tool_name} not found")
                        tool_results.append(f"Error: Tool {tool_name} not found")
            
            return tool_results if tool_results else None
                        
        except Exception as e:
            print(f"Error parsing tool: {e}")
            return None
        
    def get_tools_names_and_descriptions(self):
        return {name: func.__doc__ for name, func in self.tools.items()}

    def run(self, prompt: str):
        system_prompt = f"""You are {self.name}. {self.instructions}

Available tools: {self.get_tools_names_and_descriptions()}

CRITICAL: To use a tool, respond with exactly this format:
```tool
get_weather(Tokyo)
```

You can use multiple tools:
```tool
get_weather(Tokyo)
get_time()
```

YOU MUST USE A BLOCK FOR TOOLS, or the system will not understand you are calling a tool.
CRITICAL:
```tool
<tool_name>(<arg>)
```
REMEMBER ALL TOOLS MUST BE INSIDE THE SAME BLOCK

IMPORTANT WORKFLOW:
1. If you need live information, use the appropriate tool(s) first
2. Once you have the information you need, provide your final response WITHOUT any tool calls
3. Do NOT call the same tool repeatedly unless the user asks for updated information
4. After calling a tool once and getting results, provide your answer to the user

user prompt: {prompt}
"""
        print(f"--- System Prompt ---\n{system_prompt}\n---------------------")

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt}
        ]

        max_iterations = 99
        iteration = 0
        last_tool_calls = []
        
        while iteration < max_iterations:
            iteration += 1
            print(f"\n--- Iteration {iteration} ---")

            response = self.sophos.ask(str(messages))

            current_response = response 
            print(f"AI Response: {current_response}")
            
            tool_results = self.process_tool_calls(current_response)
            
            if tool_results is None:
                print("🏁 No tool calls detected - Agent finished!")
                return current_response
        
            all_results = "\n".join(tool_results)
            
            # Check if we're repeating the same tool calls
            current_tool_calls = [result.split(":")[0] for result in tool_results]
            if current_tool_calls == last_tool_calls and iteration > 1:
                print("⚠️ Detected repeated tool calls - forcing final response")
                messages.append({"role": "assistant", "content": current_response})
                messages.append({"role": "user", "content": f"Tool results:\n{all_results}\n\nYou have already called these tools. Now provide your final answer to the user without calling any more tools."})
            else:
                messages.append({"role": "assistant", "content": current_response})
                messages.append({"role": "user", "content": f"Tool results:\n{all_results}\n\nNow provide your final response to the user. Do NOT call any more tools unless absolutely necessary."})
            
            last_tool_calls = current_tool_calls
        
        print(f"⚠️ Reached maximum iterations ({max_iterations})")
        return current_response