File size: 4,729 Bytes
d6a77ec
e2815a0
d6a77ec
 
e2815a0
d6a77ec
e2815a0
d6a77ec
e2815a0
d6a77ec
 
 
e2815a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156

from __future__ import annotations

import os
import json
import csv
from typing import Optional

import pdfplumber
import httpx
from bs4 import BeautifulSoup

from langchain_core.tools import tool
from langchain_community.tools import DuckDuckGoSearchRun


# -------------------------
# 1) DuckDuckGo search tool
# -------------------------
# Single shared runner instance; DuckDuckGoSearchRun is stateless per call.
_ddg = DuckDuckGoSearchRun()

@tool("web_search")
def web_search(query: str) -> str:
    """Search the web (DuckDuckGo) and return text results."""
    # The runner already formats the hits into one plain-text summary string.
    results = _ddg.run(query)
    return results


# # -------------------------
# # 2) Local file reader tool
# # -------------------------
# def _read_pdf(path: str) -> str:
#     text = []
#     with pdfplumber.open(path) as pdf:
#         for page in pdf.pages:
#             page_text = page.extract_text()
#             if page_text:
#                 text.append(page_text)
#     return "\n".join(text)

# def _read_json(path: str) -> str:
#     with open(path, "r", encoding="utf-8") as f:
#         data = json.load(f)
#     return json.dumps(data, indent=2, ensure_ascii=False)

# def _read_txt(path: str) -> str:
#     with open(path, "r", encoding="utf-8") as f:
#         return f.read()

# def _read_csv(path: str) -> str:
#     rows = []
#     with open(path, newline="", encoding="utf-8") as f:
#         reader = csv.reader(f)
#         for row in reader:
#             rows.append(", ".join(row))
#     return "\n".join(rows)

# @tool("file_reader")
# def file_reader(path: str) -> str:
#     """
#     Read local files and return extracted text.
#     Supports PDF, JSON, TXT, and CSV.
#     """
#     if not os.path.exists(path):
#         return f"Error: file not found at {path}"

#     ext = os.path.splitext(path)[1].lower()

#     try:
#         if ext == ".pdf":
#             return _read_pdf(path)
#         if ext == ".json":
#             return _read_json(path)
#         if ext == ".txt":
#             return _read_txt(path)
#         if ext == ".csv":
#             return _read_csv(path)
#         return f"Unsupported file type: {ext}"
#     except Exception as e:
#         return f"Error reading file: {e}"


# -------------------------
# 3) Web fetch tool
# -------------------------
def _clean_html_to_text(html: str, max_lines: int = 5000) -> str:
    """Strip markup from *html* and return at most *max_lines* non-empty, stripped text lines."""
    soup = BeautifulSoup(html, "html.parser")

    # Drop boilerplate elements that carry no article content.
    noise_tags = ["script", "style", "noscript", "nav", "footer", "header", "aside"]
    for element in soup(noise_tags):
        element.decompose()

    kept: list[str] = []
    for raw_line in soup.get_text(separator="\n").splitlines():
        stripped = raw_line.strip()
        if stripped:
            kept.append(stripped)
        # Stop early once the cap is reached — same result as slicing afterwards.
        if len(kept) >= max_lines:
            break
    return "\n".join(kept)

@tool("web_fetch")
def web_fetch(url: str) -> str:
    """
    Retrieves and reads the text content of a specific URL. 
    
    Use this to read articles, documentation, or static webpages.
    
    Do NOT use this tool for YouTube URLs (use 'youtube_transcript' instead).
    Limitations:
    - Returns cleaned plain text, not raw HTML.
    - Cannot execute JavaScript (may fail on heavy SPAs or dynamic sites).
    - Content is truncated at 5000 lines.
    """
    # Some sites block empty UA; this helps
    request_headers = {"User-Agent": "Mozilla/5.0 (compatible; LangChainTool/1.0)"}
    try:
        with httpx.Client(follow_redirects=True, timeout=20) as client:
            response = client.get(url, headers=request_headers)
            response.raise_for_status()
            # httpx reads the body eagerly for non-streaming requests,
            # so grab the text while the client is still open anyway.
            html = response.text
        return _clean_html_to_text(html, max_lines=5000)
    except Exception as e:
        # Tool contract: report failures as text rather than raising.
        return f"Error fetching page: {e}"

from langchain_core.tools import tool
from youtube_transcript_api import YouTubeTranscriptApi

def _extract_video_id(url: str) -> str:
    # handles https://www.youtube.com/watch?v=VIDEOID
    import urllib.parse as up
    q = up.urlparse(url)
    if q.hostname in ("www.youtube.com", "youtube.com"):
        return up.parse_qs(q.query).get("v", [""])[0]
    if q.hostname == "youtu.be":
        return q.path.lstrip("/")
    return ""

@tool("youtube_transcript")
def youtube_transcript(url: str) -> str:
    """
    Retrieves the full English transcript text from a YouTube video URL.
    
    Use this tool when a user asks questions about a video's content, wants a summary, 
    or needs specific quotes. 
    
    Note: This tool only supports videos with English captions/subtitles.
    """
    video_id = _extract_video_id(url)
    # Guard clause: bail out before hitting the network on unparseable URLs.
    if not video_id:
        return "Error: could not parse video id"
    try:
        segments = YouTubeTranscriptApi.get_transcript(video_id, languages=["en"])
        return "\n".join(segment["text"] for segment in segments)
    except Exception as e:
        # Tool contract: report failures as text rather than raising.
        return f"Error fetching transcript: {e}"