github-actions
Deploy to Hugging Face
c794b6b
Raw
History Blame Contribute Delete
7.76 kB
import cv2
import numpy as np
from insightface.app import FaceAnalysis
import os
import threading
import time
from register_face import register_face
# On-disk locations used by this script:
#   REAL_FACES_DB - read-only here; scanned for "<name>.npy" embedding files.
#   TEMP_DB_ROOT  - one sub-folder of face crops per newly seen identity.
#   TEMP_EMB_ROOT - "<name>.npy" embeddings written at every checkpoint.
REAL_FACES_DB = "faces_db"
TEMP_DB_ROOT = "temp_face_database"
TEMP_EMB_ROOT = "temp_faces_db"

# Both temporary locations must exist before anything is listed or saved.
for _scratch_dir in (TEMP_DB_ROOT, TEMP_EMB_ROOT):
    os.makedirs(_scratch_dir, exist_ok=True)

# In-memory staging areas, flushed to disk by save_all():
#   embeddings_buffer: identity name -> list of embedding vectors
#   image_buffer:      identity name -> list of cropped face images
embeddings_buffer = {}
image_buffer = {}
# ---------------- LOAD DATABASE ----------------
def load_database(directories=None):
    """Load stored face embeddings into a ``{name: ndarray}`` mapping.

    Every ``<name>.npy`` file found in each directory is loaded with
    ``np.load`` and stored under ``<name>``. Directories are processed in
    order, so an entry from a later directory replaces an entry with the
    same name from an earlier one.

    Args:
        directories: Iterable of directory paths to scan. Defaults to
            ``(REAL_FACES_DB, TEMP_EMB_ROOT)``, i.e. the temporary
            embeddings override the real ones on a name clash.

    Returns:
        dict mapping identity name to the array loaded from its file.
        Missing directories are skipped; the result may be empty.
    """
    if directories is None:
        directories = (REAL_FACES_DB, TEMP_EMB_ROOT)

    suffix = ".npy"
    db = {}
    for directory in directories:
        if not os.path.isdir(directory):
            continue
        for filename in os.listdir(directory):
            if not filename.endswith(suffix):
                continue
            # Strip only the trailing extension; str.replace() would also
            # remove ".npy" occurring in the middle of the file name.
            name = filename[:-len(suffix)]
            db[name] = np.load(os.path.join(directory, filename))
    return db
def cosine_similarity(a, b):
    """Return the cosine similarity of vectors ``a`` and ``b``.

    Args:
        a: First vector (array-like accepted by ``np.dot``).
        b: Second vector, same length as ``a``.

    Returns:
        ``dot(a, b) / (|a| * |b|)``, or ``0.0`` when either vector has zero
        norm (the ratio is undefined there and would otherwise be NaN).
    """
    denominator = np.linalg.norm(a) * np.linalg.norm(b)
    if denominator == 0:
        return 0.0
    return np.dot(a, b) / denominator
def get_next_unknown_id(root=None, reserved_names=None):
    """Return the next free number for an ``unknown_<n>`` identity.

    The number is one more than the highest ``<n>`` found among
    ``unknown_<n>`` sub-directories of ``root`` and the names in
    ``reserved_names``. Folders for new identities are only written when the
    buffers are flushed, so identities created since the last flush exist
    only in memory; counting them prevents two different new faces from
    receiving the same id.

    Args:
        root: Directory holding one folder per unknown identity. Defaults
            to ``TEMP_DB_ROOT``. A missing directory counts as empty.
        reserved_names: Extra identity names that are already taken.
            Defaults to the keys of ``embeddings_buffer`` and
            ``image_buffer``.

    Returns:
        int: ``max(existing ids) + 1``, or ``1`` when no id exists yet.
    """
    if root is None:
        root = TEMP_DB_ROOT
    if reserved_names is None:
        reserved_names = list(embeddings_buffer) + list(image_buffer)

    candidates = list(reserved_names)
    if os.path.isdir(root):
        candidates.extend(
            entry for entry in os.listdir(root)
            if os.path.isdir(os.path.join(root, entry))
        )

    ids = []
    for entry in candidates:
        if not entry.startswith("unknown_"):
            continue
        try:
            ids.append(int(entry.split("_")[1]))
        except (IndexError, ValueError):
            # Not of the form unknown_<int>; ignore it.
            continue
    return max(ids, default=0) + 1
# ---------------- SAVE FUNCTIONS ----------------
def save_all():
    """Write the in-memory buffers to disk.

    For every name in ``embeddings_buffer`` with at least one embedding, the
    embeddings are stacked and saved to ``TEMP_EMB_ROOT/<name>.npy``
    (overwriting any previous file). For every name in ``image_buffer``, the
    images are written to ``TEMP_DB_ROOT/<name>/<index>.jpg``.

    This function is called from the background checkpoint thread while the
    main loop keeps inserting into the same dictionaries, so it iterates
    over snapshots; iterating the live dicts can raise
    ``RuntimeError: dictionary changed size during iteration``.
    """
    os.makedirs(TEMP_EMB_ROOT, exist_ok=True)

    # Save embeddings
    for name, emb_list in list(embeddings_buffer.items()):
        views = list(emb_list)  # private copy; the producer may append meanwhile
        if not views:
            continue
        np.save(os.path.join(TEMP_EMB_ROOT, f"{name}.npy"), np.array(views))

    # Save images
    for name, images in list(image_buffer.items()):
        folder = os.path.join(TEMP_DB_ROOT, name)
        os.makedirs(folder, exist_ok=True)
        for i, img in enumerate(list(images)):
            path = os.path.join(folder, f"{i}.jpg")
            # cv2.imwrite signals failure through its return value, not an
            # exception, so surface it instead of dropping it silently.
            if not cv2.imwrite(path, img):
                print(f"Warning: could not write {path}")
def checkpoint_saver(interval=30):
    """Flush the buffers to disk every ``interval`` seconds, forever.

    Intended to run as the target of a daemon thread; it never returns.

    Args:
        interval: Seconds to sleep between two calls to ``save_all``.
    """
    while True:
        time.sleep(interval)
        try:
            save_all()
        except Exception as exc:
            # Top of the thread: a single failed checkpoint must not end
            # checkpointing for the rest of the session. Report and retry
            # on the next tick.
            print(f"Checkpoint save failed: {exc!r}")
# Maximum number of embeddings kept per identity, both in the in-memory
# gallery (db) and in embeddings_buffer.
_MAX_VIEWS = 50
# Minimum cosine similarity for accepting a match. Auto-created "unknown_*"
# identities need a slightly higher score than the other identities.
_KNOWN_THRESHOLD = 0.30
_UNKNOWN_THRESHOLD = 0.35
# Fraction of the box width/height added on each side before re-detection.
_CROP_PADDING = 0.25
_BOX_COLOR = (0, 255, 0)
_TEXT_COLOR = (255, 255, 255)


def _find_best_match(emb, db):
    """Return ``(name, score)`` of the gallery entry most similar to ``emb``.

    An entry may hold one embedding (1-D) or several (2-D); for several, the
    best-scoring one counts. Returns ``("Unknown", 0.0)`` when nothing
    scores above zero.
    """
    best_name = "Unknown"
    best_score = 0.0
    for name, db_emb in db.items():
        if db_emb.ndim == 1:
            score = cosine_similarity(emb, db_emb)
        else:
            score = max(
                (cosine_similarity(emb, view) for view in db_emb),
                default=0.0,
            )
        if score > best_score:
            best_score = score
            best_name = name
    return best_name, best_score


def _threshold_for(name):
    """Return the acceptance threshold that applies to identity ``name``."""
    if name.startswith("unknown"):
        return _UNKNOWN_THRESHOLD
    return _KNOWN_THRESHOLD


def _remember_view(db, name, emb):
    """Append ``emb`` to ``db[name]``, keeping only the newest ``_MAX_VIEWS``."""
    if name not in db:
        return
    views = db[name]
    if views.ndim == 1:
        views = np.expand_dims(views, axis=0)
    db[name] = np.vstack([views, emb])[-_MAX_VIEWS:]


def _padded_crop(frame, box):
    """Return a copy of ``frame`` around ``box``, padded and clipped to the frame."""
    x1, y1, x2, y2 = box
    h, w, _ = frame.shape
    pad_w = int((x2 - x1) * _CROP_PADDING)
    pad_h = int((y2 - y1) * _CROP_PADDING)
    left = max(0, x1 - pad_w)
    top = max(0, y1 - pad_h)
    right = min(w, x2 + pad_w)
    bottom = min(h, y2 + pad_h)
    # Copy: a plain slice is a view into `frame`, so the box and label drawn
    # on the frame afterwards would show up in the stored image, and the
    # view would keep the whole frame alive for as long as it is buffered.
    return frame[top:bottom, left:right].copy()


def _identify_face(app, frame, face, db):
    """Identify one detected face and update the gallery and buffers.

    Returns the label to draw, or ``None`` when the face should be skipped
    (the padded crop is empty or no face is re-detected in it).
    """
    emb = face.embedding

    name, score = _find_best_match(emb, db)
    if score > _threshold_for(name):
        # Store embedding in memory, keeping only the newest ones.
        recent = embeddings_buffer.setdefault(name, [])
        recent.append(emb)
        if len(recent) > _MAX_VIEWS:
            embeddings_buffer[name] = recent[-_MAX_VIEWS:]
        _remember_view(db, name, emb)
        return f"{name} ({score:.2f})"

    # No match: re-run detection on a padded crop and try once more before
    # declaring a new identity.
    face_crop = _padded_crop(frame, face.bbox.astype(int))
    if face_crop.size == 0:
        return None
    crop_faces = app.get(face_crop)
    if len(crop_faces) == 0:
        return None

    crop_emb = crop_faces[0].embedding
    crop_name, crop_score = _find_best_match(crop_emb, db)
    if crop_score > _threshold_for(crop_name):
        _remember_view(db, crop_name, crop_emb)
        return f"{crop_name} ({crop_score:.2f})"

    # Still unmatched: register a new unknown identity.
    new_name = f"unknown_{get_next_unknown_id()}"
    image_buffer.setdefault(new_name, []).append(face_crop)
    embeddings_buffer.setdefault(new_name, []).append(emb)
    db[new_name] = np.array([emb])
    return f"{new_name} (New)"


def _draw_label(frame, box, label):
    """Draw the bounding box and its text label onto ``frame`` in place."""
    x1, y1, x2, y2 = box
    cv2.rectangle(frame, (x1, y1), (x2, y2), _BOX_COLOR, 2)
    cv2.putText(frame, label, (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, _TEXT_COLOR, 2)


def main():
    """Run live face recognition on the default webcam until 'q' is pressed.

    Loads the stored embeddings, starts the background checkpoint thread,
    then labels every detected face in each frame. Faces that match nothing
    are registered as new ``unknown_<n>`` identities. The buffers are
    flushed with ``save_all`` when the loop ends, including on error.

    Raises:
        SystemExit: If the webcam cannot be opened.
    """
    # Initialize model
    # NOTE(review): ctx_id=-1 presumably selects CPU inference - confirm
    # against the insightface documentation.
    app = FaceAnalysis(name='buffalo_l', allowed_modules=['detection', 'recognition'])
    app.prepare(ctx_id=-1, det_size=(640, 640))
    db = load_database()

    # Start background checkpoint thread
    threading.Thread(target=checkpoint_saver, daemon=True).start()

    # ---------------- VIDEO ----------------
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        raise SystemExit("Could not open webcam.")

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            for face in app.get(frame):
                label = _identify_face(app, frame, face, db)
                if label is not None:
                    _draw_label(frame, face.bbox.astype(int), label)

            cv2.imshow("Live Face Recognition", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()
        save_all()


if __name__ == '__main__':
    main()