| | import gradio as gr |
| | import clip |
| | import torch |
| | from qdrant_client import QdrantClient |
| | import subprocess |
| | import os |
| | import uuid |
| | import yt_dlp |
| |
|
| | |
# Run CLIP on the GPU when one is visible to torch, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Load the ViT-B/32 CLIP checkpoint once at import time; `preprocess` is the
# image transform returned alongside the model by `clip.load`.
model, preprocess = clip.load("ViT-B/32", device=device)
| |
|
| | |
# Qdrant connection settings.
#
# NOTE(security): the endpoint and API key below are committed in source.
# Treat that key as exposed: rotate it and supply the replacement through the
# QDRANT_URL / QDRANT_API_KEY environment variables. The literals are kept
# only as a fallback so existing deployments keep working unchanged.
_QDRANT_URL = os.environ.get("QDRANT_URL") or (
    "https://265484ec-5f64-40ec-a619-c7c9dffc2dd9.us-east-1-0.aws.cloud.qdrant.io:6333"
)
_QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY") or (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3MiOiJtIn0.I2MgcVnOKkWmOXwFlqJqEqm6LFQIF4cjxU5up4wxwyw"
)

client = QdrantClient(
    url=_QDRANT_URL,
    api_key=_QDRANT_API_KEY,
)

# Name of the Qdrant collection queried by the search function.
COLLECTION_NAME = "video_segments"
| |
|
| | |
# Directory that receives extracted clips; created at import time if missing.
CLIP_OUTPUT_DIR = "generated_clips"
os.makedirs(CLIP_OUTPUT_DIR, exist_ok=True)

# YouTube video IDs, in the order of the "temp_video_<n>.mp4" file names they
# are keyed under in VIDEO_URLS.
_YOUTUBE_VIDEO_IDS = (
    "9CGGh6ivg68",
    "WXoOohWU28Y",
    "TV-DjM8242s",
    "rCVlIVKqqGE",
    "lb_5AdUpfuA",
    "FCQ-rih6cHY",
    "eQ6UE968Xe4",
    "eFgkZKhNUdM",
)

# Maps the stored video file name to its YouTube watch URL.
VIDEO_URLS = {
    f"temp_video_{index}.mp4": f"https://www.youtube.com/watch?v={video_id}"
    for index, video_id in enumerate(_YOUTUBE_VIDEO_IDS)
}

# Fallback URL used when a file name is not present in VIDEO_URLS.
DEFAULT_VIDEO_URL = VIDEO_URLS["temp_video_0.mp4"]
| |
|
def _select_stream_url(info):
    """Return the direct URL of the best usable MP4 stream in *info*, or None."""
    formats = info.get('formats') or [info]
    mp4_formats = [f for f in formats if f.get('ext') == 'mp4' and f.get('url')]
    # Prefer streams that carry both audio and video so the clip is not silent;
    # yt-dlp marks a missing track with the string 'none'.
    muxed = [
        f for f in mp4_formats
        if f.get('vcodec') != 'none' and f.get('acodec') != 'none'
    ]
    candidates = muxed or mp4_formats
    if candidates:
        # yt-dlp orders `formats` from worst to best, so the last one is best.
        return candidates[-1]['url']
    return info.get('url') or None


def _run_ffmpeg_cut(source, start_time, duration, codec_args, output_path):
    """Cut *duration* seconds from *source* with ffmpeg; return output_path or raise."""
    command = [
        "ffmpeg",
        "-y",  # global option: placed before the inputs/outputs, not trailing
        "-ss", str(start_time),
        "-i", source,
        "-t", str(duration),
        *codec_args,
        output_path,
    ]

    print(f"[INFO] Running ffmpeg command: {' '.join(command)}")
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    if result.returncode != 0:
        print(f"[WARN] ffmpeg command failed with code {result.returncode}")
        # errors='replace' so undecodable bytes cannot mask the real failure.
        print(f"[WARN] stderr: {result.stderr.decode('utf-8', errors='replace')}")
        raise RuntimeError(f"ffmpeg failed with code {result.returncode}")

    if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
        print(f"[INFO] Successfully extracted clip to {output_path}")
        return output_path

    print(f"[WARN] Output file missing or empty: {output_path}")
    raise RuntimeError("Output file missing or empty")


def _remove_if_present(path, description):
    """Best-effort removal of *path*; failures are logged, never raised."""
    if not os.path.exists(path):
        return
    try:
        os.remove(path)
        print(f"[INFO] Removed {description} {path}")
    except OSError as cleanup_error:
        print(f"[WARN] Failed to remove {description}: {cleanup_error}")


def extract_video_clip(video_url, start_time, end_time):
    """
    Use yt-dlp and ffmpeg to extract a clip directly from YouTube.

    Two strategies are tried in order:
      1. Resolve a direct stream URL with yt-dlp and let ffmpeg read and
         re-encode only the requested range.
      2. Download the whole video to a temporary file and cut it locally
         with stream copy.

    Args:
        video_url: URL of the video page to extract from.
        start_time: Clip start, in seconds.
        end_time: Clip end, in seconds; must be greater than start_time.

    Returns:
        Path of the generated clip inside CLIP_OUTPUT_DIR, or None when the
        range is empty/negative or both strategies fail.
    """
    duration = end_time - start_time
    if duration <= 0:
        # Nothing to cut: fail fast instead of making two doomed network attempts.
        print(f"[ERROR] Invalid clip range: start={start_time}, end={end_time}")
        return None

    clip_name = f"clip_{uuid.uuid4().hex}.mp4"
    output_path = os.path.join(CLIP_OUTPUT_DIR, clip_name)

    print(f"[INFO] Attempting to extract clip from {video_url} ({start_time} - {end_time})")

    # Method 1: stream the requested range straight from the remote URL.
    try:
        ydl_opts = {
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
            'quiet': True
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video_url, download=False)

        best_url = _select_stream_url(info)
        if not best_url:
            print("[WARN] Could not find a suitable direct URL")
            raise RuntimeError("No suitable URL found")

        return _run_ffmpeg_cut(
            best_url,
            start_time,
            duration,
            ["-c:v", "libx264", "-c:a", "aac", "-preset", "ultrafast"],
            output_path,
        )
    except Exception as e:
        print(f"[ERROR] Method 1 failed: {str(e)}")

    # Method 2: download the full video, then cut locally without re-encoding.
    temp_video = os.path.join(CLIP_OUTPUT_DIR, f"temp_{uuid.uuid4().hex}.mp4")
    try:
        print("[INFO] Trying Method 2: Download full video first")

        ydl_opts = {
            'format': 'best[ext=mp4]/best',
            'outtmpl': temp_video,
            'quiet': True
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])

        if not (os.path.exists(temp_video) and os.path.getsize(temp_video) > 0):
            raise RuntimeError("Downloaded video missing or empty")

        return _run_ffmpeg_cut(
            temp_video,
            start_time,
            duration,
            ["-c:v", "copy", "-c:a", "copy"],
            output_path,
        )
    except Exception as e:
        print(f"[ERROR] Method 2 failed: {str(e)}")
    finally:
        # Always drop the full download, even when yt-dlp or ffmpeg raised.
        _remove_if_present(temp_video, "temporary file")

    # Both methods failed: do not leave a truncated clip behind.
    _remove_if_present(output_path, "partial output")
    print("[ERROR] All extraction methods failed")
    return None
| |
|
def time_to_seconds(time_str):
    """
    Convert a clock-style timestamp to seconds.

    Accepts "SS", "MM:SS" or "HH:MM:SS"; the seconds field may be fractional
    (e.g. "00:01:02.5"). The hour and minute fields must be integers.

    Args:
        time_str: Timestamp string with one to three ':'-separated fields.

    Returns:
        The total number of seconds as a float.

    Raises:
        ValueError: If there are more than three fields or a field is not numeric.
    """
    fields = time_str.strip().split(':')
    if len(fields) > 3:
        raise ValueError(f"Unsupported time format: {time_str!r}")

    *whole_fields, seconds_field = fields

    # Fold hours/minutes into whole minutes first so the integer part is summed
    # exactly before the (possibly fractional) seconds are added.
    whole_minutes = 0
    for field in whole_fields:
        whole_minutes = whole_minutes * 60 + int(field)

    return whole_minutes * 60 + float(seconds_field)
| |
|
def _payload_seconds(value):
    """Convert a payload timestamp (number, clock string, or None) to float seconds."""
    if value is None:
        return 0.0
    if isinstance(value, str):
        return time_to_seconds(value)
    return float(value)


def _youtube_embed_url(video_url, start, end):
    """Build an autoplaying YouTube embed URL limited to the [start, end] range."""
    embed_url = video_url.replace("watch?v=", "embed/") + f"?start={int(start)}"
    # Only constrain the end when it is actually after the start; a missing or
    # invalid end (e.g. the 0 default) would otherwise yield "&end=0".
    if int(end) > int(start):
        embed_url += f"&end={int(end)}"
    return embed_url + "&autoplay=1"


def search_and_clip_video(text_query: str):
    """
    Find the video segment that best matches a text query and embed it.

    The query is encoded with CLIP, the nearest vector is looked up in the
    Qdrant collection, and the hit's payload ("start", "end", "video_path")
    is turned into a YouTube embed that plays only that range. No clip file
    is generated by this function.

    Args:
        text_query: Free-text search query.

    Returns:
        An HTML string: a fixed-size wrapper containing either the YouTube
        iframe or a short message when there is nothing to show.
    """
    print(f"[INFO] Searching for: {text_query}")

    # Fixed 16:9 box so the output area keeps its size for messages and player.
    wrapper = "<div style='width:100%; max-width:720px; height:405px; margin:auto;'>{}</div>"

    if not text_query or not text_query.strip():
        # A blank query has no meaning to embed; skip the model and the search.
        print("[WARN] Empty query.")
        return wrapper.format("<p style='text-align:center; padding-top:180px;'>Please enter a search query.</p>")

    with torch.no_grad():
        # truncate=True: over-long queries are cut to CLIP's context length
        # instead of making tokenize() raise.
        text_tokens = clip.tokenize([text_query], truncate=True).to(device)
        text_features = model.encode_text(text_tokens)
        # L2-normalise the query embedding before the vector search.
        text_features /= text_features.norm(dim=1, keepdim=True)

    search_result = client.search(
        collection_name=COLLECTION_NAME,
        query_vector=text_features.cpu().numpy()[0].tolist(),
        limit=1,
    )

    if not search_result:
        print("[WARN] No result found.")
        return wrapper.format("<p style='text-align:center; padding-top:180px;'>No matching video found.</p>")

    hit = search_result[0]
    payload = hit.payload or {}
    start = _payload_seconds(payload.get("start", 0))
    end = _payload_seconds(payload.get("end", 0))
    video_filename = payload.get("video_path", "temp_video_0.mp4")
    video_url = VIDEO_URLS.get(video_filename, DEFAULT_VIDEO_URL)

    embed_url = _youtube_embed_url(video_url, start, end)

    iframe = f"""
    <iframe width="100%" height="100%"
        src="{embed_url}"
        frameborder="0"
        allow="autoplay; encrypted-media"
        allowfullscreen></iframe>
    """
    return wrapper.format(iframe)
| | |
def get_test_video():
    """Return the default video URL for the player smoke-test tab."""
    # NOTE(review): this is a YouTube watch-page URL, not a media file URL;
    # confirm that the gr.Video output can actually play it.
    test_url = DEFAULT_VIDEO_URL
    print("[INFO] Returning test YouTube URL")
    return test_url
| |
|
| | |
# --- "Search Video" tab: text query in, HTML embed out ----------------------
_query_box = gr.Textbox(label="Enter search query", value="sample query")
_result_view = gr.HTML(label="YouTube Clip")

search_demo = gr.Interface(
    fn=search_and_clip_video,
    inputs=_query_box,
    outputs=_result_view,
    title="🎥 Semantic Video Search with Clip Extraction",
    description="Returns a clipped video segment matching your query.",
)

# --- "Test Video Player" tab: no inputs, always shows the default video -----
_test_player = gr.Video(label="Test Video")

test_demo = gr.Interface(
    fn=get_test_video,
    inputs=None,
    outputs=_test_player,
    title="Simple Video Test",
    description="Always displays the default video to verify video player works.",
)

# Combine both interfaces into a single tabbed app.
_tab_interfaces = [search_demo, test_demo]
_tab_titles = ["Search Video", "Test Video Player"]
demo = gr.TabbedInterface(_tab_interfaces, _tab_titles)


if __name__ == "__main__":
    # share=True asks Gradio for a public share link in addition to the local server.
    demo.launch(share=True)