|
|
import cv2
|
|
|
import multiprocessing
|
|
|
from pathlib import Path
|
|
|
import numpy as np
|
|
|
from typing import List, Tuple
|
|
|
import os
|
|
|
|
|
|
def extract_frames_from_video(args: Tuple[str, str, int]) -> int:
    """Extract frames from a video file at approximately the requested FPS.

    Frames are written as ``frame_NNNNNN.jpg`` (numbered consecutively from
    zero) into ``<output_folder>/<video file stem>/``, which is created if it
    does not exist yet.

    Args:
        args: ``(input_video_path, output_folder, target_fps)``.
            ``target_fps`` must be positive. When it is greater than the
            source frame rate, every frame is saved.

    Returns:
        The number of frames successfully written to disk, or 0 when the
        video cannot be opened or any error occurs while processing it.
    """
    video_path, output_folder, target_fps = args

    frames_dir = Path(output_folder) / Path(video_path).stem
    frames_dir.mkdir(parents=True, exist_ok=True)

    cap = None
    try:
        if target_fps <= 0:
            raise ValueError(f"target_fps must be positive, got {target_fps}")

        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return 0

        original_fps = cap.get(cv2.CAP_PROP_FPS)
        # `not x > 0` also rejects NaN, which `x <= 0` would let through.
        if not original_fps > 0:
            raise ValueError(f"invalid source FPS reported: {original_fps}")

        # Keep every `interval`-th frame. Rounding (rather than truncating)
        # picks the sampling rate closest to the target, e.g. 29.97 / 15 -> 2
        # instead of 1. Clamping to 1 avoids a modulo-by-zero when the target
        # rate exceeds the source rate.
        interval = max(1, round(original_fps / target_fps))

        frame_count = 0
        saved_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % interval == 0:
                frame_path = frames_dir / f"frame_{saved_count:06d}.jpg"
                # imwrite reports failure through its return value, so only
                # count frames that actually reached the disk.
                if cv2.imwrite(str(frame_path), frame):
                    saved_count += 1
                else:
                    print(f"Error: Could not write frame {frame_path}")

            frame_count += 1

        return saved_count

    except Exception as e:
        print(f"Error processing {video_path}: {str(e)}")
        return 0

    finally:
        # Release the capture on every exit path, including errors and the
        # early return for an unopenable video.
        if cap is not None:
            cap.release()
|
|
|
|