File size: 16,715 Bytes
6f1cfd1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7e9f8c3
 
 
 
6f1cfd1
f459969
aae56b9
6f1cfd1
f459969
 
 
 
505a1c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f459969
 
 
 
505a1c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f459969
6f1cfd1
 
 
f459969
 
 
 
 
 
 
 
6f1cfd1
 
 
f459969
6f1cfd1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6e46681
 
 
 
6f1cfd1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
import os
import dotenv
import openai
import json
from typing import List, Dict, Any
from tools.fetch import fetch_webpage, search_web
from tools.yttranscript import get_youtube_transcript, get_youtube_title_description
from tools.stt import get_text_transcript_from_audio_file
from tools.image import analyze_image
from common.mylogger import mylog
import myprompts

# Load variables from a .env file (if one is found) into the process
# environment; this must run before the API key is read below.
dotenv.load_dotenv()

# Set up OpenAI client
# NOTE: indexing os.environ raises KeyError at import time when
# OPENAI_API_KEY is not set, so importing this module requires the key.
openai.api_key = os.environ["OPENAI_API_KEY"]

class OpenAIAgent:
    """Tool-calling agent driven by the OpenAI chat-completions API.

    The agent sends the conversation to the model, executes every tool call
    the model requests, feeds the results back and repeats until the model
    answers without requesting a tool or ``max_steps`` rounds have elapsed.

    A tool may be:

    * an object exposing ``name`` and ``run`` (called as ``tool.run(**args)``),
    * a plain function, identified by ``__name__`` (called directly), or
    * a callable object exposing only ``name`` (called directly).

    Attributes:
        model_id: Model identifier passed to the chat-completions endpoint.
        name: Agent name, used in the system prompt.
        description: Agent description, used in the system prompt.
        tools: Tools the model may call.
        max_steps: Maximum number of model round-trips per ``run``.
        conversation_history: Messages of the most recent ``run``.
    """

    # (argument name, argument description) of the single string parameter of
    # each tool known by name. Any other tool is advertised with a generic
    # ``input`` string parameter.
    _KNOWN_TOOL_ARGS = {
        "search_web": ("query", "Search query"),
        "fetch_webpage": ("url", "URL to fetch"),
        "get_youtube_transcript": ("url", "YouTube URL"),
        "get_youtube_title_description": ("url", "YouTube URL"),
        "get_text_transcript_from_audio_file": ("file_path", "Path to audio file"),
        "analyze_image": ("image_path", "Path to image file"),
    }

    def __init__(self, model_id: str, name: str, description: str, tools: List = None, max_steps: int = 7):
        """Store the configuration and print the name of every loaded tool.

        Args:
            model_id: Model identifier for the chat-completions endpoint.
            name: Agent name.
            description: Agent description.
            tools: Tools the model may call; ``None`` means no tools.
            max_steps: Maximum number of model round-trips per ``run``.
        """
        self.model_id = model_id
        self.name = name
        self.description = description
        self.tools = tools or []
        self.max_steps = max_steps
        self.conversation_history = []

        # Debug log tool names
        for t in self.tools:
            print("Loaded tool:", getattr(t, "name", getattr(t, "__name__", "UNKNOWN")))

    @staticmethod
    def _describe_tool(tool):
        """Return ``(name, fallback_argument_description)`` for *tool*.

        Returns ``None`` when the tool cannot be identified by name. The
        precedence matches how ``_execute_tool`` dispatches, so every
        advertised name is executable.
        """
        has_name = hasattr(tool, "name")
        if has_name and hasattr(tool, "run"):
            return tool.name, "Input for the tool"
        if hasattr(tool, "__name__"):
            return tool.__name__, "Input for the function"
        if has_name and callable(tool):
            # Callable object with a ``name`` but no ``run`` method.
            return tool.name, "Input for the tool"
        return None

    def _get_tool_schema(self) -> List[Dict[str, Any]]:
        """Build the ``tools`` payload describing every identifiable tool.

        Returns:
            One ``{"type": "function", "function": {...}}`` entry per tool,
            in the order of ``self.tools``. Tools that cannot be identified
            by name are skipped.
        """
        functions = []
        for tool in self.tools:
            described = self._describe_tool(tool)
            if described is None:
                continue
            name, fallback_description = described
            arg_name, arg_description = self._KNOWN_TOOL_ARGS.get(
                name, ("input", fallback_description)
            )
            functions.append({
                "type": "function",
                "function": {
                    "name": name,
                    "description": tool.__doc__ or "",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            arg_name: {"type": "string", "description": arg_description}
                        },
                        "required": [arg_name],
                    },
                },
            })
        return functions

    def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]):
        """Run the tool called *tool_name* with keyword *arguments*.

        Returns:
            The tool's return value. If the tool raises, the string
            ``"Error executing <name>: <exception>"`` is returned instead;
            if no tool matches, ``"Tool <name> not found"``.
        """
        for tool in self.tools:
            if hasattr(tool, "name") and tool.name == tool_name:
                # Prefer ``run``; fall back to calling the object itself so a
                # named callable without ``run`` still works.
                target = tool.run if hasattr(tool, "run") else tool
            elif hasattr(tool, "__name__") and tool.__name__ == tool_name:
                target = tool
            else:
                continue
            try:
                return target(**arguments)
            except Exception as e:
                return f"Error executing {tool_name}: {e}"
        return f"Tool {tool_name} not found"

    def _run_tool_call(self, tool_call) -> str:
        """Decode one model tool call, execute it and return the result text.

        Undecodable or non-object arguments are reported as an error string
        (which is sent back to the model) rather than raised, so one bad call
        does not abort the whole run.
        """
        function_name = tool_call.function.name
        try:
            # An empty argument string is treated as "no arguments".
            function_args = json.loads(tool_call.function.arguments or "{}")
        except ValueError as e:  # json.JSONDecodeError subclasses ValueError
            return f"Error executing {function_name}: invalid JSON arguments ({e})"
        if not isinstance(function_args, dict):
            return f"Error executing {function_name}: arguments must be a JSON object"
        return str(self._execute_tool(function_name, function_args))

    def run(self, query: str) -> str:
        """Run the agent with the given query.

        Args:
            query: The user request.

        Returns:
            The model's final text answer; ``"No response generated"`` when
            the final answer is empty; ``"Maximum steps reached without
            completion"`` when the step budget runs out; or
            ``"Error in agent execution: <exception>"`` if anything raises.
        """
        self.conversation_history = [
            {"role": "system", "content": f"You are {self.name}. {self.description}"},
            {"role": "user", "content": query}
        ]

        try:
            # The schema does not change between steps, so build it once.
            request_options = {"model": self.model_id}
            tool_schema = self._get_tool_schema()
            if tool_schema:
                # Only send tool fields when there is something to advertise;
                # explicit nulls or an empty list are not valid values.
                request_options["tools"] = tool_schema
                request_options["tool_choice"] = "auto"

            for _ in range(self.max_steps):
                response = openai.chat.completions.create(
                    messages=self.conversation_history,
                    **request_options
                )
                message = response.choices[0].message

                if not message.tool_calls:
                    # No more tools to call: record and return the answer.
                    self.conversation_history.append({
                        "role": "assistant",
                        "content": message.content
                    })
                    return message.content or "No response generated"

                # Store the tool calls as plain dicts so the history stays
                # JSON-serialisable and independent of SDK object types.
                self.conversation_history.append({
                    "role": "assistant",
                    "content": message.content,
                    "tool_calls": [
                        {
                            "id": call.id,
                            "type": "function",
                            "function": {
                                "name": call.function.name,
                                "arguments": call.function.arguments,
                            },
                        }
                        for call in message.tool_calls
                    ]
                })

                # Every tool call must be answered by a matching tool message.
                for call in message.tool_calls:
                    self.conversation_history.append({
                        "role": "tool",
                        "tool_call_id": call.id,
                        "content": self._run_tool_call(call)
                    })
        except Exception as e:
            # Top-level boundary: callers expect a string, never an exception.
            return f"Error in agent execution: {str(e)}"

        return "Maximum steps reached without completion"

class ManagerAgent(OpenAIAgent):
    """Agent that answers a query by delegating sub-tasks to other agents.

    The model delegates by emitting a line of the form
    ``DELEGATE: agent_name | task``. The named agent's ``run`` result is fed
    back into the conversation as a user message, and the loop continues
    until the model replies without the marker or ``max_steps`` is reached.
    """

    # Marker that starts a delegation line in the model's reply.
    _DELEGATE_MARKER = "DELEGATE:"

    def __init__(self, model_id: str, managed_agents: List[OpenAIAgent], max_steps: int = 15):
        """Create the manager.

        Args:
            model_id: Model identifier for the chat-completions endpoint.
            managed_agents: Agents that tasks can be delegated to.
            max_steps: Maximum number of model round-trips per ``run``.
        """
        super().__init__(
            model_id=model_id,
            name="manager_agent",
            description="A manager agent that coordinates the work of other agents to answer questions.",
            max_steps=max_steps
        )
        self.managed_agents = managed_agents

    def _delegate_to_agent(self, agent_name: str, task: str) -> str:
        """Delegate a task to a specific agent.

        Returns:
            The result of the first managed agent whose ``name`` equals
            *agent_name*, or ``"Agent <name> not found"``.
        """
        for agent in self.managed_agents:
            if agent.name == agent_name:
                return agent.run(task)
        return f"Agent {agent_name} not found"

    def _parse_delegation(self, message: str):
        """Return ``(agent_name, task)`` from the first valid DELEGATE line.

        A valid line starts with the marker (surrounding whitespace is
        ignored) and contains a ``|`` separating the agent name from the
        task. Returns ``None`` when no line qualifies.
        """
        marker = self._DELEGATE_MARKER
        for line in message.splitlines():
            candidate = line.strip()
            if not candidate.startswith(marker):
                continue
            # Strip only the leading marker so that a task which itself
            # mentions the marker text is passed on unmodified.
            parts = candidate[len(marker):].split("|", 1)
            if len(parts) == 2:
                return parts[0].strip(), parts[1].strip()
        return None

    def run(self, query: str) -> str:
        """Run the manager agent with delegation capabilities.

        Args:
            query: The user request.

        Returns:
            The model's final answer (the first reply without the delegation
            marker); ``"No response generated"`` when that reply is empty;
            ``"Maximum steps reached without completion"`` when the step
            budget runs out; or ``"Error in manager execution: <exception>"``
            if anything raises.
        """
        # Add information about available agents to the system prompt
        agent_info = "\n".join([f"- {agent.name}: {agent.description}" for agent in self.managed_agents])

        system_prompt = f"""You are {self.name}. {self.description}
Available agents you can delegate to:
{agent_info}
When you need to delegate a task, clearly state which agent should handle it and what specific task they should perform.
You should coordinate the work and synthesize the results from different agents to provide a comprehensive answer.
"""

        self.conversation_history = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query}
        ]

        try:
            for _ in range(self.max_steps):
                response = openai.chat.completions.create(
                    model=self.model_id,
                    messages=self.conversation_history,
                    temperature=0.1,  # Controls randomness (0.0 to 2.0)
                    top_p=0.88,       # Nucleus sampling
                    max_tokens=4000   # Maximum response length
                )

                # ``content`` may be None; normalise so the marker test
                # below cannot raise TypeError.
                message = response.choices[0].message.content or ""

                if self._DELEGATE_MARKER not in message:
                    # Final answer
                    return message or "No response generated"

                self.conversation_history.append({
                    "role": "assistant",
                    "content": message
                })

                delegation = self._parse_delegation(message)
                if delegation is None:
                    # Always add feedback: without it the next request would
                    # be identical and the loop would burn steps for nothing.
                    feedback = (
                        "Your delegation request could not be parsed. Use exactly one line "
                        "in the format: DELEGATE: agent_name | task_description"
                    )
                else:
                    agent_name, task = delegation
                    result = self._delegate_to_agent(agent_name, task)
                    feedback = f"Result from {agent_name}: {result}"

                self.conversation_history.append({
                    "role": "user",
                    "content": feedback
                })
        except Exception as e:
            # Top-level boundary: callers expect a string, never an exception.
            return f"Error in manager execution: {str(e)}"

        return "Maximum steps reached without completion"

def check_final_answer(final_answer, agent_memory=None) -> bool:
    """
    Accept or reject a final answer using a length heuristic only.

    The answer is logged, converted to ``str`` and accepted when that text
    is at most 200 characters long. ``agent_memory`` is accepted but unused.
    """
    mylog("check_final_answer", final_answer)
    # Anything longer than 200 characters is assumed not to be a correct answer.
    answer_length = len(str(final_answer))
    return answer_length <= 200

# Create agents
# All three agents run on the same chat model.
_AGENT_MODEL_ID = "gpt-4o-mini"

# Worker that searches the web and fetches page content.
web_agent = OpenAIAgent(
    name="web_agent",
    description="Use search engine to find webpages related to a subject and get the page content",
    model_id=_AGENT_MODEL_ID,
    tools=[
        search_web,
        fetch_webpage,
    ],
    max_steps=7,
)

# Worker for YouTube, audio-file and image tools.
audiovideo_agent = OpenAIAgent(
    name="audiovideo_agent",
    description="Extracts information from image, video or audio files from the web",
    model_id=_AGENT_MODEL_ID,
    tools=[
        get_youtube_transcript,
        get_youtube_title_description,
        get_text_transcript_from_audio_file,
        analyze_image,
    ],
    max_steps=7,
)

# Coordinator that delegates to the two workers above.
manager_agent = ManagerAgent(
    managed_agents=[web_agent, audiovideo_agent],
    model_id=_AGENT_MODEL_ID,
    max_steps=15,
)

class MultiAgent:
    """Callable front end: wraps a question in the manager prompt and runs it."""

    # Instructions placed before the question. The indentation of the
    # continuation lines is part of the string and is kept verbatim.
    _PROMPT_PREFIX = """You are the top agent of a multi-agent system that can answer questions by coordinating the work of other agents.
            You will receive a question and you will decide which agent to use to answer it.
            You can use the web_agent to search the web for information and for fetching the content of a web page, or the audiovideo_agent to extract information from video or audio files.
            You can also use your own knowledge to answer the question.
            You need to respect the output format that is given to you.
            Finding the correct answer to the question need reasoning and planning, read the question carefully, think step by step and do not skip any steps.
            
            To delegate tasks to agents, use the format: DELEGATE: agent_name | task_description
            For example: DELEGATE: web_agent | Search for information about the Malko competition 2023 enrollment
            """

    def __init__(self):
        print("MultiAgent initialized.")

    def __call__(self, question: str) -> str:
        """Log the question, send the full prompt to ``manager_agent`` and return its reply.

        Any exception raised while building or running the prompt is printed
        and returned as an error message string instead of propagating.
        """
        mylog(type(self).__name__, question)

        try:
            # Prompt layout: prefix, the question, then the required output format.
            header = self._PROMPT_PREFIX + "\nTHE QUESTION:\n"
            full_prompt = header + question + '\n' + myprompts.output_format
            return manager_agent.run(full_prompt)
        except Exception as e:
            failure = f"An error occurred while processing the question: {e}"
            print(failure)
            return failure

if __name__ == "__main__":
    # Example usage: ask one sample question and print the reply.
    sample_question = "\nWhat was the actual enrollment of the Malko competition in 2023?\n"
    multi_agent = MultiAgent()
    reply = multi_agent(sample_question)
    print(f"Answer: {reply}")