Spaces:
Sleeping
Sleeping
File size: 6,538 Bytes
b53629f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 | import cv2
from ultralytics import YOLO
from deepface import DeepFace
import numpy as np
import pickle
import os
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
class FaceRecognitionService:
def __init__(self, model_dir: str):
self.model_dir = Path(model_dir)
self.model_name = "ArcFace"
self.detector_model = "yolov8n-face.pt"
self.cache_file = self.model_dir / "embeddings_cache.pkl"
self.num_best_frames = 10
self.min_blur_threshold = 10
self.blur_weight = 0.6
self.frontal_weight = 0.4
def calculate_blur_score(self, image):
"""Calculate sharpness using Laplacian variance."""
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
return laplacian_var
def calculate_frontal_score(self, face_data):
"""Calculate how frontal the face is based on facial area."""
try:
facial_area = face_data.get('facial_area', {})
area = facial_area.get('w', 0) * facial_area.get('h', 0)
frontal_score = min(area / 50000.0, 1.0) * 100
return frontal_score
except:
return 50.0
def extract_embeddings_from_video(self, video_path: str):
"""Extract high-quality face embeddings from a 360-degree scan video."""
logger.info(f"Processing reference video scan: {video_path}")
logger.info("Phase 1: Analyzing frame quality...")
cap_ref = cv2.VideoCapture(video_path)
total_frames = int(cap_ref.get(cv2.CAP_PROP_FRAME_COUNT))
candidate_frames = []
frame_idx = 0
while cap_ref.isOpened():
ret, frame = cap_ref.read()
if not ret:
break
if frame_idx % 15 == 0:
blur_score = self.calculate_blur_score(frame)
if blur_score <= self.min_blur_threshold:
logger.debug(f"Frame {frame_idx}: Blur={blur_score:.1f} (too blurry, skipped)")
if blur_score > self.min_blur_threshold:
try:
face_data = DeepFace.represent(frame, model_name=self.model_name, enforce_detection=True)[0]
frontal_score = self.calculate_frontal_score(face_data)
quality_score = (blur_score * self.blur_weight) + (frontal_score * self.frontal_weight)
candidate_frames.append({
'frame_idx': frame_idx,
'frame': frame.copy(),
'blur_score': blur_score,
'frontal_score': frontal_score,
'quality_score': quality_score,
'embedding': face_data['embedding']
})
logger.debug(f"Frame {frame_idx}: Quality={quality_score:.1f}")
except Exception as e:
logger.debug(f"Frame {frame_idx}: No face detected - {e}")
frame_idx += 1
cap_ref.release()
if not candidate_frames:
raise ValueError("No valid frames found in video")
logger.info(f"Phase 2: Selecting top {self.num_best_frames} frames with temporal spacing...")
candidate_frames.sort(key=lambda x: x['quality_score'], reverse=True)
segment_size = total_frames // self.num_best_frames
selected_frames = []
for segment_idx in range(self.num_best_frames):
segment_start = segment_idx * segment_size
segment_end = (segment_idx + 1) * segment_size
best_in_segment = None
best_quality = -1
for candidate in candidate_frames:
if segment_start <= candidate['frame_idx'] < segment_end:
if candidate['quality_score'] > best_quality:
best_quality = candidate['quality_score']
best_in_segment = candidate
if best_in_segment:
selected_frames.append(best_in_segment)
logger.debug(f"Segment {segment_idx+1}: Frame {best_in_segment['frame_idx']}")
if len(selected_frames) < self.num_best_frames:
for candidate in candidate_frames:
if candidate not in selected_frames:
selected_frames.append(candidate)
if len(selected_frames) >= self.num_best_frames:
break
logger.info(f"Phase 3: Averaging {len(selected_frames)} embeddings...")
embeddings_to_average = [frame['embedding'] for frame in selected_frames]
master_embedding = np.mean(embeddings_to_average, axis=0).tolist()
return [master_embedding], len(selected_frames)
def extract_embeddings_from_image(self, image_path: str):
"""Extract face embedding from a single image."""
try:
embedding = DeepFace.represent(img_path=image_path, model_name=self.model_name)[0]["embedding"]
return [embedding]
except Exception as e:
logger.error(f"Could not extract embedding from {image_path}: {e}")
raise
def save_embeddings_cache(self, embeddings, video_path: str, num_frames_used: int):
"""Save embeddings to cache file."""
cache_data = {
'video_path': video_path,
'video_mtime': os.path.getmtime(video_path) if os.path.exists(video_path) else None,
'model_name': self.model_name,
'embeddings': embeddings,
'version': 2,
'num_frames_used': num_frames_used
}
with open(self.cache_file, 'wb') as f:
pickle.dump(cache_data, f)
logger.info(f"Saved embeddings cache to {self.cache_file}")
def load_embeddings_cache(self):
"""Load embeddings from cache file."""
if not os.path.exists(self.cache_file):
return None
try:
with open(self.cache_file, 'rb') as f:
cache_data = pickle.load(f)
return cache_data
except Exception as e:
logger.error(f"Could not load cache: {e}")
return None
|