#!/usr/bin/env python3 """ AI SquadX VIP – Viral Shorts Backend Analyzes YouTube videos, finds hook segments, cuts 9:16 clips. Requirements: pip install flask flask-cors yt-dlp System deps: ffmpeg (must be in PATH) """ import json import math import socket import struct import subprocess import tempfile import time import uuid import wave from concurrent.futures import ThreadPoolExecutor, as_completed from html import escape from pathlib import Path from flask import Flask, jsonify, request, send_from_directory, Response, stream_with_context from werkzeug.middleware.proxy_fix import ProxyFix from flask_cors import CORS import whisper import cv2 import mediapipe as mp # ── Config ──────────────────────────────────────────────────────────────────── import os PORT = int(os.getenv("PORT", 5000)) COOKIES_FILE = os.getenv("COOKIES_FILE", "") # path to cookies.txt YTDLP_PROXY = os.getenv("YTDLP_PROXY", "") # e.g., http://user:pass@host:port MAX_CLIPS = 10 # max clips to generate per video CLIP_DURATION = 45 # seconds per short MIN_GAP_SECONDS = 60 # minimum spacing between clip start times SAMPLE_RATE = 8000 # Hz for audio energy extraction (low = fast) ENERGY_WINDOW = 5 # smoothing window in seconds BASE_DIR = Path(__file__).parent.resolve() CLIPS_DIR = BASE_DIR / "clips" DOWNLOADS_DIR = BASE_DIR / "downloads" CLIPS_DIR.mkdir(exist_ok=True) DOWNLOADS_DIR.mkdir(exist_ok=True) _INDEX_HTML_RAW = (BASE_DIR / "index.html").read_text(encoding="utf-8") # ── Phase 2 Resources ───────────────────────────────────────────────────────── VIBES_DIR = BASE_DIR / "vibes" VIBES_DIR.mkdir(exist_ok=True) # Lazy-load mediapipe face detection _face_detector = None def get_face_detector(): global _face_detector if _face_detector is None: print("[>>] Initializing AI face detector...") _face_detector = mp.solutions.face_detection.FaceDetection( model_selection=1, min_detection_confidence=0.5 ) return _face_detector def analyze_face_center(video_path: Path, start: float, duration: float): """Analyze a segment and return the average X-coordinate (0.0 to 1.0) of faces.""" try: cap = cv2.VideoCapture(str(video_path)) cap.set(cv2.CAP_PROP_POS_MSEC, start * 1000) detector = get_face_detector() centers = [] # Sample 1 frame per second for speed for i in range(int(duration)): cap.set(cv2.CAP_PROP_POS_MSEC, (start + i) * 1000) ret, frame = cap.read() if not ret: break rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) results = detector.process(rgb_frame) if results.detections: # Use the first detected face's center bbox = results.detections[0].location_data.relative_bounding_box center_x = bbox.xmin + (bbox.width / 2) centers.append(center_x) cap.release() return sum(centers) / len(centers) if centers else 0.5 except Exception as e: print(f"[!] Face analysis failed: {e}") return 0.5 # Lazy-load whisper model _whisper_model = None def get_whisper_model(): global _whisper_model if _whisper_model is None: print("[>>] Loading AI whisper model (tiny – fast mode)...") _whisper_model = whisper.load_model("tiny") return _whisper_model # ── ASS Subtitle Style Definitions (1080×1920) ──────────────────────────────── ASS_STYLES = { # MrBeast: viral yellow italic, thick black outline — like "YOUR Safe Zone" style # MarginV=650 = 650px from bottom → text at ~66% from top in a 1920 frame "mrbeast": ( "Style: Default,Arial Black,82,&H0000FFFF,&H000000FF,&H00000000,&H88000000," "1,1,0,0,100,110,0,0,1,6,3,2,40,40,650,1" ), # Podcast: white text on a clean dark semi-transparent box "podcast": ( "Style: Default,Arial,62,&H00FFFFFF,&H000000FF,&H00000000,&HAA000000," "1,0,0,0,100,100,0,0,4,0,0,2,40,40,650,1" ), # Neon: electric cyan with magenta outline, glowing shadow "neon": ( "Style: Default,Arial Black,70,&H0000FFFF,&H000000FF,&H00FF00FF,&H88000000," "1,0,0,0,100,100,1,0,1,4,6,2,40,40,650,1" ), # Horror: blood red with heavy black shadow "horror": ( "Style: Default,Arial Black,72,&H002020EE,&H000000FF,&H00000000,&H88000000," "1,0,0,0,100,100,1,0,1,5,8,2,40,40,650,1" ), # Minimal: clean white, thin subtle outline, modern feel "minimal": ( "Style: Default,Arial,58,&H00FFFFFF,&H000000FF,&H66000000,&H44000000," "0,0,0,0,100,100,0,0,1,2,1,2,40,40,650,1" ), } ASS_HEADER = """[Script Info] ScriptType: v4.00+ PlayResX: 1080 PlayResY: 1920 ScaledBorderAndShadow: yes [V4+ Styles] Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding {style_line} [Events] Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text """ def ass_time(seconds: float) -> str: """Format seconds as ASS timestamp h:mm:ss.cs""" h = int(seconds // 3600) m = int((seconds % 3600) // 60) s = int(seconds % 60) cs = int((seconds % 1) * 100) return f"{h}:{m:02}:{s:02}.{cs:02}" def chunk_segments(segments, max_words: int = 4): """Break whisper segments into short word-level chunks for viral-style captions.""" chunks = [] for seg in segments: words = seg['text'].strip().split() if not words: continue start = seg['start'] end = seg['end'] duration = max(end - start, 0.1) word_dur = duration / len(words) for i in range(0, len(words), max_words): group = words[i:i + max_words] chunk_start = start + i * word_dur chunk_end = chunk_start + len(group) * word_dur chunks.append({ 'start': chunk_start, 'end': min(chunk_end, end), 'text': ' '.join(group) }) return chunks # ASS colour constants (format &HBBGGRR& — Blue Blue Green Green Red Red) _ASS_RED = "&H0000FF&" # Red #FF0000 _ASS_YELLOW = "&H04F3FF&" # Saturated Yellow #FFF304 _ASS_BLACK = "&H000000&" def _key_word_idx(words: list) -> int: """Pick the 'impact' word in a chunk — longest wins; ties go to last.""" if not words: return 0 return max(range(len(words)), key=lambda i: len(words[i])) def generate_ass(segments, style_name: str = "mrbeast") -> str: r"""Generate fancy ASS subtitles with per-word colour+size emphasis. Layout matches screenshots: - Key word: Red (#ff0000), fs=130, Red glow (\3c), bord=6, blur=4 - Others: Yellow (#fff304), fs=68, Black outline, bord=2 - Stacking: Injects \N before and after key word for verticality. """ style_line = ASS_STYLES.get(style_name, ASS_STYLES["mrbeast"]) header = ASS_HEADER.format(style_line=style_line) chunks = chunk_segments(segments, max_words=4) lines = [] for chunk in chunks: t0 = ass_time(chunk['start']) t1 = ass_time(chunk['end']) words = chunk['text'].replace('\n', ' ').split() if not words: continue key_idx = _key_word_idx(words) parts = [] for i, word in enumerate(words): if i == key_idx: # Impact word: saturated red, massive, glowing red border parts.append( f"{{\\1c{_ASS_RED}\\3c{_ASS_RED}\\fs130\\bord6\\shad3\\blur4}}{word}" ) else: # Supporting words: saturated yellow, medium, black border parts.append( f"{{\\1c{_ASS_YELLOW}\\3c{_ASS_BLACK}\\fs68\\bord2\\shad1\\blur0}}{word}" ) # Smart stacking: match Hrithik/The Boys screenshot layout # If we have 3-4 words, we want the key word on its own line in the center. if len(parts) >= 3: # Construct: [words before] \N [key word] \N [words after] final_text = "" if key_idx > 0: final_text += " ".join(parts[:key_idx]) + r"\N" final_text += parts[key_idx] if key_idx < len(parts) - 1: final_text += r"\N" + " ".join(parts[key_idx + 1:]) else: final_text = " ".join(parts) lines.append(f"Dialogue: 0,{t0},{t1},Default,,0,0,0,,{final_text}") return header + "\n".join(lines) app = Flask(__name__, static_folder=str(BASE_DIR), static_url_path="") # Hugging Face and other reverse proxies send X-Forwarded-* / X-Forwarded-Prefix app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) CORS(app, origins=["http://localhost:5000", "http://127.0.0.1:5000", "https://*.hf.space"]) # ── Dependency check ────────────────────────────────────────────────────────── def check_deps(): missing = [] for tool, flag in [("ffmpeg", "-version"), ("ffprobe", "-version"), ("yt-dlp", "--version")]: r = subprocess.run([tool, flag], capture_output=True) if r.returncode not in (0, 1): missing.append(tool) return missing # Some yt-dlp versions don't include newer options like --js-runtimes. # We detect it at runtime to avoid hard-failing downloads. _YT_DLP_SUPPORTS_JS_RUNTIMES = None def yt_dlp_supports_js_runtimes() -> bool: global _YT_DLP_SUPPORTS_JS_RUNTIMES if _YT_DLP_SUPPORTS_JS_RUNTIMES is not None: return _YT_DLP_SUPPORTS_JS_RUNTIMES # Also log Node.js version to verify installation try: node_v = subprocess.run(["node", "-v"], capture_output=True, text=True).stdout.strip() print(f"[OK] Node.js version: {node_v}", flush=True) except Exception: print("[!] Node.js NOT FOUND in PATH", flush=True) try: help_out = subprocess.run( ["yt-dlp", "--help"], capture_output=True, text=True, ).stdout _YT_DLP_SUPPORTS_JS_RUNTIMES = "--js-runtimes" in help_out except Exception: _YT_DLP_SUPPORTS_JS_RUNTIMES = False return _YT_DLP_SUPPORTS_JS_RUNTIMES # ── Step 1: Download (with retries, cookies, user-agent) ───────────────────── # Realistic browser User-Agent to avoid bot detection _USER_AGENT = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/131.0.0.0 Safari/537.36" ) def _find_cookies_file() -> str | None: """Locate a cookies.txt file in several common locations with robust search.""" # 1. Explicit env var if COOKIES_FILE and Path(COOKIES_FILE).is_file(): print(f"[>>] Cookies found via env var: {COOKIES_FILE}", flush=True) return COOKIES_FILE # 2. Search local and home directory search_paths = [BASE_DIR, Path.cwd(), Path.home()] for p in search_paths: # Look for variations: cookies.txt, youtube_cookies.txt, etc. for candidate in p.glob("*cookies*.txt"): if candidate.is_file() and candidate.stat().st_size > 10: print(f"[>>] SUCCESS: Cookies found at: {candidate.resolve()}", flush=True) return str(candidate.resolve()) # 3. Log what we actually see in the directory to help debug print(f"[!] Cookies NOT FOUND. Current files in {BASE_DIR}:", flush=True) try: files = [f.name for f in BASE_DIR.glob("*") if f.is_file()] print(f" {files}", flush=True) except Exception: pass return None def download_video(youtube_url: str, max_retries: int = 3) -> Path: uid = uuid.uuid4().hex[:10] template = str(DOWNLOADS_DIR / f"{uid}.%(ext)s") cmd = [ "yt-dlp", "--force-ipv4", "--ignore-config", "--no-cache-dir", "--user-agent", _USER_AGENT, "--extractor-args", "youtube:player_client=web,tv,ios;player_skip=web_embedded_check", "--remote-components", "ejs:github", "--no-check-certificates", "--geo-bypass", "--add-header", "Accept-Language:en-US,en;q=0.9", "--add-header", "Accept:text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "-f", "bestvideo[height<=1080]+bestaudio/best[height<=1080]/best", "--merge-output-format", "mp4", "--no-playlist", "--no-part", "--socket-timeout", "30", "--retries", "3", "--file-access-retries", "3", "-o", template, youtube_url, ] # Add cookies if available cookies_path = _find_cookies_file() if cookies_path: cmd[1:1] = ["--cookies", cookies_path] print(f"[>>] Using cookies from: {cookies_path}", flush=True) # Force Node.js as the JS runtime to solve n-parameter challenges if yt_dlp_supports_js_runtimes(): cmd[1:1] = ["--js-runtimes", "node"] print("[>>] Forcing 'node' as yt-dlp JS runtime solver", flush=True) # Add Proxy if defined if YTDLP_PROXY: cmd[1:1] = ["--proxy", YTDLP_PROXY] print(f"[>>] Using proxy: {YTDLP_PROXY}", flush=True) last_err = "" for attempt in range(1, max_retries + 1): print(f"[>>] Download attempt {attempt}/{max_retries} ...", flush=True) # Quick DNS pre-check so we get a clear error instead of yt-dlp's wall of text. # SKIP this check if a proxy is defined, as the proxy handles its own resolution. if not YTDLP_PROXY: try: socket.setdefaulttimeout(10) socket.getaddrinfo("www.youtube.com", 443, socket.AF_INET) except socket.gaierror as dns_err: last_err = f"DNS resolution failed: {dns_err}" print(f"[!] {last_err}", flush=True) if attempt < max_retries: time.sleep(2 ** attempt) # exponential backoff: 2, 4, 8 s continue raise RuntimeError(last_err) else: print(f"[>>] Skipping DNS pre-check because YTDLP_PROXY is set.", flush=True) # Run yt-dlp result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: matches = list(DOWNLOADS_DIR.glob(f"{uid}.*")) if matches: print(f"[OK] Downloaded: {matches[0].name}", flush=True) return matches[0] last_err = "yt-dlp finished but produced no output file." else: stderr = result.stderr # Log the FULL stderr once to help with EJS/Challenge diagnostics print(f"[!] yt-dlp failed (attempt {attempt}):\n{stderr}", flush=True) last_err = stderr[-1000:] if attempt < max_retries: time.sleep(2 ** attempt) raise RuntimeError(f"Download failed after {max_retries} attempts:\n{last_err}") def get_video_info(video_path: Path): """Return (duration, width, height) via ffprobe JSON.""" r = subprocess.run( ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", str(video_path)], capture_output=True, ) data = json.loads(r.stdout) duration = float(data["format"]["duration"]) width = height = 0 for s in data.get("streams", []): if s.get("codec_type") == "video": width = int(s["width"]) height = int(s["height"]) break return duration, width, height def get_youtube_heatmap(video_path: Path, url: str): """ Fetch YouTube's 'Most Replayed' heatmap data using yt-dlp. Returns: [{start_time: float, end_time: float, score: float}] or [] """ print("[>>] Fetching YouTube heatmap...", flush=True) # We use the same hardened bypass settings as download_video info_json_path = video_path.with_suffix(".info.json") cmd = [ "yt-dlp", "--force-ipv4", "--ignore-config", "--no-cache-dir", "--user-agent", _USER_AGENT, "--extractor-args", "youtube:player_client=web,tv,ios;player_skip=web_embedded_check", "--remote-components", "ejs:github", "--no-check-certificates", "--geo-bypass", "--add-header", "Accept-Language:en-US,en;q=0.9", "--add-header", "Accept:text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "--write-info-json", "--skip-download", "-o", str(video_path.with_suffix("")), # This will result in filename.info.json url, ] # Add cookies if available cookies_path = _find_cookies_file() if cookies_path: cmd[1:1] = ["--cookies", cookies_path] # Add Proxy if defined if YTDLP_PROXY: cmd[1:1] = ["--proxy", YTDLP_PROXY] try: # Run yt-dlp to get JSON subprocess.run(cmd, capture_output=True, text=True, timeout=30) if not info_json_path.exists(): print("[!] Heatmap info JSON not found.", flush=True) return [] with open(info_json_path, "r", encoding="utf-8") as f: data = json.load(f) heatmap = data.get("heatmap") if not heatmap: print("[!] No heatmap data found in YouTube metadata.", flush=True) return [] # Normalize scores to 0.0 - 1.0 max_val = max((item.get("value", 0) for item in heatmap), default=1.0) if max_val == 0: max_val = 1.0 normalized = [] for item in heatmap: normalized.append({ "start_time": float(item["start_time"]), "end_time": float(item["end_time"]), "score": round(float(item["value"]) / max_val, 4) }) print(f"[OK] Extracted {len(normalized)} heatmap segments.", flush=True) # Cleanup try: info_json_path.unlink() except: pass return normalized except Exception as e: print(f"[!] Heatmap extraction error: {e}", flush=True) return [] # ── Step 3: Audio energy analysis ──────────────────────────────────────────── def extract_audio_energy(video_path: Path, duration: float): """ Pipe raw mono 8kHz PCM from FFmpeg and compute per-second RMS energy. Returns list of (time_sec, rms) tuples. """ proc = subprocess.Popen( [ "ffmpeg", "-i", str(video_path), "-vn", "-ar", str(SAMPLE_RATE), "-ac", "1", "-f", "s16le", "pipe:1", "-loglevel", "quiet", ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, ) raw, _ = proc.communicate() if not raw: # Fallback: uniform energy (clips will be evenly spaced) return [(t, 1.0) for t in range(int(duration))] n = len(raw) // 2 samples = struct.unpack(f"<{n}h", raw) win = SAMPLE_RATE * ENERGY_WINDOW # samples per window step = SAMPLE_RATE # 1-second increments result = [] for i in range(0, n - win, step): chunk = samples[i : i + win : 8] # subsample every 8th → speed if not chunk: continue rms = math.sqrt(sum(int(s) * int(s) for s in chunk) / len(chunk)) result.append((i / SAMPLE_RATE, rms)) return result def analyze_audio_energy(video_path: Path): """ Extract 16kHz mono WAV via FFmpeg and calculate per-second RMS energy. Returns: [{start_time: float, end_time: float, energy: float}] """ print(f"[>>] Analyzing audio energy for: {video_path.name}", flush=True) temp_wav = Path(tempfile.gettempdir()) / f"{uuid.uuid4().hex}.wav" try: # Extract 16kHz mono WAV cmd = [ "ffmpeg", "-i", str(video_path), "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", str(temp_wav), "-y", "-loglevel", "quiet" ] subprocess.run(cmd, check=True) if not temp_wav.exists(): print("[!] Audio extraction failed.", flush=True) return [] with wave.open(str(temp_wav), "rb") as w: n_frames = w.getnframes() sample_rate = w.getframerate() frames = w.readframes(n_frames) # samples are 16-bit signed ints (s16le) samples = struct.unpack(f"<{n_frames}h", frames) # 1 second chunks chunk_size = sample_rate energies = [] for i in range(0, n_frames, chunk_size): chunk = samples[i : i + chunk_size] if not chunk: continue # RMS = sqrt(mean(samples^2)) # Using float conversion to ensure precision during large sums ms = sum(float(s) * float(s) for s in chunk) / len(chunk) rms = math.sqrt(ms) energies.append({ "start_time": round(i / sample_rate, 2), "end_time": round((i + chunk_size) / sample_rate, 2), "energy": rms }) # Normalize to 0.0 - 1.0 max_e = max((e["energy"] for e in energies), default=1.0) if max_e == 0: max_e = 1.0 for e in energies: e["energy"] = round(e["energy"] / max_e, 4) print(f"[OK] Audio energy analyzed ({len(energies)} seconds).", flush=True) return energies except Exception as e: print(f"[!] Audio analysis error: {e}", flush=True) return [] finally: if temp_wav.exists(): try: temp_wav.unlink() except: pass def calculate_viral_score(segment_start, segment_end, heatmap_data, energy_data, transcript_score): """ Calculate a single viral score by blending heatmap, audio energy, and transcript potential. Returns: {final_score: float, heatmap_score: float, energy_score: float, transcript_score: float} """ # 1. Heatmap Scoring (Average of overlapping entries) # heatmap_data is [{start_time, end_time, score}] h_overlaps = [ d["score"] for d in heatmap_data if d.get("start_time", 0) < segment_end and d.get("end_time", 0) > segment_start ] h_avg = sum(h_overlaps) / len(h_overlaps) if h_overlaps else 0.0 # 2. Audio Energy Scoring (Max in range) # energy_data can be [{start_time, end_time, energy}] or [(time, val)] e_peaks = [] for d in energy_data: if isinstance(d, dict): if d.get("start_time", 0) < segment_end and d.get("end_time", 0) > segment_start: e_peaks.append(d.get("energy", 0.0)) elif isinstance(d, (list, tuple)): if segment_start <= d[0] <= segment_end: e_peaks.append(d[1]) e_max = max(e_peaks) if e_peaks else 0.0 # 3. Final Weighted Formula # Weights: 40% Heatmap, 35% Audio Energy, 25% Transcript final = (0.40 * h_avg) + (0.35 * e_max) + (0.25 * transcript_score) return { "final_score": round(final, 2), "heatmap_score": round(h_avg, 2), "energy_score": round(e_max, 2), "transcript_score": round(transcript_score, 2) } def analyze_video_parallel(video_path, url, transcript_segments): """ Simultaneously fetch heatmap, audio energy, and score all transcript segments. """ results = {"heatmap": [], "energy": [], "transcript_scores": []} def run_wrapper(name, func, *args): start_t = time.time() try: res = func(*args) elapsed = time.time() - start_t print(f"[OK] {name} completed in {elapsed:.2f}s", flush=True) return res except Exception as e: print(f"[!] {name} failed: {e}", flush=True) return [] # Wrapper for batch transcript scoring def score_batch(segs): return [score_transcript_segment(s.get("text", "")) for s in segs] print(f"[>>] Starting parallel analysis for {video_path.name}...", flush=True) with ThreadPoolExecutor(max_workers=3) as executor: futures = { executor.submit(run_wrapper, "Heatmap", get_youtube_heatmap, video_path, url): "heatmap", executor.submit(run_wrapper, "Audio Energy", analyze_audio_energy, video_path): "energy", executor.submit(run_wrapper, "Transcript Scoring", score_batch, transcript_segments): "transcript" } for future in as_completed(futures): key = futures[future] if key == "heatmap": results["heatmap"] = future.result() elif key == "energy": results["energy"] = future.result() elif key == "transcript": results["transcript_scores"] = future.result() return results def select_top_clips(transcript_segments, heatmap_data, energy_data, transcript_scores, num_clips=10, min_dur=0, max_dur=0): """ Ranks transcript segments by viral score and deduplicates repetitive temporal overlaps. If min_dur/max_dur are provided, expands segments to fit the requested duration. """ scored_segs = [] video_duration = transcript_segments[-1].get("end", 0) if transcript_segments else 0 # 1. Score each individual transcript segment for i, seg in enumerate(transcript_segments): start, end = seg.get("start", 0), seg.get("end", 0) t_score = transcript_scores[i] if i < len(transcript_scores) else 0.0 viral_data = calculate_viral_score(start, end, heatmap_data, energy_data, t_score) # Clone to avoid modifying original list in-place s_copy = seg.copy() s_copy["viral_score"] = viral_data scored_segs.append(s_copy) # 2. Sort by final_score descending scored_segs.sort(key=lambda x: x["viral_score"]["final_score"], reverse=True) # 3. Deduplication + Expansion selected = [] chosen_starts = [] # Deduplication window is larger for longer clips to avoid overlap dedup_window = max(30, min_dur) for seg in scored_segs: curr_start = seg.get("start", 0) curr_end = seg.get("end", 0) # Check for duplication is_duplicate = any(abs(curr_start - prev_s) < dedup_window for prev_s in chosen_starts) if is_duplicate: continue # Expansion Logic: Grow the segment to reach min_dur if min_dur > 0: actual_dur = curr_end - curr_start if actual_dur < min_dur: needed = min_dur - actual_dur # Expand symmetrically new_start = max(0, curr_start - needed / 2) new_end = min(video_duration, curr_end + needed / 2) # Correct if boundary hit if new_start == 0: new_end = min(video_duration, min_dur) elif new_end == video_duration: new_start = max(0, video_duration - min_dur) curr_start, curr_end = round(new_start, 2), round(new_end, 2) # Cap at max_dur if needed (rare since we expand to min_dur) if max_dur > 0 and (curr_end - curr_start) > max_dur: curr_end = curr_start + max_dur selected.append({ "start": curr_start, "end": curr_end, "text": seg.get("text", ""), "viral_score": seg["viral_score"] }) chosen_starts.append(curr_start) if len(selected) >= num_clips: break # 4. Final Formatting top_clips = [] for s in selected: v = s["viral_score"] top_clips.append({ "start_time": s["start"], "end_time": s["end"], "text": s["text"], "final_score": v["final_score"], "heatmap_score": v["heatmap_score"], "energy_score": v["energy_score"], "transcript_score": v["transcript_score"] }) # 5. Print Summary Table print("\n" + "="*85) print(f"{'RANK':<5} | {'START':<8} | {'END':<8} | {'SCORE':<8} | {'TOP SIGNAL SOURCE'}") print("-" * 85) for idx, c in enumerate(top_clips): # Identify which signal contributed most sig_map = { "Heatmap": c["heatmap_score"], "Audio Peak": c["energy_score"], "Psych Hook": c["transcript_score"] } top_sig = max(sig_map, key=sig_map.get) print(f"{idx+1:<5} | {c['start_time']:<8.1f} | {c['end_time']:<8.1f} | {c['final_score']:<8.2f} | {top_sig}") print("="*85 + "\n", flush=True) return top_clips # ── Step 4: Find hook segments ──────────────────────────────────────────────── def find_segments(energies, duration: float, n_clips: int = MAX_CLIPS): """ Greedy peak selection: 1. Smooth the RMS curve. 2. Repeatedly pick the highest-energy moment, then black out a MIN_GAP_SECONDS radius around it. Returns list of (start, end) in seconds. """ if not energies: step = max(60.0, (duration - CLIP_DURATION) / max(n_clips, 1)) return [(round(i * step + 10, 2), round(i * step + 10 + CLIP_DURATION, 2)) for i in range(n_clips) if i * step + 10 + CLIP_DURATION <= duration] # Support both list of tuples [(t, rms)] and list of dicts [{"energy": rms}] if isinstance(energies[0], dict): times = [e.get("start_time", 0) for e in energies] vals = [e.get("energy", 0) for e in energies] else: times = [e[0] for e in energies] vals = [e[1] for e in energies] # Smooth w = min(10, max(3, len(vals) // 20)) smoothed = [] for i in range(len(vals)): lo, hi = max(0, i - w), min(len(vals), i + w + 1) smoothed.append(sum(vals[lo:hi]) / (hi - lo)) # Greedy selection used = [False] * len(smoothed) peaks = [] gap_idx = MIN_GAP_SECONDS # since step ≈ 1 s, index ≈ seconds while len(peaks) < n_clips * 2: best_i = max( (i for i in range(len(smoothed)) if not used[i]), key=lambda i: smoothed[i], default=-1, ) if best_i < 0: break peaks.append(times[best_i]) lo = max(0, best_i - gap_idx) hi = min(len(used), best_i + gap_idx + 1) for j in range(lo, hi): used[j] = True peaks.sort() peaks = peaks[:n_clips] # If we have fewer peaks than requested, fill with evenly spaced ones if len(peaks) < n_clips: step = max(60.0, (duration - CLIP_DURATION) / max(n_clips, 1)) t = 10.0 while len(peaks) < n_clips and t + CLIP_DURATION <= duration: if all(abs(t - p) >= MIN_GAP_SECONDS for p in peaks): peaks.append(t) t += step peaks.sort() # Convert to (start, end) segments = [] for pt in peaks: start = max(0.0, pt - CLIP_DURATION * 0.25) end = start + CLIP_DURATION if end > duration: end = duration start = max(0.0, end - CLIP_DURATION) segments.append((round(start, 2), round(end, 2))) return segments # ── Step 5: Build 9:16 crop filter ─────────────────────────────────────────── # Valid color psychology modes _COLOR_MODES = frozenset({"off", "boost", "yellow", "red"}) def build_vf(width: int, height: int, center_x: float = 0.5, safe_zone: bool = False, color_mode: str = "off") -> str: """Return an FFmpeg -vf string that crops to 9:16 at 1080×1920. Args: width / height: source video dimensions. center_x: horizontal crop anchor (0.0–1.0; 0.5 = centre). safe_zone: When True, additionally crops top+bottom 12.5% so the viewer's eye is drawn to the central safe zone (Retention Psychology). color_mode: One of 'off' | 'boost' | 'yellow' | 'red'. - boost → +40% saturation (eq filter) - yellow → warm yellow-push via curves filter - red → red-dominant push via curves filter """ if color_mode not in _COLOR_MODES: color_mode = "off" ratio = 9 / 16 if width / height > ratio: # Landscape – crop left/right cw = int(height * ratio) ch = height cx = int((width * center_x) - (cw / 2)) cx = max(0, min(cx, width - cw)) # clamp cy = 0 else: # Portrait / square – crop top/bottom cw = width ch = int(width / ratio) cx = 0 cy = (height - ch) // 2 # Force even numbers cw -= cw % 2 ch -= ch % 2 vf = f"crop={cw}:{ch}:{cx}:{cy},scale=1080:1920:flags=lanczos" # Safe-zone: crop out top+bottom 12.5% to centre viewer attention if safe_zone: vf += ",crop=iw:ih*0.75:0:ih*0.125,scale=1080:1920:flags=lanczos" # Color Psychology: apply requested colour grade if color_mode == "boost": # +40% saturation – generic virality pop vf += ",eq=saturation=1.4" elif color_mode == "yellow": # Warm yellow push: lift reds+greens slightly, keep blues flat vf += ",curves=red='0/0 0.5/0.56 1/1':green='0/0 0.5/0.53 1/1':blue='0/0 0.5/0.46 1/1'" elif color_mode == "red": # Red-dominant push: lift reds, slightly desaturate greens+blues vf += ",curves=red='0/0 0.5/0.62 1/1':green='0/0 0.5/0.46 1/1':blue='0/0 0.5/0.44 1/1'" return vf # ── Retention Psychology: Analysis Helpers ──────────────────────────────────── # Keywords that score biological / emotional triggers (from screenshot guide) _TRIGGER_KEYWORDS = { "animal": ["dog", "cat", "wolf", "bear", "shark", "snake", "bird", "lion", "tiger"], "disaster": ["explosion", "crash", "fire", "flood", "earthquake", "storm", "accident"], "food": ["eating", "cooking", "recipe", "taste", "delicious", "meal", "food"], "baby": ["baby", "newborn", "puppy", "kitten", "child", "infant"], "shock": ["unbelievable", "insane", "crazy", "shocking", "impossible", "wtf", "omg"], "rage": ["wrong", "mistake", "error", "broken", "fail", "typo"], "narrative": ["story", "secret", "truth", "revealed", "you won't believe", "finally"], } def score_transcript_segment(text: str) -> float: """ Score a transcript segment for viral potential based on hooks, structure, and emotions. Returns a float 0.0–1.0. """ if not text or not text.strip(): return 0.0 score = 0.0 text_lower = text.lower().strip() # 1. Hook Phrases (+0.3) hooks = ["wait", "secret", "no one tells you", "here's why", "never", "always", "mistake", "truth", "actually"] if any(h in text_lower for h in hooks): score += 0.3 # 2. Question Mark (+0.2) if "?" in text: score += 0.2 # 3. Information Density (+0.2) word_count = len(text.split()) if word_count > 20: score += 0.2 # 4. Emotional Words (+0.15) emotions = ["crazy", "insane", "shocked", "love", "hate", "afraid", "angry", "excited", "wow", "unbelievable"] if any(e in text_lower for e in emotions): score += 0.15 # 5. Strong Openers (+0.15) openers = ["so", "but", "wait", "now", "here", "this is"] if any(text_lower.startswith(o) for o in openers): score += 0.15 return min(1.0, round(score, 2)) def score_pacing(energies: list, start: float, end: float) -> int: """Count how many 4-second windows within [start, end] contain an energy peak. Returns a 0–100 pacing score (higher = more frequent attention resets). """ if not energies: return 50 # neutral default window_energies = [] for d in energies: # Handle both dict format (from analyze_audio_energy) and legacy tuple format if isinstance(d, dict): t = d.get("start_time", 0) val = d.get("energy", 0) else: t, val = d if start <= t <= end: window_energies.append(val) if not window_energies: return 50 duration = end - start num_windows = max(1, int(duration / 4)) window_size = max(1, len(window_energies) // num_windows) peaks = 0 mean_energy = sum(window_energies) / len(window_energies) for i in range(0, len(window_energies), window_size): chunk = window_energies[i:i + window_size] if chunk and max(chunk) > mean_energy * 1.1: peaks += 1 # Normalise: perfect pacing = 1 peak per window score = min(100, int((peaks / num_windows) * 100)) return score def segment_structure(start: float, end: float) -> dict: """Split clip into Hook / Body / Reward timestamps (3-Part Structure). Returns: { "hook": {"start": float, "end": float}, # first 10% "body": {"start": float, "end": float}, # middle 75% "reward": {"start": float, "end": float}, # last 15% } """ dur = end - start hook_end = round(start + dur * 0.10, 2) body_end = round(start + dur * 0.85, 2) return { "hook": {"start": round(start, 2), "end": hook_end}, "body": {"start": hook_end, "end": body_end}, "reward": {"start": body_end, "end": round(end, 2)}, } def build_vf_pad(width: int, height: int) -> str: """Scale to fit inside 9:16 (1080×1920) preserving aspect ratio, pad remainder black.""" # scale down to fit, keeping aspect ratio # then pad symmetrically to exactly 1080x1920 return ( "scale=1080:1920:force_original_aspect_ratio=decrease:flags=lanczos," "pad=1080:1920:(ow-iw)/2:(oh-ih)/2:black," "setsar=1" ) # ── Step 6: Cut clip ────────────────────────────────────────────────────────── def ffmpeg_escape_text(text): """Escape text for FFmpeg drawtext filter.""" for ch in ["\\", "'", ":", ";", "[", "]", ",", "="]: text = text.replace(ch, f"\\{ch}") return text # ── Caption Style Definitions ───────────────────────────────────────────────── CAPTION_STYLES = { # MrBeast: Big bold white text, thick black outline, yellow highlight, centered low "mrbeast": ( "FontName=Arial Black,FontSize=22,Bold=1," "PrimaryColour=&H00FFFFFF,OutlineColour=&H00000000," "BackColour=&H00000000,BorderStyle=1,Outline=4,Shadow=2," "Alignment=2,MarginV=80,MarginL=20,MarginR=20," "Spacing=0" ), # Podcast: Clean white text on semi-transparent dark pill, centered bottom "podcast": ( "FontName=Arial,FontSize=18,Bold=1," "PrimaryColour=&H00FFFFFF,OutlineColour=&H00000000," "BackColour=&HAA000000,BorderStyle=4,Outline=0,Shadow=0," "Alignment=2,MarginV=90,MarginL=30,MarginR=30," "Spacing=1" ), # Neon: Bright cyan/yellow with glowing shadow "neon": ( "FontName=Arial Black,FontSize=20,Bold=1," "PrimaryColour=&H0000FFFF,OutlineColour=&H00FF00FF," "BackColour=&H00000000,BorderStyle=1,Outline=3,Shadow=4," "Alignment=2,MarginV=80,MarginL=20,MarginR=20," "Spacing=0" ), # Horror: Dark red dripping style, heavy shadow "horror": ( "FontName=Arial Black,FontSize=20,Bold=1," "PrimaryColour=&H002020EE,OutlineColour=&H00000000," "BackColour=&H00000000,BorderStyle=1,Outline=4,Shadow=6," "Alignment=2,MarginV=80,MarginL=20,MarginR=20," "Spacing=1" ), # Minimal: Small clean white text, no box, light outline "minimal": ( "FontName=Arial,FontSize=16,Bold=0," "PrimaryColour=&H00FFFFFF,OutlineColour=&H80000000," "BackColour=&H00000000,BorderStyle=1,Outline=1,Shadow=1," "Alignment=2,MarginV=100,MarginL=30,MarginR=30," "Spacing=1" ), } def cut_clip(video_path: Path, start: float, end: float, idx: int, width: int, height: int, mode: str = "fill", captions: bool = False, headline: str = "", cta: str = "", reframe: bool = False, progress_bar: bool = False, vibe: str = "none", caption_style: str = "mrbeast", safe_zone: bool = False, color_mode: str = "off", watermark_text: str = "", pre_segments: list = None) -> tuple: """Cut a clip and return (output_path, warnings_list, transcript_text).""" warnings = [] transcript_text = "" name = f"short_{idx + 1}_{uuid.uuid4().hex[:6]}.mp4" out = CLIPS_DIR / name dur = round(end - start, 2) center_x = 0.5 if reframe and mode == "fill": center_x = analyze_face_center(video_path, start, dur) vf_base = ( build_vf_pad(width, height) if mode == "pad" else build_vf(width, height, center_x, safe_zone=safe_zone, color_mode=color_mode) ) # Base filters filters = [vf_base] # Add Headline if headline: clean_headline = ffmpeg_escape_text(headline) filters.append( f"drawtext=text='{clean_headline}':fontcolor=white:fontsize=80:font='Arial':" f"x=(w-text_w)/2:y=150:box=1:boxcolor=black@0.6:boxborderw=20" ) # Add CTA if cta: clean_cta = ffmpeg_escape_text(cta) filters.append( f"drawtext=text='{clean_cta}':fontcolor=white:fontsize=70:font='Arial':" f"x=(w-text_w)/2:y=h-250:box=1:boxcolor=black@0.6:boxborderw=20" ) # Add Watermark (bottom-right corner, semi-transparent) if watermark_text: watermark_text = watermark_text[:30] # Server-side length limit clean_wm = ffmpeg_escape_text(watermark_text) filters.append( f"drawtext=text='{clean_wm}':fontcolor=white@0.55:fontsize=38:font='Arial':" f"x=w-text_w-30:y=h-80" ) # Add Progress Bar if progress_bar: # Progress bar at bottom # x=0, y=ih-10, w=iw * (t/duration), h=10 filters.append( f"drawbox=x=0:y=ih-12:w='min(iw,iw*t/{dur})':h=12:color=0xFCD34D@0.9:t=fill" ) # Add Subtitles (Captions) ass_path = None if captions: try: # Use pre-supplied segments if available (Intelligence Work-flow) if pre_segments is not None: print(f"[>>] Captions: Using pre-supplied segments for clip {idx + 1}", flush=True) # Filter segments that fall within the clip range and make timestamps relative to the clip clip_segments = [] for s in pre_segments: s_start = s.get("start", 0) s_end = s.get("end", 0) if s_start < end and s_end > start: # Slice/Clamp segment if it overlaps but starts before or ends after the clip rel_start = max(0, round(s_start - start, 2)) rel_end = min(dur, round(s_end - start, 2)) if rel_end > rel_start: clip_segments.append({ "start": rel_start, "end": rel_end, "text": s.get("text", "") }) transcript_text = " ".join(s.get("text", "") for s in clip_segments) ass_content = generate_ass(clip_segments, style_name=caption_style) ass_path = CLIPS_DIR / f"{name}.ass" ass_path.write_text(ass_content, encoding="utf-8") filters.append(f"subtitles={ass_path.name}") print(f"[>>] Captions: ASS written from pre-segments → {ass_path}", flush=True) else: # Fallback to per-clip transcription (Sequential Work-flow) print(f"[>>] Captions: starting for clip {idx + 1}", flush=True) model = get_whisper_model() temp_audio = CLIPS_DIR / f"{name}_audio.wav" subprocess.run([ "ffmpeg", "-i", str(video_path), "-ss", str(start), "-t", str(dur), "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-y", str(temp_audio) ], capture_output=True) if not temp_audio.exists() or temp_audio.stat().st_size < 8000: print("[!] Audio too short or silent – skipping captions for this clip", flush=True) warnings.append("Audio too short – captions skipped for this clip") if temp_audio.exists(): temp_audio.unlink() else: result = model.transcribe(str(temp_audio), task="transcribe", fp16=False) if temp_audio.exists(): temp_audio.unlink() transcript_text = " ".join(seg.get("text", "") for seg in result.get("segments", [])) ass_content = generate_ass(result['segments'], style_name=caption_style) ass_path = CLIPS_DIR / f"{name}.ass" ass_path.write_text(ass_content, encoding="utf-8") filters.append(f"subtitles={ass_path.name}") print(f"[>>] Captions: ASS written from transcription → {ass_path}", flush=True) except Exception as e: print(f"[!] Subtitle generation failed: {e}", flush=True) import traceback traceback.print_exc() if 'temp_audio' in locals() and temp_audio.exists(): try: temp_audio.unlink() except Exception: pass final_vf = ",".join(filters) # Prepare FFmpeg command cmd = [ "ffmpeg", "-loglevel", "error", "-ss", str(start), "-i", str(video_path) ] # Handle Background Music (Vibe) vibe_file = VIBES_DIR / f"{vibe}.mp3" if vibe != "none" and vibe_file.exists(): cmd.extend(["-stream_loop", "-1", "-i", str(vibe_file)]) # Mix audio: aevalsrc for delay + amix # volume 1.0 for original, volume 0.3 for background filter_complex = ( "[0:a]volume=1.0[main_a];" "[1:a]volume=0.3[vibe_a];" "[main_a][vibe_a]amix=inputs=2:duration=first:dropout_transition=2[aout]" ) cmd.extend(["-filter_complex", filter_complex, "-map", "0:v", "-map", "[aout]", "-c:a", "aac", "-b:a", "128k"]) else: cmd.extend(["-c:a", "aac", "-b:a", "128k"]) cmd.extend([ "-t", str(dur), "-vf", final_vf, "-c:v", "libx264", "-preset", "ultrafast", "-crf", "26", "-movflags", "+faststart", "-y", str(out), ]) print(f"[>>] FFmpeg cmd: {' '.join(cmd)}", flush=True) # cwd=CLIPS_DIR is required so the relative 'subtitles=filename.ass' path resolves correctly r = subprocess.run(cmd, capture_output=True, cwd=str(CLIPS_DIR)) if r.stderr: stderr_text = r.stderr.decode(errors='replace').strip() if stderr_text: print(f"[>>] FFmpeg stderr: {stderr_text[-500:]}", flush=True) # Cleanup ASS if ass_path and ass_path.exists(): try: ass_path.unlink() except Exception: pass if r.returncode != 0: raise RuntimeError( f"FFmpeg failed for clip {idx + 1}: " f"{r.stderr.decode(errors='replace')[-600:]}" ) return out, warnings, transcript_text # ── Routes ──────────────────────────────────────────────────────────────────── def _base_href(): """Directory URL for so style.css / app.js resolve behind HF path prefixes.""" sr = (request.script_root or "").strip() if not sr: return "/" return sr.rstrip("/") + "/" @app.route("/api/process", methods=["POST"]) def process(): data = request.get_json(force=True, silent=True) or {} youtube_url = (data.get("youtubeUrl") or "").strip() url_prefix = (request.script_root or "").rstrip("/") if not youtube_url: return jsonify({"error": "youtubeUrl is required"}), 400 n_clips = min(int(data.get("clips", MAX_CLIPS)), MAX_CLIPS) mode = data.get("mode", "fill").strip().lower() if mode not in ("fill", "pad"): mode = "fill" def generate(): video_path = None try: # 1. Download video_path = download_video(youtube_url) # 2. Info duration, width, height = get_video_info(video_path) if duration < 20: yield json.dumps({"error": "Video too short (minimum 20 s)."}) + "\n" return if width == 0 or height == 0: yield json.dumps({"error": "Could not read video dimensions."}) + "\n" return # ── Full Intelligence Work-flow: Transcribe First ─────────────── print("[>>] Intelligence Scan: Extracting full audio...", flush=True) model = get_whisper_model() full_audio = video_path.with_suffix(".full.wav") subprocess.run([ "ffmpeg", "-i", str(video_path), "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-y", str(full_audio) ], capture_output=True) print("[>>] Intelligence Scan: Transcribing full video (this may take a minute)...", flush=True) whisper_result = model.transcribe(str(full_audio), task="transcribe", fp16=False) full_segments = whisper_result.get("segments", []) if full_audio.exists(): full_audio.unlink() # ── Parallel Viral Intelligence: Blended Signal Scan ────────── # (Heatmap + Audio Energy + Psychology Scores) print("[>>] Intelligence Scan: Blending signals in parallel...", flush=True) parallel_res = analyze_video_parallel(video_path, youtube_url, full_segments) heatmap_data = parallel_res["heatmap"] energies = parallel_res["energy"] t_scores = parallel_res["transcript_scores"] # ── Viral Selection Engine: Rank & Deduplicate ─────────────── print("[>>] Intelligence Scan: Selecting viral winners...", flush=True) # Parse Duration Constraints duration_range = data.get("durationRange", "auto") min_dur, max_dur = 0, 0 if duration_range == "15-30": min_dur, max_dur = 15, 30 elif duration_range == "30-60": min_dur, max_dur = 30, 60 elif duration_range == "60-90": min_dur, max_dur = 60, 90 elif duration_range == "auto" and data.get("retentionMode", False): # Legacy retention logic min_dur, max_dur = 20, 59 top_segments = select_top_clips( full_segments, heatmap_data, energies, t_scores, num_clips=n_clips, min_dur=min_dur, max_dur=max_dur ) if not top_segments: yield json.dumps({"error": "No viral segments identified."}) + "\n" return # Tell the frontend how many clips to expect yield json.dumps({"total": len(top_segments)}) + "\n" # 5. Extract additional settings use_captions = data.get("captions", False) headline = data.get("headline", "").strip() cta = data.get("cta", "").strip() reframe = data.get("reframe", False) progress_bar = data.get("progressBar", False) vibe = data.get("vibe", "none") caption_style = data.get("captionStyle", "mrbeast").strip().lower() if caption_style not in ASS_STYLES: caption_style = "mrbeast" # ── Retention Psychology flags ──────────────────────────────────── color_mode = str(data.get("colorMode", "off")).strip().lower() watermark_text = str(data.get("watermarkText", "")).strip() safe_zone = data.get("safeZone", False) def _cut(args): i, seg_data = args s = seg_data["start_time"] e = seg_data["end_time"] return i, cut_clip( video_path, s, e, i, width, height, mode, captions=use_captions, headline=headline, cta=cta, reframe=reframe, progress_bar=progress_bar, vibe=vibe, caption_style=caption_style, safe_zone=safe_zone, color_mode=color_mode, watermark_text=watermark_text, pre_segments=full_segments ), seg_data with ThreadPoolExecutor(max_workers=min(len(top_segments), 3)) as pool: futures = {pool.submit(_cut, (i, seg_data)): i for i, seg_data in enumerate(top_segments)} for future in as_completed(futures): try: i, (clip_path, clip_warnings, _), seg_data = future.result() s = seg_data["start_time"] e = seg_data["end_time"] # Yield any warnings from clip processing for warning_msg in clip_warnings: yield json.dumps({"type": "warning", "msg": warning_msg}) + "\n" # ── Retention Psychology: per-clip analytics ────────── pacing = score_pacing(energies, s, e) structure = segment_structure(s, e) # Assemble viral analysis block for the response scores = { "Heatmap": seg_data["heatmap_score"], "Energy": seg_data["energy_score"], "Transcript": seg_data["transcript_score"] } top_sig = max(scores, key=scores.get).lower().replace(" ", "_") viral_analysis = { "rank": i + 1, "final_score": seg_data["final_score"], "heatmap_score": seg_data["heatmap_score"], "energy_score": seg_data["energy_score"], "transcript_score": seg_data["transcript_score"], "top_signal": top_sig } # Triggers for UI compatibility triggers = { "score": seg_data["final_score"], "tags": [ f"Viral: {int(seg_data['final_score']*100)}%", "Visual Peak" if seg_data["heatmap_score"] > 0.5 else "", "Audio Punch" if seg_data["energy_score"] > 0.6 else "" ] } triggers["tags"] = [t for t in triggers["tags"] if t] yield json.dumps({ "clip": f"{url_prefix}/clips/{clip_path.name}", "index": i, "pacing": pacing, "structure": structure, "triggers": triggers, "viral_analysis": viral_analysis }) + "\n" except Exception as clip_err: print(f"[!] Clip failed: {clip_err}", flush=True) yield json.dumps({"warning": f"Clip processing failed: {clip_err}"}) + "\n" except Exception as exc: yield json.dumps({"error": str(exc)}) + "\n" finally: if video_path and video_path.exists(): try: video_path.unlink() except OSError: pass return Response( stream_with_context(generate()), mimetype="application/x-ndjson", headers={"X-Accel-Buffering": "no"}, # prevent proxy buffering ) @app.route("/clips/") def serve_clip(filename): return send_from_directory(str(CLIPS_DIR), filename) @app.route("/health") def health(): missing = check_deps() return jsonify({ "status": "ok" if not missing else "degraded", "missing_tools": missing, }) @app.route("/health/dns") def health_dns(): """Diagnostic endpoint: test if this container can reach YouTube.""" results = {} for host in ["www.youtube.com", "www.google.com", "huggingface.co"]: try: addr = socket.getaddrinfo(host, 443, socket.AF_INET) results[host] = {"ok": True, "ip": addr[0][4][0]} except Exception as e: results[host] = {"ok": False, "error": str(e)} # Also report yt-dlp version try: ver = subprocess.run(["yt-dlp", "--version"], capture_output=True, text=True) yt_dlp_ver = ver.stdout.strip() except Exception: yt_dlp_ver = "unknown" # Check cookies cookies_path = _find_cookies_file() all_ok = all(r["ok"] for r in results.values()) return jsonify({ "dns_status": "ok" if all_ok else "BLOCKED", "hosts": results, "yt_dlp_version": yt_dlp_ver, "cookies_found": cookies_path or False, }) @app.route("/") def root(): base = _base_href() inject = f' \n' html = _INDEX_HTML_RAW if "", "\n" + inject, 1) return Response(html, mimetype="text/html; charset=utf-8") @app.route("/style.css") def serve_css(): return send_from_directory(str(BASE_DIR), "style.css") @app.route("/app.js") def serve_js(): return send_from_directory(str(BASE_DIR), "app.js") # ── Entry ───────────────────────────────────────────────────────────────────── if __name__ == "__main__": print("\n[>>] AI SquadX VIP - Clipper Backend") print(f" http://localhost:{PORT}") # Show yt-dlp version for debugging try: ver = subprocess.run(["yt-dlp", "--version"], capture_output=True, text=True) print(f" yt-dlp version: {ver.stdout.strip()}") except Exception: print(" yt-dlp version: unknown") # Show cookies status cookies = _find_cookies_file() if cookies: print(f" Cookies: {cookies}") else: print(" Cookies: NOT FOUND (YouTube may block downloads)") print(" → Place a cookies.txt next to server.py to fix this.") print() missing = check_deps() if missing: print(f"[!] Missing: {', '.join(missing)}") print(" Install them or clips won't generate.\n") else: print("[OK] ffmpeg, ffprobe, yt-dlp found\n") app.run(host="0.0.0.0", port=PORT, debug=False, threaded=False)