File size: 4,466 Bytes
c066961
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""
YouTube Data API Client
More reliable alternative to web scraping
"""

import os
from googleapiclient.discovery import build
from typing import List, Dict


class YouTubeAPIClient:
    """Client for searching videos through the YouTube Data API v3.

    Instance fields:
      * ``api_key`` - the developer key passed to the API service.
      * ``youtube`` - the service object returned by ``build('youtube', 'v3', ...)``.
    """

    # Thumbnail sizes to try, in order of preference; a given size may be
    # absent from a search result, so we fall back instead of failing.
    _THUMBNAIL_PREFERENCE = ('high', 'medium', 'default')

    def __init__(self, api_key: str = None):
        """Build the API service object.

        Args:
            api_key: YouTube Data API key. When omitted or empty, the
                ``YOUTUBE_API_KEY`` environment variable is used instead.

        Raises:
            ValueError: If no key was passed and the environment variable
                is unset or empty.
        """
        self.api_key = api_key or os.getenv("YOUTUBE_API_KEY")
        if not self.api_key:
            raise ValueError("YouTube API key required")

        self.youtube = build('youtube', 'v3', developerKey=self.api_key)

    def search_videos(self, query: str, max_results: int = 5) -> List[Dict]:
        """Search YouTube videos using the official API.

        Runs a ``search().list`` request, then a single ``videos().list``
        request for the found ids to attach statistics and duration.

        Args:
            query: Search query.
            max_results: Maximum number of results, forwarded as ``maxResults``.

        Returns:
            A list of video dictionaries. Every entry has ``video_id``,
            ``title``, ``channel``, ``description``, ``thumbnail``,
            ``published_at`` and ``url``. Entries for which the details
            request returned data also have ``views``, ``views_text``,
            ``likes`` and ``duration``. Returns an empty list when any
            error occurs (the error is printed, not raised).
        """
        try:
            search_response = self.youtube.search().list(
                q=query,
                part='id,snippet',
                maxResults=max_results,
                type='video',
                order='relevance',
                videoDefinition='any'
            ).execute()

            videos = [
                self._video_from_search_item(item)
                for item in search_response.get('items', [])
            ]

            # One batched details request for all found ids.
            if videos:
                video_response = self.youtube.videos().list(
                    part='statistics,contentDetails',
                    id=','.join(video['video_id'] for video in videos)
                ).execute()
                self._merge_details(videos, video_response.get('items', []))

            print(f"✅ Found {len(videos)} videos via YouTube API")
            return videos

        except Exception as e:
            # Deliberate best-effort boundary: callers get an empty list
            # rather than an exception when the API call or parsing fails.
            print(f"❌ YouTube API error: {e}")
            return []

    def _video_from_search_item(self, item: Dict) -> Dict:
        """Convert one ``search().list`` result item into a video dictionary."""
        video_id = item['id']['videoId']
        snippet = item['snippet']

        # Pick the first available thumbnail size; empty string if none.
        thumbnails = snippet.get('thumbnails', {})
        thumbnail = ''
        for size in self._THUMBNAIL_PREFERENCE:
            candidate = thumbnails.get(size, {}).get('url')
            if candidate:
                thumbnail = candidate
                break

        return {
            'video_id': video_id,
            'title': snippet['title'],
            'channel': snippet['channelTitle'],
            'description': snippet['description'],
            'thumbnail': thumbnail,
            'published_at': snippet['publishedAt'],
            # Set here (not in the details step) so the link exists even
            # when no statistics come back for this video.
            'url': f"https://www.youtube.com/watch?v={video_id}",
        }

    def _merge_details(self, videos: List[Dict], detail_items: List[Dict]) -> None:
        """Attach statistics and duration from ``videos().list`` items, in place.

        Matching is done by video id rather than by position, so the data
        stays attached to the right video even if the details response
        omits an entry or orders items differently from the request.
        Videos without a matching details item are left unchanged.
        """
        details_by_id = {item.get('id'): item for item in detail_items}

        for video in videos:
            detail = details_by_id.get(video['video_id'])
            if detail is None:
                continue

            stats = detail.get('statistics', {})
            views = int(stats.get('viewCount', 0))

            video['views'] = views
            video['views_text'] = f"{views:,} views"
            video['likes'] = int(stats.get('likeCount', 0))
            video['duration'] = self._parse_duration(
                detail.get('contentDetails', {}).get('duration')
            )

    def _parse_duration(self, duration: str) -> str:
        """Convert an ISO 8601 duration to a readable format.

        Examples: ``PT1H2M10S`` -> ``1:02:10``, ``PT5M`` -> ``5:00``,
        ``P1DT2H3M4S`` -> ``26:03:04`` (days are folded into hours).

        Args:
            duration: Duration string such as ``PT4M13S``.

        Returns:
            ``H:MM:SS`` when the duration is an hour or longer, otherwise
            ``M:SS``. ``"N/A"`` when the value is not a string or does not
            match the supported ``P[nD][T[nH][nM][nS]]`` form.
        """
        import re

        if not isinstance(duration, str):
            return "N/A"

        # Anchored match; the lookahead rejects a bare "P" with no fields.
        match = re.fullmatch(
            r'P(?!$)(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?',
            duration,
        )
        if not match:
            return "N/A"

        days, hours, minutes, seconds = (
            int(part) if part else 0 for part in match.groups()
        )
        hours += days * 24

        if hours > 0:
            return f"{hours}:{minutes:02d}:{seconds:02d}"
        return f"{minutes}:{seconds:02d}"

    def format_video_list(self, videos: List[Dict]) -> str:
        """Format a list of video dictionaries as numbered display text.

        Args:
            videos: Video dictionaries as returned by ``search_videos``.
                ``title`` and ``channel`` are required; ``views`` defaults
                to 0, ``duration`` to ``N/A``, and a missing ``url`` is
                rebuilt from ``video_id``.

        Returns:
            ``"No videos found."`` for an empty list, otherwise one
            four-line entry per video, each followed by a blank line.
        """
        if not videos:
            return "No videos found."

        entries = []
        for i, video in enumerate(videos, 1):
            views_k = video.get('views', 0) / 1000
            # Fall back to a link built from the id so dictionaries that
            # lack 'url' do not raise KeyError.
            link = video.get('url') or (
                f"https://www.youtube.com/watch?v={video.get('video_id', '')}"
            )
            entries.append(
                f"{i}. **{video['title']}**\n"
                f"   • Channel: {video['channel']}\n"
                f"   • Duration: {video.get('duration', 'N/A')} | Views: {views_k:.1f}K\n"
                f"   • Link: {link}\n\n"
            )

        return "".join(entries)


# For backwards compatibility
class YouTubeScraper(YouTubeAPIClient):
    """Legacy name for ``YouTubeAPIClient``, kept for backwards compatibility.

    Defines nothing of its own; every attribute and method is inherited
    from ``YouTubeAPIClient``.
    """