"""Face detection pipeline: detect faces with InsightFace (from a video or a
folder of frames), persist detections to CSV, and extract 112x112 face crops.
"""

import json
import os
import shutil

import cv2  # video/image handling
import pandas as pd  # saving detections
import streamlit  # NOTE(review): imported but unused here; kept in case other code relies on it
from insightface.app import FaceAnalysis
from tqdm import tqdm  # progress bars

import load_models
from load_models import warm_up_models


# ---------------------------------------------------------------------------
# VIDEO INPUT
# ---------------------------------------------------------------------------
def detect_faces_insightface_to_csv(
    input_video: str,
    output_csv: str = None,
    app: FaceAnalysis = None,
    start_frame: int = 0,
    end_frame: int = None,
) -> None:
    """
    Detects faces in a video using InsightFace and saves bounding boxes,
    landmarks, and embeddings to a CSV file.

    Args:
        input_video (str): Path to the input video file.
        output_csv (str): Destination CSV path. Defaults to
            "meta_data/<video_basename>_detections.csv".
        app (FaceAnalysis): Pre-initialized detector; a CPU 'buffalo_l'
            instance is created when omitted.
        start_frame (int): First frame (inclusive) to process.
        end_frame (int): Last frame (inclusive); defaults to the final frame.

    Raises:
        IOError: If the video cannot be opened.
        ValueError: If the requested frame range is invalid.
    """
    if output_csv is None:
        os.makedirs("meta_data", exist_ok=True)
        # FIX: base_name was previously overwritten with the literal 'test',
        # so every video clobbered meta_data/test_detections.csv.
        base_name = os.path.splitext(os.path.basename(input_video))[0]
        output_csv = os.path.join("meta_data", f"{base_name}_detections.csv")

    if app is None:
        app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        app.prepare(ctx_id=0)

    cap = cv2.VideoCapture(input_video)
    if not cap.isOpened():
        raise IOError(f"Failed to open input video: {input_video}")

    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if end_frame is None:
            end_frame = total_frames - 1
        # Validate with an exception: `assert` is stripped under `python -O`.
        if not (0 <= start_frame <= end_frame < total_frames):
            raise ValueError("Invalid frame range.")

        all_detections = []
        print(f"🔍 Detecting faces from frame {start_frame} to {end_frame} with InsightFace...")
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        current_frame = start_frame

        with tqdm(total=end_frame - start_frame + 1, desc="Frames processed") as pbar:
            while current_frame <= end_frame:
                ret, frame = cap.read()
                if not ret:
                    print(f"⚠️ Frame {current_frame} could not be read.")
                    break
                faces = app.get(frame)
                for face in faces:
                    x1, y1, x2, y2 = map(int, face.bbox)
                    landmarks = face.kps.tolist() if hasattr(face, 'kps') else []
                    normed_embedding = (
                        face.normed_embedding.tolist()
                        if hasattr(face, 'normed_embedding') and face.normed_embedding is not None
                        else []
                    )
                    # Landmarks/embedding are JSON-encoded so each detection
                    # fits in a flat CSV row.
                    all_detections.append({
                        "frame": current_frame,
                        "x1": x1,
                        "y1": y1,
                        "x2": x2,
                        "y2": y2,
                        "landmarks": json.dumps(landmarks),
                        "normed_embedding": json.dumps(normed_embedding),
                    })
                current_frame += 1
                pbar.update(1)
    finally:
        # FIX: release the capture even if validation or detection raises.
        cap.release()

    df = pd.DataFrame(all_detections)
    df.to_csv(output_csv, index=False)
    print(f"✅ InsightFace detections saved to {output_csv}")


def extract_faces_from_csv(
    input_video: str,
    csv_path: str,
    output_folder: str = "output_faces",
    padding: int = 10,  # number of pixels to expand each side
) -> None:
    """
    Extracts and saves face crops from a video using bounding boxes in a CSV.

    Args:
        input_video (str): Path to original video.
        csv_path (str): Path to CSV with face bounding boxes.
        output_folder (str): Directory to save cropped face images.
        padding (int): Padding in pixels to add around each face crop.

    Raises:
        IOError: If the video cannot be opened.
    """
    # FIX: removed redundant function-local re-imports of os/cv2/pandas/tqdm;
    # they only shadowed the module-level imports.
    df = pd.read_csv(csv_path)
    if df.empty:
        print("❌ CSV is empty — no faces to extract.")
        return

    os.makedirs(output_folder, exist_ok=True)

    cap = cv2.VideoCapture(input_video)
    # FIX: raise instead of assert (assert vanishes under `python -O`).
    if not cap.isOpened():
        raise IOError("Failed to open input video")

    try:
        # Read each unique frame once and cache it, so multiple faces on the
        # same frame don't trigger repeated (slow) seeks.
        frame_cache = {}
        print("📦 Extracting faces...")
        for frame_id in tqdm(sorted(df["frame"].unique()), desc="Reading frames"):
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
            ret, frame = cap.read()
            if not ret:
                print(f"⚠️ Could not read frame {frame_id}")
                continue
            frame_cache[frame_id] = frame.copy()

        for i, row in tqdm(df.iterrows(), total=len(df), desc="Cropping faces"):
            frame_id = row["frame"]
            x1, y1, x2, y2 = int(row["x1"]), int(row["y1"]), int(row["x2"]), int(row["y2"])
            frame = frame_cache.get(frame_id)
            if frame is None:
                continue

            height, width = frame.shape[:2]
            # Expand the box by `padding`, clamped to the frame bounds.
            x1_p = max(0, x1 - padding)
            y1_p = max(0, y1 - padding)
            x2_p = min(width, x2 + padding)
            y2_p = min(height, y2 + padding)

            crop = frame[y1_p:y2_p, x1_p:x2_p]
            # FIX: cv2.resize raises on an empty array; skip degenerate boxes.
            if crop.size == 0:
                print(f"⚠️ Empty crop for frame {frame_id} — skipped.")
                continue
            img_resized = cv2.resize(crop, (112, 112))
            crop_path = os.path.join(output_folder, f"frame_{frame_id}_face_{i}.jpg")
            cv2.imwrite(crop_path, img_resized)  # ✅ Save the resized image
    finally:
        cap.release()

    print(f"✅ Saved 112x112 face crops to: {output_folder}")


# ---------------------------------------------------------------------------
# FRAMES INPUT
# ---------------------------------------------------------------------------
def detect_faces_insightface_from_frames(
    frames_folder: str,
    output_csv: str = None,
    app: FaceAnalysis = None,
    streamlit_progress=None,
    progress_range=(0, 100),
) -> None:
    """
    Detects faces in a folder of frames using InsightFace and saves results
    to a CSV.

    Args:
        frames_folder (str): Folder of frame images named numerically
            (e.g. "12.jpg") — the numeric stem becomes the frame index.
        output_csv (str): Destination CSV path. Defaults to
            "meta_data/<folder_name>_detections.csv".
        app (FaceAnalysis): Pre-initialized detector; a CPU 'buffalo_l'
            instance is created when omitted.
        streamlit_progress: Optional Streamlit progress-bar object; its
            `.progress(int)` method is called as frames are processed.
        progress_range (tuple): (start, end) percentage range the progress
            bar should sweep over.
    """
    # NOTE(review): int(stem) will raise on non-numeric filenames — assumes
    # frames were written by this pipeline. TODO confirm against the writer.
    frame_paths = sorted(
        [f for f in os.listdir(frames_folder) if f.lower().endswith((".jpg", ".png"))],
        key=lambda x: int(os.path.splitext(x)[0]),
    )

    if output_csv is None:
        os.makedirs("meta_data", exist_ok=True)
        base_name = os.path.basename(os.path.normpath(frames_folder))
        output_csv = os.path.join("meta_data", f"{base_name}_detections.csv")

    if app is None:
        app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        app.prepare(ctx_id=0)

    all_detections = []
    print("✅ Total frames found:", len(frame_paths))
    print("✅ First few frames:", frame_paths[:10])
    print("✅ Last few frames:", frame_paths[-10:])
    print(f"🔍 Detecting faces in {len(frame_paths)} frames with InsightFace...")

    num_faces = 0
    total = len(frame_paths)
    start, end = progress_range

    for i, filename in enumerate(frame_paths):
        frame_index = int(os.path.splitext(filename)[0])
        frame_path = os.path.join(frames_folder, filename)
        frame = cv2.imread(frame_path)

        if streamlit_progress:
            # FIX: guard against division by zero when the folder is empty
            # (the loop body would never run, but keep the guard for safety).
            progress_value = start + (end - start) * (i + 1) / max(total, 1)
            streamlit_progress.progress(int(progress_value))

        if frame is None:
            print(f"⚠️ Could not read frame: {frame_path}")
            continue

        faces = app.get(frame)
        for face in faces:
            x1, y1, x2, y2 = map(int, face.bbox)
            landmarks = face.kps.tolist() if hasattr(face, 'kps') else []
            normed_embedding = (
                face.normed_embedding.tolist()
                if hasattr(face, 'normed_embedding') and face.normed_embedding is not None
                else []
            )
            all_detections.append({
                "frame": frame_index,
                "x1": x1,
                "y1": y1,
                "x2": x2,
                "y2": y2,
                "landmarks": json.dumps(landmarks),
                "normed_embedding": json.dumps(normed_embedding),
            })
            num_faces += 1

    df = pd.DataFrame(all_detections)
    df.to_csv(output_csv, index=False)
    print(f"✅ InsightFace detections saved to {output_csv}")
    print(f"✅ Number of faces detected: {num_faces}")


def extract_faces_from_frames_folder(
    frames_folder: str,
    csv_path: str,
    output_folder: str = "output_faces",
    padding: int = 10,
    streamlit_progress=None,
    progress_range=(0, 100),
) -> None:
    """
    Extracts and saves face crops using bounding boxes from CSV and frames
    in a folder.

    Args:
        frames_folder (str): Path to folder containing frame images (named numerically).
        csv_path (str): Path to CSV with face bounding boxes.
        output_folder (str): Directory to save cropped face images.
        padding (int): Padding in pixels to add around each face crop.
        streamlit_progress: Optional Streamlit progress-bar object.
        progress_range (tuple): (start, end) percentage range for the bar.
    """
    df = pd.read_csv(csv_path)
    if df.empty:
        print("❌ CSV is empty — no faces to extract.")
        return

    # Clean the output folder before saving crops
    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)
    os.makedirs(output_folder, exist_ok=True)

    # Preload all frames into a cache
    frame_cache = {}
    unique_frames = sorted(df["frame"].unique())
    print("📦 Reading frames from folder...")
    for frame_id in tqdm(unique_frames, desc="Reading frames"):
        frame_path = os.path.join(frames_folder, f"{frame_id}.jpg")
        frame = cv2.imread(frame_path)
        if frame is None:
            print(f"⚠️ Could not read frame {frame_path}")
            continue
        frame_cache[frame_id] = frame

    print("✂️ Cropping faces...")
    total_rows = len(df)
    start, end = progress_range

    for i, row in tqdm(df.iterrows(), total=total_rows, desc="Cropping faces"):
        if streamlit_progress:
            # NOTE(review): assumes a default RangeIndex from read_csv so `i`
            # runs 0..total_rows-1 — holds for files written by this module.
            progress_value = start + (end - start) * (i + 1) / total_rows
            streamlit_progress.progress(int(progress_value))

        frame_id = row["frame"]
        x1, y1, x2, y2 = int(row["x1"]), int(row["y1"]), int(row["x2"]), int(row["y2"])
        frame = frame_cache.get(frame_id)
        if frame is None:
            continue

        height, width = frame.shape[:2]
        # Expand the box by `padding`, clamped to the frame bounds.
        x1_p = max(0, x1 - padding)
        y1_p = max(0, y1 - padding)
        x2_p = min(width, x2 + padding)
        y2_p = min(height, y2 + padding)

        crop = frame[y1_p:y2_p, x1_p:x2_p]
        # FIX: cv2.resize raises on an empty array; skip degenerate boxes.
        if crop.size == 0:
            print(f"⚠️ Empty crop for frame {frame_id} — skipped.")
            continue
        img_resized = cv2.resize(crop, (112, 112))
        crop_path = os.path.join(output_folder, f"frame_{frame_id}_face_{i}.jpg")
        cv2.imwrite(crop_path, img_resized)

    print(f"✅ Saved 112x112 face crops to: {output_folder}")


if __name__ == "__main__":
    from grouping import group_faces_by_identity_facenet

    input_video = (
        "/Users/sophiemaw/Downloads/CONFIDENTIAL DO NOT SHARE "
        "Edna & Paul 29.10.10 Part 2 00.12.46.531.mov"
    )
    # NOTE(review): model_path is unused in this script; kept for reference.
    model_path = "/Users/sophiemaw/Documents/VASR_NEW/pythonProject/models/face_detection/yolov8l-face-lindevs.pt"

    # FIX: derive the CSV path from the video name so it matches the path
    # detect_faces_insightface_to_csv writes to (it previously relied on a
    # hard-coded 'test' base name inside that function).
    base_name = os.path.splitext(os.path.basename(input_video))[0]
    csv_path = os.path.join("meta_data", f"{base_name}_detections.csv")

    detect_faces_insightface_to_csv(input_video=input_video, end_frame=10)
    extract_faces_from_csv(input_video=input_video, csv_path=csv_path, padding=30)
    group_faces_by_identity_facenet()