Spaces:
Sleeping
Sleeping
import os
import tempfile
import uuid
from typing import Optional
from urllib.parse import urlparse, parse_qs

import pandas as pd
import pytesseract
import requests
from PIL import Image
from youtube_transcript_api import YouTubeTranscriptApi

from langchain_core.tools import tool
from langchain_community.document_loaders.arxiv import ArxivLoader
from langchain_community.document_loaders.pubmed import PubMedLoader
from langchain_community.document_loaders.wikipedia import WikipediaLoader
from langchain_community.tools.google_search.tool import GoogleSearchRun
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.utilities.duckduckgo_search import DuckDuckGoSearchAPIWrapper
from langchain_community.utilities.google_search import GoogleSearchAPIWrapper
| ## Simple algebra tools | |
def add(a: float, b: float) -> float:
    """Return the sum of two numbers.

    Args:
        a: first float
        b: second float
    """
    total = a + b
    return total
def substract(a: float, b: float) -> float:
    """Subtract b from a.

    NOTE(review): the name "substract" is a misspelling of "subtract",
    but it is kept unchanged so existing callers and tool registrations
    keep working.

    Args:
        a: first float
        b: second float
    """
    return a - b
def multiply(a: float, b: float) -> float:
    """Return the product of two numbers.

    Args:
        a: first float
        b: second float
    """
    product = a * b
    return product
def divide(a: float, b: float) -> float:
    """Divide a by b.

    Args:
        a: first float
        b: second float

    Raises:
        ValueError: if b is zero.
    """
    if not b:
        # Guard against division by zero with an explicit, catchable error.
        raise ValueError("Cannot divide any number by zero.")
    return a / b
# TODO: consider adding more algebra tools (e.g. power, modulo, square root).
| ## Search Tools | |
def DuckDuckGoSearchTool(query: str) -> dict:
    """Search DuckDuckGo for a query and return maximum 5 results.

    Args:
        query: The search query.

    Returns:
        dict: {"web_results": <formatted results>}, each result wrapped in a
        <Document> tag carrying its source URL and title.  (The original
        annotation said ``str`` but the function has always returned a dict.)
    """
    results = DuckDuckGoSearchAPIWrapper().results(query=query, max_results=5)
    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{res["link"]}" title="{res["title"]}">\n{res["snippet"]}\n</Document>'
        for res in results
    )
    return {"web_results": formatted_search_docs}
def TavilySearchTool(query: str) -> dict:
    """Search Tavily for a query and return maximum 5 results.

    Args:
        query: The search query.

    Returns:
        dict: {"web_results": <formatted results>}.
    """
    # BaseTool.invoke takes the tool input as its first positional argument;
    # the old call `.invoke(query=query)` passed an unexpected keyword.
    search_docs = TavilySearchResults(max_results=5).invoke(query)
    # Tavily returns a list of plain dicts ({"url": ..., "content": ...}),
    # not Document objects, so format from the dict keys, not `.metadata`.
    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{doc.get("url", "")}"/>\n{doc.get("content", "")}\n</Document>'
        for doc in search_docs
    )
    return {"web_results": formatted_search_docs}
def combined_web_search(query: str) -> dict:
    """Search Google, DuckDuckGo, and Tavily for a query and return combined results.

    Args:
        query: The search query.

    Returns:
        dict: {"web_results": <formatted results from all three engines>}.

    The three backends return different shapes (Google: one text blob;
    DuckDuckGo: dicts with link/title/snippet; Tavily: dicts with
    url/content), so each is formatted separately before being joined —
    the old code concatenated a str with lists and then treated every
    element as a Document, which raised at runtime.
    """
    formatted = []
    # GoogleSearchAPIWrapper.run returns a single snippet string.
    google_text = GoogleSearchAPIWrapper(k=5).run(query)
    formatted.append(f'<Document source="google"/>\n{google_text}\n</Document>')
    # DuckDuckGo: list of dicts with "link", "title", "snippet".
    for res in DuckDuckGoSearchAPIWrapper().results(query=query, max_results=5):
        formatted.append(
            f'<Document source="{res["link"]}" title="{res["title"]}">\n{res["snippet"]}\n</Document>'
        )
    # Tavily: list of dicts with "url", "content"; invoke takes the input positionally.
    for res in TavilySearchResults(max_results=5).invoke(query):
        formatted.append(
            f'<Document source="{res.get("url", "")}"/>\n{res.get("content", "")}\n</Document>'
        )
    return {"web_results": "\n\n---\n\n".join(formatted)}
def WikipediaSearchTool(query: str) -> dict:
    """Search Wikipedia for a query and return maximum 5 results.

    Args:
        query: The search query.

    Returns:
        dict: {"wiki_results": <formatted results>}.
    """
    # load_max_docs=5 is the actual cap; the old docstring claimed 2.
    search_docs = WikipediaLoader(query=query, load_max_docs=5).load()
    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
        for doc in search_docs
    )
    return {"wiki_results": formatted_search_docs}
def ArxivSearchTool(query: str) -> dict:
    """Search Arxiv for a query and return maximum 5 results.

    Args:
        query: The search query.

    Returns:
        dict: {"arvix_results": <formatted results>}.  The key keeps the
        historical "arvix" misspelling because downstream consumers may
        already read it; renaming it would silently break them.
    """
    # load_max_docs=5 is the actual cap; the old docstring claimed 3.
    search_docs = ArxivLoader(query=query, load_max_docs=5).load()
    formatted_search_docs = "\n\n---\n\n".join(
        # Truncate each paper to 1000 chars to keep the prompt context small.
        f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
        for doc in search_docs
    )
    return {"arvix_results": formatted_search_docs}
def PubmedSearchTool(query: str) -> dict:
    """Search PubMed for a query and return maximum 5 results.

    Args:
        query: The search query.

    Returns:
        dict: {"pubmed_results": <formatted results>}.

    (The old docstring said "Search Arxiv" — copy/paste error; this tool
    queries PubMed via PubMedLoader.)
    """
    search_docs = PubMedLoader(query=query, load_max_docs=5).load()
    formatted_search_docs = "\n\n---\n\n".join(
        # Truncate each abstract to 1000 chars to keep the prompt context small.
        f'<Document source="{doc.metadata["uid"]}" title="{doc.metadata["Title"]}"/>\n{doc.page_content[:1000]}\n</Document>'
        for doc in search_docs
    )
    return {"pubmed_results": formatted_search_docs}
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """Save content to a file and return the path.

    Args:
        content (str): the content to save to the file
        filename (str, optional): the name of the file. If not provided, a
            random name file will be created in the temp directory.

    Returns:
        str: a message containing the path of the saved file.
    """
    temp_dir = tempfile.gettempdir()
    if filename is None:
        # mkstemp yields a unique path and an open descriptor; close the
        # descriptor immediately so the path can be reopened for writing on
        # every platform.  (NamedTemporaryFile left its handle open, leaking
        # an fd and breaking the re-open on Windows.)
        fd, filepath = tempfile.mkstemp(dir=temp_dir)
        os.close(fd)
    else:
        filepath = os.path.join(temp_dir, filename)
    with open(filepath, "w") as f:
        f.write(content)
    return f"File saved to {filepath}. You can read this file to process its contents."
| def download_file_from_url(url: str, filename: Optional[str] = None) -> str: | |
| """Download a file from a URL and save it to a temporary location. | |
| Args: | |
| url (str): the URL of the file to download. | |
| filename (str, optional): the name of the file. If not provided, a random name file will be created. | |
| """ | |
| try: | |
| # Parse URL to get filename if not provided | |
| if not filename: | |
| path = urlparse(url).path | |
| filename = os.path.basename(path) | |
| if not filename: | |
| filename = f"downloaded_{uuid.uuid4().hex[:8]}" | |
| # Create temporary file | |
| temp_dir = tempfile.gettempdir() | |
| filepath = os.path.join(temp_dir, filename) | |
| # Download the file | |
| response = requests.get(url, stream=True) | |
| response.raise_for_status() | |
| # Save the file | |
| with open(filepath, "wb") as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| return f"File downloaded to {filepath}. You can read this file to process its contents." | |
| except Exception as e: | |
| return f"Error downloading file: {str(e)}" | |
def extract_text_from_image(image_path: str) -> str:
    """Extract text from an image using OCR library pytesseract (if available).

    Args:
        image_path (str): the path to the image file.

    Returns:
        str: the extracted text, or an error description (never raises).
    """
    try:
        img = Image.open(image_path)
        extracted = pytesseract.image_to_string(img)
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"
    return f"Extracted text from image:\n\n{extracted}"
def analyze_csv_file(file_path: str, query: str) -> str:
    """Analyze a CSV file using pandas and answer a question about it.

    Args:
        file_path (str): the path to the CSV file.
        query (str): Question about the data

    Returns:
        str: row/column counts, column names, and summary statistics,
        or an error description (never raises).
    """
    try:
        frame = pd.read_csv(file_path)
        # Build the report: shape, columns, then describe() output.
        parts = [
            f"CSV file loaded with {len(frame)} rows and {len(frame.columns)} columns.\n",
            f"Columns: {', '.join(frame.columns)}\n\n",
            "Summary statistics:\n",
            str(frame.describe()),
        ]
        return "".join(parts)
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"
def analyze_excel_file(file_path: str, query: str) -> str:
    """Analyze an Excel file using pandas and answer a question about it.

    Args:
        file_path (str): the path to the Excel file.
        query (str): Question about the data

    Returns:
        str: row/column counts, column names, and summary statistics,
        or an error description (never raises).
    """
    try:
        frame = pd.read_excel(file_path)
        # Build the report: shape, columns, then describe() output.
        report = f"Excel file loaded with {len(frame)} rows and {len(frame.columns)} columns.\n"
        report += f"Columns: {', '.join(frame.columns)}\n\n"
        report += "Summary statistics:\n"
        report += str(frame.describe())
        return report
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
| ## Analyze Youtube Transcript tools | |
| def extract_video_id(youtube_url: str) -> str | None: | |
| """Extract the video ID from a YouTube URL. | |
| Supports standard and shortened formats like: | |
| - https://www.youtube.com/watch?v=VIDEO_ID | |
| - https://youtu.be/VIDEO_ID | |
| """ | |
| try: | |
| parsed_url = urlparse(youtube_url) | |
| host = parsed_url.hostname | |
| if host in ("www.youtube.com", "youtube.com"): | |
| return parse_qs(parsed_url.query).get("v", [None])[0] | |
| elif host == "youtu.be": | |
| return parsed_url.path.strip("/") | |
| except Exception: | |
| return None | |
| return None | |
def get_youtube_transcript(youtube_url: str) -> str:
    """Returns the transcript of a YouTube video as plain text.

    Use this tool to extract spoken words from videos for Q&A, summarization,
    or analysis. This does not include visual or on-screen content.
    """
    video_id = extract_video_id(youtube_url)
    if not video_id:
        return "Invalid or unsupported YouTube URL format."
    try:
        entries = YouTubeTranscriptApi.get_transcript(video_id)
    except Exception as e:
        # Best-effort: transcripts may be disabled or missing for the video.
        return f"Transcript unavailable: {str(e)}"
    return " ".join(entry["text"] for entry in entries)