""" Viral Clip Extractor API Deploy to Render for YouTube viral clip analysis """ from flask import Flask, request, jsonify, send_file, after_this_request from flask_cors import CORS import yt_dlp import requests import google.generativeai as genai from openai import OpenAI import json import re from io import StringIO import os import re from typing import Dict, List, Optional, Any from api.proxy_manager import ProxyManager app = Flask(__name__) CORS(app) # ============== CONFIGURATION ============== # Constants DEFAULT_CLIP_LENGTH = 40 # seconds DEFAULT_WINDOW_STEP = 30 # Increased from 10 for faster processing DEFAULT_MIN_CLIP_LENGTH = 25 DEFAULT_MAX_CLIP_LENGTH = 60 MAX_CANDIDATES = 10 # Limit candidates for faster AI processing HOOK_WORDS = [ "wait", "shocking", "unbelievable", "secret", "crazy", "exposed", "omg", "wtf", "plot twist", "insane", "cheating", "stunning", "mind blowing", "truth", "lie", "revealed", "hidden", "discover", "amazing", "incredible", "must see", "urgent" ] EMOTION_WORDS = [ "laugh", "cry", "angry", "shocked", "surprised", "scream", "love", "hate", "disgusted", "horrified", "amazed", "excited", "sad", "happy", "furious" ] # ============== UTILITY FUNCTIONS ============== def extract_video_id(url: str) -> Optional[str]: """Extract YouTube video ID from various URL formats""" patterns = [ r'(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/embed\/|youtube\.com\/v\/|youtube\.com\/shorts\/)([a-zA-Z0-9_-]{11})', r'^([a-zA-Z0-9_-]{11})$' ] for pattern in patterns: match = re.search(pattern, url) if match: return match.group(1) return None def time_to_seconds(t) -> float: """Convert time string to seconds""" if isinstance(t, (int, float)): return float(t) if ':' in str(t): parts = list(map(int, str(t).split(':'))) if len(parts) == 3: return parts[0] * 3600 + parts[1] * 60 + parts[2] elif len(parts) == 2: return parts[0] * 60 + parts[1] elif len(parts) == 1: return parts[0] return float(t) def seconds_to_time(s: float) -> str: """Convert seconds to MM:SS format""" minutes = int(s // 60) seconds = int(s % 60) return f"{minutes}:{seconds:02d}" def parse_vtt_content(vtt_text: str) -> List[Dict]: """Parse VTT subtitle content""" segments = [] lines = vtt_text.strip().split('\n') i = 0 while i < len(lines) and not '-->' in lines[i]: i += 1 current_text = [] current_start = None current_end = None while i < len(lines): line = lines[i].strip() if '-->' in line: if current_text and current_start is not None: segments.append({ 'start': current_start, 'end': current_end, 'text': ' '.join(current_text) }) times = line.split('-->') if len(times) == 2: start_str = times[0].strip().split('.')[0] end_str = times[1].strip().split()[0].split('.')[0] current_start = time_to_seconds(start_str) current_end = time_to_seconds(end_str) current_text = [] elif line and not line.startswith('NOTE') and not line.startswith('STYLE'): clean_line = re.sub(r'<[^>]+>', '', line) if clean_line: current_text.append(clean_line) i += 1 if current_text and current_start is not None: segments.append({ 'start': current_start, 'end': current_end, 'text': ' '.join(current_text) }) return segments import sys import logging # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ============== CORE CLASS ============== class ViralClipExtractor: def __init__(self): self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', } # Robust FFmpeg path resolution import shutil project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # 1. Check local ffmpeg folder (Windows/Dev) ffmpeg_path_local = os.path.join(project_root, "ffmpeg", "bin", "ffmpeg.exe") # 2. Check system PATH (Linux/Render/Docker) ffmpeg_path_system = shutil.which("ffmpeg") ffmpeg_location = None ffmpeg_dir = None if os.path.exists(ffmpeg_path_local): ffmpeg_location = ffmpeg_path_local ffmpeg_dir = os.path.dirname(ffmpeg_location) if ffmpeg_dir not in os.environ["PATH"]: os.environ["PATH"] += os.pathsep + ffmpeg_dir logger.info(f"Using Local FFmpeg: {ffmpeg_location}") elif ffmpeg_path_system: # If on system path, we don't need to specify location for yt-dlp usually, # but getting the dir is good practice ffmpeg_location = ffmpeg_path_system ffmpeg_dir = os.path.dirname(ffmpeg_location) logger.info(f"Using System FFmpeg: {ffmpeg_location}") else: logger.warning("FFmpeg NOT FOUND! Clip cutting will fail.") # Proxy configuration (for bypassing YouTube datacenter IP blocks) proxy_url = os.environ.get('PROXY_URL') # Single proxy or host:port:user:pass proxy_api_url = os.environ.get('PROXY_API_URL') # WebShare API URL # Initialize ProxyManager self.proxy_manager = ProxyManager(proxy_url=proxy_url, api_url=proxy_api_url) self.base_ydl_opts = { 'quiet': False, 'no_warnings': False, 'extractor_args': { 'youtube': { 'player_client': ['android_testsuite', 'mediaconnect'], # Bypass bot detection } }, 'http_headers': self.headers, 'extractor_retries': 3, # Performance optimizations 'socket_timeout': 30, # Faster timeout 'concurrent_fragment_downloads': 4, # Download fragments in parallel 'http_chunk_size': 10485760, # 10MB chunks for better proxy performance 'retries': 3, # 'ffmpeg_location': ffmpeg_dir # Removed, relying on PATH } # Bandwidth tracking self.last_bandwidth_used = 0 self.last_proxy_used = None if ffmpeg_dir: logger.info(f"Added FFmpeg to PATH: {ffmpeg_dir}") else: logger.warning("FFmpeg NOT FOUND! Clip cutting will fail.") def _get_ydl_opts_with_proxy(self, **extra_opts): """Get ydl_opts with current proxy from ProxyManager""" opts = {**self.base_ydl_opts, **extra_opts} # Get next proxy from manager proxy = self.proxy_manager.get_next_proxy() if proxy: opts['proxy'] = proxy self.last_proxy_used = proxy logger.info(f"Using proxy: {proxy.split('@')[1] if '@' in proxy else proxy}") return opts def _estimate_bandwidth(self, info_dict: dict) -> int: """Estimate bandwidth used based on yt-dlp info dict""" # Rough estimate based on formats and filesize try: filesize = info_dict.get('filesize') or info_dict.get('filesize_approx', 0) if filesize: return filesize # Fallback: estimate from duration and quality duration = info_dict.get('duration', 0) # Assume ~1MB per minute for metadata/transcript operations return int(duration * 1024 * 1024 / 60) if duration else 50000 # 50KB default except: return 50000 # 50KB default def download_clip(self, url: str, start: float, end: float, quality: str = "480") -> Optional[str]: """Download and cut a clip""" import uuid output_dir = "downloads" os.makedirs(output_dir, exist_ok=True) clip_id = f"clip_{uuid.uuid4().hex[:8]}" output_template = os.path.join(output_dir, f"{clip_id}.%(ext)s") # Prefer smaller, faster formats format_str = f'best[height<={quality}]/bestvideo[height<={quality}]+bestaudio/best' ydl_opts = self._get_ydl_opts_with_proxy( format=format_str, outtmpl=output_template, download_ranges=lambda info, ydl: [{ 'start_time': start, 'end_time': end }], force_keyframes_at_cuts=True ) try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: ydl.download([url]) # Find the downloaded file for file in os.listdir(output_dir): if file.startswith(clip_id): return os.path.abspath(os.path.join(output_dir, file)) return None except Exception as e: with open("error.log", "a") as f: f.write(f"Download Error: {e}\n") logger.error(f"Error downloading clip: {e}") return None def analyze_with_gemini(self, transcript_segments: List[Dict], api_key: str) -> List[Dict]: """Analyze transcript using Gemini AI with parallel chunking""" import concurrent.futures genai.configure(api_key=api_key) model = genai.GenerativeModel('gemini-2.0-flash-exp') chunk_duration = 300 # 5 minutes overlap = 30 def process_chunk(chunk_segments, chunk_index): try: formatted_text = "" for seg in chunk_segments: formatted_text += f"[{int(seg['start'])}s] {seg['text']} " prompt = f""" Identify 1-2 viral clips from this transcript segment. Transcript: {formatted_text[:15000]} Return JSON array: [{{"start": number, "end": number, "viral_score": number, "reason": "string"}}] """ logger.info(f"Processing chunk {chunk_index} with Gemini...") response = model.generate_content(prompt) text = response.text text = re.sub(r"```json\s*", "", text) text = re.sub(r"```\s*", "", text) return json.loads(text) except Exception as e: logger.error(f"Gemini chunk {chunk_index} error: {e}") return [] # Create chunks chunks = [] if not transcript_segments: return [] full_duration = transcript_segments[-1]['end'] current_time = 0 while current_time < full_duration: chunk_end = current_time + chunk_duration chunk_segs = [s for s in transcript_segments if s['start'] >= current_time and s['start'] < chunk_end] if chunk_segs: chunks.append(chunk_segs) current_time += (chunk_duration - overlap) # Parallel Execution results = [] with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: infos = {executor.submit(process_chunk, c, i): i for i, c in enumerate(chunks)} for future in concurrent.futures.as_completed(infos): try: data = future.result() if isinstance(data, list): results.extend(data) except: pass results.sort(key=lambda x: x.get('viral_score', 0), reverse=True) return results[:5] def analyze_with_nvidia(self, transcript_segments: List[Dict], api_key: str) -> List[Dict]: """Analyze transcript using Nvidia/DeepSeek AI with parallel chunking""" import concurrent.futures chunk_duration = 300 # 5 minutes per chunk overlap = 30 # 30 seconds overlap # Helper to process a single chunk def process_chunk(chunk_segments, chunk_index): try: client = OpenAI( base_url="https://integrate.api.nvidia.com/v1", api_key=api_key ) formatted_text = "" for seg in chunk_segments: start = int(seg['start']) text = seg['text'] formatted_text += f"[{start}s] {text} " prompt = f""" Analyze this video segment and identify 1-2 most engaging viral clips (shorts/reels). Transcript Segment: {formatted_text[:15000]} Return raw JSON array (no markdown): [ {{ "start": , "end": , "viral_score": <0-100>, "reason": "" }} ] """ logger.info(f"Processing chunk {chunk_index} with Nvidia AI...") completion = client.chat.completions.create( model="deepseek-ai/deepseek-v3.2", messages=[{"role":"user","content":prompt}], temperature=0.2, max_tokens=1024, stream=False ) text = completion.choices[0].message.content text = re.sub(r"```json\s*", "", text) text = re.sub(r"```\s*", "", text) text = re.sub(r".*?", "", text, flags=re.DOTALL) return json.loads(text) except Exception as e: logger.error(f"Chunk {chunk_index} error: {e}") return [] # Create chunks chunks = [] if not transcript_segments: return [] full_duration = transcript_segments[-1]['end'] current_time = 0 while current_time < full_duration: chunk_end = current_time + chunk_duration # Filter segments for this chunk chunk_segs = [ s for s in transcript_segments if s['start'] >= current_time and s['start'] < chunk_end ] if chunk_segs: chunks.append(chunk_segs) current_time += (chunk_duration - overlap) # Process in parallel results = [] with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: future_to_chunk = {executor.submit(process_chunk, chunk, i): i for i, chunk in enumerate(chunks)} for future in concurrent.futures.as_completed(future_to_chunk): try: data = future.result() if isinstance(data, list): results.extend(data) except Exception as e: logger.error(f"Parallel processing error: {e}") # Sort by score and de-duplicate results.sort(key=lambda x: x.get('viral_score', 0), reverse=True) return results[:5] # Return top 5 def extract_video_info(self, url: str) -> Dict: """Get video metadata""" logger.info(f"Extracting video info for: {url}") ydl_opts = self._get_ydl_opts_with_proxy(simulate=True) try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) chapters = [] if 'chapters' in info and info['chapters']: for ch in info['chapters']: chapters.append({ "start": ch['start_time'], "end": ch.get('end_time') or ch['start_time'] + DEFAULT_CLIP_LENGTH, "title": ch.get('title', 'Untitled') }) logger.info(f"Video info extracted: {info.get('title')}") return { "id": info.get("id"), "title": info.get("title"), "uploader": info.get("uploader"), "duration": info.get("duration"), "description": info.get("description", "")[:500], "view_count": info.get("view_count"), "like_count": info.get("like_count"), "chapters": chapters } except Exception as e: logger.error(f"Error extracting video info: {e}") raise def fetch_full_transcript(self, url: str) -> List[Dict]: """Fetch full transcript once""" logger.info(f"Fetching full transcript for: {url}") ydl_opts = self._get_ydl_opts_with_proxy( skip_download=True, writesubtitles=True, writeautomaticsub=True, subtitleslangs=['en'] ) try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) sub_url = None if 'en' in info.get('subtitles', {}): sub_url = info['subtitles']['en'][-1].get('url') elif 'en' in info.get('automatic_captions', {}): sub_url = info['automatic_captions']['en'][-1].get('url') if not sub_url: logger.warning("No transcript found - no sub_url") return [] logger.info(f"Found subtitle URL: {sub_url[:100]}...") # Download subtitle with proxy support proxies_dict = {} if self.last_proxy_used: proxies_dict = { 'http': self.last_proxy_used, 'https': self.last_proxy_used } logger.info(f"Using proxy for subtitle download: {self.last_proxy_used.split('@')[1] if '@' in self.last_proxy_used else self.last_proxy_used}") else: logger.warning("No proxy available for subtitle download!") try: response = requests.get(sub_url, headers=self.headers, proxies=proxies_dict, timeout=15) response.raise_for_status() sub_text = response.text logger.info(f"Subtitle response length: {len(sub_text)} chars") logger.info(f"Subtitle sample: {sub_text[:200]}") except requests.exceptions.RequestException as e: logger.error(f"Subtitle download failed: {e}") return [] segments = [] if sub_url.endswith('.vtt') or 'vtt' in sub_url: segments = parse_vtt_content(sub_text) elif sub_url.endswith('.srt') or 'srt' in sub_url: # Reuse vtt logic as simple fallback segments = parse_vtt_content(sub_text) # If we have segments, return them if segments: return segments # Fallback to json3 if available/needed try: import json data = json.loads(sub_text) if 'events' in data: for event in data['events']: if 'segs' in event: start_time = event.get('tStartMs', 0) / 1000 duration = event.get('dDurationMs', 0) / 1000 text = ''.join(seg.get('utf8', '') for seg in event['segs']) segments.append({ 'start': start_time, 'end': start_time + duration, 'text': text }) except: pass return segments except Exception as e: logger.error(f"Transcript error: {e}") import traceback logger.error(traceback.format_exc()) return [] def get_transcript_text(self, segments: List[Dict], start: float, end: float) -> str: """Filter transcript from pre-fetched segments""" relevant = [ seg["text"] for seg in segments if seg["end"] >= start and seg["start"] <= end and seg["text"].strip() ] return " ".join(relevant) def get_transcript(self, url: str, start: float, end: float) -> str: """Legacy method for backward compatibility""" segments = self.fetch_full_transcript(url) return self.get_transcript_text(segments, start, end) def score_clip(self, url: str, start: float, end: float, transcript_segments: List[Dict] = None) -> Dict: """Calculate viral score for clip""" if transcript_segments: transcript = self.get_transcript_text(transcript_segments, start, end).lower() else: # Fallback transcript = self.get_transcript(url, start, end).lower() score = 0 reasons = [] # Hook words (up to 50 points) hooks = [w for w in HOOK_WORDS if w in transcript] hook_score = min(len(hooks) * 10, 50) score += hook_score if hooks: reasons.append(f"Hooks: {', '.join(hooks[:3])}") # Emotion words (up to 30 points) emotions = [w for w in EMOTION_WORDS if w in transcript] emotion_score = min(len(emotions) * 6, 30) score += emotion_score if emotions: reasons.append(f"Emotion: {', '.join(emotions[:2])}") # Conflict indicators (20 points) conflict_words = ["but", "however", "argument", "fight", "wrong", "disagree"] if any(w in transcript for w in conflict_words): score += 20 reasons.append("Conflict detected") # Duration bonus duration = end - start if DEFAULT_MIN_CLIP_LENGTH <= duration <= DEFAULT_MAX_CLIP_LENGTH: score += 10 reasons.append("Optimal duration") elif duration < DEFAULT_MIN_CLIP_LENGTH: score -= 10 reasons.append("Short clip") elif duration > DEFAULT_MAX_CLIP_LENGTH: score -= 5 reasons.append("Long clip") # Transcript quality bonus word_count = len(transcript.split()) if word_count > 10: score += 5 reasons.append("Good content density") return { "start": start, "end": end, "duration": round(duration, 1), "viral_score": max(0, min(score, 100)), "reasons": reasons, "transcript_preview": transcript[:150] + "..." if len(transcript) > 150 else transcript, "word_count": word_count } def generate_candidates(self, video_info: Dict, clip_length: int = None, window_step: int = None) -> List[Dict]: """Generate candidate clips""" duration = video_info.get("duration", 0) clip_length = clip_length or DEFAULT_CLIP_LENGTH window_step = window_step or DEFAULT_WINDOW_STEP candidates = [] # Use chapters if available if video_info.get("chapters"): logger.info(f"Using {len(video_info['chapters'])} chapters for candidates") for ch in video_info["chapters"]: clip_start = ch["start"] clip_end = min(ch["end"], clip_start + clip_length) if clip_end - clip_start >= DEFAULT_MIN_CLIP_LENGTH: candidates.append({ "start": clip_start, "end": clip_end, "title": ch.get("title", "Untitled") }) else: # Sliding windows logger.info("Using sliding windows for candidates") for start in range(0, int(duration) - clip_length, window_step): candidates.append({ "start": start, "end": min(start + clip_length, duration) }) return candidates[:MAX_CANDIDATES] # Limit candidates for speed # ============== API ROUTES ============== @app.route('/', methods=['GET']) def index(): """Root endpoint - API info""" return jsonify({ "name": "Viral Clip Extractor API", "version": "1.3.1 (AI Enabled + Fix)", "description": "Extract and analyze viral clips from YouTube videos", "quick_start": { "endpoint": "/clips", "method": "GET", "usage": "/clips?url=YOUTUBE_URL&quality=720&seconds=40&num=5&mode=ai&gemini_key=YOUR_KEY", "example": "/clips?url=https://youtube.com/watch?v=dQw4w9WgXcQ&seconds=30&num=3" }, "endpoints": { "/clips": "GET - Extract clips (Heuristic or AI)", "/health": "Health check", "/analyze": "POST - Full analysis", "/extract": "POST - Extract video info only", "/download_file": "GET - Download clip file" } }) @app.route('/proxy', methods=['GET', 'POST']) def proxy_management(): """Proxy management endpoint with web UI""" global extractor if request.method == 'POST': # Handle proxy configuration updates try: data = request.json or request.form action = data.get('action') if action == 'add_single': proxy_str = data.get('proxy') if proxy_str: extractor.proxy_manager.add_proxy(proxy_str) return jsonify({"success": True, "message": "Proxy added successfully"}) elif action == 'add_api': api_url = data.get('api_url') if api_url: extractor.proxy_manager.refresh_proxies(api_url) return jsonify({"success": True, "message": f"Loaded {len(extractor.proxy_manager.proxies)} proxies from API"}) elif action == 'get_stats': stats = extractor.proxy_manager.get_stats() return jsonify({"success": True, "stats": stats}) return jsonify({"success": False, "error": "Invalid action"}) except Exception as e: return jsonify({"success": False, "error": str(e)}) # GET request - return web UI stats = extractor.proxy_manager.get_stats() html = f""" Proxy Management - Viral Clip Extractor

🔒 Proxy Management

Configure and monitor proxy settings for YouTube access

Total Proxies

{stats['total_proxies']}
Active proxies in rotation

Bandwidth Used

{stats['total_bandwidth_mb']} MB
Out of 1024 MB limit
{round((stats['total_bandwidth_mb'] / 1024) * 100, 1)}%

Remaining

{stats['bandwidth_remaining_mb']} MB
Available bandwidth

Current Index

{stats['current_proxy_index']}
Next proxy in rotation

➕ Add Proxy

Example: 31.59.20.176:6754:nntlrciu:sx2noxvkj6y7

Paste your WebShare API URL to load all proxies automatically

📋 Active Proxies

{"".join([f'''
{p['proxy']}
{p['bandwidth_mb']} MB
''' for p in stats['per_proxy_stats']])}
{f'

No proxies configured yet

' if stats['total_proxies'] == 0 else ''}
""" return html @app.route('/health', methods=['GET']) def health(): """Health check endpoint""" return jsonify({ "status": "healthy", "service": "viral-clip-extractor" }) @app.route('/clips', methods=['GET']) def get_clips(): """ Get viral clips (Heuristic, Gemini AI, or Nvidia AI) """ try: url = request.args.get('url') if not url: return jsonify({"error": "Missing 'url' parameter"}), 400 quality = request.args.get('quality', '720') seconds = int(request.args.get('seconds', DEFAULT_CLIP_LENGTH)) num_clips = int(request.args.get('num', 5)) mode = request.args.get('mode', 'heuristic') # 'heuristic', 'ai' (defaults to gemini), 'nvidia' ai_provider = request.args.get('ai_provider', 'gemini') # 'gemini' or 'nvidia' # Handle API Keys nvidia_key = request.args.get('nvidia_key') or os.environ.get('NVIDIA_API_KEY') gemini_key = request.args.get('gemini_key') or os.environ.get('GEMINI_API_KEY') # Unified key handling if user passes 'api_key' generic param generic_key = request.args.get('api_key') if generic_key: if mode == 'nvidia' or ai_provider == 'nvidia': nvidia_key = generic_key else: gemini_key = generic_key extractor = ViralClipExtractor() # 1. Get video info video_info = extractor.extract_video_info(url) video_id = extract_video_id(url) if not video_id: return jsonify({"error": "Invalid YouTube URL"}), 400 # 2. Fetch transcript ONCE transcript_segments = extractor.fetch_full_transcript(url) scored_clips = [] if mode == 'ai' or mode == 'nvidia' or request.args.get('ai_provider'): # Determine provider provider = 'nvidia' if (mode == 'nvidia' or ai_provider == 'nvidia') else 'gemini' try: if provider == 'nvidia': if not nvidia_key: return jsonify({"error": "Nvidia API Key required"}), 400 logger.info("Using Nvidia DeepSeek for analysis...") scored_clips = extractor.analyze_with_nvidia(transcript_segments, nvidia_key) else: if not gemini_key: return jsonify({"error": "Gemini API Key required"}), 400 logger.info("Using Gemini AI for analysis...") scored_clips = extractor.analyze_with_gemini(transcript_segments, gemini_key) # Normalize response for clip in scored_clips: clip['title'] = clip.get('reason', f'{provider.title()} Selected Clip') clip.setdefault('start', 0) clip.setdefault('end', clip['start'] + seconds) clip.setdefault('viral_score', 80) except Exception as e: return jsonify({"error": f"AI Analysis Failed ({provider}): {str(e)}"}), 500 else: # Heuristic candidates = extractor.generate_candidates(video_info, seconds, DEFAULT_WINDOW_STEP) logger.info(f"Scoring {len(candidates)} candidates...") for i, cand in enumerate(candidates): score_data = extractor.score_clip(url, cand['start'], cand['end'], transcript_segments) score_data['title'] = cand.get('title', f'Clip {i+1}') scored_clips.append(score_data) scored_clips.sort(key=lambda x: x['viral_score'], reverse=True) # Select top non-overlapping clips final_clips = [] used_ranges = [] for clip in scored_clips: overlaps = False for used in used_ranges: if not (clip['end'] <= used['start'] or clip['start'] >= used['end']): overlaps = True break if not overlaps: final_clips.append(clip) used_ranges.append(clip) if len(final_clips) >= num_clips: break import urllib.parse encoded_url = urllib.parse.quote(url) for clip in final_clips: # Ensure start/end are floats clip['start'] = float(clip['start']) clip['end'] = float(clip['end']) clip['start_formatted'] = seconds_to_time(clip['start']) clip['end_formatted'] = seconds_to_time(clip['end']) clip['youtube_url'] = f"https://youtube.com/watch?v={video_id}&t={int(clip['start'])}" clip['download_link'] = f"/download_file?url={encoded_url}&start={clip['start']}&end={clip['end']}&quality={quality}" return jsonify({ "success": True, "video_id": video_id, "video_title": video_info.get('title'), "video_duration": video_info.get('duration'), "mode": mode, "clips": final_clips, "clips_count": len(final_clips) }) except Exception as e: logger.error(f"Error in /clips: {e}") return jsonify({ "success": False, "error": str(e) }), 500 @app.route('/debug_video', methods=['GET']) def debug_video(): """Debug endpoint to check transcript and AI analysis""" try: url = request.args.get('url') if not url: return jsonify({"error": "Missing 'url' parameter"}), 400 nvidia_key = request.args.get('nvidia_key') or os.environ.get('NVIDIA_API_KEY') extractor = ViralClipExtractor() debug_info = { "step": "init", "proxy_manager_proxies": len(extractor.proxy_manager.proxies), "last_proxy_used": None, "sub_url": None, "sub_response_status": None, "sub_response_length": None, "sub_response_sample": None, "available_subtitles": None, "available_auto_captions": None } # 1. Get video info debug_info["step"] = "extract_video_info" video_info = extractor.extract_video_info(url) debug_info["last_proxy_used_after_video_info"] = extractor.last_proxy_used # 2. Manually debug transcript fetching debug_info["step"] = "manual_subtitle_debug" ydl_opts = extractor._get_ydl_opts_with_proxy( skip_download=True, writesubtitles=True, writeautomaticsub=True, subtitleslangs=['en'] ) debug_info["last_proxy_used_for_transcript"] = extractor.last_proxy_used try: with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(url, download=False) # Get available subtitles debug_info["available_subtitles"] = list(info.get('subtitles', {}).keys()) debug_info["available_auto_captions"] = list(info.get('automatic_captions', {}).keys())[:10] # Limit to 10 sub_url = None if 'en' in info.get('subtitles', {}): sub_url = info['subtitles']['en'][-1].get('url') debug_info["sub_source"] = "subtitles" elif 'en' in info.get('automatic_captions', {}): sub_url = info['automatic_captions']['en'][-1].get('url') debug_info["sub_source"] = "automatic_captions" debug_info["sub_url"] = sub_url[:150] if sub_url else None if sub_url: # Try to download subtitle proxies_dict = {} if extractor.last_proxy_used: proxies_dict = { 'http': extractor.last_proxy_used, 'https': extractor.last_proxy_used } try: response = requests.get(sub_url, headers=extractor.headers, proxies=proxies_dict, timeout=15) debug_info["sub_response_status"] = response.status_code debug_info["sub_response_length"] = len(response.text) debug_info["sub_response_sample"] = response.text[:300] if response.text else None except Exception as e: debug_info["sub_download_error"] = str(e) except Exception as e: debug_info["ydl_error"] = str(e) # 3. Now call normal transcript function transcript_segments = extractor.fetch_full_transcript(url) debug_info["video_id"] = extract_video_id(url) debug_info["video_title"] = video_info.get('title') debug_info["video_duration"] = video_info.get('duration') debug_info["transcript_segments_count"] = len(transcript_segments) debug_info["transcript_sample"] = transcript_segments[:3] if transcript_segments else [] debug_info["nvidia_key_present"] = bool(nvidia_key) return jsonify(debug_info) except Exception as e: import traceback return jsonify({"error": str(e), "traceback": traceback.format_exc()}), 500 @app.route('/extract', methods=['POST']) def extract_video(): """Extract basic video information""" try: data = request.get_json() if not data or 'url' not in data: return jsonify({"error": "Missing 'url' parameter"}), 400 url = data['url'] video_id = extract_video_id(url) if not video_id: return jsonify({"error": "Invalid YouTube URL"}), 400 extractor = ViralClipExtractor() info = extractor.extract_video_info(url) return jsonify({ "success": True, "video_id": video_id, "data": info }) except Exception as e: return jsonify({ "success": False, "error": str(e) }), 500 @app.route('/analyze', methods=['POST']) def analyze_video(): """Full viral clip analysis""" try: data = request.get_json() if not data or 'url' not in data: return jsonify({"error": "Missing 'url' parameter"}), 400 url = data['url'] video_id = extract_video_id(url) if not video_id: return jsonify({"error": "Invalid YouTube URL"}), 400 num_clips = data.get('num_clips', 5) clip_length = data.get('clip_length', DEFAULT_CLIP_LENGTH) window_step = data.get('window_step', DEFAULT_WINDOW_STEP) quality = data.get('quality', '720') mode = data.get('mode', 'heuristic') gemini_key = data.get('gemini_key') or os.environ.get('GEMINI_API_KEY') extractor = ViralClipExtractor() # 1. Get video info video_info = extractor.extract_video_info(url) # 2. Fetch transcript ONCE transcript_segments = extractor.fetch_full_transcript(url) if mode == 'ai': if not gemini_key: return jsonify({"error": "Gemini API Key required for AI mode"}), 400 logger.info("Using Gemini AI for analysis") scored_clips = extractor.analyze_with_gemini(transcript_segments, gemini_key) # Normalize AI response for clip in scored_clips: clip['title'] = clip.get('reason', 'AI Selected Clip') clip.setdefault('start', 0) clip.setdefault('end', clip['start'] + clip_length) clip.setdefault('viral_score', 80) else: # 3. Generate candidates candidates = extractor.generate_candidates(video_info, clip_length, window_step) # 4. Score candidates scored_clips = [] logger.info(f"Scoring {len(candidates)} candidates...") for i, cand in enumerate(candidates): score_data = extractor.score_clip(url, cand['start'], cand['end'], transcript_segments) score_data['title'] = cand.get('title', f'Clip {i+1}') scored_clips.append(score_data) # Sort by viral score scored_clips.sort(key=lambda x: x['viral_score'], reverse=True) # Select top non-overlapping clips final_clips = [] used_ranges = [] for clip in scored_clips: overlaps = False for used in used_ranges: if not (clip['end'] <= used['start'] or clip['start'] >= used['end']): overlaps = True break if not overlaps: final_clips.append(clip) used_ranges.append(clip) if len(final_clips) >= num_clips: break # Add formatted times and YouTube URLs and Download Links import urllib.parse encoded_url = urllib.parse.quote(url) for clip in final_clips: clip['start'] = float(clip['start']) clip['end'] = float(clip['end']) clip['start_formatted'] = seconds_to_time(clip['start']) clip['end_formatted'] = seconds_to_time(clip['end']) clip['youtube_url'] = f"https://youtube.com/watch?v={video_id}&t={int(clip['start'])}" clip['download_link'] = f"/download_file?url={encoded_url}&start={clip['start']}&end={clip['end']}&quality={quality}" return jsonify({ "success": True, "video_id": video_id, "video_title": video_info.get('title'), "video_duration": video_info.get('duration'), "clips": final_clips }) except Exception as e: logger.error(f"Error in /analyze: {e}") return jsonify({ "success": False, "error": str(e) }), 500 @app.route('/download_file', methods=['GET']) def download_file_endpoint(): """Download the actual clip file""" try: url = request.args.get('url') if not url: return jsonify({"error": "Missing 'url' parameter"}), 400 start = float(request.args.get('start', 0)) end = float(request.args.get('end', 0)) quality = request.args.get('quality', '720') if end <= start: return jsonify({"error": "Invalid time range"}), 400 extractor = ViralClipExtractor() file_path = extractor.download_clip(url, start, end, quality) if file_path and os.path.exists(file_path): @after_this_request def remove_file(response): try: pass # windows file locking might prevent removal if not closed, but send_file should handle it # os.remove(file_path) # Defer removal or use a temp dir cleaner properly except Exception as error: app.logger.error("Error removing file", error) return response return send_file(file_path, as_attachment=True, download_name=os.path.basename(file_path)) else: return jsonify({"error": "Failed to download clip"}), 500 except Exception as e: logger.error(f"Error in /download: {e}") return jsonify({ "success": False, "error": str(e) }), 500 @app.route('/docs', methods=['GET']) def docs(): """API Documentation & Credits""" html = """ Viral Clip Extractor API | Documentation

Viral Clip Extractor API v1.3

Transform long-form YouTube videos into engaging viral clips using sophisticated AI analysis.

🔐 Security Best Practice: Do not pass API keys in URLs for public applications. Configure GEMINI_API_KEY and NVIDIA_API_KEY as Environment Variables on your server.

🧠 Analysis Modes & Usage

1. Heuristic Mode (Non-AI)

Best for: High-energy content, gaming, reactions. Fast and free.

No API Key required.
GET /clips?url={youtube_url}&mode=heuristic&num=5

2. Gemini AI Mode

Best for: Podcasts, storytelling, general dialogue. Balanced performance.

Requires GEMINI_API_KEY environment variable (or param).
GET /clips?url={youtube_url}&mode=ai&num=5

3. Nvidia/DeepSeek AI Mode

Best for: Technical content, debates, complex reasoning.

Requires NVIDIA_API_KEY environment variable (or param).
GET /clips?url={youtube_url}&mode=nvidia&num=5

📚 Endpoints

GET /clips

Main endpoint to analyze and get clip suggestions.

Parameters

NameTypeRequiredDescription
urlstringYesYouTube Video URL
modestringNoheuristic (default), ai, nvidia
numintNoNumber of clips (default: 5)
qualitystringNoVideo height (e.g., 720, 1080)

Example Response

{
  "success": true,
  "video_title": "Podcast Episode 1",
  "mode": "ai",
  "clips": [
    {
      "title": "Shocking Reveal",
      "start": 120.5,
      "end": 150.0,
      "viral_score": 95,
      "download_link": "/download_file?url=...&start=120.5&end=150"
    }
  ]
}

GET /download_file

Download a specific clip directly.

GET /download_file?url={url}&start={start}&end={end}
""" return html # Local development and Production if __name__ == '__main__': port = int(os.environ.get('PORT', 5000)) app.run(debug=True, host='0.0.0.0', port=port)