import os

import markdownify
import pandas as pd
import requests
from smolagents import (
    CodeAgent,
    DuckDuckGoSearchTool,
    InferenceClientModel,
    OpenAIServerModel,
    VisitWebpageTool,
)
from smolagents.tools import tool

MANAGER_MODEL = "deepseek-ai/DeepSeek-R1"
AGENT_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
FINAL_ANSWER_MODEL = "deepseek-ai/DeepSeek-R1"  # OpenAIServerModel
WEB_SEARCH_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
IMAGE_ANALYSIS_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
AUDIO_ANALYSIS_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
VIDEO_ANALYSIS_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
YOUTUBE_ANALYSIS_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
DOCUMENT_ANALYSIS_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
ARITHMETIC_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
CODE_GENERATION_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"
CODE_EXECUTION_MODEL = "Qwen/Qwen2.5-Coder-32B-Instruct"

# Upper bound (seconds) on a single DuckDuckGo request so that a stalled
# connection cannot hang the agent run indefinitely.
DUCKDUCKGO_TIMEOUT_SECONDS = 10


def orchestrate(message, file_path):
    """Answer *message* with a manager agent, then normalise the answer.

    Builds the search tools and agents, runs the manager agent on a prompt
    that wraps the question, and passes the manager's answer through a second
    agent that applies the strict final-answer formatting rules.

    Args:
        message: The question to answer.
        file_path: Optional path of a file that accompanies the question.
            When truthy it is mentioned in the manager prompt.

    Returns:
        Whatever the final-answer agent's ``run`` call returns.
    """
    # Tools
    simple_web_search_tool = DuckDuckGoSearchTool()
    visit_web_page_tool = VisitWebpageTool()

    # Raises RuntimeError when the HTTP request fails or the response is not
    # valid JSON. (Kept out of the docstring, which smolagents parses.)
    @tool
    def web_search_tool(query: str) -> str:
        """
        Given a question, search the web and return a summary answer.

        Args:
            query (str): The search query to look up.

        Returns:
            str: A relevant summary or result from DuckDuckGo.
        """
        url = "https://api.duckduckgo.com/"
        params = {"q": query, "format": "json", "no_html": 1}
        try:
            response = requests.get(
                url, params=params, timeout=DUCKDUCKGO_TIMEOUT_SECONDS
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            raise RuntimeError(f"DuckDuckGo search failed: {str(e)}") from e

        if abstract := data.get("AbstractText"):
            return abstract

        related = data.get("RelatedTopics")
        if not related:
            return "No relevant information found via DuckDuckGo."

        # Do not assume the first entry has a "Text" key; return the first
        # entry that actually has text.
        for topic in related:
            if isinstance(topic, dict) and topic.get("Text"):
                return topic["Text"]
        return "No result found."

    # Prompts
    def get_manager_prompt(message, file_path=None):
        """Return the manager prompt wrapping *message* (and *file_path*)."""
        # NOTE(review): the whitespace inside this prompt is reproduced as
        # found in the file; confirm the intended line breaks.
        prompt = (
            "Your job is to answer the following question. "
            "Answer the following question. "
            "If needed, delegate to one of your coworkers:\n"
            " - Web Search Agent: Use when the question requires current "
            "information. "
            "Web Search Agent requires a question only.\n"
            ' Format the prompt like: "You are an expert web search '
            "assistant. Your task is to search the web and provide accurate "
            'answers to the following question: [INSERT QUESTION]" '
            "... "
            "In case you cannot answer the question and there is not a good "
            "coworker, delegate to the Code Generation Agent.\n"
            f". Question: {message} "
        )
        if file_path:
            # Previously file_path was accepted but never used.
            # NOTE(review): confirm the wording expected for the file hint.
            prompt += f"File path: {file_path} "
        return prompt

    def get_final_answer(agent, question: str, initial_answer: str) -> str:
        """Ask *agent* to reduce *initial_answer* to the strict answer format.

        Args:
            agent: Agent whose ``run`` method receives the formatting prompt.
            question: The original question.
            initial_answer: The manager agent's answer, as text.

        Returns:
            The result of ``agent.run`` on the formatting prompt.
        """
        # NOTE(review): the whitespace inside this prompt is reproduced as
        # found in the file; confirm the intended line breaks.
        prompt = (
            " You are an expert question answering assistant. "
            "Given a question and an initial answer, your task is to provide "
            "the final answer. "
            "Your final answer must be a number and/or string OR as few "
            "words as possible OR a comma-separated list of numbers and/or "
            "strings. "
            "If you are asked for a number, don't use comma to write your "
            "number neither use units such as USD, $, percent, or % unless "
            "specified otherwise. "
            "If you are asked for a string, don't use articles, neither "
            "abbreviations (for example cities), and write the digits in "
            "plain text unless specified otherwise. "
            "If you are asked for a comma-separated list, apply the above "
            "rules depending of whether the element to be put in the list is "
            "a number or a string. "
            "If the final answer is a number, use a number not a word. "
            "If the final answer is a string, start with an uppercase "
            "character. "
            "If the final answer is a comma-separated list of numbers, use a "
            "space character after each comma. "
            "If the final answer is a comma-separated list of strings, use a "
            "space character after each comma and start with a lowercase "
            "character. "
            "Do not add any content to the final answer that is not in the "
            "initial answer. "
            f"**Question:** {question} "
            f"**Initial answer:** {initial_answer} "
            "**Example 1:** What is the biggest city in California? "
            "Los Angeles "
            "**Example 2:** How many 'r's are in strawberry? 3 "
            "**Example 3:** What is the opposite of black? White "
            "**Example 4:** What are the first 5 numbers in the Fibonacci "
            "sequence? 0, 1, 1, 2, 3 "
            "**Example 5:** What is the opposite of bad, worse, worst? "
            "good, better, best "
            "**Final answer:** "
        )
        return agent.run(prompt)

    def run_manager_workflow(message, file_path=None):
        """Run the manager agent, then the final-answer agent, and log both."""
        final_prompt = get_manager_prompt(message, file_path)
        # Run on the constructed prompt: previously the bare question was
        # passed and the prompt was only printed.
        initial_answer = manager_agent.run(final_prompt)
        final_answer = get_final_answer(
            final_answer_agent, message, str(initial_answer)
        )

        print(f"=> Initial question: {message}")
        print(f"=> Final prompt: {final_prompt}")
        print(f"=> Initial answer: {initial_answer}")
        print(f"=> Final answer: {final_answer}")

        return final_answer

    # Agents
    # NOTE(review): web_search_agent is constructed but is not listed in the
    # manager's managed_agents below; only simple_web_search_agent is.
    web_search_agent = CodeAgent(
        name="web_search_agent",
        description="As an expert web search assistant, you search the web to answer the question. Your task is to search the web and provide accurate answers to the question: {message}",
        model=InferenceClientModel(WEB_SEARCH_MODEL),
        max_steps=2,
        tools=[web_search_tool],
    )

    simple_web_search_agent = CodeAgent(
        name="simple_web_search_agent",
        description="As an expert web search assistant, you search the web to answer the question. Your task is to search the web and provide accurate answers to the question: {message}",
        model=InferenceClientModel(WEB_SEARCH_MODEL),
        max_steps=2,
        tools=[simple_web_search_tool, visit_web_page_tool],
    )

    manager_prompt = get_manager_prompt(message, file_path)
    manager_agent = CodeAgent(
        name="manager_agent",
        model=InferenceClientModel(
            MANAGER_MODEL, provider="together", max_tokens=8096
        ),
        description=manager_prompt,
        tools=[],
        planning_interval=4,
        verbosity_level=2,
        managed_agents=[simple_web_search_agent],
        max_steps=10,
        additional_authorized_imports=[
            "requests",
            "zipfile",
            "os",
            "pandas",
            "numpy",
            "sympy",
            "json",
            "bs4",
            "pubchempy",
            "xml",
            "yahoo_finance",
            "Bio",
            "sklearn",
            "scipy",
            "pydub",
            "io",
            "PIL",
            "chess",
            "PyPDF2",
            "pptx",
            "torch",
            "datetime",
            "csv",
            "fractions",
        ],
    )

    final_answer_agent = CodeAgent(
        name="final_answer_agent",
        description="Given a question and an initial answer, return the final refined answer following strict formatting rules.",
        model=InferenceClientModel(FINAL_ANSWER_MODEL),
        max_steps=1,
        tools=[],
    )

    return run_manager_workflow(message, file_path)