# Hugging Face Space status banner ("Spaces: Sleeping") — scrape residue, not code.
import datetime
import os
import shutil
import tempfile
import urllib.parse
import urllib.request

import gradio as gr
import isodate
import requests
# --- IMPORTANT: Ensure this environment variable is set ---
API_KEY = os.getenv("YOUTUBE_API_KEY")  # YouTube Data API v3 key
BASE_URL = "https://www.googleapis.com/youtube/v3"

# -----------------------
# API Usage Tracker
# -----------------------
# Running tally of quota units consumed, incremented by api_get().
API_USAGE = {"units": 0}
def api_get(url, cost, **kwargs):
    """GET *url* while adding *cost* quota units to the global API_USAGE tally.

    Parameters:
        url: fully-formed YouTube Data API request URL.
        cost: quota units this endpoint charges (counted even if the
            request subsequently fails — matches how Google bills calls).
        **kwargs: passed through to requests.get.

    Returns the raw requests.Response; status is NOT checked here.
    """
    API_USAGE["units"] += cost
    # requests has no default timeout — a dead connection would hang the app
    # forever.  Callers may still override via kwargs.
    kwargs.setdefault("timeout", 30)
    return requests.get(url, **kwargs)
# -----------------------
# Helper Functions (Simplified)
# -----------------------
def parse_duration(duration_str):
    """Convert an ISO-8601 duration string (e.g. 'PT4M13S') to whole seconds.

    Returns 0 for any value isodate cannot parse.
    """
    try:
        seconds = isodate.parse_duration(duration_str).total_seconds()
    except Exception:
        return 0
    return int(seconds)
def get_channel_info(channel_id):
    """Fetch the channel's snippet (includes its title) (quota cost: 1).

    Returns the snippet dict, or None on HTTP error / unknown channel id.
    """
    r = api_get(f"{BASE_URL}/channels?part=snippet&id={channel_id}&key={API_KEY}", 1)
    if r.status_code != 200:
        return None
    # Parse the response body once instead of calling r.json() three times
    # (each call re-parses the entire payload).
    items = r.json().get("items")
    return items[0]["snippet"] if items else None
def extract_channel_id(url: str):
    """Resolve a YouTube channel URL (/channel/, /@handle, /user/) to a channel ID.

    Returns the canonical channel id string, or None when it cannot be resolved.
    """
    if "channel/" in url:
        # The id is already in the URL; strip trailing path and query string.
        return url.split("channel/")[1].split("/")[0].split("?")[0]
    if "/@" in url:
        handle = url.split("/@")[1].split("/")[0].split("?")[0]
        # channels.list supports forHandle (quota cost 1); the previous
        # search.list lookup charged 100 units for the same answer and could
        # return the wrong channel for ambiguous queries.
        r = api_get(
            f"{BASE_URL}/channels?part=id&forHandle={urllib.parse.quote(handle)}&key={API_KEY}",
            1,
        )
        if r.status_code != 200:
            return None
        items = r.json().get("items")
        return items[0]["id"] if items else None
    if "user/" in url:
        username = url.split("user/")[1].split("/")[0].split("?")[0]
        r = api_get(
            f"{BASE_URL}/channels?part=id&forUsername={urllib.parse.quote(username)}&key={API_KEY}",
            1,
        )
        if r.status_code != 200:
            return None
        items = r.json().get("items")
        return items[0]["id"] if items else None
    return None
def get_uploads_playlist(channel_id):
    """Return the channel's special 'uploads' playlist id (quota cost: 1).

    Raises KeyError/IndexError for an unknown channel id; callers are
    expected to wrap this in try/except.
    """
    data = api_get(
        f"{BASE_URL}/channels?part=contentDetails&id={channel_id}&key={API_KEY}", 1
    ).json()
    channel = data["items"][0]
    return channel["contentDetails"]["relatedPlaylists"]["uploads"]
# -----------------------
# Fetch and Filter Video IDs
# -----------------------
def filter_video_ids(video_ids, mode="videos"):
    """Keep only the ids whose duration matches *mode* (quota cost: 1 per 50 ids).

    mode: 'videos' keeps >= 60s, 'shorts' keeps < 60s, 'all' keeps everything;
    any other mode keeps nothing.
    """
    keep = {
        "videos": lambda secs: secs >= 60,
        "shorts": lambda secs: secs < 60,
        "all": lambda secs: True,
    }.get(mode)
    selected = []
    # The videos endpoint accepts at most 50 ids per request.
    for start in range(0, len(video_ids), 50):
        chunk = video_ids[start:start + 50]
        resp = api_get(
            f"{BASE_URL}/videos?part=contentDetails&id={','.join(chunk)}&key={API_KEY}", 1
        ).json()
        for item in resp.get("items", []):
            if "contentDetails" not in item:
                continue
            secs = parse_duration(item["contentDetails"]["duration"])
            if keep is not None and keep(secs):
                selected.append(item["id"])
    return selected
def get_playlist_video_ids(playlist_id, max_videos=50, mode="videos"):
    """Walk a playlist page by page, filtering by *mode*, until *max_videos* ids collected.

    Stops early when the playlist is exhausted (no next page or an empty page).
    """
    collected = []
    page_token = None
    while len(collected) < max_videos:
        request_url = (
            f"{BASE_URL}/playlistItems?part=snippet&playlistId={playlist_id}"
            f"&maxResults=50&key={API_KEY}"
        )
        if page_token:
            request_url += f"&pageToken={page_token}"
        page = api_get(request_url, 1).json()
        page_ids = [it["snippet"]["resourceId"]["videoId"] for it in page.get("items", [])]
        matching = filter_video_ids(page_ids, mode=mode)
        # Only take as many as still fit under the cap.
        collected.extend(matching[: max_videos - len(collected)])
        page_token = page.get("nextPageToken")
        if not page_token or not page_ids:
            break
    return collected[:max_videos]
def get_live_video_ids(channel_id, max_videos=50):
    """Fetch ids of completed live streams via search.list (quota cost: 100).

    search.list rejects maxResults outside 0..50, yet the UI slider allows up
    to 100 — the request is clamped so those calls no longer fail; callers
    asking for more than 50 receive at most one page (50 ids).
    """
    capped = min(max(int(max_videos), 0), 50)
    url = (
        f"{BASE_URL}/search?part=id&channelId={channel_id}"
        f"&eventType=completed&type=video&maxResults={capped}&key={API_KEY}"
    )
    data = api_get(url, 100).json()
    return [item["id"]["videoId"] for item in data.get("items", [])]
# -----------------------
# Thumbnails Download and Prep
# -----------------------
def download_thumbnails(video_ids):
    """Download best-available thumbnails into a new temp dir (quota: 1 per 50 ids).

    Returns (tmp_dir, list_of_downloaded_file_paths).  Videos that expose no
    usable thumbnail are skipped instead of crashing.
    """
    tmp_dir = tempfile.mkdtemp()
    thumb_paths = []
    for start in range(0, len(video_ids), 50):
        batch = video_ids[start:start + 50]
        resp = api_get(
            f"{BASE_URL}/videos?part=snippet&id={','.join(batch)}&key={API_KEY}", 1
        ).json()
        for item in resp.get("items", []):
            snippet = item.get("snippet")
            if not snippet:
                continue
            thumbs = snippet.get("thumbnails", {})
            # Prefer the largest rendition that exists.
            best = next(
                (thumbs[k] for k in ("maxres", "standard", "high", "default") if k in thumbs),
                None,
            )
            if best is None:
                # Old code did None["url"] here (TypeError) when every
                # fallback key was missing.
                continue
            # Use the video title in the filename for context in gr.Files;
            # sanitize it so it is safe as a filename component.
            safe_title = "".join(
                c if c.isalnum() or c in (" ", "_") else "_" for c in snippet["title"]
            ).strip().replace(" ", "_")
            path = os.path.join(tmp_dir, f"{safe_title}_{item['id']}.jpg")
            urllib.request.urlretrieve(best["url"], path)
            thumb_paths.append(path)
    return tmp_dir, thumb_paths
def fetch_channel_thumbnails(channel_url, max_videos, page_mode):
    """Resolve a channel URL, collect matching video ids, download their thumbnails.

    Returns (status_message, thumb_paths, tmp_dir, channel_name); the last
    three are None whenever a step fails.
    """
    failure = (None, None, None)
    channel_id = extract_channel_id(channel_url)
    if not channel_id:
        return ("β Could not extract channel ID", *failure)
    channel_info = get_channel_info(channel_id)
    if not channel_info:
        return ("β Could not fetch channel info", *failure)
    channel_name = channel_info.get("title", "unknown_channel")
    if page_mode == "live":
        video_ids = get_live_video_ids(channel_id, max_videos=max_videos)
    elif page_mode in ("videos", "shorts", "all"):
        try:
            uploads_id = get_uploads_playlist(channel_id)
        except Exception:
            return ("β Could not find channel 'uploads' playlist ID", *failure)
        video_ids = get_playlist_video_ids(
            uploads_id, max_videos=max_videos, mode=page_mode
        )
    else:
        return ("β Unknown mode", *failure)
    if not video_ids:
        return (f"β No {page_mode} found", *failure)
    tmp_dir, thumbs = download_thumbnails(video_ids)
    return f"β Fetched {len(thumbs)} {page_mode}", thumbs, tmp_dir, channel_name
def prepare_zip(thumb_dir, channel_name):
    """Zip *thumb_dir* into the system temp dir as '<Channel>_Thumbnails_<YYYYMMDD>.zip'.

    Returns the full path of the created archive.
    """
    # Sanitize the channel name so it is safe as a filename component.
    sanitized = "".join(
        ch if ch.isalnum() or ch in (" ", "_") else "_" for ch in channel_name
    ).strip().replace(" ", "_")
    stamp = datetime.datetime.now().strftime("%Y%m%d")
    archive_base = os.path.join(tempfile.gettempdir(), f"{sanitized}_Thumbnails_{stamp}")
    # make_archive appends the '.zip' extension and returns the final path.
    return shutil.make_archive(archive_base, "zip", thumb_dir)
# -----------------------
# Generator for live status updates
# -----------------------
def fetch_and_zip_progress(channel_url, max_videos, page_mode):
    """Gradio generator: stream a status line, the thumbnail files, and the zip.

    Yields (status_text, thumbnail_paths, download_update) — first an
    in-progress placeholder, then the final result.  The download slot
    receives ONE gr.File update carrying both the file value and its
    visibility, instead of the previous scheme of listing download_btn twice
    in the event outputs (one slot for the path, one for visibility).
    """
    API_USAGE["units"] = 0
    yield (
        f"Starting fetch... | API quota used: {API_USAGE['units']} units",
        [],
        gr.File(value=None, visible=False),
    )
    status, thumbs, tmp_dir, channel_name = fetch_channel_thumbnails(
        channel_url, max_videos, page_mode
    )
    quota_used = API_USAGE["units"]
    # Make the mode wording in the status message more precise.
    if page_mode == "videos":
        status = status.replace("videos", "long-form videos (>= 60s)")
    elif page_mode == "shorts":
        status = status.replace("shorts", "shorts (< 60s)")
    zip_path = None
    if thumbs:
        zip_path = prepare_zip(tmp_dir, channel_name)
    elif tmp_dir and os.path.isdir(tmp_dir):
        # Nothing was downloaded: drop the now-empty temp directory.
        shutil.rmtree(tmp_dir)
    yield (
        f"{status} | API quota used: {quota_used} units",
        thumbs,
        gr.File(value=zip_path, visible=zip_path is not None),
    )


# -----------------------
# Gradio Interface
# -----------------------
with gr.Blocks() as demo:
    gr.Markdown("## π¬ YouTube Channel Thumbnails Downloader (Files Preview)")
    gr.Markdown("Thumbnails are now listed as individual files. Click the filename to preview/download.")
    url_input = gr.Textbox(
        label="YouTube Channel URL",
        placeholder="https://www.youtube.com/@roisinmurphyofficial",
    )
    page_selector = gr.Dropdown(
        choices=["videos", "shorts", "live", "all"],
        value="videos",
        label="Page to Collect",
    )
    max_videos_slider = gr.Slider(
        minimum=1, maximum=100, step=1, value=20, label="Max Items to Fetch"
    )
    start_btn = gr.Button("π Start Collect")
    status_output = gr.Textbox(label="Status")
    # gr.Files lists each thumbnail as an individually previewable download.
    thumbs_list = gr.Files(
        label="Thumbnails Preview and Download (Click name for preview)",
        file_count="multiple",
        type="filepath",
        visible=True,
    )
    download_btn = gr.File(label="Download All Thumbnails (ZIP)")
    start_btn.click(
        fetch_and_zip_progress,
        inputs=[url_input, max_videos_slider, page_selector],
        # One output slot per yielded value.  The previous version listed
        # download_btn twice to split the value from the visibility update,
        # which duplicates a component inside a single event's outputs.
        outputs=[status_output, thumbs_list, download_btn],
    )

if __name__ == "__main__":
    demo.launch()