| import cv2 |
| import numpy as np |
| from insightface.app import FaceAnalysis |
| import os |
| import threading |
| import time |
| from register_face import register_face |
|
|
| |
| REAL_FACES_DB = "faces_db" |
| TEMP_DB_ROOT = "temp_face_database" |
| TEMP_EMB_ROOT = "temp_faces_db" |
|
|
| os.makedirs(TEMP_DB_ROOT, exist_ok=True) |
| os.makedirs(TEMP_EMB_ROOT, exist_ok=True) |
|
|
| |
| embeddings_buffer = {} |
| image_buffer = {} |
|
|
| |
| def load_database(): |
| db = {} |
| if os.path.exists(REAL_FACES_DB): |
| for file in os.listdir(REAL_FACES_DB): |
| if file.endswith(".npy"): |
| name = file.replace(".npy", "") |
| db[name] = np.load(os.path.join(REAL_FACES_DB, file)) |
|
|
| if os.path.exists(TEMP_EMB_ROOT): |
| for file in os.listdir(TEMP_EMB_ROOT): |
| if file.endswith(".npy"): |
| name = file.replace(".npy", "") |
| db[name] = np.load(os.path.join(TEMP_EMB_ROOT, file)) |
| return db |
|
|
| def cosine_similarity(a, b): |
| return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)) |
|
|
| def get_next_unknown_id(): |
| existing = [d for d in os.listdir(TEMP_DB_ROOT) |
| if os.path.isdir(os.path.join(TEMP_DB_ROOT, d)) and d.startswith("unknown_")] |
| if not existing: |
| return 1 |
| ids = [] |
| for d in existing: |
| try: |
| ids.append(int(d.split("_")[1])) |
| except (IndexError, ValueError): |
| pass |
| return max(ids) + 1 if ids else 1 |
|
|
| |
| def save_all(): |
| |
| for name, emb_list in embeddings_buffer.items(): |
| if len(emb_list) == 0: |
| continue |
| emb_array = np.array(emb_list) |
| np.save(os.path.join(TEMP_EMB_ROOT, f"{name}.npy"), emb_array) |
|
|
| |
| for name, images in image_buffer.items(): |
| folder = os.path.join(TEMP_DB_ROOT, name) |
| os.makedirs(folder, exist_ok=True) |
| for i, img in enumerate(images): |
| cv2.imwrite(os.path.join(folder, f"{i}.jpg"), img) |
|
|
| def checkpoint_saver(): |
| while True: |
| time.sleep(30) |
| save_all() |
|
|
| def main(): |
| |
| app = FaceAnalysis(name='buffalo_l', allowed_modules=['detection', 'recognition']) |
| app.prepare(ctx_id=-1, det_size=(640, 640)) |
|
|
| db = load_database() |
|
|
| |
| threading.Thread(target=checkpoint_saver, daemon=True).start() |
|
|
| |
| cap = cv2.VideoCapture(0) |
| if not cap.isOpened(): |
| raise SystemExit("Could not open webcam.") |
|
|
| try: |
| while True: |
| ret, frame = cap.read() |
| if not ret: |
| break |
|
|
| faces = app.get(frame) |
|
|
| for face in faces: |
| x1, y1, x2, y2 = face.bbox.astype(int) |
| emb = face.embedding |
|
|
| best_match = "Unknown" |
| best_score = 0.0 |
|
|
| for name, db_emb in db.items(): |
| if db_emb.ndim == 1: |
| score = cosine_similarity(emb, db_emb) |
| else: |
| scores = [cosine_similarity(emb, view) for view in db_emb] |
| score = max(scores) if scores else 0.0 |
|
|
| if score > best_score: |
| best_score = score |
| best_match = name |
|
|
| if best_match.startswith("unknown"): |
| threshold = 0.35 |
| else: |
| threshold = 0.30 |
|
|
| if best_score > threshold: |
| color = (0, 255, 0) |
| label = f"{best_match} ({best_score:.2f})" |
|
|
| |
| embeddings_buffer.setdefault(best_match, []).append(emb) |
|
|
| |
| if len(embeddings_buffer[best_match]) > 50: |
| embeddings_buffer[best_match] = embeddings_buffer[best_match][-50:] |
|
|
| if best_match in db: |
| current_db_emb = db[best_match] |
| if current_db_emb.ndim == 1: |
| current_db_emb = np.expand_dims(current_db_emb, axis=0) |
|
|
| updated_emb = np.vstack([current_db_emb, emb]) |
|
|
| if len(updated_emb) > 50: |
| updated_emb = updated_emb[-50:] |
|
|
| db[best_match] = updated_emb |
|
|
| else: |
| h, w, _ = frame.shape |
| pad_w = int((x2 - x1) * 0.25) |
| pad_h = int((y2 - y1) * 0.25) |
|
|
| crop_x1 = max(0, x1 - pad_w) |
| crop_y1 = max(0, y1 - pad_h) |
| crop_x2 = min(w, x2 + pad_w) |
| crop_y2 = min(h, y2 + pad_h) |
|
|
| face_crop = frame[crop_y1:crop_y2, crop_x1:crop_x2] |
|
|
| crop_faces = app.get(face_crop) |
| if len(crop_faces) == 0: |
| continue |
|
|
| crop_emb = crop_faces[0].embedding |
|
|
| check_match = "Unknown" |
| check_score = 0.0 |
|
|
| for name, db_emb in db.items(): |
| if db_emb.ndim == 1: |
| s = cosine_similarity(crop_emb, db_emb) |
| else: |
| scores = [cosine_similarity(crop_emb, view) for view in db_emb] |
| s = max(scores) if scores else 0.0 |
|
|
| if s > check_score: |
| check_score = s |
| check_match = name |
|
|
| check_threshold = 0.35 if check_match.startswith("unknown") else 0.30 |
|
|
| if check_score > check_threshold: |
| if check_match in db: |
| current_db_emb = db[check_match] |
| if current_db_emb.ndim == 1: |
| current_db_emb = np.expand_dims(current_db_emb, axis=0) |
|
|
| updated_emb = np.vstack([current_db_emb, crop_emb]) |
| if len(updated_emb) > 50: |
| updated_emb = updated_emb[-50:] |
|
|
| db[check_match] = updated_emb |
|
|
| color = (0, 255, 0) |
| label = f"{check_match} ({check_score:.2f})" |
|
|
| cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) |
| cv2.putText(frame, label, (x1, y1 - 10), |
| cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2) |
| continue |
|
|
| new_id = get_next_unknown_id() |
| new_name = f"unknown_{new_id}" |
|
|
| |
| image_buffer.setdefault(new_name, []).append(face_crop) |
|
|
| |
| embeddings_buffer.setdefault(new_name, []).append(emb) |
|
|
| db[new_name] = np.array([emb]) |
|
|
| best_match = new_name |
| best_score = 1.0 |
| color = (0, 255, 0) |
| label = f"{best_match} (New)" |
|
|
| cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) |
| cv2.putText(frame, label, (x1, y1 - 10), |
| cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2) |
|
|
| cv2.imshow("Live Face Recognition", frame) |
|
|
| if cv2.waitKey(1) & 0xFF == ord('q'): |
| break |
| finally: |
| cap.release() |
| cv2.destroyAllWindows() |
| save_all() |
|
|
|
|
| if __name__ == '__main__': |
| main() |
|
|