GeminiRAG / src /utils /download_video.py
TorchLLM's picture
Initial commit for deploying the project
d9e3edb
import logging
import os
import random
import time
from typing import Any, Dict, List, Optional
import requests
import yt_dlp
from bs4 import BeautifulSoup
from pytube import YouTube
# Module-wide logging setup: INFO level, "<time> - <level> - <message>" records.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class ContentDownloader:
    """Download YouTube video or audio into a local directory.

    yt-dlp is tried first; if it raises, a pytube progressive-stream
    download is attempted as a fallback.
    """

    def __init__(self, download_path: str = "./downloads/"):
        """Store *download_path* and make sure the directory exists."""
        self.download_path = download_path
        self.create_download_directory()

    def create_download_directory(self) -> None:
        """Create download directory if it doesn't exist."""
        os.makedirs(self.download_path, exist_ok=True)

    def _extract_formats(self, url: str) -> List[Dict]:
        """Return every format yt-dlp reports for *url*.

        Any exception raised while querying is logged and turned into an
        empty list, so callers only need to test for emptiness.
        """
        ydl_opts = {"quiet": True, "no_warnings": True, "extract_flat": True}
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)
            return list(info.get("formats") or [])
        except Exception as e:
            logger.error("Error getting formats: %s", e)
            return []

    def _get_available_formats(self, url: str) -> List[Dict]:
        """Get list of available formats for a YouTube video.

        Only formats carrying both a video and an audio codec are returned;
        the list is empty when the query fails.
        """
        return [
            f
            for f in self._extract_formats(url)
            if f.get("vcodec") != "none" and f.get("acodec") != "none"
        ]

    def download_youtube_content(
        self, url: str, download_audio: bool = False
    ) -> Optional[str]:
        """
        Download YouTube content with automatic format selection.

        Args:
            url: Address of the video to download.
            download_audio: When true, fetch the best audio and extract it
                to mp3; otherwise download video (MP4 preferred).

        Returns:
            ``self.download_path`` (the directory, not the file) when yt-dlp
            finishes without raising; otherwise the result of the pytube
            fallback (a file path or ``None``). ``None`` is also returned
            when a video download is requested and no format containing
            video is reported.

        Note:
            The pytube fallback always fetches a progressive mp4 stream,
            regardless of ``download_audio``.
        """
        output_template = os.path.join(self.download_path, "%(title)s.%(ext)s")

        if download_audio:
            ydl_opts = {
                "outtmpl": output_template,
                "format": "bestaudio/best",
                "postprocessors": [
                    {
                        "key": "FFmpegExtractAudio",
                        "preferredcodec": "mp3",
                    }
                ],
            }
        else:
            # The selector below can merge separate video and audio streams,
            # so a pre-muxed format is not required. Bail out only when
            # nothing containing video is reported (or the query failed).
            formats = self._extract_formats(url)
            if not any(f.get("vcodec") != "none" for f in formats):
                logger.error("No suitable formats found")
                return None

            # Configure options for video download
            ydl_opts = {
                "outtmpl": output_template,
                "format": "bv*[ext=mp4]+ba[ext=m4a]/b[ext=mp4] / bv*+ba/b",  # Prefer MP4 format
                "merge_output_format": "mp4",
                "postprocessors": [
                    {
                        "key": "FFmpegVideoRemuxer",
                        "preferedformat": "mp4",
                    }
                ],
                "quiet": False,
                "no_warnings": False,
                "max_filesize": 2048 * 1024 * 1024,  # 2GB max
                "geo_bypass": True,
                # NOTE(review): by its name this turns off TLS certificate
                # checks -- confirm that is intended.
                "nocheckcertificate": True,
                "http_headers": {
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                    "Accept-Language": "en-us,en;q=0.5",
                    "Sec-Fetch-Mode": "navigate",
                },
            }

        try:
            # First update yt-dlp
            # NOTE(review): shells out to the yt-dlp CLI on every call;
            # presumably this cannot refresh the already-imported yt_dlp
            # module -- confirm it is still wanted.
            os.system("yt-dlp -U")

            # Attempt download with yt-dlp
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                logger.info("Attempting download with yt-dlp...")
                ydl.download([url])
            return self.download_path
        except Exception as e:
            logger.warning("yt-dlp download failed: %s", e)
            logger.info("Attempting fallback to direct stream download...")
            return self._download_with_direct_stream(url)

    def _download_with_direct_stream(
        self, url: str, max_retries: int = 3
    ) -> Optional[str]:
        """Alternative download method using direct stream access.

        Tries up to *max_retries* times, sleeping a random 2-5 seconds
        before every retry. Returns the downloaded file path, or ``None``
        when no progressive mp4 stream exists or every attempt raised.
        """
        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    time.sleep(random.uniform(2, 5))

                yt = YouTube(url)
                # Highest-resolution progressive mp4 stream
                streams = yt.streams.filter(progressive=True, file_extension="mp4")
                stream = streams.order_by("resolution").desc().first()
                if not stream:
                    logger.error("No suitable stream found")
                    return None

                # Append a timestamp query parameter to avoid caching
                timestamp = int(time.time())
                stream.url = f"{stream.url}&_={timestamp}"
                file_path = stream.download(
                    output_path=self.download_path,
                    filename_prefix=f"video_{timestamp}_",
                )
                logger.info("Successfully downloaded to: %s", file_path)
                return file_path
            except Exception as e:
                logger.error("Download attempt %d failed: %s", attempt + 1, e)
                if attempt == max_retries - 1:
                    logger.error("All download attempts failed")
        return None
def downlaod_video_from_url(youtube_url="", download_path="./downloads/"):
    """Download *youtube_url* into *download_path*, reporting progress on stdout.

    Runs the yt-dlp self-update command, prints the formats returned by
    ``ContentDownloader._get_available_formats`` (if any), then downloads
    the video and prints where it went or that the download failed.
    """
    # Refresh yt-dlp before anything else.
    os.system("yt-dlp -U")

    content_downloader = ContentDownloader(download_path=download_path)

    # Show what is on offer before downloading.
    available = content_downloader._get_available_formats(youtube_url)
    if available:
        print("\nAvailable formats:")
        for entry in available:
            format_id = entry.get("format_id")
            resolution = entry.get("resolution")
            approx_size = entry.get("filesize_approx", "unknown")
            print(
                f"Format ID: {format_id} - "
                f"Resolution: {resolution} - "
                f"Filesize: {approx_size} bytes"
            )

    # Fetch the video (with audio) and report the outcome.
    result = content_downloader.download_youtube_content(youtube_url)
    if not result:
        print("\nDownload failed")
        return
    print(f"\nVideo downloaded to: {result}")