import os import cv2 import shutil import argparse import subprocess import numpy as np from pathlib import Path from merge_videos_utils import combine_adaptive def split_video_frames( video_path, output_path1, output_path2, split_frame=49, cross_frames=3, ): """ Splits a video into two parts with overlapping frames. Video 1: frames [0 .. split_frame-1] Video 2: frames [split_frame-cross_frames .. end] Example: split_frame=49, cross_frames=3 Video1: 0–48 Video2: 46–end """ cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"Error opening video file {video_path}") return False fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Safety checks if split_frame <= 0 or split_frame >= total_frames: print("Invalid split_frame value.") cap.release() return False if cross_frames < 0: cross_frames = 0 v1_start = 0 v1_end = split_frame v2_start = max(0, split_frame - cross_frames) v2_end = total_frames fourcc = cv2.VideoWriter_fourcc(*"mp4v") out1 = cv2.VideoWriter(output_path1, fourcc, fps, (width, height)) out2 = cv2.VideoWriter(output_path2, fourcc, fps, (width, height)) frame_idx = 0 written1 = 0 written2 = 0 while True: ret, frame = cap.read() if not ret: break if v1_start <= frame_idx < v1_end: out1.write(frame) written1 += 1 if v2_start <= frame_idx < v2_end: out2.write(frame) written2 += 1 frame_idx += 1 cap.release() out1.release() out2.release() print( f"Total frames: {total_frames}\n" f"Video1: [{v1_start}..{v1_end-1}] → {written1} frames\n" f"Video2: [{v2_start}..{v2_end-1}] → {written2} frames\n" f"Overlap frames: {max(0, v1_end - v2_start)}" ) return True def split_mask_npz(mask_path, output_path1, output_path2, split_frame=49, cross_frames=3): """ Splits a mask npz file into two parts with overlapping frames. Assumes the npz contains a 'mask' key with shape: - [T, H, W] or - [T, 1, H, W] or generally [T, ...] mask1: frames [0 .. split_frame-1] mask2: frames [split_frame-cross_frames .. end] """ try: data = np.load(mask_path) if "mask" not in data: print(f"Key 'mask' not found in {mask_path}") return False mask = data["mask"] if mask.ndim < 1: print(f"Invalid mask shape {mask.shape}") return False total_frames = mask.shape[0] # Safety checks if split_frame <= 0 or split_frame >= total_frames: print(f"Invalid split_frame={split_frame} for total_frames={total_frames}") return False if cross_frames < 0: cross_frames = 0 v1_start, v1_end = 0, split_frame v2_start = max(0, split_frame - cross_frames) v2_end = total_frames mask1 = mask[v1_start:v1_end] mask2 = mask[v2_start:v2_end] np.savez_compressed(output_path1, mask=mask1) np.savez_compressed(output_path2, mask=mask2) print( f"Split mask {mask_path} into {output_path1} and {output_path2}\n" f"Original shape: {mask.shape}\n" f"mask1 frames: [{v1_start}..{v1_end-1}] -> shape {mask1.shape}\n" f"mask2 frames: [{v2_start}..{v2_end-1}] -> shape {mask2.shape}\n" f"Overlap frames: {max(0, v1_end - v2_start)} (requested {cross_frames})" ) return True except Exception as e: print(f"Failed to split mask {mask_path}: {e}") return False def process_split(output_anchor_path, cross_frames, split_frame=49): """ Main function to process the output_anchor folder. """ if not os.path.exists(output_anchor_path): print(f"Path {output_anchor_path} does not exist.") return parent_dir = os.path.dirname(output_anchor_path) base_name = os.path.basename(output_anchor_path) dir_part1 = os.path.join(parent_dir, f"{base_name}_part1") dir_part2 = os.path.join(parent_dir, f"{base_name}_part2") if os.path.exists(dir_part1): shutil.rmtree(dir_part1) if os.path.exists(dir_part2): shutil.rmtree(dir_part2) os.makedirs(dir_part1) os.makedirs(dir_part2) target_subfolders = ['videos', 'masked_videos', 'masks', 'captions'] for sub in target_subfolders: src_sub = os.path.join(output_anchor_path, sub) dst_sub1 = os.path.join(dir_part1, sub) dst_sub2 = os.path.join(dir_part2, sub) os.makedirs(dst_sub1, exist_ok=True) os.makedirs(dst_sub2, exist_ok=True) if not os.path.exists(src_sub): continue for filename in os.listdir(src_sub): src_file = os.path.join(src_sub, filename) dst_file1 = os.path.join(dst_sub1, filename) dst_file2 = os.path.join(dst_sub2, filename) if filename.endswith('.mp4'): split_video_frames(src_file, dst_file1, dst_file2, split_frame, cross_frames) elif filename.endswith('.npz'): split_mask_npz(src_file, dst_file1, dst_file2, split_frame, cross_frames) elif filename.endswith('.txt'): # Copy captions to both shutil.copy(src_file, dst_file1) shutil.copy(src_file, dst_file2) else: pass def merge_crop_outputs(Unique_identifier): """ Merges split crop outputs back into the main crops folder. """ logs = [] logs.append("\n[MERGE] Merging split crop outputs...") crops_dir_path = Path(f"/app/{Unique_identifier}/crops") crops_part1_path = Path(f"/app/{Unique_identifier}/crops_part1") crops_part2_path = Path(f"/app/{Unique_identifier}/crops_part2") if crops_dir_path.exists(): for crop_folder in crops_dir_path.iterdir(): if crop_folder.is_dir() and crop_folder.name.lower() != "previews": crop_name = crop_folder.name # Define paths part1_out_dir = crops_part1_path / crop_name / "output" part2_out_dir = crops_part2_path / crop_name / "output" dest_out_dir = crop_folder / "output" dest_out_dir.mkdir(parents=True, exist_ok=True) # Find video files part1_video = None if part1_out_dir.exists(): videos = list(part1_out_dir.glob("*_vsr.mp4")) if videos: part1_video = videos[0] # Assume one output per crop elif videos := list(part1_out_dir.glob("*_out.mp4")): part1_video = videos[0] part2_video = None if part2_out_dir.exists(): videos = list(part2_out_dir.glob("*_vsr.mp4")) if videos: part2_video = videos[0] # Assume one output per crop elif videos := list(part2_out_dir.glob("*_out.mp4")): part2_video = videos[0] if part1_video and part2_video: # Construct destination filename dest_filename = part1_video.name dest_video_path = dest_out_dir / dest_filename combine_adaptive(str(part1_video), str(part2_video), str(dest_video_path)) else: logs.append(f"[MERGE] ⚠️ Missing parts for {crop_name}: Part1={part1_video is not None}, Part2={part2_video is not None}") return logs