bwilkie commited on
Commit
6f1cfd1
·
verified ·
1 Parent(s): af05ff5

Create multiagents.py

Browse files
Files changed (1) hide show
  1. multiagents.py +303 -0
multiagents.py ADDED
@@ -0,0 +1,303 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # a multi agent proposal to solve HF agent course final assignment
2
+ import os
3
+ import dotenv
4
+ import openai
5
+ import json
6
+ from typing import List, Dict, Any
7
+ from tools.fetch import fetch_webpage, search_web
8
+ from tools.yttranscript import get_youtube_transcript, get_youtube_title_description
9
+ from tools.stt import get_text_transcript_from_audio_file
10
+ from tools.image import analyze_image
11
+ from common.mylogger import mylog
12
+ import myprompts
13
+
14
+ dotenv.load_dotenv()
15
+
16
+ # Set up OpenAI client
17
+ openai.api_key = os.environ["OPENAI_API_KEY"]
18
+
19
+ class OpenAIAgent:
20
+ def __init__(self, model_id: str, name: str, description: str, tools: List = None, max_steps: int = 7):
21
+ self.model_id = model_id
22
+ self.name = name
23
+ self.description = description
24
+ self.tools = tools or []
25
+ self.max_steps = max_steps
26
+ self.conversation_history = []
27
+
28
+ def _get_tool_schema(self):
29
+ """Convert tools to OpenAI function calling format"""
30
+ tool_schemas = []
31
+
32
+ for tool in self.tools:
33
+ if hasattr(tool, '__name__'):
34
+ tool_name = tool.__name__
35
+ tool_doc = tool.__doc__ or "No description available"
36
+
37
+ # Basic schema - you may need to customize this based on your specific tools
38
+ schema = {
39
+ "type": "function",
40
+ "function": {
41
+ "name": tool_name,
42
+ "description": tool_doc,
43
+ "parameters": {
44
+ "type": "object",
45
+ "properties": {},
46
+ "required": []
47
+ }
48
+ }
49
+ }
50
+
51
+ # Add specific parameters based on tool name
52
+ if tool_name == "search_web":
53
+ schema["function"]["parameters"]["properties"] = {
54
+ "query": {"type": "string", "description": "Search query"}
55
+ }
56
+ schema["function"]["parameters"]["required"] = ["query"]
57
+ elif tool_name == "fetch_webpage":
58
+ schema["function"]["parameters"]["properties"] = {
59
+ "url": {"type": "string", "description": "URL to fetch"}
60
+ }
61
+ schema["function"]["parameters"]["required"] = ["url"]
62
+ elif tool_name == "get_youtube_transcript":
63
+ schema["function"]["parameters"]["properties"] = {
64
+ "url": {"type": "string", "description": "YouTube URL"}
65
+ }
66
+ schema["function"]["parameters"]["required"] = ["url"]
67
+ elif tool_name == "get_youtube_title_description":
68
+ schema["function"]["parameters"]["properties"] = {
69
+ "url": {"type": "string", "description": "YouTube URL"}
70
+ }
71
+ schema["function"]["parameters"]["required"] = ["url"]
72
+ elif tool_name == "get_text_transcript_from_audio_file":
73
+ schema["function"]["parameters"]["properties"] = {
74
+ "file_path": {"type": "string", "description": "Path to audio file"}
75
+ }
76
+ schema["function"]["parameters"]["required"] = ["file_path"]
77
+ elif tool_name == "analyze_image":
78
+ schema["function"]["parameters"]["properties"] = {
79
+ "image_path": {"type": "string", "description": "Path to image file"}
80
+ }
81
+ schema["function"]["parameters"]["required"] = ["image_path"]
82
+
83
+ tool_schemas.append(schema)
84
+
85
+ return tool_schemas
86
+
87
+ def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]):
88
+ """Execute a tool with given arguments"""
89
+ for tool in self.tools:
90
+ if hasattr(tool, '__name__') and tool.__name__ == tool_name:
91
+ try:
92
+ return tool(**arguments)
93
+ except Exception as e:
94
+ return f"Error executing {tool_name}: {str(e)}"
95
+ return f"Tool {tool_name} not found"
96
+
97
+ def run(self, query: str) -> str:
98
+ """Run the agent with the given query"""
99
+ self.conversation_history = [
100
+ {"role": "system", "content": f"You are {self.name}. {self.description}"},
101
+ {"role": "user", "content": query}
102
+ ]
103
+
104
+ steps = 0
105
+ while steps < self.max_steps:
106
+ try:
107
+ # Make API call to OpenAI
108
+ response = openai.chat.completions.create(
109
+ model=self.model_id,
110
+ messages=self.conversation_history,
111
+ tools=self._get_tool_schema() if self.tools else None,
112
+ tool_choice="auto" if self.tools else None
113
+ )
114
+
115
+ message = response.choices[0].message
116
+
117
+ # Add assistant's response to conversation history
118
+ self.conversation_history.append({
119
+ "role": "assistant",
120
+ "content": message.content,
121
+ "tool_calls": message.tool_calls
122
+ })
123
+
124
+ # Check if the assistant wants to call tools
125
+ if message.tool_calls:
126
+ for tool_call in message.tool_calls:
127
+ function_name = tool_call.function.name
128
+ function_args = json.loads(tool_call.function.arguments)
129
+
130
+ # Execute the tool
131
+ tool_result = self._execute_tool(function_name, function_args)
132
+
133
+ # Add tool result to conversation history
134
+ self.conversation_history.append({
135
+ "role": "tool",
136
+ "tool_call_id": tool_call.id,
137
+ "content": str(tool_result)
138
+ })
139
+ else:
140
+ # No more tools to call, return the response
141
+ return message.content or "No response generated"
142
+
143
+ steps += 1
144
+
145
+ except Exception as e:
146
+ return f"Error in agent execution: {str(e)}"
147
+
148
+ return "Maximum steps reached without completion"
149
+
150
+ class ManagerAgent(OpenAIAgent):
151
+ def __init__(self, model_id: str, managed_agents: List[OpenAIAgent], max_steps: int = 15):
152
+ super().__init__(
153
+ model_id=model_id,
154
+ name="manager_agent",
155
+ description="A manager agent that coordinates the work of other agents to answer questions.",
156
+ max_steps=max_steps
157
+ )
158
+ self.managed_agents = managed_agents
159
+
160
+ def _delegate_to_agent(self, agent_name: str, task: str) -> str:
161
+ """Delegate a task to a specific agent"""
162
+ for agent in self.managed_agents:
163
+ if agent.name == agent_name:
164
+ return agent.run(task)
165
+ return f"Agent {agent_name} not found"
166
+
167
+ def run(self, query: str) -> str:
168
+ """Run the manager agent with delegation capabilities"""
169
+ # Add information about available agents to the system prompt
170
+ agent_info = "\n".join([f"- {agent.name}: {agent.description}" for agent in self.managed_agents])
171
+
172
+ system_prompt = f"""You are {self.name}. {self.description}
173
+
174
+ Available agents you can delegate to:
175
+ {agent_info}
176
+
177
+ When you need to delegate a task, clearly state which agent should handle it and what specific task they should perform.
178
+ You should coordinate the work and synthesize the results from different agents to provide a comprehensive answer.
179
+ """
180
+
181
+ self.conversation_history = [
182
+ {"role": "system", "content": system_prompt},
183
+ {"role": "user", "content": query}
184
+ ]
185
+
186
+ steps = 0
187
+ while steps < self.max_steps:
188
+ try:
189
+ response = openai.chat.completions.create(
190
+ model=self.model_id,
191
+ messages=self.conversation_history
192
+ )
193
+
194
+ message = response.choices[0].message.content
195
+
196
+ # Check if the manager wants to delegate to an agent
197
+ if "DELEGATE:" in message:
198
+ # Parse delegation request
199
+ lines = message.split('\n')
200
+ for line in lines:
201
+ if line.startswith("DELEGATE:"):
202
+ parts = line.replace("DELEGATE:", "").strip().split("|", 1)
203
+ if len(parts) == 2:
204
+ agent_name = parts[0].strip()
205
+ task = parts[1].strip()
206
+
207
+ # Delegate to the specified agent
208
+ result = self._delegate_to_agent(agent_name, task)
209
+
210
+ # Add the delegation result to conversation
211
+ self.conversation_history.append({
212
+ "role": "assistant",
213
+ "content": message
214
+ })
215
+ self.conversation_history.append({
216
+ "role": "user",
217
+ "content": f"Result from {agent_name}: {result}"
218
+ })
219
+ break
220
+ else:
221
+ # Final answer
222
+ return message
223
+
224
+ steps += 1
225
+
226
+ except Exception as e:
227
+ return f"Error in manager execution: {str(e)}"
228
+
229
+ return "Maximum steps reached without completion"
230
+
231
+ def check_final_answer(final_answer, agent_memory=None) -> bool:
232
+ """
233
+ Check if the final answer is correct.
234
+ basic check on the length of the answer.
235
+ """
236
+ mylog("check_final_answer", final_answer)
237
+ # if return answer is more than 200 characters, we will assume it is not correct
238
+ if len(str(final_answer)) > 200:
239
+ return False
240
+ else:
241
+ return True
242
+
243
+ # Create agents
244
+ web_agent = OpenAIAgent(
245
+ model_id="gpt-4o-mini",
246
+ name="web_agent",
247
+ description="Use search engine to find webpages related to a subject and get the page content",
248
+ tools=[search_web, fetch_webpage],
249
+ max_steps=7
250
+ )
251
+
252
+ audiovideo_agent = OpenAIAgent(
253
+ model_id="gpt-4o-mini",
254
+ name="audiovideo_agent",
255
+ description="Extracts information from image, video or audio files from the web",
256
+ tools=[get_youtube_transcript, get_youtube_title_description, get_text_transcript_from_audio_file, analyze_image],
257
+ max_steps=7
258
+ )
259
+
260
+ manager_agent = ManagerAgent(
261
+ model_id="gpt-4o-mini",
262
+ managed_agents=[web_agent, audiovideo_agent],
263
+ max_steps=15
264
+ )
265
+
266
+ class MultiAgent:
267
+ def __init__(self):
268
+ print("MultiAgent initialized.")
269
+
270
+ def __call__(self, question: str) -> str:
271
+ mylog(self.__class__.__name__, question)
272
+
273
+ try:
274
+ prefix = """You are the top agent of a multi-agent system that can answer questions by coordinating the work of other agents.
275
+ You will receive a question and you will decide which agent to use to answer it.
276
+ You can use the web_agent to search the web for information and for fetching the content of a web page, or the audiovideo_agent to extract information from video or audio files.
277
+ You can also use your own knowledge to answer the question.
278
+ You need to respect the output format that is given to you.
279
+ Finding the correct answer to the question need reasoning and planning, read the question carefully, think step by step and do not skip any steps.
280
+
281
+ To delegate tasks to agents, use the format: DELEGATE: agent_name | task_description
282
+ For example: DELEGATE: web_agent | Search for information about the Malko competition 2023 enrollment
283
+ """
284
+
285
+ question = prefix + "\nTHE QUESTION:\n" + question + '\n' + myprompts.output_format
286
+
287
+ fixed_answer = manager_agent.run(question)
288
+
289
+ return fixed_answer
290
+ except Exception as e:
291
+ error = f"An error occurred while processing the question: {e}"
292
+ print(error)
293
+ return error
294
+
295
+ if __name__ == "__main__":
296
+ # Example usage
297
+
298
+ question = """
299
+ What was the actual enrollment of the Malko competition in 2023?
300
+ """
301
+ agent = MultiAgent()
302
+ answer = agent(question)
303
+ print(f"Answer: {answer}")