import os

from langchain_community.document_loaders import ArxivLoader
from langchain_community.document_loaders import WikipediaLoader
from langchain_core.tools import tool
from youtube_transcript_api import YouTubeTranscriptApi

# Separator placed between individual documents/results in search tool output.
_DOC_SEPARATOR = "\n\n---\n\n"


@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers.

    Args:
        a: first int
        b: second int
    """
    return a * b


@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return up to 4 articles.

    Args:
        query: The search query.
    """
    try:
        # Imported lazily so the module-level settings below are applied
        # right before the loader runs.
        import wikipedia

        wikipedia.API_URL = "https://en.wikipedia.org/w/api.php"
        wikipedia.set_rate_limiting(True)
        search_docs = WikipediaLoader(query=query, load_max_docs=4).load()
    except Exception as e:
        # Tools report failures as text instead of raising.
        return f"Wikipedia search failed: {e}"

    formatted_search_docs = _DOC_SEPARATOR.join(
        f"\n{doc.page_content}\n" for doc in search_docs
    )
    return formatted_search_docs or "(no Wikipedia results)"


@tool
def web_search(query: str) -> str:
    """Search the public web via DuckDuckGo (no API key).

    Returns titles, URLs and short snippets.

    Args:
        query: The search query.
    """
    try:
        from ddgs import DDGS
    except ImportError as e:
        return f"Web search unavailable (install ddgs): {e}"

    max_results = int(os.getenv("DDG_MAX_RESULTS", "8"))
    q = (query or "").strip()
    if not q:
        return "(empty query)"
    timeout = int(os.getenv("DDG_TIMEOUT", "25"))

    try:
        with DDGS(timeout=timeout) as ddgs:
            hits = list(ddgs.text(q, max_results=max_results))
    except Exception as e:
        return f"DuckDuckGo search failed: {e}"
    if not hits:
        return "(no web results)"

    parts: list[str] = []
    for r in hits:
        title = (r.get("title") or "").strip()
        # The link is read from "href", falling back to "url".
        url = (r.get("href") or r.get("url") or "").strip()
        # Cap each snippet at 1500 characters.
        body = (r.get("body") or "")[:1500]
        # The URL is part of every result, as the docstring promises; it was
        # previously computed but never included in the output.
        parts.append(f"\n{title}\n{url}\n{body}\n")
    return _DOC_SEPARATOR.join(parts)


@tool
def arvix_search(query: str) -> str:
    """Search Arxiv for a query and return at most 3 results.

    Args:
        query: The search query.
    """
    # NOTE: the function name is a misspelling of "arxiv"; it is kept because
    # it is the public tool name.
    try:
        search_docs = ArxivLoader(query=query, load_max_docs=3).load()
    except Exception as e:
        return f"Arxiv search failed: {e}"

    # Only the first 1000 characters of each document are returned.
    formatted_search_docs = _DOC_SEPARATOR.join(
        f"\n{doc.page_content[:1000]}\n" for doc in search_docs
    )
    return formatted_search_docs or "(no Arxiv results)"


@tool
def execute_python_code(source: str) -> str:
    """Run Python source in a separate subprocess (same interpreter).

    Returns stdout; includes stderr if non-zero exit. Use when the question
    embeds or attaches Python code and you need the actual printed/numeric
    output.

    Args:
        source: Python source code to execute as a single string.
    """
    import subprocess
    import sys

    # NOTE(review): `source` is executed as-is via `python -c`; no sandboxing
    # is applied in this function. Only feed it code you are willing to run.
    timeout = int(os.getenv("PYTHON_TOOL_TIMEOUT", "45"))
    try:
        proc = subprocess.run(
            [sys.executable, "-c", source],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # Report the timeout as text, like the other tools report failures,
        # instead of letting the exception escape.
        return f"Execution timed out after {timeout} seconds."

    out = (proc.stdout or "").strip()
    err = (proc.stderr or "").strip()
    if proc.returncode != 0:
        combined = f"exit={proc.returncode}\nSTDOUT:\n{out}\nSTDERR:\n{err}".strip()
        return combined[:8000]

    text = out if out else "(empty stdout)"
    if err:
        text = f"{text}\nSTDERR:\n{err}"
    # Output is capped at 8000 characters.
    return text[:8000]


@tool
def read_excel_format(file_path: str) -> str:
    """Read an Excel (.xlsx) file and return all its sheets as Markdown tables.

    Use this tool whenever the question references a spreadsheet or .xlsx
    file. Prefer this over execute_python_code when you just need to read and
    reason about tabular data — no need to write any code.

    Args:
        file_path: Absolute path to the .xlsx file as provided in the
            'file_path' field of the question.
    """
    try:
        import pandas as pd
    except ImportError:
        return "pandas is not installed. Run: pip install pandas openpyxl"

    if not os.path.exists(file_path):
        return f"File not found: {file_path}"

    try:
        xl = pd.ExcelFile(file_path)
    except Exception as e:
        return f"Failed to open Excel file: {e}"

    filename = os.path.basename(file_path)
    parts: list[str] = [f"**File:** `{filename}`\n"]
    # The context manager closes the workbook handle when we are done.
    with xl:
        for sheet_name in xl.sheet_names:
            try:
                df = xl.parse(sheet_name)
                # Rendering sits inside the try as well: `to_markdown` relies
                # on the optional `tabulate` package, and a failure there is
                # reported for this sheet rather than raised.
                table = df.to_markdown(index=False)
            except Exception as e:
                parts.append(f"### Sheet: {sheet_name}\n(error reading sheet: {e})\n")
                continue
            parts.append(
                f"### Sheet: `{sheet_name}` — {df.shape[0]} rows × {df.shape[1]} columns\n"
            )
            parts.append(table)
            parts.append("")
    return "\n".join(parts)


@tool
def YouTubeVideoAnalysisTool(video_id: str) -> str:
    """Fetch the transcript of a YouTube video by its ID.

    Args:
        video_id: The ID of the YouTube video.

    Returns:
        The video transcript in text format, or an error message if it could
        not be fetched.
    """
    try:
        fetched = YouTubeTranscriptApi().fetch(video_id)
        full_transcript = " ".join(snippet.text for snippet in fetched)
    except Exception as e:
        return f"An error occurred while fetching the YouTube transcript: {e}"
    return f"the transcript of the youtube video is the following: {full_transcript}"


@tool
def transcribe_mp3(file_path: str) -> str:
    """Transcribe an MP3 audio file to text using Whisper (Hugging Face Inference API).

    Use this tool when the question references an .mp3 audio file.

    Args:
        file_path: Absolute path to the .mp3 file.
    """
    if not os.path.exists(file_path):
        return f"File not found: {file_path}"

    token = os.getenv("HF_TOKEN")
    if not token:
        return "HF_TOKEN is not set in the environment."

    try:
        # Imported inside the try so a missing package is reported as a
        # transcription failure rather than raised.
        from huggingface_hub import InferenceClient

        client = InferenceClient(api_key=token)
        with open(file_path, "rb") as f:
            output = client.automatic_speech_recognition(
                f.read(),
                model="openai/whisper-large-v3",
            )
        return output.text or "(empty transcription)"
    except Exception as e:
        return f"Transcription failed: {e}"