# ViralCutterPRO — scripts/edit_video.py
import cv2
import numpy as np
import os
import subprocess
import mediapipe as mp
from scripts.one_face import crop_and_resize_single_face, resize_with_padding, detect_face_or_body, crop_center_zoom
from scripts.two_face import crop_and_resize_two_faces, detect_face_or_body_two_faces
try:
from scripts.face_detection_insightface import init_insightface, detect_faces_insightface, crop_and_resize_insightface
INSIGHTFACE_AVAILABLE = True
except ImportError:
INSIGHTFACE_AVAILABLE = False
print("InsightFace not found or error importing. Install with: pip install insightface onnxruntime-gpu")
# Global cache for encoder
CACHED_ENCODER = None
def get_best_encoder():
global CACHED_ENCODER
if CACHED_ENCODER: return CACHED_ENCODER
try:
# Check available encoders
result = subprocess.run(['ffmpeg', '-hide_banner', '-encoders'], capture_output=True, text=True)
output = result.stdout
# Priority: NVENC (NVIDIA) > AMF (AMD) > QSV (Intel) > CPU
if "h264_nvenc" in output:
print("Encoder Detected: NVIDIA (h264_nvenc)")
CACHED_ENCODER = ("h264_nvenc", "fast") # p1-p7 presets could be used but 'fast' maps well
return CACHED_ENCODER
if "h264_amf" in output:
print("Encoder Detected: AMD (h264_amf)")
CACHED_ENCODER = ("h264_amf", "speed") # quality, speed, balanced
return CACHED_ENCODER
if "h264_qsv" in output:
print("Encoder Detected: Intel QSV (h264_qsv)")
CACHED_ENCODER = ("h264_qsv", "veryfast")
return CACHED_ENCODER
# Mac OS (VideoToolbox)
if "h264_videotoolbox" in output:
print("Encoder Detected: MacOS (h264_videotoolbox)")
CACHED_ENCODER = ("h264_videotoolbox", "default")
return CACHED_ENCODER
except Exception as e:
print(f"Error checking encoders: {e}")
print("Encoder Detected: CPU (libx264)")
CACHED_ENCODER = ("libx264", "ultrafast")
return CACHED_ENCODER
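# Illustrative consumption of the cached (encoder, preset) pair: a minimal
# sketch mirroring the real invocations below; the file names are hypothetical.
#
#   encoder_name, encoder_preset = get_best_encoder()
#   cmd = ["ffmpeg", "-y", "-i", "in.mp4",
#          "-c:v", encoder_name, "-preset", encoder_preset, "out.mp4"]
#   subprocess.run(cmd, check=True)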
def get_center_bbox(bbox):
# bbox: [x1, y1, x2, y2]
return ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
def get_center_rect(rect):
# rect: (x, y, w, h)
return (rect[0] + rect[2] / 2, rect[1] + rect[3] / 2)
def sort_by_proximity(new_faces, old_faces, center_func):
"""
Sorts new_faces to match the order of old_faces based on distance.
new_faces: list of face objects (bbox or tuple)
old_faces: list of face objects (bbox or tuple)
center_func: function that takes a face object and returns (cx, cy)
"""
if not old_faces or len(old_faces) != 2 or len(new_faces) != 2:
return new_faces
old_c1 = center_func(old_faces[0])
old_c2 = center_func(old_faces[1])
new_c1 = center_func(new_faces[0])
new_c2 = center_func(new_faces[1])
# Cost if we keep order: [new1, new2]
# dist(old1, new1) + dist(old2, new2)
dist_keep = ((old_c1[0]-new_c1[0])**2 + (old_c1[1]-new_c1[1])**2) + \
((old_c2[0]-new_c2[0])**2 + (old_c2[1]-new_c2[1])**2)
# Cost if we swap: [new2, new1]
# dist(old1, new2) + dist(old2, new1)
dist_swap = ((old_c1[0]-new_c2[0])**2 + (old_c1[1]-new_c2[1])**2) + \
((old_c2[0]-new_c1[0])**2 + (old_c2[1]-new_c1[1])**2)
# If swapping reduces total movement distance, do it
if dist_swap < dist_keep:
return [new_faces[1], new_faces[0]]
return new_faces
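# Worked example (hypothetical pixel coordinates): if the previous frame tracked
# rects centered near x=120 and x=910 and a new detection returns them in the
# opposite order, swapping costs less than keeping, so the pair is re-aligned:
#
#   sort_by_proximity([(880, 180, 60, 60), (90, 190, 60, 60)],
#                     [(90, 190, 60, 60), (880, 180, 60, 60)],
#                     get_center_rect)
#   # -> [(90, 190, 60, 60), (880, 180, 60, 60)]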
def generate_short_fallback(input_file, output_file, index, project_folder, final_folder, no_face_mode="padding"):
"""Fallback function: Center Crop (Zoom) or Padding if detection fails."""
print(f"Processing (Fallback): {input_file} | Mode: {no_face_mode}")
cap = cv2.VideoCapture(input_file)
if not cap.isOpened():
print(f"Error opening video: {input_file}")
return
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Target dimensions (9:16)
target_width = 1080
target_height = 1920
encoder_name, encoder_preset = get_best_encoder()
# Use FFmpeg Pipe instead of cv2.VideoWriter to avoid OpenCV backend errors
ffmpeg_cmd = [
'ffmpeg', '-y', '-loglevel', 'error', '-hide_banner', '-stats',
'-f', 'rawvideo',
'-vcodec', 'rawvideo',
'-s', f'{target_width}x{target_height}',
'-pix_fmt', 'bgr24',
'-r', str(fps),
'-i', '-',
'-c:v', encoder_name,
'-preset', encoder_preset,
'-pix_fmt', 'yuv420p',
output_file
]
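    # With '-f rawvideo -pix_fmt bgr24', every frame piped to stdin must be
    # exactly target_width * target_height * 3 bytes, which is why each frame
    # is resized to the target resolution before writing.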
    # Hardware encoders can default to low bitrates, so pin one for predictable quality
if "nvenc" in encoder_name or "amf" in encoder_name:
ffmpeg_cmd.extend(["-b:v", "5M"])
process = subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE)
while True:
ret, frame = cap.read()
if not ret:
break
if no_face_mode == "zoom":
result = crop_center_zoom(frame)
else:
result = resize_with_padding(frame)
        try:
            # Write raw bytes to ffmpeg stdin
            process.stdin.write(result.tobytes())
        except Exception as e:
            print(f"Error writing frame to ffmpeg pipe: {e}")
            break  # the pipe is dead; stop feeding frames
cap.release()
process.stdin.close()
process.wait()
finalize_video(input_file, output_file, index, fps, project_folder, final_folder)
def finalize_video(input_file, output_file, index, fps, project_folder, final_folder):
"""Mux audio and video."""
audio_file = os.path.join(project_folder, "cuts", f"output-audio-{index}.aac")
subprocess.run(["ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-i", input_file, "-vn", "-acodec", "copy", audio_file],
check=False, capture_output=True)
if os.path.exists(audio_file) and os.path.getsize(audio_file) > 0:
final_output = os.path.join(final_folder, f"final-output{str(index).zfill(3)}_processed.mp4")
encoder_name, encoder_preset = get_best_encoder()
command = [
"ffmpeg", "-y", "-hide_banner", "-loglevel", "error", "-stats",
"-i", output_file,
"-i", audio_file,
"-c:v", encoder_name, "-preset", encoder_preset, "-b:v", "5M",
"-c:a", "aac", "-b:a", "192k",
"-r", str(fps),
final_output
]
        try:
            subprocess.run(command, check=True)
print(f"Final file generated: {final_output}")
            try:
                os.remove(audio_file)
                os.remove(output_file)
            except OSError:
                pass
except subprocess.CalledProcessError as e:
print(f"Error muxing: {e}")
else:
print(f"Warning: No audio extracted for {input_file}")
def calculate_mouth_ratio(landmarks):
    """
    Calculate the Mouth Aspect Ratio (MAR) from 68-point landmarks (inner lips).
    Inner-lip indices (0-based): 60-67, with corners at 60 (left) and 64 (right),
    and centers at 62 (top) and 66 (bottom).
    """
if landmarks is None:
return 0
# 3D points (x,y,z) or 2D (x,y). We use first 2 cols.
pts = landmarks.astype(float)
# Simple vertical vs horizontal
# Vertical
p62 = pts[62]
p66 = pts[66]
h = np.linalg.norm(p62[:2] - p66[:2])
# Horizontal
p60 = pts[60]
p64 = pts[64]
w = np.linalg.norm(p60[:2] - p64[:2])
if w < 1e-6: return 0
return h / w
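# Numeric sketch (hypothetical landmarks): with inner-lip corners 50 px apart
# (w = 50) and a 4 px gap between top and bottom centers (h = 4),
# MAR = 4 / 50 = 0.08, above the default active_speaker_mar of 0.03, so the
# mouth counts as open.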
def generate_short_mediapipe(input_file, output_file, index, face_mode, project_folder, final_folder, face_detection, face_mesh, pose, detection_period=None, no_face_mode="padding"):
try:
cap = cv2.VideoCapture(input_file)
if not cap.isOpened():
print(f"Error opening video: {input_file}")
return
fps = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_file, fourcc, fps, (1080, 1920))
next_detection_frame = 0
current_interval = int(5 * fps) # Initial guess
# Initial Interval Logic if predefined
if detection_period is not None:
current_interval = max(1, int(detection_period * fps))
elif face_mode == "2":
current_interval = int(1.0 * fps)
        last_detected_faces = None
        last_frame_face_positions = None
        last_success_frame = -1000
        max_frames_without_detection = int(3.0 * fps)  # 3 seconds timeout
        transition_duration = int(fps)
        transition_frames = []
        coordinate_log = []  # fixes a NameError: appended to in the no-face branch below
for frame_index in range(total_frames):
ret, frame = cap.read()
if not ret or frame is None:
break
if frame_index >= next_detection_frame:
# Detect ALL faces (up to 2 in our implementation)
detections = detect_face_or_body_two_faces(frame, face_detection, face_mesh, pose)
# Dynamic Logic
target_faces = 1
if face_mode == "2":
target_faces = 2
elif face_mode == "auto":
if detections and len(detections) >= 2:
target_faces = 2
else:
target_faces = 1
# Filter detections based on target
current_detections = []
if detections:
# Sort detections by approximate Area (w*h) descending to pick main faces first
detections.sort(key=lambda s: s[2] * s[3], reverse=True)
if len(detections) >= target_faces:
current_detections = detections[:target_faces]
elif len(detections) > 0:
# Fallback
current_detections = detections[:1]
target_faces = 1
# Apply Consistency Check (Proximity)
if target_faces == 2 and len(current_detections) == 2:
if last_detected_faces is not None and len(last_detected_faces) == 2:
current_detections = sort_by_proximity(current_detections, last_detected_faces, get_center_rect)
                # (No stability/lookahead check in this path; the InsightFace path implements those.)
if current_detections and len(current_detections) == target_faces:
if last_frame_face_positions is not None:
start_faces = np.array(last_frame_face_positions)
end_faces = np.array(current_detections)
try:
transition_frames = np.linspace(start_faces, end_faces, transition_duration, dtype=int)
except Exception as e:
# Fallback if shapes mismatch unexpectedly
transition_frames = []
else:
transition_frames = []
last_detected_faces = current_detections
last_success_frame = frame_index
                else:
                    pass  # detection failed; keep previous faces until timeout
# Update next detection frame
step = 5
if detection_period is not None:
if isinstance(detection_period, dict):
# If we are targeting 2 faces, we use '2' interval, else '1'
key = str(target_faces)
val = detection_period.get(key, detection_period.get('1', 0.2))
step = max(1, int(val * fps))
else:
step = max(1, int(detection_period * fps))
elif target_faces == 2:
step = int(1.0 * fps)
                else:
                    step = 5  # re-detect every 5 frames when tracking one face
next_detection_frame = frame_index + step
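                # detection_period may be a float (seconds between detection
                # passes) or a per-state dict, e.g. {'1': 0.2, '2': 1.0}:
                # re-detect every 0.2 s with one tracked face and every 1.0 s
                # with two.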
if len(transition_frames) > 0:
current_faces = transition_frames[0]
transition_frames = transition_frames[1:]
elif last_detected_faces is not None and (frame_index - last_success_frame) <= max_frames_without_detection:
current_faces = last_detected_faces
else:
if no_face_mode == "zoom":
result = crop_center_zoom(frame)
else:
result = resize_with_padding(frame)
coordinate_log.append({"frame": frame_index, "faces": []})
out.write(result)
continue
last_frame_face_positions = current_faces
if hasattr(current_faces, '__len__') and len(current_faces) == 2:
result = crop_and_resize_two_faces(frame, current_faces)
            else:
                # current_faces is a list of (x, y, w, h) tuples from detection;
                # a single face arrives as [(x, y, w, h)]
if hasattr(current_faces, '__len__') and len(current_faces) > 0:
f = current_faces[0]
result = crop_and_resize_single_face(frame, f)
else:
if no_face_mode == "zoom":
result = crop_center_zoom(frame)
else:
result = resize_with_padding(frame)
out.write(result)
cap.release()
out.release()
finalize_video(input_file, output_file, index, fps, project_folder, final_folder)
    except Exception as e:
        print(f"Error in MediaPipe processing: {e}")
        raise  # rethrow so the caller can fall back to another detector
def generate_short_haar(input_file, output_file, index, project_folder, final_folder, detection_period=None, no_face_mode="padding"):
"""Face detection using OpenCV Haar Cascades."""
print(f"Processing (Haar Cascade): {input_file}")
# Load Haar Cascade
cascade_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
face_cascade = cv2.CascadeClassifier(cascade_path)
    if face_cascade.empty():
        print("Error: Could not load Haar Cascade XML. Falling back to center crop.")
        generate_short_fallback(input_file, output_file, index, project_folder, final_folder, no_face_mode=no_face_mode)
        return
cap = cv2.VideoCapture(input_file)
if not cap.isOpened():
print(f"Error opening video: {input_file}")
return
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_file, fourcc, fps, (1080, 1920))
# Logic copied from generate_short_mediapipe
detection_interval = int(2 * fps) # Default check every 2 seconds
if detection_period is not None:
detection_interval = max(1, int(detection_period * fps))
last_detected_faces = None
last_frame_face_positions = None
last_success_frame = -1000
max_frames_without_detection = int(3.0 * fps)
transition_duration = int(fps) # 1 second smooth transition
transition_frames = []
for frame_index in range(total_frames):
ret, frame = cap.read()
if not ret or frame is None:
break
if frame_index % detection_interval == 0:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
faces = face_cascade.detectMultiScale(gray, 1.1, 4)
detections = []
if len(faces) > 0:
# Pick largest face
largest_face = max(faces, key=lambda f: f[2] * f[3])
# Ensure int type
detections = [tuple(map(int, largest_face))]
if detections:
if last_frame_face_positions is not None:
# Simple linear interpolation for smoothing
start_faces = np.array(last_frame_face_positions)
end_faces = np.array(detections)
# Generate transition frames
steps = transition_duration
transition_frames = []
for s in range(steps):
t = (s + 1) / steps
interp = (1 - t) * start_faces + t * end_faces
transition_frames.append(interp.astype(int).tolist()) # Convert back to list of lists/tuples
else:
transition_frames = []
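                # Interpolation sketch: with start x=100, end x=200 and
                # steps=30 (one second at 30 fps), t runs 1/30 .. 1, so the
                # written position glides 103, 106, ... 200 instead of snapping.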
last_detected_faces = detections
last_success_frame = frame_index
        else:
            pass  # no face this pass; keep the previous position until timeout
if len(transition_frames) > 0:
current_faces = transition_frames[0]
transition_frames = transition_frames[1:]
elif last_detected_faces is not None and (frame_index - last_success_frame) <= max_frames_without_detection:
current_faces = last_detected_faces
else:
# No face detected for a while -> Center/Padding fallback
if no_face_mode == "zoom":
result = crop_center_zoom(frame)
else:
result = resize_with_padding(frame)
out.write(result)
continue
last_frame_face_positions = current_faces
        # Haar detections are a list holding one (x, y, w, h) tuple
        if isinstance(current_faces, list):
            face_bbox = current_faces[0]
        else:
            face_bbox = current_faces  # already a bare tuple
result = crop_and_resize_single_face(frame, face_bbox)
out.write(result)
cap.release()
out.release()
finalize_video(input_file, output_file, index, fps, project_folder, final_folder)
def generate_short_insightface(input_file, output_file, index, project_folder, final_folder, face_mode="auto", detection_period=None, filter_threshold=0.35, two_face_threshold=0.60, confidence_threshold=0.30, dead_zone=40, focus_active_speaker=False, active_speaker_mar=0.03, active_speaker_score_diff=1.5, include_motion=False, active_speaker_motion_deadzone=3.0, active_speaker_motion_sensitivity=0.05, active_speaker_decay=2.0, no_face_mode="padding"):
"""Face detection using InsightFace (SOTA)."""
print(f"Processing (InsightFace): {input_file} | Mode: {face_mode}")
cap = cv2.VideoCapture(input_file)
if not cap.isOpened():
print(f"Error opening video: {input_file}")
return
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # mp4v is only an intermediate codec; finalize_video re-encodes at mux time
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_file, fourcc, fps, (1080, 1920))
# Dynamic Interval Logic
next_detection_frame = 0
last_detected_faces = None
last_frame_face_positions = None
last_success_frame = -1000
max_frames_without_detection = int(3.0 * fps) # 3 seconds timeout
transition_duration = 4 # Smooth transition over 4 frames (almost continuous)
transition_frames = []
# Current state of face mode (1 or 2)
# If auto, we decide per detection interval
current_num_faces_state = 1
if face_mode == "2":
current_num_faces_state = 2
frame_1_face_count = 0
frame_2_face_count = 0
buffered_frame = None
# Timeline tracking: list of (frame_index, mode_str)
# We will compress this later.
    timeline_frames = []  # mode stored for every written frame
coordinate_log = [] # Store raw face coordinates frame-by-frame
    # Active-speaker state. There is no persistent face-ID tracker, so activity
    # scores are re-assigned each pass by matching detections to the previous
    # frame's centers. Each entry: {'center': (x, y), 'activity': score}
    faces_activity_state = []
for frame_index in range(total_frames):
if buffered_frame is not None:
frame = buffered_frame
ret = True
buffered_frame = None
else:
ret, frame = cap.read()
if not ret or frame is None:
break
        if frame_index >= next_detection_frame and len(transition_frames) == 0:
            # Detect faces
            is_crowd = False  # reset each pass; set in the crowd check below
            faces = detect_faces_insightface(frame)
if faces:
scores = [f"{f.get('det_score',0):.2f}" for f in faces]
print(f"DEBUG: Frame {frame_index} | Raw Faces: {len(faces)} | Scores: {scores}")
else:
pass # print(f"DEBUG: Frame {frame_index} | No Raw Faces")
            # --- ACTIVITY / SPEAKER DETECTION ---
            # Keep the raw (pre-filter) detections for the crowd check below;
            # active-speaker scoring itself is gated by focus_active_speaker.
            last_raw_faces = faces
            # ------------------------------------
# --- INTELLIGENT FILTERING ---
valid_faces = []
if faces:
# 1. Filter by confidence (Using user threshold)
faces = [f for f in faces if f.get('det_score', 0) > confidence_threshold]
if faces:
# Pre-calculate areas and SPEAKER SCORE
for f in faces:
w = f['bbox'][2] - f['bbox'][0]
h = f['bbox'][3] - f['bbox'][1]
f['area'] = w * h
f['center'] = ((f['bbox'][0] + f['bbox'][2]) / 2, (f['bbox'][1] + f['bbox'][3]) / 2)
                    act = f.get('activity', 0)  # fresh detections carry no 'activity' key, so this bonus is typically 0
                    f['effective_area'] = f['area'] * (1.0 + (act * 0.05))
# Find largest face
max_area = max(f['area'] for f in faces)
# 2. Relative Size Filter
valid_faces = [f for f in faces if f['area'] > (filter_threshold * max_area)]
if len(valid_faces) < len(faces):
print(f"DEBUG: Filtered {len(faces)-len(valid_faces)} small faces. Max Area: {max_area}. Filter Thresh: {filter_threshold}")
faces = valid_faces
# --- ACTIVE SPEAKER UPDATE ---
if faces:
# 1. Update activity scores for current faces
# Simple matching to previous state
current_state_map = []
for f in faces:
# Calculate instantaneous openness
mar = 0
if 'landmark_3d_68' in f:
mar = calculate_mouth_ratio(f['landmark_3d_68'])
                    elif 'landmark_2d_106' in f:
                        # TODO: approximate a 106-to-68 lip mapping (indices
                        # 52-71 cover the lips); for now rely on landmark_3d_68,
                        # which the buffalo_l model provides.
                        pass
                    f['mouth_ratio'] = mar
                    # Heuristic: MAR ~0.03 for a closed mouth, ~0.05 when
                    # opening, > 0.1 while talking; active_speaker_mar is the cutoff.
is_talking = 1.0 if mar > active_speaker_mar else 0.0
            # --- CROWD MODE LOGIC ---
            # With too many raw faces, don't even try to track; fall back to
            # the no-face handling (zoom/padding).
            CROWD_THRESHOLD = 7
            # Count last_raw_faces (pre size filter) so background people count too
            is_crowd = len(last_raw_faces) >= CROWD_THRESHOLD
            if is_crowd:
                print(f"DEBUG: Crowd Mode Active! {len(last_raw_faces)} faces >= {CROWD_THRESHOLD}. Triggering Fallback (No Face Mode).")
                faces = []
                valid_faces = []  # must clear the strict backup too
                # Force-reset history so tracking doesn't stick to the last face
                last_detected_faces = None
                transition_frames = []
                faces_activity_state = []
                zoom_ema_bbox = None  # reset zoom smoothing state as well
            # ---------------------------
# Update Activity State - Two Pass for Global Motion Compensation
if focus_active_speaker and faces:
# Pass 1: Global Motion (Camera Shake) Calculation
# We calculate motion for ALL confident faces (before size filtering) to get best global estimate
raw_motions = []
# First, ensure we have a temporary mapping of current faces to history
# We do this non-destructively just to get motion values
for f in faces:
my_c = f['center']
best_dist = 9999
if faces_activity_state:
for old_s in faces_activity_state:
old_c = old_s['center']
dist = np.sqrt((my_c[0]-old_c[0])**2 + (my_c[1]-old_c[1])**2)
if dist < best_dist:
best_dist = dist
if best_dist < 200:
f['_raw_motion'] = best_dist
else:
f['_raw_motion'] = 0.0
if include_motion:
raw_motions.append(f['_raw_motion'])
global_motion = 0.0
if include_motion and len(raw_motions) >= 2:
global_motion = min(raw_motions)
# Pass 2: Update Scores for ALL faces
current_state_map = []
for f in faces:
# Helper: Is talking?
is_talking = f.get('mouth_ratio', 0) > active_speaker_mar
# Calculate Compensated Motion
motion_bonus = 0.0
if include_motion and faces_activity_state:
comp_motion = max(0.0, f.get('_raw_motion', 0.0) - global_motion)
f['motion_val'] = comp_motion # Store for debug
if comp_motion > active_speaker_motion_deadzone:
motion_bonus = min(2.5, (comp_motion - active_speaker_motion_deadzone) * active_speaker_motion_sensitivity)
else:
f['motion_val'] = 0.0
# Accumulate Score
matched_score = 0.0
# Re-find match to update history
my_c = f['center']
best_dist = 9999
best_idx = -1
if faces_activity_state:
for i, old_s in enumerate(faces_activity_state):
old_c = old_s['center']
dist = np.sqrt((my_c[0]-old_c[0])**2 + (my_c[1]-old_c[1])**2)
if dist < best_dist:
best_dist = dist
best_idx = i
if best_idx != -1 and best_dist < 200:
old_val = faces_activity_state[best_idx]['activity']
change = -abs(active_speaker_decay)
if is_talking:
change = 1.5
new_val = old_val + change + motion_bonus
# Increased cap to 20.0 to allow motion differences to separate two 'talking' faces
matched_score = max(0.0, min(20.0, new_val))
else:
matched_score = 1.0 if is_talking else 0.0
f['activity_score'] = matched_score
current_state_map.append({'center': f['center'], 'activity': matched_score})
faces_activity_state = current_state_map
else:
faces_activity_state = []
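            # Score dynamics sketch (defaults: +1.5 per talking pass, -2.0
            # decay, cap 20.0): a face whose MAR stays above active_speaker_mar
            # for four consecutive passes accumulates ~6.0 while a silent face
            # decays to 0.0, enough to clear active_speaker_score_diff (1.5)
            # and trigger single-face focus on the speaker.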
faces = valid_faces
# Decide 1 or 2 faces
target_faces = 1
if face_mode == "2":
target_faces = 2
elif face_mode == "auto":
if len(faces) >= 2:
# Default decision variable
decided = False
if focus_active_speaker:
# EXPERIMENTAL: Decide based on activity
f1 = faces[0]
f2 = faces[1]
score1 = f1.get('activity_score', 0)
score2 = f2.get('activity_score', 0)
y1 = f1['center'][1]
y2 = f2['center'][1]
pos1 = "Top" if y1 < y2 else "Bottom"
pos2 = "Top" if y2 < y1 else "Bottom"
# Debug Active Speaker
print(f"DEBUG: Frame {frame_index} | {pos1} (MAR: {f1.get('mouth_ratio',0):.3f}, Mov: {f1.get('motion_val',0):.1f}, Score: {score1:.1f}) | {pos2} (MAR: {f2.get('mouth_ratio',0):.3f}, Mov: {f2.get('motion_val',0):.1f}, Score: {score2:.1f})")
                        # If one face is the clearly dominant active speaker
                        # (a score gap of roughly 2-3 detection passes of
                        # talking vs. silence), focus on it alone.
                        diff = abs(score1 - score2)
                        # Check strict dominance first
if diff > active_speaker_score_diff:
# Pick the winner
target_faces = 1
decided = True
# Ensure the list is sorted by activity so [0] is the winner
if score2 > score1:
# Swap ensures [0] is the active one for later 1-face crop logic which takes [0]
faces = [f2, f1]
print(f"DEBUG: Active Speaker Focus Triggered! Diff ({diff:.2f}) > Thresh ({active_speaker_score_diff}). Focusing on Face {'2' if score2 > score1 else '1'}.")
elif score1 > 4.0 and score2 > 4.0:
# Both talking -> 2 faces
# Raised threshold to 4.0 to avoid noise triggering split
target_faces = 2
decided = True
print(f"DEBUG: Dual Active Speakers! Both scores > 4.0. Forcing Split Mode.")
                        # If both scores are low (both silent), fall through to
                        # the size-ratio decision below (decided stays False).
if not decided:
# Standard Logic: Check relative sizes (effective area)
faces_sorted_temp = sorted(faces, key=lambda f: f.get('effective_area', 0), reverse=True)
largest = faces_sorted_temp[0]['effective_area']
second = faces_sorted_temp[1]['effective_area']
# Two-Face Constraint
if second > (two_face_threshold * largest):
target_faces = 2
else:
target_faces = 1
else:
target_faces = 1
            # (faces already equals valid_faces at this point)
# -----------------------------
# Fallback Lookahead: If detection fails or partial
# But DO NOT look ahead if we are in Crowd Mode (we explicitly wanted 0 faces)
if len(faces) < target_faces and not is_crowd:
# Try 1 frame ahead
ret2, frame2 = cap.read()
if ret2 and frame2 is not None:
faces2 = detect_faces_insightface(frame2)
# --- Apply same filtering to lookahead ---
valid_faces2 = []
if faces2:
                        faces2 = [f for f in faces2 if f.get('det_score', 0) > 0.50]  # lookahead uses a stricter fixed 0.50 confidence
if faces2:
for f in faces2:
w = f['bbox'][2] - f['bbox'][0]
h = f['bbox'][3] - f['bbox'][1]
f['area'] = w * h
f['center'] = ((f['bbox'][0] + f['bbox'][2]) / 2, (f['bbox'][1] + f['bbox'][3]) / 2)
f['effective_area'] = f['area'] # Default for lookahead
max_area2 = max(f['area'] for f in faces2)
# STRICTER FILTER: threshold of max area
valid_faces2 = [f for f in faces2 if f['area'] > (filter_threshold * max_area2)]
faces2 = valid_faces2
# ----------------------------------------
# If lookahead found what we wanted OR found something better than nothing
if len(faces2) >= target_faces:
faces = faces2 # Use lookahead faces for current frame
elif len(faces) == 0 and len(faces2) > 0:
faces = faces2 # Better than nothing
buffered_frame = frame2 # Store for next iteration
detections = []
if len(faces) >= target_faces:
# --- FACE TRACKING / SORTING ---
# Instead of just Area, we prioritize faces closer to the LAST detected face
# This prevents switching to a background person if sizes are similar
if last_detected_faces is not None and len(last_detected_faces) == target_faces:
                    # Score candidates by distance to the previous bbox center
                    # (lower is better) so we stay on the tracked face instead
                    # of jumping to a similar-sized background face.
if target_faces == 1:
old_center = get_center_bbox(last_detected_faces[0])
                        def sort_score(f):
                            # Distance to the previous center (lower is better)
                            dist = np.sqrt((f['center'][0] - old_center[0])**2 + (f['center'][1] - old_center[1])**2)
                            # A small effective-area bonus lets a markedly larger
                            # or more active face still win the slot
                            return dist - (f['effective_area'] * 0.0001)
faces_sorted = sorted(faces, key=sort_score)
else:
# For 2 faces, just sort by effective area for now as proximity sort happens later
faces_sorted = sorted(faces, key=lambda f: f['effective_area'], reverse=True)
else:
# No history, sort by effective area
if focus_active_speaker and target_faces == 1:
# Pick the one with highest activity score
faces_sorted = sorted(faces, key=lambda f: f.get('activity_score', 0), reverse=True)
else:
faces_sorted = sorted(faces, key=lambda f: f.get('effective_area', 0), reverse=True)
                if target_faces == 2:
                    # bboxes stay [x1, y1, x2, y2] here; conversion to
                    # (x, y, w, h) happens at crop time below
f1 = faces_sorted[0]['bbox']
f2 = faces_sorted[1]['bbox']
if last_detected_faces is not None and len(last_detected_faces) == 2:
detections = sort_by_proximity([f1, f2], last_detected_faces, get_center_bbox)
else:
detections = [f1, f2]
current_num_faces_state = 2
else:
# 1 face
detections = [faces_sorted[0]['bbox']]
current_num_faces_state = 1
else:
# If we wanted 2 but found 1, or wanted 1 found 0
if len(faces) > 0:
# Fallback to 1 face if found at least 1
faces_sorted = sorted(faces, key=lambda f: f['effective_area'], reverse=True)
detections = [faces_sorted[0]['bbox']]
current_num_faces_state = 1
else:
detections = []
if detections:
# --- STABILIZATION (DEAD ZONE) ---
# Check if movement is small enough to ignore
if last_detected_faces is not None and len(last_detected_faces) == len(detections):
is_stable = True
for i in range(len(detections)):
old_c = get_center_bbox(last_detected_faces[i])
new_c = get_center_bbox(detections[i])
dist = np.sqrt((old_c[0]-new_c[0])**2 + (old_c[1]-new_c[1])**2)
# Threshold: dead_zone variable (pixels)
# Reduced jitter for talking heads
if dist > dead_zone:
is_stable = False
break
if is_stable:
# Keep old position to prevent "shaky cam"
detections = last_detected_faces
# Clear transition logic (snap) or keep it empty
transition_frames = []
# ---------------------------------
if last_frame_face_positions is not None and len(last_frame_face_positions) == len(detections):
# Only transition if we decided to MOVE (i.e., not stable)
forced_transition = True
if last_detected_faces is not None and len(detections) == len(last_detected_faces):
# Manual check to avoid numpy ambiguity
arrays_equal = True
for i in range(len(detections)):
if not np.array_equal(detections[i], last_detected_faces[i]):
arrays_equal = False
break
if arrays_equal:
forced_transition = False
if not transition_frames and forced_transition:
# Transition
start_faces = np.array(last_frame_face_positions)
end_faces = np.array(detections)
steps = transition_duration
transition_frames = []
for s in range(steps):
t = (s + 1) / steps
interp = (1 - t) * start_faces + t * end_faces
transition_frames.append(interp.astype(int).tolist())
# Optimization removed to avoid "Ambiguous truth value of array" error
# if detections == last_detected_faces: caused crash
else:
# Reset transition if face count changed or first detect
transition_frames = []
last_detected_faces = detections
last_success_frame = frame_index
            else:
                pass  # detection returned nothing; keep previous state until timeout
# Update next detection frame based on NEW state
step = 5 # Default fallback (very fast)
if detection_period is not None:
if isinstance(detection_period, dict):
# Period depends on state
key = str(current_num_faces_state)
# fallback to '1' if key not found (should be there)
val = detection_period.get(key, detection_period.get('1', 0.2))
step = max(1, int(val * fps))
else:
# Legacy float support (should not happen with new main.py but good safety)
step = max(1, int(detection_period * fps))
elif current_num_faces_state == 2:
step = int(1.0 * fps) # 1s for 2 faces
else:
step = 5 # 5 frames for 1 face (~0.16s at 30fps)
next_detection_frame = frame_index + step
if len(transition_frames) > 0:
current_faces = transition_frames[0]
transition_frames = transition_frames[1:]
elif last_detected_faces is not None and (frame_index - last_success_frame) <= max_frames_without_detection:
current_faces = last_detected_faces
else:
# Fallback for this frame
if no_face_mode == "zoom":
result = crop_center_zoom(frame)
else:
result = resize_with_padding(frame)
out.write(result)
timeline_frames.append((frame_index, "1")) # Fix: Ensure fallback is treated as single face for subs
# Fix XML Log sync (Empty faces for fallback)
coords_entry = {"frame": frame_index, "src_size": [frame_width, frame_height], "faces": []}
coordinate_log.append(coords_entry)
continue
last_frame_face_positions = current_faces
target_len = len(current_faces)
if target_len == 2:
frame_2_face_count += 1
# Convert [x1, y1, x2, y2] to (x, y, w, h)
f1 = current_faces[0]
f2 = current_faces[1]
rect1 = (f1[0], f1[1], f1[2]-f1[0], f1[3]-f1[1])
rect2 = (f2[0], f2[1], f2[2]-f2[0], f2[3]-f2[1])
result = crop_and_resize_two_faces(frame, [rect1, rect2])
timeline_frames.append((frame_index, "2"))
else:
frame_1_face_count += 1
# 1 face
# current_faces[0] is [x1, y1, x2, y2]
result = crop_and_resize_insightface(frame, current_faces[0])
timeline_frames.append((frame_index, "1"))
# Capture Coordinates (Frame-by-Frame)
coords_entry = {"frame": frame_index, "src_size": [frame_width, frame_height], "faces": []}
try:
# We want to store [x1, y1, x2, y2, rh] for each face
if isinstance(current_faces, (list, tuple)):
processed_faces_log = []
for f in current_faces:
f_list = list(map(int, f[:4])) # Standard bbox
# Calculate rh (relative height)
face_h = f_list[3] - f_list[1]
rh = face_h / float(frame_height)
f_list.append(float(f"{rh:.4f}")) # Append as 5th element
processed_faces_log.append(f_list)
coords_entry["faces"] = processed_faces_log
elif isinstance(current_faces, np.ndarray):
# Similar logic for numpy
processed_faces_log = []
for f in current_faces:
f_list = f[:4].astype(int).tolist()
face_h = f_list[3] - f_list[1]
rh = face_h / float(frame_height)
f_list.append(float(f"{rh:.4f}"))
processed_faces_log.append(f_list)
coords_entry["faces"] = processed_faces_log
        except Exception:
            pass  # coordinate logging must never break frame processing
coordinate_log.append(coords_entry)
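        # Example coordinate_log entry for a 1920x1080 source with one face:
        #   {"frame": 42, "src_size": [1920, 1080],
        #    "faces": [[650, 210, 890, 530, 0.2963]]}
        # The fifth value is the face height relative to the frame height.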
out.write(result)
cap.release()
out.release()
# Compress timeline into segments
# [(start_time, end_time, mode), ...]
compressed_timeline = []
if timeline_frames:
curr_mode = timeline_frames[0][1]
start_f = timeline_frames[0][0]
for i in range(1, len(timeline_frames)):
frame_idx, mode = timeline_frames[i]
if mode != curr_mode:
# End current segment
# Convert frame to seconds
end_f = timeline_frames[i-1][0]
compressed_timeline.append({
"start": float(start_f) / fps,
"end": float(end_f) / fps, # or frame_idx / fps for continuity
"mode": curr_mode
})
# Start new
curr_mode = mode
start_f = frame_idx
# Add last
end_f = timeline_frames[-1][0]
compressed_timeline.append({
"start": float(start_f) / fps,
"end": (float(end_f) + 1) / fps,
"mode": curr_mode
})
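    # Compression sketch: per-frame modes such as
    #   [(0, "1"), (1, "1"), ..., (89, "1"), (90, "2"), ..., (149, "2")]
    # collapse at 30 fps into
    #   [{"start": 0.0, "end": 2.9667, "mode": "1"},
    #    {"start": 3.0, "end": 5.0, "mode": "2"}]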
# Save timeline JSON
    timeline_file = output_file.replace(".mp4", "_timeline.json")
    import json
    try:
        with open(timeline_file, "w") as f:
            json.dump(compressed_timeline, f)
print(f"Timeline saved: {timeline_file}")
except Exception as e:
print(f"Error saving timeline: {e}")
# Save Coords JSON
coords_file = output_file.replace(".mp4", "_coords.json")
try:
with open(coords_file, "w") as f:
json.dump(coordinate_log, f)
print(f"Face Coordinates saved: {coords_file}")
except Exception as e:
print(f"Error saving coords: {e}")
finalize_video(input_file, output_file, index, fps, project_folder, final_folder)
    # Report the dominant mode: "2" if at least 15% of frames used the split view
    if frame_2_face_count > (total_frames * 0.15):
return "2"
return "1"
def edit(project_folder="tmp", face_model="insightface", face_mode="auto", detection_period=None, filter_threshold=0.35, two_face_threshold=0.60, confidence_threshold=0.30, dead_zone=40, focus_active_speaker=False, active_speaker_mar=0.03, active_speaker_score_diff=1.5, include_motion=False, active_speaker_motion_deadzone=3.0, active_speaker_motion_sensitivity=0.05, active_speaker_decay=2.0, segments_data=None, no_face_mode="padding"):
# Lazy init solutions only when needed to avoid AttributeError if import failed partially
mp_face_detection = None
mp_face_mesh = None
mp_pose = None
index = 0
cuts_folder = os.path.join(project_folder, "cuts")
final_folder = os.path.join(project_folder, "final")
os.makedirs(final_folder, exist_ok=True)
face_modes_log = {}
# Priority: User Choice -> Fallbacks
insightface_working = False
# Only init InsightFace if selected or default
if INSIGHTFACE_AVAILABLE and (face_model == "insightface"):
try:
print("Initializing InsightFace...")
init_insightface()
insightface_working = True
print("InsightFace Initialized Successfully.")
except Exception as e:
print(f"WARNING: InsightFace Initialization Failed ({e}). Will try MediaPipe.")
insightface_working = False
mediapipe_working = False
use_haar = False
# If insightface failed OR user chose mediapipe, init mediapipe
should_use_mediapipe = (face_model == "mediapipe") or (face_model == "insightface" and not insightface_working)
if should_use_mediapipe:
try:
# Check if solutions is available (it might not be if import failed silently or partial)
if not hasattr(mp, 'solutions'):
raise ImportError("mediapipe.solutions not found")
mp_face_detection = mp.solutions.face_detection
mp_face_mesh = mp.solutions.face_mesh
mp_pose = mp.solutions.pose
# Try to init with model_selection=0 (Short Range) as a smoketest
with mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5) as fd:
pass
mediapipe_working = True
print("MediaPipe Initialized Successfully.")
except Exception as e:
print(f"WARNING: MediaPipe Initialization Failed ({e}). Switching to OpenCV Haar Cascade.")
mediapipe_working = False
use_haar = True
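    # Detector cascade from here on: InsightFace -> MediaPipe -> Haar -> center
    # crop, each stage attempted per clip only if the previous one failed.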
    # The MediaPipe face count is decided dynamically per detection pass
import glob
found_files = sorted(glob.glob(os.path.join(cuts_folder, "*_original_scale.mp4")))
    if not found_files:
        print(f"No files found in {cuts_folder}.")
        return
for input_file in found_files:
input_filename = os.path.basename(input_file)
# Extract Index
index = 0
        try:
            parts = input_filename.split('_')
            if parts[0].isdigit(): index = int(parts[0])
            elif input_filename.startswith("output"):  # e.g. output000
                idx_str = input_filename[6:9]
                if idx_str.isdigit(): index = int(idx_str)
        except (ValueError, IndexError): pass
output_file = os.path.join(final_folder, f"temp_video_no_audio_{index}.mp4")
# Determine Final Name (Title)
base_name_final = input_filename.replace("_original_scale.mp4", "")
# If legacy name, try to improve it
if input_filename.startswith("output") and segments_data and index < len(segments_data):
title = segments_data[index].get("title", f"Segment_{index}")
safe_title = "".join([c for c in title if c.isalnum() or c in " _-"]).strip().replace(" ", "_")[:60]
base_name_final = f"{index:03d}_{safe_title}"
if os.path.exists(input_file):
success = False
detected_mode = "1" # Default if detection fails or fallback
# 1. Try InsightFace
if insightface_working:
try:
# Capture returned mode
res = generate_short_insightface(input_file, output_file, index, project_folder, final_folder, face_mode=face_mode, detection_period=detection_period,
filter_threshold=filter_threshold, two_face_threshold=two_face_threshold, confidence_threshold=confidence_threshold, dead_zone=dead_zone, focus_active_speaker=focus_active_speaker,
active_speaker_mar=active_speaker_mar, active_speaker_score_diff=active_speaker_score_diff, include_motion=include_motion,
active_speaker_motion_deadzone=active_speaker_motion_deadzone,
active_speaker_motion_sensitivity=active_speaker_motion_sensitivity,
active_speaker_decay=active_speaker_decay,
no_face_mode=no_face_mode)
if res: detected_mode = res
success = True
except Exception as e:
import traceback
traceback.print_exc()
print(f"InsightFace processing failed for {input_filename}: {e}")
print("Falling back to MediaPipe/Haar...")
# 2. Try MediaPipe if InsightFace failed or not available
if not success and mediapipe_working:
try:
with mp_face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.2) as face_detection, \
mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=2, refine_landmarks=True, min_detection_confidence=0.2, min_tracking_confidence=0.2) as face_mesh, \
mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
generate_short_mediapipe(input_file, output_file, index, face_mode, project_folder, final_folder, face_detection, face_mesh, pose, detection_period=detection_period, no_face_mode=no_face_mode)
                    # generate_short_mediapipe does not report the detected mode,
                    # so assume "1" unless the user forced two-face mode
                    detected_mode = "1"
                    if face_mode == "2":
                        detected_mode = "2"
                    success = True
except Exception as e:
print(f"MediaPipe processing failed (fallback): {e}")
# 3. Try Haar if others failed
if not success and (use_haar or (not mediapipe_working and not insightface_working)):
try:
print("Attempts with Haar Cascade...")
generate_short_haar(input_file, output_file, index, project_folder, final_folder, detection_period=detection_period, no_face_mode=no_face_mode)
success = True
except Exception as e2:
print(f"Haar fallback also failed: {e2}")
# 4. Last Resort: Center Crop
if not success:
generate_short_fallback(input_file, output_file, index, project_folder, final_folder, no_face_mode=no_face_mode)
detected_mode = "1"
success = True
# Save mode
face_modes_log[f"output{str(index).zfill(3)}"] = detected_mode
if success:
try:
new_mp4_name = f"{base_name_final}.mp4"
new_mp4_path = os.path.join(final_folder, new_mp4_name)
# Source is what finalize_video created
# finalize_video creates `final-output{index}_processed.mp4`
generated_mp4_name = f"final-output{str(index).zfill(3)}_processed.mp4"
generated_mp4_path = os.path.join(final_folder, generated_mp4_name)
# 1. Rename MP4
if os.path.exists(generated_mp4_path):
if os.path.exists(new_mp4_path): os.remove(new_mp4_path)
os.rename(generated_mp4_path, new_mp4_path)
print(f"Renamed Output to Title: {new_mp4_name}")
# 2. Rename JSON Subtitle (if exists and hasn't been renamed by cut_segments)
subs_folder = os.path.join(project_folder, "subs")
# Check if legacy name exists
old_json_name = f"final-output{str(index).zfill(3)}_processed.json"
old_json_path = os.path.join(subs_folder, old_json_name)
new_json_name = f"{base_name_final}_processed.json"
new_json_path = os.path.join(subs_folder, new_json_name)
if os.path.exists(old_json_path):
if os.path.exists(new_json_path): os.remove(new_json_path)
os.rename(old_json_path, new_json_path)
print(f"Renamed Subtitles to Title: {new_json_name}")
# 3. Rename Timeline JSON
# Timeline is temp_video_no_audio_{index}_timeline.json (created by generate_short...)
old_timeline_name = f"temp_video_no_audio_{index}_timeline.json"
old_timeline_path = os.path.join(final_folder, old_timeline_name)
new_timeline_name = f"{base_name_final}_timeline.json"
new_timeline_path = os.path.join(final_folder, new_timeline_name)
if os.path.exists(old_timeline_path):
if os.path.exists(new_timeline_path): os.remove(new_timeline_path)
os.rename(old_timeline_path, new_timeline_path)
print(f"Renamed Timeline to Title: {new_timeline_name}")
# 4. Rename Coords JSON
old_coords_name = f"temp_video_no_audio_{index}_coords.json"
old_coords_path = os.path.join(final_folder, old_coords_name)
new_coords_name = f"{base_name_final}_coords.json"
new_coords_path = os.path.join(final_folder, new_coords_name)
if os.path.exists(old_coords_path):
if os.path.exists(new_coords_path): os.remove(new_coords_path)
os.rename(old_coords_path, new_coords_path)
print(f"Renamed Coords to Title: {new_coords_name}")
except Exception as e:
print(f"Warning: Could not rename file with title: {e}")
# Save Face Modes to JSON for subtitle usage
modes_file = os.path.join(project_folder, "face_modes.json")
try:
import json
with open(modes_file, "w") as f:
json.dump(face_modes_log, f)
print(f"Detect Stats saved: {modes_file}")
except Exception as e:
print(f"Error saving face modes: {e}")
if __name__ == "__main__":
edit()