Spaces:
Sleeping
Sleeping
| # src/functions.py | |
| import os | |
| import re | |
| from urllib.parse import urlparse, parse_qs, urlunparse | |
| from agents import function_tool | |
| from youtube_transcript_api import ( | |
| YouTubeTranscriptApi, | |
| TranscriptsDisabled, | |
| NoTranscriptFound, | |
| VideoUnavailable, | |
| ) | |
| from youtube_transcript_api.proxies import GenericProxyConfig | |
| # --------------------------- | |
| # YouTube URL / ID utilities | |
| # --------------------------- | |
| _YT_ID_RE = re.compile(r"^[a-zA-Z0-9_-]{11}$") | |
| def _extract_video_id(url_or_id: str) -> str | None: | |
| """ | |
| Accepts a raw 11-char video ID or any common YouTube URL: | |
| - https://www.youtube.com/watch?v=VIDEOID&... | |
| - https://youtu.be/VIDEOID?t=123 | |
| - https://www.youtube.com/shorts/VIDEOID | |
| - https://www.youtube.com/embed/VIDEOID | |
| Ignores extra params (list, t, etc.). | |
| """ | |
| s = (url_or_id or "").strip() | |
| # Bare ID | |
| if _YT_ID_RE.match(s): | |
| return s | |
| p = urlparse(s) | |
| if not p.netloc: | |
| return None | |
| # /watch?v=VIDEOID | |
| if p.path == "/watch": | |
| v = parse_qs(p.query).get("v", [None])[0] | |
| return v if v and _YT_ID_RE.match(v) else None | |
| # youtu.be/VIDEOID | |
| if p.netloc.endswith("youtu.be"): | |
| vid = p.path.lstrip("/") | |
| return vid if _YT_ID_RE.match(vid) else None | |
| # /shorts/VIDEOID or /embed/VIDEOID | |
| parts = p.path.strip("/").split("/") | |
| if len(parts) >= 2 and parts[0] in ("shorts", "embed"): | |
| vid = parts[1] | |
| return vid if _YT_ID_RE.match(vid) else None | |
| return None | |
| # --------------------------- | |
| # Proxy configuration | |
| # --------------------------- | |
def _build_proxy_config() -> GenericProxyConfig | None:
    """
    Build a youtube_transcript_api proxy configuration from the environment.

    Supports these envs (Repository secrets on HF Spaces):
    - PROXY_AUTH_URL = http://USER:PASS@HOST:PORT (preferred; used verbatim,
      so any special characters in it must already be percent-encoded)
    - OR:
      PROXY_URL = http://HOST:PORT (or https://HOST:PORT)
      PROXY_USERNAME = user (optional)
      PROXY_PASSWORD = pass (optional)
      Credentials are only applied when BOTH username and password are set.
      They are given as raw text and percent-encoded here, so characters
      such as "@", ":" or "/" in a password cannot corrupt the URL.

    A missing scheme defaults to http.

    Returns:
        A GenericProxyConfig whose http_url/https_url are the same proxy
        address with an "http://" and "https://" scheme respectively, or
        None when neither PROXY_AUTH_URL nor PROXY_URL is set.
    """
    from urllib.parse import quote

    def _with_scheme(url: str) -> str:
        # If scheme missing, assume http
        if url.startswith(("http://", "https://")):
            return url
        return "http://" + url

    proxy_url = os.getenv("PROXY_AUTH_URL", "").strip()
    if proxy_url:
        proxy_url = _with_scheme(proxy_url)
    else:
        base = os.getenv("PROXY_URL", "").strip()
        if not base:
            return None
        proxy_url = _with_scheme(base)

        user = os.getenv("PROXY_USERNAME", "").strip()
        pwd = os.getenv("PROXY_PASSWORD", "").strip()
        if user and pwd:
            # Insert credentials into netloc
            p = urlparse(proxy_url)
            host = p.hostname or ""
            if ":" in host:
                # .hostname strips the brackets of an IPv6 literal; restore
                # them so the port separator stays unambiguous.
                host = f"[{host}]"
            netloc = f"{quote(user, safe='')}:{quote(pwd, safe='')}@{host}"
            if p.port:
                netloc += f":{p.port}"
            proxy_url = urlunparse((p.scheme, netloc, p.path or "", "", "", ""))

    # Build both http/https variants by swapping only the leading scheme.
    rest = proxy_url.split("://", 1)[1]
    return GenericProxyConfig(
        http_url="http://" + rest,
        https_url="https://" + rest,
    )
def _export_proxy_env() -> None:
    """
    Universal fallback: export HTTP(S)_PROXY so underlying HTTP client (requests/httpx)
    uses the proxy even if youtube-transcript-api signature changes.

    Reads the same variables as the explicit proxy configuration:
    - PROXY_AUTH_URL (preferred, used as given), or
    - PROXY_URL, with PROXY_USERNAME / PROXY_PASSWORD percent-encoded into
      it when both are set.
    A missing scheme defaults to http. Blank (whitespace-only) values count
    as unset, so a blank PROXY_AUTH_URL does not hide a valid PROXY_URL.

    Does nothing when no proxy is configured; otherwise overwrites both
    HTTP_PROXY and HTTPS_PROXY in os.environ with the same URL.
    """
    from urllib.parse import quote

    def _with_scheme(url: str) -> str:
        if url.startswith(("http://", "https://")):
            return url
        return "http://" + url

    proxy = (os.getenv("PROXY_AUTH_URL") or "").strip()
    if proxy:
        proxy = _with_scheme(proxy)
    else:
        proxy = (os.getenv("PROXY_URL") or "").strip()
        if not proxy:
            return
        proxy = _with_scheme(proxy)

        user = (os.getenv("PROXY_USERNAME") or "").strip()
        pwd = (os.getenv("PROXY_PASSWORD") or "").strip()
        if user and pwd:
            # Without this the exported fallback would point at an
            # authenticated proxy but carry no credentials.
            parts = urlparse(proxy)
            host = parts.hostname or ""
            if ":" in host:
                # Restore the brackets .hostname strips from IPv6 literals.
                host = f"[{host}]"
            netloc = f"{quote(user, safe='')}:{quote(pwd, safe='')}@{host}"
            if parts.port:
                netloc += f":{parts.port}"
            proxy = urlunparse((parts.scheme, netloc, parts.path or "", "", "", ""))

    os.environ["HTTP_PROXY"] = proxy
    os.environ["HTTPS_PROXY"] = proxy
| # --------------------------- | |
| # Formatting | |
| # --------------------------- | |
def _format_transcript(entries: list[dict]) -> str:
    """
    Render transcript entries as timestamped lines.

    entries: iterable of dicts like {"text": "...", "start": 12.34, "duration": 3.21}.
    Objects exposing the same fields as attributes (``.text`` / ``.start``)
    are accepted too, so results that are not plain dicts do not crash
    the formatter. None or an empty iterable yields "".

    Output: one line per entry, "[MM:SS] Text". Minutes are not wrapped at
    60, so a one-hour offset renders as "[60:00]". Newlines inside an
    entry's text become spaces, entries with no text are skipped, and a
    missing, unparsable or negative start time is rendered as "[00:00]".
    """

    def _field(entry, name, default):
        # Support both mapping-style and attribute-style entries.
        if isinstance(entry, dict):
            return entry.get(name, default)
        return getattr(entry, name, default)

    lines = []
    for entry in entries or ():
        try:
            total_seconds = int(float(_field(entry, "start", 0)))
        except (TypeError, ValueError, OverflowError):
            # None / non-numeric text / NaN / infinity
            total_seconds = 0
        minutes, seconds = divmod(max(total_seconds, 0), 60)

        text = str(_field(entry, "text", "") or "").replace("\n", " ").strip()
        if text:
            lines.append(f"[{minutes:02d}:{seconds:02d}] {text}")
    return "\n".join(lines)
| # --------------------------- | |
| # Tools | |
| # --------------------------- | |
def _raw_transcript_entries(fetched):
    """Return fetched transcript data as plain dicts when the result offers ``to_raw_data()``."""
    to_raw = getattr(fetched, "to_raw_data", None)
    return to_raw() if callable(to_raw) else fetched


def _format_first_listed_transcript(listing, preferred_langs: list[str]) -> str | None:
    """Return the first transcript from *listing* that can be fetched, formatted, or None.

    Each candidate is best-effort: a failure on one transcript just moves on
    to the next candidate.
    """
    # 1) Preferred languages, in order
    for lang in preferred_langs:
        try:
            found = listing.find_transcript([lang])
            return _format_transcript(_raw_transcript_entries(found.fetch()))
        except Exception:
            continue

    available = list(listing)

    # 2) Auto-generated (first available)
    for tr in available:
        if getattr(tr, "is_generated", False):
            try:
                return _format_transcript(_raw_transcript_entries(tr.fetch()))
            except Exception:
                continue

    # 3) Translate to English as last resort
    for tr in available:
        try:
            translated = tr.translate("en")
            return _format_transcript(_raw_transcript_entries(translated.fetch()))
        except Exception:
            continue

    return None


def fetch_video_transcript(url: str) -> str:
    """
    Extract transcript with timestamps from a YouTube video URL and format it for LLM consumption.

    The transcript is first requested directly in the preferred languages.
    If none of those exists, the video's transcript listing is searched for
    a preferred-language, auto-generated, or English-translatable transcript.

    Args:
        url (str): YouTube video URL (any common form is accepted)
    Returns:
        str: Formatted transcript with timestamps, one per line: "[MM:SS] Text"
             or a specific, user-friendly error message. Library and network
             errors are reported in the returned string, not raised.
    """
    video_id = _extract_video_id(url)
    if not video_id:
        return "❌ I couldn’t parse a valid YouTube video ID from your input. Please paste a direct video link."

    # Make sure the environment knows about the proxy universally
    _export_proxy_env()
    proxy_cfg = _build_proxy_config()
    preferred_langs = ["en", "en-US", "en-GB", "ko", "ja"]

    def _known_failure(exc: Exception) -> str | None:
        # Shared by both phases so the user-facing wording stays identical.
        if isinstance(exc, TranscriptsDisabled):
            return "❌ Transcripts are disabled for this video."
        if isinstance(exc, VideoUnavailable):
            return "❌ This video is unavailable in the current region or has restrictions."
        return None

    # Helper: fetch via the instance API, falling back to the legacy static API.
    def _get_transcript_any():
        try:
            api = YouTubeTranscriptApi(proxy_config=proxy_cfg)
        except TypeError:
            # Library too old to take constructor arguments; the proxy then
            # comes from the exported HTTP(S)_PROXY variables.
            # NOTE(review): this module imports youtube_transcript_api.proxies,
            # so this branch is presumably unreachable - confirm and drop.
            return YouTubeTranscriptApi.get_transcript(
                video_id,
                languages=preferred_langs,
            )
        return _raw_transcript_entries(
            api.fetch(video_id, languages=preferred_langs)
        )

    # Helper: list transcripts via the instance API, with the same fallback.
    def _list_transcripts_any():
        try:
            api = YouTubeTranscriptApi(proxy_config=proxy_cfg)
        except TypeError:
            return YouTubeTranscriptApi.list_transcripts(video_id)
        # Prefer .list(); fall back to the older method name if that is all
        # the installed version provides.
        lister = getattr(api, "list", None) or getattr(api, "list_transcripts")
        return lister(video_id)

    # Fast path: direct fetch with preferred langs
    try:
        return _format_transcript(_get_transcript_any())
    except NoTranscriptFound:
        pass  # fall through to the listing-based search below
    except Exception as e:
        known = _known_failure(e)
        if known:
            return known
        # Most common here: connection error / blocked / proxy needed
        hint = ""
        if not (os.getenv("PROXY_AUTH_URL") or os.getenv("PROXY_URL")):
            hint = " (Tip: if this Space is on Hugging Face, set a proxy via PROXY_AUTH_URL or PROXY_URL/USERNAME/PASSWORD in Repository secrets.)"
        return f"⚠️ Error fetching transcript: {e}{hint}"

    # Try listing to find auto-generated or translatable transcripts
    try:
        formatted = _format_first_listed_transcript(
            _list_transcripts_any(), preferred_langs
        )
    except Exception as e:
        known = _known_failure(e)
        if known:
            return known
        # Likely network/restrictions if not a true NoTranscriptFound
        return f"⚠️ Error while searching transcripts: {e}"

    if formatted is None:
        return "❌ No transcript is available for this video (no captions found, even auto-generated)."
    return formatted
def fetch_intstructions(prompt_name: str) -> str:
    """
    Fetch instructions for a given prompt name from the prompts/ directory.

    Available prompts:
    - write_blog_post
    - write_social_post
    - write_video_chapters

    Args:
        prompt_name (str): File name of the prompt without the ".md" suffix.
            It must be a plain file name: path separators, drive prefixes
            and absolute paths are rejected so the lookup cannot leave the
            prompts/ directory.
    Returns:
        str: The UTF-8 text of prompts/<prompt_name>.md next to this module.
    Raises:
        FileNotFoundError: If the name is empty, is not a plain file name,
            or no such prompt file exists.
    """
    name = prompt_name if isinstance(prompt_name, str) else ""
    # The name may come from a tool call, so refuse anything that could
    # resolve outside prompts/ (e.g. "../x" or "/etc/x").
    if (
        not name
        or "/" in name
        or "\\" in name
        or os.path.basename(name) != name
    ):
        raise FileNotFoundError(f"Unknown prompt name: {prompt_name!r}")

    script_dir = os.path.dirname(__file__)
    prompt_path = os.path.join(script_dir, "prompts", f"{name}.md")
    with open(prompt_path, "r", encoding="utf-8") as f:
        return f.read()