| |
|
|
| import argparse |
| import sys |
| from pathlib import Path |
| from typing import List |
|
|
| import cv2 |
| from tqdm import tqdm |
| from joblib import Parallel, delayed |
| from contextlib import contextmanager |
| from joblib import parallel as joblib_parallel |
|
|
|
|
def parse_args(argv=None) -> argparse.Namespace:
    """Parse and normalise the command-line options of the downscaling script.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read the process command line, so existing callers are
            unaffected.

    Returns:
        The parsed namespace, with ``save_dir`` filled in when it was not
        given and leading dots stripped from ``ext`` and ``save_ext``.

    Raises:
        SystemExit: Raised by argparse on malformed arguments, on a
            ``--downscale_factor`` that is not a positive finite number, or
            on an extension that is empty once its dots are stripped.
    """
    parser = argparse.ArgumentParser(
        description="Downscale videos and save them to a new folder with a given extension."
    )
    parser.add_argument(
        "--video_dir",
        type=Path,
        default=Path("/scratch/shared/beegfs/piyush/datasets/NTU/nturgb+d_rgb/"),
        help="Root directory containing videos (searched recursively)",
    )
    parser.add_argument(
        "--ext",
        type=str,
        default="avi",
        help="Video file extension to search for (without dot)",
    )
    parser.add_argument(
        "--downscale_factor",
        type=float,
        default=0.4,
        help="Factor by which to downscale width and height (e.g., 0.4)",
    )
    parser.add_argument(
        "--save_dir",
        type=Path,
        default=None,
        help="Directory to save downscaled videos. Defaults to video_dir + '-downscaled={factor}'",
    )
    parser.add_argument(
        "--save_ext",
        type=str,
        default="mp4",
        help="Extension to save resulting videos with (without dot)",
    )
    parser.add_argument(
        "--debug",
        action="store_true",
        help="Process only the first 10 videos and print saved paths",
    )
    parser.add_argument(
        "--si",
        type=int,
        default=0,
        help="Start index (inclusive) in the sorted video list",
    )
    parser.add_argument(
        "--ei",
        type=int,
        default=None,
        help="End index (exclusive) in the sorted video list; None means till end",
    )
    parser.add_argument(
        "--n_jobs",
        type=int,
        default=-1,
        help="Number of parallel jobs (-1 uses all cores)",
    )
    args = parser.parse_args(argv)

    # The chained comparison is False for NaN as well as for zero, negative
    # and infinite values, none of which yields a usable frame size.
    if not (0 < args.downscale_factor < float("inf")):
        parser.error(
            f"--downscale_factor must be a positive finite number, got {args.downscale_factor}"
        )

    if args.save_dir is None:
        args.save_dir = Path(f"{args.video_dir}-downscaled={args.downscale_factor}")

    # Accept both "avi" and ".avi" on the command line.
    args.ext = args.ext.lstrip('.')
    args.save_ext = args.save_ext.lstrip('.')
    if not args.ext or not args.save_ext:
        parser.error("--ext and --save_ext must not be empty")
    return args
|
|
|
|
def ensure_even_dimension(value: int) -> int:
    """Round ``value`` down to an even number, never going below 1.

    Values of 1 or less map to 1, which is the only odd result this function
    can produce for integer input.
    """
    if value <= 1:
        return 1
    is_even = value % 2 == 0
    return value if is_even else value - 1
|
|
|
|
def list_videos(video_dir: Path, ext: str) -> List[Path]:
    """Return every file below ``video_dir`` whose name ends in ``.ext``, sorted.

    Args:
        video_dir: Root directory that is searched recursively.
        ext: File extension to match, without the leading dot.

    Returns:
        The matching file paths in sorted order.
    """
    # rglob() is already recursive, so the pattern must not carry its own
    # "**/" prefix; that only makes pathlib walk the tree redundantly.
    matches = video_dir.rglob(f"*.{ext}")
    # A directory that happens to be named "something.<ext>" is not a video.
    return sorted(path for path in matches if path.is_file())
|
|
|
|
def change_extension(path: Path, new_ext: str) -> Path:
    """Return ``path`` with its final suffix replaced by ``.new_ext``.

    ``new_ext`` is given without the leading dot. A path that has no suffix
    gets the new one appended.
    """
    suffix = f".{new_ext}"
    return path.with_suffix(suffix)
|
|
|
|
def downscale_video(
    src_path: Path,
    dst_path: Path,
    downscale_factor: float,
    save_ext: str,
) -> bool:
    """Re-encode ``src_path`` at a reduced resolution and store it at ``dst_path``.

    The output is first encoded into a sibling temporary file and only renamed
    to ``dst_path`` once encoding has finished. An interrupted run therefore
    never leaves a truncated, non-empty file at the final location, where a
    later "skip if it already exists" check would mistake it for a finished
    result.

    Args:
        src_path: Video file to read.
        dst_path: Final location of the downscaled video; parent directories
            are created as needed.
        downscale_factor: Multiplier applied to the source width and height.
        save_ext: Output extension without the dot; ``avi`` selects the XVID
            codec, every other value selects ``mp4v``.

    Returns:
        ``True`` when at least one frame was written and the finished file is
        in place at ``dst_path``; ``False`` on any failure.
    """
    cap = cv2.VideoCapture(str(src_path))
    if not cap.isOpened():
        cap.release()
        return False

    # Keep the real extension last so the writer still sees the intended
    # container suffix on the temporary file.
    tmp_path = dst_path.with_name(f"{dst_path.stem}.partial{dst_path.suffix}")
    writer = None
    frames_written = 0
    ok = True
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        # `fps != fps` is only true for NaN; fall back to 30 fps when the
        # reported frame rate is unusable.
        if fps <= 0 or fps != fps:
            fps = 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        out_w = ensure_even_dimension(max(1, int(width * downscale_factor)))
        out_h = ensure_even_dimension(max(1, int(height * downscale_factor)))

        codec = "XVID" if save_ext.lower() == "avi" else "mp4v"
        fourcc = cv2.VideoWriter_fourcc(*codec)

        dst_path.parent.mkdir(parents=True, exist_ok=True)
        writer = cv2.VideoWriter(str(tmp_path), fourcc, fps, (out_w, out_h))
        if not writer.isOpened():
            ok = False
        else:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                resized = cv2.resize(frame, (out_w, out_h), interpolation=cv2.INTER_AREA)
                writer.write(resized)
                frames_written += 1
    except Exception as exc:
        # Best effort per video: report the reason instead of hiding it, but
        # keep the boolean contract so one bad file cannot abort a batch.
        print(f"WARNING: failed to downscale {src_path}: {exc!r}", file=sys.stderr)
        ok = False
    finally:
        if writer is not None:
            writer.release()
        cap.release()

    # A source that yielded no frames is treated as a failure rather than as
    # a valid (header-only) result.
    if ok and frames_written > 0 and tmp_path.exists() and tmp_path.stat().st_size > 0:
        try:
            tmp_path.replace(dst_path)
            return True
        except OSError as exc:
            print(f"WARNING: could not move {tmp_path} to {dst_path}: {exc!r}", file=sys.stderr)

    # Failure path: remove whatever partial output was produced.
    try:
        if tmp_path.exists():
            tmp_path.unlink()
    except OSError:
        pass
    return False
|
|
|
|
def process_one(
    src: Path,
    dst: Path,
    downscale_factor: float,
    save_ext: str,
) -> tuple:
    """Downscale one video and return ``(success, dst)``.

    Thin wrapper around ``downscale_video`` that hands the destination path
    back together with the outcome.
    """
    return downscale_video(src, dst, downscale_factor, save_ext), dst
|
|
|
|
@contextmanager
def tqdm_joblib(tqdm_object):
    """Temporarily make joblib advance ``tqdm_object`` as batches complete.

    While the context is active, joblib's ``BatchCompletionCallBack`` is
    replaced by a subclass that adds each finished batch's size to the bar
    before delegating to the original callback. On exit the original class is
    restored and the bar is closed; errors raised while closing are ignored.

    Yields:
        The ``tqdm_object`` that was passed in.
    """
    original_callback = joblib_parallel.BatchCompletionCallBack

    class TqdmBatchCompletionCallback(original_callback):
        def __call__(self, *args, **kwargs):
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    joblib_parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
    try:
        yield tqdm_object
    finally:
        # Always undo the monkeypatch, even if the wrapped code raised.
        joblib_parallel.BatchCompletionCallBack = original_callback
        try:
            tqdm_object.close()
        except Exception:
            pass
|
|
|
|
def main() -> int:
    """Downscale the selected videos in parallel and report the outcome.

    Returns:
        0 when every processed video succeeded or nothing matched, 1 when
        ``video_dir`` is not an existing directory, and 2 when at least one
        video failed.
    """
    args = parse_args()

    source_root: Path = args.video_dir
    output_root: Path = args.save_dir
    target_ext: str = args.save_ext
    factor: float = args.downscale_factor
    start = max(0, int(args.si))
    stop = None if args.ei is None else max(0, int(args.ei))
    workers = int(args.n_jobs)

    if not (source_root.exists() and source_root.is_dir()):
        print(f"ERROR: video_dir does not exist or is not a directory: {source_root}", file=sys.stderr)
        return 1

    # Restrict the sorted listing to the requested [si, ei) window.
    videos = list_videos(source_root, args.ext)[start:stop]
    if not videos:
        print("No videos found.")
        return 0

    # Debug runs only touch the first ten videos of the window.
    if args.debug:
        videos = videos[:10]

    # Mirror each source path under the output root and queue only the
    # videos that do not have a non-empty output yet.
    pending = []
    for src in videos:
        try:
            rel = Path(src.relative_to(source_root))
        except ValueError:
            rel = Path(src.name)
        dst = (output_root / change_extension(rel, target_ext)).resolve()
        already_done = dst.exists() and dst.stat().st_size > 0
        if not already_done:
            pending.append((src, dst))
    skipped = len(videos) - len(pending)

    saved_paths: List[Path] = []
    errors = 0
    with tqdm(total=len(videos), desc="Downscaling videos", unit="vid") as pbar:
        # Outputs that already exist count as completed work on the bar.
        if skipped:
            pbar.update(skipped)

        if pending:
            with tqdm_joblib(pbar):
                results = Parallel(n_jobs=workers, backend="loky")(
                    delayed(process_one)(src, dst, factor, target_ext)
                    for src, dst in pending
                )
            saved_paths = [out_path for succeeded, out_path in results if succeeded]
            errors = len(results) - len(saved_paths)

    if args.debug:
        print("Saved (debug mode):")
        for saved in saved_paths:
            print(str(saved))

    if errors > 0:
        print(f"Completed with {errors} failures out of {len(videos)}", file=sys.stderr)
        return 2
    return 0
|
|
|
|
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|
|
|
|
|