import os import re import uuid import time import asyncio from typing import Dict, List, Optional from fastapi import FastAPI, BackgroundTasks, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from pydantic import BaseModel import httpx # ------------------------------------------ # CONFIGURATION # ------------------------------------------ def _fetch_cloud_name(): import urllib.request as _ur, json as _j, ssl as _ssl ctx = _ssl.create_default_context() req = _ur.Request("https://media.toolxp.org/config", headers={"User-Agent": "Mozilla/5.0"}) for _i in range(3): try: with _ur.urlopen(req, timeout=10, context=ctx) as r: name = _j.loads(r.read().decode())["cloud_name"] if name: print(f"[config] cloud_name={name}") return name except Exception as _e: print(f"[config] attempt {_i+1} failed: {_e}") raise RuntimeError("[config] FATAL: could not fetch cloud_name after 3 attempts") CLOUD_NAME = _fetch_cloud_name() # Media proxy hides Cloudinary origin from end-users. # Route: media.toolxp.org → res.cloudinary.com/doxoms9hd (via Cloudflare Worker) CLOUDINARY_BASE = f"https://media.toolxp.org/video/upload" # ------------------------------------------ # IN-MEMORY JOB STORE # ------------------------------------------ JOBS: Dict[str, dict] = {} # ------------------------------------------ # APP SETUP # ------------------------------------------ app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class VideoRequest(BaseModel): video_url: str # ------------------------------------------ # URL PARSING HELPERS # ------------------------------------------ def parse_cloudinary_url(url: str) -> dict: """ Parse Cloudinary URL to extract video_id, start_time, and duration. Expected format: https://res.cloudinary.com/.../so_55,du_30/.../video_id.mp4 or: https://res.cloudinary.com/.../so_55,du_30/fl_getinfo/video_id.jpg """ # Extract video ID (last segment before extension) video_id_match = re.search(r'/([^/]+)\.(mp4|jpg|webm|mov)$', url) video_id = video_id_match.group(1) if video_id_match else None # Extract start offset (so_X) start_match = re.search(r'so_(\d+(?:\.\d+)?)', url) start_time = float(start_match.group(1)) if start_match else 0 # Extract duration (du_X) duration_match = re.search(r'du_(\d+(?:\.\d+)?)', url) duration = float(duration_match.group(1)) if duration_match else 30 return { "video_id": video_id, "start_time": start_time, "duration": duration, "end_time": start_time + duration } def get_face_info_url(video_id: str, time_sec: float) -> str: """ Build URL to fetch face data for a specific frame. Returns JSON with landmarks when fetched. """ return f"{CLOUDINARY_BASE}/so_{time_sec},f_jpg/c_thumb,g_face,w_450/fl_getinfo/{video_id}.jpg" async def fetch_face_data(client: httpx.AsyncClient, video_id: str, time_sec: float) -> dict: """ Fetch face detection data for a specific timestamp. Returns the number of faces, their positions, and source video dimensions. """ url = get_face_info_url(video_id, time_sec) try: response = await client.get(url, timeout=10.0) if response.status_code == 200: data = response.json() landmarks = data.get("landmarks", [[]]) input_info = data.get("input", {}) # landmarks[0] is array of face objects face_count = len(landmarks[0]) if landmarks and landmarks[0] else 0 return { "time": time_sec, "face_count": face_count, "landmarks": landmarks[0] if landmarks else [], "source_w": input_info.get("width", 1920), "source_h": input_info.get("height", 1080) } except Exception as e: print(f"Error fetching face data at {time_sec}s: {e}") return {"time": time_sec, "face_count": 0, "landmarks": [], "source_w": 1920, "source_h": 1080} def find_multi_face_segments(frame_data: List[dict]) -> List[dict]: """ Analyze frame data to find segments where 2+ REAL faces are detected. Ghost face filtering happens HERE (before segment detection), not downstream. This prevents hands/objects from ever triggering a false split-screen. Returns list of segments with start/end times and averaged face coordinates. """ # Extract source video dimensions from the first frame's API response source_w = frame_data[0].get("source_w", 1920) if frame_data else 1920 source_h = frame_data[0].get("source_h", 1080) if frame_data else 1080 segments = [] in_multi_face = False segment_start = None segment_faces = [] # collect FILTERED face landmarks for calculating averages for frame in frame_data: # STEP 1: Extract face centers from raw landmarks raw_faces = [] for face in frame.get("landmarks", []): center = _extract_face_center(face) if center: raw_faces.append(center) # STEP 2: Filter out ghost/fake faces BEFORE deciding face count real_faces = _filter_ghost_faces(raw_faces) real_face_count = len(real_faces) if real_face_count >= 2: if not in_multi_face: # Start new segment in_multi_face = True segment_start = frame["time"] segment_faces = [] segment_faces.append(frame["landmarks"]) else: if in_multi_face: # End segment and calculate averages in_multi_face = False left_avg, right_avg = compute_face_crops(segment_faces, source_w, source_h) segments.append({ "start": segment_start, "end": frame["time"], "top_face": left_avg, "bottom_face": right_avg }) # Close any open segment if in_multi_face and segment_start is not None: left_avg, right_avg = compute_face_crops(segment_faces, source_w, source_h) segments.append({ "start": segment_start, "end": frame_data[-1]["time"] if frame_data else segment_start, "top_face": left_avg, "bottom_face": right_avg }) return segments def _extract_face_center(face: dict) -> dict: """ Extract the geometric center (cx, cy_eyes) of a face from Cloudinary landmarks. Also computes 'span' — the diagonal of the landmark bounding box — used to detect and reject ghost/fake face detections. cy_eyes = eye-level Y, which is the most reliable vertical anchor. Works for both frontal and profile views. """ pts = [v for v in face.values() if isinstance(v, dict) and 'x' in v and 'y' in v] if not pts: return None xs = [p['x'] for p in pts] ys = [p['y'] for p in pts] cx = sum(xs) / len(xs) # Use the topmost Y coordinate as the eye-level reference # (eyes are always the highest landmarks returned) cy_eyes = min(ys) # Landmark bounding box diagonal — measures "face size on screen" # Real faces: 80-300px diagonal. Ghost faces (hands, objects): 10-40px. span_x = max(xs) - min(xs) span_y = max(ys) - min(ys) span = (span_x ** 2 + span_y ** 2) ** 0.5 return {'cx': cx, 'cy_eyes': cy_eyes, 'span': span} def _filter_ghost_faces(processed_faces: list) -> list: """ Filter out ghost/fake face detections from a single frame. Ghost faces are typically: - Hands, fingers, or objects misidentified as faces - Very small landmark span compared to real faces in the same frame - Landmark span < 40% of the largest face → rejected - Absolute minimum span of 30px (any face smaller than this is too tiny to be real) """ if len(processed_faces) < 2: return processed_faces # Find the largest face in this frame max_span = max(f['span'] for f in processed_faces) # Reject faces whose span is less than 40% of the largest face # Also reject faces with absolute span < 30px (too small to be a real face) MIN_RELATIVE_SPAN = 0.40 MIN_ABSOLUTE_SPAN = 30.0 filtered = [ f for f in processed_faces if f['span'] >= max_span * MIN_RELATIVE_SPAN and f['span'] >= MIN_ABSOLUTE_SPAN ] return filtered if filtered else processed_faces[:1] # Always keep at least the biggest face def compute_face_crops(segment_faces_data: List[List[dict]], source_w: int, source_h: int) -> tuple[dict, dict]: """ ╔═══════════════════════════════════════════════════════════════╗ ║ PROPORTIONAL FACE CROP ALGORITHM ║ ║ ║ ║ Core principle: Crop SIZE comes from the source video ║ ║ dimensions (always proportional). Landmarks are used ║ ║ ONLY for positioning (centering on the face). ║ ║ ║ ║ This ensures consistent framing regardless of whether ║ ║ the subject is close-up or far from the camera. ║ ╚═══════════════════════════════════════════════════════════════╝ Algorithm steps: 1. Collect face center points from all frames in the segment 2. Filter ghost/fake faces (hands, objects) using landmark span comparison 3. Sort left vs right speaker by horizontal position 4. Average each speaker's position across all frames (temporal smoothing) 5. Calculate crop width = 50% of source video width (standard interview framing) 6. Apply anti-overlap: if faces are close, reduce crop width so boxes don't overlap 7. Force 9:8 aspect ratio (matches 1080x960 layer) so c_fill = pure scale 8. Position: face centered horizontally, eye-level at 35% from top (rule of thirds) 9. Clamp to source video bounds """ TARGET_ASPECT = 1080 / 960 # 9:8 = 1.125 BASE_CROP_RATIO = 0.50 # Each speaker gets 50% of source width as base crop FACE_VERTICAL_POS = 0.35 # Eyes sit at 35% from top of frame (rule of thirds) # --- STEP 1-3: Collect, filter, and average face centers --- left_centers = [] right_centers = [] for frame_faces in segment_faces_data: processed = [] for face in frame_faces: center = _extract_face_center(face) if center: processed.append(center) # Filter out ghost/fake faces (hands, objects, etc.) processed = _filter_ghost_faces(processed) # Sort left-to-right by horizontal position sorted_faces = sorted(processed, key=lambda f: f['cx']) if len(sorted_faces) >= 2: left_centers.append(sorted_faces[0]) right_centers.append(sorted_faces[-1]) # Average positions across all frames (temporal smoothing) def avg_center(centers, fallback_x, fallback_y): if not centers: return fallback_x, fallback_y cx = sum(c['cx'] for c in centers) / len(centers) cy = sum(c['cy_eyes'] for c in centers) / len(centers) return cx, cy left_cx, left_cy = avg_center(left_centers, source_w * 0.25, source_h * 0.40) right_cx, right_cy = avg_center(right_centers, source_w * 0.75, source_h * 0.40) # --- STEP 4: Base crop size from source dimensions --- crop_w = int(source_w * BASE_CROP_RATIO) # --- STEP 5: Anti-overlap --- # If the two faces are close together, reduce crop width so boxes don't overlap face_gap = abs(right_cx - left_cx) max_allowed_w = int(face_gap * 0.92) # Leave 8% gap between the two crops if crop_w > max_allowed_w and max_allowed_w > 200: crop_w = max_allowed_w # --- STEP 6: Force 9:8 aspect ratio --- crop_h = int(crop_w / TARGET_ASPECT) # Ensure crop height fits within source video if crop_h > source_h: crop_h = source_h crop_w = int(crop_h * TARGET_ASPECT) # --- STEP 7-8: Position each crop --- def position_crop(face_cx, face_cy_eyes): # Center horizontally on the face x = int(face_cx - crop_w / 2) # Vertically: place eye-level at 35% from top of crop (rule of thirds) # This naturally gives correct headroom above and shows shoulders below y = int(face_cy_eyes - crop_h * FACE_VERTICAL_POS) # Clamp to source video bounds x = max(0, min(x, source_w - crop_w)) y = max(0, min(y, source_h - crop_h)) return {"x": x, "y": y, "w": crop_w, "h": crop_h} left_crop = position_crop(left_cx, left_cy) right_crop = position_crop(right_cx, right_cy) return left_crop, right_crop def build_final_url(video_id: str, start_time: float, end_time: float, multi_face_segments: List[dict]) -> str: """ Build the final Cloudinary URL with layers for multi-face segments. Base: Full 9:16 video with g_auto:face Layers: Split-screen overlays during multi-face segments using exact face coordinates """ duration = end_time - start_time # Base transformation: 9:16 vertical with face tracking fallback base = f"so_{start_time},eo_{end_time}/w_1080,h_1920,c_fill,g_auto:face" # Build layers for each multi-face segment layers = [] for segment in multi_face_segments: seg_start = round(segment["start"], 2) seg_end = round(segment["end"], 2) seg_duration = round(seg_end - seg_start, 2) # Skip segments shorter than 1 second if seg_duration < 1: continue # Calculate offsets in OUTPUT video timeline layer_start_offset = round(seg_start - start_time, 2) layer_end_offset = round(seg_end - start_time, 2) # Use our pre-calculated bounding boxes t_face = segment.get("top_face", {"x": 0, "y": 0, "w": 300, "h": 300}) b_face = segment.get("bottom_face", {"x": 0, "y": 0, "w": 300, "h": 300}) # Top layer - left speaker # 1. c_crop extracts just their face box # 2. c_fill scales that tight box strictly up/down to 1080x960 top_layer = ( f"l_video:{video_id}," f"so_{seg_start},eo_{seg_end},du_{seg_duration},ac_none/" f"c_crop,w_{t_face['w']},h_{t_face['h']},x_{t_face['x']},y_{t_face['y']}/" f"c_fill,w_1080,h_960/" f"fl_layer_apply,g_north,so_{layer_start_offset},eo_{layer_end_offset}" ) # Bottom layer - right speaker bottom_layer = ( f"l_video:{video_id}," f"so_{seg_start},eo_{seg_end},du_{seg_duration},ac_none/" f"c_crop,w_{b_face['w']},h_{b_face['h']},x_{b_face['x']},y_{b_face['y']}/" f"c_fill,w_1080,h_960/" f"fl_layer_apply,g_south,so_{layer_start_offset},eo_{layer_end_offset}" ) layers.append(top_layer) layers.append(bottom_layer) # Combine all parts if layers: transformations = f"{base}/{'/'.join(layers)}" else: transformations = f"{base}" return f"{CLOUDINARY_BASE}/{transformations}/{video_id}.mp4" # ------------------------------------------ # BACKGROUND WORKER # ------------------------------------------ def process_video_sync(job_id: str, video_url: str): """ Synchronous wrapper for async processing. """ asyncio.run(process_video_async(job_id, video_url)) async def process_video_async(job_id: str, video_url: str): """ Main video processing logic: 1. Parse URL to get video_id and time range 2. Fetch face data for each frame (500ms intervals) 3. Find multi-face segments 4. Build final URL with layers """ print(f"[{job_id}] Starting job: {video_url}") JOBS[job_id]["status"] = "processing" JOBS[job_id]["progress"] = "Parsing video URL..." try: # 1. Parse URL parsed = parse_cloudinary_url(video_url) video_id = parsed["video_id"] start_time = parsed["start_time"] end_time = parsed["end_time"] duration = parsed["duration"] if not video_id: raise Exception("Could not extract video ID from URL") JOBS[job_id]["progress"] = f"Analyzing {duration}s of video..." print(f"[{job_id}] Video: {video_id}, Range: {start_time}s - {end_time}s") # 2. Fetch face data for each frame (500ms intervals) frame_times = [] t = start_time while t <= end_time: frame_times.append(round(t, 1)) t += 0.5 total_frames = len(frame_times) JOBS[job_id]["progress"] = f"Fetching face data for {total_frames} frames..." frame_data = [] async with httpx.AsyncClient() as client: # Process in batches of 10 to avoid overwhelming the API batch_size = 10 for i in range(0, len(frame_times), batch_size): batch = frame_times[i:i + batch_size] tasks = [fetch_face_data(client, video_id, t) for t in batch] results = await asyncio.gather(*tasks) frame_data.extend(results) progress_pct = min(100, int((i + batch_size) / total_frames * 100)) JOBS[job_id]["progress"] = f"Analyzing frames... {progress_pct}%" # 3. Find multi-face segments JOBS[job_id]["progress"] = "Detecting multi-face segments..." multi_face_segments = find_multi_face_segments(frame_data) print(f"[{job_id}] Found {len(multi_face_segments)} multi-face segments") # 4. Build final URL JOBS[job_id]["progress"] = "Building final video URL..." final_url = build_final_url(video_id, start_time, end_time, multi_face_segments) # 5. Complete JOBS[job_id]["status"] = "completed" JOBS[job_id]["progress"] = "Done" JOBS[job_id]["result"] = { "video_url": final_url, "video_id": video_id, "start_time": start_time, "end_time": end_time, "multi_face_segments": multi_face_segments, "total_frames_analyzed": total_frames } print(f"[{job_id}] Completed: {final_url}") except Exception as e: print(f"[{job_id}] FAILED: {str(e)}") JOBS[job_id]["status"] = "failed" JOBS[job_id]["error"] = str(e) JOBS[job_id]["progress"] = "Failed" # ------------------------------------------ # API ENDPOINTS # ------------------------------------------ @app.post("/jobs") def submit_job(req: VideoRequest, background_tasks: BackgroundTasks): job_id = str(uuid.uuid4()) JOBS[job_id] = { "status": "queued", "progress": "Waiting in queue...", "result": None, "error": None, "created_at": time.time() } background_tasks.add_task(process_video_sync, job_id, req.video_url) return {"job_id": job_id, "status": "queued"} @app.get("/jobs/{job_id}") def get_job_status(job_id: str): job = JOBS.get(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") return job @app.get("/") def home(): return {"message": "Magic Cut API is Running", "version": "1.0"} @app.get("/client", response_class=HTMLResponse) def serve_client(): """Serve the embedded HTML client.""" html_content = """
so_X,du_Y (start time, duration)