# Source: clementBE's Hugging Face Space — app.py (commit 25bf82c, ~10.4 kB).
# (Hugging Face file-viewer page chrome — "raw / history blame" — removed so
#  this file is valid Python.)
import datetime
import os
import re
import shutil
import tempfile
import urllib.request

import gradio as gr
import isodate
import requests
# --- IMPORTANT: Ensure this environment variable is set ---
API_KEY = os.getenv("YOUTUBE_API_KEY")  # None when unset — API calls will then fail
BASE_URL = "https://www.googleapis.com/youtube/v3"
# -----------------------
# API Usage Tracker
# -----------------------
# Module-level mutable counter: reset at the start of each run in
# fetch_and_zip_progress, incremented by api_get with each endpoint's cost.
API_USAGE = {"units": 0}
def api_get(url, cost, **kwargs):
    """GET *url* while tracking YouTube API quota usage.

    Args:
        url: Fully built API endpoint URL (key and params already included).
        cost: Quota units this endpoint charges; added to API_USAGE["units"].
        **kwargs: Passed through to ``requests.get``.

    Returns:
        The ``requests.Response`` (status code is NOT checked here).
    """
    API_USAGE["units"] += cost
    # requests.get with no timeout can hang forever on a stalled connection;
    # default to 30s but let callers override via kwargs.
    kwargs.setdefault("timeout", 30)
    return requests.get(url, **kwargs)
# -----------------------
# Helper Functions (Simplified)
# -----------------------
def parse_duration(duration_str):
    """Convert an ISO-8601 duration string (e.g. ``PT1H2M3S``) to whole seconds.

    YouTube's API reports video lengths in the ISO-8601 subset
    ``P[nW][nD][T[nH][nM][nS]]``, which a small stdlib regex handles without
    the third-party ``isodate`` package.  Returns 0 for anything unparseable,
    matching the previous behaviour of swallowing parse errors.
    """
    if not isinstance(duration_str, str):
        return 0
    m = re.fullmatch(
        r"P(?:(\d+)W)?(?:(\d+)D)?"
        r"(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+(?:\.\d+)?)S)?)?",
        duration_str,
    )
    # A bare "P"/"PT" (all groups empty) is not a valid duration.
    if not m or not any(m.groups()):
        return 0
    weeks, days, hours, minutes, seconds = (
        float(g) if g else 0.0 for g in m.groups()
    )
    total = weeks * 604800 + days * 86400 + hours * 3600 + minutes * 60 + seconds
    return int(total)
def get_channel_info(channel_id):
    """Fetch the channel snippet (title, description, …) (Cost: 1 unit).

    Returns:
        The ``snippet`` dict for the channel, or None when the request fails
        or the channel does not exist.
    """
    r = api_get(f"{BASE_URL}/channels?part=snippet&id={channel_id}&key={API_KEY}", 1)
    if r.status_code != 200:
        return None
    # Parse the JSON body once instead of re-decoding it on every access
    # (the original called r.json() three times).
    items = r.json().get("items") or []
    return items[0]["snippet"] if items else None
def extract_channel_id(url: str):
    """Resolve a YouTube channel URL to its channel ID.

    Supports ``/channel/<id>`` (free — the ID is in the URL), ``/@handle``
    (Cost: 1), and legacy ``/user/<name>`` (Cost: 1).  Returns None when the
    URL form is unrecognised or the lookup finds nothing.
    """
    if "channel/" in url:
        # The ID is embedded directly in the URL; no quota needed.
        return url.split("channel/")[1].split("/")[0]
    if "/@" in url:
        handle = url.split("/@")[1].split("/")[0]
        # channels.list supports forHandle (cost 1) — far cheaper and more
        # precise than the 100-unit search.list lookup previously used here.
        r = api_get(f"{BASE_URL}/channels?part=id&forHandle={handle}&key={API_KEY}", 1)
        if r.status_code != 200:
            return None
        items = r.json().get("items") or []
        return items[0]["id"] if items else None
    if "user/" in url:
        username = url.split("user/")[1].split("/")[0]
        r = api_get(f"{BASE_URL}/channels?part=id&forUsername={username}&key={API_KEY}", 1)
        if r.status_code != 200:
            return None
        items = r.json().get("items") or []
        return items[0]["id"] if items else None
    return None
def get_uploads_playlist(channel_id):
    """Return the channel's special 'uploads' playlist ID (Cost: 1 unit).

    Raises KeyError/IndexError for unknown channels — the caller in
    fetch_channel_thumbnails wraps this in a try/except.
    """
    data = api_get(
        f"{BASE_URL}/channels?part=contentDetails&id={channel_id}&key={API_KEY}", 1
    ).json()
    channel = data["items"][0]
    return channel["contentDetails"]["relatedPlaylists"]["uploads"]
# -----------------------
# Fetch and Filter Video IDs
# -----------------------
def filter_video_ids(video_ids, mode="videos"):
    """Keep only video IDs whose duration matches *mode* (Cost: 1 unit / 50 IDs).

    mode "videos" keeps durations >= 60s, "shorts" keeps < 60s, and "all"
    keeps everything the API returns contentDetails for.
    """
    # Dispatch table of duration predicates; unknown modes keep nothing
    # (but still spend the lookup quota, as before).
    keep = {
        "videos": lambda secs: secs >= 60,
        "shorts": lambda secs: secs < 60,
        "all": lambda secs: True,
    }.get(mode, lambda secs: False)
    selected = []
    for start in range(0, len(video_ids), 50):
        chunk = video_ids[start:start + 50]
        data = api_get(
            f"{BASE_URL}/videos?part=contentDetails&id={','.join(chunk)}&key={API_KEY}", 1
        ).json()
        for item in data.get("items", []):
            details = item.get("contentDetails")
            if details and keep(parse_duration(details["duration"])):
                selected.append(item["id"])
    return selected
def get_playlist_video_ids(playlist_id, max_videos=50, mode="videos"):
    """Walk a playlist page by page, filtering by *mode*, until max_videos IDs.

    Each page costs 1 unit (playlistItems) plus 1 unit per 50 IDs for the
    duration filter inside filter_video_ids.
    """
    collected = []
    page_token = None
    while len(collected) < max_videos:
        url = (
            f"{BASE_URL}/playlistItems?part=snippet&playlistId={playlist_id}"
            f"&maxResults=50&key={API_KEY}"
        )
        if page_token:
            url = f"{url}&pageToken={page_token}"
        page = api_get(url, 1).json()
        page_ids = [
            entry["snippet"]["resourceId"]["videoId"]
            for entry in page.get("items", [])
        ]
        matching = filter_video_ids(page_ids, mode=mode)
        # Never overshoot the requested total.
        collected.extend(matching[: max_videos - len(collected)])
        page_token = page.get("nextPageToken")
        # Stop on the last page, or if the API returned nothing at all.
        if not page_token or not page_ids:
            break
    return collected[:max_videos]
def get_live_video_ids(channel_id, max_videos=50):
    """Fetch completed live streams via search.list (Cost: 100 units).

    search.list caps ``maxResults`` at 50; the UI slider allows up to 100,
    which previously produced an API error for live mode — clamp to the
    documented 1..50 range.
    """
    page_size = min(max(int(max_videos), 1), 50)
    url = (
        f"{BASE_URL}/search?part=id&channelId={channel_id}&eventType=completed"
        f"&type=video&maxResults={page_size}&key={API_KEY}"
    )
    data = api_get(url, 100).json()
    return [item["id"]["videoId"] for item in data.get("items", [])]
# -----------------------
# Thumbnails Download and Prep
# -----------------------
def download_thumbnails(video_ids):
    """Download the best available thumbnail for each video (Cost: 1 unit / 50).

    Returns:
        (tmp_dir, thumb_paths): the temp directory holding the JPEGs and the
        list of downloaded file paths.  Videos with no usable thumbnail are
        skipped instead of crashing.
    """
    tmp_dir = tempfile.mkdtemp()
    thumb_paths = []
    # Highest resolution first; "medium" added as an extra fallback tier.
    quality_order = ("maxres", "standard", "high", "medium", "default")
    for i in range(0, len(video_ids), 50):
        batch = video_ids[i:i + 50]
        r = api_get(f"{BASE_URL}/videos?part=snippet&id={','.join(batch)}&key={API_KEY}", 1).json()
        for item in r.get("items", []):
            snippet = item.get("snippet") or {}
            thumbnails = snippet.get("thumbnails") or {}
            variant = next((thumbnails[q] for q in quality_order if q in thumbnails), None)
            if not variant:
                # Previously this dereferenced None["url"] -> TypeError.
                continue
            # Use the video title for the filename for better context in gr.Files;
            # the video ID suffix keeps names unique even for duplicate titles.
            title_safe = "".join(
                c if c.isalnum() or c in (' ', '_') else '_'
                for c in snippet.get("title", "")
            ).strip().replace(' ', '_')
            filename = os.path.join(tmp_dir, f"{title_safe}_{item['id']}.jpg")
            urllib.request.urlretrieve(variant["url"], filename)
            thumb_paths.append(filename)
    return tmp_dir, thumb_paths
def fetch_channel_thumbnails(channel_url, max_videos, page_mode):
    """Resolve the channel, collect matching video IDs, download thumbnails.

    Returns:
        (status_message, thumb_paths, tmp_dir, channel_name); the last three
        are None when any step fails.
    """
    failure = (None, None, None)
    channel_id = extract_channel_id(channel_url)
    if not channel_id:
        return ("❌ Could not extract channel ID", *failure)
    channel_info = get_channel_info(channel_id)
    if not channel_info:
        return ("❌ Could not fetch channel info", *failure)
    channel_name = channel_info.get("title", "unknown_channel")
    if page_mode == "live":
        video_ids = get_live_video_ids(channel_id, max_videos=max_videos)
    elif page_mode in ("videos", "shorts", "all"):
        try:
            playlist_id = get_uploads_playlist(channel_id)
        except Exception:
            return ("❌ Could not find channel 'uploads' playlist ID", *failure)
        video_ids = get_playlist_video_ids(
            playlist_id, max_videos=max_videos, mode=page_mode
        )
    else:
        return ("❌ Unknown mode", *failure)
    if not video_ids:
        return (f"❌ No {page_mode} found", *failure)
    tmp_dir, thumbs = download_thumbnails(video_ids)
    return f"βœ… Fetched {len(thumbs)} {page_mode}", thumbs, tmp_dir, channel_name
def prepare_zip(thumb_dir, channel_name):
    """Zip every file in *thumb_dir* into ``<Channel>_Thumbnails_<YYYYMMDD>.zip``.

    The archive is written to the system temp directory; the full path of the
    created ``.zip`` file is returned.
    """
    # Same sanitisation scheme as the thumbnail filenames: keep alphanumerics,
    # turn everything else into underscores.
    sanitized = "".join(
        ch if ch.isalnum() or ch in (' ', '_') else '_' for ch in channel_name
    ).strip().replace(' ', '_')
    stamp = datetime.datetime.now().strftime("%Y%m%d")
    archive_base = os.path.join(
        tempfile.gettempdir(), f"{sanitized}_Thumbnails_{stamp}"
    )
    # make_archive appends ".zip" itself and returns the final path.
    return shutil.make_archive(archive_base, "zip", thumb_dir)
# -----------------------
# Generator for live status updates
# -----------------------
def fetch_and_zip_progress(channel_url, max_videos, page_mode):
    """Gradio generator driving the whole run with live status updates.

    Yields tuples matching the click-handler's outputs list:
    (status text, list of thumbnail paths, zip path, gr.File update).
    """
    # Reset the quota counter so the final report reflects only this run.
    API_USAGE["units"] = 0
    yield f"Starting fetch... | API quota used: {API_USAGE['units']} units", [], None, gr.File(visible=False) # πŸ’‘ Added gr.File update
    status, thumbs, tmp_dir, channel_name = fetch_channel_thumbnails(channel_url, max_videos, page_mode)
    quota_used = API_USAGE["units"]
    # Reword the status so it reflects the duration filter actually applied.
    final_status = status.replace("videos", "long-form videos (>= 60s)") if page_mode == "videos" else status
    final_status = final_status.replace("shorts", "shorts (< 60s)") if page_mode == "shorts" else final_status
    zip_file = None
    if thumbs:
        zip_file = prepare_zip(tmp_dir, channel_name)
    elif tmp_dir and os.path.isdir(tmp_dir):
        # Nothing was downloaded; don't leave the empty temp dir behind.
        shutil.rmtree(tmp_dir)
    # πŸ’‘ IMPORTANT: Now yielding a list of file paths (thumbs) and the zip file path.
    # The 'thumbs' list goes to gr.Files.
    yield f"{final_status} | API quota used: {quota_used} units", thumbs, zip_file, gr.File(visible=True) # πŸ’‘ Set visible=True on success
# -----------------------
# Gradio Interface (Modified)
# -----------------------
with gr.Blocks() as demo:
    gr.Markdown("## 🎬 YouTube Channel Thumbnails Downloader (Files Preview)")
    gr.Markdown("Thumbnails are now listed as individual files. Click the filename to preview/download.")
    # Channel URL input — /channel/<id>, /@handle and /user/<name> forms are supported.
    url_input = gr.Textbox(label="YouTube Channel URL", placeholder="https://www.youtube.com/@roisinmurphyofficial")
    page_selector = gr.Dropdown(
        choices=["videos", "shorts", "live", "all"],
        value="videos",
        label="Page to Collect"
    )
    # NOTE(review): the slider allows up to 100, but search.list ("live" mode)
    # caps maxResults at 50 — verify behaviour for live mode above 50.
    max_videos_slider = gr.Slider(minimum=1, maximum=100, step=1, value=20, label="Max Items to Fetch")
    start_btn = gr.Button("πŸš€ Start Collect")
    status_output = gr.Textbox(label="Status")
    # πŸ’‘ REPLACED gr.Gallery with gr.Files
    thumbs_list = gr.Files(
        label="Thumbnails Preview and Download (Click name for preview)",
        file_count="multiple", # Allows multiple files
        type="filepath", # Returns the path, which is what we need
        visible=True # Ensure it's visible initially
    )
    download_btn = gr.File(label="Download All Thumbnails (ZIP)")
    start_btn.click(
        fetch_and_zip_progress,
        inputs=[url_input, max_videos_slider, page_selector],
        # πŸ’‘ Updated output targets to match the new return values
        # NOTE(review): download_btn is listed twice — the 4th yielded value is a
        # gr.File(visible=...) update aimed at the same component.  Some Gradio
        # versions reject duplicate output components; confirm on the pinned version.
        outputs=[status_output, thumbs_list, download_btn, download_btn]
        # Note: Added download_btn twice as the generator yields 4 items,
        # but the last one is a gr.File update to hide/show the component.
        # This is a slightly awkward necessity of Gradio's generator API.
    )
if __name__ == "__main__":
    demo.launch()