"""Gradio demo: semantic video search over CLIP embeddings stored in Qdrant.

A text query is embedded with CLIP, the nearest video segment is looked up in
the ``video_segments`` Qdrant collection, and the matching YouTube video is
shown as an embedded player limited to the segment's start/end time.
``extract_video_clip`` can additionally cut the segment into a local mp4 file
using yt-dlp and ffmpeg.
"""

import html
import os
import subprocess
import uuid

import clip
import gradio as gr
import torch
import yt_dlp
from qdrant_client import QdrantClient

# Setup CLIP
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)

# Setup Qdrant
# NOTE(review): SECURITY - this API key is committed in source. It is kept only
# as a fallback so existing deployments keep working; rotate the key, set
# QDRANT_URL / QDRANT_API_KEY in the environment, and delete the fallbacks.
client = QdrantClient(
    url=os.environ.get(
        "QDRANT_URL",
        "https://265484ec-5f64-40ec-a619-c7c9dffc2dd9.us-east-1-0.aws.cloud.qdrant.io:6333",
    ),
    api_key=os.environ.get(
        "QDRANT_API_KEY",
        "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3MiOiJtIn0.I2MgcVnOKkWmOXwFlqJqEqm6LFQIF4cjxU5up4wxwyw",
    ),
)
COLLECTION_NAME = "video_segments"

# Paths
CLIP_OUTPUT_DIR = "generated_clips"
os.makedirs(CLIP_OUTPUT_DIR, exist_ok=True)

# YouTube URLs mapping (payload "video_path" value -> source video URL)
VIDEO_URLS = {
    "temp_video_0.mp4": 'https://www.youtube.com/watch?v=9CGGh6ivg68',
    "temp_video_1.mp4": 'https://www.youtube.com/watch?v=WXoOohWU28Y',
    "temp_video_2.mp4": 'https://www.youtube.com/watch?v=TV-DjM8242s',
    "temp_video_3.mp4": 'https://www.youtube.com/watch?v=rCVlIVKqqGE',
    "temp_video_4.mp4": 'https://www.youtube.com/watch?v=lb_5AdUpfuA',
    "temp_video_5.mp4": 'https://www.youtube.com/watch?v=FCQ-rih6cHY',
    "temp_video_6.mp4": 'https://www.youtube.com/watch?v=eQ6UE968Xe4',
    "temp_video_7.mp4": 'https://www.youtube.com/watch?v=eFgkZKhNUdM',
}
DEFAULT_VIDEO_URL = VIDEO_URLS["temp_video_0.mp4"]


def _remove_quietly(path, label):
    """Delete *path* if it exists; log instead of raising on failure."""
    if not os.path.exists(path):
        return
    try:
        os.remove(path)
        print(f"[INFO] Removed {label} {path}")
    except OSError as cleanup_error:
        print(f"[WARN] Failed to remove {label}: {cleanup_error}")


def _pick_direct_url(info):
    """Return a direct media URL from a yt-dlp info dict, or None.

    Prefers the best mp4 format that carries both video and audio. If no such
    format exists, falls back to the first mp4 entry and then to the top-level
    ``url`` of the info dict.
    """
    formats = info.get('formats') or [info]
    mp4_formats = [f for f in formats if f.get('ext') == 'mp4' and f.get('url')]

    # yt-dlp orders formats worst-to-best, so scan from the end to get the
    # highest-quality entry that has both streams.
    for fmt in reversed(mp4_formats):
        has_video = fmt.get('vcodec') not in (None, 'none')
        has_audio = fmt.get('acodec') not in (None, 'none')
        if has_video and has_audio:
            return fmt['url']

    if mp4_formats:
        return mp4_formats[0]['url']
    return info.get('url')


def _run_ffmpeg(command):
    """Run an ffmpeg command line; raise RuntimeError on a non-zero exit."""
    print(f"[INFO] Running ffmpeg command: {' '.join(command)}")
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        print(f"[WARN] ffmpeg command failed with code {result.returncode}")
        # errors='replace': ffmpeg output is not guaranteed to be valid UTF-8.
        print(f"[WARN] stderr: {result.stderr.decode('utf-8', errors='replace')}")
        raise RuntimeError(f"ffmpeg failed with code {result.returncode}")


def _require_output(output_path):
    """Return *output_path* if it is a non-empty file, else raise RuntimeError."""
    if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
        print(f"[INFO] Successfully extracted clip to {output_path}")
        return output_path
    print(f"[WARN] Output file missing or empty: {output_path}")
    raise RuntimeError("Output file missing or empty")


def extract_video_clip(video_url, start_time, end_time):
    """Use yt-dlp and ffmpeg to extract a clip directly from YouTube.

    Two strategies are tried in order:

    1. Resolve a direct media URL with yt-dlp and let ffmpeg fetch and
       re-encode only the requested segment.
    2. Download the whole video to a temporary file and cut the segment
       from it with a stream copy.

    Args:
        video_url: URL of the source video.
        start_time: Segment start in seconds.
        end_time: Segment end in seconds.

    Returns:
        Path of the generated mp4 inside ``CLIP_OUTPUT_DIR``, or ``None`` if
        the range is empty or both strategies fail.
    """
    clip_name = f"clip_{uuid.uuid4().hex}.mp4"
    output_path = os.path.join(CLIP_OUTPUT_DIR, clip_name)
    duration = end_time - start_time
    print(f"[INFO] Attempting to extract clip from {video_url} ({start_time} - {end_time})")

    # An empty/negative range can never produce a clip; bail out before
    # spending time on network downloads.
    if duration <= 0:
        print(f"[WARN] Invalid clip range: duration is {duration}")
        return None

    # Method 1: stream the segment straight from the resolved media URL
    try:
        ydl_opts = {
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
            'quiet': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url, download=False)

        best_url = _pick_direct_url(info)
        if not best_url:
            print("[WARN] Could not find a suitable direct URL")
            raise RuntimeError("No suitable URL found")

        _run_ffmpeg([
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", best_url,
            "-t", str(duration),
            "-c:v", "libx264",
            "-c:a", "aac",
            "-preset", "ultrafast",
            output_path,
        ])
        return _require_output(output_path)
    except Exception as e:  # best-effort: any failure falls through to Method 2
        print(f"[ERROR] Method 1 failed: {str(e)}")

    # Method 2: Download whole video first
    temp_video = os.path.join(CLIP_OUTPUT_DIR, f"temp_{uuid.uuid4().hex}.mp4")
    try:
        print("[INFO] Trying Method 2: Download full video first")
        ydl_opts = {
            'format': 'best[ext=mp4]/best',
            'outtmpl': temp_video,
            'quiet': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])

        if os.path.exists(temp_video) and os.path.getsize(temp_video) > 0:
            _run_ffmpeg([
                "ffmpeg", "-y",
                "-ss", str(start_time),
                "-i", temp_video,
                "-t", str(duration),
                "-c:v", "copy",
                "-c:a", "copy",
                output_path,
            ])
            return _require_output(output_path)
        print(f"[WARN] Downloaded video missing or empty: {temp_video}")
    except Exception as e:  # best-effort: report and return None below
        print(f"[ERROR] Method 2 failed: {str(e)}")
    finally:
        # Runs on success, failure, and even if ffmpeg could not be started,
        # so the full-length download never accumulates on disk.
        _remove_quietly(temp_video, "temporary file")

    # Both methods failed; drop any partial output a failed ffmpeg run left.
    _remove_quietly(output_path, "incomplete clip")
    print("[ERROR] All extraction methods failed")
    return None


def time_to_seconds(time_str):
    """Convert a ``H:M:S``, ``M:S`` or ``S`` timestamp string to seconds.

    The last component may be fractional (e.g. ``"00:01:02.5"``).

    Raises:
        ValueError: If the string has more than three components or a
            component is not numeric.
    """
    parts = time_str.strip().split(':')
    if len(parts) > 3:
        raise ValueError(f"Unsupported time format: {time_str!r}")

    *leading, seconds = parts
    whole = 0
    for component in leading:
        whole = whole * 60 + int(component)
    return whole * 60 + float(seconds)


def _as_seconds(value):
    """Normalise a payload timestamp (None, number, or time string) to float."""
    if value is None:
        return 0.0
    if isinstance(value, str):
        return time_to_seconds(value)
    return float(value)


def search_and_clip_video(text_query: str):
    """Find the segment best matching *text_query* and return embed HTML.

    The query is encoded with CLIP, the single nearest point is fetched from
    the Qdrant collection, and its ``video_path`` / ``start`` / ``end`` payload
    fields are turned into a YouTube embed URL.

    Returns:
        An HTML string: an ``<iframe>`` player for the matched segment, or a
        short message when the query is blank or nothing matches.
    """
    print(f"[INFO] Searching for: {text_query}")

    # Fixed-size wrapper
    # NOTE(review): the original markup was lost from this file; the wrapper,
    # message and iframe tags below are a reconstruction - confirm dimensions.
    wrapper = (
        '<div style="width: 640px; height: 360px; margin: 0 auto;">\n'
        '{}\n'
        '</div>'
    )

    if not text_query or not text_query.strip():
        print("[WARN] Empty query.")
        return wrapper.format(
            '<p style="text-align: center;">Please enter a search query.</p>'
        )

    # Encode query
    with torch.no_grad():
        text_tokens = clip.tokenize([text_query]).to(device)
        text_features = model.encode_text(text_tokens)
        text_features /= text_features.norm(dim=1, keepdim=True)

    search_result = client.search(
        collection_name=COLLECTION_NAME,
        query_vector=text_features.cpu().numpy()[0].tolist(),
        limit=1,
    )

    if not search_result:
        print("[WARN] No result found.")
        return wrapper.format(
            '<p style="text-align: center;">No matching video found.</p>'
        )

    payload = search_result[0].payload or {}
    start = _as_seconds(payload.get("start", 0))
    end = _as_seconds(payload.get("end", 0))

    video_filename = payload.get("video_path", "temp_video_0.mp4")
    video_url = VIDEO_URLS.get(video_filename, DEFAULT_VIDEO_URL)

    embed_url = video_url.replace("watch?v=", "embed/") + f"?start={int(start)}"
    # Only bound the playback when the payload holds a usable end time.
    if end > start:
        embed_url += f"&end={int(end)}"
    embed_url += "&autoplay=1"

    iframe = (
        f'<iframe width="100%" height="100%" src="{html.escape(embed_url, quote=True)}" '
        'frameborder="0" allow="autoplay; encrypted-media" allowfullscreen></iframe>'
    )
    return wrapper.format(iframe)


# Function to get a test video
def get_test_video():
    """Return the default video URL for the player smoke-test tab."""
    # NOTE(review): this is a YouTube watch-page URL, not a media file; confirm
    # that gr.Video can actually play it.
    print("[INFO] Returning test YouTube URL")
    return DEFAULT_VIDEO_URL


# Gradio Interfaces
search_demo = gr.Interface(
    fn=search_and_clip_video,
    inputs=gr.Textbox(label="Enter search query", value="sample query"),
    outputs=gr.HTML(label="YouTube Clip"),
    title="🎥 Semantic Video Search with Clip Extraction",
    description="Returns a clipped video segment matching your query.",
)

test_demo = gr.Interface(
    fn=get_test_video,
    inputs=None,
    outputs=gr.Video(label="Test Video"),
    title="Simple Video Test",
    description="Always displays the default video to verify video player works.",
)

demo = gr.TabbedInterface(
    [search_demo, test_demo],
    ["Search Video", "Test Video Player"],
)

if __name__ == "__main__":
    demo.launch(share=True)