|
|
import os |
|
|
import cv2 |
|
|
import shutil |
|
|
import argparse |
|
|
import subprocess |
|
|
import numpy as np |
|
|
from pathlib import Path |
|
|
|
|
|
from merge_videos_utils import combine_adaptive |
|
|
|
|
|
def split_video_frames( |
|
|
video_path, |
|
|
output_path1, |
|
|
output_path2, |
|
|
split_frame=49, |
|
|
cross_frames=3, |
|
|
): |
|
|
""" |
|
|
Splits a video into two parts with overlapping frames. |
|
|
|
|
|
Video 1: frames [0 .. split_frame-1] |
|
|
Video 2: frames [split_frame-cross_frames .. end] |
|
|
|
|
|
Example: |
|
|
split_frame=49, cross_frames=3 |
|
|
Video1: 0–48 |
|
|
Video2: 46–end |
|
|
|
|
|
""" |
|
|
|
|
|
cap = cv2.VideoCapture(video_path) |
|
|
if not cap.isOpened(): |
|
|
print(f"Error opening video file {video_path}") |
|
|
return False |
|
|
|
|
|
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 |
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
|
|
|
|
|
|
if split_frame <= 0 or split_frame >= total_frames: |
|
|
print("Invalid split_frame value.") |
|
|
cap.release() |
|
|
return False |
|
|
|
|
|
if cross_frames < 0: |
|
|
cross_frames = 0 |
|
|
|
|
|
v1_start = 0 |
|
|
v1_end = split_frame |
|
|
|
|
|
v2_start = max(0, split_frame - cross_frames) |
|
|
v2_end = total_frames |
|
|
|
|
|
fourcc = cv2.VideoWriter_fourcc(*"mp4v") |
|
|
out1 = cv2.VideoWriter(output_path1, fourcc, fps, (width, height)) |
|
|
out2 = cv2.VideoWriter(output_path2, fourcc, fps, (width, height)) |
|
|
|
|
|
frame_idx = 0 |
|
|
written1 = 0 |
|
|
written2 = 0 |
|
|
|
|
|
while True: |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
if v1_start <= frame_idx < v1_end: |
|
|
out1.write(frame) |
|
|
written1 += 1 |
|
|
|
|
|
if v2_start <= frame_idx < v2_end: |
|
|
out2.write(frame) |
|
|
written2 += 1 |
|
|
|
|
|
frame_idx += 1 |
|
|
|
|
|
cap.release() |
|
|
out1.release() |
|
|
out2.release() |
|
|
|
|
|
print( |
|
|
f"Total frames: {total_frames}\n" |
|
|
f"Video1: [{v1_start}..{v1_end-1}] → {written1} frames\n" |
|
|
f"Video2: [{v2_start}..{v2_end-1}] → {written2} frames\n" |
|
|
f"Overlap frames: {max(0, v1_end - v2_start)}" |
|
|
) |
|
|
|
|
|
return True |
|
|
|
|
|
|
|
|
def split_mask_npz(mask_path, output_path1, output_path2, split_frame=49, cross_frames=3): |
|
|
""" |
|
|
Splits a mask npz file into two parts with overlapping frames. |
|
|
|
|
|
Assumes the npz contains a 'mask' key with shape: |
|
|
- [T, H, W] or |
|
|
- [T, 1, H, W] or generally [T, ...] |
|
|
|
|
|
mask1: frames [0 .. split_frame-1] |
|
|
mask2: frames [split_frame-cross_frames .. end] |
|
|
""" |
|
|
|
|
|
try: |
|
|
data = np.load(mask_path) |
|
|
if "mask" not in data: |
|
|
print(f"Key 'mask' not found in {mask_path}") |
|
|
return False |
|
|
|
|
|
mask = data["mask"] |
|
|
if mask.ndim < 1: |
|
|
print(f"Invalid mask shape {mask.shape}") |
|
|
return False |
|
|
|
|
|
total_frames = mask.shape[0] |
|
|
|
|
|
|
|
|
if split_frame <= 0 or split_frame >= total_frames: |
|
|
print(f"Invalid split_frame={split_frame} for total_frames={total_frames}") |
|
|
return False |
|
|
|
|
|
if cross_frames < 0: |
|
|
cross_frames = 0 |
|
|
|
|
|
v1_start, v1_end = 0, split_frame |
|
|
v2_start = max(0, split_frame - cross_frames) |
|
|
v2_end = total_frames |
|
|
|
|
|
mask1 = mask[v1_start:v1_end] |
|
|
mask2 = mask[v2_start:v2_end] |
|
|
|
|
|
np.savez_compressed(output_path1, mask=mask1) |
|
|
np.savez_compressed(output_path2, mask=mask2) |
|
|
|
|
|
print( |
|
|
f"Split mask {mask_path} into {output_path1} and {output_path2}\n" |
|
|
f"Original shape: {mask.shape}\n" |
|
|
f"mask1 frames: [{v1_start}..{v1_end-1}] -> shape {mask1.shape}\n" |
|
|
f"mask2 frames: [{v2_start}..{v2_end-1}] -> shape {mask2.shape}\n" |
|
|
f"Overlap frames: {max(0, v1_end - v2_start)} (requested {cross_frames})" |
|
|
) |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Failed to split mask {mask_path}: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
def process_split(output_anchor_path, cross_frames, split_frame=49): |
|
|
""" |
|
|
Main function to process the output_anchor folder. |
|
|
""" |
|
|
if not os.path.exists(output_anchor_path): |
|
|
print(f"Path {output_anchor_path} does not exist.") |
|
|
return |
|
|
|
|
|
parent_dir = os.path.dirname(output_anchor_path) |
|
|
base_name = os.path.basename(output_anchor_path) |
|
|
|
|
|
dir_part1 = os.path.join(parent_dir, f"{base_name}_part1") |
|
|
dir_part2 = os.path.join(parent_dir, f"{base_name}_part2") |
|
|
|
|
|
if os.path.exists(dir_part1): |
|
|
shutil.rmtree(dir_part1) |
|
|
if os.path.exists(dir_part2): |
|
|
shutil.rmtree(dir_part2) |
|
|
|
|
|
os.makedirs(dir_part1) |
|
|
os.makedirs(dir_part2) |
|
|
|
|
|
target_subfolders = ['videos', 'masked_videos', 'masks', 'captions'] |
|
|
|
|
|
for sub in target_subfolders: |
|
|
src_sub = os.path.join(output_anchor_path, sub) |
|
|
dst_sub1 = os.path.join(dir_part1, sub) |
|
|
dst_sub2 = os.path.join(dir_part2, sub) |
|
|
|
|
|
os.makedirs(dst_sub1, exist_ok=True) |
|
|
os.makedirs(dst_sub2, exist_ok=True) |
|
|
|
|
|
if not os.path.exists(src_sub): |
|
|
continue |
|
|
|
|
|
for filename in os.listdir(src_sub): |
|
|
src_file = os.path.join(src_sub, filename) |
|
|
dst_file1 = os.path.join(dst_sub1, filename) |
|
|
dst_file2 = os.path.join(dst_sub2, filename) |
|
|
|
|
|
if filename.endswith('.mp4'): |
|
|
split_video_frames(src_file, dst_file1, dst_file2, split_frame, cross_frames) |
|
|
elif filename.endswith('.npz'): |
|
|
split_mask_npz(src_file, dst_file1, dst_file2, split_frame, cross_frames) |
|
|
elif filename.endswith('.txt'): |
|
|
|
|
|
shutil.copy(src_file, dst_file1) |
|
|
shutil.copy(src_file, dst_file2) |
|
|
else: |
|
|
pass |
|
|
|
|
|
|
|
|
def merge_crop_outputs(Unique_identifier): |
|
|
""" |
|
|
Merges split crop outputs back into the main crops folder. |
|
|
""" |
|
|
logs = [] |
|
|
logs.append("\n[MERGE] Merging split crop outputs...") |
|
|
crops_dir_path = Path(f"/app/{Unique_identifier}/crops") |
|
|
crops_part1_path = Path(f"/app/{Unique_identifier}/crops_part1") |
|
|
crops_part2_path = Path(f"/app/{Unique_identifier}/crops_part2") |
|
|
|
|
|
if crops_dir_path.exists(): |
|
|
for crop_folder in crops_dir_path.iterdir(): |
|
|
if crop_folder.is_dir() and crop_folder.name.lower() != "previews": |
|
|
crop_name = crop_folder.name |
|
|
|
|
|
|
|
|
part1_out_dir = crops_part1_path / crop_name / "output" |
|
|
part2_out_dir = crops_part2_path / crop_name / "output" |
|
|
dest_out_dir = crop_folder / "output" |
|
|
|
|
|
dest_out_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
|
|
|
part1_video = None |
|
|
if part1_out_dir.exists(): |
|
|
videos = list(part1_out_dir.glob("*_vsr.mp4")) |
|
|
if videos: |
|
|
part1_video = videos[0] |
|
|
elif videos := list(part1_out_dir.glob("*_out.mp4")): |
|
|
part1_video = videos[0] |
|
|
|
|
|
part2_video = None |
|
|
if part2_out_dir.exists(): |
|
|
|
|
|
videos = list(part2_out_dir.glob("*_vsr.mp4")) |
|
|
if videos: |
|
|
part2_video = videos[0] |
|
|
elif videos := list(part2_out_dir.glob("*_out.mp4")): |
|
|
part2_video = videos[0] |
|
|
|
|
|
if part1_video and part2_video: |
|
|
|
|
|
dest_filename = part1_video.name |
|
|
dest_video_path = dest_out_dir / dest_filename |
|
|
combine_adaptive(str(part1_video), str(part2_video), str(dest_video_path)) |
|
|
else: |
|
|
logs.append(f"[MERGE] ⚠️ Missing parts for {crop_name}: Part1={part1_video is not None}, Part2={part2_video is not None}") |
|
|
return logs |
|
|
|