Spaces:
Sleeping
Sleeping
| """ | |
| fetcher.py — YouTube Data API v3 helpers | |
| """ | |
| import re | |
| import requests | |
| import pandas as pd | |
| # Video ID extraction | |
| def extract_video_id(url_or_id: str) -> str | None: | |
| """Return an 11-char YouTube video ID, or None if not found.""" | |
| patterns = [ | |
| r"(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/|youtube\.com/shorts/)([a-zA-Z0-9_-]{11})", | |
| r"^([a-zA-Z0-9_-]{11})$", | |
| ] | |
| for pattern in patterns: | |
| m = re.search(pattern, url_or_id) | |
| if m: | |
| return m.group(1) | |
| return None | |
| # Duration parser | |
def _parse_duration(iso: str) -> str:
    """Convert an ISO 8601 duration string into a clock-style string.

    Handles the ``PT#H#M#S`` form as well as the ``P#DT#H#M#S`` form that
    carries a day component; days are folded into the hour count.

    Args:
        iso: Duration string such as ``"PT1H2M3S"``. ``None`` or an empty
            string is treated as a zero duration.

    Returns:
        ``"H:MM:SS"`` when the duration is at least one hour, otherwise
        ``"M:SS"``. Input that does not start with a duration designator
        yields ``"0:00"``.
    """
    m = re.match(
        r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?",
        iso or "PT0S",
    )
    if not m:
        return "0:00"
    # Components absent from the string come back as None and count as 0.
    days, hours, minutes, seconds = (int(part or 0) for part in m.groups())
    hours += days * 24
    if hours:
        return f"{hours}:{minutes:02d}:{seconds:02d}"
    return f"{minutes}:{seconds:02d}"
| # Metadata | |
def fetch_video_metadata(video_id: str, api_key: str) -> tuple[dict | None, str | None]:
    """Fetch snippet, statistics and duration for one video.

    Calls the YouTube Data API v3 ``videos`` endpoint with a 15 second
    timeout.

    Args:
        video_id: ID of the video to look up.
        api_key: API key sent as the ``key`` query parameter.

    Returns:
        ``(meta, None)`` on success, where ``meta`` has the keys ``title``,
        ``description``, ``channel_title``, ``published_at`` (first 10
        characters of ``publishedAt``), ``tags``, ``thumbnail_url``,
        ``view_count``, ``like_count``, ``comment_count`` and ``duration``.
        ``(None, error_message)`` on any failure; this function does not
        raise.
    """
    try:
        resp = requests.get(
            "https://www.googleapis.com/youtube/v3/videos",
            params={
                "id": video_id,
                "key": api_key,
                "part": "snippet,statistics,contentDetails",
            },
            timeout=15,
        )
        data = resp.json()
        if "error" in data:
            return None, data["error"].get("message", "YouTube API error")

        items = data.get("items", [])
        if not items:
            return None, "Video not found — check the ID or URL."

        item = items[0]
        snippet = item.get("snippet", {})
        stats = item.get("statistics", {})
        details = item.get("contentDetails", {})
        thumbs = snippet.get("thumbnails", {})

        meta = {
            "title": snippet.get("title", "Unknown"),
            "description": snippet.get("description", ""),
            "channel_title": snippet.get("channelTitle", "Unknown"),
            "published_at": snippet.get("publishedAt", "")[:10],
            "tags": snippet.get("tags", []),
            # Prefer the high-resolution thumbnail, fall back to medium.
            "thumbnail_url": (
                thumbs.get("high", {}).get("url", "")
                or thumbs.get("medium", {}).get("url", "")
            ),
            # Missing counters are reported as 0.
            "view_count": int(stats.get("viewCount", 0)),
            "like_count": int(stats.get("likeCount", 0)),
            "comment_count": int(stats.get("commentCount", 0)),
            "duration": _parse_duration(details.get("duration", "PT0S")),
        }
        return meta, None
    except requests.exceptions.Timeout:
        return None, "Request timed out. Check your internet connection."
    except Exception as exc:
        # Connection-level error messages can embed the request URL,
        # including the ``key=`` query parameter; never hand the API key
        # back to the caller in an error string.
        message = str(exc)
        if api_key:
            message = message.replace(api_key, "[redacted]")
        return None, message
| # Transcript | |
def fetch_transcript(video_id: str) -> tuple[str, str]:
    """Fetch a video's transcript as one space-joined string.

    ``youtube_transcript_api`` is imported lazily, so a missing package is
    reported through the status message instead of failing at module import.

    Args:
        video_id: ID of the video whose transcript is requested.

    Returns:
        ``(text, status_message)``. On any failure ``text`` is ``""`` and
        the status message carries the first 80 characters of the error;
        this function does not raise.
    """
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            # Legacy static API: returns a list of dicts with a "text" key.
            segments = YouTubeTranscriptApi.get_transcript(video_id)
            pieces = [seg["text"] for seg in segments]
        else:
            # NOTE(review): newer releases of youtube-transcript-api drop the
            # static helper in favour of an instance ``fetch`` whose snippets
            # expose ``.text`` — confirm against the installed version.
            pieces = [
                snippet.text
                for snippet in YouTubeTranscriptApi().fetch(video_id)
            ]

        text = " ".join(pieces)
        return text, f" Transcript: {len(text.split())} words"
    except Exception as exc:
        short = str(exc)[:80]
        return "", f" Transcript unavailable: {short}"
| # Comments | |
def fetch_comments(
    video_id: str,
    api_key: str,
    max_comments: int = 150,
) -> tuple[pd.DataFrame, str]:
    """Fetch up to *max_comments* top-level comments for a video.

    Pages through the YouTube Data API v3 ``commentThreads`` endpoint
    (ordered by relevance, at most 100 comments per request, 15 second
    timeout per request).

    Args:
        video_id: ID of the video whose comments are requested.
        api_key: API key sent as the ``key`` query parameter.
        max_comments: Upper bound on the number of comments collected.

    Returns:
        ``(df, status_message)``. ``df`` has the columns ``author``,
        ``text``, ``likes`` and ``published_at``; it is an empty DataFrame
        when nothing was fetched or an error occurred. If the API reports
        an error after some pages were already collected, the partial
        result is returned. This function does not raise.
    """
    rows = []
    next_token = None
    api_error = None
    try:
        while len(rows) < max_comments:
            params = {
                "videoId": video_id,
                "key": api_key,
                "part": "snippet",
                # Only ask for what is still missing, capped at 100 per page.
                "maxResults": min(100, max_comments - len(rows)),
                "order": "relevance",
            }
            if next_token:
                params["pageToken"] = next_token

            resp = requests.get(
                "https://www.googleapis.com/youtube/v3/commentThreads",
                params=params,
                timeout=15,
            )
            data = resp.json()

            if "error" in data:
                error = data["error"]
                details = error.get("errors") or [{}]
                # Disabled comments keep the friendly "no comments" status
                # below; any other API failure (quota, bad key, ...) is
                # surfaced instead of being silently dropped.
                if details[0].get("reason") != "commentsDisabled":
                    api_error = error.get("message", "Comment API error")
                break

            items = data.get("items", [])
            for item in items:
                c = item["snippet"]["topLevelComment"]["snippet"]
                rows.append({
                    "author": c.get("authorDisplayName", ""),
                    "text": c.get("textDisplay", ""),
                    "likes": int(c.get("likeCount", 0)),
                    "published_at": c.get("publishedAt", "")[:10],
                })

            next_token = data.get("nextPageToken")
            if not next_token or not items:
                break

        if not rows:
            if api_error:
                return pd.DataFrame(), f" Comments error: {str(api_error)[:80]}"
            return pd.DataFrame(), " No comments fetched (comments may be disabled)"

        df = pd.DataFrame(rows)
        return df, f" Comments: {len(df)} fetched"
    except requests.exceptions.Timeout:
        return pd.DataFrame(), " Comments request timed out"
    except Exception as exc:
        # Connection-level error messages can embed the request URL with the
        # ``key=`` parameter; redact the key before truncating.
        detail = str(exc)
        if api_key:
            detail = detail.replace(api_key, "[redacted]")
        return pd.DataFrame(), f" Comments error: {detail[:80]}"
| # Search by keyword | |
def search_videos_by_title(
    keyword: str,
    api_key: str,
    max_results: int = 5,
) -> list[dict]:
    """Search YouTube for videos matching *keyword*.

    Calls the YouTube Data API v3 ``search`` endpoint (``type=video``) with
    a 15 second timeout. This is a best-effort helper: failures are logged
    as warnings and reported to the caller as an empty list.

    Args:
        keyword: Search query.
        api_key: API key sent as the ``key`` query parameter.
        max_results: Number of results requested; clamped to 0-50 before it
            is sent.

    Returns:
        A list of dicts with the keys ``video_id``, ``title``,
        ``channel_title``, ``published_at`` and ``thumbnail_url``. Items
        without a video ID are skipped. Empty on any failure.
    """
    import logging

    log = logging.getLogger(__name__)
    try:
        resp = requests.get(
            "https://www.googleapis.com/youtube/v3/search",
            params={
                "q": keyword,
                "key": api_key,
                "part": "snippet",
                "type": "video",
                # Keep the value inside the range the endpoint accepts so an
                # oversized request does not fail outright.
                "maxResults": max(0, min(max_results, 50)),
            },
            timeout=15,
        )
        data = resp.json()

        if "error" in data:
            log.warning(
                "YouTube search failed: %s",
                data["error"].get("message", "YouTube API error"),
            )
            return []

        results = []
        for item in data.get("items", []):
            vid_id = item.get("id", {}).get("videoId", "")
            if not vid_id:
                continue
            sn = item.get("snippet", {})
            results.append({
                "video_id": vid_id,
                "title": sn.get("title", ""),
                "channel_title": sn.get("channelTitle", ""),
                "published_at": sn.get("publishedAt", "")[:10],
                "thumbnail_url": sn.get("thumbnails", {}).get("medium", {}).get("url", ""),
            })
        return results
    except Exception as exc:
        # Still best-effort, but leave a trace; redact the API key because
        # connection-level error messages can embed the request URL.
        detail = str(exc)
        if api_key:
            detail = detail.replace(api_key, "[redacted]")
        log.warning("YouTube search failed: %s", detail)
        return []