import os import dotenv import openai import json from typing import List, Dict, Any from tools.fetch import fetch_webpage, search_web from tools.yttranscript import get_youtube_transcript, get_youtube_title_description from tools.stt import get_text_transcript_from_audio_file from tools.image import analyze_image from common.mylogger import mylog import myprompts dotenv.load_dotenv() # Set up OpenAI client openai.api_key = os.environ["OPENAI_API_KEY"] class OpenAIAgent: def __init__(self, model_id: str, name: str, description: str, tools: List = None, max_steps: int = 7): self.model_id = model_id self.name = name self.description = description self.tools = tools or [] self.max_steps = max_steps self.conversation_history = [] # Debug log tool names for t in self.tools: print("Loaded tool:", getattr(t, "name", getattr(t, "__name__", "UNKNOWN"))) def _get_tool_schema(self) -> List[Dict[str, Any]]: functions = [] for tool in self.tools: # Handle smolagents @tool objects if hasattr(tool, "name") and hasattr(tool, "run"): name = tool.name description = tool.__doc__ or "" # Create proper schema based on tool name if name == "search_web": params = { "type": "object", "properties": { "query": {"type": "string", "description": "Search query"} }, "required": ["query"] } elif name == "fetch_webpage": params = { "type": "object", "properties": { "url": {"type": "string", "description": "URL to fetch"} }, "required": ["url"] } elif name == "get_youtube_transcript": params = { "type": "object", "properties": { "url": {"type": "string", "description": "YouTube URL"} }, "required": ["url"] } elif name == "get_youtube_title_description": params = { "type": "object", "properties": { "url": {"type": "string", "description": "YouTube URL"} }, "required": ["url"] } elif name == "get_text_transcript_from_audio_file": params = { "type": "object", "properties": { "file_path": {"type": "string", "description": "Path to audio file"} }, "required": ["file_path"] } elif name == "analyze_image": params = { "type": "object", "properties": { "image_path": {"type": "string", "description": "Path to image file"} }, "required": ["image_path"] } else: # Default schema for unknown tools params = { "type": "object", "properties": { "input": {"type": "string", "description": "Input for the tool"} }, "required": ["input"] } functions.append({ "type": "function", "function": { "name": name, "description": description, "parameters": params } }) # Handle normal Python functions elif hasattr(tool, "__name__"): name = tool.__name__ description = tool.__doc__ or "" # Create proper schema based on function name if name == "search_web": params = { "type": "object", "properties": { "query": {"type": "string", "description": "Search query"} }, "required": ["query"] } elif name == "fetch_webpage": params = { "type": "object", "properties": { "url": {"type": "string", "description": "URL to fetch"} }, "required": ["url"] } elif name == "get_youtube_transcript": params = { "type": "object", "properties": { "url": {"type": "string", "description": "YouTube URL"} }, "required": ["url"] } elif name == "get_youtube_title_description": params = { "type": "object", "properties": { "url": {"type": "string", "description": "YouTube URL"} }, "required": ["url"] } elif name == "get_text_transcript_from_audio_file": params = { "type": "object", "properties": { "file_path": {"type": "string", "description": "Path to audio file"} }, "required": ["file_path"] } elif name == "analyze_image": params = { "type": "object", "properties": { "image_path": {"type": "string", "description": "Path to image file"} }, "required": ["image_path"] } else: # Default schema for unknown functions params = { "type": "object", "properties": { "input": {"type": "string", "description": "Input for the function"} }, "required": ["input"] } functions.append({ "type": "function", "function": { "name": name, "description": description, "parameters": params } }) return functions def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]): for tool in self.tools: # smolagents tool if hasattr(tool, "name") and tool.name == tool_name: try: return tool.run(**arguments) except Exception as e: return f"Error executing {tool_name}: {e}" # plain Python function if hasattr(tool, "__name__") and tool.__name__ == tool_name: try: return tool(**arguments) except Exception as e: return f"Error executing {tool_name}: {e}" return f"Tool {tool_name} not found" def run(self, query: str) -> str: """Run the agent with the given query""" self.conversation_history = [ {"role": "system", "content": f"You are {self.name}. {self.description}"}, {"role": "user", "content": query} ] steps = 0 while steps < self.max_steps: try: # Make API call to OpenAI response = openai.chat.completions.create( model=self.model_id, messages=self.conversation_history, tools=self._get_tool_schema() if self.tools else None, tool_choice="auto" if self.tools else None ) message = response.choices[0].message # Add assistant's response to conversation history self.conversation_history.append({ "role": "assistant", "content": message.content, "tool_calls": message.tool_calls }) # Check if the assistant wants to call tools if message.tool_calls: for tool_call in message.tool_calls: function_name = tool_call.function.name function_args = json.loads(tool_call.function.arguments) # Execute the tool tool_result = self._execute_tool(function_name, function_args) # Add tool result to conversation history self.conversation_history.append({ "role": "tool", "tool_call_id": tool_call.id, "content": str(tool_result) }) else: # No more tools to call, return the response return message.content or "No response generated" steps += 1 except Exception as e: return f"Error in agent execution: {str(e)}" return "Maximum steps reached without completion" class ManagerAgent(OpenAIAgent): def __init__(self, model_id: str, managed_agents: List[OpenAIAgent], max_steps: int = 15): super().__init__( model_id=model_id, name="manager_agent", description="A manager agent that coordinates the work of other agents to answer questions.", max_steps=max_steps ) self.managed_agents = managed_agents def _delegate_to_agent(self, agent_name: str, task: str) -> str: """Delegate a task to a specific agent""" for agent in self.managed_agents: if agent.name == agent_name: return agent.run(task) return f"Agent {agent_name} not found" def run(self, query: str) -> str: """Run the manager agent with delegation capabilities""" # Add information about available agents to the system prompt agent_info = "\n".join([f"- {agent.name}: {agent.description}" for agent in self.managed_agents]) system_prompt = f"""You are {self.name}. {self.description} Available agents you can delegate to: {agent_info} When you need to delegate a task, clearly state which agent should handle it and what specific task they should perform. You should coordinate the work and synthesize the results from different agents to provide a comprehensive answer. """ self.conversation_history = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": query} ] steps = 0 while steps < self.max_steps: try: response = openai.chat.completions.create( model=self.model_id, messages=self.conversation_history, temperature=0.1, # Controls randomness (0.0 to 2.0) top_p=0.88, # Nucleus sampling max_tokens=4000 # Maximum response length ) message = response.choices[0].message.content # Check if the manager wants to delegate to an agent if "DELEGATE:" in message: # Parse delegation request lines = message.split('\n') for line in lines: if line.startswith("DELEGATE:"): parts = line.replace("DELEGATE:", "").strip().split("|", 1) if len(parts) == 2: agent_name = parts[0].strip() task = parts[1].strip() # Delegate to the specified agent result = self._delegate_to_agent(agent_name, task) # Add the delegation result to conversation self.conversation_history.append({ "role": "assistant", "content": message }) self.conversation_history.append({ "role": "user", "content": f"Result from {agent_name}: {result}" }) break else: # Final answer return message steps += 1 except Exception as e: return f"Error in manager execution: {str(e)}" return "Maximum steps reached without completion" def check_final_answer(final_answer, agent_memory=None) -> bool: """ Check if the final answer is correct. basic check on the length of the answer. """ mylog("check_final_answer", final_answer) # if return answer is more than 200 characters, we will assume it is not correct if len(str(final_answer)) > 200: return False else: return True # Create agents web_agent = OpenAIAgent( model_id="gpt-4o-mini", name="web_agent", description="Use search engine to find webpages related to a subject and get the page content", tools=[search_web, fetch_webpage], max_steps=7 ) audiovideo_agent = OpenAIAgent( model_id="gpt-4o-mini", name="audiovideo_agent", description="Extracts information from image, video or audio files from the web", tools=[get_youtube_transcript, get_youtube_title_description, get_text_transcript_from_audio_file, analyze_image], max_steps=7 ) manager_agent = ManagerAgent( model_id="gpt-4o-mini", managed_agents=[web_agent, audiovideo_agent], max_steps=15 ) class MultiAgent: def __init__(self): print("MultiAgent initialized.") def __call__(self, question: str) -> str: mylog(self.__class__.__name__, question) try: prefix = """You are the top agent of a multi-agent system that can answer questions by coordinating the work of other agents. You will receive a question and you will decide which agent to use to answer it. You can use the web_agent to search the web for information and for fetching the content of a web page, or the audiovideo_agent to extract information from video or audio files. You can also use your own knowledge to answer the question. You need to respect the output format that is given to you. Finding the correct answer to the question need reasoning and planning, read the question carefully, think step by step and do not skip any steps. To delegate tasks to agents, use the format: DELEGATE: agent_name | task_description For example: DELEGATE: web_agent | Search for information about the Malko competition 2023 enrollment """ question = prefix + "\nTHE QUESTION:\n" + question + '\n' + myprompts.output_format fixed_answer = manager_agent.run(question) return fixed_answer except Exception as e: error = f"An error occurred while processing the question: {e}" print(error) return error if __name__ == "__main__": # Example usage question = """ What was the actual enrollment of the Malko competition in 2023? """ agent = MultiAgent() answer = agent(question) print(f"Answer: {answer}")