import html
import json
import mimetypes
import os
import re
import time
import traceback
from pathlib import Path
from typing import Dict, List
from urllib.parse import urlparse

import chromadb
import chromadb.utils.embedding_functions as embedding_functions
import fitz  # PyMuPDF
import pandas as pd
import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from duckduckgo_search.exceptions import (
    ConversationLimitException,
    DuckDuckGoSearchException,
    RatelimitException,
    TimeoutException,
)
from langchain_community.document_loaders import (
    BSHTMLLoader,
    JSONLoader,
    PyPDFLoader,
    TextLoader,
    UnstructuredFileLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.tools import BraveSearch
from markdownify import markdownify
from smolagents import Tool, tool
from smolagents.utils import truncate_content
from urllib.parse import quote_plus


class ReadFileContentTool(Tool):
    name = "read_file_content"
    description = """Reads local files in various formats (text, CSV, Excel, PDF, HTML, etc.) and returns their content as readable text. Automatically detects and processes the appropriate file format."""
    inputs = {
        "file_path": {
            "type": "string",
            "description": "The full path to the file from which the content should be read.",
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        if not os.path.exists(file_path):
            return f"❌ File does not exist: {file_path}"

        ext = os.path.splitext(file_path)[1].lower()

        try:
            if ext == ".txt":
                with open(file_path, "r", encoding="utf-8") as f:
                    return truncate_content(f.read())

            elif ext == ".csv":
                df = pd.read_csv(file_path)
                return truncate_content(
                    f"CSV Content:\n{df.to_string(index=False)}\n\nColumn names: {', '.join(df.columns)}"
                )

            elif ext in [".xlsx", ".xls"]:
                df = pd.read_excel(file_path)
                return truncate_content(
                    f"Excel Content:\n{df.to_string(index=False)}\n\nColumn names: {', '.join(df.columns)}"
                )

            elif ext == ".pdf":
                doc = fitz.open(file_path)
                text = "".join([page.get_text() for page in doc])
                doc.close()
                return truncate_content(
                    text.strip() or "⚠️ PDF contains no readable text."
                )

            elif ext == ".json":
                with open(file_path, "r", encoding="utf-8") as f:
                    return truncate_content(f.read())

            elif ext == ".py":
                with open(file_path, "r", encoding="utf-8") as f:
                    return truncate_content(f.read())

            elif ext in [".html", ".htm"]:
                with open(file_path, "r", encoding="utf-8") as f:
                    html_content = f.read()
                try:
                    markdown = markdownify(html_content).strip()
                    markdown = re.sub(r"\n{3,}", "\n\n", markdown)
                    return f"📄 HTML content (converted to Markdown):\n\n{truncate_content(markdown)}"
                except Exception:
                    soup = BeautifulSoup(html_content, "html.parser")
                    text = soup.get_text(separator="\n").strip()
                    return f"📄 HTML content (raw text fallback):\n\n{truncate_content(text)}"

            elif ext in [".mp3", ".wav"]:
                return f"ℹ️ Audio file detected: {os.path.basename(file_path)}. Use transcribe_audio tool to process the audio content."

            elif ext in [".mp4", ".mov", ".avi"]:
                return f"ℹ️ Video file detected: {os.path.basename(file_path)}. Use transcribe_video tool to process the video content."

            else:
                return f"ℹ️ Unsupported file type: {ext}. File saved at {file_path}"

        except Exception as e:
            return f"❌ Could not read {file_path}: {e}"


class WikipediaSearchTool(Tool):
    name = "wikipedia_search"
    description = """Searches Wikipedia for a specific topic and returns a concise summary. Useful for background information on subjects, concepts, historical events, or scientific topics."""
    inputs = {
        "query": {
            "type": "string",
            "description": "The query or subject to search for on Wikipedia.",
        }
    }
    output_type = "string"

    def forward(self, query: str) -> str:
        print(f"EXECUTING TOOL: wikipedia_search(query='{query}')")
        try:
            search_link = f"https://en.wikipedia.org/w/api.php?action=query&list=search&srsearch={query}&format=json"
            search_response = requests.get(search_link, timeout=10)
            search_response.raise_for_status()
            search_data = search_response.json()

            if not search_data.get("query", {}).get("search", []):
                return f"No Wikipedia info for '{query}'."

            page_id = search_data["query"]["search"][0]["pageid"]
            content_link = (
                f"https://en.wikipedia.org/w/api.php?action=query&prop=extracts&"
                f"exintro=1&explaintext=1&pageids={page_id}&format=json"
            )
            content_response = requests.get(content_link, timeout=10)
            content_response.raise_for_status()
            content_data = content_response.json()

            extract = content_data["query"]["pages"][str(page_id)]["extract"]
            if len(extract) > 1500:
                extract = extract[:1500] + "..."

            result = f"Wikipedia summary for '{query}':\n{extract}"
            print(f"-> Tool Result (Wikipedia): {result[:100]}...")
            return result
        except Exception as e:
            print(f"❌ Error in wikipedia_search: {e}")
            traceback.print_exc()
            return f"Error searching Wikipedia: {e}"


class TranscribeAudioTool(Tool):
    name = "transcribe_audio"
    description = """Converts spoken content in audio files to text. Handles various audio formats and produces a transcript of the spoken content for analysis."""
    inputs = {
        "file_path": {
            "type": "string",
            "description": "The full path to the audio file that needs to be transcribed.",
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        try:
            import os
            import tempfile

            import speech_recognition as sr
            from pydub import AudioSegment

            # Verify file exists
            if not os.path.exists(file_path):
                return (
                    f"❌ Audio file not found at: {file_path}. Download the file first."
                )

            # Initialize recognizer
            recognizer = sr.Recognizer()

            # Convert to WAV if not already (needed for speech_recognition)
            file_ext = os.path.splitext(file_path)[1].lower()
            if file_ext != ".wav":
                # Create temp WAV file
                temp_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name

                # Convert to WAV using pydub
                audio = AudioSegment.from_file(file_path)
                audio.export(temp_wav, format="wav")
                audio_path = temp_wav
            else:
                audio_path = file_path

            # Transcribe audio using Google's speech recognition
            with sr.AudioFile(audio_path) as source:
                audio_data = recognizer.record(source)
                transcript = recognizer.recognize_google(audio_data)

            # Clean up temp file if created
            if file_ext != ".wav" and os.path.exists(temp_wav):
                os.remove(temp_wav)

            return transcript.strip()
        except Exception as e:
            return f"❌ Transcription failed: {str(e)}"


class TranscribeVideoFileTool(Tool):
    name = "transcribe_video"
    description = """Extracts and transcribes speech from video files. Converts the audio portion of videos into readable text for analysis or reference."""
    inputs = {
        "file_path": {
            "type": "string",
            "description": "The full path to the video file that needs to be transcribed.",
        }
    }
    output_type = "string"

    def forward(self, file_path: str) -> str:
        try:
            # Verify file exists
            if not os.path.exists(file_path):
                return (
                    f"❌ Video file not found at: {file_path}. Download the file first."
                )

            import os
            import tempfile

            import moviepy.editor as mp
            import speech_recognition as sr

            # Extract audio from video
            video = mp.VideoFileClip(file_path)

            # Create temporary audio file
            temp_audio = tempfile.NamedTemporaryFile(suffix=".wav", delete=False).name

            # Extract audio to WAV format (required for speech_recognition)
            video.audio.write_audiofile(temp_audio, verbose=False, logger=None)
            video.close()

            # Initialize recognizer
            recognizer = sr.Recognizer()

            # Transcribe audio
            with sr.AudioFile(temp_audio) as source:
                audio_data = recognizer.record(source)
                transcript = recognizer.recognize_google(audio_data)

            # Clean up temp file
            if os.path.exists(temp_audio):
                os.remove(temp_audio)

            return transcript.strip()
        except Exception as e:
            return f"❌ Video processing failed: {str(e)}"


class BraveWebSearchTool(Tool):
    name = "web_search"
    description = """Performs web searches and returns content from top results. Provides real-time information from across the internet including current events, facts, and website content relevant to your query."""
    inputs = {
        "query": {
            "type": "string",
            "description": "A web search query string (e.g., a question or query).",
        }
    }
    output_type = "string"

    api_key = os.getenv("BRAVE_SEARCH_API_KEY")
    count = 3
    char_limit = 4000  # Adjust based on LLM context window
    tool = BraveSearch.from_api_key(api_key=api_key, search_kwargs={"count": count})

    def extract_main_text(self, url: str, char_limit: int) -> str:
        try:
            headers = {"User-Agent": "Mozilla/5.0"}
            response = requests.get(url, headers=headers, timeout=10)
            soup = BeautifulSoup(response.text, "html.parser")

            # Remove scripts/styles
            for tag in soup(["script", "style", "noscript"]):
                tag.extract()

            # Heuristic: extract visible text from body
            body = soup.body
            if not body:
                return "⚠️ Could not extract content."

            text = " ".join(t.strip() for t in body.stripped_strings)
            return text[:char_limit].strip()
        except Exception as e:
            return f"⚠️ Failed to extract article: {e}"

    def forward(self, query: str) -> str:
        try:
            results_json = self.tool.run(query)
            results = (
                json.loads(results_json)
                if isinstance(results_json, str)
                else results_json
            )

            output_parts = []
            for i, r in enumerate(results[: self.count], start=1):
                title = html.unescape(r.get("title", "").strip())
                link = r.get("link", "").strip()
                article_text = self.extract_main_text(link, self.char_limit)

                result_block = (
                    f"Result {i}:\n"
                    f"Title: {title}\n"
                    f"URL: {link}\n"
                    f"Extracted Content:\n{article_text}\n"
                )
                output_parts.append(result_block)

            return "\n\n".join(output_parts).strip()
        except Exception as e:
            return f"Search failed: {str(e)}"


class DescribeImageTool(Tool):
    name = "describe_image"
    description = """Analyzes images and generates detailed text descriptions. Identifies objects, scenes, text, and visual elements within the image to provide context or understanding."""
    inputs = {
        "image_path": {
            "type": "string",
            "description": "The full path to the image file to describe.",
        }
    }
    output_type = "string"

    def forward(self, image_path: str) -> str:
        import os

        from PIL import Image
        from transformers import BlipForConditionalGeneration, BlipProcessor

        if not os.path.exists(image_path):
            return f"❌ Image file does not exist: {image_path}"

        try:
            processor = BlipProcessor.from_pretrained(
                "Salesforce/blip-image-captioning-base", use_fast=True
            )
            model = BlipForConditionalGeneration.from_pretrained(
                "Salesforce/blip-image-captioning-base"
            )

            image = Image.open(image_path).convert("RGB")
            inputs = processor(images=image, return_tensors="pt")
            output_ids = model.generate(**inputs)
            caption = processor.decode(output_ids[0], skip_special_tokens=True)

            return caption.strip() or "⚠️ No caption could be generated."
        except Exception as e:
            return f"❌ Failed to describe image: {e}"


class DownloadFileFromLinkTool(Tool):
    name = "download_file_from_link"
    description = "Downloads files from a URL and saves them locally. Supports various formats including PDFs, documents, images, and data files. Returns the local file path for further processing."

    inputs = {
        "link": {"type": "string", "description": "The URL to download the file from."},
        "file_name": {
            "type": "string",
            "description": "Desired name of the saved file, without extension.",
            "nullable": True,
        },
    }
    output_type = "string"

    SUPPORTED_EXTENSIONS = {
        ".xlsx",
        ".pdf",
        ".txt",
        ".csv",
        ".json",
        ".xml",
        ".html",
        ".jpg",
        ".jpeg",
        ".png",
        ".mp4",
        ".mp3",
        ".wav",
        ".zip",
    }

    def forward(self, link: str, file_name: str = "taskfile") -> str:
        print(f"⬇️ Downloading file from: {link}")
        file_name = file_name or "taskfile"  # guard against an explicit None
        dir_path = "./downloads"
        os.makedirs(dir_path, exist_ok=True)

        try:
            response = requests.get(link, stream=True, timeout=30)
        except requests.RequestException as e:
            return f"❌ Error: Request failed - {e}"

        if response.status_code != 200:
            return (
                f"❌ Error: Unable to fetch file. Status code: {response.status_code}"
            )

        # Step 1: Try extracting extension from provided filename
        base_name, provided_ext = os.path.splitext(file_name)
        provided_ext = provided_ext.lower()

        # Step 2: Check if provided extension is supported
        if provided_ext and provided_ext in self.SUPPORTED_EXTENSIONS:
            ext = provided_ext
        else:
            # Step 3: Try to infer from Content-Type
            content_type = (
                response.headers.get("Content-Type", "").split(";")[0].strip()
            )
            guessed_ext = mimetypes.guess_extension(content_type or "") or ""

            # Step 4: If mimetype returned .bin or nothing useful, fall back to the URL
            if guessed_ext in ("", ".bin"):
                parsed_link = urlparse(link)
                _, url_ext = os.path.splitext(parsed_link.path)
                if url_ext.lower() in self.SUPPORTED_EXTENSIONS:
                    ext = url_ext.lower()
                else:
                    return f"⚠️ Warning: Cannot determine a valid file extension from '{content_type}' or URL. Please retry with an explicit valid filename and extension."
            else:
                ext = guessed_ext

        # Step 5: Final path and save
        file_path = os.path.join(dir_path, base_name + ext)

        downloaded = 0
        with open(file_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)

        return file_path
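
# Example (sketch): downloading a PDF into ./downloads. The URL is
# illustrative; the extension is inferred from the filename, the Content-Type
# header, or the URL path, in that order.
#
#   downloader = DownloadFileFromLinkTool()
#   local_path = downloader.forward("https://example.com/report.pdf", "report")
#   print(local_path)  # e.g. ./downloads/report.pdf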


class DuckDuckGoSearchTool(Tool):
    name = "web_search"
    description = """Performs web searches and returns content from top results. Provides real-time information from across the internet including current events, facts, and website content relevant to your query."""
    inputs = {
        "query": {
            "type": "string",
            "description": "The search query to run on DuckDuckGo",
        },
    }
    output_type = "string"

    def _configure(self, max_retries: int = 3, retry_sleep: int = 3):
        self._max_retries = max_retries
        self._retry_sleep = retry_sleep

    def forward(self, query: str) -> str:
        self._configure()
        top_results = 5
        print(
            f"EXECUTING TOOL: duckduckgo_search(query='{query}', top_results={top_results})"
        )
        retries = 0
        max_retries = getattr(self, "_max_retries", 3)
        retry_sleep = getattr(self, "_retry_sleep", 2)

        while retries < max_retries:
            try:
                results = DDGS().text(
                    keywords=query,
                    region="wt-wt",
                    safesearch="moderate",
                    max_results=top_results,
                )

                if not results:
                    return "No results found."

                output_lines = []
                for idx, res in enumerate(results[:top_results], start=1):
                    title = res.get("title", "N/A")
                    url = res.get("href", "N/A")
                    snippet = res.get("body", "N/A")

                    output_lines.append(
                        f"Result {idx}:\n"
                        f"Title: {title}\n"
                        f"URL: {url}\n"
                        f"Snippet: {snippet}\n"
                    )

                output = "\n".join(output_lines)
                print(f"-> Tool Result (DuckDuckGo): {output[:1500]}...")
                return output

            except (
                DuckDuckGoSearchException,
                TimeoutException,
                RatelimitException,
                ConversationLimitException,
            ) as e:
                retries += 1
                print(
                    f"⚠️ DuckDuckGo Exception (Attempt {retries}/{max_retries}): {type(e).__name__}: {e}"
                )
                traceback.print_exc()
                time.sleep(retry_sleep)
            except Exception as e:
                print(f"❌ Unexpected Error: {e}")
                traceback.print_exc()
                return f"Unhandled exception during DuckDuckGo search: {e}"

        return f"❌ Failed to retrieve results after {max_retries} retries."


huggingface_ef = embedding_functions.HuggingFaceEmbeddingFunction(
    model_name="sentence-transformers/all-mpnet-base-v2"
)

SUPPORTED_EXTENSIONS = [
    ".txt",
    ".md",
    ".py",
    ".pdf",
    ".json",
    ".jsonl",
    ".html",
    ".htm",
]


class AddDocumentToVectorStoreTool(Tool):
    name = "add_document_to_vector_store"
    description = "Processes a document and adds it to the vector database for semantic search. Automatically chunks files and creates text embeddings to enable powerful content retrieval."

    inputs = {
        "file_path": {
            "type": "string",
            "description": "Absolute path to the file to be indexed.",
        }
    }
    output_type = "string"

    def _load_file(self, path: Path):
        """Select the right loader for the file extension."""
        if path.suffix == ".pdf":
            return PyPDFLoader(str(path)).load()
        elif path.suffix == ".json":
            return JSONLoader(str(path), jq_schema=".").load()
        elif path.suffix in [".md"]:
            return UnstructuredFileLoader(str(path)).load()
        elif path.suffix in [".html", ".htm"]:
            return BSHTMLLoader(str(path)).load()
        else:  # fallback for .txt, .py, etc.
            return TextLoader(str(path)).load()

    def forward(self, file_path: str) -> str:
        print(f"📄 Adding document to vector store: {file_path}")
        try:
            collection_name = "vectorstore"
            path = Path(file_path)
            if not path.exists() or path.suffix not in SUPPORTED_EXTENSIONS:
                return f"Unsupported or missing file: {file_path}"

            docs = self._load_file(path)

            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=500, chunk_overlap=50
            )
            split_docs = text_splitter.split_documents(docs)

            client = chromadb.Client(
                chromadb.config.Settings(
                    persist_directory="./chroma_store",
                )
            )
            collection = client.get_or_create_collection(
                name=collection_name,
                configuration={"embedding_function": huggingface_ef},
            )

            texts = [doc.page_content for doc in split_docs]
            metadatas = [doc.metadata for doc in split_docs]

            collection.add(
                documents=texts,
                metadatas=metadatas,
                ids=[f"{path.stem}_{i}" for i in range(len(texts))],
            )

            return f"✅ Successfully added {len(texts)} chunks from '{file_path}' to collection '{collection_name}'."
        except Exception as e:
            print(f"❌ Error in add_to_vector_store: {e}")
            traceback.print_exc()
            return f"Error: {e}"


class QueryVectorStoreTool(Tool):
    name = "query_downloaded_documents"
    description = "Performs semantic searches across your downloaded documents. Use detailed queries to find specific information, concepts, or answers from your collected resources."
    inputs = {
        "query": {
            "type": "string",
            "description": "The search query. Ensure this is constructed intelligently so as to retrieve the most relevant outputs.",
        },
        "top_k": {
            "type": "integer",
            "description": "Number of top results to retrieve. Usually between 3 and 30.",
            "nullable": True,
        },
    }
    output_type = "string"

    def forward(self, query: str, top_k: int = 5) -> str:
        collection_name = "vectorstore"
        if top_k is None:
            top_k = 5
        top_k = max(3, min(top_k, 30))  # clamp to the supported range
        print(f"🔎 Querying vector store '{collection_name}' with: '{query}'")
        try:
            client = chromadb.Client(
                chromadb.config.Settings(
                    persist_directory="./chroma_store",
                )
            )
            collection = client.get_collection(name=collection_name)

            results = collection.query(
                query_texts=[query],
                n_results=top_k,
            )

            formatted = []
            for i in range(len(results["documents"][0])):
                doc = results["documents"][0][i]
                metadata = results["metadatas"][0][i]
                formatted.append(
                    f"Result {i+1}:\n" f"Content: {doc}\n" f"Metadata: {metadata}\n"
                )

            return "\n".join(formatted) or "No relevant documents found."
        except Exception as e:
            print(f"❌ Error in query_vector_store: {e}")
            traceback.print_exc()
            return f"Error querying vector store: {e}"


def image_question_answering(image_path: str, prompt: str) -> str:
    """
    Analyzes images and answers specific questions about their content. Can identify objects, read text, describe scenes, or interpret visual information based on your questions.

    Args:
        image_path: The path to the image file
        prompt: The question to ask about the image

    Returns:
        A string answer generated by the local Ollama model
    """
    from ollama import chat  # requires the `ollama` package and a running Ollama server

    # Check for supported file types
    file_extension = image_path.lower().split(".")[-1]
    if file_extension not in ["jpg", "jpeg", "png", "bmp", "gif", "webp"]:
        return "Unsupported file type. Please provide an image."

    path = Path(image_path)
    if not path.exists():
        return f"File not found at: {image_path}"

    # Send the image and prompt to Ollama's local model
    response = chat(
        model="llava",  # assumes the multimodal 'llava' model has been pulled locally
        messages=[
            {
                "role": "user",
                "content": prompt,
                "images": [str(path)],
            },
        ],
        options={"temperature": 0.2},  # low temperature keeps answers focused
    )

    return response.message.content.strip()
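
# Example (sketch): this helper assumes an Ollama server is running locally
# with the 'llava' model available (`ollama pull llava`). The image path and
# question are illustrative.
#
#   answer = image_question_answering("./downloads/receipt.jpg", "What is the total amount?")
#   print(answer)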


class VisitWebpageTool(Tool):
    name = "visit_webpage"
    description = "Loads a webpage from a URL and converts its content to markdown format. Use this to browse websites, extract information, or identify downloadable resources from a specific web address."
    inputs = {
        "url": {
            "type": "string",
            "description": "The url of the webpage to visit.",
        }
    }
    output_type = "string"

    def forward(self, url: str) -> str:
        try:
            from urllib.parse import urlparse

            import requests
            from bs4 import BeautifulSoup
            from markdownify import markdownify
            from requests.exceptions import RequestException

            from smolagents.utils import truncate_content
        except ImportError as e:
            raise ImportError(
                "You must install packages `markdownify`, `requests`, and `beautifulsoup4` to run this tool: for instance run `pip install markdownify requests beautifulsoup4`."
            ) from e

        try:
            # Get the webpage content
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
            }
            response = requests.get(url, headers=headers, timeout=20)
            response.raise_for_status()

            # Parse the HTML with BeautifulSoup
            soup = BeautifulSoup(response.text, "html.parser")

            # Extract domain name for context
            domain = urlparse(url).netloc

            # Remove common clutter elements
            self._remove_clutter(soup)

            # Try to identify and prioritize main content
            main_content = self._extract_main_content(soup)

            if main_content:
                # Convert the cleaned HTML to markdown
                markdown_content = markdownify(str(main_content)).strip()
            else:
                # Fallback to full page content if main content extraction fails
                markdown_content = markdownify(str(soup)).strip()

            # Post-process the markdown content
            markdown_content = self._clean_markdown(markdown_content)

            # Add source information
            result = f"Content from {domain}:\n\n{markdown_content}"

            return truncate_content(result, 40000)

        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"

    def _remove_clutter(self, soup):
        """Remove common elements that clutter web pages."""
        # Common non-content elements to remove
        clutter_selectors = [
            "header",
            "footer",
            "nav",
            ".nav",
            ".navigation",
            ".menu",
            ".sidebar",
            ".footer",
            ".header",
            "#footer",
            "#header",
            "#nav",
            "#sidebar",
            ".widget",
            ".cookie",
            ".cookies",
            ".ad",
            ".ads",
            ".advertisement",
            "script",
            "style",
            "noscript",
            "iframe",
            ".social",
            ".share",
            ".comment",
            ".comments",
            ".subscription",
            ".newsletter",
            '[role="banner"]',
            '[role="navigation"]',
            '[role="complementary"]',
        ]

        for selector in clutter_selectors:
            for element in soup.select(selector):
                element.decompose()

        # Remove hidden elements
        for hidden in soup.select(
            '[style*="display: none"], [style*="display:none"], [style*="visibility: hidden"], [style*="visibility:hidden"], [hidden]'
        ):
            hidden.decompose()

    def _extract_main_content(self, soup):
        """Try to identify and extract the main content of the page."""
        # Priority order for common main content containers
        main_content_selectors = [
            "main",
            '[role="main"]',
            "article",
            ".content",
            ".main-content",
            ".post-content",
            "#content",
            "#main",
            "#main-content",
            ".article",
            ".post",
            ".entry",
            ".page-content",
            ".entry-content",
        ]

        # Try to find the main content container
        for selector in main_content_selectors:
            main_content = soup.select(selector)
            if main_content:
                # If multiple matches, find the one with the most text content
                if len(main_content) > 1:
                    return max(main_content, key=lambda x: len(x.get_text()))
                return main_content[0]

        # If no main content container found, look for the largest text block
        paragraphs = soup.find_all("p")
        if paragraphs:
            # Find the parent that contains the most paragraphs
            parents = {}
            for p in paragraphs:
                if p.parent:
                    if p.parent not in parents:
                        parents[p.parent] = 0
                    parents[p.parent] += 1

            if parents:
                # Return the parent with the most paragraphs
                return max(parents.items(), key=lambda x: x[1])[0]

        # Return None if we can't identify main content
        return None

    def _clean_markdown(self, content):
        """Clean up the markdown content."""
        # Normalize whitespace
        content = re.sub(r"\n{3,}", "\n\n", content)

        # Remove consecutive duplicate links
        content = re.sub(r"(\[.*?\]\(.*?\))\s*\1+", r"\1", content)

        # Remove very short lines that are likely menu items
        lines = content.split("\n")
        filtered_lines = []

        # Skip consecutive short lines (likely menus)
        short_line_threshold = 40  # characters
        consecutive_short_lines = 0
        max_consecutive_short_lines = 3

        for line in lines:
            stripped_line = line.strip()
            if len(
                stripped_line
            ) < short_line_threshold and not stripped_line.startswith("#"):
                consecutive_short_lines += 1
                if consecutive_short_lines > max_consecutive_short_lines:
                    continue
            else:
                consecutive_short_lines = 0

            filtered_lines.append(line)

        content = "\n".join(filtered_lines)

        # Remove duplicate headers
        seen_headers = set()
        lines = content.split("\n")
        filtered_lines = []

        for line in lines:
            if line.startswith("#"):
                header_text = line.strip()
                if header_text in seen_headers:
                    continue
                seen_headers.add(header_text)
            filtered_lines.append(line)

        content = "\n".join(filtered_lines)

        # Remove lines containing common footer patterns
        footer_patterns = [
            r"^copyright",
            r"^©",
            r"^all rights reserved",
            r"^terms",
            r"^privacy policy",
            r"^contact us",
            r"^follow us",
            r"^social media",
            r"^disclaimer",
        ]
        footer_pattern = "|".join(footer_patterns)

        lines = content.split("\n")
        filtered_lines = []

        for line in lines:
            if not re.search(footer_pattern, line.lower()):
                filtered_lines.append(line)

        content = "\n".join(filtered_lines)

        return content
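
# Example (sketch): fetching a page and printing the cleaned markdown. The URL
# is illustrative; output is truncated to 40,000 characters.
#
#   visitor = VisitWebpageTool()
#   print(visitor.forward("https://en.wikipedia.org/wiki/Transformer_(deep_learning)"))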


class ArxivSearchTool(Tool):
    name = "arxiv_search"
    description = """Searches arXiv for academic papers and returns structured information including titles, authors, publication dates, abstracts, and download links."""
    inputs = {
        "query": {
            "type": "string",
            "description": "A research-related query (e.g., 'AI regulation')",
        },
        "from_date": {
            "type": "string",
            "description": "Optional search start date in format (YYYY or YYYY-MM or YYYY-MM-DD) (e.g., '2022-06' or '2022' or '2022-04-12')",
            "nullable": True,
        },
        "to_date": {
            "type": "string",
            "description": "Optional search end date in (YYYY or YYYY-MM or YYYY-MM-DD) (e.g., '2022-06' or '2022' or '2022-04-12')",
            "nullable": True,
        },
    }
    output_type = "string"

    def forward(
        self,
        query: str,
        from_date: str = None,
        to_date: str = None,
    ) -> str:
        # 1) build URL
        url = build_arxiv_url(query, from_date, to_date, size=50)

        # 2) fetch & parse
        try:
            papers = fetch_and_parse_arxiv(url)
        except Exception as e:
            return f"❌ Failed to fetch or parse arXiv results: {e}"

        if not papers:
            return "No results found for your query."

        # 3) format into a single string
        output_lines = []
        for idx, p in enumerate(papers, start=1):
            output_lines += [
                f"🔍 RESULT {idx}",
                f"Title : {p['title']}",
                f"Authors : {p['authors']}",
                f"Published : {p['published']}",
                f"Summary : {p['abstract'][:500]}{'...' if len(p['abstract']) > 500 else ''}",
                f"Entry ID : {p['entry_link']}",
                f"Download link: {p['download_link']}",
                "",
            ]

        return "\n".join(output_lines).strip()


def fetch_and_parse_arxiv(url: str) -> List[Dict[str, str]]:
    """
    Fetches the given arXiv advanced-search URL, parses the HTML,
    and returns a list of results. Each result is a dict containing:
      - title
      - authors
      - published
      - abstract
      - entry_link
      - doi (or "[N/A]" if none)
      - download_link (entry link with "abs" replaced by "pdf", or "N/A")
    """
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    results = []
    for li in soup.find_all("li", class_="arxiv-result"):
        # Title
        t = li.find("p", class_="title")
        title = t.get_text(strip=True) if t else ""

        # Authors
        a = li.find("p", class_="authors")
        authors = a.get_text(strip=True).replace("Authors:", "").strip() if a else ""

        # Abstract
        ab = li.find("span", class_="abstract-full")
        abstract = (
            ab.get_text(strip=True).replace("Abstract:", "").strip() if ab else ""
        )

        # Published date
        d = li.find("p", class_="is-size-7")
        published = d.get_text(strip=True) if d else ""

        # Entry link
        lt = li.find("p", class_="list-title")
        entry_link = lt.find("a")["href"] if lt and lt.find("a") else ""

        # DOI (if present)
        doi = "[N/A]"
        idblock = li.find("p", class_="list-identifier")
        if idblock:
            for a_tag in idblock.find_all("a", href=True):
                if "doi.org" in a_tag["href"]:
                    doi = a_tag["href"]
                    break

        results.append(
            {
                "title": title,
                "authors": authors,
                "published": published,
                "abstract": abstract,
                "entry_link": entry_link,
                "doi": doi,
                "download_link": (
                    entry_link.replace("abs", "pdf") if "abs" in entry_link else "N/A"
                ),
            }
        )

    return results


def build_arxiv_url(
    query: str, from_date: str = None, to_date: str = None, size: int = 50
) -> str:
    """
    Build an arXiv advanced-search URL matching the exact segment order:

      1) ?advanced
      2) terms-0-operator=AND
      3) terms-0-term=…
      4) terms-0-field=all
      5) classification-physics_archives=all
      6) classification-include_cross_list=include
         [ optional date-range block ]
      7) abstracts=show
      8) size=…
      9) order=-announced_date_first

    If from_date or to_date is None, the date-range block is omitted.
    """
    base = "https://arxiv.org/search/advanced?advanced="
    parts = [
        "&terms-0-operator=AND",
        f"&terms-0-term={quote_plus(query)}",
        "&terms-0-field=all",
        "&classification-physics_archives=all",
        "&classification-include_cross_list=include",
    ]

    # optional date-range filtering
    if from_date and to_date:
        parts += [
            "&date-year=",
            "&date-filter_by=date_range",
            f"&date-from_date={from_date}",
            f"&date-to_date={to_date}",
            "&date-date_type=submitted_date",
        ]

    parts += [
        "&abstracts=show",
        f"&size={size}",
        "&order=-announced_date_first",
    ]

    return base + "".join(parts)