|
|
|
|
|
|
|
|
import argparse |
|
|
import os |
|
|
import cv2 |
|
|
import numpy as np |
|
|
from ultralytics import YOLO |
|
|
from scenedetect import open_video, SceneManager, ContentDetector |
|
|
import torch |
|
|
|
|
|
def parse_arguments():
    """Build the command-line interface and return the parsed options.

    Options:
        --input-dir / -I   (required) directory holding the input videos.
        --output-dir / -O  (required) directory that receives screenshots.
        --min-width / -w   (int, default 200) minimum face-box width.
        --min-height / -m  (int, default 200) minimum face-box height.

    Returns:
        argparse.Namespace with ``input_dir``, ``output_dir``, ``min_width``
        and ``min_height`` attributes.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="Detect full faces in videos and capture screenshots on scene changes.",
    )

    # Mandatory directory options share the same keyword arguments.
    directory_options = (
        (("--input-dir", "-I"), "Directory containing input video files."),
        (("--output-dir", "-O"), "Directory to save screenshot outputs."),
    )
    for flags, help_text in directory_options:
        cli.add_argument(*flags, required=True, help=help_text)

    # Optional integer thresholds, both defaulting to 200 pixels.
    size_options = (
        (("--min-width", "-w"), "Minimum width of face bounding box to trigger screenshot."),
        (("--min-height", "-m"), "Minimum height of face bounding box to trigger screenshot."),
    )
    for flags, help_text in size_options:
        cli.add_argument(*flags, type=int, default=200, help=help_text)

    return cli.parse_args()
|
|
|
|
|
def ensure_directory(directory):
    """Create *directory*, including missing parents, if it is not already there.

    Uses ``exist_ok=True`` instead of a separate existence check, so there is
    no window between "check" and "create" in which another process could
    create the directory and make ``makedirs`` fail.

    Args:
        directory: Path of the directory to create.

    Raises:
        FileExistsError: If the path exists but is not a directory (for
            example a regular file); nothing could be saved there anyway.
        OSError: If the directory cannot be created (permissions, etc.).
    """
    os.makedirs(directory, exist_ok=True)
|
|
|
|
|
def check_cuda():
    """Pick the torch device to run on and report the choice on stdout.

    Returns:
        ``torch.device("cuda")`` when ``torch.cuda.is_available()`` is true,
        otherwise ``torch.device("cpu")``.
    """
    # Guard clause: handle the CPU fallback first and leave early.
    if not torch.cuda.is_available():
        print("CUDA is not available. Falling back to CPU.")
        return torch.device("cpu")

    print(f"CUDA is available! Using GPU: {torch.cuda.get_device_name(0)}")
    print(f"CUDA version: {torch.version.cuda}")
    print(f"Number of GPUs: {torch.cuda.device_count()}")
    return torch.device("cuda")
|
|
|
|
|
def is_full_face(box, frame_shape, min_width, min_height, min_proportion=0.1):
    """Decide whether a bounding box is fully inside the frame and large enough.

    Args:
        box: Four values ``(x1, y1, x2, y2)`` giving the box corners.
        frame_shape: Shape of the frame; the first two entries are read as
            ``(height, width)``.
        min_width: Smallest accepted box width.
        min_height: Smallest accepted box height.
        min_proportion: Smallest accepted box width/height expressed as a
            fraction of the frame width/height.

    Returns:
        True only when the box does not touch or cross any frame edge and
        satisfies both the absolute and the proportional size limits.
    """
    left, top, right, bottom = box
    frame_rows, frame_cols = frame_shape[:2]

    # A box touching or crossing any border is treated as cut off by the frame.
    clipped = left <= 0 or top <= 0 or right >= frame_cols or bottom >= frame_rows
    if clipped:
        return False

    box_w = right - left
    box_h = bottom - top

    # Absolute size floor.
    below_absolute = box_w < min_width or box_h < min_height
    if below_absolute:
        return False

    # Size floor relative to the frame dimensions.
    below_relative = (
        box_w < frame_cols * min_proportion
        or box_h < frame_rows * min_proportion
    )
    return not below_relative
|
|
|
|
|
def _has_full_face(results, frame_shape, min_width, min_height):
    """Return True if any class-0 detection in *results* passes ``is_full_face``."""
    for result in results:
        boxes = result.boxes.xyxy.cpu().numpy()
        classes = result.boxes.cls.cpu().numpy()
        for box, cls in zip(boxes, classes):
            if cls == 0 and is_full_face(box, frame_shape, min_width, min_height):
                return True
    return False


def process_video(video_path, output_dir, min_width, min_height, model, device):
    """Save at most one full-frame screenshot per detected scene of a video.

    Scene boundaries are computed first with PySceneDetect's
    ``ContentDetector(threshold=30.0)``. The video is then read frame by frame
    with OpenCV; within each scene the first frame that contains a qualifying
    class-0 detection (see ``is_full_face``) is written to *output_dir* as
    ``<video name>_face_<NNNN>.png``, and detection is skipped for the rest of
    that scene.

    Errors are reported on stdout and never raised: a failure while setting up
    or running scene detection, or while opening the video, abandons this
    video; a failure while analysing a single frame skips only that frame.

    NOTE(review): detections are restricted to ``classes=[0]``. With stock
    COCO-trained YOLO weights class 0 is "person", not "face" — confirm that
    the loaded weights are a face model.

    Args:
        video_path: Path of the video file to process.
        output_dir: Existing directory that receives the screenshots.
        min_width: Minimum bounding-box width passed to ``is_full_face``.
        min_height: Minimum bounding-box height passed to ``is_full_face``.
        model: Loaded YOLO model exposing ``predict``.
        device: Device handed to ``model.predict``.
    """
    try:
        video = open_video(video_path)
        scene_manager = SceneManager()
        scene_manager.add_detector(ContentDetector(threshold=30.0))
    except Exception as e:
        print(f"Error initializing video for scene detection in {video_path}: {e}")
        return

    cap = cv2.VideoCapture(video_path)
    # try/finally guarantees the capture is released on every exit path,
    # including early returns and interrupts.
    try:
        if not cap.isOpened():
            print(f"Error opening video file {video_path}")
            return

        try:
            scene_manager.detect_scenes(video=video)
            scene_list = scene_manager.get_scene_list()
            scene_starts = [scene[0].get_frames() for scene in scene_list]
        except Exception as e:
            print(f"Error detecting scenes in {video_path}: {e}")
            return

        scene_index = 0
        face_detected_in_scene = False
        frame_idx = 0
        output_count = 0
        video_name = os.path.splitext(os.path.basename(video_path))[0]

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Crossing a scene start re-arms detection for the new scene.
            if scene_index < len(scene_starts) and frame_idx >= scene_starts[scene_index]:
                face_detected_in_scene = False
                scene_index += 1
                print(f"New scene detected at frame {frame_idx}")

            if not face_detected_in_scene:
                try:
                    results = model.predict(frame, classes=[0], conf=0.75, device=device)
                    if _has_full_face(results, frame.shape, min_width, min_height):
                        output_path = os.path.join(
                            output_dir, f"{video_name}_face_{output_count:04d}.png"
                        )
                        # Mark the scene as handled even if the write fails:
                        # write failures (bad path, permissions, full disk)
                        # are normally persistent, so retrying on every
                        # following frame would only flood the log.
                        face_detected_in_scene = True
                        # cv2.imwrite signals failure through its return
                        # value; only count screenshots that were written.
                        if cv2.imwrite(output_path, frame):
                            print(f"Saved screenshot: {output_path}")
                            output_count += 1
                        else:
                            print(f"Error saving screenshot: {output_path}")
                except Exception as e:
                    print(f"Error during face detection in {video_path}: {e}")

            frame_idx += 1
    finally:
        cap.release()

    print(f"Processed {video_path}: {output_count} screenshots saved.")
|
|
|
|
|
def main():
    """Run the detection pipeline over every video in the input directory.

    Steps: parse the command line, validate the input directory, create the
    output directory, select the compute device, load the YOLO model once,
    then call ``process_video`` for each file whose name ends in
    ``.mp4``, ``.avi``, ``.mov`` or ``.mkv`` (case-insensitive).

    Setup problems (missing input directory, output directory that cannot be
    created, model that cannot be loaded) are reported on stdout and end the
    run without raising.
    """
    args = parse_arguments()

    if not os.path.isdir(args.input_dir):
        print(f"Error: Input directory '{args.input_dir}' does not exist.")
        return

    try:
        ensure_directory(args.output_dir)
    except OSError as e:
        print(f"Error: Could not create output directory '{args.output_dir}': {e}")
        return

    device = check_cuda()

    # Official Ultralytics YOLO11 weights are published without a "v"
    # ("yolo11l.pt"); the legacy spelling "yolov11l.pt" cannot be
    # auto-downloaded. Keep honouring a local file with the legacy name so
    # existing setups continue to work.
    legacy_weights = "yolov11l.pt"
    weights = legacy_weights if os.path.isfile(legacy_weights) else "yolo11l.pt"

    try:
        model = YOLO(weights)
        model.to(device)
        print(f"YOLO model loaded on device: {device}")
    except Exception as e:
        print(f"Error loading YOLO model: {e}")
        return

    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')

    # os.listdir returns entries in arbitrary order; sort for reproducible runs.
    for filename in sorted(os.listdir(args.input_dir)):
        if not filename.lower().endswith(video_extensions):
            continue
        video_path = os.path.join(args.input_dir, filename)
        print(f"Processing video: {video_path}")
        process_video(video_path, args.output_dir, args.min_width, args.min_height, model, device)
|
|
|
|
|
# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|
|