Lyra / src /visu /dynamic_scene_video_static_camera.py
Muhammad Taqi Raza
adding lyra files
af758d1
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import cv2
from glob import glob
import argparse
def collect_video_indices(main_path, sub_path, max_time_idx):
"""Find all unique {i} indices across timestamps."""
video_indices = set()
for time_idx in range(0, max_time_idx + 1):
timestamp_folder = os.path.join(main_path, str(time_idx), sub_path)
if os.path.isdir(timestamp_folder):
for vid in glob(os.path.join(timestamp_folder, "rgb_*_view_idx_0_0.4.mp4")):
fname = os.path.basename(vid)
try:
idx = int(fname.split("_")[1]) # extract {i}
video_indices.add(idx)
except ValueError:
pass
return sorted(video_indices)
def collect_frames(main_path, sub_path, video_indices, num_frames, max_time_idx):
"""Collect the first `num_frames` frames from each video for each timestamp."""
frames_dict = {i: [] for i in video_indices}
for time_idx in range(0, max_time_idx + 1):
timestamp_folder = os.path.join(main_path, str(time_idx), sub_path)
if not os.path.isdir(timestamp_folder):
print(f"Skip {timestamp_folder}")
continue
for i in video_indices:
vid_path = os.path.join(timestamp_folder, f"rgb_{i}_view_idx_0_0.4.mp4")
if not os.path.isfile(vid_path):
print(f"Skip {vid_path}")
continue
cap = cv2.VideoCapture(vid_path)
frame_count = 0
while frame_count < num_frames:
ret, frame = cap.read()
if not ret:
break
frames_dict[i].append(frame)
frame_count += 1
cap.release()
return frames_dict
def save_concatenated_videos(frames_dict, output_path, fps):
"""Save concatenated frames as videos using H264 codec if possible."""
os.makedirs(output_path, exist_ok=True)
for i, frames in frames_dict.items():
if not frames:
continue
height, width, _ = frames[0].shape
out_path = os.path.join(output_path, f"{i}.mp4")
# Try H264 first
fourcc = cv2.VideoWriter_fourcc(*'avc1')
writer = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
# If writer not opened, fallback to mp4v
if not writer.isOpened():
print(f"Warning: 'avc1' codec not supported, falling back to 'mp4v' for video {i}")
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
writer = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
for frame in frames:
writer.write(frame)
writer.release()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Concatenate frames from videos across timestamps.")
parser.add_argument("--main_path", required=True, help="Path containing all time_idx folders")
parser.add_argument("--sub_path", required=True, help="Path after the time_idx folder")
parser.add_argument("--output_path", required=True, help="Folder to save output videos")
parser.add_argument("--num_frames", type=int, default=1, help="Number of frames to extract from each video")
parser.add_argument("--fps", type=int, default=10, help="Frames per second for output videos")
parser.add_argument("--max_time_idx", type=int, default=120, help="Maximum time index to loop over")
args = parser.parse_args()
print("Collecting video indices...")
video_indices = collect_video_indices(args.main_path, args.sub_path, args.max_time_idx)
print(f"Extracting first {args.num_frames} frames from each video...")
frames_dict = collect_frames(args.main_path, args.sub_path, video_indices, args.num_frames, args.max_time_idx)
print("Saving concatenated videos...")
save_concatenated_videos(frames_dict, args.output_path, fps=args.fps)
print(f"Done! Saved videos to: {args.output_path}")