import cv2 import requests import tempfile import os from urllib.parse import urlparse, parse_qs import yt_dlp class VideoParser: def __init__(self): self.temp_dir = tempfile.mkdtemp() def download_youtube_video(self, url: str) -> str: """Download YouTube video and return local path""" ydl_opts = { 'format': 'worst[height<=480]/worst', 'outtmpl': os.path.join(self.temp_dir, '%(title)s.%(ext)s'), 'quiet': True, 'no_warnings': True, 'extract_flat': False, 'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=True) return ydl.prepare_filename(info) def analyze_video_frames(self, video_path: str, sample_rate: int = 30): """Analyze video frames for object detection/counting""" cap = cv2.VideoCapture(video_path) frame_count = 0 results = [] while cap.isOpened(): ret, frame = cap.read() if not ret: break if frame_count % sample_rate == 0: # Basic frame analysis - you'd integrate with object detection here results.append({ 'frame': frame_count, 'timestamp': frame_count / cap.get(cv2.CAP_PROP_FPS), 'frame_data': frame }) frame_count += 1 cap.release() return results def extract_audio(self, video_path: str) -> str: """Extract audio from video for speech analysis""" audio_path = video_path.rsplit('.', 1)[0] + '.wav' # Use ffmpeg to extract audio import subprocess subprocess.run([ 'ffmpeg', '-i', video_path, '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', audio_path, '-y' ], capture_output=True) return audio_path def get_youtube_metadata(self, url: str) -> dict: """Extract YouTube video metadata without downloading""" try: ydl_opts = { 'quiet': True, 'no_download': True, 'extract_flat': False } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) return { 'title': info.get('title', 'Unknown'), 'description': info.get('description', '')[:500], 'duration': info.get('duration', 0), 'view_count': info.get('view_count', 0), 'upload_date': info.get('upload_date', 'Unknown'), 'uploader': info.get('uploader', 'Unknown') } except Exception as e: return {'error': str(e)} def cleanup(self): """Clean up temporary files""" import shutil shutil.rmtree(self.temp_dir, ignore_errors=True)