# NOTE: "Spaces: Sleeping" header removed — Hugging Face Spaces page artifact, not part of the source.
import os
import dotenv
import openai
import json
from typing import List, Dict, Any

# Project-local tools exposed to the agents below.
from tools.fetch import fetch_webpage, search_web
from tools.yttranscript import get_youtube_transcript, get_youtube_title_description
from tools.stt import get_text_transcript_from_audio_file
from tools.image import analyze_image
from common.mylogger import mylog
import myprompts

# Load environment variables from a local .env file (must provide OPENAI_API_KEY).
dotenv.load_dotenv()
# Set up OpenAI client
openai.api_key = os.environ["OPENAI_API_KEY"]
class OpenAIAgent:
    """A minimal tool-calling agent driven by the OpenAI chat completions API.

    The agent loops up to ``max_steps`` times: it sends the conversation to the
    model, executes any tool calls the model requests, feeds the results back,
    and returns the model's first tool-free reply as the final answer.
    """

    # Parameter schemas for the known tools, keyed by tool/function name.
    # A single table shared by both smolagents-style tool objects and plain
    # Python functions, so the two code paths cannot drift apart (the original
    # duplicated this mapping in two ~70-line if/elif chains).
    _TOOL_PARAM_SCHEMAS: Dict[str, Dict[str, Any]] = {
        "search_web": {
            "type": "object",
            "properties": {"query": {"type": "string", "description": "Search query"}},
            "required": ["query"],
        },
        "fetch_webpage": {
            "type": "object",
            "properties": {"url": {"type": "string", "description": "URL to fetch"}},
            "required": ["url"],
        },
        "get_youtube_transcript": {
            "type": "object",
            "properties": {"url": {"type": "string", "description": "YouTube URL"}},
            "required": ["url"],
        },
        "get_youtube_title_description": {
            "type": "object",
            "properties": {"url": {"type": "string", "description": "YouTube URL"}},
            "required": ["url"],
        },
        "get_text_transcript_from_audio_file": {
            "type": "object",
            "properties": {"file_path": {"type": "string", "description": "Path to audio file"}},
            "required": ["file_path"],
        },
        "analyze_image": {
            "type": "object",
            "properties": {"image_path": {"type": "string", "description": "Path to image file"}},
            "required": ["image_path"],
        },
    }

    # Fallback schema for tools with no explicit entry above.
    _DEFAULT_PARAM_SCHEMA: Dict[str, Any] = {
        "type": "object",
        "properties": {"input": {"type": "string", "description": "Input for the tool"}},
        "required": ["input"],
    }

    def __init__(self, model_id: str, name: str, description: str, tools: List = None, max_steps: int = 7):
        self.model_id = model_id
        self.name = name
        self.description = description
        self.tools = tools or []
        self.max_steps = max_steps
        self.conversation_history = []
        # Debug log tool names
        for t in self.tools:
            print("Loaded tool:", getattr(t, "name", getattr(t, "__name__", "UNKNOWN")))

    @staticmethod
    def _tool_name(tool) -> str:
        """Resolve a tool's name, or return '' for unrecognized objects."""
        if hasattr(tool, "name") and hasattr(tool, "run"):
            return tool.name  # smolagents @tool object
        if hasattr(tool, "__name__"):
            return tool.__name__  # plain Python function
        return ""

    def _get_tool_schema(self) -> List[Dict[str, Any]]:
        """Build the OpenAI function-calling schema for all registered tools."""
        functions: List[Dict[str, Any]] = []
        for tool in self.tools:
            name = self._tool_name(tool)
            if not name:
                continue  # unrecognized object: skip, as the original did
            functions.append({
                "type": "function",
                "function": {
                    "name": name,
                    "description": tool.__doc__ or "",
                    "parameters": self._TOOL_PARAM_SCHEMAS.get(name, self._DEFAULT_PARAM_SCHEMA),
                },
            })
        return functions

    def _execute_tool(self, tool_name: str, arguments: Dict[str, Any]):
        """Invoke the registered tool named *tool_name* with keyword *arguments*.

        Returns the tool's result, an error string if execution raised, or a
        "not found" string if no registered tool matches.
        """
        for tool in self.tools:
            # smolagents tool
            if hasattr(tool, "name") and tool.name == tool_name:
                try:
                    return tool.run(**arguments)
                except Exception as e:
                    return f"Error executing {tool_name}: {e}"
            # plain Python function
            if hasattr(tool, "__name__") and tool.__name__ == tool_name:
                try:
                    return tool(**arguments)
                except Exception as e:
                    return f"Error executing {tool_name}: {e}"
        return f"Tool {tool_name} not found"

    def run(self, query: str) -> str:
        """Run the agent with the given query"""
        self.conversation_history = [
            {"role": "system", "content": f"You are {self.name}. {self.description}"},
            {"role": "user", "content": query},
        ]
        for _ in range(self.max_steps):
            try:
                # Build the request; omit tools/tool_choice entirely when no
                # tools are registered (explicit None is rejected by the API).
                request: Dict[str, Any] = {
                    "model": self.model_id,
                    "messages": self.conversation_history,
                }
                if self.tools:
                    request["tools"] = self._get_tool_schema()
                    request["tool_choice"] = "auto"
                response = openai.chat.completions.create(**request)
                message = response.choices[0].message
                # Add assistant's response to conversation history
                self.conversation_history.append({
                    "role": "assistant",
                    "content": message.content,
                    "tool_calls": message.tool_calls,
                })
                # Check if the assistant wants to call tools
                if message.tool_calls:
                    for tool_call in message.tool_calls:
                        function_name = tool_call.function.name
                        function_args = json.loads(tool_call.function.arguments)
                        tool_result = self._execute_tool(function_name, function_args)
                        # Feed the tool result back so the next step can use it
                        self.conversation_history.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": str(tool_result),
                        })
                else:
                    # No more tools to call, return the response
                    return message.content or "No response generated"
            except Exception as e:
                return f"Error in agent execution: {str(e)}"
        return "Maximum steps reached without completion"
class ManagerAgent(OpenAIAgent):
    """Coordinator agent that answers questions by delegating to managed agents.

    Delegation protocol: the model emits a line of the form
    ``DELEGATE: agent_name | task_description``; the first such line in a reply
    is executed and its result is fed back into the conversation as a user turn.
    """

    def __init__(self, model_id: str, managed_agents: List[OpenAIAgent], max_steps: int = 15):
        super().__init__(
            model_id=model_id,
            name="manager_agent",
            description="A manager agent that coordinates the work of other agents to answer questions.",
            max_steps=max_steps,
        )
        self.managed_agents = managed_agents

    def _delegate_to_agent(self, agent_name: str, task: str) -> str:
        """Delegate a task to a specific agent"""
        for agent in self.managed_agents:
            if agent.name == agent_name:
                return agent.run(task)
        return f"Agent {agent_name} not found"

    @staticmethod
    def _parse_delegation(message: str):
        """Return (agent_name, task) from the first DELEGATE line, else None."""
        for line in message.split('\n'):
            # strip() so indented "  DELEGATE: ..." lines are also recognized
            line = line.strip()
            if line.startswith("DELEGATE:"):
                parts = line[len("DELEGATE:"):].strip().split("|", 1)
                if len(parts) == 2:
                    return parts[0].strip(), parts[1].strip()
        return None

    def run(self, query: str) -> str:
        """Run the manager agent with delegation capabilities"""
        # Advertise the available agents in the system prompt
        agent_info = "\n".join([f"- {agent.name}: {agent.description}" for agent in self.managed_agents])
        system_prompt = f"""You are {self.name}. {self.description}
Available agents you can delegate to:
{agent_info}
When you need to delegate a task, clearly state which agent should handle it and what specific task they should perform.
You should coordinate the work and synthesize the results from different agents to provide a comprehensive answer.
"""
        self.conversation_history = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ]
        for _ in range(self.max_steps):
            try:
                response = openai.chat.completions.create(
                    model=self.model_id,
                    messages=self.conversation_history,
                    temperature=0.1,  # Controls randomness (0.0 to 2.0)
                    top_p=0.88,  # Nucleus sampling
                    max_tokens=4000,  # Maximum response length
                )
                # Guard against a None content (e.g. refusal): `in None` raises
                message = response.choices[0].message.content or ""
                if "DELEGATE:" in message:
                    delegation = self._parse_delegation(message)
                    # Always record the assistant turn — the original skipped
                    # this on a malformed DELEGATE line, so the model was
                    # re-queried with identical context until max_steps ran out.
                    self.conversation_history.append({
                        "role": "assistant",
                        "content": message,
                    })
                    if delegation is not None:
                        agent_name, task = delegation
                        result = self._delegate_to_agent(agent_name, task)
                        feedback = f"Result from {agent_name}: {result}"
                    else:
                        feedback = ("Could not parse your delegation request. "
                                    "Use the exact format: DELEGATE: agent_name | task_description")
                    self.conversation_history.append({
                        "role": "user",
                        "content": feedback,
                    })
                else:
                    # Final answer
                    return message
            except Exception as e:
                return f"Error in manager execution: {str(e)}"
        return "Maximum steps reached without completion"
def check_final_answer(final_answer, agent_memory=None) -> bool:
    """Heuristic sanity check on a candidate final answer.

    Logs the answer, then treats anything longer than 200 characters as
    incorrect (expected answers are short). Returns True when plausible.
    """
    mylog("check_final_answer", final_answer)
    # Over-long answers are assumed to be wrong.
    return len(str(final_answer)) <= 200
# Create agents
# Worker agent: web search plus page fetching.
web_agent = OpenAIAgent(
    model_id="gpt-4o-mini",
    name="web_agent",
    description="Use search engine to find webpages related to a subject and get the page content",
    tools=[search_web, fetch_webpage],
    max_steps=7
)
# Worker agent: multimedia extraction (YouTube transcripts/metadata, audio STT, images).
audiovideo_agent = OpenAIAgent(
    model_id="gpt-4o-mini",
    name="audiovideo_agent",
    description="Extracts information from image, video or audio files from the web",
    tools=[get_youtube_transcript, get_youtube_title_description, get_text_transcript_from_audio_file, analyze_image],
    max_steps=7
)
# Coordinator that delegates to the two worker agents above.
manager_agent = ManagerAgent(
    model_id="gpt-4o-mini",
    managed_agents=[web_agent, audiovideo_agent],
    max_steps=15
)
class MultiAgent:
    """Callable facade over the module-level manager agent.

    Wraps an incoming question with the system preamble and the project's
    output-format instructions, then runs the manager and returns its answer.
    """

    def __init__(self):
        print("MultiAgent initialized.")

    def __call__(self, question: str) -> str:
        mylog(self.__class__.__name__, question)
        try:
            preamble = """You are the top agent of a multi-agent system that can answer questions by coordinating the work of other agents.
You will receive a question and you will decide which agent to use to answer it.
You can use the web_agent to search the web for information and for fetching the content of a web page, or the audiovideo_agent to extract information from video or audio files.
You can also use your own knowledge to answer the question.
You need to respect the output format that is given to you.
Finding the correct answer to the question need reasoning and planning, read the question carefully, think step by step and do not skip any steps.
To delegate tasks to agents, use the format: DELEGATE: agent_name | task_description
For example: DELEGATE: web_agent | Search for information about the Malko competition 2023 enrollment
"""
            # Assemble the full prompt: preamble, question, output-format spec.
            full_prompt = preamble + "\nTHE QUESTION:\n" + question + '\n' + myprompts.output_format
            return manager_agent.run(full_prompt)
        except Exception as e:
            error = f"An error occurred while processing the question: {e}"
            print(error)
            return error
if __name__ == "__main__":
    # Smoke test: run the full multi-agent pipeline on a sample question.
    sample_question = """
    What was the actual enrollment of the Malko competition in 2023?
    """
    multi_agent = MultiAgent()
    print(f"Answer: {multi_agent(sample_question)}")