""" Tools for the FlexibleAgent All tool functions that the agent can use """ import os import re import requests import tempfile import mimetypes from pathlib import Path from langchain_core.tools import tool from langchain_community.retrievers import WikipediaRetriever from langchain_community.document_loaders import ( UnstructuredFileLoader, TextLoader, CSVLoader, PDFPlumberLoader, UnstructuredImageLoader, UnstructuredMarkdownLoader, UnstructuredWordDocumentLoader, UnstructuredPowerPointLoader, UnstructuredExcelLoader ) from langchain_community.tools.tavily_search import TavilySearchResults from langchain_core.tools import Tool from langchain_google_community import GoogleSearchAPIWrapper from langchain_community.tools import DuckDuckGoSearchResults from langchain_community.document_loaders import WebBaseLoader @tool def wikipedia_search(query: str) -> str: """Search Wikipedia for information. Use this for factual information and encyclopedic content. Args: query: The search query.""" try: retriever = WikipediaRetriever(load_max_docs=10) docs = retriever.invoke(query) if not docs: return f"No Wikipedia articles found for '{query}'" output = f"Wikipedia search results for '{query}':\n\n" # Format the search results as HTML formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content}\n' for doc in docs ] ) return output + formatted_search_docs except Exception as e: return f"Wikipedia search failed: {str(e)}" @tool def youtube_search(query: str) -> str: """Search YouTube for videos and get video information. Use this when you need YouTube-specific content.""" try: from youtubesearchpython import VideosSearch search = VideosSearch(query, limit=3) results = search.result() output = f"YouTube search results for '{query}':\n" for video in results['result']: output += f"- {video['title']} by {video['channel']['name']}\n" output += f" Duration: {video['duration']}, Views: {video['viewCount']['text']}\n" output += f" URL: {video['link']}\n\n" return output except Exception as e: return f"YouTube search failed: {str(e)}" @tool def web_search(query: str) -> str: """Search the web for a query and return the first results. Args: query: The search query.""" result = "Results from web search:\n\n" search = DuckDuckGoSearchResults(output_format="list") search_results = search.invoke(query) urls = [search_result['link'] for search_result in search_results[:3]] loader = WebBaseLoader(web_paths=urls) for doc in loader.lazy_load(): result += f"{doc.metadata}\n\n" result += f"{doc.page_content}\n\n" result += f"--------------------------------\n\n" return result @tool def decode_text(text: str) -> str: """Decode or reverse text that might be encoded backwards or in other ways.""" try: # Try reversing words words = text.split() reversed_words = [word[::-1] for word in words] reversed_text = " ".join(reversed_words) # Try reversing the entire string fully_reversed = text[::-1] return f"Original: {text}\nWord-by-word reversed: {reversed_text}\nFully reversed: {fully_reversed}" except Exception as e: return f"Text decoding failed: {str(e)}" @tool def download_and_process_file(task_id: str) -> str: """Download and process a file from the GAIA API using the task_id. Use this tool when detect_file_requirement indicates a file is needed.""" api_url = "https://agents-course-unit4-scoring.hf.space" try: # Download file from API file_url = f"{api_url}/files/{task_id}" print(f"Downloading file from: {file_url}") response = requests.get(file_url, timeout=30) response.raise_for_status() # Get filename from Content-Disposition header or use task_id filename = task_id if 'Content-Disposition' in response.headers: cd = response.headers['Content-Disposition'] filename_match = re.search(r'filename="?([^"]+)"?', cd) if filename_match: filename = filename_match.group(1) # Create temporary file with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file: tmp_file.write(response.content) temp_path = tmp_file.name # Process the file based on type file_content = _process_downloaded_file(temp_path, filename) # Clean up os.unlink(temp_path) return f"FILE PROCESSED: {filename}\n\nContent:\n{file_content}" except requests.exceptions.RequestException as e: return f"File download failed: {str(e)}" except Exception as e: return f"File processing failed: {str(e)}" def _process_downloaded_file(file_path: str, filename: str) -> str: """Process a downloaded file based on its type and return content.""" try: # Determine file type mime_type, _ = mimetypes.guess_type(filename) file_extension = Path(filename).suffix.lower() # Handle audio files if mime_type and mime_type.startswith('audio') or file_extension in ['.mp3', '.wav', '.m4a', '.ogg']: return _process_audio_file(file_path) # Handle image files elif mime_type and mime_type.startswith('image') or file_extension in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']: return _process_image_file(file_path) # Handle documents elif file_extension in ['.pdf']: loader = PDFPlumberLoader(file_path) docs = loader.load() return "\n".join([doc.page_content for doc in docs]) elif file_extension in ['.docx', '.doc']: loader = UnstructuredWordDocumentLoader(file_path) docs = loader.load() return "\n".join([doc.page_content for doc in docs]) elif file_extension in ['.pptx', '.ppt']: loader = UnstructuredPowerPointLoader(file_path) docs = loader.load() return "\n".join([doc.page_content for doc in docs]) elif file_extension in ['.xlsx', '.xls']: loader = UnstructuredExcelLoader(file_path) docs = loader.load() return "\n".join([doc.page_content for doc in docs]) elif file_extension in ['.csv']: loader = CSVLoader(file_path) docs = loader.load() return "\n".join([doc.page_content for doc in docs]) elif file_extension in ['.md', '.markdown']: loader = UnstructuredMarkdownLoader(file_path) docs = loader.load() return "\n".join([doc.page_content for doc in docs]) elif file_extension in ['.txt'] or mime_type and mime_type.startswith('text'): loader = TextLoader(file_path) docs = loader.load() return "\n".join([doc.page_content for doc in docs]) # Fallback: try unstructured loader else: loader = UnstructuredFileLoader(file_path) docs = loader.load() return "\n".join([doc.page_content for doc in docs]) except Exception as e: return f"Error processing file {filename}: {str(e)}" def _process_audio_file(file_path: str) -> str: """Process audio files using speech recognition.""" try: import speech_recognition as sr from pydub import AudioSegment # Convert to WAV if needed audio = AudioSegment.from_file(file_path) wav_path = file_path + ".wav" audio.export(wav_path, format="wav") # Use speech recognition recognizer = sr.Recognizer() with sr.AudioFile(wav_path) as source: audio_data = recognizer.record(source) text = recognizer.recognize_google(audio_data) # Clean up temporary WAV file if os.path.exists(wav_path): os.unlink(wav_path) return f"Audio transcription:\n{text}" except ImportError: return "Audio processing requires additional dependencies (speech_recognition, pydub)" except Exception as e: # Fallback: try with whisper if available try: import whisper model = whisper.load_model("base") result = model.transcribe(file_path) return f"Audio transcription (Whisper):\n{result['text']}" except ImportError: return f"Audio processing failed: {str(e)}. Consider installing speech_recognition, pydub, or openai-whisper." except Exception as e2: return f"Audio processing failed: {str(e2)}" def _process_image_file(file_path: str) -> str: """Process image files.""" try: # Use unstructured image loader loader = UnstructuredImageLoader(file_path) docs = loader.load() content = "\n".join([doc.page_content for doc in docs]) if content.strip(): return f"Image content extracted:\n{content}" else: return f"Image file detected but no text content could be extracted. Consider using OCR or image analysis tools." except Exception as e: return f"Image processing failed: {str(e)}"