Spaces:
Runtime error
Runtime error
File size: 6,200 Bytes
49ab10c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
import os
import re
import requests
import openai
from typing import List
from dotenv import load_dotenv
from langchain_core.tools import tool
from langchain_community.document_loaders import WebBaseLoader, WikipediaLoader, ImageCaptionLoader, ArxivLoader
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_text_splitters import CharacterTextSplitter
# Load environment variables from a .env file before any tool runs
# (presumably the API credentials used by the clients below - TODO confirm).
load_dotenv()
@tool
def multiply(a: int, b: int) -> int:
    """
    Compute the product of two integers

    Args:
        a: The first factor
        b: The second factor

    Returns:
        int: The product of a and b
    """
    product = a * b
    return product
@tool
def add(a: int, b: int) -> int:
    """
    Compute the sum of two integers

    Args:
        a: The first addend
        b: The second addend

    Returns:
        int: The sum of a and b
    """
    total = a + b
    return total
@tool
def subtract(a: int, b: int) -> int:
    """
    Compute the difference of two integers

    Args:
        a: The integer to subtract from
        b: The integer that is subtracted from a

    Returns:
        int: The difference a - b
    """
    difference = a - b
    return difference
@tool
def divide(a: int, b: int) -> float:
    """
    Divide the first integer by the second integer and return the quotient

    Args:
        a: The integer to be divided (dividend)
        b: The integer to divide by (divisor); must not be zero

    Returns:
        float: The result of the true division a / b (for example 7 / 2 is 3.5)

    Raises:
        ZeroDivisionError: If b is zero
    """
    if b == 0:
        # Same exception type plain `a / b` raises, with a message that names the cause.
        raise ZeroDivisionError("Cannot divide by zero: the divisor b must be non-zero")
    return a / b
# Base URL for task files; the tools below append a task id to it to build the download URL.
FILE_URL = "https://agents-course-unit4-scoring.hf.space/files/"
@tool
def read_file(task_id: str) -> str:
    """
    Download the file attached to a task and return its content as text

    Args:
        task_id: The id of the task to download the file from

    Returns:
        str: The content of the file decoded as UTF-8 (bytes that cannot be
            decoded are replaced), with line endings normalised to "\\n"

    Raises:
        requests.HTTPError: If the server answers with an error status code
    """
    file_url = f"{FILE_URL}{task_id}"
    response = requests.get(file_url, timeout=10, allow_redirects=True)
    # Fail loudly on 4xx/5xx instead of returning the error page as if it were the file.
    response.raise_for_status()
    # Decode in memory rather than round-tripping through a fixed scratch file:
    # nothing is left on disk, concurrent calls cannot overwrite each other, and
    # the result no longer depends on the platform's default text encoding.
    text = response.content.decode("utf-8", errors="replace")
    # Keep the newline translation that reading the file in text mode used to apply.
    return text.replace("\r\n", "\n").replace("\r", "\n")
@tool
def analyze_image(task_id: str) -> str:
    """
    Describe the content of the image attached to a task

    Args:
        task_id: The id of the task whose image should be analyzed

    Returns:
        str: The description of the content of the image
    """
    image_location = f"{FILE_URL}{task_id}"
    caption_loader = ImageCaptionLoader(images=[image_location])
    documents = caption_loader.load()
    # Only one image is passed in, so the description is read from the first document.
    first_document = documents[0]
    return first_document.page_content
@tool
def analyze_audio(task_id: str) -> str:
    """
    Download the mp3 file attached to a task and return a transcript of it

    Args:
        task_id: The id of the task whose audio file should be analyzed

    Returns:
        str: The text transcribed from the audio file

    Raises:
        requests.HTTPError: If the server answers with an error status code
    """
    file_url = f"{FILE_URL}{task_id}"
    response = requests.get(file_url, timeout=10, allow_redirects=True)
    # Do not send an HTTP error body to the transcription API as if it were audio.
    response.raise_for_status()
    # Upload straight from memory as a (filename, bytes) pair instead of writing a
    # fixed 'temp.mp3' scratch file that concurrent calls would overwrite and that
    # was never cleaned up. The upload keeps the same file name as before.
    audio_upload = ("temp.mp3", response.content)
    transcript = openai.audio.transcriptions.create(
        file=audio_upload,
        model="whisper-1",
    )
    return transcript.text
@tool
def analyze_youtube_video(youtube_url: str, question: str) -> str:
    """
    Analyze a youtube video based on the youtube_url and the question and return the answer to the question

    NOTE: this tool is a placeholder. No video analysis is implemented yet, so it
    only reports that the video could not be analyzed.

    Args:
        youtube_url: The url of the youtube video to analyze
        question: The question to answer based on the youtube video

    Returns:
        str: A message stating that the video could not be analyzed
    """
    # Previously the function had no body and returned None despite the declared
    # `str` return type; return an explicit explanation instead.
    return (
        "The YouTube video analysis tool is not implemented yet, so the video at "
        f"{youtube_url} could not be analyzed to answer the question: {question}"
    )
@tool
def web_search(query: str) -> str:
    """
    Run a web search for the query and return the text of the result pages

    Args:
        query: The query to search the web for

    Returns:
        str: The text content of the web search results
    """
    hits = DuckDuckGoSearchResults(output_type="list", num_results=3).invoke({"query": query})
    links = []
    for hit in hits:
        links.append(hit["link"])
    pages = WebBaseLoader(web_paths=links).load()
    # Keep at most the first 15000 characters of every page before joining them.
    excerpts = [page.page_content[:15000] for page in pages]
    text = "\n\n".join(excerpts)
    # Collapse runs of three or more newlines into a single blank line.
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = text.strip()
    # Collapse runs of six or more spaces/tabs into a single space.
    text = re.sub(r'[ \t]{6,}', ' ', text)
    return text.strip()
@tool
def wikipedia_search(query: str) -> str:
    """
    Look up the query on Wikipedia and return the matching pages

    Args:
        query: The query to search Wikipedia for

    Returns:
        str: The text content of the Wikipedia articles related to the query
    """
    print("Searching Wikipedia for the query: ", query)
    articles = WikipediaLoader(query=query, load_max_docs=3).load()
    rendered = []
    for article in articles:
        source = article.metadata["source"]
        page = article.metadata.get("page", "")
        # One <Document> entry per article: header with source/page, then the full text.
        rendered.append(
            f'<Document source="{source}" page="{page}"/>\n{article.page_content}\n</Document>'
        )
    return "\n\n---\n\n".join(rendered)
@tool
def arxiv_search(query: str) -> str:
    """
    Search arxiv for the given query and return the results

    Args:
        query: The query to search arxiv for

    Returns:
        str: The text content of the arxiv search results (at most the first
            1000 characters of each of up to 3 papers)
    """
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()
    formatted_docs = []
    for doc in search_docs:
        metadata = doc.metadata
        # Indexing metadata["source"] directly raises KeyError when the loader does
        # not provide that key, so fall back to the entry id or the paper title.
        source = metadata.get("source") or metadata.get("entry_id") or metadata.get("Title", "")
        page = metadata.get("page", "")
        formatted_docs.append(
            f'<Document source="{source}" page="{page}"/>\n{doc.page_content[:1000]}\n</Document>'
        )
    return "\n\n---\n\n".join(formatted_docs)
@tool
def text_splitter(text: str) -> List[str]:
    """
    Break a large text into smaller chunks with Langchain's CharacterTextSplitter

    Args:
        text: The large text to split into smaller chunks

    Returns:
        List[str]: a list containing the smaller chunks of the text
    """
    # The splitter is configured with a chunk size of 300 and an overlap of 10.
    chunker = CharacterTextSplitter(chunk_size=300, chunk_overlap=10)
    chunks = chunker.split_text(text)
    return chunks