# Multi-agent question-answering system built on the OpenAI chat completions API.
import os
import dotenv
import openai
import json
from typing import List, Dict, Any
from tools.fetch import fetch_webpage, search_web
from tools.yttranscript import get_youtube_transcript, get_youtube_title_description
from tools.stt import get_text_transcript_from_audio_file
from tools.image import analyze_image
from common.mylogger import mylog
import myprompts
dotenv.load_dotenv()
# Set up OpenAI client
openai.api_key = os.environ["OPENAI_API_KEY"]
class OpenAIAgent:
    """A single agent that answers queries with an OpenAI chat model.

    The agent loops up to ``max_steps`` times: it sends the conversation to
    the model, executes any tool calls the model requests, feeds the results
    back, and returns the model's final text reply.
    """

    # Every known tool takes exactly one string argument; map the tool name to
    # (argument name, argument description) used to build its JSON schema.
    # This replaces two duplicated if/elif ladders that listed the same tools.
    _PARAM_SPECS = {
        "search_web": ("query", "Search query"),
        "fetch_webpage": ("url", "URL to fetch"),
        "get_youtube_transcript": ("url", "YouTube URL"),
        "get_youtube_title_description": ("url", "YouTube URL"),
        "get_text_transcript_from_audio_file": ("file_path", "Path to audio file"),
        "analyze_image": ("image_path", "Path to image file"),
    }

    def __init__(self, model_id: str, name: str, description: str, tools: List = None, max_steps: int = 7):
        """Create an agent.

        Args:
            model_id: OpenAI model name, e.g. "gpt-4o-mini".
            name: Agent name, used in the system prompt and for delegation.
            description: One-line role description, also in the system prompt.
            tools: Optional list of callables or smolagents @tool objects.
            max_steps: Maximum model/tool round-trips before giving up.
        """
        self.model_id = model_id
        self.name = name
        self.description = description
        self.tools = tools or []
        self.max_steps = max_steps
        self.conversation_history = []
        # Debug log tool names
        for t in self.tools:
            print("Loaded tool:", getattr(t, "name", getattr(t, "__name__", "UNKNOWN")))

    @classmethod
    def _build_params(cls, name: str) -> Dict[str, Any]:
        """Build the JSON-schema ``parameters`` object for the tool ``name``.

        Unknown tools fall back to a generic single string ``input`` argument.
        """
        arg_name, arg_desc = cls._PARAM_SPECS.get(name, ("input", "Input for the tool"))
        return {
            "type": "object",
            "properties": {
                arg_name: {"type": "string", "description": arg_desc}
            },
            "required": [arg_name],
        }

    def _get_tool_schema(self) -> List[Dict[str, Any]]:
        """Return OpenAI function-calling schemas for all registered tools."""
        functions = []
        for tool in self.tools:
            # smolagents @tool objects expose .name/.run; plain functions
            # expose __name__. Anything else is silently skipped, matching
            # the original's behavior.
            if hasattr(tool, "name") and hasattr(tool, "run"):
                name = tool.name
            elif hasattr(tool, "__name__"):
                name = tool.__name__
            else:
                continue
            functions.append({
                "type": "function",
                "function": {
                    "name": name,
                    "description": tool.__doc__ or "",
                    "parameters": self._build_params(name),
                },
            })
        return functions

    def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]):
        """Invoke the registered tool ``tool_name`` with keyword ``arguments``.

        Returns the tool's result, or an error string when the tool raised or
        when no tool with that name is registered (never raises itself).
        """
        for tool in self.tools:
            is_smolagents = hasattr(tool, "name") and tool.name == tool_name
            is_function = hasattr(tool, "__name__") and tool.__name__ == tool_name
            if is_smolagents or is_function:
                try:
                    call = tool.run if is_smolagents else tool
                    return call(**arguments)
                except Exception as e:
                    return f"Error executing {tool_name}: {e}"
        return f"Tool {tool_name} not found"

    def run(self, query: str) -> str:
        """Run the agent with the given query and return the final text answer."""
        self.conversation_history = [
            {"role": "system", "content": f"You are {self.name}. {self.description}"},
            {"role": "user", "content": query}
        ]
        steps = 0
        while steps < self.max_steps:
            try:
                # Only send tool schemas when the agent actually has tools;
                # building kwargs conditionally avoids passing tools=None /
                # tool_choice=None to the API (the original did).
                kwargs: Dict[str, Any] = {
                    "model": self.model_id,
                    "messages": self.conversation_history,
                }
                if self.tools:
                    kwargs["tools"] = self._get_tool_schema()
                    kwargs["tool_choice"] = "auto"
                response = openai.chat.completions.create(**kwargs)
                message = response.choices[0].message
                # Record the assistant turn (including any tool-call requests).
                self.conversation_history.append({
                    "role": "assistant",
                    "content": message.content,
                    "tool_calls": message.tool_calls
                })
                if message.tool_calls:
                    # Execute each requested tool and feed the result back.
                    for tool_call in message.tool_calls:
                        function_name = tool_call.function.name
                        function_args = json.loads(tool_call.function.arguments)
                        tool_result = self._execute_tool(function_name, function_args)
                        self.conversation_history.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": str(tool_result)
                        })
                else:
                    # No tool calls: the model's text is the final answer.
                    return message.content or "No response generated"
                steps += 1
            except Exception as e:
                return f"Error in agent execution: {str(e)}"
        return "Maximum steps reached without completion"
class ManagerAgent(OpenAIAgent):
    """A coordinator agent that answers questions by delegating to worker agents.

    The model signals delegation with a line of the form
    ``DELEGATE: agent_name | task_description``; any reply without that marker
    is treated as the final answer.
    """

    def __init__(self, model_id: str, managed_agents: List[OpenAIAgent], max_steps: int = 15):
        super().__init__(
            model_id=model_id,
            name="manager_agent",
            description="A manager agent that coordinates the work of other agents to answer questions.",
            max_steps=max_steps
        )
        self.managed_agents = managed_agents

    def _delegate_to_agent(self, agent_name: str, task: str) -> str:
        """Delegate a task to the managed agent called ``agent_name``."""
        for agent in self.managed_agents:
            if agent.name == agent_name:
                return agent.run(task)
        return f"Agent {agent_name} not found"

    @staticmethod
    def _parse_delegation(message: str):
        """Return (agent_name, task) from the first well-formed DELEGATE line, else None."""
        for line in message.split('\n'):
            line = line.strip()  # tolerate leading whitespace before the marker
            if line.startswith("DELEGATE:"):
                # count=1: only strip the marker itself, not later occurrences
                # inside the task text (the original replaced all occurrences).
                parts = line.replace("DELEGATE:", "", 1).strip().split("|", 1)
                if len(parts) == 2:
                    return parts[0].strip(), parts[1].strip()
        return None

    def run(self, query: str) -> str:
        """Run the manager loop on ``query``, delegating to agents as requested."""
        # Tell the model which agents it can delegate to.
        agent_info = "\n".join([f"- {agent.name}: {agent.description}" for agent in self.managed_agents])
        system_prompt = f"""You are {self.name}. {self.description}
Available agents you can delegate to:
{agent_info}
When you need to delegate a task, clearly state which agent should handle it and what specific task they should perform.
You should coordinate the work and synthesize the results from different agents to provide a comprehensive answer.
"""
        self.conversation_history = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query}
        ]
        steps = 0
        while steps < self.max_steps:
            try:
                response = openai.chat.completions.create(
                    model=self.model_id,
                    messages=self.conversation_history,
                    temperature=0.1,  # near-deterministic coordination
                    top_p=0.88,  # nucleus sampling
                    max_tokens=4000  # maximum response length
                )
                message = response.choices[0].message.content
                if "DELEGATE:" in message:
                    # Record the assistant turn unconditionally so a malformed
                    # DELEGATE line cannot leave the history unchanged and make
                    # the loop re-issue the identical request (the original
                    # only appended on a successful parse).
                    self.conversation_history.append({
                        "role": "assistant",
                        "content": message
                    })
                    delegation = self._parse_delegation(message)
                    if delegation is not None:
                        agent_name, task = delegation
                        result = self._delegate_to_agent(agent_name, task)
                        self.conversation_history.append({
                            "role": "user",
                            "content": f"Result from {agent_name}: {result}"
                        })
                    else:
                        # Ask the model to restate the delegation correctly.
                        self.conversation_history.append({
                            "role": "user",
                            "content": "Your DELEGATE line was malformed. Use: DELEGATE: agent_name | task_description"
                        })
                else:
                    # No delegation marker: this is the final answer.
                    return message
                steps += 1
            except Exception as e:
                return f"Error in manager execution: {str(e)}"
        return "Maximum steps reached without completion"
def check_final_answer(final_answer, agent_memory=None) -> bool:
    """Heuristic sanity check on a candidate final answer.

    Logs the answer, then accepts it only if its string form is at most 200
    characters long (the expected output format is short). ``agent_memory``
    is accepted for interface compatibility but is not used.
    """
    mylog("check_final_answer", final_answer)
    # Anything longer than 200 characters is assumed to be an invalid answer.
    return len(str(final_answer)) <= 200
# Create the worker and manager agents used by MultiAgent below.
# Worker: web search and web-page fetching.
web_agent = OpenAIAgent(
    model_id="gpt-4o-mini",
    name="web_agent",
    description="Use search engine to find webpages related to a subject and get the page content",
    tools=[search_web, fetch_webpage],
    max_steps=7
)
# Worker: media extraction (YouTube transcripts/metadata, audio transcription, image analysis).
audiovideo_agent = OpenAIAgent(
    model_id="gpt-4o-mini",
    name="audiovideo_agent",
    description="Extracts information from image, video or audio files from the web",
    tools=[get_youtube_transcript, get_youtube_title_description, get_text_transcript_from_audio_file, analyze_image],
    max_steps=7
)
# Coordinator: delegates sub-tasks to the two workers above.
manager_agent = ManagerAgent(
    model_id="gpt-4o-mini",
    managed_agents=[web_agent, audiovideo_agent],
    max_steps=15
)
class MultiAgent:
    """Callable facade over the module-level manager agent.

    Calling an instance with a question string returns the answer string
    (or an error message if processing fails).
    """

    def __init__(self):
        print("MultiAgent initialized.")

    def __call__(self, question: str) -> str:
        """Answer ``question`` by routing it through the manager agent."""
        mylog(self.__class__.__name__, question)
        # Standing instructions prepended to every question before it is
        # handed to the manager agent.
        prefix = """You are the top agent of a multi-agent system that can answer questions by coordinating the work of other agents.
You will receive a question and you will decide which agent to use to answer it.
You can use the web_agent to search the web for information and for fetching the content of a web page, or the audiovideo_agent to extract information from video or audio files.
You can also use your own knowledge to answer the question.
You need to respect the output format that is given to you.
Finding the correct answer to the question need reasoning and planning, read the question carefully, think step by step and do not skip any steps.
To delegate tasks to agents, use the format: DELEGATE: agent_name | task_description
For example: DELEGATE: web_agent | Search for information about the Malko competition 2023 enrollment
"""
        try:
            composed = f"{prefix}\nTHE QUESTION:\n{question}\n{myprompts.output_format}"
            return manager_agent.run(composed)
        except Exception as e:
            error = f"An error occurred while processing the question: {e}"
            print(error)
            return error
if __name__ == "__main__":
    # Example usage: run one sample question through the multi-agent system.
    sample_question = """
What was the actual enrollment of the Malko competition in 2023?
"""
    qa_system = MultiAgent()
    result = qa_system(sample_question)
    print(f"Answer: {result}")