"""
Tools for the FlexibleAgent
All tool functions that the agent can use
"""
import mimetypes
import os
import re
import tempfile
from pathlib import Path

import requests
from langchain_community.document_loaders import (
    CSVLoader,
    PDFPlumberLoader,
    TextLoader,
    UnstructuredExcelLoader,
    UnstructuredFileLoader,
    UnstructuredImageLoader,
    UnstructuredMarkdownLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
    WebBaseLoader,
)
from langchain_community.retrievers import WikipediaRetriever
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import Tool, tool
from langchain_google_community import GoogleSearchAPIWrapper
def wikipedia_search(query: str) -> str:
    """Search Wikipedia for information. Use this for factual information and encyclopedic content.

    Args:
        query: The search query.

    Returns:
        Matching Wikipedia documents wrapped in <Document> tags, or an
        error/"not found" message string (this tool never raises).
    """
    try:
        retriever = WikipediaRetriever(load_max_docs=10)
        docs = retriever.invoke(query)
        if not docs:
            return f"No Wikipedia articles found for '{query}'"
        output = f"Wikipedia search results for '{query}':\n\n"
        # Wrap each document in XML-like <Document> tags so the model can see
        # source/page metadata next to the content.
        # BUG FIX: the opening tag was self-closing ("/>") while still being
        # paired with a closing </Document>, producing malformed markup.
        # Also use .get("source", "") so a missing key degrades gracefully
        # instead of raising KeyError into the generic failure message.
        formatted_search_docs = "\n\n---\n\n".join(
            [
                f'<Document source="{doc.metadata.get("source", "")}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
                for doc in docs
            ]
        )
        return output + formatted_search_docs
    except Exception as e:
        return f"Wikipedia search failed: {str(e)}"
def youtube_search(query: str) -> str:
    """Search YouTube for videos and get video information. Use this when you need YouTube-specific content."""
    try:
        # Imported lazily so this module still loads when the optional
        # youtubesearchpython dependency is absent; the broad except below
        # turns a missing dependency into an error string.
        from youtubesearchpython import VideosSearch

        videos = VideosSearch(query, limit=3).result()['result']
        parts = [f"YouTube search results for '{query}':\n"]
        for video in videos:
            parts.append(f"- {video['title']} by {video['channel']['name']}\n")
            parts.append(f" Duration: {video['duration']}, Views: {video['viewCount']['text']}\n")
            parts.append(f" URL: {video['link']}\n\n")
        return "".join(parts)
    except Exception as e:
        return f"YouTube search failed: {str(e)}"
def web_search(query: str) -> str:
    """Search the web for a query and return the first results.

    Args:
        query: The search query.

    Returns:
        Metadata and page content for the top 3 results, or an error message
        string if search/fetch fails (matching the other tools' convention).
    """
    result = "Results from web search:\n\n"
    try:
        search = DuckDuckGoSearchResults(output_format="list")
        search_results = search.invoke(query)
        # Only fetch the first three hits to bound latency and output size.
        urls = [search_result['link'] for search_result in search_results[:3]]
        if not urls:
            # Previously an empty result list produced an empty-bodied answer.
            return f"No web results found for '{query}'"
        loader = WebBaseLoader(web_paths=urls)
        # lazy_load streams one page at a time instead of loading all at once.
        for doc in loader.lazy_load():
            result += f"{doc.metadata}\n\n"
            result += f"{doc.page_content}\n\n"
            result += "--------------------------------\n\n"
        return result
    except Exception as e:
        # BUG FIX: this was the only tool here without error handling; a DDG
        # or network failure propagated to the agent instead of returning a
        # message like every sibling tool does.
        return f"Web search failed: {str(e)}"
def decode_text(text: str) -> str:
    """Decode or reverse text that might be encoded backwards or in other ways."""
    try:
        # Offer both interpretations: each whitespace-separated token
        # reversed in place, and the entire string reversed end-to-end.
        per_word = " ".join(token[::-1] for token in text.split())
        whole = text[::-1]
        return f"Original: {text}\nWord-by-word reversed: {per_word}\nFully reversed: {whole}"
    except Exception as e:
        return f"Text decoding failed: {str(e)}"
def download_and_process_file(task_id: str) -> str:
    """Download and process a file from the GAIA API using the task_id.
    Use this tool when detect_file_requirement indicates a file is needed.

    Args:
        task_id: GAIA task identifier; the file is served at /files/<task_id>.

    Returns:
        "FILE PROCESSED: <filename>" followed by the extracted content, or an
        error message string on download/processing failure.
    """
    api_url = "https://agents-course-unit4-scoring.hf.space"
    try:
        # Download file from API
        file_url = f"{api_url}/files/{task_id}"
        print(f"Downloading file from: {file_url}")
        response = requests.get(file_url, timeout=30)
        response.raise_for_status()

        # Get filename from Content-Disposition header or use task_id
        filename = task_id
        if 'Content-Disposition' in response.headers:
            cd = response.headers['Content-Disposition']
            filename_match = re.search(r'filename="?([^"]+)"?', cd)
            if filename_match:
                filename = filename_match.group(1)

        # Keep the real filename in the suffix so extension-based type
        # detection downstream works on the temp path too.
        # BUG FIX: the suffix contained a literal "(unknown)" placeholder
        # instead of the downloaded filename.
        with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
            tmp_file.write(response.content)
            temp_path = tmp_file.name

        # BUG FIX: clean up the temp file even if processing raises
        # (previously it leaked on any processing error).
        try:
            file_content = _process_downloaded_file(temp_path, filename)
        finally:
            os.unlink(temp_path)

        # BUG FIX: the header contained a literal "(unknown)" placeholder
        # instead of the filename.
        return f"FILE PROCESSED: {filename}\n\nContent:\n{file_content}"
    except requests.exceptions.RequestException as e:
        return f"File download failed: {str(e)}"
    except Exception as e:
        return f"File processing failed: {str(e)}"
def _process_downloaded_file(file_path: str, filename: str) -> str:
    """Process a downloaded file based on its type and return content.

    Args:
        file_path: Path to the file on disk (may be a temp path).
        filename: Original filename; used for MIME/extension detection.

    Returns:
        Extracted text content, or an error message string on failure.
    """
    try:
        # Type detection uses the ORIGINAL filename, not the temp path.
        mime_type, _ = mimetypes.guess_type(filename)
        file_extension = Path(filename).suffix.lower()

        # Audio and images need dedicated handling, not a document loader.
        # Parentheses added for clarity: `and` binds tighter than `or`, so the
        # original grouping was already this — now it is explicit.
        if (mime_type and mime_type.startswith('audio')) or file_extension in ['.mp3', '.wav', '.m4a', '.ogg']:
            return _process_audio_file(file_path)
        elif (mime_type and mime_type.startswith('image')) or file_extension in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']:
            return _process_image_file(file_path)

        # Document types: select a loader class by extension (replaces a
        # repetitive if/elif ladder that duplicated the load/join logic).
        loaders_by_extension = {
            '.pdf': PDFPlumberLoader,
            '.docx': UnstructuredWordDocumentLoader,
            '.doc': UnstructuredWordDocumentLoader,
            '.pptx': UnstructuredPowerPointLoader,
            '.ppt': UnstructuredPowerPointLoader,
            '.xlsx': UnstructuredExcelLoader,
            '.xls': UnstructuredExcelLoader,
            '.csv': CSVLoader,
            '.md': UnstructuredMarkdownLoader,
            '.markdown': UnstructuredMarkdownLoader,
        }
        if file_extension in loaders_by_extension:
            loader_cls = loaders_by_extension[file_extension]
        elif file_extension == '.txt' or (mime_type and mime_type.startswith('text')):
            loader_cls = TextLoader
        else:
            # Fallback: let unstructured sniff the format.
            loader_cls = UnstructuredFileLoader

        docs = loader_cls(file_path).load()
        return "\n".join([doc.page_content for doc in docs])
    except Exception as e:
        # BUG FIX: the message contained a literal "(unknown)" placeholder
        # instead of the filename.
        return f"Error processing file {filename}: {str(e)}"
def _process_audio_file(file_path: str) -> str:
    """Process audio files using speech recognition.

    Tries Google speech recognition (via SpeechRecognition + pydub) first,
    then falls back to Whisper if the first path fails. Returns a
    transcription string or an error message; never raises.
    """
    try:
        import speech_recognition as sr
        from pydub import AudioSegment

        # Convert to WAV first — sr.AudioFile requires PCM WAV/AIFF/FLAC.
        audio = AudioSegment.from_file(file_path)
        wav_path = file_path + ".wav"
        audio.export(wav_path, format="wav")
        try:
            # Use speech recognition
            recognizer = sr.Recognizer()
            with sr.AudioFile(wav_path) as source:
                audio_data = recognizer.record(source)
                text = recognizer.recognize_google(audio_data)
        finally:
            # BUG FIX: the temporary WAV was only deleted on success; a
            # recognition failure jumped to the Whisper fallback and leaked
            # the file. finally guarantees cleanup on both paths.
            if os.path.exists(wav_path):
                os.unlink(wav_path)
        return f"Audio transcription:\n{text}"
    except ImportError:
        return "Audio processing requires additional dependencies (speech_recognition, pydub)"
    except Exception as e:
        # Fallback: try with whisper if available
        try:
            import whisper
            model = whisper.load_model("base")
            result = model.transcribe(file_path)
            return f"Audio transcription (Whisper):\n{result['text']}"
        except ImportError:
            return f"Audio processing failed: {str(e)}. Consider installing speech_recognition, pydub, or openai-whisper."
        except Exception as e2:
            return f"Audio processing failed: {str(e2)}"
def _process_image_file(file_path: str) -> str:
    """Process image files."""
    try:
        # Extract any text visible in the image via the unstructured loader.
        docs = UnstructuredImageLoader(file_path).load()
        extracted = "\n".join(doc.page_content for doc in docs)
        if not extracted.strip():
            return f"Image file detected but no text content could be extracted. Consider using OCR or image analysis tools."
        return f"Image content extracted:\n{extracted}"
    except Exception as e:
        return f"Image processing failed: {str(e)}"