from __future__ import annotations import os import json import csv from typing import Optional import pdfplumber import httpx from bs4 import BeautifulSoup from langchain_core.tools import tool from langchain_community.tools import DuckDuckGoSearchRun # ------------------------- # 1) DuckDuckGo search tool # ------------------------- _ddg = DuckDuckGoSearchRun() @tool("web_search") def web_search(query: str) -> str: """Search the web (DuckDuckGo) and return text results.""" # DuckDuckGoSearchRun returns a string summary of results return _ddg.run(query) # # ------------------------- # # 2) Local file reader tool # # ------------------------- # def _read_pdf(path: str) -> str: # text = [] # with pdfplumber.open(path) as pdf: # for page in pdf.pages: # page_text = page.extract_text() # if page_text: # text.append(page_text) # return "\n".join(text) # def _read_json(path: str) -> str: # with open(path, "r", encoding="utf-8") as f: # data = json.load(f) # return json.dumps(data, indent=2, ensure_ascii=False) # def _read_txt(path: str) -> str: # with open(path, "r", encoding="utf-8") as f: # return f.read() # def _read_csv(path: str) -> str: # rows = [] # with open(path, newline="", encoding="utf-8") as f: # reader = csv.reader(f) # for row in reader: # rows.append(", ".join(row)) # return "\n".join(rows) # @tool("file_reader") # def file_reader(path: str) -> str: # """ # Read local files and return extracted text. # Supports PDF, JSON, TXT, and CSV. 
#     """
#     if not os.path.exists(path):
#         return f"Error: file not found at {path}"
#     ext = os.path.splitext(path)[1].lower()
#     try:
#         if ext == ".pdf":
#             return _read_pdf(path)
#         if ext == ".json":
#             return _read_json(path)
#         if ext == ".txt":
#             return _read_txt(path)
#         if ext == ".csv":
#             return _read_csv(path)
#         return f"Unsupported file type: {ext}"
#     except Exception as e:
#         return f"Error reading file: {e}"


# -------------------------
# 3) Web fetch tool
# -------------------------
def _clean_html_to_text(html: str, max_lines: int = 5000) -> str:
    """Convert an HTML document to plain text, one non-empty line per row.

    Args:
        html: raw HTML markup.
        max_lines: maximum number of non-empty text lines to keep.

    Returns:
        The stripped, non-empty text lines joined with newlines, truncated to
        the first ``max_lines`` lines.
    """
    soup = BeautifulSoup(html, "html.parser")

    # Remove noisy tags
    for tag in soup(["script", "style", "noscript", "nav", "footer", "header", "aside"]):
        tag.decompose()

    text = soup.get_text(separator="\n")
    # Strip each line and drop the ones that are empty after stripping.
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    return "\n".join(lines[:max_lines])


# NOTE: the docstring below is used as the tool description, so its wording is
# kept as-is. On any failure (bad URL, network error, non-2xx status) the tool
# returns an "Error fetching page: ..." string instead of raising.
@tool("web_fetch")
def web_fetch(url: str) -> str:
    """
    Retrieves and reads the text content of a specific URL.
    Use this to read articles, documentation, or static webpages.
    Do NOT use this tool for YouTube URLs (use 'youtube_transcript' instead).

    Limitations:
    - Returns cleaned plain text, not raw HTML.
    - Cannot execute JavaScript (may fail on heavy SPAs or dynamic sites).
    - Content is truncated at 5000 lines.
    """
    try:
        with httpx.Client(follow_redirects=True, timeout=20) as client:
            r = client.get(
                url,
                headers={
                    # Some sites block empty UA; this helps
                    "User-Agent": "Mozilla/5.0 (compatible; LangChainTool/1.0)"
                },
            )
            # Turn 4xx/5xx responses into exceptions so they are reported below.
            r.raise_for_status()
            return _clean_html_to_text(r.text, max_lines=5000)
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error fetching page: {e}"


# -------------------------
# 4) YouTube transcript tool
# -------------------------
# `tool` is already imported at the top of the file; the re-import is redundant
# but harmless and is kept so this section stays self-contained.
from langchain_core.tools import tool  # noqa: E402
from youtube_transcript_api import YouTubeTranscriptApi  # noqa: E402

# Hostnames whose URLs carry the video id in `?v=` or in the path.
_YOUTUBE_HOSTS = (
    "youtube.com",
    "www.youtube.com",
    "m.youtube.com",
    "music.youtube.com",
    "youtube-nocookie.com",
    "www.youtube-nocookie.com",
)
# Short-link hostnames whose first path segment is the video id.
_YOUTUBE_SHORT_HOSTS = ("youtu.be", "www.youtu.be")
# Path prefixes of the form /<prefix>/<video id>.
_YOUTUBE_PATH_PREFIXES = ("shorts", "embed", "live", "v")


def _extract_video_id(url: str) -> str:
    """Return the video id found in a YouTube URL, or "" if none is found.

    Recognised forms:
      - https://www.youtube.com/watch?v=VIDEOID (also m./music. hosts)
      - https://www.youtube.com/shorts/VIDEOID, /embed/VIDEOID, /live/VIDEOID
      - https://youtu.be/VIDEOID
    A URL without a scheme (e.g. "youtu.be/VIDEOID") is accepted as well.

    Args:
        url: the URL to inspect.

    Returns:
        The video id, or an empty string when the URL is not a recognised
        YouTube URL or cannot be parsed.
    """
    import urllib.parse as up

    candidate = (url or "").strip()
    if not candidate:
        return ""
    # urlparse only fills in `hostname` when a scheme/netloc is present.
    if "://" not in candidate:
        candidate = "https://" + candidate

    try:
        parsed = up.urlparse(candidate)
        host = parsed.hostname or ""
    except ValueError:
        # Malformed URL (e.g. a broken IPv6 literal): treat as "not a video URL".
        return ""

    segments = [seg for seg in parsed.path.split("/") if seg]

    if host in _YOUTUBE_SHORT_HOSTS:
        # Only the first segment is the id; ignore anything after it.
        return segments[0] if segments else ""

    if host in _YOUTUBE_HOSTS:
        from_query = up.parse_qs(parsed.query).get("v", [""])[0]
        if from_query:
            return from_query
        if len(segments) >= 2 and segments[0] in _YOUTUBE_PATH_PREFIXES:
            return segments[1]

    return ""


# NOTE: the docstring below is used as the tool description, so its wording is
# kept as-is. Failures are returned as "Error ..." strings, never raised.
@tool("youtube_transcript")
def youtube_transcript(url: str) -> str:
    """
    Retrieves the full English transcript text from a YouTube video URL.
    Use this tool when a user asks questions about a video's content,
    wants a summary, or needs specific quotes.

    Note: This tool only supports videos with English captions/subtitles.
    """
    vid = _extract_video_id(url)
    if not vid:
        return "Error: could not parse video id"

    try:
        if hasattr(YouTubeTranscriptApi, "fetch"):
            # Newer youtube-transcript-api releases expose an instance API
            # whose result iterates over snippets with a `.text` attribute.
            snippets = YouTubeTranscriptApi().fetch(vid, languages=["en"])
            texts = [snippet.text for snippet in snippets]
        else:
            # Older releases: static call returning a list of dicts.
            chunks = YouTubeTranscriptApi.get_transcript(vid, languages=["en"])
            texts = [c["text"] for c in chunks]
        return "\n".join(texts)
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error fetching transcript: {e}"