# NOTE: this file was recovered from a hosted Space page; UI status text
# ("Spaces: Sleeping") removed during cleanup.
# Standard library
import os
import time

# Third-party
import numpy as np
import pandas as pd
import requests
import torch
from bs4 import BeautifulSoup
from pypdf import PdfReader
from smolagents import Tool, tool
from transformers import pipeline
class TranscriptTool(Tool):
    """Tool that transcribes audio/video files to text using Whisper.

    Errors (missing file, model failure) are returned as strings rather
    than raised, so the calling agent can read and react to them.
    """

    name = "transcribe_media"
    # Fixed typo: "vide" -> "video".
    description = "Transcribes audio or video files (mp3, wav, mp4) into text. Use this for podcasts, voice memos, or video files."
    inputs = {'file_path': {'type': 'string', 'description': 'The path to the audio or video file'}}
    output_type = "string"

    def forward(self, file_path: str) -> str:
        """Return the transcription of the media file at ``file_path``."""
        if not os.path.exists(file_path):
            return f"Error: File {file_path} not found."
        try:
            # Lazily build and cache the ASR pipeline on first use: loading
            # the Whisper model is expensive, so reuse it across calls.
            if getattr(self, "_transcriber", None) is None:
                self._transcriber = pipeline(
                    "automatic-speech-recognition", model="openai/whisper-small"
                )
            # BUGFIX: original called the undefined name 'transciber',
            # raising NameError on every invocation.
            result = self._transcriber(file_path)
            return f"Transcription of {os.path.basename(file_path)}:\n\n{result['text']}"
        except Exception as e:
            return f"Error transcribing file: {str(e)}"


# Instantiate audio transcriber tool
transcription_tool = TranscriptTool()
class GoogleSearchTool(Tool):
    """Tool that performs a Google web search via the Serper.dev API.

    Requires the SERPER_API_KEY environment variable; failures are
    reported as strings so the calling agent can react to them.
    """

    name = "web_search"
    description = "Searches the web using Google. Essential for finding specific articles and papers."
    inputs = {'query': {'type': 'string', 'description': 'The search query.'}}
    output_type = "string"

    def forward(self, query: str) -> str:
        """Return the top 5 organic results for ``query`` as formatted text."""
        api_key = os.getenv("SERPER_API_KEY")
        if not api_key:
            return "Error: SERPER_API_KEY not found in environment variables."
        url = "https://google.serper.dev/search"
        payload = {"q": query}
        headers = {
            'X-API-KEY': api_key,
            'Content-Type': 'application/json'
        }
        try:
            # ROBUSTNESS: added timeout — the original had none and could
            # hang the agent indefinitely on a stalled connection.
            response = requests.post(url, headers=headers, json=payload, timeout=10)
            response.raise_for_status()
            results = response.json()
            if 'organic' not in results:
                return "No results found."
            output = []
            for item in results['organic'][:5]:  # Take top 5 results
                # ROBUSTNESS: Serper may omit any of these fields; .get()
                # avoids a KeyError that the original would raise.
                output.append(
                    f"Title: {item.get('title', 'N/A')}\n"
                    f"Link: {item.get('link', 'N/A')}\n"
                    f"Snippet: {item.get('snippet', '')}\n"
                )
            return "\n---\n".join(output)
        except Exception as e:
            return f"Google Search failed: {str(e)}"


search_tool = GoogleSearchTool()
class VisitWebpageTool(Tool):
    """Tool that fetches a URL and returns its human-visible text content."""

    name = "visit_webpage"
    description = "Visits a webpage at the given URL and returns its content as a clean string."
    inputs = {'url': {'type': 'string', 'description': 'The URL of the webpage to visit.'}}
    output_type = "string"

    def forward(self, url: str) -> str:
        """Return up to 10k chars of cleaned page text, or an error string."""
        # YouTube pages are JS-rendered, so direct scraping yields nothing
        # useful; redirect the agent to a search-based strategy instead.
        if "youtube.com" in url or "youtu.be" in url:
            return (
                # BUGFIX: corrected "YoutTube" typo in agent-facing message.
                "ERROR: Cannot visit YouTube directly. "
                "STRATEGY: Extract the Video ID from the URL and use 'web_search' to find the video title, "
                "then search for the title + 'transcript' or 'summary'."
            )
        try:
            # fake user-agent to avoid 403 Forbidden errors
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            # Drop non-visible markup before extracting text.
            for script_or_style in soup(["script", "style"]):
                script_or_style.extract()
            text = soup.get_text(separator='\n')
            # Trim each line and drop blank lines to compact the output.
            lines = (line.strip() for line in text.splitlines())
            text = '\n'.join(line for line in lines if line)
            # Cap output so the agent context stays manageable.
            return text[:10000]
        except Exception as e:
            return f"Error visiting {url}: {str(e)}"


visit_webpage = VisitWebpageTool()
def handle_file(file_path: str) -> str:
    """
    Extract content from different file types (PDF, Excel, CSV, TXT).

    For CSV/Excel, returns a shape/columns/head preview plus an instruction
    telling the agent how to reload the file itself with pandas. For PDFs,
    returns extracted text (capped at 15k chars). Anything else is read as
    UTF-8 text (capped at 15k chars). Errors are returned as strings.

    Args:
        file_path: The local path to the file.
    """
    if not os.path.exists(file_path):
        return f"Error: File {file_path} not found."
    ext = os.path.splitext(file_path)[1].lower()
    try:
        if ext in ('.csv', '.xlsx', '.xls'):
            # Load a preview and build the pandas reload hint for the agent.
            if ext == '.csv':
                df = pd.read_csv(file_path)
                reload_expr = f"pd.read_csv('{file_path}')"
                label = "CSV"
            else:
                df = pd.read_excel(file_path)
                reload_expr = f"pd.read_excel('{file_path}')"
                label = "Excel"
            # NOTE: to_string() instead of to_markdown() — the latter needs
            # the optional 'tabulate' package, which this file never imports.
            return (
                f"{label} loaded. Shape: {df.shape}\n"
                f"Columns: {list(df.columns)}\n"
                f"First 5 rows:\n{df.head(5).to_string()}\n\n"
                # BUGFIX: original used a plain (non-f) string here, so the
                # agent was told literally `pd.read_csv('{file_path}')`
                # instead of the actual path.
                f"[IMPORTANT]: To analyze the full file, write Python code: `df = {reload_expr}`"
            )
        elif ext == '.pdf':
            reader = PdfReader(file_path)
            # Collect per-page text and join once (avoids quadratic +=).
            chunks = []
            for page in reader.pages:
                content = page.extract_text()
                if content:
                    chunks.append(content)
            return "\n".join(chunks)[:15000]
        else:
            # Fallback: treat as plain text; ignore undecodable bytes.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                return f.read()[:15000]
    except Exception as e:
        return f"Error processing {ext} file: {str(e)}"