Spaces:
Sleeping
Sleeping
| """ | |
| YouTube Data API Client | |
| More reliable alternative to web scraping | |
| """ | |
| import os | |
| from googleapiclient.discovery import build | |
| from typing import List, Dict | |
class YouTubeAPIClient:
    """Small client for the YouTube Data API v3.

    Wraps the ``search.list`` and ``videos.list`` endpoints to return plain
    dictionaries describing videos, and formats them for display.
    """

    # The search endpoint documents ``maxResults`` as 0-50 inclusive.
    _MAX_RESULTS_LIMIT = 50

    # Thumbnail sizes to try, best first; not every video has every size.
    _THUMBNAIL_PREFERENCE = ('high', 'medium', 'default')

    def __init__(self, api_key: str = None):
        """Build the API service object.

        Args:
            api_key: API key to use. When omitted or empty, the
                ``YOUTUBE_API_KEY`` environment variable is used instead.

        Raises:
            ValueError: If no key is given and the environment variable is
                unset or empty.
        """
        self.api_key = api_key or os.getenv("YOUTUBE_API_KEY")
        if not self.api_key:
            raise ValueError("YouTube API key required")
        self.youtube = build('youtube', 'v3', developerKey=self.api_key)

    def search_videos(self, query: str, max_results: int = 5) -> List[Dict]:
        """Search YouTube videos using the official API.

        Args:
            query: Search query.
            max_results: Maximum number of results. Values above 50 are
                clamped to 50; zero or negative values yield an empty list
                without calling the API.

        Returns:
            A list of dictionaries, one per video, in search order. Every
            dictionary has the keys ``video_id``, ``title``, ``channel``,
            ``description``, ``thumbnail``, ``published_at``, ``url``,
            ``views``, ``views_text``, ``likes`` and ``duration``. Statistics
            default to 0 and duration to ``"N/A"`` when the API returns no
            details for a video. On any error the problem is printed and an
            empty list is returned (best-effort contract).
        """
        try:
            if max_results <= 0:
                return []
            max_results = min(max_results, self._MAX_RESULTS_LIMIT)

            search_response = self.youtube.search().list(
                q=query,
                part='id,snippet',
                maxResults=max_results,
                type='video',
                order='relevance',
                videoDefinition='any',
            ).execute()

            videos = []
            for item in search_response.get('items', []):
                video_id = item.get('id', {}).get('videoId')
                if not video_id:
                    # Without an ID there is nothing to link to or look up.
                    continue
                snippet = item.get('snippet', {})
                videos.append({
                    'video_id': video_id,
                    'title': snippet.get('title', ''),
                    'channel': snippet.get('channelTitle', ''),
                    'description': snippet.get('description', ''),
                    'thumbnail': self._pick_thumbnail(
                        snippet.get('thumbnails', {})),
                    'published_at': snippet.get('publishedAt', ''),
                    # Set here (not in the details pass) so every result has
                    # a link even if the details lookup omits the video.
                    'url': f"https://www.youtube.com/watch?v={video_id}",
                })

            if videos:
                self._attach_details(videos)

            print(f"✅ Found {len(videos)} videos via YouTube API")
            return videos
        except Exception as e:
            # Deliberate catch-all: callers rely on getting a list back
            # rather than an exception when the API call fails.
            print(f"❌ YouTube API error: {e}")
            return []

    def _pick_thumbnail(self, thumbnails: Dict) -> str:
        """Return the URL of the best available thumbnail, or '' if none."""
        for size in self._THUMBNAIL_PREFERENCE:
            url = (thumbnails.get(size) or {}).get('url')
            if url:
                return url
        return ''

    def _attach_details(self, videos: List[Dict]) -> None:
        """Add views, likes and duration to each search result in place."""
        response = self.youtube.videos().list(
            part='statistics,contentDetails',
            id=','.join(video['video_id'] for video in videos),
        ).execute()

        # Match by ID, never by position: the details response may omit
        # videos or order them differently from the search results.
        details_by_id = {
            item['id']: item
            for item in response.get('items', [])
            if 'id' in item
        }

        for video in videos:
            details = details_by_id.get(video['video_id'], {})
            stats = details.get('statistics', {})
            views = int(stats.get('viewCount', 0))
            video['views'] = views
            video['views_text'] = f"{views:,} views"
            video['likes'] = int(stats.get('likeCount', 0))
            video['duration'] = self._parse_duration(
                details.get('contentDetails', {}).get('duration'))

    def _parse_duration(self, duration: str) -> str:
        """Convert an ISO 8601 duration to a clock-style string.

        Examples: ``PT1H2M10S`` -> ``1:02:10``, ``PT4M13S`` -> ``4:13``,
        ``P1DT2H3M4S`` -> ``26:03:04`` (days are folded into hours).

        Args:
            duration: Duration such as ``PT1H2M10S``; may carry a day part.

        Returns:
            ``H:MM:SS`` when the duration is an hour or longer, otherwise
            ``M:SS``. ``"N/A"`` when the value is not a string, does not
            match the expected shape, or carries no components at all.
        """
        import re

        if not isinstance(duration, str):
            return "N/A"

        match = re.fullmatch(
            r'P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?',
            duration,
        )
        if not match or not any(match.groups()):
            return "N/A"

        days, hours, minutes, seconds = (
            int(part) if part else 0 for part in match.groups()
        )
        # Work from total seconds so overflowing fields are normalised.
        total_seconds = ((days * 24 + hours) * 60 + minutes) * 60 + seconds
        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        if hours > 0:
            return f"{hours}:{minutes:02d}:{seconds:02d}"
        return f"{minutes}:{seconds:02d}"

    def format_video_list(self, videos: List[Dict]) -> str:
        """Format a list of video dictionaries for display.

        Args:
            videos: Dictionaries as returned by ``search_videos``. ``title``
                and ``channel`` are required; ``duration``, ``views`` and
                ``url`` are optional (the link falls back to one built from
                ``video_id``).

        Returns:
            A numbered, markdown-flavoured listing, or ``"No videos found."``
            when the list is empty. Views are shown in thousands.

        Raises:
            KeyError: If an entry lacks ``title`` or ``channel``.
        """
        if not videos:
            return "No videos found."

        entries = []
        for position, video in enumerate(videos, 1):
            views_k = video.get('views', 0) / 1000
            url = video.get('url') or (
                f"https://www.youtube.com/watch?v={video.get('video_id', '')}"
            )
            entries.append(
                f"{position}. **{video['title']}**\n"
                f" • Channel: {video['channel']}\n"
                f" • Duration: {video.get('duration', 'N/A')}"
                f" | Views: {views_k:.1f}K\n"
                f" • Link: {url}\n\n"
            )
        return "".join(entries)
# Kept so code that still refers to the old scraper name continues to work.
class YouTubeScraper(YouTubeAPIClient):
    """Backwards-compatible name for YouTubeAPIClient; adds no behaviour."""