import os
import glob
import json
import subprocess

import cv2
import numpy as np
import mediapipe as mp

from scripts.one_face import crop_and_resize_single_face, resize_with_padding, detect_face_or_body, crop_center_zoom
from scripts.two_face import crop_and_resize_two_faces, detect_face_or_body_two_faces

try:
    from scripts.face_detection_insightface import init_insightface, detect_faces_insightface, crop_and_resize_insightface
    INSIGHTFACE_AVAILABLE = True
except ImportError:
    INSIGHTFACE_AVAILABLE = False
    print("InsightFace not found or error importing. Install with: pip install insightface onnxruntime-gpu")

# Global cache for the detected encoder
CACHED_ENCODER = None


def get_best_encoder():
    global CACHED_ENCODER
    if CACHED_ENCODER:
        return CACHED_ENCODER
    try:
        # Check available encoders
        result = subprocess.run(['ffmpeg', '-hide_banner', '-encoders'], capture_output=True, text=True)
        output = result.stdout
        # Priority: NVENC (NVIDIA) > AMF (AMD) > QSV (Intel) > VideoToolbox (macOS) > CPU
        if "h264_nvenc" in output:
            print("Encoder Detected: NVIDIA (h264_nvenc)")
            CACHED_ENCODER = ("h264_nvenc", "fast")  # p1-p7 presets could be used, but 'fast' maps well
            return CACHED_ENCODER
        if "h264_amf" in output:
            print("Encoder Detected: AMD (h264_amf)")
            CACHED_ENCODER = ("h264_amf", "speed")  # quality, speed, balanced
            return CACHED_ENCODER
        if "h264_qsv" in output:
            print("Encoder Detected: Intel QSV (h264_qsv)")
            CACHED_ENCODER = ("h264_qsv", "veryfast")
            return CACHED_ENCODER
        # macOS (VideoToolbox)
        if "h264_videotoolbox" in output:
            print("Encoder Detected: macOS (h264_videotoolbox)")
            CACHED_ENCODER = ("h264_videotoolbox", "default")
            return CACHED_ENCODER
    except Exception as e:
        print(f"Error checking encoders: {e}")
    print("Encoder Detected: CPU (libx264)")
    CACHED_ENCODER = ("libx264", "ultrafast")
    return CACHED_ENCODER


def get_center_bbox(bbox):
    # bbox: [x1, y1, x2, y2]
    return ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)


def get_center_rect(rect):
    # rect: (x, y, w, h)
    return (rect[0] + rect[2] / 2, rect[1] + rect[3] / 2)


def sort_by_proximity(new_faces, old_faces, center_func):
    """
    Sorts new_faces to match the order of old_faces based on distance.

    new_faces: list of face objects (bbox or tuple)
    old_faces: list of face objects (bbox or tuple)
    center_func: function that takes a face object and returns (cx, cy)
    """
    if not old_faces or len(old_faces) != 2 or len(new_faces) != 2:
        return new_faces

    old_c1 = center_func(old_faces[0])
    old_c2 = center_func(old_faces[1])
    new_c1 = center_func(new_faces[0])
    new_c2 = center_func(new_faces[1])

    # Cost if we keep the order [new1, new2]:
    # dist(old1, new1) + dist(old2, new2)
    dist_keep = ((old_c1[0]-new_c1[0])**2 + (old_c1[1]-new_c1[1])**2) + \
                ((old_c2[0]-new_c2[0])**2 + (old_c2[1]-new_c2[1])**2)
    # Cost if we swap to [new2, new1]:
    # dist(old1, new2) + dist(old2, new1)
    dist_swap = ((old_c1[0]-new_c2[0])**2 + (old_c1[1]-new_c2[1])**2) + \
                ((old_c2[0]-new_c1[0])**2 + (old_c2[1]-new_c1[1])**2)

    # If swapping reduces the total movement distance, do it
    if dist_swap < dist_keep:
        return [new_faces[1], new_faces[0]]
    return new_faces
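
# Illustrative sketch (not executed): when the detector returns the two faces
# in swapped order, comparing the two possible assignments restores the
# previous ordering. The coordinates below are made up for the example.
#
#   old = [(100, 100, 50, 50), (500, 100, 50, 50)]   # (x, y, w, h)
#   new = [(505, 102, 50, 50), (98, 101, 50, 50)]    # detector swapped them
#   sort_by_proximity(new, old, get_center_rect)
#   # -> [(98, 101, 50, 50), (505, 102, 50, 50)]     # order restored
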
def generate_short_fallback(input_file, output_file, index, project_folder, final_folder, no_face_mode="padding"):
    """Fallback: center crop (zoom) or padding when detection fails."""
    print(f"Processing (Fallback): {input_file} | Mode: {no_face_mode}")
    cap = cv2.VideoCapture(input_file)
    if not cap.isOpened():
        print(f"Error opening video: {input_file}")
        return

    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Target dimensions (9:16)
    target_width = 1080
    target_height = 1920

    encoder_name, encoder_preset = get_best_encoder()

    # Use an FFmpeg pipe instead of cv2.VideoWriter to avoid OpenCV backend errors
    ffmpeg_cmd = [
        'ffmpeg', '-y', '-loglevel', 'error', '-hide_banner', '-stats',
        '-f', 'rawvideo', '-vcodec', 'rawvideo',
        '-s', f'{target_width}x{target_height}',
        '-pix_fmt', 'bgr24', '-r', str(fps),
        '-i', '-',
        '-c:v', encoder_name, '-preset', encoder_preset,
        '-pix_fmt', 'yuv420p',
        output_file
    ]
    # With a hardware encoder, set an explicit bitrate to ensure quality
    if "nvenc" in encoder_name or "amf" in encoder_name:
        ffmpeg_cmd.extend(["-b:v", "5M"])

    process = subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE)

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if no_face_mode == "zoom":
            result = crop_center_zoom(frame)
        else:
            result = resize_with_padding(frame)
        try:
            # Write raw BGR bytes to ffmpeg stdin
            process.stdin.write(result.tobytes())
        except Exception as e:
            print(f"Error writing frame to ffmpeg pipe: {e}")

    cap.release()
    process.stdin.close()
    process.wait()
    finalize_video(input_file, output_file, index, fps, project_folder, final_folder)


def finalize_video(input_file, output_file, index, fps, project_folder, final_folder):
    """Extract the source audio and mux it with the processed video."""
    audio_file = os.path.join(project_folder, "cuts", f"output-audio-{index}.aac")
    subprocess.run(["ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
                    "-i", input_file, "-vn", "-acodec", "copy", audio_file],
                   check=False, capture_output=True)

    if os.path.exists(audio_file) and os.path.getsize(audio_file) > 0:
        final_output = os.path.join(final_folder, f"final-output{str(index).zfill(3)}_processed.mp4")
        encoder_name, encoder_preset = get_best_encoder()
        command = [
            "ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-stats",
            "-i", output_file, "-i", audio_file,
            "-c:v", encoder_name, "-preset", encoder_preset, "-b:v", "5M",
            "-c:a", "aac", "-b:a", "192k",
            "-r", str(fps),
            final_output
        ]
        try:
            subprocess.run(command, check=True)
            print(f"Final file generated: {final_output}")
            try:
                os.remove(audio_file)
                os.remove(output_file)
            except OSError:
                pass
        except subprocess.CalledProcessError as e:
            print(f"Error muxing: {e}")
    else:
        print(f"Warning: No audio extracted for {input_file}")
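
# Illustrative sketch: for index=0 on a machine where NVENC was detected, the
# mux command above expands to roughly (paths and fps depend on the run):
#
#   ffmpeg -y -hide_banner -loglevel error -stats \
#     -i tmp/final/temp_video_no_audio_0.mp4 -i tmp/cuts/output-audio-0.aac \
#     -c:v h264_nvenc -preset fast -b:v 5M -c:a aac -b:a 192k -r 30 \
#     tmp/final/final-output000_processed.mp4
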
def calculate_mouth_ratio(landmarks):
    """
    Calculate the Mouth Aspect Ratio (MAR) using 68-point landmarks (inner lips).

    Indices (0-based):
        Inner lips:    60-67
        Left corner:   60
        Right corner:  64
        Top center:    62
        Bottom center: 66
    """
    if landmarks is None:
        return 0

    # Points are 3D (x, y, z) or 2D (x, y); we only use the first 2 columns.
    pts = landmarks.astype(float)

    # Simple vertical vs. horizontal extent
    p62 = pts[62]
    p66 = pts[66]
    h = np.linalg.norm(p62[:2] - p66[:2])

    p60 = pts[60]
    p64 = pts[64]
    w = np.linalg.norm(p60[:2] - p64[:2])

    if w < 1e-6:
        return 0
    return h / w
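
# Illustrative numbers: with the inner-lip corners 40 px apart and the inner
# top/bottom lip points 4 px apart, MAR = 4 / 40 = 0.10, well above the
# default active_speaker_mar of 0.03, so the face counts as "talking".
# A closed mouth gives a vertical gap near 0, hence MAR ~ 0.
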
def generate_short_mediapipe(input_file, output_file, index, face_mode, project_folder, final_folder,
                             face_detection, face_mesh, pose, detection_period=None, no_face_mode="padding"):
    try:
        cap = cv2.VideoCapture(input_file)
        if not cap.isOpened():
            print(f"Error opening video: {input_file}")
            return

        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_file, fourcc, fps, (1080, 1920))

        next_detection_frame = 0
        current_interval = int(5 * fps)  # Initial guess

        # Initial interval logic if predefined
        if detection_period is not None:
            current_interval = max(1, int(detection_period * fps))
        elif face_mode == "2":
            current_interval = int(1.0 * fps)

        last_detected_faces = None
        last_frame_face_positions = None
        last_success_frame = -1000
        max_frames_without_detection = int(3.0 * fps)  # 3 seconds timeout
        transition_duration = int(fps)
        transition_frames = []

        for frame_index in range(total_frames):
            ret, frame = cap.read()
            if not ret or frame is None:
                break

            if frame_index >= next_detection_frame:
                # Detect ALL faces (up to 2 in our implementation)
                detections = detect_face_or_body_two_faces(frame, face_detection, face_mesh, pose)

                # Dynamic face-count logic
                target_faces = 1
                if face_mode == "2":
                    target_faces = 2
                elif face_mode == "auto":
                    if detections and len(detections) >= 2:
                        target_faces = 2
                    else:
                        target_faces = 1

                # Filter detections based on the target
                current_detections = []
                if detections:
                    # Sort by approximate area (w*h), descending, to pick the main faces first
                    detections.sort(key=lambda s: s[2] * s[3], reverse=True)
                    if len(detections) >= target_faces:
                        current_detections = detections[:target_faces]
                    elif len(detections) > 0:
                        # Fallback
                        current_detections = detections[:1]
                        target_faces = 1

                # Apply consistency check (proximity)
                if target_faces == 2 and len(current_detections) == 2:
                    if last_detected_faces is not None and len(last_detected_faces) == 2:
                        current_detections = sort_by_proximity(current_detections, last_detected_faces, get_center_rect)

                # A stability/lookahead check could go here; skipped for brevity.
                if current_detections and len(current_detections) == target_faces:
                    if last_frame_face_positions is not None:
                        start_faces = np.array(last_frame_face_positions)
                        end_faces = np.array(current_detections)
                        try:
                            transition_frames = np.linspace(start_faces, end_faces, transition_duration, dtype=int)
                        except Exception:
                            # Fallback if shapes mismatch unexpectedly
                            transition_frames = []
                    else:
                        transition_frames = []
                    last_detected_faces = current_detections
                    last_success_frame = frame_index

                # Update the next detection frame
                step = 5
                if detection_period is not None:
                    if isinstance(detection_period, dict):
                        # If we are targeting 2 faces, use the '2' interval, else '1'
                        key = str(target_faces)
                        val = detection_period.get(key, detection_period.get('1', 0.2))
                        step = max(1, int(val * fps))
                    else:
                        step = max(1, int(detection_period * fps))
                elif target_faces == 2:
                    step = int(1.0 * fps)
                else:
                    step = 5  # 5 frames for 1 face
                next_detection_frame = frame_index + step

            if len(transition_frames) > 0:
                current_faces = transition_frames[0]
                transition_frames = transition_frames[1:]
            elif last_detected_faces is not None and (frame_index - last_success_frame) <= max_frames_without_detection:
                current_faces = last_detected_faces
            else:
                if no_face_mode == "zoom":
                    result = crop_center_zoom(frame)
                else:
                    result = resize_with_padding(frame)
                out.write(result)
                continue

            last_frame_face_positions = current_faces

            if hasattr(current_faces, '__len__') and len(current_faces) == 2:
                result = crop_and_resize_two_faces(frame, current_faces)
            else:
                # current_faces is a list of (x, y, w, h) tuples from detection;
                # for a single face: [(x, y, w, h)]
                if hasattr(current_faces, '__len__') and len(current_faces) > 0:
                    f = current_faces[0]
                    result = crop_and_resize_single_face(frame, f)
                else:
                    if no_face_mode == "zoom":
                        result = crop_center_zoom(frame)
                    else:
                        result = resize_with_padding(frame)
            out.write(result)

        cap.release()
        out.release()
        finalize_video(input_file, output_file, index, fps, project_folder, final_folder)
    except Exception as e:
        print(f"Error in MediaPipe processing: {e}")
        raise  # Re-raise so the caller can trigger the fallback
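
# Illustrative interval math: with detection_period={"1": 0.2, "2": 1.0} and a
# 30 fps clip, a single-face segment re-detects every max(1, int(0.2 * 30)) = 6
# frames, while a two-face segment re-detects every int(1.0 * 30) = 30 frames.
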
def generate_short_haar(input_file, output_file, index, project_folder, final_folder,
                        detection_period=None, no_face_mode="padding"):
    """Face detection using OpenCV Haar cascades."""
    print(f"Processing (Haar Cascade): {input_file}")

    # Load the Haar cascade
    cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
    face_cascade = cv2.CascadeClassifier(cascade_path)
    if face_cascade.empty():
        print("Error: Could not load Haar Cascade XML. Falling back to center crop.")
        generate_short_fallback(input_file, output_file, index, project_folder, final_folder,
                                no_face_mode=no_face_mode)
        return

    cap = cv2.VideoCapture(input_file)
    if not cap.isOpened():
        print(f"Error opening video: {input_file}")
        return

    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_file, fourcc, fps, (1080, 1920))

    # Logic mirrors generate_short_mediapipe
    detection_interval = int(2 * fps)  # Default: check every 2 seconds
    if detection_period is not None:
        detection_interval = max(1, int(detection_period * fps))

    last_detected_faces = None
    last_frame_face_positions = None
    last_success_frame = -1000
    max_frames_without_detection = int(3.0 * fps)
    transition_duration = int(fps)  # 1 second smooth transition
    transition_frames = []

    for frame_index in range(total_frames):
        ret, frame = cap.read()
        if not ret or frame is None:
            break

        if frame_index % detection_interval == 0:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, 1.1, 4)

            detections = []
            if len(faces) > 0:
                # Pick the largest face and ensure int type
                largest_face = max(faces, key=lambda f: f[2] * f[3])
                detections = [tuple(map(int, largest_face))]

            if detections:
                if last_frame_face_positions is not None:
                    # Simple linear interpolation for smoothing
                    start_faces = np.array(last_frame_face_positions)
                    end_faces = np.array(detections)
                    steps = transition_duration
                    transition_frames = []
                    for s in range(steps):
                        t = (s + 1) / steps
                        interp = (1 - t) * start_faces + t * end_faces
                        transition_frames.append(interp.astype(int).tolist())
                else:
                    transition_frames = []
                last_detected_faces = detections
                last_success_frame = frame_index

        if len(transition_frames) > 0:
            current_faces = transition_frames[0]
            transition_frames = transition_frames[1:]
        elif last_detected_faces is not None and (frame_index - last_success_frame) <= max_frames_without_detection:
            current_faces = last_detected_faces
        else:
            # No face detected for a while -> center/padding fallback
            if no_face_mode == "zoom":
                result = crop_center_zoom(frame)
            else:
                result = resize_with_padding(frame)
            out.write(result)
            continue

        last_frame_face_positions = current_faces

        # Haar detections are a list containing one (x, y, w, h) tuple
        if isinstance(current_faces, list):
            face_bbox = current_faces[0]
        else:
            face_bbox = current_faces

        result = crop_and_resize_single_face(frame, face_bbox)
        out.write(result)

    cap.release()
    out.release()
    finalize_video(input_file, output_file, index, fps, project_folder, final_folder)
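
# Illustrative interpolation: moving one bbox from (100, 100, 50, 50) to
# (130, 100, 50, 50) over steps=3 (shortened here for brevity) gives
# t = 1/3, 2/3, 1 and x = 110, 120, 130, so the crop glides to the new
# position instead of jumping.
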
def generate_short_insightface(input_file, output_file, index, project_folder, final_folder,
                               face_mode="auto", detection_period=None, filter_threshold=0.35,
                               two_face_threshold=0.60, confidence_threshold=0.30, dead_zone=40,
                               focus_active_speaker=False, active_speaker_mar=0.03,
                               active_speaker_score_diff=1.5, include_motion=False,
                               active_speaker_motion_deadzone=3.0, active_speaker_motion_sensitivity=0.05,
                               active_speaker_decay=2.0, no_face_mode="padding"):
    """Face detection using InsightFace (SOTA)."""
    print(f"Processing (InsightFace): {input_file} | Mode: {face_mode}")

    cap = cv2.VideoCapture(input_file)
    if not cap.isOpened():
        print(f"Error opening video: {input_file}")
        return

    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # mp4v is only a temporary container; the final mux fixes the encoding
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_file, fourcc, fps, (1080, 1920))

    # Dynamic interval logic
    next_detection_frame = 0
    last_detected_faces = None
    last_frame_face_positions = None
    last_success_frame = -1000
    max_frames_without_detection = int(3.0 * fps)  # 3 seconds timeout
    transition_duration = 4  # Smooth transition over 4 frames (almost continuous)
    transition_frames = []

    # Current state of the face mode (1 or 2); if auto, decided per detection interval
    current_num_faces_state = 1
    if face_mode == "2":
        current_num_faces_state = 2

    frame_1_face_count = 0
    frame_2_face_count = 0
    buffered_frame = None

    # Timeline tracking: the mode for every written frame, compressed into
    # (start, end, mode) segments after the loop.
    timeline_frames = []
    coordinate_log = []  # Raw face coordinates, frame by frame

    # Active speaker state. There is no ID tracker, so activity scores are
    # assigned to faces by proximity to the previous frame:
    # a list of dicts [{'center': (x, y), 'activity': score}, ...]
    faces_activity_state = []

    for frame_index in range(total_frames):
        if buffered_frame is not None:
            frame = buffered_frame
            ret = True
            buffered_frame = None
        else:
            ret, frame = cap.read()
        if not ret or frame is None:
            break

        if frame_index >= next_detection_frame and len(transition_frames) == 0:
            # Detect faces
            faces = detect_faces_insightface(frame)
            if faces:
                scores = [f"{f.get('det_score', 0):.2f}" for f in faces]
                print(f"DEBUG: Frame {frame_index} | Raw Faces: {len(faces)} | Scores: {scores}")

            # --- ACTIVITY / SPEAKER DETECTION ---
            # (Feature currently disabled for stability - relying on simple size checks)
            last_raw_faces = faces
            # ------------------------------------

            # --- INTELLIGENT FILTERING ---
            valid_faces = []
            if faces:
                # 1. Filter by confidence (user threshold)
                faces = [f for f in faces if f.get('det_score', 0) > confidence_threshold]
                if faces:
                    # Pre-calculate areas and the speaker-weighted area
                    for f in faces:
                        w = f['bbox'][2] - f['bbox'][0]
                        h = f['bbox'][3] - f['bbox'][1]
                        f['area'] = w * h
                        f['center'] = ((f['bbox'][0] + f['bbox'][2]) / 2, (f['bbox'][1] + f['bbox'][3]) / 2)
                        act = f.get('activity', 0)
                        f['effective_area'] = f['area'] * (1.0 + (act * 0.05))

                    # Find the largest face
                    max_area = max(f['area'] for f in faces)

                    # 2. Relative size filter
                    valid_faces = [f for f in faces if f['area'] > (filter_threshold * max_area)]
                    if len(valid_faces) < len(faces):
                        print(f"DEBUG: Filtered {len(faces)-len(valid_faces)} small faces. Max Area: {max_area}. Filter Thresh: {filter_threshold}")
                    faces = valid_faces
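
            # Illustrative numbers for the relative size filter: with
            # filter_threshold=0.35 and a largest face of 40,000 px^2, any face
            # smaller than 0.35 * 40000 = 14,000 px^2 is treated as background
            # and dropped before the mode decision.
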
            # --- ACTIVE SPEAKER UPDATE ---
            if faces:
                for f in faces:
                    # Instantaneous mouth openness
                    mar = 0
                    if 'landmark_3d_68' in f:
                        mar = calculate_mouth_ratio(f['landmark_3d_68'])
                    elif 'landmark_2d_106' in f:
                        # TODO: map the 106-point lips (indices ~52-71, inner
                        # roughly 64-71) onto the 68-point layout. For now we
                        # rely on landmark_3d_68, which is standard in buffalo_l.
                        pass
                    f['mouth_ratio'] = mar
                    # Heuristic: ratio > 0.05 implies open-ish, > 0.1 talking;
                    # 0.03 is common for a closed mouth, 0.05 is starting to open.
                    # print(f"DEBUG: Frame {frame_index} MAR: {mar:.4f}")  # raw MAR logging

            # --- CROWD MODE LOGIC ---
            # If there are too many faces, don't even try to track; fall back
            # to the no-face logic (zoom/padding).
            CROWD_THRESHOLD = 7
            # Use last_raw_faces (before size filtering) so background people count too
            is_crowd = len(last_raw_faces) >= CROWD_THRESHOLD
            if is_crowd:
                print(f"DEBUG: Crowd Mode Active! {len(last_raw_faces)} faces >= {CROWD_THRESHOLD}. Triggering Fallback (No Face Mode).")
                faces = []
                valid_faces = []  # CAUTION: must clear the strict backup too!
                # Force-reset history so the crop doesn't stick to the last face found
                last_detected_faces = None
                transition_frames = []
                faces_activity_state = []
                zoom_ema_bbox = None  # Reset smoothing too
            # ---------------------------

            # Update activity state - two passes, for global motion compensation
            if focus_active_speaker and faces:
                # Pass 1: global motion (camera shake) estimate. Motion is
                # computed for all remaining confident faces to get the best
                # global estimate, by non-destructively matching each current
                # face to the previous state.
                raw_motions = []
                for f in faces:
                    my_c = f['center']
                    best_dist = 9999
                    if faces_activity_state:
                        for old_s in faces_activity_state:
                            old_c = old_s['center']
                            dist = np.sqrt((my_c[0]-old_c[0])**2 + (my_c[1]-old_c[1])**2)
                            if dist < best_dist:
                                best_dist = dist
                    if best_dist < 200:
                        f['_raw_motion'] = best_dist
                    else:
                        f['_raw_motion'] = 0.0
                    if include_motion:
                        raw_motions.append(f['_raw_motion'])

                global_motion = 0.0
                if include_motion and len(raw_motions) >= 2:
                    global_motion = min(raw_motions)
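
                # Illustrative compensation: if the two faces moved 5.2 px and
                # 4.8 px since the last detection, the shared component
                # min(5.2, 4.8) = 4.8 px is treated as camera motion, leaving
                # 0.4 px and 0.0 px of face-specific movement.
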
                # Pass 2: update scores for ALL faces
                current_state_map = []
                for f in faces:
                    is_talking = f.get('mouth_ratio', 0) > active_speaker_mar

                    # Compensated motion bonus
                    motion_bonus = 0.0
                    if include_motion and faces_activity_state:
                        comp_motion = max(0.0, f.get('_raw_motion', 0.0) - global_motion)
                        f['motion_val'] = comp_motion  # Store for debug
                        if comp_motion > active_speaker_motion_deadzone:
                            motion_bonus = min(2.5, (comp_motion - active_speaker_motion_deadzone) * active_speaker_motion_sensitivity)
                    else:
                        f['motion_val'] = 0.0

                    # Accumulate the score: re-find the match to update history
                    matched_score = 0.0
                    my_c = f['center']
                    best_dist = 9999
                    best_idx = -1
                    if faces_activity_state:
                        for i, old_s in enumerate(faces_activity_state):
                            old_c = old_s['center']
                            dist = np.sqrt((my_c[0]-old_c[0])**2 + (my_c[1]-old_c[1])**2)
                            if dist < best_dist:
                                best_dist = dist
                                best_idx = i
                    if best_idx != -1 and best_dist < 200:
                        old_val = faces_activity_state[best_idx]['activity']
                        change = -abs(active_speaker_decay)
                        if is_talking:
                            change = 1.5
                        new_val = old_val + change + motion_bonus
                        # Cap raised to 20.0 so motion differences can separate two 'talking' faces
                        matched_score = max(0.0, min(20.0, new_val))
                    else:
                        matched_score = 1.0 if is_talking else 0.0

                    f['activity_score'] = matched_score
                    current_state_map.append({'center': f['center'], 'activity': matched_score})

                faces_activity_state = current_state_map
            else:
                faces_activity_state = []

            faces = valid_faces

            # Decide 1 or 2 faces
            target_faces = 1
            if face_mode == "2":
                target_faces = 2
            elif face_mode == "auto":
                if len(faces) >= 2:
                    decided = False
                    if focus_active_speaker:
                        # EXPERIMENTAL: decide based on activity
                        f1 = faces[0]
                        f2 = faces[1]
                        score1 = f1.get('activity_score', 0)
                        score2 = f2.get('activity_score', 0)
                        y1 = f1['center'][1]
                        y2 = f2['center'][1]
                        pos1 = "Top" if y1 < y2 else "Bottom"
                        pos2 = "Top" if y2 < y1 else "Bottom"
                        print(f"DEBUG: Frame {frame_index} | {pos1} (MAR: {f1.get('mouth_ratio',0):.3f}, Mov: {f1.get('motion_val',0):.1f}, Score: {score1:.1f}) | {pos2} (MAR: {f2.get('mouth_ratio',0):.3f}, Mov: {f2.get('motion_val',0):.1f}, Score: {score2:.1f})")

                        # If one face is the clearly dominant active speaker
                        # (a score gap of roughly 2-3 frames of talking vs.
                        # silence), focus on it.
                        diff = abs(score1 - score2)
                        if diff > active_speaker_score_diff:
                            target_faces = 1
                            decided = True
                            # Sort by activity so [0] is the winner; the later
                            # 1-face crop logic takes [0].
                            if score2 > score1:
                                faces = [f2, f1]
                            print(f"DEBUG: Active Speaker Focus Triggered! Diff ({diff:.2f}) > Thresh ({active_speaker_score_diff}). Focusing on Face {'2' if score2 > score1 else '1'}.")
                        elif score1 > 4.0 and score2 > 4.0:
                            # Both talking -> 2 faces. The threshold was raised
                            # to 4.0 to avoid noise triggering the split.
                            target_faces = 2
                            decided = True
                            print("DEBUG: Dual Active Speakers! Both scores > 4.0. Forcing Split Mode.")
                        # If both scores are low (both silent), fall back to the size-ratio check below.
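
                    # Illustrative score dynamics (defaults: +1.5 per talking
                    # update, -2.0 decay, cap 20.0): a continuously talking face
                    # scores 1.0, 2.5, 4.0, 5.5 over four detection passes while
                    # a silent face stays clamped at 0.0, so the gap clears
                    # active_speaker_score_diff=1.5 and the crop focuses on the
                    # talker.
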
                    if not decided:
                        # Standard logic: compare relative sizes (effective area)
                        faces_sorted_temp = sorted(faces, key=lambda f: f.get('effective_area', 0), reverse=True)
                        largest = faces_sorted_temp[0]['effective_area']
                        second = faces_sorted_temp[1]['effective_area']
                        # Two-face constraint
                        if second > (two_face_threshold * largest):
                            target_faces = 2
                        else:
                            target_faces = 1
                else:
                    target_faces = 1

            # If no faces survived the filters, faces == valid_faces already
            # reflects that; nothing else to do here.
            # -----------------------------

            # Fallback lookahead: if detection failed or was partial. Do NOT
            # look ahead in crowd mode (we explicitly want 0 faces there).
            if len(faces) < target_faces and not is_crowd:
                # Try 1 frame ahead
                ret2, frame2 = cap.read()
                if ret2 and frame2 is not None:
                    faces2 = detect_faces_insightface(frame2)
                    # --- Apply the same filtering to the lookahead ---
                    valid_faces2 = []
                    if faces2:
                        faces2 = [f for f in faces2 if f.get('det_score', 0) > 0.50]
                        if faces2:
                            for f in faces2:
                                w = f['bbox'][2] - f['bbox'][0]
                                h = f['bbox'][3] - f['bbox'][1]
                                f['area'] = w * h
                                f['center'] = ((f['bbox'][0] + f['bbox'][2]) / 2, (f['bbox'][1] + f['bbox'][3]) / 2)
                                f['effective_area'] = f['area']  # Default for lookahead
                            max_area2 = max(f['area'] for f in faces2)
                            # Stricter filter: threshold relative to the max area
                            valid_faces2 = [f for f in faces2 if f['area'] > (filter_threshold * max_area2)]
                    faces2 = valid_faces2
                    # -------------------------------------------------
                    # Use the lookahead if it found what we wanted, or anything at all
                    if len(faces2) >= target_faces:
                        faces = faces2  # Use lookahead faces for the current frame
                    elif len(faces) == 0 and len(faces2) > 0:
                        faces = faces2  # Better than nothing
                    buffered_frame = frame2  # Store for the next iteration
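
            # Illustrative numbers for the two-face constraint above: with
            # two_face_threshold=0.60, faces of 30,000 and 20,000 px^2 split the
            # frame (20000 > 0.6 * 30000 = 18000), while 30,000 vs. 15,000
            # keeps a single-face crop.
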
            detections = []
            if len(faces) >= target_faces:
                # --- FACE TRACKING / SORTING ---
                # Rather than area alone, prioritize faces close to the LAST
                # detected face; this prevents switching to a background person
                # when sizes are similar.
                if last_detected_faces is not None and len(last_detected_faces) == target_faces:
                    # We match against the old bbox center (IOU would also work).
                    if target_faces == 1:
                        old_center = get_center_bbox(last_detected_faces[0])

                        def sort_score(f):
                            # Distance score (lower is better) minus a small
                            # EFFECTIVE-area bonus (higher is better). Distance
                            # is weighted heavily to keep consistency, while
                            # activity can still swap the focus if significant.
                            dist = np.sqrt((f['center'][0] - old_center[0])**2 + (f['center'][1] - old_center[1])**2)
                            return dist - (f['effective_area'] * 0.0001)

                        faces_sorted = sorted(faces, key=sort_score)
                    else:
                        # For 2 faces, sort by effective area; the proximity sort happens later
                        faces_sorted = sorted(faces, key=lambda f: f['effective_area'], reverse=True)
                else:
                    # No history: sort by activity (when focusing) or effective area
                    if focus_active_speaker and target_faces == 1:
                        faces_sorted = sorted(faces, key=lambda f: f.get('activity_score', 0), reverse=True)
                    else:
                        faces_sorted = sorted(faces, key=lambda f: f.get('effective_area', 0), reverse=True)

                if target_faces == 2:
                    # [x1, y1, x2, y2] is converted to (x, y, w, h) later
                    f1 = faces_sorted[0]['bbox']
                    f2 = faces_sorted[1]['bbox']
                    if last_detected_faces is not None and len(last_detected_faces) == 2:
                        detections = sort_by_proximity([f1, f2], last_detected_faces, get_center_bbox)
                    else:
                        detections = [f1, f2]
                    current_num_faces_state = 2
                else:
                    # 1 face
                    detections = [faces_sorted[0]['bbox']]
                    current_num_faces_state = 1
            else:
                # Wanted 2 but found 1, or wanted 1 and found 0
                if len(faces) > 0:
                    faces_sorted = sorted(faces, key=lambda f: f['effective_area'], reverse=True)
                    detections = [faces_sorted[0]['bbox']]
                    current_num_faces_state = 1
                else:
                    detections = []

            if detections:
                # --- STABILIZATION (DEAD ZONE) ---
                # Ignore movements that are too small to matter
                if last_detected_faces is not None and len(last_detected_faces) == len(detections):
                    is_stable = True
                    for i in range(len(detections)):
                        old_c = get_center_bbox(last_detected_faces[i])
                        new_c = get_center_bbox(detections[i])
                        dist = np.sqrt((old_c[0]-new_c[0])**2 + (old_c[1]-new_c[1])**2)
                        # Threshold: dead_zone (pixels); reduces jitter on talking heads
                        if dist > dead_zone:
                            is_stable = False
                            break
                    if is_stable:
                        # Keep the old position to prevent "shaky cam"
                        detections = last_detected_faces
                        transition_frames = []  # Snap; no transition needed
                # ---------------------------------
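
                # Illustrative dead zone: with dead_zone=40, a detection whose
                # center drifts 25 px keeps the previous crop (stable), while a
                # 60 px jump starts a 4-frame transition to the new position.
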
                if last_frame_face_positions is not None and len(last_frame_face_positions) == len(detections):
                    # Only transition if we actually decided to MOVE (i.e., not
                    # stable). The element-wise comparison avoids numpy's
                    # "ambiguous truth value of an array" error that a direct
                    # `detections == last_detected_faces` check caused.
                    forced_transition = True
                    if last_detected_faces is not None and len(detections) == len(last_detected_faces):
                        arrays_equal = True
                        for i in range(len(detections)):
                            if not np.array_equal(detections[i], last_detected_faces[i]):
                                arrays_equal = False
                                break
                        if arrays_equal:
                            forced_transition = False
                    if not transition_frames and forced_transition:
                        # Transition
                        start_faces = np.array(last_frame_face_positions)
                        end_faces = np.array(detections)
                        steps = transition_duration
                        transition_frames = []
                        for s in range(steps):
                            t = (s + 1) / steps
                            interp = (1 - t) * start_faces + t * end_faces
                            transition_frames.append(interp.astype(int).tolist())
                else:
                    # Reset the transition if the face count changed or on the first detection
                    transition_frames = []

                last_detected_faces = detections
                last_success_frame = frame_index

            # Update the next detection frame based on the NEW state
            step = 5  # Default fallback (very fast)
            if detection_period is not None:
                if isinstance(detection_period, dict):
                    # The period depends on the state; fall back to '1' if the key is missing
                    key = str(current_num_faces_state)
                    val = detection_period.get(key, detection_period.get('1', 0.2))
                    step = max(1, int(val * fps))
                else:
                    # Legacy float support (kept as a safety net)
                    step = max(1, int(detection_period * fps))
            elif current_num_faces_state == 2:
                step = int(1.0 * fps)  # 1s for 2 faces
            else:
                step = 5  # 5 frames for 1 face (~0.16s at 30 fps)
            next_detection_frame = frame_index + step

        if len(transition_frames) > 0:
            current_faces = transition_frames[0]
            transition_frames = transition_frames[1:]
        elif last_detected_faces is not None and (frame_index - last_success_frame) <= max_frames_without_detection:
            current_faces = last_detected_faces
        else:
            # Fallback for this frame
            if no_face_mode == "zoom":
                result = crop_center_zoom(frame)
            else:
                result = resize_with_padding(frame)
            out.write(result)
            # The fallback is treated as single face for the subtitle timeline
            timeline_frames.append((frame_index, "1"))
            # Keep the coordinate log in sync (empty faces for fallback)
            coordinate_log.append({"frame": frame_index, "src_size": [frame_width, frame_height], "faces": []})
            continue

        last_frame_face_positions = current_faces

        target_len = len(current_faces)
        if target_len == 2:
            frame_2_face_count += 1
            # Convert [x1, y1, x2, y2] to (x, y, w, h)
            f1 = current_faces[0]
            f2 = current_faces[1]
            rect1 = (f1[0], f1[1], f1[2]-f1[0], f1[3]-f1[1])
            rect2 = (f2[0], f2[1], f2[2]-f2[0], f2[3]-f2[1])
            result = crop_and_resize_two_faces(frame, [rect1, rect2])
            timeline_frames.append((frame_index, "2"))
        else:
            frame_1_face_count += 1
            # 1 face; current_faces[0] is [x1, y1, x2, y2]
            result = crop_and_resize_insightface(frame, current_faces[0])
            timeline_frames.append((frame_index, "1"))

        # Capture coordinates frame by frame: [x1, y1, x2, y2, rh] per face,
        # where rh is the face height relative to the source frame height.
        coords_entry = {"frame": frame_index, "src_size": [frame_width, frame_height], "faces": []}
        try:
            if isinstance(current_faces, (list, tuple)):
                processed_faces_log = []
                for f in current_faces:
                    f_list = list(map(int, f[:4]))  # Standard bbox
                    face_h = f_list[3] - f_list[1]
                    rh = face_h / float(frame_height)
                    f_list.append(float(f"{rh:.4f}"))  # Append as the 5th element
                    processed_faces_log.append(f_list)
                coords_entry["faces"] = processed_faces_log
            elif isinstance(current_faces, np.ndarray):
                # Same logic for numpy arrays
                processed_faces_log = []
                for f in current_faces:
                    f_list = f[:4].astype(int).tolist()
                    face_h = f_list[3] - f_list[1]
                    rh = face_h / float(frame_height)
                    f_list.append(float(f"{rh:.4f}"))
                    processed_faces_log.append(f_list)
                coords_entry["faces"] = processed_faces_log
        except Exception:
            pass
        coordinate_log.append(coords_entry)

        out.write(result)

    cap.release()
    out.release()
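
    # Illustrative coordinate_log entry for a 1920x1080 source with one face
    # spanning y1=200 to y2=520: rh = 320 / 1080 = 0.2963, giving
    #   {"frame": 120, "src_size": [1920, 1080], "faces": [[640, 200, 880, 520, 0.2963]]}
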
    # Compress the timeline into segments: [(start_time, end_time, mode), ...]
    compressed_timeline = []
    if timeline_frames:
        curr_mode = timeline_frames[0][1]
        start_f = timeline_frames[0][0]
        for i in range(1, len(timeline_frames)):
            frame_idx, mode = timeline_frames[i]
            if mode != curr_mode:
                # End the current segment, converting frames to seconds
                end_f = timeline_frames[i-1][0]
                compressed_timeline.append({
                    "start": float(start_f) / fps,
                    "end": float(end_f) / fps,  # or frame_idx / fps for continuity
                    "mode": curr_mode
                })
                # Start a new segment
                curr_mode = mode
                start_f = frame_idx
        # Add the last segment
        end_f = timeline_frames[-1][0]
        compressed_timeline.append({
            "start": float(start_f) / fps,
            "end": (float(end_f) + 1) / fps,
            "mode": curr_mode
        })

    # Save the timeline JSON
    timeline_file = output_file.replace(".mp4", "_timeline.json")
    try:
        with open(timeline_file, "w") as f:
            json.dump(compressed_timeline, f)
        print(f"Timeline saved: {timeline_file}")
    except Exception as e:
        print(f"Error saving timeline: {e}")

    # Save the coords JSON
    coords_file = output_file.replace(".mp4", "_coords.json")
    try:
        with open(coords_file, "w") as f:
            json.dump(coordinate_log, f)
        print(f"Face Coordinates saved: {coords_file}")
    except Exception as e:
        print(f"Error saving coords: {e}")

    finalize_video(input_file, output_file, index, fps, project_folder, final_folder)

    # Return the dominant mode (the 15% rule serves as the overall fallback)
    if frame_2_face_count > (total_frames * 0.15):
        return "2"
    return "1"
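
# Illustrative *_timeline.json content for a 30 fps clip that starts on one
# face and switches to a split view at frame 126 (floats rounded here for
# readability):
#
#   [{"start": 0.0, "end": 4.1667, "mode": "1"},
#    {"start": 4.2, "end": 9.8, "mode": "2"}]
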
def edit(project_folder="tmp", face_model="insightface", face_mode="auto", detection_period=None,
         filter_threshold=0.35, two_face_threshold=0.60, confidence_threshold=0.30, dead_zone=40,
         focus_active_speaker=False, active_speaker_mar=0.03, active_speaker_score_diff=1.5,
         include_motion=False, active_speaker_motion_deadzone=3.0, active_speaker_motion_sensitivity=0.05,
         active_speaker_decay=2.0, segments_data=None, no_face_mode="padding"):
    # Lazy-init the MediaPipe solutions only when needed, to avoid an
    # AttributeError if the import failed partially
    mp_face_detection = None
    mp_face_mesh = None
    mp_pose = None

    index = 0
    cuts_folder = os.path.join(project_folder, "cuts")
    final_folder = os.path.join(project_folder, "final")
    os.makedirs(final_folder, exist_ok=True)

    face_modes_log = {}

    # Priority: user choice -> fallbacks
    insightface_working = False
    # Only init InsightFace if selected
    if INSIGHTFACE_AVAILABLE and (face_model == "insightface"):
        try:
            print("Initializing InsightFace...")
            init_insightface()
            insightface_working = True
            print("InsightFace Initialized Successfully.")
        except Exception as e:
            print(f"WARNING: InsightFace Initialization Failed ({e}). Will try MediaPipe.")
            insightface_working = False

    mediapipe_working = False
    use_haar = False

    # If InsightFace failed OR the user chose MediaPipe, init MediaPipe
    should_use_mediapipe = (face_model == "mediapipe") or (face_model == "insightface" and not insightface_working)
    if should_use_mediapipe:
        try:
            # solutions may be missing if the import failed silently or partially
            if not hasattr(mp, 'solutions'):
                raise ImportError("mediapipe.solutions not found")
            mp_face_detection = mp.solutions.face_detection
            mp_face_mesh = mp.solutions.face_mesh
            mp_pose = mp.solutions.pose
            # Smoke test with model_selection=0 (short range)
            with mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5):
                pass
            mediapipe_working = True
            print("MediaPipe Initialized Successfully.")
        except Exception as e:
            print(f"WARNING: MediaPipe Initialization Failed ({e}). Switching to OpenCV Haar Cascade.")
            mediapipe_working = False
            use_haar = True

    # The per-clip face count is decided dynamically, so there is no fixed
    # mp_num_faces here anymore.

    found_files = sorted(glob.glob(os.path.join(cuts_folder, "*_original_scale.mp4")))
    if not found_files:
        print(f"No files found in {cuts_folder}.")
        return

    for input_file in found_files:
        input_filename = os.path.basename(input_file)

        # Extract the clip index from the filename
        index = 0
        try:
            parts = input_filename.split('_')
            if parts[0].isdigit():
                index = int(parts[0])
            elif input_filename.startswith("output"):
                # e.g. output000
                idx_str = input_filename[6:9]
                if idx_str.isdigit():
                    index = int(idx_str)
        except Exception:
            pass
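
        # Illustrative parses: "007_My_Clip_original_scale.mp4" -> index 7
        # (leading digits), "output012_original_scale.mp4" -> index 12
        # (characters [6:9] of the legacy name).
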
        output_file = os.path.join(final_folder, f"temp_video_no_audio_{index}.mp4")

        # Determine the final name (title)
        base_name_final = input_filename.replace("_original_scale.mp4", "")
        # If it's a legacy name, try to improve it from the segment titles
        if input_filename.startswith("output") and segments_data and index < len(segments_data):
            title = segments_data[index].get("title", f"Segment_{index}")
            safe_title = "".join([c for c in title if c.isalnum() or c in " _-"]).strip().replace(" ", "_")[:60]
            base_name_final = f"{index:03d}_{safe_title}"

        if os.path.exists(input_file):
            success = False
            detected_mode = "1"  # Default if detection fails or falls back

            # 1. Try InsightFace
            if insightface_working:
                try:
                    # Capture the returned mode
                    res = generate_short_insightface(
                        input_file, output_file, index, project_folder, final_folder,
                        face_mode=face_mode, detection_period=detection_period,
                        filter_threshold=filter_threshold, two_face_threshold=two_face_threshold,
                        confidence_threshold=confidence_threshold, dead_zone=dead_zone,
                        focus_active_speaker=focus_active_speaker, active_speaker_mar=active_speaker_mar,
                        active_speaker_score_diff=active_speaker_score_diff, include_motion=include_motion,
                        active_speaker_motion_deadzone=active_speaker_motion_deadzone,
                        active_speaker_motion_sensitivity=active_speaker_motion_sensitivity,
                        active_speaker_decay=active_speaker_decay, no_face_mode=no_face_mode)
                    if res:
                        detected_mode = res
                    success = True
                except Exception as e:
                    import traceback
                    traceback.print_exc()
                    print(f"InsightFace processing failed for {input_filename}: {e}")
                    print("Falling back to MediaPipe/Haar...")

            # 2. Try MediaPipe if InsightFace failed or is not available
            if not success and mediapipe_working:
                try:
                    with mp_face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.2) as face_detection, \
                         mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=2, refine_landmarks=True,
                                               min_detection_confidence=0.2, min_tracking_confidence=0.2) as face_mesh, \
                         mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
                        generate_short_mediapipe(input_file, output_file, index, face_mode, project_folder,
                                                 final_folder, face_detection, face_mesh, pose,
                                                 detection_period=detection_period, no_face_mode=no_face_mode)
                    # generate_short_mediapipe does not report the detected mode,
                    # so assume "1" unless the user forced two faces.
                    detected_mode = "2" if face_mode == "2" else "1"
                    success = True
                except Exception as e:
                    print(f"MediaPipe processing failed (fallback): {e}")

            # 3. Try Haar if the others failed
            if not success and (use_haar or (not mediapipe_working and not insightface_working)):
                try:
                    print("Attempting Haar Cascade...")
                    generate_short_haar(input_file, output_file, index, project_folder, final_folder,
                                        detection_period=detection_period, no_face_mode=no_face_mode)
                    success = True
                except Exception as e2:
                    print(f"Haar fallback also failed: {e2}")

            # 4. Last resort: center crop
            if not success:
                generate_short_fallback(input_file, output_file, index, project_folder, final_folder,
                                        no_face_mode=no_face_mode)
                detected_mode = "1"
                success = True

            # Save the mode
            face_modes_log[f"output{str(index).zfill(3)}"] = detected_mode

            if success:
                try:
                    new_mp4_name = f"{base_name_final}.mp4"
                    new_mp4_path = os.path.join(final_folder, new_mp4_name)
                    # finalize_video creates `final-output{index}_processed.mp4`
                    generated_mp4_name = f"final-output{str(index).zfill(3)}_processed.mp4"
                    generated_mp4_path = os.path.join(final_folder, generated_mp4_name)

                    # 1. Rename the MP4
                    if os.path.exists(generated_mp4_path):
                        if os.path.exists(new_mp4_path):
                            os.remove(new_mp4_path)
                        os.rename(generated_mp4_path, new_mp4_path)
                        print(f"Renamed Output to Title: {new_mp4_name}")

                    # 2. Rename the JSON subtitles (if they exist and were not renamed by cut_segments)
                    subs_folder = os.path.join(project_folder, "subs")
                    old_json_name = f"final-output{str(index).zfill(3)}_processed.json"
                    old_json_path = os.path.join(subs_folder, old_json_name)
                    new_json_name = f"{base_name_final}_processed.json"
                    new_json_path = os.path.join(subs_folder, new_json_name)
                    if os.path.exists(old_json_path):
                        if os.path.exists(new_json_path):
                            os.remove(new_json_path)
                        os.rename(old_json_path, new_json_path)
                        print(f"Renamed Subtitles to Title: {new_json_name}")

                    # 3. Rename the timeline JSON
                    # (created as temp_video_no_audio_{index}_timeline.json by generate_short_insightface)
                    old_timeline_name = f"temp_video_no_audio_{index}_timeline.json"
                    old_timeline_path = os.path.join(final_folder, old_timeline_name)
                    new_timeline_name = f"{base_name_final}_timeline.json"
                    new_timeline_path = os.path.join(final_folder, new_timeline_name)
                    if os.path.exists(old_timeline_path):
                        if os.path.exists(new_timeline_path):
                            os.remove(new_timeline_path)
                        os.rename(old_timeline_path, new_timeline_path)
                        print(f"Renamed Timeline to Title: {new_timeline_name}")

                    # 4. Rename the coords JSON
                    old_coords_name = f"temp_video_no_audio_{index}_coords.json"
                    old_coords_path = os.path.join(final_folder, old_coords_name)
                    new_coords_name = f"{base_name_final}_coords.json"
                    new_coords_path = os.path.join(final_folder, new_coords_name)
                    if os.path.exists(old_coords_path):
                        if os.path.exists(new_coords_path):
                            os.remove(new_coords_path)
                        os.rename(old_coords_path, new_coords_path)
                        print(f"Renamed Coords to Title: {new_coords_name}")
                except Exception as e:
                    print(f"Warning: Could not rename file with title: {e}")

    # Save the face modes to JSON for subtitle usage
    modes_file = os.path.join(project_folder, "face_modes.json")
    try:
        with open(modes_file, "w") as f:
            json.dump(face_modes_log, f)
        print(f"Detect Stats saved: {modes_file}")
    except Exception as e:
        print(f"Error saving face modes: {e}")


if __name__ == "__main__":
    edit()
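
# Illustrative invocation (argument values are examples, not requirements):
#
#   edit(project_folder="tmp", face_model="insightface", face_mode="auto",
#        detection_period={"1": 0.2, "2": 1.0}, no_face_mode="zoom")
#
# This expects clips named *_original_scale.mp4 under tmp/cuts and writes the
# processed verticals, plus the *_timeline.json and *_coords.json sidecars,
# to tmp/final.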