vfx-2 / split_utils.py
TaqiRaza512's picture
Initial commit
307c071
import os
import cv2
import shutil
import argparse
import subprocess
import numpy as np
from pathlib import Path
from merge_videos_utils import combine_adaptive
def split_video_frames(
    video_path,
    output_path1,
    output_path2,
    split_frame=49,
    cross_frames=3,
):
    """
    Split a video into two parts that share a run of overlapping frames.

    Video 1: frames [0 .. split_frame-1]
    Video 2: frames [split_frame-cross_frames .. end]

    Example:
        split_frame=49, cross_frames=3
        Video1: 0–48
        Video2: 46–end

    Args:
        video_path: Path of the source video, opened with cv2.VideoCapture.
        output_path1: Destination file for the first part.
        output_path2: Destination file for the second part.
        split_frame: Index of the first frame that is NOT part of video 1.
            Must satisfy 0 < split_frame < reported frame count.
        cross_frames: Number of frames before ``split_frame`` that are also
            written to video 2. Negative values are treated as 0.

    Returns:
        True when both outputs could be opened and each received at least
        one frame; False otherwise (a message is printed in that case).
    """
    cap = cv2.VideoCapture(video_path)
    out1 = None
    out2 = None
    frame_idx = 0
    written1 = 0
    written2 = 0

    try:
        if not cap.isOpened():
            print(f"Error opening video file {video_path}")
            return False

        # Some containers report 0 fps; fall back to a sane default.
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Safety checks
        if split_frame <= 0 or split_frame >= total_frames:
            print("Invalid split_frame value.")
            return False
        cross_frames = max(0, cross_frames)

        v1_start = 0
        v1_end = split_frame
        v2_start = max(0, split_frame - cross_frames)
        v2_end = total_frames

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out1 = cv2.VideoWriter(output_path1, fourcc, fps, (width, height))
        out2 = cv2.VideoWriter(output_path2, fourcc, fps, (width, height))

        # A writer that failed to open silently drops every frame, so
        # detect that up front instead of reporting a bogus success.
        if not (out1.isOpened() and out2.isOpened()):
            print(
                f"Error opening output video file(s): "
                f"{output_path1}, {output_path2}"
            )
            return False

        # Frames at or beyond v2_end belong to neither part, so stop there.
        while frame_idx < v2_end:
            ret, frame = cap.read()
            if not ret:
                break
            if v1_start <= frame_idx < v1_end:
                out1.write(frame)
                written1 += 1
            if v2_start <= frame_idx < v2_end:
                out2.write(frame)
                written2 += 1
            frame_idx += 1
    finally:
        # Always release native handles, even on early return or exception.
        cap.release()
        if out1 is not None:
            out1.release()
        if out2 is not None:
            out2.release()

    print(
        f"Total frames: {total_frames}\n"
        f"Video1: [{v1_start}..{v1_end-1}] → {written1} frames\n"
        f"Video2: [{v2_start}..{v2_end-1}] → {written2} frames\n"
        f"Overlap frames: {max(0, v1_end - v2_start)}"
    )

    # The reported frame count can overstate what is actually decodable;
    # an empty part means the split did not really succeed.
    if written1 == 0 or written2 == 0:
        print(
            f"Only {frame_idx} of {total_frames} reported frames could be "
            f"decoded from {video_path}; at least one part is empty."
        )
        return False
    return True
def split_mask_npz(mask_path, output_path1, output_path2, split_frame=49, cross_frames=3):
    """
    Split a mask npz file into two parts with overlapping frames.

    The npz must contain a 'mask' key whose first axis is indexed per frame,
    e.g. shape [T, H, W], [T, 1, H, W] or generally [T, ...].

    mask1: frames [0 .. split_frame-1]
    mask2: frames [split_frame-cross_frames .. end]

    Args:
        mask_path: Path of the source .npz archive.
        output_path1: Destination for the first part (written with
            np.savez_compressed under the key 'mask').
        output_path2: Destination for the second part (same format).
        split_frame: Index of the first frame that is NOT part of mask1.
            Must satisfy 0 < split_frame < number of frames.
        cross_frames: Number of frames before ``split_frame`` that are also
            included in mask2. Negative values are treated as 0.

    Returns:
        True on success; False when the key is missing, the shape or
        split_frame is invalid, or any error occurs while reading/writing
        (a message is printed in every failure case).
    """
    try:
        # np.load on an .npz keeps the archive open; the context manager
        # closes the file handle once the array has been read into memory.
        with np.load(mask_path) as data:
            if "mask" not in data:
                print(f"Key 'mask' not found in {mask_path}")
                return False
            mask = data["mask"]

        if mask.ndim < 1:
            print(f"Invalid mask shape {mask.shape}")
            return False

        total_frames = mask.shape[0]

        # Safety checks
        if split_frame <= 0 or split_frame >= total_frames:
            print(f"Invalid split_frame={split_frame} for total_frames={total_frames}")
            return False
        cross_frames = max(0, cross_frames)

        v1_start, v1_end = 0, split_frame
        v2_start = max(0, split_frame - cross_frames)
        v2_end = total_frames

        mask1 = mask[v1_start:v1_end]
        mask2 = mask[v2_start:v2_end]

        np.savez_compressed(output_path1, mask=mask1)
        np.savez_compressed(output_path2, mask=mask2)

        print(
            f"Split mask {mask_path} into {output_path1} and {output_path2}\n"
            f"Original shape: {mask.shape}\n"
            f"mask1 frames: [{v1_start}..{v1_end-1}] -> shape {mask1.shape}\n"
            f"mask2 frames: [{v2_start}..{v2_end-1}] -> shape {mask2.shape}\n"
            f"Overlap frames: {max(0, v1_end - v2_start)} (requested {cross_frames})"
        )
        return True
    except Exception as e:
        # Best-effort boundary: report the failure and let the caller go on
        # with the remaining files.
        print(f"Failed to split mask {mask_path}: {e}")
        return False
def process_split(output_anchor_path, cross_frames, split_frame=49):
    """
    Split every asset of an output_anchor folder into two sibling folders.

    For an anchor folder ``<parent>/<name>`` this (re)creates
    ``<parent>/<name>_part1`` and ``<parent>/<name>_part2`` and fills the
    'videos', 'masked_videos', 'masks' and 'captions' subfolders of each:

    - ``.mp4`` files are split with split_video_frames
    - ``.npz`` files are split with split_mask_npz
    - ``.txt`` files are copied unchanged to both parts
    - any other file is ignored

    Existing ``_part1`` / ``_part2`` folders are deleted first.

    Args:
        output_anchor_path: Folder to split. A trailing path separator is
            tolerated.
        cross_frames: Number of overlapping frames shared by both parts.
        split_frame: Frame index at which part 2 takes over from part 1.

    Returns:
        None. A message is printed when the anchor path does not exist.
    """
    if not os.path.exists(output_anchor_path):
        print(f"Path {output_anchor_path} does not exist.")
        return

    # With a trailing separator ("/x/anchor/") basename would be "" and the
    # part folders would end up INSIDE the anchor as "_part1"/"_part2".
    anchor = os.fspath(output_anchor_path)
    separators = os.sep + (os.altsep or "")
    anchor = anchor.rstrip(separators) or anchor
    parent_dir, base_name = os.path.split(anchor)

    dir_part1 = os.path.join(parent_dir, f"{base_name}_part1")
    dir_part2 = os.path.join(parent_dir, f"{base_name}_part2")

    # Start from clean output folders so stale files never leak through.
    for part_dir in (dir_part1, dir_part2):
        if os.path.exists(part_dir):
            shutil.rmtree(part_dir)
        os.makedirs(part_dir)

    target_subfolders = ['videos', 'masked_videos', 'masks', 'captions']
    for sub in target_subfolders:
        src_sub = os.path.join(anchor, sub)
        dst_sub1 = os.path.join(dir_part1, sub)
        dst_sub2 = os.path.join(dir_part2, sub)

        # The destination layout is created even when the source subfolder
        # is absent.
        os.makedirs(dst_sub1, exist_ok=True)
        os.makedirs(dst_sub2, exist_ok=True)

        if not os.path.exists(src_sub):
            continue

        for filename in os.listdir(src_sub):
            src_file = os.path.join(src_sub, filename)
            dst_file1 = os.path.join(dst_sub1, filename)
            dst_file2 = os.path.join(dst_sub2, filename)

            if filename.endswith('.mp4'):
                split_video_frames(src_file, dst_file1, dst_file2, split_frame, cross_frames)
            elif filename.endswith('.npz'):
                split_mask_npz(src_file, dst_file1, dst_file2, split_frame, cross_frames)
            elif filename.endswith('.txt'):
                # Copy captions to both
                shutil.copy(src_file, dst_file1)
                shutil.copy(src_file, dst_file2)
def _find_output_video(out_dir):
    """Return the preferred output video in ``out_dir``, or None if absent.

    ``*_vsr.mp4`` files take precedence over ``*_out.mp4`` files. When
    several files match, the lexicographically first one is used so the
    choice is deterministic (glob order itself is arbitrary).
    """
    if not out_dir.exists():
        return None
    for pattern in ("*_vsr.mp4", "*_out.mp4"):
        matches = sorted(out_dir.glob(pattern))
        if matches:
            return matches[0]  # Assume one output per crop
    return None


def merge_crop_outputs(Unique_identifier, base_dir="/app"):
    """
    Merge split crop outputs back into the main crops folder.

    For every crop folder in ``<base_dir>/<Unique_identifier>/crops``
    (except one named "previews", case-insensitive) the output videos found
    in ``crops_part1/<crop>/output`` and ``crops_part2/<crop>/output`` are
    combined with combine_adaptive into ``crops/<crop>/output``, using the
    part-1 file name. The destination ``output`` folder is created even when
    a part is missing.

    Args:
        Unique_identifier: Name of the job folder below ``base_dir``.
        base_dir: Root folder that contains the job folder. Defaults to
            "/app".

    Returns:
        A list of log message strings describing what happened.
    """
    logs = []
    logs.append("\n[MERGE] Merging split crop outputs...")

    crops_dir_path = Path(f"{base_dir}/{Unique_identifier}/crops")
    crops_part1_path = Path(f"{base_dir}/{Unique_identifier}/crops_part1")
    crops_part2_path = Path(f"{base_dir}/{Unique_identifier}/crops_part2")

    if not crops_dir_path.exists():
        # Nothing to merge; say so instead of returning silently.
        logs.append(f"[MERGE] ⚠️ Crops directory not found: {crops_dir_path}")
        return logs

    # Sorted so that processing order (and thus the log order) is stable.
    for crop_folder in sorted(crops_dir_path.iterdir()):
        if not crop_folder.is_dir() or crop_folder.name.lower() == "previews":
            continue

        crop_name = crop_folder.name

        # Define paths
        part1_out_dir = crops_part1_path / crop_name / "output"
        part2_out_dir = crops_part2_path / crop_name / "output"
        dest_out_dir = crop_folder / "output"
        dest_out_dir.mkdir(parents=True, exist_ok=True)

        # Find video files
        part1_video = _find_output_video(part1_out_dir)
        part2_video = _find_output_video(part2_out_dir)

        if part1_video and part2_video:
            # Construct destination filename
            dest_video_path = dest_out_dir / part1_video.name
            combine_adaptive(str(part1_video), str(part2_video), str(dest_video_path))
        else:
            logs.append(f"[MERGE] ⚠️ Missing parts for {crop_name}: Part1={part1_video is not None}, Part2={part2_video is not None}")

    return logs