Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import tempfile | |
| import traceback | |
| import fitz # PyMuPDF | |
| import pandas as pd | |
| import requests | |
| from smolagents import Tool | |
class DownloadFileFromTaskTool(Tool):
    name = "download_file_from_task"
    description = """Downloads a file for a GAIA task ID and saves it in a temporary directory.
    Use this when question requires information from a mentioned file, before reading a file."""
    inputs = {
        "task_id": {"type": "string", "description": "The GAIA task ID (REQUIRED)."},
        "filename": {
            "type": "string",
            "description": "Optional custom filename to save the file as (e.g., 'data.xlsx').",
            "nullable": True,
        },
    }
    output_type = "string"

    def forward(self, task_id: str, filename: str = None) -> str:
        """Download the file attached to a GAIA task into a fresh temp dir.

        Args:
            task_id: GAIA task UUID (8-4-4-4-12 lowercase hex).
            filename: Optional name to save under; defaults to
                "<task_id><ext>", with the extension taken from the
                Content-Disposition header (".bin" fallback).

        Returns:
            Absolute path of the saved file, or an error/warning message
            string (this tool reports failures as strings, not exceptions).
        """
        # Strict UUID shape. The previous pattern ([0-9a-f\-]{36}) accepted
        # any 36-char mix of hex digits and hyphens, e.g. 36 hyphens.
        if not task_id or not re.match(
            r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
            task_id,
        ):
            return "❌ Invalid or missing task_id."

        file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
        try:
            response = requests.get(file_url, timeout=15)
            if response.status_code == 404:
                return "⚠️ No file found for this task."
            response.raise_for_status()

            # Try extracting the original extension from the header.
            disposition = response.headers.get("content-disposition", "")
            header_filename_match = re.search(r'filename="(.+?)"', disposition)
            ext = ""
            if header_filename_match:
                ext = os.path.splitext(header_filename_match.group(1))[1]

            # Final filename logic: caller-supplied name wins, otherwise
            # derive one from the task id plus the detected extension.
            if not filename:
                filename = f"{task_id}{ext or '.bin'}"
            # Strip any directory components so a crafted filename cannot
            # escape the temp directory (path traversal).
            filename = os.path.basename(filename)

            temp_dir = tempfile.mkdtemp()
            file_path = os.path.join(temp_dir, filename)
            with open(file_path, "wb") as f:
                f.write(response.content)
            print(f"File saved at: {file_path}")
            return file_path
        except Exception as e:
            return f"❌ Error: {e}"
class ReadFileContentTool(Tool):
    name = "read_file_content"
    description = """Reads and returns the content of a file. Use after downloading a file using `download_file_from_task`."""
    inputs = {
        "file_path": {"type": "string", "description": "Full path to a file to read."}
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        """Return a text representation of the file at *file_path*.

        Plain-text formats (.txt/.json/.py) are returned verbatim, tabular
        formats (.csv/.xlsx) as a ``head()`` preview, PDFs as extracted
        text; audio/video extensions get an informational hint instead of
        raw content. Failures are reported as strings, never raised.
        """
        if not os.path.exists(file_path):
            return f"❌ File does not exist: {file_path}"
        ext = os.path.splitext(file_path)[1].lower()
        try:
            # The three plain-text formats were three identical branches;
            # they share one code path now.
            if ext in (".txt", ".json", ".py"):
                with open(file_path, "r", encoding="utf-8") as f:
                    return f.read()
            elif ext == ".csv":
                df = pd.read_csv(file_path)
                return df.head().to_string(index=False)
            elif ext == ".xlsx":
                df = pd.read_excel(file_path)
                return df.head().to_string(index=False)
            elif ext == ".pdf":
                doc = fitz.open(file_path)
                try:
                    text = "".join(page.get_text() for page in doc)
                finally:
                    # Close the document even if text extraction raises
                    # (the original leaked the handle on error).
                    doc.close()
                return text.strip() or "⚠️ PDF contains no readable text."
            elif ext in (".mp3", ".wav"):
                return f"ℹ️ Audio file detected: {os.path.basename(file_path)}. Use audio processing tool if needed."
            elif ext in (".mp4", ".mov", ".avi"):
                return f"ℹ️ Video file detected: {os.path.basename(file_path)}. Use video analysis tool if available."
            else:
                return f"ℹ️ Unsupported file type: {ext}. File saved at {file_path}"
        except Exception as e:
            return f"❌ Could not read {file_path}: {e}"
class GetWikipediaInfoTool(Tool):
    name = "get_wikipedia_info"
    description = """Fetches a short summary about a topic from Wikipedia.
    Use this when a user asks for background information, an explanation, or context on a well-known subject."""
    inputs = {
        "topic": {
            "type": "string",
            "description": "The topic to search for on Wikipedia.",
        }
    }
    output_type = "string"

    def forward(self, topic: str) -> str:
        """Search Wikipedia for *topic* and return the intro extract of the
        top hit, truncated to 1500 characters. Errors are returned as
        strings (and logged), never raised."""
        print(f"EXECUTING TOOL: get_wikipedia_info(topic='{topic}')")
        try:
            # Pass query arguments via `params=` so requests URL-encodes
            # them. The previous f-string interpolation broke on topics
            # containing spaces, '&' or '=' characters.
            search_response = requests.get(
                "https://en.wikipedia.org/w/api.php",
                params={
                    "action": "query",
                    "list": "search",
                    "srsearch": topic,
                    "format": "json",
                },
                timeout=10,
            )
            search_response.raise_for_status()
            search_data = search_response.json()

            if not search_data.get("query", {}).get("search", []):
                return f"No Wikipedia info for '{topic}'."

            # Take the best-ranked search result.
            page_id = search_data["query"]["search"][0]["pageid"]
            content_response = requests.get(
                "https://en.wikipedia.org/w/api.php",
                params={
                    "action": "query",
                    "prop": "extracts",
                    "exintro": 1,
                    "explaintext": 1,
                    "pageids": page_id,
                    "format": "json",
                },
                timeout=10,
            )
            content_response.raise_for_status()
            content_data = content_response.json()

            extract = content_data["query"]["pages"][str(page_id)]["extract"]
            # Keep the summary short enough for an LLM context window.
            if len(extract) > 1500:
                extract = extract[:1500] + "..."

            result = f"Wikipedia summary for '{topic}':\n{extract}"
            print(f"-> Tool Result (Wikipedia): {result[:100]}...")
            return result
        except Exception as e:
            print(f"❌ Error in get_wikipedia_info: {e}")
            traceback.print_exc()
            return f"Error wiki: {e}"
class VisitWebpageTool(Tool):
    name = "visit_webpage"
    description = """
    Visits a given URL and returns structured page content including title, metadata, headings, paragraphs,
    tables, lists, and links.
    """
    inputs = {
        "url": {
            "type": "string",
            "description": "The full URL of the webpage to visit.",
        }
    }
    output_type = "string"

    def forward(self, url: str) -> str:
        """Fetch *url* and return its content as a JSON string with keys:
        title, meta, headings (h1/h2/h3), paragraphs, lists, tables, links.
        Returns an error string on any fetch/parse failure."""
        try:
            import json
            import requests
            from bs4 import BeautifulSoup

            response = requests.get(url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            def clean(text):
                # Collapse all runs of whitespace to single spaces.
                return " ".join(text.strip().split())

            def extract_tables(soup):
                # Only keep tables that have both header cells and data rows.
                tables_data = []
                for table in soup.find_all("table"):
                    headers = [clean(th.get_text()) for th in table.find_all("th")]
                    rows = []
                    for row in table.find_all("tr"):
                        cells = [clean(td.get_text()) for td in row.find_all("td")]
                        if cells:
                            rows.append(cells)
                    if headers and rows:
                        tables_data.append({"headers": headers, "rows": rows})
                return tables_data

            def extract_lists(soup):
                # Unordered lists first, then ordered, skipping empty ones.
                all_lists = []
                for ul in soup.find_all("ul"):
                    items = [clean(li.get_text()) for li in ul.find_all("li")]
                    if items:
                        all_lists.append(items)
                for ol in soup.find_all("ol"):
                    items = [clean(li.get_text()) for li in ol.find_all("li")]
                    if items:
                        all_lists.append(items)
                return all_lists

            def extract_meta(soup):
                # Accept both name= and property= (OpenGraph) meta tags.
                metas = {}
                for meta in soup.find_all("meta"):
                    name = meta.get("name") or meta.get("property")
                    content = meta.get("content")
                    if name and content:
                        metas[name.lower()] = clean(content)
                return metas

            result = {
                # get_text() instead of .string: .string is None whenever
                # <title> contains nested markup, which made clean(None)
                # raise and the whole tool fail on such pages.
                "title": clean(soup.title.get_text()) if soup.title else None,
                "meta": extract_meta(soup),
                "headings": {
                    "h1": [clean(h.get_text()) for h in soup.find_all("h1")],
                    "h2": [clean(h.get_text()) for h in soup.find_all("h2")],
                    "h3": [clean(h.get_text()) for h in soup.find_all("h3")],
                },
                "paragraphs": [clean(p.get_text()) for p in soup.find_all("p")],
                "lists": extract_lists(soup),
                "tables": extract_tables(soup),
                "links": [
                    {"text": clean(a.get_text()), "href": a["href"]}
                    for a in soup.find_all("a", href=True)
                ],
            }
            return json.dumps(result, indent=2)
        except Exception as e:
            return f"❌ Failed to fetch or parse webpage: {str(e)}"
class TranscribeAudioTool(Tool):
    name = "transcribe_audio"
    description = (
        """Transcribes spoken audio (e.g. voice memos, lectures) into plain text."""
    )
    inputs = {"file_path": {"type": "string", "description": "Path to an audio file."}}
    output_type = "string"

    def forward(self, file_path: str) -> str:
        """Transcribe the audio file at *file_path* via Google's free
        speech-recognition endpoint, converting to WAV first if needed.
        Returns the transcript, or an error string on failure."""
        try:
            import os
            import tempfile

            import speech_recognition as sr
            from pydub import AudioSegment

            recognizer = sr.Recognizer()
            file_ext = os.path.splitext(file_path)[1].lower()
            temp_wav = None
            try:
                if file_ext != ".wav":
                    # speech_recognition only reads WAV/AIFF/FLAC, so
                    # convert everything else through pydub first.
                    temp_wav = tempfile.NamedTemporaryFile(
                        suffix=".wav", delete=False
                    ).name
                    audio = AudioSegment.from_file(file_path)
                    audio.export(temp_wav, format="wav")
                    audio_path = temp_wav
                else:
                    audio_path = file_path

                with sr.AudioFile(audio_path) as source:
                    audio_data = recognizer.record(source)
                    transcript = recognizer.recognize_google(audio_data)
                return transcript.strip()
            finally:
                # Delete the temp WAV even when conversion/transcription
                # raises — the original only cleaned up on success.
                if temp_wav and os.path.exists(temp_wav):
                    os.remove(temp_wav)
        except Exception as e:
            return f"❌ Transcription failed: {str(e)}"
class TranscibeVideoFileTool(Tool):
    # NOTE(review): class name keeps the original "Transcibe" spelling —
    # renaming would break any external reference to this class.
    name = "transcribe_video"
    description = """Transcribes speech from a video file. Use this to understand video lectures, tutorials, or visual demos."""
    inputs = {
        "file_path": {
            "type": "string",
            "description": "Path to the video file (e.g., .mp4, .mov).",
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        """Extract the audio track of the video at *file_path* and
        transcribe it via Google's speech-recognition endpoint.
        Returns the transcript, or an error string on failure."""
        try:
            import os
            import tempfile

            import moviepy.editor as mp
            import speech_recognition as sr

            temp_audio = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name
            try:
                video = None
                try:
                    # Extract the audio track to WAV (required by
                    # speech_recognition).
                    video = mp.VideoFileClip(file_path)
                    video.audio.write_audiofile(temp_audio, verbose=False, logger=None)
                finally:
                    # Release ffmpeg handles even if extraction raises —
                    # the original skipped close() on failure.
                    if video is not None:
                        video.close()

                recognizer = sr.Recognizer()
                with sr.AudioFile(temp_audio) as source:
                    audio_data = recognizer.record(source)
                    transcript = recognizer.recognize_google(audio_data)
                return transcript.strip()
            finally:
                # Delete the temp WAV on every path — the original leaked
                # it whenever extraction or transcription raised.
                if os.path.exists(temp_audio):
                    os.remove(temp_audio)
        except Exception as e:
            return f"❌ Video processing failed: {str(e)}"