# Source: vr-hmr / scripts / clip_videos.py
# (uploaded by zirobtc via huggingface_hub, commit 7e120dd)
#!/usr/bin/env python3
"""
Video Clipping Script
Reads a labels.jsonl file (produced by label_videos.py) and extracts the usable
segments into separate video files using ffmpeg.
Usage:
python clip_videos.py --labels labels.jsonl --output-dir clips/
"""
import os
import sys
import json
import argparse
import subprocess
from dataclasses import dataclass
from typing import List, Dict
from collections import defaultdict
@dataclass
class Clip:
    """One segment to extract from a source video.

    Instances are built by load_clips() from labels.jsonl records and
    consumed by process_clips() to drive ffmpeg.
    """
    # Path to the source video file, as recorded in labels.jsonl.
    video_path: str
    # Segment start within the source video, in seconds.
    start_sec: float
    # Segment end within the source video, in seconds.
    end_sec: float
    # Basename for the extracted clip (e.g. MyVideo_005.50_010.00.mp4).
    output_filename: str
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the clipping script."""
    ap = argparse.ArgumentParser(description="Clip videos based on labels.jsonl")
    ap.add_argument("--labels", required=True, help="Path to labels.jsonl file")
    ap.add_argument("--output-dir", required=True, help="Directory to save clips")
    ap.add_argument(
        "--min-duration",
        type=float,
        default=4.0,
        help="Minimum duration in seconds (default: 4.0)",
    )
    ap.add_argument(
        "--dry-run",
        action="store_true",
        help="Print commands without executing",
    )
    return ap.parse_args()
def load_clips(labels_path: str, min_duration: float = 0.0) -> List[Clip]:
    """Read labels.jsonl and return the usable segments as Clip objects.

    Each line of the file is a JSON object; records with a falsy ``usable``
    flag, records shorter than ``min_duration`` seconds, and malformed
    lines are skipped (with a warning for malformed ones).

    Args:
        labels_path: Path to the labels.jsonl file produced by label_videos.py.
        min_duration: Minimum segment length in seconds; shorter ones are dropped.

    Returns:
        List of Clip objects; empty if the file is missing or nothing qualifies.
    """
    clips: List[Clip] = []
    if not os.path.exists(labels_path):
        print(f"Error: Labels file not found: {labels_path}")
        return []
    with open(labels_path, 'r', encoding='utf-8') as f:
        for i, line in enumerate(f):
            line = line.strip()
            # Tolerate blank lines silently instead of warning about invalid JSON.
            if not line:
                continue
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                print(f"Warning: Skipping invalid JSON on line {i+1}")
                continue
            if not data.get('usable'):
                continue
            # BUG FIX: previously a record missing 'video'/'start_sec'/'end_sec'
            # (or with non-numeric values) raised and aborted the whole load,
            # contradicting the per-line skip policy above.
            try:
                video_path = data['video']
                start = float(data['start_sec'])
                end = float(data['end_sec'])
            except (KeyError, TypeError, ValueError):
                print(f"Warning: Skipping record with missing/invalid fields on line {i+1}")
                continue
            duration = end - start
            if duration < min_duration:
                continue
            # Create a safe filename.
            video_basename = os.path.splitext(os.path.basename(video_path))[0]
            # Format: VideoName_Start_End.mp4 (e.g. MyVideo_005.50_010.00.mp4)
            filename = f"{video_basename}_{start:06.2f}_{end:06.2f}.mp4"
            clips.append(Clip(
                video_path=video_path,
                start_sec=start,
                end_sec=end,
                output_filename=filename,
            ))
    return clips
def process_clips(clips: List[Clip], output_dir: str, dry_run: bool = False):
os.makedirs(output_dir, exist_ok=True)
# Group by video to potentially optimize (though currently we treat each clip independently)
# If we wanted to batch, we could, but ffmpeg seeking is fast enough with -ss
for i, clip in enumerate(clips):
output_path = os.path.join(output_dir, clip.output_filename)
if os.path.exists(output_path):
print(f"[{i+1}/{len(clips)}] Skipping existing: {output_path}")
continue
print(f"[{i+1}/{len(clips)}] Clipping: {clip.video_path} -> {output_path}")
print(f" Range: {clip.start_sec}s to {clip.end_sec}s")
duration = clip.end_sec - clip.start_sec
# Use system ffmpeg if available (likely has AV1 decoder), otherwise fallback to env ffmpeg
ffmpeg_bin = "/usr/bin/ffmpeg"
if not os.path.exists(ffmpeg_bin):
ffmpeg_bin = "ffmpeg"
cmd = [
ffmpeg_bin,
'-y', # Overwrite output
'-hide_banner', '-loglevel', 'error',
'-ss', str(clip.start_sec),
'-i', clip.video_path,
'-t', str(duration),
'-c:v', 'libx264',
'-crf', '18', # High quality
'-preset', 'slow', # Better compression/quality tradeoff
'-c:a', 'copy', # Copy audio stream without re-encoding
output_path
]
if dry_run:
print("Running:", " ".join(cmd))
else:
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
print(f"Error clipping {clip.video_path}: {e}")
def main() -> None:
    """Entry point: load the labels file, then clip every usable segment."""
    args = parse_args()
    print(f"Reading labels from: {args.labels}")
    print(f"Minimum clip duration: {args.min_duration}s")

    usable = load_clips(args.labels, min_duration=args.min_duration)
    if not usable:
        print("No usable clips found in labels file.")
        return

    print(f"Found {len(usable)} usable clips.")
    process_clips(usable, args.output_dir, args.dry_run)
    print("Done!")


if __name__ == "__main__":
    main()