| |
| """ |
| AI SquadX VIP β Viral Shorts Backend |
| Analyzes YouTube videos, finds hook segments, cuts 9:16 clips. |
| |
| Requirements: pip install flask flask-cors yt-dlp |
| System deps: ffmpeg (must be in PATH) |
| """ |
|
|
| import json |
| import math |
| import socket |
| import struct |
| import subprocess |
| import tempfile |
| import time |
| import uuid |
| import wave |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from html import escape |
| from pathlib import Path |
|
|
| from flask import Flask, jsonify, request, send_from_directory, Response, stream_with_context |
| from werkzeug.middleware.proxy_fix import ProxyFix |
| from flask_cors import CORS |
| import whisper |
| import cv2 |
| import mediapipe as mp |
|
|
| |
| import os |
| PORT = int(os.getenv("PORT", 5000)) |
| COOKIES_FILE = os.getenv("COOKIES_FILE", "") |
| YTDLP_PROXY = os.getenv("YTDLP_PROXY", "") |
| MAX_CLIPS = 10 |
| CLIP_DURATION = 45 |
| MIN_GAP_SECONDS = 60 |
| SAMPLE_RATE = 8000 |
| ENERGY_WINDOW = 5 |
|
|
| BASE_DIR = Path(__file__).parent.resolve() |
| CLIPS_DIR = BASE_DIR / "clips" |
| DOWNLOADS_DIR = BASE_DIR / "downloads" |
| CLIPS_DIR.mkdir(exist_ok=True) |
| DOWNLOADS_DIR.mkdir(exist_ok=True) |
|
|
| _INDEX_HTML_RAW = (BASE_DIR / "index.html").read_text(encoding="utf-8") |
|
|
| |
| VIBES_DIR = BASE_DIR / "vibes" |
| VIBES_DIR.mkdir(exist_ok=True) |
|
|
| |
| _face_detector = None |
| def get_face_detector(): |
| global _face_detector |
| if _face_detector is None: |
| print("[>>] Initializing AI face detector...") |
| _face_detector = mp.solutions.face_detection.FaceDetection( |
| model_selection=1, min_detection_confidence=0.5 |
| ) |
| return _face_detector |
|
|
| def analyze_face_center(video_path: Path, start: float, duration: float): |
| """Analyze a segment and return the average X-coordinate (0.0 to 1.0) of faces.""" |
| try: |
| cap = cv2.VideoCapture(str(video_path)) |
| cap.set(cv2.CAP_PROP_POS_MSEC, start * 1000) |
| |
| detector = get_face_detector() |
| centers = [] |
| |
| |
| for i in range(int(duration)): |
| cap.set(cv2.CAP_PROP_POS_MSEC, (start + i) * 1000) |
| ret, frame = cap.read() |
| if not ret: |
| break |
| |
| rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) |
| results = detector.process(rgb_frame) |
| |
| if results.detections: |
| |
| bbox = results.detections[0].location_data.relative_bounding_box |
| center_x = bbox.xmin + (bbox.width / 2) |
| centers.append(center_x) |
| |
| cap.release() |
| return sum(centers) / len(centers) if centers else 0.5 |
| except Exception as e: |
| print(f"[!] Face analysis failed: {e}") |
| return 0.5 |
|
|
| |
| _whisper_model = None |
| def get_whisper_model(): |
| global _whisper_model |
| if _whisper_model is None: |
| print("[>>] Loading AI whisper model (tiny β fast mode)...") |
| _whisper_model = whisper.load_model("tiny") |
| return _whisper_model |
|
|
| |
| ASS_STYLES = { |
| |
| |
| "mrbeast": ( |
| "Style: Default,Arial Black,82,&H0000FFFF,&H000000FF,&H00000000,&H88000000," |
| "1,1,0,0,100,110,0,0,1,6,3,2,40,40,650,1" |
| ), |
| |
| "podcast": ( |
| "Style: Default,Arial,62,&H00FFFFFF,&H000000FF,&H00000000,&HAA000000," |
| "1,0,0,0,100,100,0,0,4,0,0,2,40,40,650,1" |
| ), |
| |
| "neon": ( |
| "Style: Default,Arial Black,70,&H0000FFFF,&H000000FF,&H00FF00FF,&H88000000," |
| "1,0,0,0,100,100,1,0,1,4,6,2,40,40,650,1" |
| ), |
| |
| "horror": ( |
| "Style: Default,Arial Black,72,&H002020EE,&H000000FF,&H00000000,&H88000000," |
| "1,0,0,0,100,100,1,0,1,5,8,2,40,40,650,1" |
| ), |
| |
| "minimal": ( |
| "Style: Default,Arial,58,&H00FFFFFF,&H000000FF,&H66000000,&H44000000," |
| "0,0,0,0,100,100,0,0,1,2,1,2,40,40,650,1" |
| ), |
| } |
|
|
| ASS_HEADER = """[Script Info] |
| ScriptType: v4.00+ |
| PlayResX: 1080 |
| PlayResY: 1920 |
| ScaledBorderAndShadow: yes |
| |
| [V4+ Styles] |
| Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding |
| {style_line} |
| |
| [Events] |
| Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text |
| """ |
|
|
| def ass_time(seconds: float) -> str: |
| """Format seconds as ASS timestamp h:mm:ss.cs""" |
| h = int(seconds // 3600) |
| m = int((seconds % 3600) // 60) |
| s = int(seconds % 60) |
| cs = int((seconds % 1) * 100) |
| return f"{h}:{m:02}:{s:02}.{cs:02}" |
|
|
|
|
| def chunk_segments(segments, max_words: int = 4): |
| """Break whisper segments into short word-level chunks for viral-style captions.""" |
| chunks = [] |
| for seg in segments: |
| words = seg['text'].strip().split() |
| if not words: |
| continue |
| start = seg['start'] |
| end = seg['end'] |
| duration = max(end - start, 0.1) |
| word_dur = duration / len(words) |
| for i in range(0, len(words), max_words): |
| group = words[i:i + max_words] |
| chunk_start = start + i * word_dur |
| chunk_end = chunk_start + len(group) * word_dur |
| chunks.append({ |
| 'start': chunk_start, |
| 'end': min(chunk_end, end), |
| 'text': ' '.join(group) |
| }) |
| return chunks |
|
|
|
|
| |
| _ASS_RED = "&H0000FF&" |
| _ASS_YELLOW = "&H04F3FF&" |
| _ASS_BLACK = "&H000000&" |
|
|
|
|
| def _key_word_idx(words: list) -> int: |
| """Pick the 'impact' word in a chunk β longest wins; ties go to last.""" |
| if not words: |
| return 0 |
| return max(range(len(words)), key=lambda i: len(words[i])) |
|
|
|
|
| def generate_ass(segments, style_name: str = "mrbeast") -> str: |
| r"""Generate fancy ASS subtitles with per-word colour+size emphasis. |
| |
| Layout matches screenshots: |
| - Key word: Red (#ff0000), fs=130, Red glow (\3c), bord=6, blur=4 |
| - Others: Yellow (#fff304), fs=68, Black outline, bord=2 |
| - Stacking: Injects \N before and after key word for verticality. |
| """ |
| style_line = ASS_STYLES.get(style_name, ASS_STYLES["mrbeast"]) |
| header = ASS_HEADER.format(style_line=style_line) |
| chunks = chunk_segments(segments, max_words=4) |
| lines = [] |
|
|
| for chunk in chunks: |
| t0 = ass_time(chunk['start']) |
| t1 = ass_time(chunk['end']) |
| words = chunk['text'].replace('\n', ' ').split() |
| if not words: |
| continue |
|
|
| key_idx = _key_word_idx(words) |
| parts = [] |
| for i, word in enumerate(words): |
| if i == key_idx: |
| |
| parts.append( |
| f"{{\\1c{_ASS_RED}\\3c{_ASS_RED}\\fs130\\bord6\\shad3\\blur4}}{word}" |
| ) |
| else: |
| |
| parts.append( |
| f"{{\\1c{_ASS_YELLOW}\\3c{_ASS_BLACK}\\fs68\\bord2\\shad1\\blur0}}{word}" |
| ) |
|
|
| |
| |
| if len(parts) >= 3: |
| |
| final_text = "" |
| if key_idx > 0: |
| final_text += " ".join(parts[:key_idx]) + r"\N" |
| final_text += parts[key_idx] |
| if key_idx < len(parts) - 1: |
| final_text += r"\N" + " ".join(parts[key_idx + 1:]) |
| else: |
| final_text = " ".join(parts) |
|
|
| lines.append(f"Dialogue: 0,{t0},{t1},Default,,0,0,0,,{final_text}") |
|
|
| return header + "\n".join(lines) |
|
|
|
|
| app = Flask(__name__, static_folder=str(BASE_DIR), static_url_path="") |
| |
| app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) |
| CORS(app, origins=["http://localhost:5000", "http://127.0.0.1:5000", "https://*.hf.space"]) |
|
|
|
|
| |
| def check_deps(): |
| missing = [] |
| for tool, flag in [("ffmpeg", "-version"), ("ffprobe", "-version"), ("yt-dlp", "--version")]: |
| r = subprocess.run([tool, flag], capture_output=True) |
| if r.returncode not in (0, 1): |
| missing.append(tool) |
| return missing |
|
|
|
|
| |
| |
| _YT_DLP_SUPPORTS_JS_RUNTIMES = None |
| def yt_dlp_supports_js_runtimes() -> bool: |
| global _YT_DLP_SUPPORTS_JS_RUNTIMES |
| if _YT_DLP_SUPPORTS_JS_RUNTIMES is not None: |
| return _YT_DLP_SUPPORTS_JS_RUNTIMES |
| |
| |
| try: |
| node_v = subprocess.run(["node", "-v"], capture_output=True, text=True).stdout.strip() |
| print(f"[OK] Node.js version: {node_v}", flush=True) |
| except Exception: |
| print("[!] Node.js NOT FOUND in PATH", flush=True) |
|
|
| try: |
| help_out = subprocess.run( |
| ["yt-dlp", "--help"], |
| capture_output=True, |
| text=True, |
| ).stdout |
| _YT_DLP_SUPPORTS_JS_RUNTIMES = "--js-runtimes" in help_out |
| except Exception: |
| _YT_DLP_SUPPORTS_JS_RUNTIMES = False |
| return _YT_DLP_SUPPORTS_JS_RUNTIMES |
|
|
|
|
| |
|
|
| |
| _USER_AGENT = ( |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " |
| "AppleWebKit/537.36 (KHTML, like Gecko) " |
| "Chrome/131.0.0.0 Safari/537.36" |
| ) |
|
|
|
|
| def _find_cookies_file() -> str | None: |
| """Locate a cookies.txt file in several common locations with robust search.""" |
| |
| if COOKIES_FILE and Path(COOKIES_FILE).is_file(): |
| print(f"[>>] Cookies found via env var: {COOKIES_FILE}", flush=True) |
| return COOKIES_FILE |
| |
| |
| search_paths = [BASE_DIR, Path.cwd(), Path.home()] |
| for p in search_paths: |
| |
| for candidate in p.glob("*cookies*.txt"): |
| if candidate.is_file() and candidate.stat().st_size > 10: |
| print(f"[>>] SUCCESS: Cookies found at: {candidate.resolve()}", flush=True) |
| return str(candidate.resolve()) |
| |
| |
| print(f"[!] Cookies NOT FOUND. Current files in {BASE_DIR}:", flush=True) |
| try: |
| files = [f.name for f in BASE_DIR.glob("*") if f.is_file()] |
| print(f" {files}", flush=True) |
| except Exception: |
| pass |
|
|
| return None |
|
|
|
|
| def download_video(youtube_url: str, max_retries: int = 3) -> Path: |
| uid = uuid.uuid4().hex[:10] |
| template = str(DOWNLOADS_DIR / f"{uid}.%(ext)s") |
|
|
| cmd = [ |
| "yt-dlp", |
| "--force-ipv4", |
| "--ignore-config", |
| "--no-cache-dir", |
| "--user-agent", _USER_AGENT, |
| "--extractor-args", "youtube:player_client=web,tv,ios;player_skip=web_embedded_check", |
| "--remote-components", "ejs:github", |
| "--no-check-certificates", |
| "--geo-bypass", |
| "--add-header", "Accept-Language:en-US,en;q=0.9", |
| "--add-header", "Accept:text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", |
| "-f", "bestvideo[height<=1080]+bestaudio/best[height<=1080]/best", |
| "--merge-output-format", "mp4", |
| "--no-playlist", |
| "--no-part", |
| "--socket-timeout", "30", |
| "--retries", "3", |
| "--file-access-retries", "3", |
| "-o", template, |
| youtube_url, |
| ] |
|
|
| |
| cookies_path = _find_cookies_file() |
| if cookies_path: |
| cmd[1:1] = ["--cookies", cookies_path] |
| print(f"[>>] Using cookies from: {cookies_path}", flush=True) |
|
|
| |
| if yt_dlp_supports_js_runtimes(): |
| cmd[1:1] = ["--js-runtimes", "node"] |
| print("[>>] Forcing 'node' as yt-dlp JS runtime solver", flush=True) |
|
|
| |
| if YTDLP_PROXY: |
| cmd[1:1] = ["--proxy", YTDLP_PROXY] |
| print(f"[>>] Using proxy: {YTDLP_PROXY}", flush=True) |
|
|
| last_err = "" |
| for attempt in range(1, max_retries + 1): |
| print(f"[>>] Download attempt {attempt}/{max_retries} ...", flush=True) |
|
|
| |
| |
| if not YTDLP_PROXY: |
| try: |
| socket.setdefaulttimeout(10) |
| socket.getaddrinfo("www.youtube.com", 443, socket.AF_INET) |
| except socket.gaierror as dns_err: |
| last_err = f"DNS resolution failed: {dns_err}" |
| print(f"[!] {last_err}", flush=True) |
| if attempt < max_retries: |
| time.sleep(2 ** attempt) |
| continue |
| raise RuntimeError(last_err) |
| else: |
| print(f"[>>] Skipping DNS pre-check because YTDLP_PROXY is set.", flush=True) |
|
|
| |
| result = subprocess.run(cmd, capture_output=True, text=True) |
|
|
| if result.returncode == 0: |
| matches = list(DOWNLOADS_DIR.glob(f"{uid}.*")) |
| if matches: |
| print(f"[OK] Downloaded: {matches[0].name}", flush=True) |
| return matches[0] |
| last_err = "yt-dlp finished but produced no output file." |
| else: |
| stderr = result.stderr |
| |
| print(f"[!] yt-dlp failed (attempt {attempt}):\n{stderr}", flush=True) |
| last_err = stderr[-1000:] |
|
|
| if attempt < max_retries: |
| time.sleep(2 ** attempt) |
|
|
| raise RuntimeError(f"Download failed after {max_retries} attempts:\n{last_err}") |
|
|
|
|
| def get_video_info(video_path: Path): |
| """Return (duration, width, height) via ffprobe JSON.""" |
| r = subprocess.run( |
| ["ffprobe", "-v", "quiet", "-print_format", "json", |
| "-show_format", "-show_streams", str(video_path)], |
| capture_output=True, |
| ) |
| data = json.loads(r.stdout) |
| duration = float(data["format"]["duration"]) |
| width = height = 0 |
| for s in data.get("streams", []): |
| if s.get("codec_type") == "video": |
| width = int(s["width"]) |
| height = int(s["height"]) |
| break |
| return duration, width, height |
|
|
|
|
| def get_youtube_heatmap(video_path: Path, url: str): |
| """ |
| Fetch YouTube's 'Most Replayed' heatmap data using yt-dlp. |
| Returns: [{start_time: float, end_time: float, score: float}] or [] |
| """ |
| print("[>>] Fetching YouTube heatmap...", flush=True) |
| |
| |
| info_json_path = video_path.with_suffix(".info.json") |
| |
| cmd = [ |
| "yt-dlp", |
| "--force-ipv4", |
| "--ignore-config", |
| "--no-cache-dir", |
| "--user-agent", _USER_AGENT, |
| "--extractor-args", "youtube:player_client=web,tv,ios;player_skip=web_embedded_check", |
| "--remote-components", "ejs:github", |
| "--no-check-certificates", |
| "--geo-bypass", |
| "--add-header", "Accept-Language:en-US,en;q=0.9", |
| "--add-header", "Accept:text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", |
| "--write-info-json", |
| "--skip-download", |
| "-o", str(video_path.with_suffix("")), |
| url, |
| ] |
| |
| |
| cookies_path = _find_cookies_file() |
| if cookies_path: |
| cmd[1:1] = ["--cookies", cookies_path] |
| |
| |
| if YTDLP_PROXY: |
| cmd[1:1] = ["--proxy", YTDLP_PROXY] |
|
|
| try: |
| |
| subprocess.run(cmd, capture_output=True, text=True, timeout=30) |
| |
| if not info_json_path.exists(): |
| print("[!] Heatmap info JSON not found.", flush=True) |
| return [] |
| |
| with open(info_json_path, "r", encoding="utf-8") as f: |
| data = json.load(f) |
| |
| heatmap = data.get("heatmap") |
| if not heatmap: |
| print("[!] No heatmap data found in YouTube metadata.", flush=True) |
| return [] |
| |
| |
| max_val = max((item.get("value", 0) for item in heatmap), default=1.0) |
| if max_val == 0: max_val = 1.0 |
| |
| normalized = [] |
| for item in heatmap: |
| normalized.append({ |
| "start_time": float(item["start_time"]), |
| "end_time": float(item["end_time"]), |
| "score": round(float(item["value"]) / max_val, 4) |
| }) |
| |
| print(f"[OK] Extracted {len(normalized)} heatmap segments.", flush=True) |
| |
| |
| try: info_json_path.unlink() |
| except: pass |
| |
| return normalized |
| except Exception as e: |
| print(f"[!] Heatmap extraction error: {e}", flush=True) |
| return [] |
|
|
|
|
| |
| def extract_audio_energy(video_path: Path, duration: float): |
| """ |
| Pipe raw mono 8kHz PCM from FFmpeg and compute per-second RMS energy. |
| Returns list of (time_sec, rms) tuples. |
| """ |
| proc = subprocess.Popen( |
| [ |
| "ffmpeg", "-i", str(video_path), |
| "-vn", "-ar", str(SAMPLE_RATE), "-ac", "1", |
| "-f", "s16le", "pipe:1", |
| "-loglevel", "quiet", |
| ], |
| stdout=subprocess.PIPE, |
| stderr=subprocess.DEVNULL, |
| ) |
| raw, _ = proc.communicate() |
|
|
| if not raw: |
| |
| return [(t, 1.0) for t in range(int(duration))] |
|
|
| n = len(raw) // 2 |
| samples = struct.unpack(f"<{n}h", raw) |
|
|
| win = SAMPLE_RATE * ENERGY_WINDOW |
| step = SAMPLE_RATE |
| result = [] |
|
|
| for i in range(0, n - win, step): |
| chunk = samples[i : i + win : 8] |
| if not chunk: |
| continue |
| rms = math.sqrt(sum(int(s) * int(s) for s in chunk) / len(chunk)) |
| result.append((i / SAMPLE_RATE, rms)) |
|
|
| return result |
|
|
|
|
| def analyze_audio_energy(video_path: Path): |
| """ |
| Extract 16kHz mono WAV via FFmpeg and calculate per-second RMS energy. |
| Returns: [{start_time: float, end_time: float, energy: float}] |
| """ |
| print(f"[>>] Analyzing audio energy for: {video_path.name}", flush=True) |
| temp_wav = Path(tempfile.gettempdir()) / f"{uuid.uuid4().hex}.wav" |
| |
| try: |
| |
| cmd = [ |
| "ffmpeg", "-i", str(video_path), |
| "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", |
| str(temp_wav), "-y", "-loglevel", "quiet" |
| ] |
| subprocess.run(cmd, check=True) |
|
|
| if not temp_wav.exists(): |
| print("[!] Audio extraction failed.", flush=True) |
| return [] |
|
|
| with wave.open(str(temp_wav), "rb") as w: |
| n_frames = w.getnframes() |
| sample_rate = w.getframerate() |
| frames = w.readframes(n_frames) |
| |
| |
| samples = struct.unpack(f"<{n_frames}h", frames) |
| |
| |
| chunk_size = sample_rate |
| energies = [] |
| |
| for i in range(0, n_frames, chunk_size): |
| chunk = samples[i : i + chunk_size] |
| if not chunk: |
| continue |
| |
| |
| |
| ms = sum(float(s) * float(s) for s in chunk) / len(chunk) |
| rms = math.sqrt(ms) |
| |
| energies.append({ |
| "start_time": round(i / sample_rate, 2), |
| "end_time": round((i + chunk_size) / sample_rate, 2), |
| "energy": rms |
| }) |
| |
| |
| max_e = max((e["energy"] for e in energies), default=1.0) |
| if max_e == 0: max_e = 1.0 |
| |
| for e in energies: |
| e["energy"] = round(e["energy"] / max_e, 4) |
| |
| print(f"[OK] Audio energy analyzed ({len(energies)} seconds).", flush=True) |
| return energies |
| |
| except Exception as e: |
| print(f"[!] Audio analysis error: {e}", flush=True) |
| return [] |
| finally: |
| if temp_wav.exists(): |
| try: temp_wav.unlink() |
| except: pass |
|
|
|
|
| def calculate_viral_score(segment_start, segment_end, heatmap_data, energy_data, transcript_score): |
| """ |
| Calculate a single viral score by blending heatmap, audio energy, and transcript potential. |
| Returns: {final_score: float, heatmap_score: float, energy_score: float, transcript_score: float} |
| """ |
| |
| |
| h_overlaps = [ |
| d["score"] for d in heatmap_data |
| if d.get("start_time", 0) < segment_end and d.get("end_time", 0) > segment_start |
| ] |
| h_avg = sum(h_overlaps) / len(h_overlaps) if h_overlaps else 0.0 |
| |
| |
| |
| e_peaks = [] |
| for d in energy_data: |
| if isinstance(d, dict): |
| if d.get("start_time", 0) < segment_end and d.get("end_time", 0) > segment_start: |
| e_peaks.append(d.get("energy", 0.0)) |
| elif isinstance(d, (list, tuple)): |
| if segment_start <= d[0] <= segment_end: |
| e_peaks.append(d[1]) |
| |
| e_max = max(e_peaks) if e_peaks else 0.0 |
| |
| |
| |
| final = (0.40 * h_avg) + (0.35 * e_max) + (0.25 * transcript_score) |
| |
| return { |
| "final_score": round(final, 2), |
| "heatmap_score": round(h_avg, 2), |
| "energy_score": round(e_max, 2), |
| "transcript_score": round(transcript_score, 2) |
| } |
|
|
|
|
| def analyze_video_parallel(video_path, url, transcript_segments): |
| """ |
| Simultaneously fetch heatmap, audio energy, and score all transcript segments. |
| """ |
| results = {"heatmap": [], "energy": [], "transcript_scores": []} |
| |
| def run_wrapper(name, func, *args): |
| start_t = time.time() |
| try: |
| res = func(*args) |
| elapsed = time.time() - start_t |
| print(f"[OK] {name} completed in {elapsed:.2f}s", flush=True) |
| return res |
| except Exception as e: |
| print(f"[!] {name} failed: {e}", flush=True) |
| return [] |
|
|
| |
| def score_batch(segs): |
| return [score_transcript_segment(s.get("text", "")) for s in segs] |
|
|
| print(f"[>>] Starting parallel analysis for {video_path.name}...", flush=True) |
| with ThreadPoolExecutor(max_workers=3) as executor: |
| futures = { |
| executor.submit(run_wrapper, "Heatmap", get_youtube_heatmap, video_path, url): "heatmap", |
| executor.submit(run_wrapper, "Audio Energy", analyze_audio_energy, video_path): "energy", |
| executor.submit(run_wrapper, "Transcript Scoring", score_batch, transcript_segments): "transcript" |
| } |
| |
| for future in as_completed(futures): |
| key = futures[future] |
| if key == "heatmap": |
| results["heatmap"] = future.result() |
| elif key == "energy": |
| results["energy"] = future.result() |
| elif key == "transcript": |
| results["transcript_scores"] = future.result() |
| |
| return results |
|
|
|
|
| def select_top_clips(transcript_segments, heatmap_data, energy_data, transcript_scores, num_clips=10, min_dur=0, max_dur=0): |
| """ |
| Ranks transcript segments by viral score and deduplicates repetitive temporal overlaps. |
| If min_dur/max_dur are provided, expands segments to fit the requested duration. |
| """ |
| scored_segs = [] |
| video_duration = transcript_segments[-1].get("end", 0) if transcript_segments else 0 |
| |
| |
| for i, seg in enumerate(transcript_segments): |
| start, end = seg.get("start", 0), seg.get("end", 0) |
| t_score = transcript_scores[i] if i < len(transcript_scores) else 0.0 |
| viral_data = calculate_viral_score(start, end, heatmap_data, energy_data, t_score) |
| |
| |
| s_copy = seg.copy() |
| s_copy["viral_score"] = viral_data |
| scored_segs.append(s_copy) |
| |
| |
| scored_segs.sort(key=lambda x: x["viral_score"]["final_score"], reverse=True) |
| |
| |
| selected = [] |
| chosen_starts = [] |
| |
| |
| dedup_window = max(30, min_dur) |
| |
| for seg in scored_segs: |
| curr_start = seg.get("start", 0) |
| curr_end = seg.get("end", 0) |
| |
| |
| is_duplicate = any(abs(curr_start - prev_s) < dedup_window for prev_s in chosen_starts) |
| if is_duplicate: |
| continue |
| |
| |
| if min_dur > 0: |
| actual_dur = curr_end - curr_start |
| if actual_dur < min_dur: |
| needed = min_dur - actual_dur |
| |
| new_start = max(0, curr_start - needed / 2) |
| new_end = min(video_duration, curr_end + needed / 2) |
| |
| |
| if new_start == 0: |
| new_end = min(video_duration, min_dur) |
| elif new_end == video_duration: |
| new_start = max(0, video_duration - min_dur) |
| |
| curr_start, curr_end = round(new_start, 2), round(new_end, 2) |
| |
| |
| if max_dur > 0 and (curr_end - curr_start) > max_dur: |
| curr_end = curr_start + max_dur |
| |
| selected.append({ |
| "start": curr_start, |
| "end": curr_end, |
| "text": seg.get("text", ""), |
| "viral_score": seg["viral_score"] |
| }) |
| chosen_starts.append(curr_start) |
| |
| if len(selected) >= num_clips: |
| break |
| |
| |
| top_clips = [] |
| for s in selected: |
| v = s["viral_score"] |
| top_clips.append({ |
| "start_time": s["start"], |
| "end_time": s["end"], |
| "text": s["text"], |
| "final_score": v["final_score"], |
| "heatmap_score": v["heatmap_score"], |
| "energy_score": v["energy_score"], |
| "transcript_score": v["transcript_score"] |
| }) |
| |
| |
| print("\n" + "="*85) |
| print(f"{'RANK':<5} | {'START':<8} | {'END':<8} | {'SCORE':<8} | {'TOP SIGNAL SOURCE'}") |
| print("-" * 85) |
| for idx, c in enumerate(top_clips): |
| |
| sig_map = { |
| "Heatmap": c["heatmap_score"], |
| "Audio Peak": c["energy_score"], |
| "Psych Hook": c["transcript_score"] |
| } |
| top_sig = max(sig_map, key=sig_map.get) |
| print(f"{idx+1:<5} | {c['start_time']:<8.1f} | {c['end_time']:<8.1f} | {c['final_score']:<8.2f} | {top_sig}") |
| print("="*85 + "\n", flush=True) |
| |
| return top_clips |
|
|
|
|
| |
| def find_segments(energies, duration: float, n_clips: int = MAX_CLIPS): |
| """ |
| Greedy peak selection: |
| 1. Smooth the RMS curve. |
| 2. Repeatedly pick the highest-energy moment, then |
| black out a MIN_GAP_SECONDS radius around it. |
| Returns list of (start, end) in seconds. |
| """ |
| if not energies: |
| step = max(60.0, (duration - CLIP_DURATION) / max(n_clips, 1)) |
| return [(round(i * step + 10, 2), round(i * step + 10 + CLIP_DURATION, 2)) |
| for i in range(n_clips) if i * step + 10 + CLIP_DURATION <= duration] |
|
|
| |
| if isinstance(energies[0], dict): |
| times = [e.get("start_time", 0) for e in energies] |
| vals = [e.get("energy", 0) for e in energies] |
| else: |
| times = [e[0] for e in energies] |
| vals = [e[1] for e in energies] |
|
|
| |
| w = min(10, max(3, len(vals) // 20)) |
| smoothed = [] |
| for i in range(len(vals)): |
| lo, hi = max(0, i - w), min(len(vals), i + w + 1) |
| smoothed.append(sum(vals[lo:hi]) / (hi - lo)) |
|
|
| |
| used = [False] * len(smoothed) |
| peaks = [] |
| gap_idx = MIN_GAP_SECONDS |
|
|
| while len(peaks) < n_clips * 2: |
| best_i = max( |
| (i for i in range(len(smoothed)) if not used[i]), |
| key=lambda i: smoothed[i], |
| default=-1, |
| ) |
| if best_i < 0: |
| break |
| peaks.append(times[best_i]) |
| lo = max(0, best_i - gap_idx) |
| hi = min(len(used), best_i + gap_idx + 1) |
| for j in range(lo, hi): |
| used[j] = True |
|
|
| peaks.sort() |
| peaks = peaks[:n_clips] |
|
|
| |
| if len(peaks) < n_clips: |
| step = max(60.0, (duration - CLIP_DURATION) / max(n_clips, 1)) |
| t = 10.0 |
| while len(peaks) < n_clips and t + CLIP_DURATION <= duration: |
| if all(abs(t - p) >= MIN_GAP_SECONDS for p in peaks): |
| peaks.append(t) |
| t += step |
| peaks.sort() |
|
|
| |
| segments = [] |
| for pt in peaks: |
| start = max(0.0, pt - CLIP_DURATION * 0.25) |
| end = start + CLIP_DURATION |
| if end > duration: |
| end = duration |
| start = max(0.0, end - CLIP_DURATION) |
| segments.append((round(start, 2), round(end, 2))) |
|
|
| return segments |
|
|
|
|
| |
| |
| _COLOR_MODES = frozenset({"off", "boost", "yellow", "red"}) |
|
|
|
|
| def build_vf(width: int, height: int, center_x: float = 0.5, |
| safe_zone: bool = False, color_mode: str = "off") -> str: |
| """Return an FFmpeg -vf string that crops to 9:16 at 1080Γ1920. |
| |
| Args: |
| width / height: source video dimensions. |
| center_x: horizontal crop anchor (0.0β1.0; 0.5 = centre). |
| safe_zone: When True, additionally crops top+bottom 12.5% so the |
| viewer's eye is drawn to the central safe zone (Retention Psychology). |
| color_mode: One of 'off' | 'boost' | 'yellow' | 'red'. |
| - boost β +40% saturation (eq filter) |
| - yellow β warm yellow-push via curves filter |
| - red β red-dominant push via curves filter |
| """ |
| if color_mode not in _COLOR_MODES: |
| color_mode = "off" |
|
|
| ratio = 9 / 16 |
| if width / height > ratio: |
| |
| cw = int(height * ratio) |
| ch = height |
| cx = int((width * center_x) - (cw / 2)) |
| cx = max(0, min(cx, width - cw)) |
| cy = 0 |
| else: |
| |
| cw = width |
| ch = int(width / ratio) |
| cx = 0 |
| cy = (height - ch) // 2 |
|
|
| |
| cw -= cw % 2 |
| ch -= ch % 2 |
| vf = f"crop={cw}:{ch}:{cx}:{cy},scale=1080:1920:flags=lanczos" |
|
|
| |
| if safe_zone: |
| vf += ",crop=iw:ih*0.75:0:ih*0.125,scale=1080:1920:flags=lanczos" |
|
|
| |
| if color_mode == "boost": |
| |
| vf += ",eq=saturation=1.4" |
| elif color_mode == "yellow": |
| |
| vf += ",curves=red='0/0 0.5/0.56 1/1':green='0/0 0.5/0.53 1/1':blue='0/0 0.5/0.46 1/1'" |
| elif color_mode == "red": |
| |
| vf += ",curves=red='0/0 0.5/0.62 1/1':green='0/0 0.5/0.46 1/1':blue='0/0 0.5/0.44 1/1'" |
|
|
| return vf |
|
|
|
|
| |
|
|
| |
| _TRIGGER_KEYWORDS = { |
| "animal": ["dog", "cat", "wolf", "bear", "shark", "snake", "bird", "lion", "tiger"], |
| "disaster": ["explosion", "crash", "fire", "flood", "earthquake", "storm", "accident"], |
| "food": ["eating", "cooking", "recipe", "taste", "delicious", "meal", "food"], |
| "baby": ["baby", "newborn", "puppy", "kitten", "child", "infant"], |
| "shock": ["unbelievable", "insane", "crazy", "shocking", "impossible", "wtf", "omg"], |
| "rage": ["wrong", "mistake", "error", "broken", "fail", "typo"], |
| "narrative": ["story", "secret", "truth", "revealed", "you won't believe", "finally"], |
| } |
|
|
|
|
| def score_transcript_segment(text: str) -> float: |
| """ |
| Score a transcript segment for viral potential based on hooks, structure, and emotions. |
| Returns a float 0.0β1.0. |
| """ |
| if not text or not text.strip(): |
| return 0.0 |
| |
| score = 0.0 |
| text_lower = text.lower().strip() |
| |
| |
| hooks = ["wait", "secret", "no one tells you", "here's why", "never", "always", "mistake", "truth", "actually"] |
| if any(h in text_lower for h in hooks): |
| score += 0.3 |
| |
| |
| if "?" in text: |
| score += 0.2 |
| |
| |
| word_count = len(text.split()) |
| if word_count > 20: |
| score += 0.2 |
| |
| |
| emotions = ["crazy", "insane", "shocked", "love", "hate", "afraid", "angry", "excited", "wow", "unbelievable"] |
| if any(e in text_lower for e in emotions): |
| score += 0.15 |
| |
| |
| openers = ["so", "but", "wait", "now", "here", "this is"] |
| if any(text_lower.startswith(o) for o in openers): |
| score += 0.15 |
| |
| return min(1.0, round(score, 2)) |
|
|
|
|
| def score_pacing(energies: list, start: float, end: float) -> int: |
| """Count how many 4-second windows within [start, end] contain an energy peak. |
| |
| Returns a 0β100 pacing score (higher = more frequent attention resets). |
| """ |
| if not energies: |
| return 50 |
|
|
| window_energies = [] |
| for d in energies: |
| |
| if isinstance(d, dict): |
| t = d.get("start_time", 0) |
| val = d.get("energy", 0) |
| else: |
| t, val = d |
| |
| if start <= t <= end: |
| window_energies.append(val) |
|
|
| if not window_energies: |
| return 50 |
| duration = end - start |
| num_windows = max(1, int(duration / 4)) |
| window_size = max(1, len(window_energies) // num_windows) |
| peaks = 0 |
| mean_energy = sum(window_energies) / len(window_energies) |
| for i in range(0, len(window_energies), window_size): |
| chunk = window_energies[i:i + window_size] |
| if chunk and max(chunk) > mean_energy * 1.1: |
| peaks += 1 |
| |
| score = min(100, int((peaks / num_windows) * 100)) |
| return score |
|
|
|
|
| def segment_structure(start: float, end: float) -> dict: |
| """Split clip into Hook / Body / Reward timestamps (3-Part Structure). |
| |
| Returns: |
| { |
| "hook": {"start": float, "end": float}, # first 10% |
| "body": {"start": float, "end": float}, # middle 75% |
| "reward": {"start": float, "end": float}, # last 15% |
| } |
| """ |
| dur = end - start |
| hook_end = round(start + dur * 0.10, 2) |
| body_end = round(start + dur * 0.85, 2) |
| return { |
| "hook": {"start": round(start, 2), "end": hook_end}, |
| "body": {"start": hook_end, "end": body_end}, |
| "reward": {"start": body_end, "end": round(end, 2)}, |
| } |
|
|
|
|
| def build_vf_pad(width: int, height: int) -> str: |
| """Scale to fit inside 9:16 (1080Γ1920) preserving aspect ratio, pad remainder black.""" |
| |
| |
| return ( |
| "scale=1080:1920:force_original_aspect_ratio=decrease:flags=lanczos," |
| "pad=1080:1920:(ow-iw)/2:(oh-ih)/2:black," |
| "setsar=1" |
| ) |
|
|
|
|
| |
|
|
| def ffmpeg_escape_text(text): |
| """Escape text for FFmpeg drawtext filter.""" |
| for ch in ["\\", "'", ":", ";", "[", "]", ",", "="]: |
| text = text.replace(ch, f"\\{ch}") |
| return text |
|
|
| |
| CAPTION_STYLES = { |
| |
| "mrbeast": ( |
| "FontName=Arial Black,FontSize=22,Bold=1," |
| "PrimaryColour=&H00FFFFFF,OutlineColour=&H00000000," |
| "BackColour=&H00000000,BorderStyle=1,Outline=4,Shadow=2," |
| "Alignment=2,MarginV=80,MarginL=20,MarginR=20," |
| "Spacing=0" |
| ), |
| |
| "podcast": ( |
| "FontName=Arial,FontSize=18,Bold=1," |
| "PrimaryColour=&H00FFFFFF,OutlineColour=&H00000000," |
| "BackColour=&HAA000000,BorderStyle=4,Outline=0,Shadow=0," |
| "Alignment=2,MarginV=90,MarginL=30,MarginR=30," |
| "Spacing=1" |
| ), |
| |
| "neon": ( |
| "FontName=Arial Black,FontSize=20,Bold=1," |
| "PrimaryColour=&H0000FFFF,OutlineColour=&H00FF00FF," |
| "BackColour=&H00000000,BorderStyle=1,Outline=3,Shadow=4," |
| "Alignment=2,MarginV=80,MarginL=20,MarginR=20," |
| "Spacing=0" |
| ), |
| |
| "horror": ( |
| "FontName=Arial Black,FontSize=20,Bold=1," |
| "PrimaryColour=&H002020EE,OutlineColour=&H00000000," |
| "BackColour=&H00000000,BorderStyle=1,Outline=4,Shadow=6," |
| "Alignment=2,MarginV=80,MarginL=20,MarginR=20," |
| "Spacing=1" |
| ), |
| |
| "minimal": ( |
| "FontName=Arial,FontSize=16,Bold=0," |
| "PrimaryColour=&H00FFFFFF,OutlineColour=&H80000000," |
| "BackColour=&H00000000,BorderStyle=1,Outline=1,Shadow=1," |
| "Alignment=2,MarginV=100,MarginL=30,MarginR=30," |
| "Spacing=1" |
| ), |
| } |
| def cut_clip(video_path: Path, start: float, end: float, |
| idx: int, width: int, height: int, mode: str = "fill", |
| captions: bool = False, headline: str = "", cta: str = "", |
| reframe: bool = False, progress_bar: bool = False, vibe: str = "none", |
| caption_style: str = "mrbeast", |
| safe_zone: bool = False, color_mode: str = "off", |
| watermark_text: str = "", |
| pre_segments: list = None) -> tuple: |
| """Cut a clip and return (output_path, warnings_list, transcript_text).""" |
| warnings = [] |
| transcript_text = "" |
| name = f"short_{idx + 1}_{uuid.uuid4().hex[:6]}.mp4" |
| out = CLIPS_DIR / name |
| |
| dur = round(end - start, 2) |
| center_x = 0.5 |
| if reframe and mode == "fill": |
| center_x = analyze_face_center(video_path, start, dur) |
|
|
| vf_base = ( |
| build_vf_pad(width, height) |
| if mode == "pad" |
| else build_vf(width, height, center_x, |
| safe_zone=safe_zone, color_mode=color_mode) |
| ) |
|
|
| |
| filters = [vf_base] |
|
|
| |
| if headline: |
| clean_headline = ffmpeg_escape_text(headline) |
| filters.append( |
| f"drawtext=text='{clean_headline}':fontcolor=white:fontsize=80:font='Arial':" |
| f"x=(w-text_w)/2:y=150:box=1:boxcolor=black@0.6:boxborderw=20" |
| ) |
|
|
| |
| if cta: |
| clean_cta = ffmpeg_escape_text(cta) |
| filters.append( |
| f"drawtext=text='{clean_cta}':fontcolor=white:fontsize=70:font='Arial':" |
| f"x=(w-text_w)/2:y=h-250:box=1:boxcolor=black@0.6:boxborderw=20" |
| ) |
|
|
| |
| if watermark_text: |
| watermark_text = watermark_text[:30] |
| clean_wm = ffmpeg_escape_text(watermark_text) |
| filters.append( |
| f"drawtext=text='{clean_wm}':fontcolor=white@0.55:fontsize=38:font='Arial':" |
| f"x=w-text_w-30:y=h-80" |
| ) |
|
|
| |
| if progress_bar: |
| |
| |
| filters.append( |
| f"drawbox=x=0:y=ih-12:w='min(iw,iw*t/{dur})':h=12:color=0xFCD34D@0.9:t=fill" |
| ) |
|
|
| |
| ass_path = None |
| if captions: |
| try: |
| |
| if pre_segments is not None: |
| print(f"[>>] Captions: Using pre-supplied segments for clip {idx + 1}", flush=True) |
| |
| clip_segments = [] |
| for s in pre_segments: |
| s_start = s.get("start", 0) |
| s_end = s.get("end", 0) |
| if s_start < end and s_end > start: |
| |
| rel_start = max(0, round(s_start - start, 2)) |
| rel_end = min(dur, round(s_end - start, 2)) |
| if rel_end > rel_start: |
| clip_segments.append({ |
| "start": rel_start, |
| "end": rel_end, |
| "text": s.get("text", "") |
| }) |
| |
| transcript_text = " ".join(s.get("text", "") for s in clip_segments) |
| ass_content = generate_ass(clip_segments, style_name=caption_style) |
| ass_path = CLIPS_DIR / f"{name}.ass" |
| ass_path.write_text(ass_content, encoding="utf-8") |
| filters.append(f"subtitles={ass_path.name}") |
| print(f"[>>] Captions: ASS written from pre-segments β {ass_path}", flush=True) |
| else: |
| |
| print(f"[>>] Captions: starting for clip {idx + 1}", flush=True) |
| model = get_whisper_model() |
| temp_audio = CLIPS_DIR / f"{name}_audio.wav" |
| subprocess.run([ |
| "ffmpeg", "-i", str(video_path), "-ss", str(start), "-t", str(dur), |
| "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-y", str(temp_audio) |
| ], capture_output=True) |
|
|
| if not temp_audio.exists() or temp_audio.stat().st_size < 8000: |
| print("[!] Audio too short or silent β skipping captions for this clip", flush=True) |
| warnings.append("Audio too short β captions skipped for this clip") |
| if temp_audio.exists(): temp_audio.unlink() |
| else: |
| result = model.transcribe(str(temp_audio), task="transcribe", fp16=False) |
| if temp_audio.exists(): temp_audio.unlink() |
| transcript_text = " ".join(seg.get("text", "") for seg in result.get("segments", [])) |
| ass_content = generate_ass(result['segments'], style_name=caption_style) |
| ass_path = CLIPS_DIR / f"{name}.ass" |
| ass_path.write_text(ass_content, encoding="utf-8") |
| filters.append(f"subtitles={ass_path.name}") |
| print(f"[>>] Captions: ASS written from transcription β {ass_path}", flush=True) |
| except Exception as e: |
| print(f"[!] Subtitle generation failed: {e}", flush=True) |
| import traceback |
| traceback.print_exc() |
| if 'temp_audio' in locals() and temp_audio.exists(): |
| try: |
| temp_audio.unlink() |
| except Exception: |
| pass |
|
|
| final_vf = ",".join(filters) |
|
|
| |
| cmd = [ |
| "ffmpeg", "-loglevel", "error", "-ss", str(start), "-i", str(video_path) |
| ] |
|
|
| |
| vibe_file = VIBES_DIR / f"{vibe}.mp3" |
| if vibe != "none" and vibe_file.exists(): |
| cmd.extend(["-stream_loop", "-1", "-i", str(vibe_file)]) |
| |
| |
| filter_complex = ( |
| "[0:a]volume=1.0[main_a];" |
| "[1:a]volume=0.3[vibe_a];" |
| "[main_a][vibe_a]amix=inputs=2:duration=first:dropout_transition=2[aout]" |
| ) |
| cmd.extend(["-filter_complex", filter_complex, "-map", "0:v", "-map", "[aout]", |
| "-c:a", "aac", "-b:a", "128k"]) |
| else: |
| cmd.extend(["-c:a", "aac", "-b:a", "128k"]) |
|
|
| cmd.extend([ |
| "-t", str(dur), |
| "-vf", final_vf, |
| "-c:v", "libx264", "-preset", "ultrafast", "-crf", "26", |
| "-movflags", "+faststart", |
| "-y", str(out), |
| ]) |
|
|
| print(f"[>>] FFmpeg cmd: {' '.join(cmd)}", flush=True) |
| |
| r = subprocess.run(cmd, capture_output=True, cwd=str(CLIPS_DIR)) |
|
|
| if r.stderr: |
| stderr_text = r.stderr.decode(errors='replace').strip() |
| if stderr_text: |
| print(f"[>>] FFmpeg stderr: {stderr_text[-500:]}", flush=True) |
|
|
| |
| if ass_path and ass_path.exists(): |
| try: |
| ass_path.unlink() |
| except Exception: |
| pass |
|
|
| if r.returncode != 0: |
| raise RuntimeError( |
| f"FFmpeg failed for clip {idx + 1}: " |
| f"{r.stderr.decode(errors='replace')[-600:]}" |
| ) |
| return out, warnings, transcript_text |
|
|
|
|
| |
| def _base_href(): |
| """Directory URL for <base> so style.css / app.js resolve behind HF path prefixes.""" |
| sr = (request.script_root or "").strip() |
| if not sr: |
| return "/" |
| return sr.rstrip("/") + "/" |
|
|
|
|
| @app.route("/api/process", methods=["POST"]) |
| def process(): |
| data = request.get_json(force=True, silent=True) or {} |
| youtube_url = (data.get("youtubeUrl") or "").strip() |
| url_prefix = (request.script_root or "").rstrip("/") |
|
|
| if not youtube_url: |
| return jsonify({"error": "youtubeUrl is required"}), 400 |
|
|
| n_clips = min(int(data.get("clips", MAX_CLIPS)), MAX_CLIPS) |
| mode = data.get("mode", "fill").strip().lower() |
| if mode not in ("fill", "pad"): |
| mode = "fill" |
|
|
| def generate(): |
| video_path = None |
| try: |
| |
| video_path = download_video(youtube_url) |
| |
| |
| duration, width, height = get_video_info(video_path) |
| if duration < 20: |
| yield json.dumps({"error": "Video too short (minimum 20 s)."}) + "\n" |
| return |
| if width == 0 or height == 0: |
| yield json.dumps({"error": "Could not read video dimensions."}) + "\n" |
| return |
|
|
| |
| print("[>>] Intelligence Scan: Extracting full audio...", flush=True) |
| model = get_whisper_model() |
| full_audio = video_path.with_suffix(".full.wav") |
| subprocess.run([ |
| "ffmpeg", "-i", str(video_path), "-vn", "-acodec", "pcm_s16le", |
| "-ar", "16000", "-ac", "1", "-y", str(full_audio) |
| ], capture_output=True) |
|
|
| print("[>>] Intelligence Scan: Transcribing full video (this may take a minute)...", flush=True) |
| whisper_result = model.transcribe(str(full_audio), task="transcribe", fp16=False) |
| full_segments = whisper_result.get("segments", []) |
| if full_audio.exists(): full_audio.unlink() |
|
|
| |
| |
| print("[>>] Intelligence Scan: Blending signals in parallel...", flush=True) |
| parallel_res = analyze_video_parallel(video_path, youtube_url, full_segments) |
| heatmap_data = parallel_res["heatmap"] |
| energies = parallel_res["energy"] |
| t_scores = parallel_res["transcript_scores"] |
|
|
| |
| print("[>>] Intelligence Scan: Selecting viral winners...", flush=True) |
|
|
| |
| duration_range = data.get("durationRange", "auto") |
| min_dur, max_dur = 0, 0 |
| if duration_range == "15-30": |
| min_dur, max_dur = 15, 30 |
| elif duration_range == "30-60": |
| min_dur, max_dur = 30, 60 |
| elif duration_range == "60-90": |
| min_dur, max_dur = 60, 90 |
| elif duration_range == "auto" and data.get("retentionMode", False): |
| |
| min_dur, max_dur = 20, 59 |
| |
| top_segments = select_top_clips( |
| full_segments, heatmap_data, energies, t_scores, |
| num_clips=n_clips, min_dur=min_dur, max_dur=max_dur |
| ) |
| |
| if not top_segments: |
| yield json.dumps({"error": "No viral segments identified."}) + "\n" |
| return |
|
|
| |
| yield json.dumps({"total": len(top_segments)}) + "\n" |
|
|
| |
| use_captions = data.get("captions", False) |
| headline = data.get("headline", "").strip() |
| cta = data.get("cta", "").strip() |
| reframe = data.get("reframe", False) |
| progress_bar = data.get("progressBar", False) |
| vibe = data.get("vibe", "none") |
| caption_style = data.get("captionStyle", "mrbeast").strip().lower() |
| if caption_style not in ASS_STYLES: |
| caption_style = "mrbeast" |
|
|
| |
| color_mode = str(data.get("colorMode", "off")).strip().lower() |
| watermark_text = str(data.get("watermarkText", "")).strip() |
| safe_zone = data.get("safeZone", False) |
|
|
| def _cut(args): |
| i, seg_data = args |
| s = seg_data["start_time"] |
| e = seg_data["end_time"] |
| return i, cut_clip( |
| video_path, s, e, i, width, height, mode, |
| captions=use_captions, headline=headline, cta=cta, |
| reframe=reframe, progress_bar=progress_bar, vibe=vibe, |
| caption_style=caption_style, |
| safe_zone=safe_zone, color_mode=color_mode, |
| watermark_text=watermark_text, |
| pre_segments=full_segments |
| ), seg_data |
|
|
| with ThreadPoolExecutor(max_workers=min(len(top_segments), 3)) as pool: |
| futures = {pool.submit(_cut, (i, seg_data)): i |
| for i, seg_data in enumerate(top_segments)} |
| for future in as_completed(futures): |
| try: |
| i, (clip_path, clip_warnings, _), seg_data = future.result() |
| s = seg_data["start_time"] |
| e = seg_data["end_time"] |
|
|
| |
| for warning_msg in clip_warnings: |
| yield json.dumps({"type": "warning", "msg": warning_msg}) + "\n" |
|
|
| |
| pacing = score_pacing(energies, s, e) |
| structure = segment_structure(s, e) |
|
|
| |
| scores = { |
| "Heatmap": seg_data["heatmap_score"], |
| "Energy": seg_data["energy_score"], |
| "Transcript": seg_data["transcript_score"] |
| } |
| top_sig = max(scores, key=scores.get).lower().replace(" ", "_") |
|
|
| viral_analysis = { |
| "rank": i + 1, |
| "final_score": seg_data["final_score"], |
| "heatmap_score": seg_data["heatmap_score"], |
| "energy_score": seg_data["energy_score"], |
| "transcript_score": seg_data["transcript_score"], |
| "top_signal": top_sig |
| } |
|
|
| |
| triggers = { |
| "score": seg_data["final_score"], |
| "tags": [ |
| f"Viral: {int(seg_data['final_score']*100)}%", |
| "Visual Peak" if seg_data["heatmap_score"] > 0.5 else "", |
| "Audio Punch" if seg_data["energy_score"] > 0.6 else "" |
| ] |
| } |
| triggers["tags"] = [t for t in triggers["tags"] if t] |
|
|
| yield json.dumps({ |
| "clip": f"{url_prefix}/clips/{clip_path.name}", |
| "index": i, |
| "pacing": pacing, |
| "structure": structure, |
| "triggers": triggers, |
| "viral_analysis": viral_analysis |
| }) + "\n" |
| except Exception as clip_err: |
| print(f"[!] Clip failed: {clip_err}", flush=True) |
| yield json.dumps({"warning": f"Clip processing failed: {clip_err}"}) + "\n" |
|
|
| except Exception as exc: |
| yield json.dumps({"error": str(exc)}) + "\n" |
|
|
| finally: |
| if video_path and video_path.exists(): |
| try: |
| video_path.unlink() |
| except OSError: |
| pass |
|
|
| return Response( |
| stream_with_context(generate()), |
| mimetype="application/x-ndjson", |
| headers={"X-Accel-Buffering": "no"}, |
| ) |
|
|
|
|
| @app.route("/clips/<path:filename>") |
| def serve_clip(filename): |
| return send_from_directory(str(CLIPS_DIR), filename) |
|
|
|
|
| @app.route("/health") |
| def health(): |
| missing = check_deps() |
| return jsonify({ |
| "status": "ok" if not missing else "degraded", |
| "missing_tools": missing, |
| }) |
|
|
|
|
| @app.route("/health/dns") |
| def health_dns(): |
| """Diagnostic endpoint: test if this container can reach YouTube.""" |
| results = {} |
| for host in ["www.youtube.com", "www.google.com", "huggingface.co"]: |
| try: |
| addr = socket.getaddrinfo(host, 443, socket.AF_INET) |
| results[host] = {"ok": True, "ip": addr[0][4][0]} |
| except Exception as e: |
| results[host] = {"ok": False, "error": str(e)} |
|
|
| |
| try: |
| ver = subprocess.run(["yt-dlp", "--version"], capture_output=True, text=True) |
| yt_dlp_ver = ver.stdout.strip() |
| except Exception: |
| yt_dlp_ver = "unknown" |
|
|
| |
| cookies_path = _find_cookies_file() |
|
|
| all_ok = all(r["ok"] for r in results.values()) |
| return jsonify({ |
| "dns_status": "ok" if all_ok else "BLOCKED", |
| "hosts": results, |
| "yt_dlp_version": yt_dlp_ver, |
| "cookies_found": cookies_path or False, |
| }) |
|
|
|
|
| @app.route("/") |
| def root(): |
| base = _base_href() |
| inject = f' <base href="{escape(base)}">\n' |
| html = _INDEX_HTML_RAW |
| if "<base " not in _INDEX_HTML_RAW: |
| html = _INDEX_HTML_RAW.replace("<head>", "<head>\n" + inject, 1) |
| return Response(html, mimetype="text/html; charset=utf-8") |
|
|
|
|
| @app.route("/style.css") |
| def serve_css(): |
| return send_from_directory(str(BASE_DIR), "style.css") |
|
|
|
|
| @app.route("/app.js") |
| def serve_js(): |
| return send_from_directory(str(BASE_DIR), "app.js") |
|
|
|
|
| |
| if __name__ == "__main__": |
| print("\n[>>] AI SquadX VIP - Clipper Backend") |
| print(f" http://localhost:{PORT}") |
|
|
| |
| try: |
| ver = subprocess.run(["yt-dlp", "--version"], capture_output=True, text=True) |
| print(f" yt-dlp version: {ver.stdout.strip()}") |
| except Exception: |
| print(" yt-dlp version: unknown") |
|
|
| |
| cookies = _find_cookies_file() |
| if cookies: |
| print(f" Cookies: {cookies}") |
| else: |
| print(" Cookies: NOT FOUND (YouTube may block downloads)") |
| print(" β Place a cookies.txt next to server.py to fix this.") |
|
|
| print() |
|
|
| missing = check_deps() |
| if missing: |
| print(f"[!] Missing: {', '.join(missing)}") |
| print(" Install them or clips won't generate.\n") |
| else: |
| print("[OK] ffmpeg, ffprobe, yt-dlp found\n") |
|
|
| app.run(host="0.0.0.0", port=PORT, debug=False, threaded=False) |
|
|