| import cv2 |
| import pandas as pd |
| from tqdm import tqdm |
| import load_models |
| from load_models import warm_up_models |
| import os |
| import pandas as pd |
| from tqdm import tqdm |
| from ultralytics import YOLO |
| import cv2 |
| import os |
| import pandas as pd |
| from tqdm import tqdm |
| from insightface.app import FaceAnalysis |
| import json |
| import shutil |
| import streamlit |
|
|
|
|
def test_face_detection():
    """
    Smoke-test the YOLO face detector on the first frames of a hard-coded video.

    Calls ``warm_up_models()``, takes ``load_models.yolo_model`` and runs it on
    frame ids 0-10 (at most 11 frames), printing the number of detected boxes
    for each frame. Nothing is returned or written to disk.
    """
    input_video = "/Users/sophiemaw/Downloads/CONFIDENTIAL DO NOT SHARE Edna & Paul 29.10.10 Part 2 00.12.46.531.mov"
    last_frame_id = 10

    warm_up_models()
    # Read the attribute only after warm-up; presumably warm_up_models()
    # is what populates it -- confirm in load_models.
    model = load_models.yolo_model

    cap = cv2.VideoCapture(input_video)
    try:
        if not cap.isOpened():
            # Without this guard the function would report success without
            # having run the detector on a single frame.
            print(f"⚠️ Could not open video: {input_video}")
            return

        for frame_id in range(last_frame_id + 1):
            ret, frame = cap.read()
            if not ret:
                break

            print(f"Running detection on frame {frame_id}")
            results = model.predict(source=frame, stream=False, verbose=False)[0]
            print(f"Detected {len(results.boxes)} faces")
    finally:
        # Release the capture even if the model raises.
        cap.release()

    print("✅ YOLO ran without crash.")
|
|
def detect_faces_yolo_to_csv(
    input_video: str,
    output_csv: str = None,
    model: YOLO = None,
    start_frame: int = 0,
    end_frame: int = None
) -> None:
    """
    Detects faces in a video using a YOLO model and saves bounding boxes to a CSV file.

    The CSV has one row per detected box with the columns
    ``frame, x1, y1, x2, y2`` (coordinates truncated to int). The header row is
    written even when nothing is detected, so the file can always be read back
    with ``pd.read_csv``.

    Args:
        input_video (str): Path to the input video file.
        output_csv (str, optional): Path to save the CSV file. If None, saves to
            'meta_data/test_detections.csv' (the folder is created if missing).
        model (YOLO, optional): A pre-loaded YOLO model. If None, the model is
            obtained from ``load_models`` after calling ``warm_up_models()``.
        start_frame (int): The first frame to start detection from.
        end_frame (int): The last frame to process (inclusive). If None, goes until the end.

    Raises:
        AssertionError: If the video cannot be opened or the frame range is invalid.
    """
    if output_csv is None:
        os.makedirs("meta_data", exist_ok=True)
        # NOTE(review): the default name is fixed to "test" rather than derived
        # from the video file name; kept as-is because other code may rely on
        # this exact path -- confirm before changing.
        output_csv = os.path.join("meta_data", "test_detections.csv")

    if model is None:
        from load_models import warm_up_models
        warm_up_models()
        # Imported after warm-up so the name is bound to the warmed-up model.
        from load_models import yolo_model
        model = yolo_model

    all_detections = []
    cap = cv2.VideoCapture(input_video)
    try:
        # AssertionError is kept so existing callers see the same exception
        # type; it is raised explicitly so the checks survive ``python -O``.
        if not cap.isOpened():
            raise AssertionError(f"Failed to open input video: {input_video}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if end_frame is None:
            end_frame = total_frames - 1
        if not 0 <= start_frame <= end_frame < total_frames:
            raise AssertionError("Invalid frame range.")

        print(f"🔍 Detecting faces from frame {start_frame} to {end_frame}...")
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        with tqdm(total=end_frame - start_frame + 1, desc="Frames processed") as pbar:
            for current_frame in range(start_frame, end_frame + 1):
                ret, frame = cap.read()
                if not ret:
                    # Stop at the first unreadable frame; what was collected
                    # so far is still written out below.
                    print(f"⚠️ Frame {current_frame} could not be read.")
                    break

                results = model.predict(source=frame, stream=False, verbose=False)[0]
                for box in results.boxes.xyxy.cpu().numpy():
                    x1, y1, x2, y2 = box[:4]
                    all_detections.append({
                        "frame": current_frame,
                        "x1": int(x1),
                        "y1": int(y1),
                        "x2": int(x2),
                        "y2": int(y2),
                    })

                pbar.update(1)
    finally:
        # Release the capture on every exit path, including errors.
        cap.release()

    # Explicit columns guarantee a header row even with zero detections.
    df = pd.DataFrame(all_detections, columns=["frame", "x1", "y1", "x2", "y2"])
    df.to_csv(output_csv, index=False)
    print(f"✅ Face detections saved to {output_csv}")
|
|
| |
def detect_faces_insightface_to_csv(
    input_video: str,
    output_csv: str = None,
    app: FaceAnalysis = None,
    start_frame: int = 0,
    end_frame: int = None
) -> None:
    """
    Detects faces in a video using InsightFace and saves bounding boxes, landmarks, and embeddings to a CSV file.

    The CSV has one row per detected face with the columns
    ``frame, x1, y1, x2, y2, landmarks, normed_embedding``; the last two are
    JSON-encoded lists (``[]`` when the value is unavailable). The header row is
    written even when nothing is detected.

    Args:
        input_video (str): Path to the input video file.
        output_csv (str, optional): Path to save the CSV file. If None, saves to
            'meta_data/test_detections.csv' (the folder is created if missing).
        app (FaceAnalysis, optional): A prepared FaceAnalysis instance. If None,
            a 'buffalo_l' instance using the CPU execution provider is created.
        start_frame (int): The first frame to start detection from.
        end_frame (int): The last frame to process (inclusive). If None, goes until the end.

    Raises:
        AssertionError: If the video cannot be opened or the frame range is invalid.
    """
    if output_csv is None:
        os.makedirs("meta_data", exist_ok=True)
        # NOTE(review): the default name is fixed to "test" rather than derived
        # from the video file name. The script at the bottom of this file reads
        # 'meta_data/test_detections.csv', so the path is kept unchanged.
        output_csv = os.path.join("meta_data", "test_detections.csv")

    if app is None:
        app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        app.prepare(ctx_id=0)

    all_detections = []
    cap = cv2.VideoCapture(input_video)
    try:
        # AssertionError is kept so existing callers see the same exception
        # type; it is raised explicitly so the checks survive ``python -O``.
        if not cap.isOpened():
            raise AssertionError(f"Failed to open input video: {input_video}")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if end_frame is None:
            end_frame = total_frames - 1
        if not 0 <= start_frame <= end_frame < total_frames:
            raise AssertionError("Invalid frame range.")

        print(f"🔍 Detecting faces from frame {start_frame} to {end_frame} with InsightFace...")
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        with tqdm(total=end_frame - start_frame + 1, desc="Frames processed") as pbar:
            for current_frame in range(start_frame, end_frame + 1):
                ret, frame = cap.read()
                if not ret:
                    # Stop at the first unreadable frame; what was collected
                    # so far is still written out below.
                    print(f"⚠️ Frame {current_frame} could not be read.")
                    break

                for face in app.get(frame):
                    x1, y1, x2, y2 = map(int, face.bbox)
                    # Either attribute may be missing or None; store an empty
                    # list in that case instead of failing on ``.tolist()``.
                    kps = getattr(face, 'kps', None)
                    landmarks = kps.tolist() if kps is not None else []
                    embedding = getattr(face, 'normed_embedding', None)
                    normed_embedding = embedding.tolist() if embedding is not None else []

                    all_detections.append({
                        "frame": current_frame,
                        "x1": x1,
                        "y1": y1,
                        "x2": x2,
                        "y2": y2,
                        "landmarks": json.dumps(landmarks),
                        "normed_embedding": json.dumps(normed_embedding),
                    })

                pbar.update(1)
    finally:
        # Release the capture on every exit path, including errors.
        cap.release()

    # Explicit columns guarantee a header row even with zero detections.
    df = pd.DataFrame(
        all_detections,
        columns=["frame", "x1", "y1", "x2", "y2", "landmarks", "normed_embedding"],
    )
    df.to_csv(output_csv, index=False)
    print(f"✅ InsightFace detections saved to {output_csv}")
def extract_faces_from_csv(
    input_video: str,
    csv_path: str,
    output_folder: str = "output_faces",
    padding: int = 10
) -> None:
    """
    Extracts and saves face crops from a video using bounding boxes in a CSV.

    Each crop is padded, clamped to the frame, resized to 112x112 and written
    as ``frame_<frame>_face_<row index>.jpg``. Frames are decoded one at a time
    and all boxes of a frame are cropped before moving on, so memory use does
    not grow with the number of frames.

    Args:
        input_video (str): Path to original video.
        csv_path (str): Path to CSV with face bounding boxes
            (columns ``frame, x1, y1, x2, y2``).
        output_folder (str): Directory to save cropped face images.
        padding (int): Padding in pixels to add around each face crop.

    Raises:
        AssertionError: If the input video cannot be opened.
    """
    try:
        df = pd.read_csv(csv_path)
    except pd.errors.EmptyDataError:
        # A file without even a header row (e.g. written from an empty
        # DataFrame) is treated the same as a CSV with no rows.
        df = pd.DataFrame()
    if df.empty:
        print("❌ CSV is empty — no faces to extract.")
        return

    os.makedirs(output_folder, exist_ok=True)

    cap = cv2.VideoCapture(input_video)
    try:
        # AssertionError is kept for existing callers; raised explicitly so the
        # check survives ``python -O``.
        if not cap.isOpened():
            raise AssertionError("Failed to open input video")

        print("📦 Extracting faces...")
        with tqdm(total=len(df), desc="Cropping faces") as pbar:
            for frame_id, rows in df.groupby("frame", sort=True):
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
                ret, frame = cap.read()
                if not ret:
                    print(f"⚠️ Could not read frame {frame_id}")
                    pbar.update(len(rows))
                    continue

                height, width = frame.shape[:2]
                boxes = zip(rows.index, rows["x1"], rows["y1"], rows["x2"], rows["y2"])
                for i, x1, y1, x2, y2 in boxes:
                    pbar.update(1)
                    x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)

                    x1_p = max(0, x1 - padding)
                    y1_p = max(0, y1 - padding)
                    # Never let the far edge fall below the near edge: a
                    # negative stop index would wrap around in the slice.
                    x2_p = max(x1_p, min(width, x2 + padding))
                    y2_p = max(y1_p, min(height, y2 + padding))

                    crop = frame[y1_p:y2_p, x1_p:x2_p]
                    if crop.size == 0:
                        # cv2.resize raises on an empty image; skip boxes that
                        # lie outside the frame or have no area.
                        print(f"⚠️ Empty crop for frame {frame_id}, face {i}; skipped")
                        continue

                    img_resized = cv2.resize(crop, (112, 112))
                    crop_path = os.path.join(output_folder, f"frame_{frame_id}_face_{i}.jpg")
                    cv2.imwrite(crop_path, img_resized)
    finally:
        # Release the capture on every exit path, including errors.
        cap.release()

    print(f"✅ Saved 112x112 face crops to: {output_folder}")
|
|
| |
def detect_faces_insightface_from_frames(
    frames_folder: str,
    output_csv: str = None,
    app: FaceAnalysis = None,
    streamlit_progress=None,
    progress_range=(0, 100)
) -> None:
    """
    Detects faces in a folder of frames using InsightFace and saves results to a CSV.

    Only ``.jpg``/``.png`` files whose name (without extension) is an integer
    are processed, in ascending numeric order; that integer is stored as the
    frame number. Other image files are reported and skipped.

    The CSV has one row per detected face with the columns
    ``frame, x1, y1, x2, y2, landmarks, normed_embedding``; the last two are
    JSON-encoded lists (``[]`` when the value is unavailable). The header row is
    written even when nothing is detected.

    Args:
        frames_folder (str): Folder containing the numerically named frame images.
        output_csv (str, optional): Path to save the CSV file. If None, saves to
            'meta_data/<frames folder name>_detections.csv'.
        app (FaceAnalysis, optional): A prepared FaceAnalysis instance. If None,
            a 'buffalo_l' instance using the CPU execution provider is created.
        streamlit_progress (optional): Object with a ``progress(int)`` method,
            called once per frame.
        progress_range (tuple): ``(start, end)`` values the progress is mapped onto.
    """
    numbered = []
    for name in os.listdir(frames_folder):
        if not name.lower().endswith((".jpg", ".png")):
            continue
        try:
            index = int(os.path.splitext(name)[0])
        except ValueError:
            # One stray image (e.g. a thumbnail) must not abort the whole run.
            print(f"⚠️ Skipping non-numeric frame file: {name}")
            continue
        numbered.append((index, name))
    numbered.sort()
    frame_paths = [name for _, name in numbered]

    if output_csv is None:
        os.makedirs("meta_data", exist_ok=True)
        base_name = os.path.basename(os.path.normpath(frames_folder))
        output_csv = os.path.join("meta_data", f"{base_name}_detections.csv")

    if app is None:
        app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        app.prepare(ctx_id=0)

    all_detections = []
    print("✅ Total frames found:", len(frame_paths))
    print("✅ First few frames:", frame_paths[:10])
    print("✅ Last few frames:", frame_paths[-10:])

    print(f"🔍 Detecting faces in {len(frame_paths)} frames with InsightFace...")
    total = len(frame_paths)
    start, end = progress_range
    for i, (frame_index, filename) in enumerate(numbered):
        frame_path = os.path.join(frames_folder, filename)
        frame = cv2.imread(frame_path)

        # Progress is reported for every file, readable or not.
        if streamlit_progress:
            progress_value = start + (end - start) * (i + 1) / total
            streamlit_progress.progress(int(progress_value))

        if frame is None:
            print(f"⚠️ Could not read frame: {frame_path}")
            continue

        for face in app.get(frame):
            x1, y1, x2, y2 = map(int, face.bbox)
            # Either attribute may be missing or None; store an empty list in
            # that case instead of failing on ``.tolist()``.
            kps = getattr(face, 'kps', None)
            landmarks = kps.tolist() if kps is not None else []
            embedding = getattr(face, 'normed_embedding', None)
            normed_embedding = embedding.tolist() if embedding is not None else []

            all_detections.append({
                "frame": frame_index,
                "x1": x1,
                "y1": y1,
                "x2": x2,
                "y2": y2,
                "landmarks": json.dumps(landmarks),
                "normed_embedding": json.dumps(normed_embedding),
            })

    # Explicit columns guarantee a header row even with zero detections.
    df = pd.DataFrame(
        all_detections,
        columns=["frame", "x1", "y1", "x2", "y2", "landmarks", "normed_embedding"],
    )
    df.to_csv(output_csv, index=False)
    print(f"✅ InsightFace detections saved to {output_csv}")
    print(f"✅ Number of faces detected: {len(all_detections)}")
|
|
def extract_faces_from_frames_folder(
    frames_folder: str,
    csv_path: str,
    output_folder: str = "output_faces",
    padding: int = 10,
    streamlit_progress=None,
    progress_range=(0, 100)
) -> None:
    """
    Extracts and saves face crops using bounding boxes from CSV and frames in a folder.

    Each crop is padded, clamped to the frame, resized to 112x112 and written
    as ``frame_<frame>_face_<row index>.jpg``. Frames are loaded one at a time
    and all boxes of a frame are cropped before moving on, so memory use does
    not grow with the number of frames. An existing ``output_folder`` is
    deleted and recreated, unless the CSV is empty.

    Args:
        frames_folder (str): Path to folder containing frame images (named numerically).
            ``<frame>.jpg`` is used when present, otherwise ``<frame>.png``.
        csv_path (str): Path to CSV with face bounding boxes
            (columns ``frame, x1, y1, x2, y2``).
        output_folder (str): Directory to save cropped face images.
        padding (int): Padding in pixels to add around each face crop.
        streamlit_progress (optional): Object with a ``progress(int)`` method,
            updated as rows are processed.
        progress_range (tuple): ``(start, end)`` values the progress is mapped onto.
    """
    try:
        df = pd.read_csv(csv_path)
    except pd.errors.EmptyDataError:
        # A file without even a header row (e.g. written from an empty
        # DataFrame) is treated the same as a CSV with no rows.
        df = pd.DataFrame()
    if df.empty:
        print("❌ CSV is empty — no faces to extract.")
        return

    # Start from a clean folder so crops from a previous run do not linger.
    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)
    os.makedirs(output_folder, exist_ok=True)

    total_rows = len(df)
    start, end = progress_range
    done = 0  # rows handled so far, including skipped ones

    print("📦 Reading frames from folder...")
    with tqdm(total=total_rows, desc="Cropping faces") as pbar:
        for frame_id, rows in df.groupby("frame", sort=True):
            frame_path = os.path.join(frames_folder, f"{frame_id}.jpg")
            if not os.path.isfile(frame_path):
                # The detector in this file also accepts .png frames.
                png_path = os.path.join(frames_folder, f"{frame_id}.png")
                if os.path.isfile(png_path):
                    frame_path = png_path

            frame = cv2.imread(frame_path)
            if frame is None:
                print(f"⚠️ Could not read frame {frame_path}")
                done += len(rows)
                pbar.update(len(rows))
                if streamlit_progress:
                    streamlit_progress.progress(int(start + (end - start) * done / total_rows))
                continue

            height, width = frame.shape[:2]
            boxes = zip(rows.index, rows["x1"], rows["y1"], rows["x2"], rows["y2"])
            for i, x1, y1, x2, y2 in boxes:
                done += 1
                pbar.update(1)
                if streamlit_progress:
                    streamlit_progress.progress(int(start + (end - start) * done / total_rows))

                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                x1_p = max(0, x1 - padding)
                y1_p = max(0, y1 - padding)
                # Never let the far edge fall below the near edge: a negative
                # stop index would wrap around in the slice.
                x2_p = max(x1_p, min(width, x2 + padding))
                y2_p = max(y1_p, min(height, y2 + padding))

                crop = frame[y1_p:y2_p, x1_p:x2_p]
                if crop.size == 0:
                    # cv2.resize raises on an empty image; skip boxes that lie
                    # outside the frame or have no area.
                    print(f"⚠️ Empty crop for frame {frame_id}, face {i}; skipped")
                    continue

                img_resized = cv2.resize(crop, (112, 112))
                crop_path = os.path.join(output_folder, f"frame_{frame_id}_face_{i}.jpg")
                cv2.imwrite(crop_path, img_resized)

    print(f"✅ Saved 112x112 face crops to: {output_folder}")
|
|
|
|
if __name__ == "__main__":
    # Manual end-to-end run: detect faces in the first frames of a local
    # video, crop them, then group the crops by identity.
    from grouping import group_faces_by_identity_facenet

    input_video = "/Users/sophiemaw/Downloads/CONFIDENTIAL DO NOT SHARE Edna & Paul 29.10.10 Part 2 00.12.46.531.mov"
    csv_path = 'meta_data/test_detections.csv'

    # Pass the CSV path explicitly so the two steps agree on the file without
    # relying on the detector's default output name. The detector only creates
    # 'meta_data' when it picks the path itself, so create it here.
    os.makedirs(os.path.dirname(csv_path), exist_ok=True)
    detect_faces_insightface_to_csv(input_video=input_video, output_csv=csv_path, end_frame=10)
    extract_faces_from_csv(input_video=input_video, csv_path=csv_path, padding=30)
    group_faces_by_identity_facenet()