rqueraud's picture
Before refactoring tools
4d5f444
raw
history blame
9.83 kB
"""
Tools for the FlexibleAgent
All tool functions that the agent can use
"""
import os
import re
import requests
import tempfile
import mimetypes
from pathlib import Path
from langchain_core.tools import tool
from langchain_community.retrievers import WikipediaRetriever
from langchain_community.document_loaders import (
UnstructuredFileLoader,
TextLoader,
CSVLoader,
PDFPlumberLoader,
UnstructuredImageLoader,
UnstructuredMarkdownLoader,
UnstructuredWordDocumentLoader,
UnstructuredPowerPointLoader,
UnstructuredExcelLoader
)
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import Tool
from langchain_google_community import GoogleSearchAPIWrapper
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_community.document_loaders import WebBaseLoader
@tool
def wikipedia_search(query: str) -> str:
    """Search Wikipedia for information. Use this for factual information and encyclopedic content.

    Args:
        query: The search query."""
    try:
        retriever = WikipediaRetriever(load_max_docs=10)
        docs = retriever.invoke(query)
        if not docs:
            return f"No Wikipedia articles found for '{query}'"

        header = f"Wikipedia search results for '{query}':\n\n"
        # Wrap every article in an XML-like <Document> element. The opening
        # tag must not be self-closing, otherwise the trailing </Document>
        # has nothing to close. Metadata is read with .get() so that one
        # article lacking a "source" entry does not discard all results.
        formatted_search_docs = "\n\n---\n\n".join(
            f'<Document source="{doc.metadata.get("source", "")}" '
            f'page="{doc.metadata.get("page", "")}">\n'
            f'{doc.page_content}\n</Document>'
            for doc in docs
        )
        return header + formatted_search_docs
    except Exception as e:
        # Tools report failures as text so the calling agent can react to
        # them instead of crashing.
        return f"Wikipedia search failed: {str(e)}"
@tool
def youtube_search(query: str) -> str:
    """Search YouTube for videos and get video information. Use this when you need YouTube-specific content.

    Args:
        query: The search query."""
    try:
        # Imported lazily so the module still loads when the optional
        # youtubesearchpython package is not installed.
        from youtubesearchpython import VideosSearch

        videos = VideosSearch(query, limit=3).result().get('result') or []
        if not videos:
            # Same convention as wikipedia_search: say explicitly that
            # nothing was found instead of returning a bare header.
            return f"No YouTube videos found for '{query}'"

        parts = [f"YouTube search results for '{query}':\n"]
        for video in videos:
            # Nested fields are read defensively so that one incomplete
            # entry does not abort the whole listing.
            channel_name = (video.get('channel') or {}).get('name') or "Unknown"
            view_text = (video.get('viewCount') or {}).get('text') or "Unknown"
            parts.append(f"- {video.get('title', 'Unknown')} by {channel_name}\n")
            parts.append(f" Duration: {video.get('duration') or 'Unknown'}, Views: {view_text}\n")
            parts.append(f" URL: {video.get('link', '')}\n\n")
        return "".join(parts)
    except Exception as e:
        # Tools report failures as text so the calling agent can react to
        # them instead of crashing.
        return f"YouTube search failed: {str(e)}"
@tool
def web_search(query: str) -> str:
    """Search the web for a query and return the first results.

    Args:
        query: The search query."""
    try:
        search = DuckDuckGoSearchResults(output_format="list")
        search_results = search.invoke(query)
        # Only the first three hits are fetched; hits without a link are skipped.
        urls = [hit['link'] for hit in search_results[:3] if hit.get('link')]
        if not urls:
            return f"No web results found for '{query}'"

        sections = ["Results from web search:\n\n"]
        loader = WebBaseLoader(web_paths=urls)
        for doc in loader.lazy_load():
            sections.append(f"{doc.metadata}\n\n")
            sections.append(f"{doc.page_content}\n\n")
            sections.append("--------------------------------\n\n")
        return "".join(sections)
    except Exception as e:
        # Match the other tools in this module: network or parsing errors
        # are returned as text rather than raised into the agent loop.
        return f"Web search failed: {str(e)}"
@tool
def decode_text(text: str) -> str:
    """Decode or reverse text that might be encoded backwards or in other ways."""
    # The report contains three views of the input: the text as given, each
    # whitespace-separated token mirrored in place, and the whole string
    # mirrored end to end.
    try:
        mirrored_tokens = " ".join(token[::-1] for token in text.split())
        mirrored_whole = "".join(reversed(text))
        return (
            f"Original: {text}\n"
            f"Word-by-word reversed: {mirrored_tokens}\n"
            f"Fully reversed: {mirrored_whole}"
        )
    except Exception as exc:
        return f"Text decoding failed: {str(exc)}"
@tool
def download_and_process_file(task_id: str) -> str:
    """Download and process a file from the GAIA API using the task_id.
    Use this tool when detect_file_requirement indicates a file is needed.

    Args:
        task_id: Identifier of the task whose attached file should be fetched."""
    api_url = "https://agents-course-unit4-scoring.hf.space"
    temp_path = None
    try:
        # Download file from API
        file_url = f"{api_url}/files/{task_id}"
        print(f"Downloading file from: {file_url}")
        response = requests.get(file_url, timeout=30)
        response.raise_for_status()

        # Get filename from Content-Disposition header or use task_id.
        # The pattern requires the "=" so that "filename*=..." is not
        # mistaken for the plain parameter, and an unquoted value stops at
        # the next ";" instead of swallowing trailing parameters.
        filename = task_id
        content_disposition = response.headers.get('Content-Disposition', '')
        filename_match = re.search(
            r'filename="([^"]+)"|filename=([^";\s]+)', content_disposition
        )
        if filename_match:
            filename = filename_match.group(1) or filename_match.group(2)

        # The name comes from the server (or the caller) and ends up in a
        # filesystem path, so drop any directory components first.
        filename = os.path.basename(filename.replace("\\", "/")).strip() or "download"

        # Create temporary file; the original name is kept in the suffix so
        # that extension-based type detection keeps working.
        with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
            temp_path = tmp_file.name
            tmp_file.write(response.content)

        # Process the file based on type
        file_content = _process_downloaded_file(temp_path, filename)
        return f"FILE PROCESSED: {filename}\n\nContent:\n{file_content}"
    except requests.exceptions.RequestException as e:
        return f"File download failed: {str(e)}"
    except Exception as e:
        return f"File processing failed: {str(e)}"
    finally:
        # Clean up on every path, including failures while writing or
        # processing, so no temporary file is left behind.
        if temp_path is not None and os.path.exists(temp_path):
            os.unlink(temp_path)
def _process_downloaded_file(file_path: str, filename: str) -> str:
    """Extract the content of a downloaded file, picking a handler from its name.

    Args:
        file_path: Location of the file on disk.
        filename: Name used to guess the MIME type and extension; it is also
            echoed in the error message.

    Returns:
        The extracted content, or an "Error processing file ..." message if
        any step raises.
    """
    audio_suffixes = ('.mp3', '.wav', '.m4a', '.ogg')
    image_suffixes = ('.jpg', '.jpeg', '.png', '.gif', '.bmp')

    def read_with(loader_cls) -> str:
        # Every document loader is used the same way: load, then join pages.
        pages = loader_cls(file_path).load()
        return "\n".join(page.page_content for page in pages)

    try:
        guessed_type, _ = mimetypes.guess_type(filename)
        kind = guessed_type or ""
        suffix = Path(filename).suffix.lower()

        # Audio and image files have dedicated handlers and take priority
        # over the document loaders below.
        if kind.startswith('audio') or suffix in audio_suffixes:
            return _process_audio_file(file_path)
        if kind.startswith('image') or suffix in image_suffixes:
            return _process_image_file(file_path)

        # Document formats recognised purely by extension.
        loaders_by_suffix = {
            '.pdf': PDFPlumberLoader,
            '.docx': UnstructuredWordDocumentLoader,
            '.doc': UnstructuredWordDocumentLoader,
            '.pptx': UnstructuredPowerPointLoader,
            '.ppt': UnstructuredPowerPointLoader,
            '.xlsx': UnstructuredExcelLoader,
            '.xls': UnstructuredExcelLoader,
            '.csv': CSVLoader,
            '.md': UnstructuredMarkdownLoader,
            '.markdown': UnstructuredMarkdownLoader,
        }
        loader_cls = loaders_by_suffix.get(suffix)
        if loader_cls is None:
            if suffix == '.txt' or kind.startswith('text'):
                loader_cls = TextLoader
            else:
                # Fallback: try unstructured loader
                loader_cls = UnstructuredFileLoader
        return read_with(loader_cls)
    except Exception as exc:
        return f"Error processing file {filename}: {str(exc)}"
def _process_audio_file(file_path: str) -> str:
"""Process audio files using speech recognition."""
try:
import speech_recognition as sr
from pydub import AudioSegment
# Convert to WAV if needed
audio = AudioSegment.from_file(file_path)
wav_path = file_path + ".wav"
audio.export(wav_path, format="wav")
# Use speech recognition
recognizer = sr.Recognizer()
with sr.AudioFile(wav_path) as source:
audio_data = recognizer.record(source)
text = recognizer.recognize_google(audio_data)
# Clean up temporary WAV file
if os.path.exists(wav_path):
os.unlink(wav_path)
return f"Audio transcription:\n{text}"
except ImportError:
return "Audio processing requires additional dependencies (speech_recognition, pydub)"
except Exception as e:
# Fallback: try with whisper if available
try:
import whisper
model = whisper.load_model("base")
result = model.transcribe(file_path)
return f"Audio transcription (Whisper):\n{result['text']}"
except ImportError:
return f"Audio processing failed: {str(e)}. Consider installing speech_recognition, pydub, or openai-whisper."
except Exception as e2:
return f"Audio processing failed: {str(e2)}"
def _process_image_file(file_path: str) -> str:
    """Extract text from an image file with the unstructured image loader.

    Args:
        file_path: Path of the image on disk.

    Returns:
        The extracted text behind a short label, a notice when the loader
        yields only whitespace, or an error message if loading raises.
    """
    try:
        pages = UnstructuredImageLoader(file_path).load()
        extracted = "\n".join(page.page_content for page in pages)
        if not extracted.strip():
            return "Image file detected but no text content could be extracted. Consider using OCR or image analysis tools."
        return f"Image content extracted:\n{extracted}"
    except Exception as exc:
        return f"Image processing failed: {str(exc)}"