Spaces:
Running
Running
| from fastapi import APIRouter, File, UploadFile | |
| import numpy as np | |
| import cv2 | |
| import faiss | |
| import pickle | |
| import os | |
| import torch | |
| import insightface | |
| from insightface.utils import face_align | |
| import sys | |
| # โ Dockerfile์์ ํด๋ก ํ AdaFace ๊ฒฝ๋ก ์ถ๊ฐ | |
| sys.path.append('/app/AdaFace') | |
| import net | |
| router = APIRouter() | |
| # --- ์ค์ ๋ฐ ๊ฒฝ๋ก --- | |
| faiss_index_name = "face_faiss_index_v2.index" | |
| faiss_label_name = "face_faiss_labels_v2.pkl" | |
| load_path = os.path.abspath("embedding/person") # ์ค์ FAISS ํ์ผ ์์น๋ก ๋ณ๊ฒฝ ํ์ | |
| threshold = 45.0 # Unknown ํ๋ณ ์๊ณ๊ฐ | |
| # โ Hugging Face ๋ฌด๋ฃ CPU ๊ฐ์ ์ค์ | |
| device = torch.device('cpu') | |
| # --- 1. InsightFace (ํ์ง๊ธฐ) ๋ก๋ --- | |
| detector = insightface.app.FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'], allowed_modules=['detection']) | |
| detector.prepare(ctx_id=0, det_size=(640, 640)) | |
| # --- 2. AdaFace (์ธ์๊ธฐ) ๋ก๋ --- | |
| model_path = "/app/adaface_ir101_webface12m.ckpt" | |
| adaface_model = net.build_model('ir_101') | |
| statedict = torch.load(model_path, map_location=device)["state_dict"] | |
| model_statedict = {key[6:]: val for key, val in statedict.items() if key.startswith("model.")} | |
| adaface_model.load_state_dict(model_statedict) | |
| adaface_model.to(device) | |
| adaface_model.eval() | |
| # --- 3. FAISS ๋ก๋ --- | |
| index = faiss.read_index(os.path.join(load_path, faiss_index_name)) | |
| with open(os.path.join(load_path, faiss_label_name), "rb") as f: | |
| labels = pickle.load(f) | |
| # โ AdaFace ์๋ฒ ๋ฉ ์ถ์ถ ํจ์ | |
| def extract_adaface_embedding(img_bgr, face_kps): | |
| aligned_face = face_align.norm_crop(img_bgr, landmark=face_kps, image_size=112) | |
| img_norm = (aligned_face / 255.0 - 0.5) / 0.5 | |
| img_tensor = torch.tensor(img_norm.transpose(2, 0, 1)).float().unsqueeze(0).to(device) | |
| with torch.no_grad(): | |
| embedding, _ = adaface_model(img_tensor) | |
| return embedding.cpu().numpy()[0] | |
| async def predict(file: UploadFile = File(...)): | |
| contents = await file.read() | |
| nparr = np.frombuffer(contents, np.uint8) | |
| img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) | |
| if img is None: | |
| return {"success": False, "message": "โ ์ด๋ฏธ์ง๋ฅผ ์ฝ์ ์ ์์ต๋๋ค."} | |
| faces = detector.get(img) | |
| if not faces: | |
| return {"success": False, "message": "โ ์ผ๊ตด์ ์ฐพ์ ์ ์์ต๋๋ค."} | |
| results = [] | |
| for face in faces: | |
| # AdaFace ์๋ฒ ๋ฉ ์ถ์ถ ๋ฐ ์ ๊ทํ | |
| emb = extract_adaface_embedding(img, face.kps) | |
| emb = np.array([emb], dtype='float32') | |
| emb /= np.linalg.norm(emb, axis=1, keepdims=True) | |
| # FAISS ๊ฒ์ | |
| distances, indices = index.search(emb, k=1) | |
| best_match_idx = indices[0][0] | |
| similarity_score = distances[0][0] | |
| # ์ผ์น์จ ๊ณ์ฐ ๋ฐ Threshold ์ ์ฉ | |
| score_percent = max(0, similarity_score) * 100 | |
| if score_percent >= threshold: | |
| predicted_name = labels[best_match_idx] | |
| else: | |
| predicted_name = "Unknown" | |
| box = face.bbox.astype(int).tolist() | |
| results.append({ | |
| "label": predicted_name, | |
| "score": float(score_percent), | |
| "bbox": box # ํ๋ก ํธ์๋์์ ๋ฐ์ค๋ฅผ ๊ทธ๋ฆด ์ ์๋๋ก ์ขํ ๋ฐํ | |
| }) | |
| return {"success": True, "results": results, "message": f"โ ์ด {len(faces)}๋ช ์ ์ผ๊ตด์ ์ฒ๋ฆฌํ์ต๋๋ค."} |