Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| import requests | |
| import openai | |
| from typing import List | |
| from dotenv import load_dotenv | |
| from langchain_core.tools import tool | |
| from langchain_community.document_loaders import WebBaseLoader, WikipediaLoader, ImageCaptionLoader, ArxivLoader | |
| from langchain_community.tools import DuckDuckGoSearchResults | |
| from langchain_text_splitters import CharacterTextSplitter | |
| load_dotenv() | |
def multiply(a: int, b: int) -> int:
    """
    Return the product of two integers.

    Args:
        a: The first factor
        b: The second factor

    Returns:
        int: The product of ``a`` and ``b``
    """
    product = a * b
    return product
def add(a: int, b: int) -> int:
    """
    Return the sum of two integers.

    Args:
        a: The first addend
        b: The second addend

    Returns:
        int: The sum of ``a`` and ``b``
    """
    total = a + b
    return total
def subtract(a: int, b: int) -> int:
    """
    Return the difference of two integers.

    Args:
        a: The minuend
        b: The subtrahend

    Returns:
        int: The result of ``a - b``
    """
    difference = a - b
    return difference
def divide(a: int, b: int) -> float:
    """
    Divide the first integer by the second integer and return the result.

    Note: Python's ``/`` is true division, so the result is a ``float``
    (the original annotation claimed ``int``, which was incorrect).

    Args:
        a: The dividend
        b: The divisor

    Returns:
        float: The quotient ``a / b``

    Raises:
        ZeroDivisionError: If ``b`` is zero.
    """
    return a / b
# Base URL of the scoring server's file endpoint; a task's attachment is
# downloadable at FILE_URL + task_id.
FILE_URL = "https://agents-course-unit4-scoring.hf.space/files/"
def read_file(task_id: str) -> str:
    """
    Download the file attached to a task and return its text content.

    Args:
        task_id: The id of the task whose attached file should be downloaded

    Returns:
        str: The content of the file, decoded as text

    Raises:
        requests.HTTPError: If the download fails (non-2xx status).
    """
    file_url = f"{FILE_URL}{task_id}"
    response = requests.get(file_url, timeout=10, allow_redirects=True)
    # Fail loudly on 4xx/5xx instead of silently returning an error page body.
    response.raise_for_status()
    # Decode in memory. The original wrote a 'temp' file to disk, re-read it
    # with the platform-default encoding, and never deleted it.
    return response.text
def analyze_image(task_id: str) -> str:
    """
    Generate a caption describing the image attached to a task.

    Args:
        task_id: The id of the task whose image should be analyzed

    Returns:
        str: A textual description of the image content
    """
    loader = ImageCaptionLoader(images=[f"{FILE_URL}{task_id}"])
    docs = loader.load()
    return docs[0].page_content
def analyze_audio(task_id: str) -> str:
    """
    Transcribe the mp3 file attached to a task using OpenAI Whisper.

    Args:
        task_id: The id of the task whose audio file should be transcribed

    Returns:
        str: The transcript of the audio file

    Raises:
        requests.HTTPError: If the download fails (non-2xx status).
    """
    file_url = f"{FILE_URL}{task_id}"
    response = requests.get(file_url, timeout=10, allow_redirects=True)
    # Fail loudly on 4xx/5xx instead of transcribing an HTML error page.
    response.raise_for_status()
    temp_file = 'temp.mp3'
    with open(temp_file, 'wb') as fp:
        fp.write(response.content)
    try:
        with open(temp_file, "rb") as audio_file:
            transcript = openai.audio.transcriptions.create(
                file=audio_file,
                model="whisper-1"
            )
        return transcript.text
    finally:
        # Remove the scratch file; the original left it behind on disk.
        os.remove(temp_file)
def analyze_youtube_video(youtube_url: str, question: str) -> str:
    """
    Analyze a youtube video based on the youtube_url and answer the question.

    NOTE(review): video analysis is not implemented. The original stub had
    only a docstring and therefore silently returned ``None``, violating the
    declared ``str`` return type. It now returns an explicit message so a
    calling agent can recover gracefully instead of crashing on ``None``.

    Args:
        youtube_url: The url of the youtube video to analyze
        question: The question to answer based on the youtube video

    Returns:
        str: An explanatory message stating that analysis is unavailable
    """
    return (
        f"Video analysis is not implemented: unable to answer "
        f"'{question}' for {youtube_url}."
    )
def web_search(query: str) -> str:
    """
    Search the web with DuckDuckGo and return the combined page text.

    Fetches the top 3 result pages, concatenates up to 15000 characters of
    each, and normalizes whitespace before returning.

    Args:
        query: The query to search the web for

    Returns:
        str: The cleaned text content of the web search results
    """
    engine = DuckDuckGoSearchResults(output_type="list", num_results=3)
    hits = engine.invoke({"query": query})
    urls = [hit["link"] for hit in hits]
    pages = WebBaseLoader(web_paths=urls).load()
    combined = "\n\n".join(page.page_content[:15000] for page in pages)
    # Collapse runs of 3+ newlines, then runs of 6+ spaces/tabs.
    cleaned = re.sub(r'\n{3,}', '\n\n', combined).strip()
    cleaned = re.sub(r'[ \t]{6,}', ' ', cleaned)
    # Strip any whitespace exposed at the edges by the substitutions.
    return cleaned.strip()
def wikipedia_search(query: str) -> str:
    """
    Search Wikipedia articles with the given query and return the pages.

    Loads up to 3 matching articles and wraps each in a <Document> tag
    carrying its source URL and page metadata.

    Args:
        query: The query to search Wikipedia for

    Returns:
        str: The text content of the matching Wikipedia articles
    """
    print("Searching Wikipedia for the query: ", query)
    search_docs = WikipediaLoader(query=query, load_max_docs=3).load()
    formatted_search_docs = "\n\n---\n\n".join(
        [
            # Fixed: the opening tag previously ended with "/>", self-closing
            # it even though content and a </Document> close tag followed.
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
            for doc in search_docs
        ])
    return formatted_search_docs
def arxiv_search(query: str) -> str:
    """
    Search arxiv for the given query and return the results.

    Loads up to 3 matching papers and wraps the first 1000 characters of
    each in a <Document> tag carrying its source and page metadata.

    Args:
        query: The query to search arxiv for

    Returns:
        str: The text content of the arxiv search results
    """
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()
    formatted_search_docs = "\n\n---\n\n".join(
        [
            # Fixed: the opening tag previously ended with "/>", self-closing
            # it even though content and a </Document> close tag followed.
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content[:1000]}\n</Document>'
            for doc in search_docs
        ])
    return formatted_search_docs
def text_splitter(text: str) -> List[str]:
    """
    Break a large text into smaller chunks.

    Uses Langchain's CharacterTextSplitter with a chunk size of 300
    characters and an overlap of 10.

    Args:
        text: The large text to split into smaller chunks

    Returns:
        List[str]: A list containing the smaller chunks of the text
    """
    chunker = CharacterTextSplitter(chunk_size=300, chunk_overlap=10)
    return chunker.split_text(text)