visual-search-api2 / src /services /ai_manager.py
AdarshDRC's picture
Create ai_manager.py
607b376 verified
Raw
History Blame Contribute Delete
15.1 kB
import asyncio
import base64
import functools
import io
import threading
import traceback
import hashlib
import cv2
import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
from transformers import AutoImageProcessor, AutoModel, AutoProcessor
from ultralytics import YOLO
import insightface
from insightface.app import FaceAnalysis
from src.core.config import (
MAX_IMAGE_SIZE, MAX_CROPS, YOLO_PERSON_CLASS_ID,
YOLO_MIN_CROP_PX, YOLO_CONF_THRESHOLD,
DET_SIZE_PRIMARY, DET_SCALES, IOU_DEDUP_THRESHOLD,
MIN_FACE_SIZE, MAX_FACES_PER_IMAGE, FACE_QUALITY_GATE,
FACE_DIM, ADAFACE_DIM, FUSED_FACE_DIM,
FACE_CROP_THUMB_SIZE, FACE_CROP_QUALITY,
FACE_CROP_PADDING, ADAFACE_CROP_PADDING,
INFERENCE_CACHE_SIZE, ENABLE_ADAFACE, HF_TOKEN,
)
def _resize_pil(img: Image.Image, max_side: int = MAX_IMAGE_SIZE) -> Image.Image:
    """Downscale *img* so that its longest side is at most *max_side* pixels.

    Images that already fit are returned unchanged (same object). The aspect
    ratio is preserved; each resulting side is clamped to at least one pixel so
    that very elongated images (e.g. 5000x2) do not truncate to a zero-sized
    dimension, which ``Image.resize`` rejects.

    Args:
        img: Source PIL image.
        max_side: Upper bound for ``max(width, height)`` of the result.

    Returns:
        The original image when no resize is needed, otherwise a new
        LANCZOS-resampled image.
    """
    w, h = img.size
    longest = max(w, h)
    if longest <= max_side:
        return img
    scale = max_side / longest
    # int() truncates toward zero; keep every side >= 1 px.
    new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
    return img.resize(new_size, Image.LANCZOS)
def _crop_to_b64(img_bgr: np.ndarray, x1: int, y1: int, x2: int, y2: int) -> str:
    """Return a base64 JPEG thumbnail of the padded box ``(x1, y1, x2, y2)``.

    The box is grown on every side by ``FACE_CROP_PADDING`` times its own
    width/height, clipped to the image bounds, converted from BGR to RGB
    channel order, resized to a ``FACE_CROP_THUMB_SIZE`` square and encoded as
    JPEG with quality ``FACE_CROP_QUALITY``.

    Returns:
        The base64 text of the JPEG, or ``""`` when the clipped region is empty.
    """
    img_h, img_w = img_bgr.shape[:2]
    margin_x = int((x2 - x1) * FACE_CROP_PADDING)
    margin_y = int((y2 - y1) * FACE_CROP_PADDING)

    left = max(0, x1 - margin_x)
    top = max(0, y1 - margin_y)
    right = min(img_w, x2 + margin_x)
    bottom = min(img_h, y2 + margin_y)

    region = img_bgr[top:bottom, left:right]
    if region.size == 0:
        return ""

    thumb_size = (FACE_CROP_THUMB_SIZE, FACE_CROP_THUMB_SIZE)
    # Reverse the channel axis: BGR -> RGB before handing the pixels to PIL.
    thumb = Image.fromarray(region[:, :, ::-1]).resize(thumb_size, Image.LANCZOS)

    jpeg = io.BytesIO()
    thumb.save(jpeg, format="JPEG", quality=FACE_CROP_QUALITY)
    return base64.b64encode(jpeg.getvalue()).decode()
def _face_crop_for_adaface(img_bgr: np.ndarray, x1: int, y1: int, x2: int, y2: int) -> np.ndarray | None:
    """Cut a padded face box out of *img_bgr* and format it as AdaFace input.

    The box is enlarged by ``ADAFACE_CROP_PADDING`` times its width/height on
    each side, clipped to the image, converted from BGR to RGB, resized to
    112x112 and rescaled from ``[0, 255]`` to ``[-1, 1]``.

    Returns:
        A float32 array in channel-first layout ``(3, 112, 112)``, or ``None``
        when the clipped region is empty.
    """
    img_h, img_w = img_bgr.shape[:2]
    margin_x = int((x2 - x1) * ADAFACE_CROP_PADDING)
    margin_y = int((y2 - y1) * ADAFACE_CROP_PADDING)

    left = max(0, x1 - margin_x)
    top = max(0, y1 - margin_y)
    right = min(img_w, x2 + margin_x)
    bottom = min(img_h, y2 + margin_y)

    region = img_bgr[top:bottom, left:right]
    if region.size == 0:
        return None

    # Contiguous RGB copy of the BGR region.
    region_rgb = region[:, :, ::-1].copy()
    resized = Image.fromarray(region_rgb).resize((112, 112), Image.LANCZOS)

    unit = np.array(resized, dtype=np.float32) / 255.0
    centered = (unit - 0.5) / 0.5
    # HWC -> CHW
    return centered.transpose(2, 0, 1)
def _clahe_enhance(bgr: np.ndarray) -> np.ndarray:
    """Apply CLAHE to the lightness channel of a BGR image.

    The image is converted to LAB, the L channel is equalised with
    ``clipLimit=2.0`` on an 8x8 tile grid, and the result is converted back to
    BGR. The A and B channels are passed through unchanged.
    """
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB))
    equaliser = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced_lab = cv2.merge([equaliser.apply(lightness), chan_a, chan_b])
    return cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2BGR)
def _iou(box_a: list, box_b: list) -> float:
xa, ya = max(box_a[0], box_b[0]), max(box_a[1], box_b[1])
xb, yb = min(box_a[2], box_b[2]), min(box_a[3], box_b[3])
inter = max(0, xb - xa) * max(0, yb - ya)
if inter == 0:
return 0.0
area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
return inter / (area_a + area_b - inter)
def _dedup_faces(faces_list: list, iou_thresh: float = IOU_DEDUP_THRESHOLD) -> list:
    """Greedy non-maximum suppression over detected faces.

    Faces are visited in descending ``det_score`` order; a face is kept only
    if its integer-truncated box does not overlap any already-kept face with
    an IoU strictly greater than *iou_thresh*.

    Args:
        faces_list: Objects exposing ``det_score`` and a numpy ``bbox``
            (``x1, y1, x2, y2``).
        iou_thresh: Overlap above which the lower-scoring face is dropped.

    Returns:
        The surviving faces, highest score first. Empty input gives ``[]``.
    """
    if not faces_list:
        return []

    ranked = sorted(faces_list, key=lambda f: float(f.det_score), reverse=True)

    survivors = []
    survivor_boxes = []  # integer boxes of `survivors`, index-aligned
    for candidate in ranked:
        ints = candidate.bbox.astype(int)
        cand_box = [ints[0], ints[1], ints[2], ints[3]]

        duplicate = False
        for kept_box in survivor_boxes:
            if _iou(cand_box, kept_box) > iou_thresh:
                duplicate = True
                break

        if not duplicate:
            survivors.append(candidate)
            survivor_boxes.append(cand_box)
    return survivors
class AIModelManager:
    """Owns the vision models and turns raw image bytes into embedding records.

    On construction the manager loads SigLIP and DINOv2 (fused into one
    object/scene embedding), a YOLO segmentation model (object crops), an
    InsightFace ``FaceAnalysis`` pipeline and, when ``ENABLE_ADAFACE`` is set,
    an AdaFace model. ``process_image_bytes`` results are memoised in a small
    in-process FIFO cache guarded by a lock.
    """

    def __init__(self):
        """Load every model on the available device and warm up the face pipeline."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        on_gpu = self.device == "cuda"

        self.siglip_processor = AutoProcessor.from_pretrained("google/siglip-base-patch16-224", use_fast=True)
        self.siglip_model = AutoModel.from_pretrained("google/siglip-base-patch16-224").to(self.device).eval()
        self.dinov2_processor = AutoImageProcessor.from_pretrained("facebook/dinov2-base")
        self.dinov2_model = AutoModel.from_pretrained("facebook/dinov2-base").to(self.device).eval()
        if on_gpu:
            # Half precision on GPU; inputs are cast to match in _embed_crops_batch.
            self.siglip_model = self.siglip_model.half()
            self.dinov2_model = self.dinov2_model.half()

        self.yolo = YOLO("yolo11n-seg.pt")

        providers = ["CUDAExecutionProvider", "CPUExecutionProvider"] if on_gpu else ["CPUExecutionProvider"]
        self.face_app = FaceAnalysis(name="buffalo_l", providers=providers)
        self.face_app.prepare(ctx_id=0 if on_gpu else -1, det_size=DET_SIZE_PRIMARY)
        # Warm-up call on a blank image.
        self.face_app.get(np.zeros((112, 112, 3), dtype=np.uint8))

        self.adaface_model = None
        self._load_adaface()

        self._face_lock = threading.Lock()
        self._cache_lock = threading.Lock()
        self._cache: dict[str, list] = {}

    def _load_adaface(self) -> None:
        """Download (if needed) and load the AdaFace model into ``self.adaface_model``.

        Does nothing when ``ENABLE_ADAFACE`` is false. Loading is best-effort:
        on any failure the traceback is printed and ``self.adaface_model``
        stays ``None``, in which case face vectors are padded with zeros
        instead of an AdaFace embedding.
        """
        if not ENABLE_ADAFACE:
            return
        import os
        import sys

        REPO_ID = "minchul/cvlface_adaface_ir50_ms1mv2"
        CACHE_PATH = os.path.expanduser("~/.cvlface_cache/minchul/cvlface_adaface_ir50_ms1mv2")
        try:
            from huggingface_hub import hf_hub_download
            from transformers import AutoModel as _HFAutoModel

            os.makedirs(CACHE_PATH, exist_ok=True)
            # files.txt lists the extra files the repo needs besides the standard three.
            hf_hub_download(repo_id=REPO_ID, filename="files.txt", token=HF_TOKEN, local_dir=CACHE_PATH, local_dir_use_symlinks=False)
            with open(os.path.join(CACHE_PATH, "files.txt")) as f:
                extra = [x.strip() for x in f.read().split("\n") if x.strip()]
            for fname in extra + ["config.json", "wrapper.py", "model.safetensors"]:
                if not os.path.exists(os.path.join(CACHE_PATH, fname)):
                    hf_hub_download(repo_id=REPO_ID, filename=fname, token=HF_TOKEN, local_dir=CACHE_PATH, local_dir_use_symlinks=False)

            # The model is loaded with CACHE_PATH as both the working directory
            # and an import root; both are restored afterwards.
            cwd = os.getcwd()
            os.chdir(CACHE_PATH)
            sys.path.insert(0, CACHE_PATH)
            try:
                model = _HFAutoModel.from_pretrained(CACHE_PATH, trust_remote_code=True, token=HF_TOKEN)
            finally:
                os.chdir(cwd)
                if CACHE_PATH in sys.path:
                    sys.path.remove(CACHE_PATH)
            self.adaface_model = model.to(self.device).eval()
        except Exception:
            # Best-effort: surface the reason instead of failing silently.
            traceback.print_exc()
            self.adaface_model = None

    def _adaface_embed(self, face_arr_chw: np.ndarray | None) -> np.ndarray | None:
        """Return the L2-normalised AdaFace embedding for one face crop.

        Args:
            face_arr_chw: Channel-first float array as produced by
                ``_face_crop_for_adaface``, or ``None``.

        Returns:
            A 1-D float32 numpy vector, or ``None`` when no model is loaded,
            no crop was supplied, or inference failed (traceback is printed).
        """
        if self.adaface_model is None or face_arr_chw is None:
            return None
        try:
            # Feed the model in its own parameter dtype. The model is loaded
            # without .half(), so unconditionally casting the input to fp16 on
            # CUDA would raise a dtype mismatch on every call.
            try:
                model_dtype = next(self.adaface_model.parameters()).dtype
            except (StopIteration, AttributeError):
                model_dtype = torch.float32
            batch = torch.from_numpy(face_arr_chw).unsqueeze(0).to(device=self.device, dtype=model_dtype)
            with torch.no_grad():
                out = self.adaface_model(batch)
            emb = out if isinstance(out, torch.Tensor) else out.embedding
            return F.normalize(emb.float(), p=2, dim=1)[0].cpu().numpy()
        except Exception:
            traceback.print_exc()
            return None

    def _prepare_inputs(self, encoded) -> dict:
        """Move processor outputs to ``self.device``; cast float32 tensors to half on CUDA."""
        moved = {k: v.to(self.device) for k, v in encoded.items()}
        if self.device == "cuda":
            moved = {k: v.half() if v.dtype == torch.float32 else v for k, v in moved.items()}
        return moved

    def _embed_crops_batch(self, crops: list[Image.Image]) -> list[np.ndarray]:
        """Embed images with SigLIP and DINOv2 and fuse the two vectors.

        Each model's output is L2-normalised, the two are concatenated, and the
        concatenation is L2-normalised again.

        Returns:
            One numpy vector per input image, in input order; ``[]`` for no input.
        """
        if not crops:
            return []
        with torch.no_grad():
            sig_in = self._prepare_inputs(self.siglip_processor(images=crops, return_tensors="pt", padding=True))
            sig_out = self.siglip_model.get_image_features(**sig_in)
            # The return may be a plain tensor or an output object; unwrap to a
            # (batch, dim) tensor.
            if hasattr(sig_out, "image_embeds"):
                sig_out = sig_out.image_embeds
            elif hasattr(sig_out, "pooler_output"):
                sig_out = sig_out.pooler_output
            elif hasattr(sig_out, "last_hidden_state"):
                sig_out = sig_out.last_hidden_state[:, 0, :]
            elif isinstance(sig_out, tuple):
                sig_out = sig_out[0]
            sig_vecs = F.normalize(sig_out.float(), p=2, dim=1).cpu()

            dino_in = self._prepare_inputs(self.dinov2_processor(images=crops, return_tensors="pt"))
            dino_out = self.dinov2_model(**dino_in)
            # First token of the last hidden state is used as the image vector.
            dino_vecs = F.normalize(dino_out.last_hidden_state[:, 0, :].float(), p=2, dim=1).cpu()

            fused = F.normalize(torch.cat([sig_vecs, dino_vecs], dim=1), p=2, dim=1)
        return [vec.numpy() for vec in fused]

    def _run_face_detector(self, bgr: np.ndarray, input_size) -> list:
        """Run ``face_app.get`` at *input_size* with the detector state locked.

        The detector's ``input_size`` is shared mutable state, so it is set and
        used under the same ``_face_lock`` acquisition.
        """
        with self._face_lock:
            self.face_app.det_model.input_size = input_size
            return self.face_app.get(bgr)

    def _detect_faces_multi_pass(self, bgr: np.ndarray) -> list:
        """Collect raw detections from every ``DET_SCALES`` entry plus a mirrored pass.

        Detection runs on a CLAHE-enhanced copy. Boxes are mapped back to the
        coordinates of *bgr*. A failing pass is reported and skipped.
        """
        enhanced = _clahe_enhance(bgr)
        height, width = bgr.shape[:2]
        found = []

        for scale in DET_SCALES:
            # Never upscale: each target side is capped by the image side.
            target_w, target_h = min(width, scale[0]), min(height, scale[1])
            if target_w == width and target_h == height:
                scaled = enhanced
            else:
                scaled = cv2.resize(enhanced, (target_w, target_h))
            try:
                faces_at_scale = self._run_face_detector(scaled, scale)
                sx, sy = width / target_w, height / target_h
                for f in faces_at_scale:
                    if sx != 1.0 or sy != 1.0:
                        f.bbox[0] *= sx
                        f.bbox[1] *= sy
                        f.bbox[2] *= sx
                        f.bbox[3] *= sy
                found.extend(faces_at_scale)
            except Exception:
                traceback.print_exc()

        mirrored = cv2.flip(enhanced, 1)
        try:
            faces_flip = self._run_face_detector(mirrored, DET_SIZE_PRIMARY)
            for f in faces_flip:
                x1, _y1, x2, _y2 = f.bbox
                # Un-mirror the horizontal extent.
                f.bbox[0], f.bbox[2] = width - x2, width - x1
            found.extend(faces_flip)
        except Exception:
            traceback.print_exc()

        # Leave the detector at its primary size for other users of face_app.
        with self._face_lock:
            self.face_app.det_model.input_size = DET_SIZE_PRIMARY
        return found

    def _encode_face(self, bgr: np.ndarray, face, face_idx: int) -> dict | None:
        """Build the result record for one detected face, or ``None`` if it is rejected.

        A face is rejected when its clipped box is smaller than
        ``MIN_FACE_SIZE`` on either side, its detection score is below
        ``FACE_QUALITY_GATE``, or it carries no embedding.
        """
        x1, y1, x2, y2 = face.bbox.astype(int)
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(bgr.shape[1], x2), min(bgr.shape[0], y2)
        w, h = x2 - x1, y2 - y1
        if w < MIN_FACE_SIZE or h < MIN_FACE_SIZE:
            return None

        det_score = float(face.det_score) if hasattr(face, "det_score") else 1.0
        if det_score < FACE_QUALITY_GATE or face.embedding is None:
            return None

        arcface_vec = face.embedding.astype(np.float32)
        arc_norm = np.linalg.norm(arcface_vec)
        if arc_norm > 0:
            arcface_vec = arcface_vec / arc_norm

        adaface_vec = self._adaface_embed(_face_crop_for_adaface(bgr, x1, y1, x2, y2))
        if adaface_vec is None:
            # Keep the fused vector length constant when AdaFace is unavailable.
            adaface_vec = np.zeros(ADAFACE_DIM, dtype=np.float32)
        fused_raw = np.concatenate([arcface_vec, adaface_vec])
        fused_norm = np.linalg.norm(fused_raw)
        final_vec = (fused_raw / fused_norm) if fused_norm > 0 else fused_raw

        return {
            "type": "face", "vector": final_vec, "face_idx": face_idx,
            "bbox": [int(x1), int(y1), int(w), int(h)],
            "face_crop": _crop_to_b64(bgr, x1, y1, x2, y2),
            "det_score": det_score, "face_width_px": int(w),
        }

    def _detect_and_encode_faces(self, img_np: np.ndarray) -> list[dict]:
        """Detect faces in an RGB image array and return one record per accepted face.

        Args:
            img_np: ``(H, W, C)`` array. Non-uint8 input is multiplied by 255
                and cast to uint8. Three-channel input has its channel order
                reversed (RGB -> BGR) before detection.

        Returns:
            Up to ``MAX_FACES_PER_IMAGE`` dicts with keys ``type``, ``vector``,
            ``face_idx``, ``bbox`` (``[x, y, w, h]``), ``face_crop``,
            ``det_score`` and ``face_width_px``. Returns ``[]`` when no face
            pipeline is loaded or on any failure (traceback is printed).
        """
        if self.face_app is None:
            return []
        try:
            if img_np.dtype != np.uint8:
                img_np = (img_np * 255).astype(np.uint8)
            bgr = img_np[:, :, ::-1].copy() if img_np.shape[2] == 3 else img_np.copy()

            faces = _dedup_faces(self._detect_faces_multi_pass(bgr))

            results: list[dict] = []
            for face in faces:
                if len(results) >= MAX_FACES_PER_IMAGE:
                    break
                record = self._encode_face(bgr, face, face_idx=len(results))
                if record is not None:
                    results.append(record)
            return results
        except Exception:
            traceback.print_exc()
            return []

    def _collect_object_crops(self, pil_img: Image.Image, skip_persons: bool) -> list[Image.Image]:
        """Return object crops found by YOLO, at most ``MAX_CROPS`` in total.

        Segmentation masks are preferred (crop = bounding rectangle of the mask
        polygon); plain boxes are used when a result has no masks. Crops
        smaller than ``YOLO_MIN_CROP_PX`` on either side are dropped, as are
        person-class detections when *skip_persons* is true.
        """
        yolo = getattr(self, "yolo", None)
        if yolo is None:
            return []

        crops: list[Image.Image] = []
        for r in yolo(pil_img, conf=YOLO_CONF_THRESHOLD, verbose=False):
            if r.masks is not None:
                for seg_idx, mask_xy in enumerate(r.masks.xy):
                    cls_id = int(r.boxes.cls[seg_idx].item())
                    if skip_persons and cls_id == YOLO_PERSON_CLASS_ID:
                        continue
                    polygon = np.array(mask_xy, dtype=np.int32)
                    if len(polygon) < 3:
                        continue
                    x, y, w, h = cv2.boundingRect(polygon)
                    if w < YOLO_MIN_CROP_PX or h < YOLO_MIN_CROP_PX:
                        continue
                    crops.append(pil_img.crop((x, y, x + w, y + h)))
                    if len(crops) >= MAX_CROPS:
                        # Return (not break) so the cap holds across results.
                        return crops
            elif r.boxes is not None:
                for box in r.boxes:
                    cls_id = int(box.cls.item())
                    if skip_persons and cls_id == YOLO_PERSON_CLASS_ID:
                        continue
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    if (x2 - x1) < YOLO_MIN_CROP_PX or (y2 - y1) < YOLO_MIN_CROP_PX:
                        continue
                    crops.append(pil_img.crop((x1, y1, x2, y2)))
                    if len(crops) >= MAX_CROPS:
                        return crops
        return crops

    def process_image_bytes(self, image_bytes: bytes, detect_faces: bool = True) -> list[dict]:
        """Extract face and object embedding records from an encoded image.

        Face records (when *detect_faces* is true and faces are found) come
        first, followed by one ``{"type": "object", "vector": ...}`` record for
        the whole image and one per YOLO crop. When faces were found,
        person-class YOLO detections are skipped.

        Results are cached per ``(content hash, detect_faces)``. The hash
        covers the entire payload so that different images sharing a common
        prefix cannot be served each other's results. Cache hits return a
        shallow copy of the stored list.

        Args:
            image_bytes: Encoded image readable by ``PIL.Image.open``.
            detect_faces: Whether to run the face pipeline.

        Returns:
            List of record dicts as described above.
        """
        file_hash = hashlib.md5(image_bytes).hexdigest()
        cache_key = f"{file_hash}_{detect_faces}"
        with self._cache_lock:
            if cache_key in self._cache:
                return list(self._cache[cache_key])

        extracted: list[dict] = []
        original_pil = Image.open(io.BytesIO(image_bytes)).convert("RGB")

        faces_found = False
        if detect_faces and getattr(self, "face_app", None) is not None:
            face_results = self._detect_and_encode_faces(np.array(original_pil))
            if face_results:
                faces_found = True
                extracted.extend(face_results)

        crops = self._collect_object_crops(original_pil, skip_persons=faces_found)
        all_crops = [_resize_pil(c, MAX_IMAGE_SIZE) for c in [original_pil] + crops]
        obj_vecs = self._embed_crops_batch(all_crops)
        extracted.extend({"type": "object", "vector": v} for v in obj_vecs)

        # A non-positive cache size disables storing altogether.
        if INFERENCE_CACHE_SIZE > 0:
            with self._cache_lock:
                if cache_key not in self._cache:
                    # FIFO eviction: dicts iterate in insertion order.
                    while len(self._cache) >= INFERENCE_CACHE_SIZE:
                        del self._cache[next(iter(self._cache))]
                self._cache[cache_key] = list(extracted)
        return extracted

    async def process_image_bytes_async(self, image_bytes: bytes, detect_faces: bool = True) -> list[dict]:
        """Run ``process_image_bytes`` in the event loop's default executor.

        Takes the same arguments and returns the same records as
        ``process_image_bytes``.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            functools.partial(self.process_image_bytes, image_bytes, detect_faces),
        )