clpper / server.py
areebsatin's picture
feat: add clip duration selector (15-90s) with intelligent segment expansion
9705942
Raw
History Blame Contribute Delete
61.2 kB
#!/usr/bin/env python3
"""
AI SquadX VIP – Viral Shorts Backend
Analyzes YouTube videos, finds hook segments, cuts 9:16 clips.
Requirements: pip install flask flask-cors yt-dlp openai-whisper opencv-python mediapipe
System deps: ffmpeg (must be in PATH)
"""
import json
import math
import socket
import struct
import subprocess
import tempfile
import time
import uuid
import wave
from concurrent.futures import ThreadPoolExecutor, as_completed
from html import escape
from pathlib import Path
from flask import Flask, jsonify, request, send_from_directory, Response, stream_with_context
from werkzeug.middleware.proxy_fix import ProxyFix
from flask_cors import CORS
import whisper
import cv2
import mediapipe as mp
# ── Config ────────────────────────────────────────────────────────────────────
import os
# Settings read from the environment; each falls back to the default shown.
PORT = int(os.getenv("PORT", 5000))
COOKIES_FILE = os.getenv("COOKIES_FILE", "") # path to cookies.txt
YTDLP_PROXY = os.getenv("YTDLP_PROXY", "") # e.g., http://user:pass@host:port
# Clip-selection and audio-analysis tuning constants.
MAX_CLIPS = 10 # max clips to generate per video
CLIP_DURATION = 45 # seconds per short
MIN_GAP_SECONDS = 60 # minimum spacing between clip start times
SAMPLE_RATE = 8000 # Hz for audio energy extraction (low = fast)
ENERGY_WINDOW = 5 # smoothing window in seconds
# Working directories live next to this file and are created at import time.
BASE_DIR = Path(__file__).parent.resolve()
CLIPS_DIR = BASE_DIR / "clips"
DOWNLOADS_DIR = BASE_DIR / "downloads"
CLIPS_DIR.mkdir(exist_ok=True)
DOWNLOADS_DIR.mkdir(exist_ok=True)
# Read eagerly at import: a missing index.html raises here, before the app is created.
_INDEX_HTML_RAW = (BASE_DIR / "index.html").read_text(encoding="utf-8")
# ── Phase 2 Resources ─────────────────────────────────────────────────────────
VIBES_DIR = BASE_DIR / "vibes"
VIBES_DIR.mkdir(exist_ok=True)
# MediaPipe face detector, built on first request and then reused.
_face_detector = None


def get_face_detector():
    """Return the shared MediaPipe face detector, creating it on first call."""
    global _face_detector
    if _face_detector is not None:
        return _face_detector
    print("[>>] Initializing AI face detector...")
    _face_detector = mp.solutions.face_detection.FaceDetection(
        model_selection=1, min_detection_confidence=0.5
    )
    return _face_detector
def analyze_face_center(video_path: Path, start: float, duration: float):
    """Return the average horizontal face position within a video segment.

    One frame per second is sampled from ``[start, start + duration)``. For
    every sampled frame with a detection, the centre X of the first detected
    face's relative bounding box is recorded.

    Args:
        video_path: Video file to read.
        start: Segment start in seconds.
        duration: Segment length in seconds (truncated to whole seconds).

    Returns:
        Mean centre X of the recorded faces, or 0.5 when no face was found,
        a frame could not be read, or any error occurred.
    """
    cap = None
    try:
        cap = cv2.VideoCapture(str(video_path))
        detector = get_face_detector()
        centers = []
        # Sample 1 frame per second for speed
        for i in range(int(duration)):
            cap.set(cv2.CAP_PROP_POS_MSEC, (start + i) * 1000)
            ret, frame = cap.read()
            if not ret:
                break
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = detector.process(rgb_frame)
            if results.detections:
                # Use the first detected face's center
                bbox = results.detections[0].location_data.relative_bounding_box
                centers.append(bbox.xmin + (bbox.width / 2))
        return sum(centers) / len(centers) if centers else 0.5
    except Exception as e:
        print(f"[!] Face analysis failed: {e}")
        return 0.5
    finally:
        # Release the capture on every path, including errors raised mid-loop.
        if cap is not None:
            cap.release()
# Whisper model, loaded on first request and then reused.
_whisper_model = None


def get_whisper_model():
    """Return the shared Whisper "tiny" model, loading it on first call."""
    global _whisper_model
    if _whisper_model is not None:
        return _whisper_model
    print("[>>] Loading AI whisper model (tiny – fast mode)...")
    _whisper_model = whisper.load_model("tiny")
    return _whisper_model
# ── ASS Subtitle Style Definitions (1080×1920) ────────────────────────────────
# Maps a style name to one complete "Style:" line. The field order follows the
# "Format:" line declared in ASS_HEADER. Colours are &HAABBGGRR.
ASS_STYLES = {
# MrBeast: viral yellow italic, thick black outline — like "YOUR Safe Zone" style
# MarginV=650 = 650px from bottom → text at ~66% from top in a 1920 frame
"mrbeast": (
"Style: Default,Arial Black,82,&H0000FFFF,&H000000FF,&H00000000,&H88000000,"
"1,1,0,0,100,110,0,0,1,6,3,2,40,40,650,1"
),
# Podcast: white text on a clean dark semi-transparent box
"podcast": (
"Style: Default,Arial,62,&H00FFFFFF,&H000000FF,&H00000000,&HAA000000,"
"1,0,0,0,100,100,0,0,4,0,0,2,40,40,650,1"
),
# Neon: magenta outline, glowing shadow
# NOTE(review): the primary colour &H0000FFFF is yellow in ASS's BBGGRR order
# (the same value as "mrbeast"); cyan would be &H00FFFF00 — confirm the intent.
"neon": (
"Style: Default,Arial Black,70,&H0000FFFF,&H000000FF,&H00FF00FF,&H88000000,"
"1,0,0,0,100,100,1,0,1,4,6,2,40,40,650,1"
),
# Horror: blood red with heavy black shadow
"horror": (
"Style: Default,Arial Black,72,&H002020EE,&H000000FF,&H00000000,&H88000000,"
"1,0,0,0,100,100,1,0,1,5,8,2,40,40,650,1"
),
# Minimal: clean white, thin subtle outline, modern feel
"minimal": (
"Style: Default,Arial,58,&H00FFFFFF,&H000000FF,&H66000000,&H44000000,"
"0,0,0,0,100,100,0,0,1,2,1,2,40,40,650,1"
),
}
# Template for the top of a generated .ass file. The {style_line} placeholder is
# filled with one entry of ASS_STYLES via str.format(); Dialogue lines are
# appended after the [Events] "Format:" line.
ASS_HEADER = """[Script Info]
ScriptType: v4.00+
PlayResX: 1080
PlayResY: 1920
ScaledBorderAndShadow: yes
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
{style_line}
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
def ass_time(seconds: float) -> str:
    """Format seconds as an ASS timestamp ``h:mm:ss.cs``.

    The value is rounded to the nearest centisecond before being split into
    fields, so binary float error cannot drop a centisecond (0.29 must render
    as ``.29``, not ``.28``). Negative inputs are clamped to zero because ASS
    has no negative timestamps.
    """
    total_cs = max(0, round(seconds * 100))
    total_seconds, cs = divmod(total_cs, 100)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h}:{m:02}:{s:02}.{cs:02}"
def chunk_segments(segments, max_words: int = 4):
    """Split transcript segments into caption chunks of at most ``max_words`` words.

    A segment's time span (at least 0.1 s) is divided evenly between its
    words; each chunk starts at the offset of its first word and ends after
    its last word, never later than the segment's own end. Segments whose
    text is blank are skipped.

    Returns:
        A list of ``{'start', 'end', 'text'}`` dicts in input order.
    """
    chunks = []
    for segment in segments:
        tokens = segment['text'].strip().split()
        if len(tokens) == 0:
            continue
        seg_start = segment['start']
        seg_end = segment['end']
        span = max(seg_end - seg_start, 0.1)
        per_word = span / len(tokens)
        for offset in range(0, len(tokens), max_words):
            group = tokens[offset:offset + max_words]
            begin = seg_start + offset * per_word
            finish = begin + len(group) * per_word
            chunks.append({
                'start': begin,
                'end': min(finish, seg_end),
                'text': ' '.join(group),
            })
    return chunks
# ASS colour constants (format &HBBGGRR& — blue, green, red byte order)
_ASS_RED = "&H0000FF&" # Red #FF0000
_ASS_YELLOW = "&H04F3FF&" # Saturated Yellow #FFF304
_ASS_BLACK = "&H000000&" # Black #000000
def _key_word_idx(words: list) -> int:
    """Pick the 'impact' word in a chunk — longest wins; ties go to last.

    Returns the index of the chosen word, or 0 for an empty list.
    """
    if not words:
        return 0
    # Including the index in the key makes the LAST of several equally long
    # words win; max() alone would return the first one.
    return max(range(len(words)), key=lambda i: (len(words[i]), i))
def generate_ass(segments, style_name: str = "mrbeast") -> str:
    r"""Build a complete ASS subtitle document with per-word emphasis.

    Segments are split into chunks of up to four words. In every chunk the
    word chosen by ``_key_word_idx`` is rendered large and red with a red
    border (\1c / \3c, fs=130, bord=6, blur=4); all other words are yellow
    with a thin black border (fs=68, bord=2). Chunks of three or more words
    are stacked vertically: the key word sits on its own line, separated by
    \N from the words before and after it.

    Args:
        segments: Transcript segments with 'text', 'start' and 'end'.
        style_name: Key into ASS_STYLES; unknown names fall back to "mrbeast".

    Returns:
        The header followed by one Dialogue line per chunk.
    """
    header = ASS_HEADER.format(
        style_line=ASS_STYLES.get(style_name, ASS_STYLES["mrbeast"])
    )
    # Override tags are the same for every chunk, so build them once.
    key_tag = f"{{\\1c{_ASS_RED}\\3c{_ASS_RED}\\fs130\\bord6\\shad3\\blur4}}"
    plain_tag = f"{{\\1c{_ASS_YELLOW}\\3c{_ASS_BLACK}\\fs68\\bord2\\shad1\\blur0}}"

    events = []
    for chunk in chunk_segments(segments, max_words=4):
        begin = ass_time(chunk['start'])
        finish = ass_time(chunk['end'])
        tokens = chunk['text'].replace('\n', ' ').split()
        if not tokens:
            continue
        key_idx = _key_word_idx(tokens)
        styled = [
            (key_tag if pos == key_idx else plain_tag) + token
            for pos, token in enumerate(tokens)
        ]
        if len(styled) < 3:
            # One or two words stay on a single line.
            text = " ".join(styled)
        else:
            # [words before] \N [key word] \N [words after]
            rows = []
            before, after = styled[:key_idx], styled[key_idx + 1:]
            if before:
                rows.append(" ".join(before))
            rows.append(styled[key_idx])
            if after:
                rows.append(" ".join(after))
            text = r"\N".join(rows)
        events.append(f"Dialogue: 0,{begin},{finish},Default,,0,0,0,,{text}")
    return header + "\n".join(events)
# NOTE(review): static_folder=BASE_DIR with an empty static_url_path makes every
# file next to this script addressable by URL (this source file, and any
# cookies file stored there). Consider serving assets from a dedicated directory.
app = Flask(__name__, static_folder=str(BASE_DIR), static_url_path="")
# Hugging Face and other reverse proxies send X-Forwarded-* / X-Forwarded-Prefix
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
# Flask-CORS treats an origin string containing regex metacharacters as a
# regular expression. The glob-style "https://*.hf.space" therefore never
# matched a real Space origin ("/*" means "zero or more slashes"), so the
# pattern is spelled as an explicit, anchored regex instead.
CORS(
    app,
    origins=[
        "http://localhost:5000",
        "http://127.0.0.1:5000",
        r"^https://[A-Za-z0-9.-]+\.hf\.space$",
    ],
)
# ── Dependency check ──────────────────────────────────────────────────────────
def check_deps():
    """Return the names of required command-line tools that are unavailable.

    Each of ffmpeg, ffprobe and yt-dlp is run with its version flag. A tool is
    reported as missing when it cannot be launched at all (not on PATH, not
    executable) or when it exits with a code other than 0 or 1.

    Returns:
        A list of missing tool names; empty when everything is available.
    """
    probes = (("ffmpeg", "-version"), ("ffprobe", "-version"), ("yt-dlp", "--version"))
    missing = []
    for tool, flag in probes:
        try:
            proc = subprocess.run([tool, flag], capture_output=True)
        except OSError:
            # subprocess raises (FileNotFoundError / PermissionError) instead of
            # returning an exit code when the executable cannot be started.
            missing.append(tool)
            continue
        if proc.returncode not in (0, 1):
            missing.append(tool)
    return missing
# Not every yt-dlp release knows the --js-runtimes option. Support is probed
# once at runtime and cached, so downloads never fail on an unknown flag.
_YT_DLP_SUPPORTS_JS_RUNTIMES = None


def yt_dlp_supports_js_runtimes() -> bool:
    """Report whether the installed yt-dlp advertises ``--js-runtimes``.

    The first call logs the Node.js version (or its absence) and scans the
    output of ``yt-dlp --help``; the answer is cached for later calls. Any
    failure to run yt-dlp counts as "not supported".
    """
    global _YT_DLP_SUPPORTS_JS_RUNTIMES
    if _YT_DLP_SUPPORTS_JS_RUNTIMES is None:
        # Log the Node.js version as well, to verify the installation.
        try:
            node = subprocess.run(["node", "-v"], capture_output=True, text=True)
            node_v = node.stdout.strip()
            print(f"[OK] Node.js version: {node_v}", flush=True)
        except Exception:
            print("[!] Node.js NOT FOUND in PATH", flush=True)
        try:
            probe = subprocess.run(
                ["yt-dlp", "--help"],
                capture_output=True,
                text=True,
            )
            supported = "--js-runtimes" in probe.stdout
        except Exception:
            supported = False
        _YT_DLP_SUPPORTS_JS_RUNTIMES = supported
    return _YT_DLP_SUPPORTS_JS_RUNTIMES
# ── Step 1: Download (with retries, cookies, user-agent) ─────────────────────
# Realistic browser User-Agent to avoid bot detection.
# Passed to yt-dlp through --user-agent by the download and heatmap commands.
_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
def _find_cookies_file() -> str | None:
    """Locate a cookies file for yt-dlp.

    Search order:
      1. the path in COOKIES_FILE, if it names an existing file;
      2. the first ``*cookies*.txt`` larger than 10 bytes found in BASE_DIR,
         the current working directory, or the home directory.

    Returns:
        The path as a string, or None when nothing was found (the files
        present in BASE_DIR are logged in that case to help debugging).
    """
    # 1. Explicit env var
    if COOKIES_FILE and Path(COOKIES_FILE).is_file():
        print(f"[>>] Cookies found via env var: {COOKIES_FILE}", flush=True)
        return COOKIES_FILE

    # 2. Local and home directories; accepts cookies.txt, youtube_cookies.txt, ...
    for directory in (BASE_DIR, Path.cwd(), Path.home()):
        for candidate in directory.glob("*cookies*.txt"):
            if not candidate.is_file() or candidate.stat().st_size <= 10:
                continue
            print(f"[>>] SUCCESS: Cookies found at: {candidate.resolve()}", flush=True)
            return str(candidate.resolve())

    # 3. Nothing found: show what is actually in the directory.
    print(f"[!] Cookies NOT FOUND. Current files in {BASE_DIR}:", flush=True)
    try:
        names = [entry.name for entry in BASE_DIR.glob("*") if entry.is_file()]
        print(f" {names}", flush=True)
    except Exception:
        pass
    return None
def download_video(youtube_url: str, max_retries: int = 3) -> Path:
    """Download a video with yt-dlp into DOWNLOADS_DIR.

    The yt-dlp command is extended with cookies, a forced Node.js JS runtime
    and a proxy when each is available. Every attempt is preceded by a DNS
    pre-check (skipped when a proxy is configured) and followed by an
    exponential back-off (2, 4, 8 ... seconds) before the next attempt.

    Args:
        youtube_url: URL handed to yt-dlp.
        max_retries: Maximum number of attempts.

    Returns:
        Path of the first file in DOWNLOADS_DIR that carries this download's
        unique id.

    Raises:
        RuntimeError: if DNS resolution fails on the last attempt, or if all
            attempts finish without producing a file.
    """
    uid = uuid.uuid4().hex[:10]
    template = str(DOWNLOADS_DIR / f"{uid}.%(ext)s")
    cmd = [
        "yt-dlp",
        "--force-ipv4",
        "--ignore-config",
        "--no-cache-dir",
        "--user-agent", _USER_AGENT,
        "--extractor-args", "youtube:player_client=web,tv,ios;player_skip=web_embedded_check",
        "--remote-components", "ejs:github",
        # NOTE(review): disables TLS certificate verification — confirm this is
        # still required.
        "--no-check-certificates",
        "--geo-bypass",
        "--add-header", "Accept-Language:en-US,en;q=0.9",
        "--add-header", "Accept:text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "-f", "bestvideo[height<=1080]+bestaudio/best[height<=1080]/best",
        "--merge-output-format", "mp4",
        "--no-playlist",
        "--no-part",
        "--socket-timeout", "30",
        "--retries", "3",
        "--file-access-retries", "3",
        "-o", template,
        # "--" ends option parsing, so a (presumably user-supplied) URL that
        # begins with "-" can never be interpreted as a yt-dlp option.
        "--",
        youtube_url,
    ]
    # Optional flags are spliced in right after the program name.
    cookies_path = _find_cookies_file()
    if cookies_path:
        cmd[1:1] = ["--cookies", cookies_path]
        print(f"[>>] Using cookies from: {cookies_path}", flush=True)
    # Force Node.js as the JS runtime to solve n-parameter challenges
    if yt_dlp_supports_js_runtimes():
        cmd[1:1] = ["--js-runtimes", "node"]
        print("[>>] Forcing 'node' as yt-dlp JS runtime solver", flush=True)
    if YTDLP_PROXY:
        cmd[1:1] = ["--proxy", YTDLP_PROXY]
        # Log only the part after "user:pass@" so credentials stay out of logs.
        proxy_display = YTDLP_PROXY.rsplit("@", 1)[-1]
        print(f"[>>] Using proxy: {proxy_display}", flush=True)

    last_err = ""
    for attempt in range(1, max_retries + 1):
        print(f"[>>] Download attempt {attempt}/{max_retries} ...", flush=True)
        # Quick DNS pre-check so we get a clear error instead of yt-dlp's wall of text.
        # Skipped when a proxy is defined, as the proxy handles its own resolution.
        # (No socket.setdefaulttimeout here: it would change the timeout of every
        # socket in the process and does not bound getaddrinfo anyway.)
        if not YTDLP_PROXY:
            try:
                socket.getaddrinfo("www.youtube.com", 443, socket.AF_INET)
            except socket.gaierror as dns_err:
                last_err = f"DNS resolution failed: {dns_err}"
                print(f"[!] {last_err}", flush=True)
                if attempt < max_retries:
                    time.sleep(2 ** attempt)  # exponential backoff: 2, 4, 8 s
                    continue
                raise RuntimeError(last_err) from dns_err
        else:
            print("[>>] Skipping DNS pre-check because YTDLP_PROXY is set.", flush=True)

        # Run yt-dlp
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            matches = list(DOWNLOADS_DIR.glob(f"{uid}.*"))
            if matches:
                print(f"[OK] Downloaded: {matches[0].name}", flush=True)
                return matches[0]
            last_err = "yt-dlp finished but produced no output file."
        else:
            stderr = result.stderr
            # Log the FULL stderr to help with EJS/Challenge diagnostics;
            # only its tail goes into the raised error.
            print(f"[!] yt-dlp failed (attempt {attempt}):\n{stderr}", flush=True)
            last_err = stderr[-1000:]
        if attempt < max_retries:
            time.sleep(2 ** attempt)
    raise RuntimeError(f"Download failed after {max_retries} attempts:\n{last_err}")
def get_video_info(video_path: Path):
    """Probe a media file with ffprobe and return ``(duration, width, height)``.

    Duration comes from the container's "format" section. Width and height
    come from the first stream whose codec_type is "video", and are both 0
    when the file has no video stream.
    """
    probe = subprocess.run(
        ["ffprobe", "-v", "quiet", "-print_format", "json",
         "-show_format", "-show_streams", str(video_path)],
        capture_output=True,
    )
    info = json.loads(probe.stdout)
    duration = float(info["format"]["duration"])
    video_stream = next(
        (s for s in info.get("streams", []) if s.get("codec_type") == "video"),
        None,
    )
    if video_stream is None:
        return duration, 0, 0
    width = int(video_stream["width"])
    height = int(video_stream["height"])
    return duration, width, height
def get_youtube_heatmap(video_path: Path, url: str):
    """Fetch YouTube's 'Most Replayed' heatmap for ``url`` using yt-dlp.

    yt-dlp is asked to write only the info JSON next to ``video_path``; the
    "heatmap" entry of that JSON is normalised so the largest value becomes
    1.0. The info JSON is removed again on every exit path.

    Args:
        video_path: Path of the downloaded video; only used to derive the
            location of the temporary ``<name>.info.json`` file.
        url: Video URL handed to yt-dlp.

    Returns:
        ``[{start_time: float, end_time: float, score: float}]``, or ``[]``
        when no heatmap is available or any error occurs.
    """
    print("[>>] Fetching YouTube heatmap...", flush=True)
    # We use the same hardened bypass settings as download_video
    info_json_path = video_path.with_suffix(".info.json")
    cmd = [
        "yt-dlp",
        "--force-ipv4",
        "--ignore-config",
        "--no-cache-dir",
        "--user-agent", _USER_AGENT,
        "--extractor-args", "youtube:player_client=web,tv,ios;player_skip=web_embedded_check",
        "--remote-components", "ejs:github",
        "--no-check-certificates",
        "--geo-bypass",
        "--add-header", "Accept-Language:en-US,en;q=0.9",
        "--add-header", "Accept:text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "--write-info-json",
        "--skip-download",
        "-o", str(video_path.with_suffix("")),  # This will result in filename.info.json
        # "--" ends option parsing so the URL can never be read as an option.
        "--",
        url,
    ]
    # Add cookies if available
    cookies_path = _find_cookies_file()
    if cookies_path:
        cmd[1:1] = ["--cookies", cookies_path]
    # Add Proxy if defined
    if YTDLP_PROXY:
        cmd[1:1] = ["--proxy", YTDLP_PROXY]
    try:
        # Run yt-dlp to get JSON
        subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if not info_json_path.exists():
            print("[!] Heatmap info JSON not found.", flush=True)
            return []
        with open(info_json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        heatmap = data.get("heatmap")
        if not heatmap:
            print("[!] No heatmap data found in YouTube metadata.", flush=True)
            return []
        # Normalize scores to 0.0 - 1.0
        max_val = max((item.get("value", 0) for item in heatmap), default=1.0)
        if max_val == 0:
            max_val = 1.0
        normalized = [
            {
                "start_time": float(item["start_time"]),
                "end_time": float(item["end_time"]),
                "score": round(float(item["value"]) / max_val, 4),
            }
            for item in heatmap
        ]
        print(f"[OK] Extracted {len(normalized)} heatmap segments.", flush=True)
        return normalized
    except Exception as e:
        print(f"[!] Heatmap extraction error: {e}", flush=True)
        return []
    finally:
        # Best-effort cleanup on every path (success, "no heatmap", errors);
        # the file may legitimately not exist.
        try:
            info_json_path.unlink()
        except OSError:
            pass
# ── Step 3: Audio energy analysis ────────────────────────────────────────────
def extract_audio_energy(video_path: Path, duration: float):
    """Compute a sliding-window RMS energy curve of a file's audio track.

    FFmpeg decodes the audio to raw mono signed 16-bit little-endian PCM at
    SAMPLE_RATE Hz on a pipe. A window of ENERGY_WINDOW seconds is moved over
    the samples in 1-second steps and the RMS of every window is recorded.

    Args:
        video_path: Media file to analyse.
        duration: Video length in seconds; only used for the fallback.

    Returns:
        A list of ``(time_sec, rms)`` tuples, where ``time_sec`` is the window
        start. When FFmpeg yields no samples, a uniform curve
        ``[(t, 1.0), ...]`` with one entry per second of ``duration`` is
        returned, so clips end up evenly spaced.
    """
    proc = subprocess.Popen(
        [
            "ffmpeg", "-i", str(video_path),
            "-vn", "-ar", str(SAMPLE_RATE), "-ac", "1",
            "-f", "s16le", "pipe:1",
            "-loglevel", "quiet",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    raw, _ = proc.communicate()
    n = len(raw) // 2
    if n == 0:
        # Fallback: uniform energy (clips will be evenly spaced)
        return [(t, 1.0) for t in range(int(duration))]
    # Slice to a whole number of samples: a stray trailing byte would
    # otherwise make struct.unpack raise.
    samples = struct.unpack(f"<{n}h", raw[: n * 2])
    win = SAMPLE_RATE * ENERGY_WINDOW  # samples per window
    step = SAMPLE_RATE  # 1-second increments
    result = []
    # "+ 1" keeps the last full window (start index n - win) in the curve.
    for i in range(0, n - win + 1, step):
        chunk = samples[i : i + win : 8]  # subsample every 8th → speed
        if not chunk:
            continue
        rms = math.sqrt(sum(int(s) * int(s) for s in chunk) / len(chunk))
        result.append((i / SAMPLE_RATE, rms))
    return result
def analyze_audio_energy(video_path: Path):
    """Measure per-second audio loudness of a media file.

    FFmpeg extracts the audio as a 16 kHz mono 16-bit WAV into the system temp
    directory. The RMS of every 1-second chunk is computed and the values are
    scaled so the loudest second is 1.0. The temporary WAV is always removed.

    Returns:
        ``[{start_time: float, end_time: float, energy: float}]`` with one
        entry per second, or ``[]`` when extraction or analysis fails.
    """
    print(f"[>>] Analyzing audio energy for: {video_path.name}", flush=True)
    temp_wav = Path(tempfile.gettempdir()) / f"{uuid.uuid4().hex}.wav"
    try:
        # Extract 16kHz mono WAV
        cmd = [
            "ffmpeg", "-i", str(video_path),
            "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
            str(temp_wav), "-y", "-loglevel", "quiet"
        ]
        subprocess.run(cmd, check=True)
        if not temp_wav.exists():
            print("[!] Audio extraction failed.", flush=True)
            return []
        with wave.open(str(temp_wav), "rb") as w:
            sample_rate = w.getframerate()
            frames = w.readframes(w.getnframes())
        # Samples are 16-bit signed ints (s16le). Count them from the bytes
        # actually read, so a header that overstates the length cannot make
        # struct.unpack fail.
        n_frames = len(frames) // 2
        samples = struct.unpack(f"<{n_frames}h", frames[: n_frames * 2])
        # 1 second chunks
        chunk_size = sample_rate
        energies = []
        for i in range(0, n_frames, chunk_size):
            chunk = samples[i : i + chunk_size]
            if not chunk:
                continue
            # RMS = sqrt(mean(samples^2)); floats keep the large sums precise
            ms = sum(float(s) * float(s) for s in chunk) / len(chunk)
            energies.append({
                "start_time": round(i / sample_rate, 2),
                "end_time": round((i + chunk_size) / sample_rate, 2),
                "energy": math.sqrt(ms),
            })
        # Normalize to 0.0 - 1.0
        max_e = max((entry["energy"] for entry in energies), default=1.0)
        if max_e == 0:
            max_e = 1.0
        for entry in energies:
            entry["energy"] = round(entry["energy"] / max_e, 4)
        print(f"[OK] Audio energy analyzed ({len(energies)} seconds).", flush=True)
        return energies
    except Exception as err:
        print(f"[!] Audio analysis error: {err}", flush=True)
        return []
    finally:
        # Best-effort cleanup. Only filesystem errors are ignored, so
        # KeyboardInterrupt / SystemExit are no longer swallowed here.
        try:
            temp_wav.unlink(missing_ok=True)
        except OSError:
            pass
def calculate_viral_score(segment_start, segment_end, heatmap_data, energy_data, transcript_score):
    """Blend heatmap, audio energy and transcript potential into one score.

    Weights: 40% heatmap (mean score of the heatmap entries overlapping the
    segment), 35% audio energy (peak value inside the segment) and 25%
    transcript score.

    Args:
        segment_start / segment_end: Segment bounds in seconds.
        heatmap_data: ``[{start_time, end_time, score}]``.
        energy_data: ``[{start_time, end_time, energy}]`` dicts and/or
            ``(time, value)`` pairs; pairs count when their time lies within
            the segment (bounds included).
        transcript_score: Pre-computed transcript score.

    Returns:
        ``{final_score, heatmap_score, energy_score, transcript_score}``,
        every value rounded to two decimals.
    """
    def overlaps(entry):
        # True when the entry's time range intersects the segment.
        return (entry.get("start_time", 0) < segment_end
                and entry.get("end_time", 0) > segment_start)

    # 1. Heatmap: average over all overlapping entries.
    heat_values = [entry["score"] for entry in heatmap_data if overlaps(entry)]
    heat_avg = sum(heat_values) / len(heat_values) if heat_values else 0.0

    # 2. Audio energy: loudest sample in range.
    energy_values = []
    for sample in energy_data:
        if isinstance(sample, dict):
            if overlaps(sample):
                energy_values.append(sample.get("energy", 0.0))
        elif isinstance(sample, (list, tuple)):
            if segment_start <= sample[0] <= segment_end:
                energy_values.append(sample[1])
    energy_peak = max(energy_values) if energy_values else 0.0

    # 3. Weighted blend.
    blended = (0.40 * heat_avg) + (0.35 * energy_peak) + (0.25 * transcript_score)
    return {
        "final_score": round(blended, 2),
        "heatmap_score": round(heat_avg, 2),
        "energy_score": round(energy_peak, 2),
        "transcript_score": round(transcript_score, 2),
    }
def analyze_video_parallel(video_path, url, transcript_segments):
    """Run heatmap fetch, audio-energy analysis and transcript scoring concurrently.

    Each task runs in its own worker thread. A task that raises is logged and
    contributes an empty list instead of aborting the others.

    Returns:
        ``{"heatmap": [...], "energy": [...], "transcript_scores": [...]}``.
    """
    def timed(label, task, *task_args):
        # Run one task, log how long it took, and turn any failure into [].
        began = time.time()
        try:
            outcome = task(*task_args)
            elapsed = time.time() - began
            print(f"[OK] {label} completed in {elapsed:.2f}s", flush=True)
            return outcome
        except Exception as e:
            print(f"[!] {label} failed: {e}", flush=True)
            return []

    def score_all(segs):
        # Score every transcript segment's text in one worker.
        scores = []
        for seg in segs:
            scores.append(score_transcript_segment(seg.get("text", "")))
        return scores

    collected = {"heatmap": [], "energy": [], "transcript_scores": []}
    print(f"[>>] Starting parallel analysis for {video_path.name}...", flush=True)
    with ThreadPoolExecutor(max_workers=3) as pool:
        # Each future maps straight to the result key it fills.
        pending = {
            pool.submit(timed, "Heatmap", get_youtube_heatmap, video_path, url): "heatmap",
            pool.submit(timed, "Audio Energy", analyze_audio_energy, video_path): "energy",
            pool.submit(timed, "Transcript Scoring", score_all, transcript_segments): "transcript_scores",
        }
        for finished in as_completed(pending):
            collected[pending[finished]] = finished.result()
    return collected
def select_top_clips(transcript_segments, heatmap_data, energy_data, transcript_scores, num_clips=10, min_dur=0, max_dur=0):
    """
    Ranks transcript segments by viral score and deduplicates repetitive temporal overlaps.
    If min_dur/max_dur are provided, expands segments to fit the requested duration.

    Args:
        transcript_segments: Dicts with "start", "end" and "text"; the "end" of
            the LAST segment is used as the video duration.
        heatmap_data / energy_data: Passed through to calculate_viral_score.
        transcript_scores: Scores matched to transcript_segments by index;
            a missing entry counts as 0.0.
        num_clips: Maximum number of clips to return.
        min_dur: When > 0, shorter segments are widened to this many seconds.
        max_dur: When > 0, longer clips are cut to this many seconds.

    Returns:
        A list of dicts with start_time, end_time, text, final_score,
        heatmap_score, energy_score and transcript_score, ordered by
        descending final_score. A summary table is also printed.
    """
    scored_segs = []
    video_duration = transcript_segments[-1].get("end", 0) if transcript_segments else 0
    # 1. Score each individual transcript segment
    for i, seg in enumerate(transcript_segments):
        start, end = seg.get("start", 0), seg.get("end", 0)
        t_score = transcript_scores[i] if i < len(transcript_scores) else 0.0
        viral_data = calculate_viral_score(start, end, heatmap_data, energy_data, t_score)
        # Clone to avoid modifying original list in-place
        s_copy = seg.copy()
        s_copy["viral_score"] = viral_data
        scored_segs.append(s_copy)
    # 2. Sort by final_score descending
    scored_segs.sort(key=lambda x: x["viral_score"]["final_score"], reverse=True)
    # 3. Deduplication + Expansion
    selected = []
    chosen_starts = []
    # Deduplication window is larger for longer clips to avoid overlap
    dedup_window = max(30, min_dur)
    for seg in scored_segs:
        curr_start = seg.get("start", 0)
        curr_end = seg.get("end", 0)
        # Check for duplication. Note the comparison uses this segment's
        # ORIGINAL start against the (possibly expanded) starts already chosen.
        is_duplicate = any(abs(curr_start - prev_s) < dedup_window for prev_s in chosen_starts)
        if is_duplicate:
            continue
        # Expansion Logic: Grow the segment to reach min_dur
        if min_dur > 0:
            actual_dur = curr_end - curr_start
            if actual_dur < min_dur:
                needed = min_dur - actual_dur
                # Expand symmetrically
                new_start = max(0, curr_start - needed / 2)
                new_end = min(video_duration, curr_end + needed / 2)
                # Correct if boundary hit: push the whole window to the other
                # side so the clip still spans min_dur where the video allows.
                if new_start == 0:
                    new_end = min(video_duration, min_dur)
                elif new_end == video_duration:
                    new_start = max(0, video_duration - min_dur)
                curr_start, curr_end = round(new_start, 2), round(new_end, 2)
        # Cap at max_dur if needed (rare since we expand to min_dur)
        # NOTE(review): this check sits at loop level here, so it also applies
        # when min_dur is 0; confirm it was not meant to be nested under the
        # min_dur branch above.
        if max_dur > 0 and (curr_end - curr_start) > max_dur:
            curr_end = curr_start + max_dur
        selected.append({
            "start": curr_start,
            "end": curr_end,
            "text": seg.get("text", ""),
            "viral_score": seg["viral_score"]
        })
        chosen_starts.append(curr_start)
        if len(selected) >= num_clips:
            break
    # 4. Final Formatting: flatten the nested score dict into the clip dict
    top_clips = []
    for s in selected:
        v = s["viral_score"]
        top_clips.append({
            "start_time": s["start"],
            "end_time": s["end"],
            "text": s["text"],
            "final_score": v["final_score"],
            "heatmap_score": v["heatmap_score"],
            "energy_score": v["energy_score"],
            "transcript_score": v["transcript_score"]
        })
    # 5. Print Summary Table
    print("\n" + "="*85)
    print(f"{'RANK':<5} | {'START':<8} | {'END':<8} | {'SCORE':<8} | {'TOP SIGNAL SOURCE'}")
    print("-" * 85)
    for idx, c in enumerate(top_clips):
        # Identify which signal contributed most (compares the raw,
        # unweighted component scores)
        sig_map = {
            "Heatmap": c["heatmap_score"],
            "Audio Peak": c["energy_score"],
            "Psych Hook": c["transcript_score"]
        }
        top_sig = max(sig_map, key=sig_map.get)
        print(f"{idx+1:<5} | {c['start_time']:<8.1f} | {c['end_time']:<8.1f} | {c['final_score']:<8.2f} | {top_sig}")
    print("="*85 + "\n", flush=True)
    return top_clips
# ── Step 4: Find hook segments ────────────────────────────────────────────────
def find_segments(energies, duration: float, n_clips: int = MAX_CLIPS):
    """Choose clip windows around the strongest audio-energy peaks.

    Greedy peak selection:
      1. Smooth the energy curve with a moving average.
      2. Repeatedly pick the highest remaining moment, then black out a
         MIN_GAP_SECONDS radius around it, until ``n_clips`` peaks are found.
      3. If fewer peaks exist, fill up with evenly spaced start times that
         keep MIN_GAP_SECONDS distance from the peaks.

    The ``n_clips`` strongest peaks are kept and only THEN put in
    chronological order, so a strong peak late in the video is not displaced
    by a weaker, earlier one.

    Args:
        energies: ``[(time, rms)]`` tuples or ``[{"start_time", "energy"}]``
            dicts, assumed to be spaced about one second apart. An empty list
            yields evenly spaced clips.
        duration: Video length in seconds.
        n_clips: Number of clips wanted.

    Returns:
        A list of ``(start, end)`` tuples in seconds, each CLIP_DURATION long
        where the video allows, in chronological order.
    """
    if not energies:
        step = max(60.0, (duration - CLIP_DURATION) / max(n_clips, 1))
        return [
            (round(i * step + 10, 2), round(i * step + 10 + CLIP_DURATION, 2))
            for i in range(n_clips)
            if i * step + 10 + CLIP_DURATION <= duration
        ]
    # Support both list of tuples [(t, rms)] and list of dicts [{"energy": rms}]
    if isinstance(energies[0], dict):
        times = [e.get("start_time", 0) for e in energies]
        vals = [e.get("energy", 0) for e in energies]
    else:
        times = [e[0] for e in energies]
        vals = [e[1] for e in energies]
    # Smooth: moving average with a half-width of 3..10 samples
    w = min(10, max(3, len(vals) // 20))
    smoothed = []
    for i in range(len(vals)):
        lo, hi = max(0, i - w), min(len(vals), i + w + 1)
        smoothed.append(sum(vals[lo:hi]) / (hi - lo))
    # Greedy selection
    used = [False] * len(smoothed)
    peaks = []
    gap_idx = MIN_GAP_SECONDS  # since step ≈ 1 s, index ≈ seconds
    while len(peaks) < n_clips:
        best_i = max(
            (i for i, taken in enumerate(used) if not taken),
            key=lambda i: smoothed[i],
            default=-1,
        )
        if best_i < 0:
            break
        peaks.append(times[best_i])
        lo = max(0, best_i - gap_idx)
        hi = min(len(used), best_i + gap_idx + 1)
        for j in range(lo, hi):
            used[j] = True
    # Peaks were collected strongest-first; order them by time for output.
    peaks.sort()
    # If we have fewer peaks than requested, fill with evenly spaced ones
    if len(peaks) < n_clips:
        step = max(60.0, (duration - CLIP_DURATION) / max(n_clips, 1))
        t = 10.0
        while len(peaks) < n_clips and t + CLIP_DURATION <= duration:
            if all(abs(t - p) >= MIN_GAP_SECONDS for p in peaks):
                peaks.append(t)
            t += step
        peaks.sort()
    # Convert to (start, end): the peak sits a quarter of the way into the clip
    segments = []
    for pt in peaks:
        start = max(0.0, pt - CLIP_DURATION * 0.25)
        end = start + CLIP_DURATION
        if end > duration:
            # Shift the window back so it ends with the video.
            end = duration
            start = max(0.0, end - CLIP_DURATION)
        segments.append((round(start, 2), round(end, 2)))
    return segments
# ── Step 5: Build 9:16 crop filter ───────────────────────────────────────────
# Colour-grade modes accepted by build_vf; anything else is treated as "off".
_COLOR_MODES = frozenset({"off", "boost", "yellow", "red"})


def build_vf(width: int, height: int, center_x: float = 0.5,
             safe_zone: bool = False, color_mode: str = "off") -> str:
    """Return an FFmpeg -vf string that crops to 9:16 and scales to 1080×1920.

    Args:
        width / height: source video dimensions.
        center_x: horizontal crop anchor (0.0–1.0; 0.5 = centre). Only used
            when the source is wider than 9:16.
        safe_zone: when True, additionally crops away the top and bottom
            12.5% and rescales to 1080×1920.
        color_mode: one of 'off' | 'boost' | 'yellow' | 'red'.
            - boost  → +40% saturation (eq filter)
            - yellow → warm yellow push (curves filter)
            - red    → red-dominant push (curves filter)
            Unknown values behave like 'off'.
    """
    mode = color_mode if color_mode in _COLOR_MODES else "off"
    target_ratio = 9 / 16

    if width / height > target_ratio:
        # Wider than 9:16: keep the full height and cut the sides.
        crop_w, crop_h = int(height * target_ratio), height
        crop_x = int((width * center_x) - (crop_w / 2))
        crop_x = max(0, min(crop_x, width - crop_w))  # keep the window inside the frame
        crop_y = 0
    else:
        # 9:16 or taller: keep the full width and cut top/bottom.
        crop_w, crop_h = width, int(width / target_ratio)
        crop_x, crop_y = 0, (height - crop_h) // 2

    # Crop dimensions are forced to even numbers.
    crop_w -= crop_w % 2
    crop_h -= crop_h % 2

    filters = [
        f"crop={crop_w}:{crop_h}:{crop_x}:{crop_y}",
        "scale=1080:1920:flags=lanczos",
    ]
    if safe_zone:
        filters.append("crop=iw:ih*0.75:0:ih*0.125")
        filters.append("scale=1080:1920:flags=lanczos")

    grades = {
        "boost": "eq=saturation=1.4",
        "yellow": "curves=red='0/0 0.5/0.56 1/1':green='0/0 0.5/0.53 1/1':blue='0/0 0.5/0.46 1/1'",
        "red": "curves=red='0/0 0.5/0.62 1/1':green='0/0 0.5/0.46 1/1':blue='0/0 0.5/0.44 1/1'",
    }
    grade = grades.get(mode)
    if grade is not None:
        filters.append(grade)
    return ",".join(filters)
# ── Retention Psychology: Analysis Helpers ────────────────────────────────────
# Keywords that score biological / emotional triggers (from screenshot guide)
# Maps a trigger category to lowercase words/phrases.
# NOTE(review): nothing in the visible part of this file reads this table —
# confirm it is used further down before extending it.
_TRIGGER_KEYWORDS = {
"animal": ["dog", "cat", "wolf", "bear", "shark", "snake", "bird", "lion", "tiger"],
"disaster": ["explosion", "crash", "fire", "flood", "earthquake", "storm", "accident"],
"food": ["eating", "cooking", "recipe", "taste", "delicious", "meal", "food"],
"baby": ["baby", "newborn", "puppy", "kitten", "child", "infant"],
"shock": ["unbelievable", "insane", "crazy", "shocking", "impossible", "wtf", "omg"],
"rage": ["wrong", "mistake", "error", "broken", "fail", "typo"],
"narrative": ["story", "secret", "truth", "revealed", "you won't believe", "finally"],
}
def score_transcript_segment(text: str) -> float:
    """Rate a transcript snippet's viral potential on a 0.0–1.0 scale.

    Points are added for each signal present:
      +0.30  contains a hook phrase ("wait", "secret", "here's why", ...)
      +0.20  contains a question mark
      +0.20  more than 20 words
      +0.15  contains an emotional word ("crazy", "love", "wow", ...)
      +0.15  starts with a strong opener ("so", "but", "wait", ...)

    Matching is case-insensitive and substring-based. Blank input scores 0.0.
    """
    if not text or not text.strip():
        return 0.0

    hook_phrases = ("wait", "secret", "no one tells you", "here's why", "never",
                    "always", "mistake", "truth", "actually")
    emotion_words = ("crazy", "insane", "shocked", "love", "hate", "afraid",
                     "angry", "excited", "wow", "unbelievable")
    strong_openers = ("so", "but", "wait", "now", "here", "this is")

    lowered = text.lower().strip()
    total = 0.0
    # 1. Hook phrases
    if any(phrase in lowered for phrase in hook_phrases):
        total += 0.3
    # 2. Question mark
    if "?" in text:
        total += 0.2
    # 3. Information density
    if len(text.split()) > 20:
        total += 0.2
    # 4. Emotional words
    if any(word in lowered for word in emotion_words):
        total += 0.15
    # 5. Strong openers
    if lowered.startswith(strong_openers):
        total += 0.15
    return min(1.0, round(total, 2))
def score_pacing(energies: list, start: float, end: float) -> int:
    """Score how often the audio energy spikes inside ``[start, end]``.

    The energy samples inside the range are split into roughly 4-second
    windows; a window counts as a peak when its maximum exceeds the range's
    mean energy by more than 10%.

    Args:
        energies: ``{"start_time", "energy"}`` dicts or ``(time, value)`` pairs.
        start / end: Range bounds in seconds (inclusive).

    Returns:
        A 0–100 score (100 = a peak in every window), or the neutral value 50
        when there is no energy data for the range.
    """
    if not energies:
        return 50  # neutral default

    in_range = []
    for sample in energies:
        # Both the dict format and the legacy tuple format are accepted.
        if isinstance(sample, dict):
            moment, level = sample.get("start_time", 0), sample.get("energy", 0)
        else:
            moment, level = sample
        if start <= moment <= end:
            in_range.append(level)
    if not in_range:
        return 50

    n_windows = max(1, int((end - start) / 4))
    stride = max(1, len(in_range) // n_windows)
    mean_energy = sum(in_range) / len(in_range)
    peak_count = sum(
        1
        for offset in range(0, len(in_range), stride)
        if max(in_range[offset:offset + stride]) > mean_energy * 1.1
    )
    # Normalise: perfect pacing = 1 peak per window
    return min(100, int((peak_count / n_windows) * 100))
def segment_structure(start: float, end: float) -> dict:
    """Divide a clip into Hook / Body / Reward time ranges.

    The hook is the first 10% of the clip, the body the following 75% and
    the reward the final 15%. All timestamps are rounded to two decimals.

    Returns:
        {
            "hook":   {"start": float, "end": float},
            "body":   {"start": float, "end": float},
            "reward": {"start": float, "end": float},
        }
    """
    length = end - start
    hook_stop = round(start + length * 0.10, 2)
    body_stop = round(start + length * 0.85, 2)
    hook = {"start": round(start, 2), "end": hook_stop}
    body = {"start": hook_stop, "end": body_stop}
    reward = {"start": body_stop, "end": round(end, 2)}
    return {"hook": hook, "body": body, "reward": reward}
def build_vf_pad(width: int, height: int) -> str:
    """Return an FFmpeg -vf string that letterboxes into 9:16 (1080×1920).

    The video is scaled down to fit inside 1080×1920 with its aspect ratio
    preserved, then padded symmetrically with black to exactly 1080×1920.
    The ``width`` and ``height`` arguments are accepted but not used: the
    filter expressions work for any source size.
    """
    filters = (
        "scale=1080:1920:force_original_aspect_ratio=decrease:flags=lanczos",
        "pad=1080:1920:(ow-iw)/2:(oh-ih)/2:black",
        "setsar=1",
    )
    return ",".join(filters)
# ── Step 6: Cut clip ──────────────────────────────────────────────────────────
def ffmpeg_escape_text(text):
    """Backslash-escape characters before embedding text in a drawtext filter.

    Each of ``\\ ' : ; [ ] , =`` is prefixed with a single backslash; every
    other character passes through unchanged.

    NOTE(review): ``%`` is not escaped here, although drawtext's text
    expansion appears to treat it specially - confirm against FFmpeg docs.
    """
    special_chars = "\\':;[],="
    # One pass over the input: backslashes added for one character are never
    # escaped a second time.
    escape_table = {ord(ch): "\\" + ch for ch in special_chars}
    return text.translate(escape_table)
# ── Caption Style Definitions ─────────────────────────────────────────────────
# Caption presets keyed by style name. Each value is one comma-separated
# "Key=Value" string; the field names look like ASS subtitle style fields
# (presumably consumed as a style override - confirm against the caller).
# NOTE(review): colour remarks below assume the ASS &HAABBGGRR byte order.
CAPTION_STYLES = {
    # MrBeast: large bold white text with a thick black outline, bottom-centred
    "mrbeast": ",".join((
        "FontName=Arial Black", "FontSize=22", "Bold=1",
        "PrimaryColour=&H00FFFFFF", "OutlineColour=&H00000000",
        "BackColour=&H00000000", "BorderStyle=1", "Outline=4", "Shadow=2",
        "Alignment=2", "MarginV=80", "MarginL=20", "MarginR=20",
        "Spacing=0",
    )),
    # Podcast: clean white text on a semi-transparent dark box, bottom-centred
    "podcast": ",".join((
        "FontName=Arial", "FontSize=18", "Bold=1",
        "PrimaryColour=&H00FFFFFF", "OutlineColour=&H00000000",
        "BackColour=&HAA000000", "BorderStyle=4", "Outline=0", "Shadow=0",
        "Alignment=2", "MarginV=90", "MarginL=30", "MarginR=30",
        "Spacing=1",
    )),
    # Neon: bright primary colour with a contrasting outline and deep shadow
    "neon": ",".join((
        "FontName=Arial Black", "FontSize=20", "Bold=1",
        "PrimaryColour=&H0000FFFF", "OutlineColour=&H00FF00FF",
        "BackColour=&H00000000", "BorderStyle=1", "Outline=3", "Shadow=4",
        "Alignment=2", "MarginV=80", "MarginL=20", "MarginR=20",
        "Spacing=0",
    )),
    # Horror: red text, thick black outline, heaviest shadow of the presets
    "horror": ",".join((
        "FontName=Arial Black", "FontSize=20", "Bold=1",
        "PrimaryColour=&H002020EE", "OutlineColour=&H00000000",
        "BackColour=&H00000000", "BorderStyle=1", "Outline=4", "Shadow=6",
        "Alignment=2", "MarginV=80", "MarginL=20", "MarginR=20",
        "Spacing=1",
    )),
    # Minimal: small non-bold white text, thin outline, no box
    "minimal": ",".join((
        "FontName=Arial", "FontSize=16", "Bold=0",
        "PrimaryColour=&H00FFFFFF", "OutlineColour=&H80000000",
        "BackColour=&H00000000", "BorderStyle=1", "Outline=1", "Shadow=1",
        "Alignment=2", "MarginV=100", "MarginL=30", "MarginR=30",
        "Spacing=1",
    )),
}
def _clip_relative_segments(pre_segments: list, start: float, end: float,
                            dur: float) -> list:
    """Pick the transcript segments overlapping [start, end] and re-time them to the clip."""
    clipped = []
    for seg in pre_segments:
        seg_start = seg.get("start", 0)
        seg_end = seg.get("end", 0)
        if not (seg_start < end and seg_end > start):
            continue
        # Clamp segments that begin before or finish after the clip window.
        rel_start = max(0, round(seg_start - start, 2))
        rel_end = min(dur, round(seg_end - start, 2))
        if rel_end > rel_start:
            clipped.append({
                "start": rel_start,
                "end": rel_end,
                "text": seg.get("text", ""),
            })
    return clipped


def cut_clip(video_path: Path, start: float, end: float,
             idx: int, width: int, height: int, mode: str = "fill",
             captions: bool = False, headline: str = "", cta: str = "",
             reframe: bool = False, progress_bar: bool = False, vibe: str = "none",
             caption_style: str = "mrbeast",
             safe_zone: bool = False, color_mode: str = "off",
             watermark_text: str = "",
             pre_segments: list = None) -> tuple:
    """Cut one clip out of ``video_path`` with FFmpeg and write it to CLIPS_DIR.

    Args:
        video_path: Source video file.
        start: Clip start time in the source video (seconds).
        end: Clip end time in the source video (seconds).
        idx: Zero-based clip index; used for the file name and log messages.
        width: Source width, forwarded to the filter builders.
        height: Source height, forwarded to the filter builders.
        mode: ``"pad"`` uses ``build_vf_pad``; any other value uses ``build_vf``.
        captions: Burn subtitles into the clip.
        headline: Optional text drawn near the top of the frame.
        cta: Optional text drawn near the bottom of the frame.
        reframe: With ``mode == "fill"``, take the horizontal centre from
            ``analyze_face_center`` instead of 0.5.
        progress_bar: Draw a bar along the bottom edge that grows with time.
        vibe: Name of a background track ``<vibe>.mp3`` in VIBES_DIR, or
            ``"none"``. Only bare file names are accepted.
        caption_style: Style name passed to ``generate_ass``.
        safe_zone: Forwarded to ``build_vf``.
        color_mode: Forwarded to ``build_vf``.
        watermark_text: Optional watermark, truncated to 30 characters.
        pre_segments: Transcript segments (dicts with ``start`` / ``end`` /
            ``text``) timed against the full video. When ``None`` and
            ``captions`` is set, the clip audio is transcribed here instead.

    Returns:
        ``(output_path, warnings_list, transcript_text)``.

    Raises:
        ValueError: If the rounded clip duration is not positive.
        RuntimeError: If FFmpeg exits with a non-zero status.
    """
    warnings = []
    transcript_text = ""
    name = f"short_{idx + 1}_{uuid.uuid4().hex[:6]}.mp4"
    out = CLIPS_DIR / name
    dur = round(end - start, 2)
    if dur <= 0:
        # A non-positive duration would reach FFmpeg as "-t 0" and put a
        # division by zero into the progress-bar expression.
        raise ValueError(
            f"Clip {idx + 1} has a non-positive duration ({start} -> {end})"
        )

    center_x = 0.5
    if reframe and mode == "fill":
        center_x = analyze_face_center(video_path, start, dur)

    vf_base = (
        build_vf_pad(width, height)
        if mode == "pad"
        else build_vf(width, height, center_x,
                      safe_zone=safe_zone, color_mode=color_mode)
    )

    # Base filters
    filters = [vf_base]

    # Add Headline
    if headline:
        clean_headline = ffmpeg_escape_text(headline)
        filters.append(
            f"drawtext=text='{clean_headline}':fontcolor=white:fontsize=80:font='Arial':"
            f"x=(w-text_w)/2:y=150:box=1:boxcolor=black@0.6:boxborderw=20"
        )

    # Add CTA
    if cta:
        clean_cta = ffmpeg_escape_text(cta)
        filters.append(
            f"drawtext=text='{clean_cta}':fontcolor=white:fontsize=70:font='Arial':"
            f"x=(w-text_w)/2:y=h-250:box=1:boxcolor=black@0.6:boxborderw=20"
        )

    # Add Watermark (bottom-right corner, semi-transparent)
    if watermark_text:
        watermark_text = watermark_text[:30]  # Server-side length limit
        clean_wm = ffmpeg_escape_text(watermark_text)
        filters.append(
            f"drawtext=text='{clean_wm}':fontcolor=white@0.55:fontsize=38:font='Arial':"
            f"x=w-text_w-30:y=h-80"
        )

    # Add Progress Bar: 12 px strip at the bottom, width grows linearly with t
    if progress_bar:
        filters.append(
            f"drawbox=x=0:y=ih-12:w='min(iw,iw*t/{dur})':h=12:color=0xFCD34D@0.9:t=fill"
        )

    # Add Subtitles (Captions). Failures here are best-effort: the clip is
    # still rendered without captions and the caller receives a warning.
    ass_path = None
    temp_audio = None
    if captions:
        try:
            if pre_segments is not None:
                # Intelligence work-flow: reuse the full-video transcript.
                print(f"[>>] Captions: Using pre-supplied segments for clip {idx + 1}", flush=True)
                clip_segments = _clip_relative_segments(pre_segments, start, end, dur)
                transcript_text = " ".join(s.get("text", "") for s in clip_segments)
                ass_content = generate_ass(clip_segments, style_name=caption_style)
                ass_path = CLIPS_DIR / f"{name}.ass"
                ass_path.write_text(ass_content, encoding="utf-8")
                filters.append(f"subtitles={ass_path.name}")
                print(f"[>>] Captions: ASS written from pre-segments -> {ass_path}", flush=True)
            else:
                # Sequential work-flow: transcribe just this clip's audio.
                print(f"[>>] Captions: starting for clip {idx + 1}", flush=True)
                model = get_whisper_model()
                temp_audio = CLIPS_DIR / f"{name}_audio.wav"
                subprocess.run([
                    "ffmpeg", "-i", str(video_path), "-ss", str(start), "-t", str(dur),
                    "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-y", str(temp_audio)
                ], capture_output=True)
                # 8000 bytes is about 0.25 s of 16 kHz mono 16-bit PCM.
                if not temp_audio.exists() or temp_audio.stat().st_size < 8000:
                    print("[!] Audio too short or silent – skipping captions for this clip", flush=True)
                    warnings.append("Audio too short – captions skipped for this clip")
                else:
                    result = model.transcribe(str(temp_audio), task="transcribe", fp16=False)
                    transcript_text = " ".join(seg.get("text", "") for seg in result.get("segments", []))
                    ass_content = generate_ass(result['segments'], style_name=caption_style)
                    ass_path = CLIPS_DIR / f"{name}.ass"
                    ass_path.write_text(ass_content, encoding="utf-8")
                    filters.append(f"subtitles={ass_path.name}")
                    print(f"[>>] Captions: ASS written from transcription -> {ass_path}", flush=True)
        except Exception as e:
            print(f"[!] Subtitle generation failed: {e}", flush=True)
            import traceback
            traceback.print_exc()
            # Surface the failure to the caller instead of dropping it silently.
            warnings.append("Caption generation failed – captions skipped for this clip")
        finally:
            if temp_audio is not None and temp_audio.exists():
                try:
                    temp_audio.unlink()
                except OSError:
                    pass

    final_vf = ",".join(filters)

    # Prepare FFmpeg command ("-ss" before "-i" seeks on the input)
    cmd = [
        "ffmpeg", "-loglevel", "error", "-ss", str(start), "-i", str(video_path)
    ]

    # Handle Background Music (Vibe). `vibe` can come from a request, so only
    # a bare file name is accepted: anything with directory parts would let
    # the lookup escape VIBES_DIR.
    vibe_name = str(vibe)
    vibe_file = VIBES_DIR / f"{vibe_name}.mp3"
    use_vibe = (
        vibe_name != "none"
        and Path(vibe_name).name == vibe_name
        and vibe_file.exists()
    )
    if use_vibe:
        cmd.extend(["-stream_loop", "-1", "-i", str(vibe_file)])
        # Mix the original audio at full volume with the looped track at 30%;
        # duration=first ends the mix with the original audio.
        filter_complex = (
            "[0:a]volume=1.0[main_a];"
            "[1:a]volume=0.3[vibe_a];"
            "[main_a][vibe_a]amix=inputs=2:duration=first:dropout_transition=2[aout]"
        )
        cmd.extend(["-filter_complex", filter_complex, "-map", "0:v", "-map", "[aout]",
                    "-c:a", "aac", "-b:a", "128k"])
    else:
        cmd.extend(["-c:a", "aac", "-b:a", "128k"])

    cmd.extend([
        "-t", str(dur),
        "-vf", final_vf,
        "-c:v", "libx264", "-preset", "ultrafast", "-crf", "26",
        "-movflags", "+faststart",
        "-y", str(out),
    ])
    print(f"[>>] FFmpeg cmd: {' '.join(cmd)}", flush=True)

    try:
        # cwd=CLIPS_DIR is required so the relative 'subtitles=filename.ass' path resolves correctly
        r = subprocess.run(cmd, capture_output=True, cwd=str(CLIPS_DIR))
    finally:
        # Remove the temporary ASS file even when FFmpeg could not be started.
        if ass_path and ass_path.exists():
            try:
                ass_path.unlink()
            except OSError:
                pass

    stderr_text = r.stderr.decode(errors='replace').strip() if r.stderr else ""
    if stderr_text:
        print(f"[>>] FFmpeg stderr: {stderr_text[-500:]}", flush=True)

    if r.returncode != 0:
        raise RuntimeError(
            f"FFmpeg failed for clip {idx + 1}: "
            f"{r.stderr.decode(errors='replace')[-600:]}"
        )
    return out, warnings, transcript_text
# ── Routes ────────────────────────────────────────────────────────────────────
def _base_href():
    """Return the directory URL to use in the page's <base> tag.

    This is ``"/"`` when the request has no script root; otherwise the
    whitespace-stripped script root with exactly one trailing slash, so that
    style.css / app.js resolve behind a path prefix.
    """
    prefix = (request.script_root or "").strip()
    return prefix.rstrip("/") + "/" if prefix else "/"
@app.route("/api/process", methods=["POST"])
def process():
    """Download a YouTube video, select its top segments and stream cut clips.

    Expects a JSON object body. ``youtubeUrl`` is required; a missing or
    non-string value yields a 400 JSON error. The other options (``clips``,
    ``mode``, ``durationRange``, ``retentionMode``, ``captions``,
    ``headline``, ``cta``, ``reframe``, ``progressBar``, ``vibe``,
    ``captionStyle``, ``colorMode``, ``watermarkText``, ``safeZone``) fall
    back to their defaults when absent, null or invalid.

    The response is an ``application/x-ndjson`` stream with one JSON object
    per line:
      * ``{"total": n}`` once the segments have been selected,
      * ``{"type": "warning", "msg": ...}`` for per-clip warnings; a failed
        clip additionally carries the legacy ``"warning"`` key,
      * one object per finished clip (``clip``, ``index``, ``pacing``,
        ``structure``, ``triggers``, ``viral_analysis``),
      * ``{"error": ...}`` when processing aborts.
    """
    payload = request.get_json(force=True, silent=True)
    # A JSON array or scalar has no .get(); treat it like a missing body.
    data = payload if isinstance(payload, dict) else {}

    def _text(key, default=""):
        """Read an option as a stripped string; missing or null gives the default."""
        value = data.get(key)
        return default if value is None else str(value).strip()

    raw_url = data.get("youtubeUrl")
    youtube_url = raw_url.strip() if isinstance(raw_url, str) else ""
    url_prefix = (request.script_root or "").rstrip("/")
    if not youtube_url:
        return jsonify({"error": "youtubeUrl is required"}), 400

    # Invalid clip counts fall back to the maximum, mirroring how an unknown
    # mode falls back to "fill"; the result is clamped to 1..MAX_CLIPS.
    try:
        n_clips = int(data.get("clips", MAX_CLIPS))
    except (TypeError, ValueError, OverflowError):
        n_clips = MAX_CLIPS
    n_clips = max(1, min(n_clips, MAX_CLIPS))

    mode = _text("mode", "fill").lower()
    if mode not in ("fill", "pad"):
        mode = "fill"

    def generate():
        video_path = None
        try:
            # 1. Download
            video_path = download_video(youtube_url)

            # 2. Info
            duration, width, height = get_video_info(video_path)
            if duration < 20:
                yield json.dumps({"error": "Video too short (minimum 20 s)."}) + "\n"
                return
            if width == 0 or height == 0:
                yield json.dumps({"error": "Could not read video dimensions."}) + "\n"
                return

            # ── Full Intelligence Work-flow: Transcribe First ───────────────
            print("[>>] Intelligence Scan: Extracting full audio...", flush=True)
            model = get_whisper_model()
            full_audio = video_path.with_suffix(".full.wav")
            try:
                subprocess.run([
                    "ffmpeg", "-i", str(video_path), "-vn", "-acodec", "pcm_s16le",
                    "-ar", "16000", "-ac", "1", "-y", str(full_audio)
                ], capture_output=True)
                if not full_audio.exists():
                    raise RuntimeError("Could not extract the audio track for transcription.")
                print("[>>] Intelligence Scan: Transcribing full video (this may take a minute)...", flush=True)
                whisper_result = model.transcribe(str(full_audio), task="transcribe", fp16=False)
            finally:
                # Drop the temporary WAV even when extraction or transcription fails.
                if full_audio.exists():
                    try:
                        full_audio.unlink()
                    except OSError:
                        pass
            full_segments = whisper_result.get("segments", [])

            # ── Parallel Viral Intelligence: Blended Signal Scan ──────────
            # (Heatmap + Audio Energy + Psychology Scores)
            print("[>>] Intelligence Scan: Blending signals in parallel...", flush=True)
            parallel_res = analyze_video_parallel(video_path, youtube_url, full_segments)
            heatmap_data = parallel_res["heatmap"]
            energies = parallel_res["energy"]
            t_scores = parallel_res["transcript_scores"]

            # ── Viral Selection Engine: Rank & Deduplicate ───────────────
            print("[>>] Intelligence Scan: Selecting viral winners...", flush=True)

            # Parse Duration Constraints (0, 0 is passed through for "auto")
            duration_range = data.get("durationRange", "auto")
            min_dur, max_dur = 0, 0
            if duration_range == "15-30":
                min_dur, max_dur = 15, 30
            elif duration_range == "30-60":
                min_dur, max_dur = 30, 60
            elif duration_range == "60-90":
                min_dur, max_dur = 60, 90
            elif duration_range == "auto" and data.get("retentionMode", False):
                # Legacy retention logic
                min_dur, max_dur = 20, 59

            top_segments = select_top_clips(
                full_segments, heatmap_data, energies, t_scores,
                num_clips=n_clips, min_dur=min_dur, max_dur=max_dur
            )
            if not top_segments:
                yield json.dumps({"error": "No viral segments identified."}) + "\n"
                return

            # Tell the frontend how many clips to expect
            yield json.dumps({"total": len(top_segments)}) + "\n"

            # 5. Extract additional settings (null / non-string values are
            # normalised instead of raising or leaking the text "None").
            use_captions = data.get("captions", False)
            headline = _text("headline")
            cta = _text("cta")
            reframe = data.get("reframe", False)
            progress_bar = data.get("progressBar", False)
            vibe = data.get("vibe", "none")
            if not isinstance(vibe, str):
                vibe = "none"
            caption_style = _text("captionStyle", "mrbeast").lower()
            if caption_style not in ASS_STYLES:
                caption_style = "mrbeast"

            # ── Retention Psychology flags ────────────────────────────────────
            color_mode = _text("colorMode", "off").lower()
            watermark_text = _text("watermarkText")
            safe_zone = data.get("safeZone", False)

            def _cut(args):
                i, seg_data = args
                s = seg_data["start_time"]
                e = seg_data["end_time"]
                return i, cut_clip(
                    video_path, s, e, i, width, height, mode,
                    captions=use_captions, headline=headline, cta=cta,
                    reframe=reframe, progress_bar=progress_bar, vibe=vibe,
                    caption_style=caption_style,
                    safe_zone=safe_zone, color_mode=color_mode,
                    watermark_text=watermark_text,
                    pre_segments=full_segments
                ), seg_data

            with ThreadPoolExecutor(max_workers=min(len(top_segments), 3)) as pool:
                futures = {pool.submit(_cut, (i, seg_data)): i
                           for i, seg_data in enumerate(top_segments)}
                # Clips are streamed in completion order, not rank order.
                for future in as_completed(futures):
                    try:
                        i, (clip_path, clip_warnings, _), seg_data = future.result()
                        s = seg_data["start_time"]
                        e = seg_data["end_time"]

                        # Yield any warnings from clip processing
                        for warning_msg in clip_warnings:
                            yield json.dumps({"type": "warning", "msg": warning_msg}) + "\n"

                        # ── Retention Psychology: per-clip analytics ──────────
                        pacing = score_pacing(energies, s, e)
                        structure = segment_structure(s, e)

                        # Assemble viral analysis block for the response
                        scores = {
                            "Heatmap": seg_data["heatmap_score"],
                            "Energy": seg_data["energy_score"],
                            "Transcript": seg_data["transcript_score"]
                        }
                        top_sig = max(scores, key=scores.get).lower().replace(" ", "_")
                        viral_analysis = {
                            "rank": i + 1,
                            "final_score": seg_data["final_score"],
                            "heatmap_score": seg_data["heatmap_score"],
                            "energy_score": seg_data["energy_score"],
                            "transcript_score": seg_data["transcript_score"],
                            "top_signal": top_sig
                        }

                        # Triggers for UI compatibility
                        triggers = {
                            "score": seg_data["final_score"],
                            "tags": [
                                f"Viral: {int(seg_data['final_score']*100)}%",
                                "Visual Peak" if seg_data["heatmap_score"] > 0.5 else "",
                                "Audio Punch" if seg_data["energy_score"] > 0.6 else ""
                            ]
                        }
                        triggers["tags"] = [t for t in triggers["tags"] if t]

                        yield json.dumps({
                            "clip": f"{url_prefix}/clips/{clip_path.name}",
                            "index": i,
                            "pacing": pacing,
                            "structure": structure,
                            "triggers": triggers,
                            "viral_analysis": viral_analysis
                        }) + "\n"
                    except Exception as clip_err:
                        print(f"[!] Clip failed: {clip_err}", flush=True)
                        failure_msg = f"Clip processing failed: {clip_err}"
                        # Same shape as the other warning lines; the legacy
                        # "warning" key is kept for existing consumers.
                        yield json.dumps({
                            "type": "warning",
                            "msg": failure_msg,
                            "warning": failure_msg,
                        }) + "\n"
        except Exception as exc:
            yield json.dumps({"error": str(exc)}) + "\n"
        finally:
            if video_path and video_path.exists():
                try:
                    video_path.unlink()
                except OSError:
                    pass

    return Response(
        stream_with_context(generate()),
        mimetype="application/x-ndjson",
        headers={"X-Accel-Buffering": "no"},  # prevent proxy buffering
    )
@app.route("/clips/<path:filename>")
def serve_clip(filename):
    """Serve the file named by the URL path from CLIPS_DIR."""
    clips_root = str(CLIPS_DIR)
    return send_from_directory(clips_root, filename)
@app.route("/health")
def health():
    """Report dependency status as JSON.

    ``status`` is ``"ok"`` when ``check_deps()`` reports nothing missing and
    ``"degraded"`` otherwise; ``missing_tools`` carries its result unchanged.
    """
    missing = check_deps()
    status = "degraded" if missing else "ok"
    return jsonify({"status": status, "missing_tools": missing})
@app.route("/health/dns")
def health_dns():
    """Diagnostic endpoint: check whether this host can resolve YouTube.

    Resolves three host names over IPv4, and reports the yt-dlp version and
    whether a cookies file was found. ``dns_status`` is ``"ok"`` only when
    every host resolved, otherwise ``"BLOCKED"``.
    """
    def _probe(host):
        # Any failure is reported in the payload rather than raised.
        try:
            infos = socket.getaddrinfo(host, 443, socket.AF_INET)
        except Exception as e:
            return {"ok": False, "error": str(e)}
        return {"ok": True, "ip": infos[0][4][0]}

    hosts = ("www.youtube.com", "www.google.com", "huggingface.co")
    results = {host: _probe(host) for host in hosts}

    # Also report yt-dlp version
    try:
        completed = subprocess.run(["yt-dlp", "--version"], capture_output=True, text=True)
    except Exception:
        yt_dlp_ver = "unknown"
    else:
        yt_dlp_ver = completed.stdout.strip()

    # Check cookies
    cookies_path = _find_cookies_file()

    dns_status = "ok" if all(entry["ok"] for entry in results.values()) else "BLOCKED"
    return jsonify({
        "dns_status": dns_status,
        "hosts": results,
        "yt_dlp_version": yt_dlp_ver,
        "cookies_found": cookies_path or False,
    })
@app.route("/")
def root():
    """Serve index.html, injecting a <base> tag when the page has none.

    The tag is inserted right after the first literal ``<head>`` so that
    relative asset URLs resolve against the (HTML-escaped) result of
    ``_base_href()``. A page that already contains ``<base `` is returned
    unchanged.
    """
    html = _INDEX_HTML_RAW
    if "<base " not in html:
        inject = f' <base href="{escape(_base_href())}">\n'
        html = html.replace("<head>", "<head>\n" + inject, 1)
    # content_type sets the header verbatim. With mimetype=, Werkzeug appends
    # its own "; charset=utf-8" to text/* types, which duplicated the charset.
    return Response(html, content_type="text/html; charset=utf-8")
@app.route("/style.css")
def serve_css():
    """Serve style.css from the directory holding this file (BASE_DIR)."""
    static_root = str(BASE_DIR)
    return send_from_directory(static_root, "style.css")
@app.route("/app.js")
def serve_js():
    """Serve app.js from the directory holding this file (BASE_DIR)."""
    static_root = str(BASE_DIR)
    return send_from_directory(static_root, "app.js")
# ── Entry ─────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Startup banner: print the URL and environment diagnostics, then start
    # the Flask server.
    print("\n[>>] AI SquadX VIP - Clipper Backend")
    print(f" http://localhost:{PORT}")

    # Show yt-dlp version for debugging (best-effort; never blocks startup)
    try:
        ver = subprocess.run(["yt-dlp", "--version"], capture_output=True, text=True)
        print(f" yt-dlp version: {ver.stdout.strip()}")
    except Exception:
        print(" yt-dlp version: unknown")

    # Show cookies status
    cookies = _find_cookies_file()
    if cookies:
        print(f" Cookies: {cookies}")
    else:
        print(" Cookies: NOT FOUND (YouTube may block downloads)")
        # ASCII arrow: the original literal was a mis-encoded character.
        print(" -> Place a cookies.txt next to server.py to fix this.")
    print()

    missing = check_deps()
    if missing:
        print(f"[!] Missing: {', '.join(missing)}")
        print(" Install them or clips won't generate.\n")
    else:
        print("[OK] ffmpeg, ffprobe, yt-dlp found\n")

    # Listen on all interfaces; requests are handled one at a time.
    app.run(host="0.0.0.0", port=PORT, debug=False, threaded=False)