File size: 8,950 Bytes
96966f9
c3a8356
20613bf
 
 
bcd4ece
20613bf
 
 
 
 
 
 
 
96966f9
20613bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96966f9
20613bf
 
 
 
 
 
 
 
 
 
 
 
 
 
96966f9
20613bf
 
 
96966f9
20613bf
 
 
 
 
 
 
 
 
 
 
 
 
bcd4ece
20613bf
96966f9
 
 
 
 
 
 
 
 
 
 
 
 
 
20613bf
 
 
 
 
 
 
 
 
 
96966f9
 
 
 
20613bf
 
 
 
 
 
 
 
 
 
 
 
bcd4ece
b0b8612
bcd4ece
20613bf
 
bcd4ece
20613bf
 
bcd4ece
20613bf
 
bcd4ece
20613bf
 
 
 
96966f9
 
20613bf
96966f9
20613bf
 
96966f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c3a8356
20613bf
96966f9
20613bf
 
 
 
 
96966f9
c3a8356
20613bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96966f9
 
 
 
 
 
20613bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bcd4ece
 
 
 
20613bf
 
 
 
 
bcd4ece
d147352
 
20613bf
96966f9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# src/functions.py
import os
import re
from urllib.parse import urlparse, parse_qs, urlunparse

from agents import function_tool
from youtube_transcript_api import (
    YouTubeTranscriptApi,
    TranscriptsDisabled,
    NoTranscriptFound,
    VideoUnavailable,
)
from youtube_transcript_api.proxies import GenericProxyConfig


# ---------------------------
# YouTube URL / ID utilities
# ---------------------------
_YT_ID_RE = re.compile(r"^[a-zA-Z0-9_-]{11}$")

def _extract_video_id(url_or_id: str) -> str | None:
    """
    Accepts a raw 11-char video ID or any common YouTube URL:
      - https://www.youtube.com/watch?v=VIDEOID&...
      - https://youtu.be/VIDEOID?t=123
      - https://www.youtube.com/shorts/VIDEOID
      - https://www.youtube.com/embed/VIDEOID
    Ignores extra params (list, t, etc.).
    """
    s = (url_or_id or "").strip()

    # Bare ID
    if _YT_ID_RE.match(s):
        return s

    p = urlparse(s)
    if not p.netloc:
        return None

    # /watch?v=VIDEOID
    if p.path == "/watch":
        v = parse_qs(p.query).get("v", [None])[0]
        return v if v and _YT_ID_RE.match(v) else None

    # youtu.be/VIDEOID
    if p.netloc.endswith("youtu.be"):
        vid = p.path.lstrip("/")
        return vid if _YT_ID_RE.match(vid) else None

    # /shorts/VIDEOID or /embed/VIDEOID
    parts = p.path.strip("/").split("/")
    if len(parts) >= 2 and parts[0] in ("shorts", "embed"):
        vid = parts[1]
        return vid if _YT_ID_RE.match(vid) else None

    return None


# ---------------------------
# Proxy configuration
# ---------------------------
def _build_proxy_config() -> GenericProxyConfig | None:
    """
    Build a youtube_transcript_api proxy configuration from environment variables.

    Supports these envs (Repository secrets on HF Spaces):
      - PROXY_AUTH_URL = http://USER:PASS@HOST:PORT (preferred)
      - OR:
          PROXY_URL      = http://HOST:PORT (or https://HOST:PORT)
          PROXY_USERNAME = user (optional)
          PROXY_PASSWORD = pass (optional)

    A value without a scheme is treated as http://. PROXY_USERNAME and
    PROXY_PASSWORD are applied only when both are non-empty, and they replace
    any credentials already embedded in PROXY_URL. They are percent-encoded
    before being placed in the URL so that characters such as "@", ":" or "/"
    cannot corrupt it; "%" itself is left alone so values that are already
    percent-encoded keep working.

    Returns:
        A GenericProxyConfig whose ``http_url`` / ``https_url`` are the same
        proxy address with an http:// and an https:// scheme respectively,
        or None when neither PROXY_AUTH_URL nor PROXY_URL is set.
    """
    # Function-scope import: keeps this block independent of the file-level
    # import list, which does not include quote.
    from urllib.parse import quote

    def _with_default_scheme(raw: str) -> str:
        """Prefix http:// when the address carries no http(s) scheme."""
        if raw.lower().startswith(("http://", "https://")):
            return raw
        return "http://" + raw

    def _config_for(url: str) -> GenericProxyConfig:
        """Create the config from the http:// and https:// variants of ``url``."""
        # Swap only the leading scheme. A whole-string str.replace() would
        # also rewrite an "http://" occurring later in the URL.
        remainder = url.split("://", 1)[1]
        return GenericProxyConfig(
            http_url="http://" + remainder,
            https_url="https://" + remainder,
        )

    auth_url = os.getenv("PROXY_AUTH_URL", "").strip()
    if auth_url:
        return _config_for(_with_default_scheme(auth_url))

    base = os.getenv("PROXY_URL", "").strip()
    if not base:
        return None
    base = _with_default_scheme(base)

    user = os.getenv("PROXY_USERNAME", "").strip()
    pwd = os.getenv("PROXY_PASSWORD", "").strip()
    if not (user and pwd):
        # No-auth proxy
        return _config_for(base)

    # Insert credentials into netloc
    p = urlparse(base)
    host = p.hostname or ""
    if ":" in host:
        # urlparse strips the brackets from IPv6 literals; restore them so
        # the port separator stays unambiguous.
        host = f"[{host}]"
    netloc = f"{quote(user, safe='%')}:{quote(pwd, safe='%')}@{host}"
    if p.port:
        netloc += f":{p.port}"
    return _config_for(urlunparse((p.scheme, netloc, p.path or "", "", "", "")))


def _export_proxy_env() -> None:
    """
    Universal fallback: export HTTP(S)_PROXY so underlying HTTP client (requests/httpx)
    uses the proxy even if youtube-transcript-api signature changes.

    The proxy address is taken from PROXY_AUTH_URL, or from PROXY_URL when
    PROXY_AUTH_URL is unset or blank. A missing scheme defaults to http://.
    When PROXY_URL is used and both PROXY_USERNAME and PROXY_PASSWORD are
    non-empty, the (percent-encoded) credentials are embedded in the exported
    URL so the fallback proxy is authenticated too.

    Side effects:
        Overwrites os.environ["HTTP_PROXY"] and os.environ["HTTPS_PROXY"]
        for the whole process. Does nothing when no proxy is configured.
    """
    # Function-scope import: quote is not part of the file-level imports.
    from urllib.parse import quote

    # Strip each variable before choosing, so a whitespace-only
    # PROXY_AUTH_URL does not mask a usable PROXY_URL.
    auth_url = (os.getenv("PROXY_AUTH_URL") or "").strip()
    proxy = auth_url or (os.getenv("PROXY_URL") or "").strip()
    if not proxy:
        return
    if not proxy.lower().startswith(("http://", "https://")):
        proxy = "http://" + proxy

    if not auth_url:
        user = (os.getenv("PROXY_USERNAME") or "").strip()
        pwd = (os.getenv("PROXY_PASSWORD") or "").strip()
        if user and pwd:
            scheme, _, rest = proxy.partition("://")
            authority, slash, tail = rest.partition("/")
            # Drop any userinfo already present; the explicit variables win.
            host = authority.rpartition("@")[2]
            # safe="%" leaves already percent-encoded credentials untouched.
            credentials = f"{quote(user, safe='%')}:{quote(pwd, safe='%')}"
            proxy = f"{scheme}://{credentials}@{host}{slash}{tail}"

    os.environ["HTTP_PROXY"] = proxy
    os.environ["HTTPS_PROXY"] = proxy


# ---------------------------
# Formatting
# ---------------------------
def _format_transcript(entries: list[dict]) -> str:
    """
    Render transcript entries as timestamped lines.

    entries: iterable of dicts like {"text": "...", "start": 12.34, "duration": 3.21},
             or of objects exposing the same fields as attributes (``.text``,
             ``.start``). ``None`` is treated as an empty transcript.
    Output: one line per entry, "[MM:SS] Text". Minutes are not wrapped into
            hours, so an entry at 3661 seconds is rendered as "[61:01]".
            Entries whose text is empty after trimming are skipped; an entry
            with a missing or non-numeric start is stamped "[00:00]".
    """

    def _field(entry, key, default=None):
        """Read ``key`` from a dict entry or from an attribute-style entry."""
        if isinstance(entry, dict):
            return entry.get(key, default)
        return getattr(entry, key, default)

    lines = []
    for entry in entries or ():
        try:
            start = float(_field(entry, "start", 0))
        except (TypeError, ValueError):
            # Missing (None) or unparsable timestamp: fall back to zero
            # rather than dropping the line.
            start = 0.0
        minutes = int(start // 60)
        seconds = int(start % 60)
        # Captions may contain hard line breaks; keep each entry on one line.
        text = (_field(entry, "text") or "").replace("\n", " ").strip()
        if text:
            lines.append(f"[{minutes:02d}:{seconds:02d}] {text}")
    return "\n".join(lines)


# ---------------------------
# Tools
# ---------------------------
@function_tool
def fetch_video_transcript(url: str) -> str:
    """
    Extract transcript with timestamps from a YouTube video URL and format it for LLM consumption.

    The transcript is first requested in the preferred languages (English
    variants, then Korean and Japanese). If none of those is found, the
    video's transcript listing is searched: preferred languages again, then
    the first auto-generated track that can be fetched, then the first track
    that can be translated to English.

    Args:
        url (str): YouTube video URL (any common form is accepted)

    Returns:
        str: Formatted transcript with timestamps, one per line: "[MM:SS] Text"
             or a specific, user-friendly error message. Failures reported
             while fetching or listing transcripts are returned as text
             rather than raised.
    """
    video_id = _extract_video_id(url)
    if not video_id:
        return "❌ I couldn’t parse a valid YouTube video ID from your input. Please paste a direct video link."

    # Make sure the environment knows about the proxy universally
    _export_proxy_env()

    proxy_cfg = _build_proxy_config()
    preferred_langs = ["en", "en-US", "en-GB", "ko", "ja"]

    def _render(fetched) -> str:
        """Format a fetched transcript regardless of the container returned."""
        # youtube-transcript-api 1.x returns a FetchedTranscript made of
        # snippet objects; to_raw_data() converts it to the list of dicts
        # that _format_transcript documents. Plain lists pass through as-is.
        to_raw = getattr(fetched, "to_raw_data", None)
        return _format_transcript(to_raw() if callable(to_raw) else fetched)

    try:
        # Fast path: direct fetch with preferred langs.
        # The instance API is used because GenericProxyConfig (imported by
        # this module) belongs to the 1.x release line, whose documented
        # entry points are YouTubeTranscriptApi(proxy_config=...).fetch/.list.
        api = YouTubeTranscriptApi(proxy_config=proxy_cfg)
        return _render(api.fetch(video_id, languages=preferred_langs))

    except NoTranscriptFound:
        # Handled below by searching the full transcript listing.
        pass
    except TranscriptsDisabled:
        return "❌ Transcripts are disabled for this video."
    except VideoUnavailable:
        return "❌ This video is unavailable in the current region or has restrictions."
    except Exception as e:
        # Most common here: connection error / blocked / proxy needed
        hint = ""
        if not (os.getenv("PROXY_AUTH_URL") or os.getenv("PROXY_URL")):
            hint = " (Tip: if this Space is on Hugging Face, set a proxy via PROXY_AUTH_URL or PROXY_URL/USERNAME/PASSWORD in Repository secrets.)"
        return f"⚠️ Error fetching transcript: {e}{hint}"

    # Try listing to find auto-generated or translatable transcripts
    try:
        # Prefer .list(); fall back to the older method name if this
        # installed release does not provide it.
        list_transcripts = getattr(api, "list", None) or api.list_transcripts
        listing = list_transcripts(video_id)

        # Each attempt below is deliberately best-effort: a failure for one
        # candidate must not prevent trying the next one.

        # 1) Preferred languages, one at a time
        for lang in preferred_langs:
            try:
                return _render(listing.find_transcript([lang]).fetch())
            except Exception:
                continue

        # 2) Auto-generated (first available)
        for tr in listing:
            if getattr(tr, "is_generated", False):
                try:
                    return _render(tr.fetch())
                except Exception:
                    continue

        # 3) Translate to English as last resort
        for tr in listing:
            try:
                return _render(tr.translate("en").fetch())
            except Exception:
                continue

        return "❌ No transcript is available for this video (no captions found, even auto-generated)."

    except TranscriptsDisabled:
        return "❌ Transcripts are disabled for this video."
    except VideoUnavailable:
        return "❌ This video is unavailable in the current region or has restrictions."
    except Exception as e:
        # Likely network/restrictions if not a true NoTranscriptFound
        return f"⚠️ Error while searching transcripts: {e}"


@function_tool
def fetch_intstructions(prompt_name: str) -> str:
    """
    Fetch instructions for a given prompt name from the prompts/ directory.
    Available prompts:
      - write_blog_post
      - write_social_post
      - write_video_chapters

    Args:
        prompt_name (str): Bare prompt name. Surrounding whitespace and a
            trailing ".md" are tolerated.

    Returns:
        str: The UTF-8 text of prompts/<prompt_name>.md, located next to
             this module.

    Raises:
        FileNotFoundError: If the name is empty, contains a path component,
            or no matching prompt file exists.
    """
    name = (prompt_name or "").strip()
    if name.endswith(".md"):
        # Accept "write_blog_post.md" as well as "write_blog_post".
        name = name[: -len(".md")]

    # The name comes from the tool caller, so refuse anything that would
    # resolve outside prompts/ (e.g. "../secrets" or an absolute path).
    if not name or "\0" in name or os.path.basename(name) != name:
        raise FileNotFoundError(f"Unknown prompt name: {prompt_name!r}")

    script_dir = os.path.dirname(__file__)
    prompt_path = os.path.join(script_dir, "prompts", f"{name}.md")
    with open(prompt_path, "r", encoding="utf-8") as f:
        return f.read()