# Final_Assignment_Toast_New / multiagents.py
# bwilkie's picture
# Update multiagents.py
# 505a1c6 verified
import os
import dotenv
import openai
import json
from typing import List, Dict, Any
from tools.fetch import fetch_webpage, search_web
from tools.yttranscript import get_youtube_transcript, get_youtube_title_description
from tools.stt import get_text_transcript_from_audio_file
from tools.image import analyze_image
from common.mylogger import mylog
import myprompts
# Load variables from a local .env file (if any) into the process environment.
# This must run before the environment lookup below.
dotenv.load_dotenv()
# Set up OpenAI client
# Indexing os.environ raises KeyError at import time when OPENAI_API_KEY is unset.
openai.api_key = os.environ["OPENAI_API_KEY"]
class OpenAIAgent:
    """Chat agent that lets an OpenAI model call a fixed set of tools in a loop.

    A tool is either an object exposing ``name`` and ``run`` or a plain callable
    exposing ``__name__``. Each tool is advertised to the model as a function
    taking one required string argument.
    """

    # Tool name -> (argument name, argument description) for the tools whose
    # single string parameter is known. Anything else gets a generic "input".
    _KNOWN_TOOL_ARGS = {
        "search_web": ("query", "Search query"),
        "fetch_webpage": ("url", "URL to fetch"),
        "get_youtube_transcript": ("url", "YouTube URL"),
        "get_youtube_title_description": ("url", "YouTube URL"),
        "get_text_transcript_from_audio_file": ("file_path", "Path to audio file"),
        "analyze_image": ("image_path", "Path to image file"),
    }

    def __init__(self, model_id: str, name: str, description: str, tools: List = None, max_steps: int = 7):
        """Store the agent configuration and print the name of every tool.

        Args:
            model_id: Model identifier passed to the chat-completions call.
            name: Agent name, used in the system prompt.
            description: Agent description, used in the system prompt.
            tools: Tool objects or plain callables; ``None`` means no tools.
            max_steps: Maximum number of tool-calling rounds in ``run``.
        """
        self.model_id = model_id
        self.name = name
        self.description = description
        self.tools = tools or []
        self.max_steps = max_steps
        self.conversation_history = []
        # Debug log tool names
        for t in self.tools:
            print("Loaded tool:", getattr(t, "name", getattr(t, "__name__", "UNKNOWN")))

    def _get_tool_schema(self) -> List[Dict[str, Any]]:
        """Build the ``tools`` payload describing every usable tool.

        Returns:
            One ``{"type": "function", "function": {...}}`` entry per tool that
            exposes a name. Tools with neither ``name``+``run`` nor ``__name__``
            are skipped.
        """
        functions = []
        for tool in self.tools:
            if hasattr(tool, "name") and hasattr(tool, "run"):
                # Tool object (e.g. created by a @tool decorator).
                name, kind = tool.name, "tool"
            elif hasattr(tool, "__name__"):
                # Normal Python function.
                name, kind = tool.__name__, "function"
            else:
                continue
            arg_name, arg_description = self._KNOWN_TOOL_ARGS.get(
                name, ("input", f"Input for the {kind}")
            )
            functions.append({
                "type": "function",
                "function": {
                    "name": name,
                    "description": tool.__doc__ or "",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            arg_name: {"type": "string", "description": arg_description}
                        },
                        "required": [arg_name],
                    },
                },
            })
        return functions

    def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]):
        """Run the tool called ``tool_name`` with ``arguments`` as keyword arguments.

        Tools are resolved the same way ``_get_tool_schema`` names them, so every
        advertised tool can actually be invoked.

        Returns:
            The tool's return value, or an error string when the tool raises or
            no tool with that name exists.
        """
        for tool in self.tools:
            if getattr(tool, "name", None) == tool_name and hasattr(tool, "run"):
                invoke = tool.run
            elif getattr(tool, "__name__", None) == tool_name:
                invoke = tool
            else:
                continue
            try:
                return invoke(**arguments)
            except Exception as e:
                return f"Error executing {tool_name}: {e}"
        return f"Tool {tool_name} not found"

    def run(self, query: str) -> str:
        """Run the agent with the given query.

        The conversation is reset, then the model is called up to ``max_steps``
        times. Each round of tool calls is executed and fed back to the model;
        the first reply without tool calls is returned.

        Returns:
            The model's final text, ``"No response generated"`` when that text is
            empty, an ``"Error in agent execution: ..."`` string when anything
            raises, or a fixed message when ``max_steps`` is exhausted.
        """
        self.conversation_history = [
            {"role": "system", "content": f"You are {self.name}. {self.description}"},
            {"role": "user", "content": query}
        ]
        try:
            # The schema does not change between steps, so build it once. The
            # tool parameters are left out of the request entirely when there
            # is nothing to advertise, instead of being sent as None.
            tool_schema = self._get_tool_schema()
            request_kwargs = {}
            if tool_schema:
                request_kwargs = {"tools": tool_schema, "tool_choice": "auto"}

            steps = 0
            while steps < self.max_steps:
                response = openai.chat.completions.create(
                    model=self.model_id,
                    messages=self.conversation_history,
                    **request_kwargs
                )
                message = response.choices[0].message

                assistant_entry = {"role": "assistant", "content": message.content}
                if message.tool_calls:
                    # Echo the calls back so the tool results below can refer
                    # to them through tool_call_id.
                    assistant_entry["tool_calls"] = message.tool_calls
                self.conversation_history.append(assistant_entry)

                if not message.tool_calls:
                    # No more tools to call, return the response
                    return message.content or "No response generated"

                for tool_call in message.tool_calls:
                    function_name = tool_call.function.name
                    try:
                        # An empty argument string means "no arguments".
                        function_args = json.loads(tool_call.function.arguments or "{}")
                    except (TypeError, ValueError) as e:
                        # Report bad JSON to the model instead of aborting the
                        # run; every tool call still gets a matching result.
                        tool_result = f"Error parsing arguments for {function_name}: {e}"
                    else:
                        tool_result = self._execute_tool(function_name, function_args)
                    self.conversation_history.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": str(tool_result)
                    })
                steps += 1
        except Exception as e:
            return f"Error in agent execution: {str(e)}"
        return "Maximum steps reached without completion"
class ManagerAgent(OpenAIAgent):
    """Agent that answers a query by delegating sub-tasks to other agents.

    The model delegates by replying with a line of the form
    ``DELEGATE: agent_name | task_description``. The named agent is run on the
    task and its result is fed back as a user message. A reply without the
    marker is treated as the final answer.
    """

    def __init__(self, model_id: str, managed_agents: List[OpenAIAgent], max_steps: int = 15):
        """Create a tool-less manager over ``managed_agents``.

        Args:
            model_id: Model identifier passed to the chat-completions call.
            managed_agents: Agents that tasks can be delegated to, looked up by name.
            max_steps: Maximum number of delegation rounds in ``run``.
        """
        super().__init__(
            model_id=model_id,
            name="manager_agent",
            description="A manager agent that coordinates the work of other agents to answer questions.",
            max_steps=max_steps
        )
        self.managed_agents = managed_agents

    def _delegate_to_agent(self, agent_name: str, task: str) -> str:
        """Delegate a task to a specific agent.

        Returns the agent's ``run`` result, or a "not found" message when no
        managed agent has that name.
        """
        for agent in self.managed_agents:
            if agent.name == agent_name:
                return agent.run(task)
        return f"Agent {agent_name} not found"

    @staticmethod
    def _parse_delegation(message: str):
        """Return ``(agent_name, task)`` from the first well-formed DELEGATE line, else ``None``."""
        marker = "DELEGATE:"
        for line in message.split('\n'):
            line = line.strip()
            if not line.startswith(marker):
                continue
            # Drop only the leading marker, so a task that itself mentions
            # "DELEGATE:" is passed through unchanged.
            parts = line[len(marker):].split("|", 1)
            if len(parts) == 2:
                return parts[0].strip(), parts[1].strip()
        return None

    def run(self, query: str) -> str:
        """Run the manager agent with delegation capabilities.

        Returns:
            The model's final text, ``"No response generated"`` when that text is
            empty or missing, an ``"Error in manager execution: ..."`` string
            when anything raises, or a fixed message when ``max_steps`` is
            exhausted.
        """
        # Add information about available agents to the system prompt
        agent_info = "\n".join([f"- {agent.name}: {agent.description}" for agent in self.managed_agents])
        system_prompt = f"""You are {self.name}. {self.description}
Available agents you can delegate to:
{agent_info}
When you need to delegate a task, clearly state which agent should handle it and what specific task they should perform.
To delegate, reply with a line of the form: DELEGATE: agent_name | task_description
You should coordinate the work and synthesize the results from different agents to provide a comprehensive answer.
"""
        self.conversation_history = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query}
        ]
        try:
            steps = 0
            while steps < self.max_steps:
                response = openai.chat.completions.create(
                    model=self.model_id,
                    messages=self.conversation_history,
                    temperature=0.1,  # Controls randomness (0.0 to 2.0)
                    top_p=0.88,  # Nucleus sampling
                    max_tokens=4000  # Maximum response length
                )
                # content can be None; normalise so the checks below cannot raise.
                message = response.choices[0].message.content or ""

                if "DELEGATE:" not in message:
                    # Final answer
                    return message or "No response generated"

                self.conversation_history.append({
                    "role": "assistant",
                    "content": message
                })
                delegation = self._parse_delegation(message)
                if delegation is None:
                    # Tell the model what went wrong. Without this the same
                    # request would be re-sent unchanged until max_steps.
                    feedback = (
                        "Could not parse the delegation request. Use exactly one line of the form: "
                        "DELEGATE: agent_name | task_description, or give the final answer."
                    )
                else:
                    agent_name, task = delegation
                    result = self._delegate_to_agent(agent_name, task)
                    feedback = f"Result from {agent_name}: {result}"
                self.conversation_history.append({
                    "role": "user",
                    "content": feedback
                })
                steps += 1
        except Exception as e:
            return f"Error in manager execution: {str(e)}"
        return "Maximum steps reached without completion"
def check_final_answer(final_answer, agent_memory=None) -> bool:
    """Length-based sanity check on a final answer.

    The answer is logged, then accepted only when its string form is at most
    200 characters long. ``agent_memory`` is accepted but not used.
    """
    mylog("check_final_answer", final_answer)
    # Anything longer than 200 characters is assumed not to be a correct answer.
    answer_length = len(str(final_answer))
    return answer_length <= 200
# Module-level agents, built at import time: two workers, then the manager
# that coordinates them.

# Worker for web search and page retrieval.
web_agent = OpenAIAgent(
    name="web_agent",
    description="Use search engine to find webpages related to a subject and get the page content",
    model_id="gpt-4o-mini",
    max_steps=7,
    tools=[
        search_web,
        fetch_webpage,
    ],
)

# Worker for YouTube, audio and image content.
audiovideo_agent = OpenAIAgent(
    name="audiovideo_agent",
    description="Extracts information from image, video or audio files from the web",
    model_id="gpt-4o-mini",
    max_steps=7,
    tools=[
        get_youtube_transcript,
        get_youtube_title_description,
        get_text_transcript_from_audio_file,
        analyze_image,
    ],
)

# Manager that delegates to the two workers above.
manager_agent = ManagerAgent(
    managed_agents=[web_agent, audiovideo_agent],
    model_id="gpt-4o-mini",
    max_steps=15,
)
class MultiAgent:
    """Callable front end that sends a question to the module-level ``manager_agent``."""

    def __init__(self):
        print("MultiAgent initialized.")

    def __call__(self, question: str) -> str:
        """Answer ``question`` through the manager agent.

        The question is logged, wrapped between the delegation instructions and
        ``myprompts.output_format``, and passed to ``manager_agent.run``. Any
        exception is printed and returned as an error string.
        """
        mylog(self.__class__.__name__, question)
        try:
            instructions = """You are the top agent of a multi-agent system that can answer questions by coordinating the work of other agents.
You will receive a question and you will decide which agent to use to answer it.
You can use the web_agent to search the web for information and for fetching the content of a web page, or the audiovideo_agent to extract information from video or audio files.
You can also use your own knowledge to answer the question.
You need to respect the output format that is given to you.
Finding the correct answer to the question need reasoning and planning, read the question carefully, think step by step and do not skip any steps.
To delegate tasks to agents, use the format: DELEGATE: agent_name | task_description
For example: DELEGATE: web_agent | Search for information about the Malko competition 2023 enrollment
"""
            full_prompt = (
                instructions
                + "\nTHE QUESTION:\n"
                + question
                + '\n'
                + myprompts.output_format
            )
            return manager_agent.run(full_prompt)
        except Exception as exc:
            failure = f"An error occurred while processing the question: {exc}"
            print(failure)
            return failure
if __name__ == "__main__":
    # Manual smoke run: send one sample question through the whole system.
    sample_question = """
What was the actual enrollment of the Malko competition in 2023?
"""
    multi_agent = MultiAgent()
    reply = multi_agent(sample_question)
    print(f"Answer: {reply}")