File size: 5,638 Bytes
a30fc90
3bf4af2
a30fc90
 
3bf4af2
a30fc90
3bf4af2
a30fc90
 
 
 
 
3bf4af2
 
 
 
 
 
 
a30fc90
 
 
 
 
 
 
3bf4af2
a30fc90
3bf4af2
 
 
 
a30fc90
3bf4af2
 
 
 
a30fc90
 
 
 
 
 
 
 
 
3bf4af2
 
a30fc90
 
 
 
 
 
 
 
 
3bf4af2
 
a30fc90
 
 
 
 
 
 
 
 
3bf4af2
 
a30fc90
3bf4af2
a30fc90
 
 
 
3bf4af2
a30fc90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3bf4af2
 
a30fc90
3bf4af2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a30fc90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#!/usr/bin/env python3
from __future__ import annotations
import re
from typing import Optional, List
from bs4 import BeautifulSoup
import requests
from langchain_core.tools import tool

# ------------------ CONFIG ----------------------

DEFAULT_TIMEOUT = 20  # seconds; applied to every outbound HTTP request below
HEADERS = {
    # Helps avoid consent/anti-bot interstitials on some sites
    "User-Agent": (
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )
}

# Invidious instances tried in order when fetching captions; any trailing
# slash is stripped by the HTTP helpers before a path is appended.
INVIDIOUS_POOL = [
    "https://yewtu.be",
    "https://inv.nadeko.net",
    "https://invidious.tiekoetter.com/",
    "https://invidious.f5.si",
    "https://invidious.nerdvpn.de",
]

# ------------------ HELPERS ----------------------


def _fetch_html(url: str, *, timeout: int = DEFAULT_TIMEOUT) -> str:
    """Fetch *url* with the shared browser-like headers and return its HTML.

    Raises:
        requests.HTTPError: for any 4xx/5xx response.
    """
    response = requests.get(url, headers=HEADERS, timeout=timeout)
    response.raise_for_status()
    return response.text


def _http_json(base: str, path: str, **params):
    """GET ``base + path`` and decode the response body as JSON.

    Raises:
        FileNotFoundError: on 404/410 (interpreted as "no captions here").
        requests.HTTPError: on any other 4xx/5xx status.
    """
    url = base.rstrip("/") + path
    response = requests.get(url, params=params, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
    status = response.status_code
    if status in (404, 410):
        raise FileNotFoundError(f"No captions on {base}")
    if status >= 400:
        raise requests.HTTPError(f"{status} at {base}{path}")
    return response.json()


def _http_text(base: str, path: str, **params) -> str:
    """GET ``base + path`` and return the raw response body as text.

    Raises:
        FileNotFoundError: on 404/410 (interpreted as "no captions here").
        requests.HTTPError: on any other 4xx/5xx status.
    """
    url = base.rstrip("/") + path
    response = requests.get(url, params=params, headers=HEADERS, timeout=DEFAULT_TIMEOUT)
    status = response.status_code
    if status in (404, 410):
        raise FileNotFoundError(f"No captions on {base}")
    if status >= 400:
        raise requests.HTTPError(f"{status} at {base}{path}")
    return response.text


def _vtt_to_text(vtt: str) -> str:
    lines = []
    for line in vtt.splitlines():
        if not line or line.startswith("WEBVTT"):
            continue
        if re.match(r"\d{2}:\d{2}:\d{2}\.\d{3} --> ", line) or line.isdigit():
            continue
        lines.append(line)
    return re.sub(r"<[^>]+>", "", "\n".join(lines)).strip()


# ------------------ YOUTUBE FETCHERS ----------------------


def extract_video_id(video_url: str) -> str:
    """
    Extract a YouTube video ID from any URL format (watch?v=, youtu.be/, shorts/, embed/, or raw ID).
    """
    candidate = (video_url or "").strip()

    # A bare 11-character ID needs no parsing.
    if re.fullmatch(r"[A-Za-z0-9_-]{11}", candidate):
        return candidate

    # Known URL shapes, checked in order of likelihood.
    for pattern in (
        r"[?&]v=([A-Za-z0-9_-]{11})",
        r"youtu\.be/([A-Za-z0-9_-]{11})",
        r"youtube\.com/embed/([A-Za-z0-9_-]{11})",
        r"youtube\.com/shorts/([A-Za-z0-9_-]{11})",
    ):
        match = re.search(pattern, candidate)
        if match:
            return match.group(1)

    # Last resort: any path segment that looks like an ID at the end of
    # the URL (or immediately before a query string).
    match = re.search(r"/([A-Za-z0-9_-]{11})(?:\?|$)", candidate)
    if match:
        return match.group(1)

    raise ValueError(
        "Unable to extract a valid YouTube video ID from the provided input."
    )


@tool("get_youtube_title_description")
def get_youtube_title_description(video_url: str) -> str:
    """
    get_youtube title and description from youtube url (ex: https://www.youtube.com/watch?v=1htKBjuUWec)

    Extract YouTube title + description from Open Graph meta tags.
    Falls back to standard <meta name="title"/"description"> if needed.
    """
    soup = BeautifulSoup(_fetch_html(video_url), "html.parser")

    # Prefer Open Graph tags; fall back to plain <meta name=...> tags.
    title_meta = soup.find("meta", property="og:title") or soup.find(
        "meta", attrs={"name": "title"}
    )
    desc_meta = soup.find("meta", property="og:description") or soup.find(
        "meta", attrs={"name": "description"}
    )

    # Empty "content" attributes fall through to the placeholder strings.
    title = "No title found"
    if title_meta and title_meta.get("content"):
        title = title_meta.get("content")

    description = "No description found"
    if desc_meta and desc_meta.get("content"):
        description = desc_meta.get("content")

    return f"Title: {title}\nDescription: {description}"


@tool("get_youtube_transcript")
def get_youtube_transcript_tool(
    video_id: str, langs: Optional[List[str]] = None
) -> str:
    """
    Tool: return YouTube transcript text for a video ID.
    """
    # Preferred caption languages, tried in order; defaults favor English
    # then French variants.
    langs = langs or ["en", "en-US", "fr", "fr-FR"]
    last_err = None
    # Try each Invidious mirror until one yields a non-empty transcript.
    for base in INVIDIOUS_POOL:
        try:
            caps = _http_json(base, f"/api/v1/captions/{video_id}")
            # The API may return either {"captions": [...]} or a bare list;
            # normalize to a list of track dicts.
            tracks = caps.get("captions", caps) if isinstance(caps, dict) else caps
            if not isinstance(tracks, list) or not tracks:
                continue
            pick = None
            # Select the first track matching a preferred language, either by
            # exact languageCode or by label prefix (e.g. "en" ~ "English").
            for lang in langs:
                pick = next(
                    (
                        c
                        for c in tracks
                        if (c.get("languageCode", "").lower() == lang.lower())
                        or (
                            c.get("label", "")
                            .lower()
                            .startswith(lang.split("-")[0].lower())
                        )
                    ),
                    None,
                )
                if pick:
                    break
            # No preferred language available: fall back to the first track.
            pick = pick or tracks[0]
            # Fetch the VTT payload via the track's own URL when present,
            # otherwise re-query the captions endpoint with label/lang params.
            vtt = (
                _http_text(base, pick["url"])
                if pick.get("url")
                else _http_text(
                    base,
                    f"/api/v1/captions/{video_id}",
                    label=pick.get("label") or pick.get("languageCode") or "English",
                    lang=pick.get("languageCode") or "en",
                )
            )
            text = _vtt_to_text(vtt)
            if text:
                return text
        except Exception as e:
            # Remember the last failure for the final error message, then
            # move on to the next mirror.
            last_err = e
            continue
    raise RuntimeError(f"No captions available. Last error: {last_err}")