import cv2 import numpy as np from insightface.app import FaceAnalysis import os import threading import time from register_face import register_face # Database paths REAL_FACES_DB = "faces_db" TEMP_DB_ROOT = "temp_face_database" TEMP_EMB_ROOT = "temp_faces_db" os.makedirs(TEMP_DB_ROOT, exist_ok=True) os.makedirs(TEMP_EMB_ROOT, exist_ok=True) # ---------------- MEMORY BUFFERS ---------------- embeddings_buffer = {} image_buffer = {} # ---------------- LOAD DATABASE ---------------- def load_database(): db = {} if os.path.exists(REAL_FACES_DB): for file in os.listdir(REAL_FACES_DB): if file.endswith(".npy"): name = file.replace(".npy", "") db[name] = np.load(os.path.join(REAL_FACES_DB, file)) if os.path.exists(TEMP_EMB_ROOT): for file in os.listdir(TEMP_EMB_ROOT): if file.endswith(".npy"): name = file.replace(".npy", "") db[name] = np.load(os.path.join(TEMP_EMB_ROOT, file)) return db def cosine_similarity(a, b): return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)) def get_next_unknown_id(): existing = [d for d in os.listdir(TEMP_DB_ROOT) if os.path.isdir(os.path.join(TEMP_DB_ROOT, d)) and d.startswith("unknown_")] if not existing: return 1 ids = [] for d in existing: try: ids.append(int(d.split("_")[1])) except (IndexError, ValueError): pass return max(ids) + 1 if ids else 1 # ---------------- SAVE FUNCTIONS ---------------- def save_all(): # Save embeddings for name, emb_list in embeddings_buffer.items(): if len(emb_list) == 0: continue emb_array = np.array(emb_list) np.save(os.path.join(TEMP_EMB_ROOT, f"{name}.npy"), emb_array) # Save images for name, images in image_buffer.items(): folder = os.path.join(TEMP_DB_ROOT, name) os.makedirs(folder, exist_ok=True) for i, img in enumerate(images): cv2.imwrite(os.path.join(folder, f"{i}.jpg"), img) def checkpoint_saver(): while True: time.sleep(30) save_all() def main(): # Initialize model app = FaceAnalysis(name='buffalo_l', allowed_modules=['detection', 'recognition']) app.prepare(ctx_id=-1, det_size=(640, 640)) db = load_database() # Start background checkpoint thread threading.Thread(target=checkpoint_saver, daemon=True).start() # ---------------- VIDEO ---------------- cap = cv2.VideoCapture(0) if not cap.isOpened(): raise SystemExit("Could not open webcam.") try: while True: ret, frame = cap.read() if not ret: break faces = app.get(frame) for face in faces: x1, y1, x2, y2 = face.bbox.astype(int) emb = face.embedding best_match = "Unknown" best_score = 0.0 for name, db_emb in db.items(): if db_emb.ndim == 1: score = cosine_similarity(emb, db_emb) else: scores = [cosine_similarity(emb, view) for view in db_emb] score = max(scores) if scores else 0.0 if score > best_score: best_score = score best_match = name if best_match.startswith("unknown"): threshold = 0.35 else: threshold = 0.30 if best_score > threshold: color = (0, 255, 0) label = f"{best_match} ({best_score:.2f})" # Store embedding in memory embeddings_buffer.setdefault(best_match, []).append(emb) # Limit embeddings if len(embeddings_buffer[best_match]) > 50: embeddings_buffer[best_match] = embeddings_buffer[best_match][-50:] if best_match in db: current_db_emb = db[best_match] if current_db_emb.ndim == 1: current_db_emb = np.expand_dims(current_db_emb, axis=0) updated_emb = np.vstack([current_db_emb, emb]) if len(updated_emb) > 50: updated_emb = updated_emb[-50:] db[best_match] = updated_emb else: h, w, _ = frame.shape pad_w = int((x2 - x1) * 0.25) pad_h = int((y2 - y1) * 0.25) crop_x1 = max(0, x1 - pad_w) crop_y1 = max(0, y1 - pad_h) crop_x2 = min(w, x2 + pad_w) crop_y2 = min(h, y2 + pad_h) face_crop = frame[crop_y1:crop_y2, crop_x1:crop_x2] crop_faces = app.get(face_crop) if len(crop_faces) == 0: continue crop_emb = crop_faces[0].embedding check_match = "Unknown" check_score = 0.0 for name, db_emb in db.items(): if db_emb.ndim == 1: s = cosine_similarity(crop_emb, db_emb) else: scores = [cosine_similarity(crop_emb, view) for view in db_emb] s = max(scores) if scores else 0.0 if s > check_score: check_score = s check_match = name check_threshold = 0.35 if check_match.startswith("unknown") else 0.30 if check_score > check_threshold: if check_match in db: current_db_emb = db[check_match] if current_db_emb.ndim == 1: current_db_emb = np.expand_dims(current_db_emb, axis=0) updated_emb = np.vstack([current_db_emb, crop_emb]) if len(updated_emb) > 50: updated_emb = updated_emb[-50:] db[check_match] = updated_emb color = (0, 255, 0) label = f"{check_match} ({check_score:.2f})" cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2) continue new_id = get_next_unknown_id() new_name = f"unknown_{new_id}" # Store image in memory image_buffer.setdefault(new_name, []).append(face_crop) # Store embedding embeddings_buffer.setdefault(new_name, []).append(emb) db[new_name] = np.array([emb]) best_match = new_name best_score = 1.0 color = (0, 255, 0) label = f"{best_match} (New)" cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2) cv2.imshow("Live Face Recognition", frame) if cv2.waitKey(1) & 0xFF == ord('q'): break finally: cap.release() cv2.destroyAllWindows() save_all() if __name__ == '__main__': main()