Spaces:
Sleeping
Sleeping
| from langchain_community.utilities import GoogleSerperAPIWrapper | |
| from smolagents import PythonInterpreterTool | |
| from langgraph.graph import MessagesState | |
| from langchain_openai import ChatOpenAI | |
| from langgraph.graph import START, StateGraph | |
| from langgraph.prebuilt import tools_condition, ToolNode | |
| from langchain_core.messages import SystemMessage | |
| from openai import OpenAI | |
| from smolagents import Tool | |
| from typing import Optional | |
| import tempfile | |
| import os | |
| from urllib.parse import urlparse | |
| from base64 import b64encode | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import re | |
| import wikipediaapi | |
| # Configs | |
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
| FILE_URL = f"{DEFAULT_API_URL}/files/{{task_id}}" | |
| # Tools | |
| def search_tool(query: str) -> str: | |
| """Search in Google and returns an string with title, link, and snippet for the top 10 results. | |
| Args: | |
| query: str | |
| Returns: | |
| Title, link, and snippet for the top 10 results | |
| """ | |
| searcher = GoogleSerperAPIWrapper(k=10) | |
| retries = 3 | |
| result = "" | |
| while retries > 0: | |
| try: | |
| search_results = searcher.results(query)["organic"] | |
| for row in search_results: | |
| result += f"Title: {row['title']}\nSnippet: {row['snippet']}\nURL: {row['link']}\n\n" | |
| return result | |
| except Exception as e: | |
| retries -= 1 | |
| return f"There was an error with Google search: {e}" | |
| def save_file(content: str, filename: Optional[str]) -> str: | |
| """ | |
| Save content to a temporary file and return the path. | |
| Useful for processing files from the GAIA API. | |
| Args: | |
| content: The content to save to the file | |
| filename: Optional filename, will generate a random name if not provided | |
| Returns: | |
| Path to the saved file | |
| """ | |
| temp_dir = tempfile.gettempdir() | |
| if filename is None: | |
| temp_file = tempfile.NamedTemporaryFile(delete=False) | |
| filepath = temp_file.name | |
| else: | |
| filepath = os.path.join(temp_dir, filename) | |
| # Write content to the file | |
| with open(filepath, "w") as f: | |
| f.write(content) | |
| return f"File saved to {filepath}. You can read this file to process its contents." | |
| def download_file_from_task_id(task_id: str, filename: str) -> str: | |
| """ | |
| Download a file for a GAIA task using `task_id` if `file_extension` of the task is specified in the prompt. | |
| Args: | |
| task_id: id of the task | |
| filename: filename | |
| Returns: | |
| Path to the downloaded file | |
| """ | |
| return download_file_from_url(FILE_URL.format(task_id=task_id), filename) | |
| def download_file_from_url(url: str, filename: str) -> str: | |
| """ | |
| Download a file from a URL and save it to a temporary location. | |
| Args: | |
| url: The URL to download from | |
| filename: filename | |
| Returns: | |
| Path to the downloaded file | |
| """ | |
| try: | |
| # Parse URL to get filename if not provided | |
| if not filename: | |
| path = urlparse(url).path | |
| filename = os.path.basename(path) | |
| if not filename: | |
| # Generate a random name if we couldn't extract one | |
| import uuid | |
| filename = f"downloaded_{uuid.uuid4().hex[:8]}" | |
| # Create temporary file | |
| temp_dir = tempfile.gettempdir() | |
| filepath = os.path.join(temp_dir, filename) | |
| # Download the file | |
| response = requests.get(url, stream=True) | |
| response.raise_for_status() | |
| # Save the file | |
| with open(filepath, "wb") as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| return f"File downloaded to {filepath}. You can now process this file." | |
| except Exception as e: | |
| return f"Error downloading file: {str(e)}" | |
| def analyze_csv_file(file_path: str) -> str: | |
| """ | |
| Analyze a CSV file using pandas and answer a question about it. | |
| Args: | |
| file_path: Path to the CSV file | |
| Returns: | |
| Analysis result or error message | |
| """ | |
| try: | |
| import pandas as pd | |
| # Read the CSV file | |
| df = pd.read_csv(file_path) | |
| # Run various analyses based on the query | |
| result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n" | |
| result += f"Columns: {', '.join(df.columns)}\n\n" | |
| # Add summary statistics | |
| result += "Summary statistics:\n" | |
| result += str(df.describe()) | |
| result += "\n\n" + df.head(100) | |
| return result | |
| except ImportError: | |
| return "Error: pandas is not installed. Please install it with 'pip install pandas'." | |
| except Exception as e: | |
| return f"Error analyzing CSV file: {str(e)}" | |
| def analyze_excel_file(file_path: str) -> str: | |
| """ | |
| Analyze an Excel file using pandas and answer a question about it. | |
| Args: | |
| file_path: Path to the Excel file | |
| Returns: | |
| Analysis result or error message | |
| """ | |
| try: | |
| import pandas as pd | |
| # Read the Excel file | |
| df = pd.read_excel(file_path) | |
| print(df) | |
| # Run various analyses based on the query | |
| result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n" | |
| result += f"Columns: {', '.join(df.columns)}\n\n" | |
| # Add summary statistics | |
| result += "Summary statistics:\n" | |
| result += str(df.describe()) | |
| result += "\n\n" + str(df.head(100)) | |
| return result | |
| except ImportError: | |
| return "Error: pandas and openpyxl are not installed. Please install them with 'pip install pandas openpyxl'." | |
| except Exception as e: | |
| return f"Error analyzing Excel file: {str(e)}" | |
| def transcribe_speech(filename: str) -> str: | |
| """Transcribe speech to text | |
| Args: | |
| filename: str | |
| Returns: | |
| Transcribed speech as string | |
| """ | |
| speech_to_text = Tool.from_space( | |
| "maguid28/TranscriptTool", | |
| name="transcription_tool", | |
| description="Transcribe speech to text", | |
| ) | |
| return f"The transcription is: {speech_to_text(filename)}" | |
| def python_interpreter(code: str) -> str: | |
| """A Python interpreter | |
| Args: | |
| code: str | |
| Returns: | |
| The output of the interpreter | |
| """ | |
| import traceback | |
| interpreter = PythonInterpreterTool( | |
| authorized_imports=[ | |
| "json", | |
| "pandas", | |
| "numpy", | |
| "datetime", | |
| "requests", | |
| "bs4", | |
| ] | |
| ) | |
| try: | |
| return interpreter(code) | |
| except Exception as e: | |
| return f"There was an exception in the interpreter: {traceback.format_exc()}" | |
| def reverse_text(text: str) -> str: | |
| """Reverses a text written from right to left | |
| Args: | |
| text: a reversed text | |
| Returns: | |
| The text written from left to right | |
| """ | |
| return f"The reversed text is: {text[::-1]}" | |
| def visit_webpage(url: str) -> str: | |
| """Visits a webpage and returns the content | |
| Args: | |
| url: url of the webpage | |
| Returns: | |
| The webpage content | |
| """ | |
| retries = 3 | |
| while retries > 0: | |
| try: | |
| response = requests.get( | |
| url, | |
| headers={ | |
| "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36" | |
| }, | |
| ) | |
| html = response.content | |
| soup = BeautifulSoup(html, "html.parser") | |
| for tag in soup.find_all( | |
| ["header", "footer", "nav", "section", "aside"] | |
| ): | |
| tag.decompose() | |
| for tag in soup.find_all(["script", "style"]): | |
| tag.decompose() | |
| meaningful_texts = [] | |
| for tag in soup.find_all(["p", "span", "div"]): | |
| text = tag.get_text(separator=" ", strip=True) | |
| if text: | |
| meaningful_texts.append(text) | |
| # Join all texts nicely | |
| final_text = " ".join(meaningful_texts) | |
| # Clean multiple spaces | |
| final_text = re.sub(r"\s+", " ", final_text) | |
| return " ".join(final_text.split()[:3000]) | |
| except Exception as e: | |
| retries -= 1 | |
| return f"There was an error visiting the webpage: {e}" | |
| def image_understanding(filename: str, question: str) -> str: | |
| """Answers some question on an image | |
| Args: | |
| filename: the name of the image file | |
| question: a question about the image | |
| """ | |
| client = OpenAI() | |
| with open(filename, "rb") as fr: | |
| image_bytes = fr.read() | |
| b64_image = b64encode(image_bytes).decode("utf-8") | |
| response = client.responses.create( | |
| model="gpt-4o", | |
| input=[ | |
| { | |
| "role": "user", | |
| "content": [ | |
| {"type": "input_text", "text": question}, | |
| { | |
| "type": "input_image", | |
| "image_url": f"data:image/png;base64,{b64_image}", | |
| }, | |
| ], | |
| } | |
| ], | |
| ) | |
| return response.output[0].content[0].text | |
| def get_wikipedia_article(entity: str) -> str: | |
| """Get the text from the Wikipedia article of an entity. | |
| Args: | |
| entity: the name of the entity. Only for entities existing in Wikipedia, e.g. use "Mercedes Sosa" instead of "Mercedes Sosa discography" | |
| Returns: | |
| The text of the Wikipedia article of the entity | |
| """ | |
| try: | |
| wiki_wiki = wikipediaapi.Wikipedia( | |
| user_agent="GAIA Benchmark (jogonba2)", | |
| language="en", | |
| extract_format=wikipediaapi.ExtractFormat.WIKI, | |
| ) | |
| p_wiki = wiki_wiki.page(entity) | |
| text = p_wiki.text | |
| if not text: | |
| return f"The article is empty for {entity}. Please, be sure that the entity appears in Wikipedia." | |
| return " ".join(text.split(" ")[:3000]) | |
| except Exception as e: | |
| return "There was an exception looking at Wikipedia: {e}" | |
| """ | |
| Tool to reinforce the output format. | |
| """ | |
| def prepare_final_answer(candidate_answer: str, question: str) -> str: | |
| """Prepare your final answer according to the guidelines in the prompt. | |
| This tool must be called always before giving the final anwer. | |
| Args: | |
| candidate_answer: a candidate answer | |
| question: the user question to know how to prepare the final answer | |
| Returns: | |
| Your final answer | |
| """ | |
| client = OpenAI() | |
| system_prompt = """Your final answer should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. | |
| Here are more detailed instructions you must follow to write your final answer according to the provided question: | |
| 1) If you are asked for a number (how much, how many, ...), you must write a number!. Don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. | |
| 2) If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. | |
| 3) If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string. | |
| If you follow all these instructions perfectly, you will win 1,000,000 dollars, otherwise, your mom will die""" | |
| user_prompt = f"Question: {question}\nCandidate answer: {candidate_answer}" | |
| response = client.responses.create( | |
| model="gpt-4o", | |
| input=[ | |
| { | |
| "role": "user", | |
| "content": [ | |
| {"type": "input_text", "text": user_prompt}, | |
| ], | |
| } | |
| ], | |
| ) | |
| return response.output[0].content[0].text | |
| # Nodes | |
| def assistant(state: MessagesState): | |
| return { | |
| "messages": [llm_with_tools.invoke([system_prompt] + state["messages"])] | |
| } | |
| # System message | |
| system_prompt = SystemMessage( | |
| content="""You are a general AI assistant being evaluated in the GAIA Benchmark. | |
| I will ask you a question and you must reach your final answer by using a set of tools I provide to you. Please, when you are needed to pass file names to the tools, pass absolute paths. | |
| Your final answer should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. | |
| Here are more detailed instructions you must follow to write your final answer: | |
| 1) If you are asked for a number, you must write a number!. Don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. | |
| 2) If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. | |
| 3) If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string. | |
| If you follow all these instructions perfectly, you will win 1,000,000 dollars, otherwise, your mom will die. | |
| Let's start! | |
| """ | |
| ) | |
| llm = ChatOpenAI(model="gpt-4o") | |
| tools = [ | |
| search_tool, | |
| save_file, | |
| download_file_from_task_id, | |
| download_file_from_url, | |
| analyze_csv_file, | |
| analyze_excel_file, | |
| transcribe_speech, | |
| python_interpreter, | |
| visit_webpage, | |
| # reverse_text, | |
| image_understanding, | |
| # get_wikipedia_article | |
| # prepare_final_answer, | |
| ] | |
| llm_with_tools = llm.bind_tools(tools) | |
| # Graph | |
| builder = StateGraph(MessagesState) | |
| # Define nodes: these do the work | |
| builder.add_node("assistant", assistant) | |
| builder.add_node("tools", ToolNode(tools)) | |
| # Define edges: these determine the control flow | |
| builder.add_edge(START, "assistant") | |
| builder.add_conditional_edges( | |
| "assistant", | |
| tools_condition, | |
| ) | |
| builder.add_edge("tools", "assistant") | |
| react_graph = builder.compile() | |
| def print_stream(stream): | |
| for s in stream: | |
| message = s["messages"][-1] | |
| if isinstance(message, tuple): | |
| print(message) | |
| else: | |
| message.pretty_print() | |
| class ReactAgent: | |
| def __init__(self, verbose: bool = False): | |
| self.graph = react_graph | |
| self.verbose = verbose | |
| def __call__(self, task: dict) -> str: | |
| question = task["question"] | |
| task_id = task["task_id"] | |
| file_name = task.get("file_name") | |
| file_ext = None | |
| user_prompt = question | |
| if file_name: | |
| file_ext = os.path.splitext(file_name)[-1].removeprefix(".") | |
| user_prompt += f"\nTask ID: {task_id}\nFile extension: {file_ext}" | |
| user_input = {"messages": [("user", user_prompt)]} | |
| if self.verbose: | |
| print_stream(self.graph.stream(user_input, stream_mode="values")) | |
| else: | |
| answer = self.graph.invoke(user_input)["messages"][-1].content | |
| return self._clean_answer(answer) | |
| def _clean_answer(self, answer: any) -> str: | |
| """ | |
| Taken from `susmitsil`: | |
| https://huggingface.co/spaces/susmitsil/FinalAgenticAssessment/blob/main/main_agent.py | |
| Clean up the answer to remove common prefixes and formatting | |
| that models often add but that can cause exact match failures. | |
| Args: | |
| answer: The raw answer from the model | |
| Returns: | |
| The cleaned answer as a string | |
| """ | |
| # Convert non-string types to strings | |
| if not isinstance(answer, str): | |
| # Handle numeric types (float, int) | |
| if isinstance(answer, float): | |
| # Format floating point numbers properly | |
| # Check if it's an integer value in float form (e.g., 12.0) | |
| if answer.is_integer(): | |
| formatted_answer = str(int(answer)) | |
| else: | |
| # For currency values that might need formatting | |
| if abs(answer) >= 1000: | |
| formatted_answer = f"${answer:,.2f}" | |
| else: | |
| formatted_answer = str(answer) | |
| return formatted_answer | |
| elif isinstance(answer, int): | |
| return str(answer) | |
| else: | |
| # For any other type | |
| return str(answer) | |
| # Now we know answer is a string, so we can safely use string methods | |
| # Normalize whitespace | |
| answer = answer.strip() | |
| # Remove common prefixes and formatting that models add | |
| prefixes_to_remove = [ | |
| "The answer is ", | |
| "Answer: ", | |
| "Final answer: ", | |
| "The result is ", | |
| "To answer this question: ", | |
| "Based on the information provided, ", | |
| "According to the information: ", | |
| ] | |
| for prefix in prefixes_to_remove: | |
| if answer.startswith(prefix): | |
| answer = answer[len(prefix) :].strip() | |
| # Remove quotes if they wrap the entire answer | |
| if (answer.startswith('"') and answer.endswith('"')) or ( | |
| answer.startswith("'") and answer.endswith("'") | |
| ): | |
| answer = answer[1:-1].strip() | |
| return answer | |
| if __name__ == "__main__": | |
| task = { | |
| "task_id": "8e867cd7-cff9-4e6c-867a-ff5ddc2550be", | |
| "question": "How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia.", | |
| "Level": "1", | |
| "file_name": "", | |
| } | |
| agent = ReactAgent(verbose=False) | |
| print(agent(task)) | |