| |
| """ |
| Video Clipping Script |
| |
| Reads a labels.jsonl file (produced by label_videos.py) and extracts the usable |
| segments into separate video files using ffmpeg. |
| |
| Usage: |
| python clip_videos.py --labels labels.jsonl --output-dir clips/ |
| """ |
|
|
| import os |
| import sys |
| import json |
| import argparse |
| import subprocess |
| from dataclasses import dataclass |
| from typing import List, Dict |
| from collections import defaultdict |
|
|
@dataclass
class Clip:
    """One segment to cut out of a source video.

    Instances are built by ``load_clips`` and consumed by ``process_clips``.
    """

    # Path of the source video, as given in the labels file.
    video_path: str
    # Segment start, in seconds from the beginning of the source video.
    start_sec: float
    # Segment end, in seconds from the beginning of the source video.
    end_sec: float
    # File name (no directory part) the extracted clip is written to.
    output_filename: str
|
|
def parse_args():
    """Parse the process command line and return the resulting namespace.

    Recognised options: ``--labels`` and ``--output-dir`` (both required),
    ``--min-duration`` (float, defaults to 4.0) and the ``--dry-run`` switch.
    """
    cli = argparse.ArgumentParser(description="Clip videos based on labels.jsonl")

    # Declarative option table: (flag, keyword arguments for add_argument).
    option_table = (
        ("--labels", {"required": True, "help": "Path to labels.jsonl file"}),
        ("--output-dir", {"required": True, "help": "Directory to save clips"}),
        (
            "--min-duration",
            {
                "type": float,
                "default": 4.0,
                "help": "Minimum duration in seconds (default: 4.0)",
            },
        ),
        (
            "--dry-run",
            {"action": "store_true", "help": "Print commands without executing"},
        ),
    )
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)

    return cli.parse_args()
|
|
def load_clips(labels_path: str, min_duration: float = 0.0) -> List["Clip"]:
    """Read a labels JSONL file and build one ``Clip`` per usable segment.

    Every non-blank line is parsed as JSON. A record is turned into a clip
    when its ``usable`` value is truthy, it carries ``video``, ``start_sec``
    and ``end_sec``, and its duration is positive and at least
    ``min_duration``. Records that cannot be used are skipped (with a warning
    when they are malformed) instead of aborting the whole run.

    Args:
        labels_path: Path to the JSONL labels file.
        min_duration: Minimum segment length in seconds; shorter segments
            are dropped silently.

    Returns:
        The clips in file order. An empty list is returned (after printing
        an error) when ``labels_path`` does not exist.
    """
    clips = []
    if not os.path.exists(labels_path):
        print(f"Error: Labels file not found: {labels_path}")
        return clips

    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(labels_path, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            # Blank lines (e.g. a trailing newline) are not errors.
            if not line.strip():
                continue

            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                print(f"Warning: Skipping invalid JSON on line {line_no}")
                continue

            if not isinstance(data, dict):
                print(f"Warning: Skipping non-object record on line {line_no}")
                continue

            if not data.get('usable'):
                continue

            try:
                video_path = data['video']
                start = float(data['start_sec'])
                end = float(data['end_sec'])
            except (KeyError, TypeError, ValueError) as exc:
                print(f"Warning: Skipping malformed record on line {line_no}: {exc!r}")
                continue

            if not isinstance(video_path, str) or not video_path:
                print(f"Warning: Skipping record with invalid video path on line {line_no}")
                continue

            duration = end - start

            # "not > 0" also rejects NaN, which would slip through "<" tests.
            if not duration > 0:
                print(f"Warning: Skipping non-positive duration on line {line_no}")
                continue

            if duration < min_duration:
                continue

            video_basename = os.path.splitext(os.path.basename(video_path))[0]
            filename = f"{video_basename}_{start:06.2f}_{end:06.2f}.mp4"

            clips.append(Clip(
                video_path=video_path,
                start_sec=start,
                end_sec=end,
                output_filename=filename,
            ))

    return clips
|
|
def process_clips(clips: List["Clip"], output_dir: str, dry_run: bool = False):
    """Extract every clip into ``output_dir`` by running ffmpeg once per clip.

    Clips whose output file already exists are skipped. The video stream is
    re-encoded with libx264 (CRF 18, preset ``slow``); the audio stream is
    copied. A failing ffmpeg run is reported and the remaining clips are
    still processed; if ffmpeg cannot be started at all, processing stops.

    Args:
        clips: Clips to extract; each needs ``video_path``, ``start_sec``,
            ``end_sec`` and ``output_filename`` attributes.
        output_dir: Directory the clips are written to. It is created when
            missing, except in dry-run mode.
        dry_run: When true, only print the ffmpeg commands; nothing is
            executed and nothing is created on disk.
    """
    if not dry_run:
        os.makedirs(output_dir, exist_ok=True)

    # Resolve the ffmpeg executable once: it does not change between clips.
    ffmpeg_bin = "/usr/bin/ffmpeg"
    if not os.path.exists(ffmpeg_bin):
        ffmpeg_bin = "ffmpeg"

    total = len(clips)
    for i, clip in enumerate(clips, start=1):
        output_path = os.path.join(output_dir, clip.output_filename)

        if os.path.exists(output_path):
            print(f"[{i}/{total}] Skipping existing: {output_path}")
            continue

        print(f"[{i}/{total}] Clipping: {clip.video_path} -> {output_path}")
        print(f"  Range: {clip.start_sec}s to {clip.end_sec}s")

        duration = clip.end_sec - clip.start_sec

        cmd = [
            ffmpeg_bin,
            '-y',
            '-hide_banner', '-loglevel', 'error',
            # -ss before -i seeks in the input; -t is then relative to it.
            '-ss', str(clip.start_sec),
            '-i', clip.video_path,
            '-t', str(duration),
            '-c:v', 'libx264',
            '-crf', '18',
            '-preset', 'slow',
            '-c:a', 'copy',
            output_path,
        ]

        if dry_run:
            print("Running:", " ".join(cmd))
            continue

        try:
            subprocess.run(cmd, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Error clipping {clip.video_path}: {e}")
            # The output did not exist before this run, so anything there now
            # is a partial file. Remove it, otherwise the next invocation
            # would treat it as a finished clip and skip it.
            if os.path.exists(output_path):
                try:
                    os.remove(output_path)
                except OSError as rm_err:
                    print(f"Warning: could not remove partial output {output_path}: {rm_err}")
        except OSError as e:
            # ffmpeg itself could not be started (not installed, not
            # executable, ...); every remaining clip would fail the same way.
            print(f"Error: could not run {ffmpeg_bin}: {e}")
            break
|
|
def main():
    """Script entry point: parse options, load the clips and extract them."""
    opts = parse_args()
    labels_file = opts.labels
    shortest_allowed = opts.min_duration

    print(f"Reading labels from: {labels_file}")
    print(f"Minimum clip duration: {shortest_allowed}s")

    usable_clips = load_clips(labels_file, min_duration=shortest_allowed)
    if usable_clips:
        print(f"Found {len(usable_clips)} usable clips.")
        process_clips(usable_clips, opts.output_dir, opts.dry_run)
        print("Done!")
    else:
        # Nothing to extract: report it and finish without touching the disk.
        print("No usable clips found in labels file.")
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|