# NOTE(review): removed stray editor/status residue ("Spaces:", "Runtime error")
# that preceded the code — it was not Python and broke the file.
import json

import isodate
import requests
from tqdm import tqdm
class YTstats:
    """Collect channel statistics and per-video metadata from the
    YouTube Data API v3.

    Results are cached on the instance:
      - ``channel_statistics``: dict from the channels endpoint, or None.
      - ``video_data``: dict of selected videos keyed by video id, or None.
    Use :meth:`dump` to write both to a JSON file.
    """

    def __init__(self, api_key):
        # API key for the YouTube Data API v3.
        self.api_key = api_key
        # Filled in by get_channel_statistics() / get_channel_video_data().
        self.channel_statistics = None
        self.video_data = None

    def extract_all(self, channel_id):
        """Fetch both channel statistics and video data for ``channel_id``."""
        self.get_channel_statistics(channel_id)
        # Works now that df_sheet/loading_bar/progress_text are optional
        # (previously this call raised TypeError: missing arguments).
        self.get_channel_video_data(channel_id)

    def get_channel_statistics(self, channel_id):
        """Extract the channel statistics.

        Returns the ``statistics`` dict from the API response (empty dict
        on failure) and stores it on ``self.channel_statistics``.
        """
        print('get channel statistics...')
        url = f'https://www.googleapis.com/youtube/v3/channels?part=statistics&id={channel_id}&key={self.api_key}'
        json_url = requests.get(url)
        data = json.loads(json_url.text)
        try:
            data = data['items'][0]['statistics']
        except (KeyError, IndexError):
            # Bad key / bad channel id / quota error -> no usable 'items'.
            print('Could not get channel statistics')
            data = {}
        self.channel_statistics = data
        return data

    def get_channel_video_data(self, channel_id, df_sheet=None, loading_bar=None,
                               progress_text=None, item_limit=3):
        """Extract all video information of the channel.

        Parameters
        ----------
        channel_id : str
            YouTube channel id.
        df_sheet : optional
            DataFrame-like object with an ``ID`` column; videos whose id
            already appears there are skipped. If None, nothing is skipped.
        loading_bar : optional
            Progress widget with a ``.progress(value, text=...)`` method
            (presumably a Streamlit progress bar — confirm with caller).
            If None, no progress is reported.
        progress_text : str, optional
            Caption forwarded to ``loading_bar.progress``.
        item_limit : int, optional
            Stop after this many accepted videos.

        Stores the selected videos on ``self.video_data`` and returns them.
        """
        print('get video data...')
        channel_videos, channel_playlists = self._get_channel_content(channel_id, limit=50)
        channel_videos_out = dict()
        total_items = len(channel_videos)
        item = 0
        step_size = 0
        step = 0
        if total_items != 0:
            # Fraction of the progress bar contributed by one accepted video.
            step_size = round(1 / total_items, 4)
        parts = ["snippet", "statistics", "contentDetails", "topicDetails"]
        # 4-minute threshold; presumably filters out YouTube Shorts — verify.
        short_duration = isodate.parse_duration('PT4M')
        for video_id in tqdm(channel_videos):
            if item == item_limit:
                break
            if loading_bar is not None:
                loading_bar.progress(step, text=progress_text)
            for part in parts:
                data = self._get_single_video_data(video_id, part)
                channel_videos[video_id].update(data)
            # 'duration' comes from the contentDetails part; the helper
            # returns {} on failure, so guard instead of raising KeyError.
            iso_duration = channel_videos[video_id].get('duration')
            if iso_duration is None:
                continue
            duration = isodate.parse_duration(iso_duration)
            already_known = df_sheet is not None and video_id in list(df_sheet.ID)
            if duration > short_duration and not already_known:
                item = item + 1
                step = step + step_size
                channel_videos_out[video_id] = channel_videos[video_id]
        if loading_bar is not None:
            loading_bar.progress(1.0, text=progress_text)
        self.video_data = channel_videos_out
        return channel_videos_out

    def _get_single_video_data(self, video_id, part):
        """Extract one metadata ``part`` for a single video.

        part can be: 'snippet', 'statistics', 'contentDetails', 'topicDetails'.
        Returns the part's dict, or an empty dict on failure.
        """
        url = f"https://www.googleapis.com/youtube/v3/videos?part={part}&id={video_id}&key={self.api_key}"
        json_url = requests.get(url)
        data = json.loads(json_url.text)
        try:
            data = data['items'][0][part]
        except (KeyError, IndexError):
            print(f'Error! Could not get {part} part of data: \n{data}')
            data = dict()
        return data

    def _get_channel_content(self, channel_id, limit=None, check_all_pages=True):
        """
        Extract all videos and playlists, can check all available search pages
        channel_videos = videoId: title, publishedAt
        channel_playlists = playlistId: title, publishedAt
        return channel_videos, channel_playlists
        """
        url = f"https://www.googleapis.com/youtube/v3/search?key={self.api_key}&channelId={channel_id}&part=snippet,id&order=date"
        if limit is not None and isinstance(limit, int):
            url += "&maxResults=" + str(limit)
        vid, pl, npt = self._get_channel_content_per_page(url)
        idx = 0
        # Follow nextPageToken links, capped at 10 extra pages to bound quota use.
        while check_all_pages and npt is not None and idx < 10:
            nexturl = url + "&pageToken=" + npt
            next_vid, next_pl, npt = self._get_channel_content_per_page(nexturl)
            vid.update(next_vid)
            pl.update(next_pl)
            idx += 1
        return vid, pl

    def _get_channel_content_per_page(self, url):
        """
        Extract all videos and playlists per page
        return channel_videos, channel_playlists, nextPageToken
        """
        json_url = requests.get(url)
        data = json.loads(json_url.text)
        channel_videos = dict()
        channel_playlists = dict()
        if 'items' not in data:
            print('Error! Could not get correct channel data!\n', data)
            # Fixed: the original returned channel_videos in both positions.
            return channel_videos, channel_playlists, None
        nextPageToken = data.get("nextPageToken", None)
        item_data = data['items']
        for item in item_data:
            try:
                kind = item['id']['kind']
                published_at = item['snippet']['publishedAt']
                title = item['snippet']['title']
                if kind == 'youtube#video':
                    video_id = item['id']['videoId']
                    channel_videos[video_id] = {'publishedAt': published_at, 'title': title}
                elif kind == 'youtube#playlist':
                    playlist_id = item['id']['playlistId']
                    channel_playlists[playlist_id] = {'publishedAt': published_at, 'title': title}
            except KeyError:
                # Skip malformed items but keep processing the rest.
                print('Error! Could not extract data from item:\n', item)
        return channel_videos, channel_playlists, nextPageToken

    def dump(self, channel_id):
        """Dumps channel statistics and video data in a single json file."""
        if self.channel_statistics is None or self.video_data is None:
            print('data is missing!\nCall get_channel_statistics() and get_channel_video_data() first!')
            return
        fused_data = {channel_id: {"channel_statistics": self.channel_statistics,
                                   "video_data": self.video_data}}
        # Derive the filename from any stored video's channelTitle WITHOUT
        # mutating self.video_data (the original used popitem(), which removed
        # an entry and raised KeyError on an empty dict). Fall back to the
        # channel id when no title is available.
        channel_title = channel_id
        for video_info in self.video_data.values():
            channel_title = video_info.get('channelTitle', channel_id)
            break
        channel_title = channel_title.replace(" ", "_").lower()
        filename = channel_title + '.json'
        with open(filename, 'w') as f:
            json.dump(fused_data, f, indent=4)
        print('file dumped to', filename)