File size: 6,595 Bytes
e317d56
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46cc63a
e317d56
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""YouTube comment fetch and suggested-video metadata."""

from __future__ import annotations

import os
import re
from pathlib import Path
from typing import Any

import yaml

from src.utils.logger import get_logger

# Module-scoped logger, named after this module's import path.
logger = get_logger(__name__)

# Directory two levels above the folder that contains this file
# (presumably the repository root -- confirm against the project layout).
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# YAML file read by load_suggested_config().
SUGGESTED_CONFIG = PROJECT_ROOT.joinpath("configs", "suggested_videos.yaml")

_VIDEO_ID_PATTERNS = (
    r"youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})",
    r"youtu\.be/([a-zA-Z0-9_-]{11})",
    r"youtube\.com/embed/([a-zA-Z0-9_-]{11})",
)


class CommentsFetchError(Exception):
    """Raised when comments cannot be fetched and demo fallback must not be used."""


def extract_video_id(url: str) -> str | None:
    for pattern in _VIDEO_ID_PATTERNS:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None


def load_suggested_config() -> dict[str, Any]:
    """Load the suggested-videos configuration from ``SUGGESTED_CONFIG``.

    Returns:
        The parsed YAML document, or ``{}`` when the document is empty/falsy.
        When the file does not exist, a built-in default with
        ``max_comments`` of 15 and a single video entry is returned instead.
    """
    if SUGGESTED_CONFIG.exists():
        with SUGGESTED_CONFIG.open(encoding="utf-8") as handle:
            parsed = yaml.safe_load(handle)
        # An empty YAML file parses to None; normalise that to an empty dict.
        return parsed or {}
    return {"max_comments": 15, "videos": [{"id": "jNQXAC9IVRw"}]}


def _parse_youtube_error(exc: Exception) -> str:
    """Turn an exception from the YouTube client into a short message.

    Args:
        exc: The exception raised while talking to the API.

    Returns:
        A fixed message for the "comments disabled" and "quota" cases,
        otherwise ``str(exc)`` unchanged.
    """
    disabled_message = "Comments are disabled on this video"
    raw = str(exc)
    lowered = raw.lower()

    # Text-based detection first: works without googleapiclient installed.
    if "commentsDisabled" in raw or "disabled comments" in lowered:
        return disabled_message
    if "quota" in lowered:
        return "YouTube API quota exceeded"

    try:
        from googleapiclient.errors import HttpError

        if isinstance(exc, HttpError):
            # error_details may be missing or None; treat both as "no details".
            details = getattr(exc, "error_details", []) or []
            if any(
                isinstance(entry, dict) and entry.get("reason") == "commentsDisabled"
                for entry in details
            ):
                return disabled_message
    except ImportError:
        # Client library unavailable: fall through to the raw text.
        pass
    return raw


def fetch_comments(url: str, max_comments: int) -> tuple[list[str], str]:
    """Fetch comments for a video URL, or demo comments when no API key is set.

    Args:
        url: Video URL; its id is resolved with ``extract_video_id`` and falls
            back to the literal ``"unknown"`` when no id can be parsed.
        max_comments: Upper bound passed on to the chosen backend.

    Returns:
        A ``(comments, source)`` pair. Without a ``YOUTUBE_API_KEY``
        environment value the source is ``"demo"``; otherwise the pair
        produced by ``_fetch_via_api`` is returned as-is.
    """
    parsed_id = extract_video_id(url)
    video_id = parsed_id if parsed_id else "unknown"

    key = os.getenv("YOUTUBE_API_KEY", "").strip()
    if not key:
        return _demo_comments(video_id, max_comments), "demo"
    return _fetch_via_api(url, key, max_comments, video_id)


def _fetch_via_api(
    url: str, api_key: str, max_comments: int, video_id: str
) -> tuple[list[str], str]:
    """Collect top-level comment texts through the YouTube Data API client.

    Args:
        url: Original URL, used only in the "could not parse" error message.
        api_key: Developer key handed to the API client.
        max_comments: Maximum number of comments to return.
        video_id: Parsed video id, or the sentinel ``"unknown"``.

    Returns:
        ``(comments, "youtube")`` with at most ``max_comments`` entries.

    Raises:
        CommentsFetchError: If the id is ``"unknown"``, if any exception is
            raised while paging through results, or if no comment was found.
    """
    from googleapiclient.discovery import build

    if video_id == "unknown":
        raise CommentsFetchError(f"Could not parse video id from: {url}")

    client = build("youtube", "v3", developerKey=api_key)
    collected: list[str] = []
    next_token = None

    try:
        while len(collected) < max_comments:
            # Request only what is still needed, never more than 100 per page.
            remaining = max_comments - len(collected)
            request = client.commentThreads().list(
                part="snippet",
                videoId=video_id,
                maxResults=min(100, remaining),
                pageToken=next_token,
                textFormat="plainText",
            )
            page = request.execute()
            collected.extend(
                thread["snippet"]["topLevelComment"]["snippet"]["textDisplay"]
                for thread in page.get("items", [])
            )
            next_token = page.get("nextPageToken")
            if not next_token:
                # No further page available.
                break
    except Exception as exc:
        reason = _parse_youtube_error(exc)
        logger.warning("YouTube API failed for %s: %s", video_id, reason)
        raise CommentsFetchError(reason) from exc

    if not collected:
        raise CommentsFetchError("No comments found for this video")

    logger.info("YouTube API: fetched %s comments for %s", len(collected), video_id)
    return collected[:max_comments], "youtube"


def fetch_video_metadata(video_ids: list[str]) -> list[dict[str, Any]]:
    """Return one metadata dict per requested video id, in request order.

    Args:
        video_ids: Video ids to look up.

    Returns:
        A list aligned with ``video_ids``. Each entry has the keys ``id``,
        ``title``, ``channel_title``, ``thumbnail_url``, ``watch_url`` and
        ``embeddable``. Placeholder entries are used when no
        ``YOUTUBE_API_KEY`` is set, when ``video_ids`` is empty, when an id is
        absent from the API response, or when the lookup raises (the failure
        is logged as a warning and every entry becomes a placeholder).
    """
    api_key = os.getenv("YOUTUBE_API_KEY", "").strip()
    if not (api_key and video_ids):
        return [_placeholder_meta(vid) for vid in video_ids]

    try:
        from googleapiclient.discovery import build

        client = build("youtube", "v3", developerKey=api_key)
        request = client.videos().list(
            part="snippet,status", id=",".join(video_ids)
        )
        payload = request.execute()

        found: dict[str, dict[str, Any]] = {}
        for entry in payload.get("items", []):
            vid = entry["id"]
            snippet = entry["snippet"]
            status = entry.get("status", {})
            thumbnails = snippet.get("thumbnails", {})
            # Prefer the medium thumbnail, then the default one.
            chosen = thumbnails.get("medium") or thumbnails.get("default") or {}
            embeddable = status.get("embeddable", True)
            fallback_thumb = f"https://i.ytimg.com/vi/{vid}/mqdefault.jpg"
            found[vid] = {
                "id": vid,
                "title": snippet.get("title", vid),
                "channel_title": snippet.get("channelTitle", "Unknown"),
                "thumbnail_url": chosen.get("url", fallback_thumb),
                "watch_url": f"https://www.youtube.com/watch?v={vid}",
                "embeddable": bool(embeddable),
            }
        # Keep the caller's ordering; ids missing from the response get a placeholder.
        return [
            found[vid] if vid in found else _placeholder_meta(vid)
            for vid in video_ids
        ]
    except Exception as exc:
        logger.warning("YouTube metadata failed: %s", exc)
        return [_placeholder_meta(vid) for vid in video_ids]


def _placeholder_meta(video_id: str) -> dict[str, Any]:
    """Build stand-in metadata for *video_id* without calling any API.

    Args:
        video_id: The video id to embed in the title and URLs.

    Returns:
        A dict with the keys ``id``, ``title``, ``channel_title``,
        ``thumbnail_url``, ``watch_url`` and ``embeddable`` (always ``True``).
    """
    thumbnail = f"https://i.ytimg.com/vi/{video_id}/mqdefault.jpg"
    watch = f"https://www.youtube.com/watch?v={video_id}"
    meta: dict[str, Any] = {
        "id": video_id,
        "title": f"Video {video_id}",
        "channel_title": "YouTube",
        "thumbnail_url": thumbnail,
        "watch_url": watch,
        "embeddable": True,
    }
    return meta


def _demo_comments(video_id: str, max_comments: int) -> list[str]:
    """Return canned sample comments, shuffled deterministically per video.

    Args:
        video_id: Seeds the shuffle, so the same id always yields the same order.
        max_comments: Slice bound applied to the shuffled samples.

    Returns:
        The first ``max_comments`` entries of the shuffled sample list.
    """
    import random

    logger.info("Using demo comments for %s (no YOUTUBE_API_KEY)", video_id)
    # Fixed sample texts; the mix of benign and abusive comments is intentional.
    samples = (
        "This video is really informative, thanks for sharing!",
        "You are all stupid idiots, get out of here!",
        "Great content, I learned a lot from this.",
        "These people should be eliminated from society.",
        "I agree with the presenter's point of view.",
        "What a bunch of racist criminals!",
        "Thank you for this analysis, very helpful.",
        "Kill them all, they don't deserve to live.",
        "Interesting perspective on the topic.",
        "This is absolute bullshit propaganda!",
        "I think we need to look at both sides.",
        "Well researched video, good job.",
        "Go back to where you came from!",
        "The data presented here is compelling.",
    )
    shuffled = list(samples)
    # A private Random instance keeps the global random state untouched.
    random.Random(video_id).shuffle(shuffled)
    return shuffled[:max_comments]