DaoManhDuc2004
Deploy DATN face AI server
b5d3a91
# built-in dependencies
import os
import json
import hashlib
import struct
import math
from typing import Any, Dict, Optional, List, Union
# project dependencies
from deepface.modules.database.types import Database
from deepface.modules.modeling import build_model
from deepface.commons.logger import Logger
# Module-level logger; PineconeClient uses it for debug messages about index setup.
logger = Logger()
class PineconeClient(Database):
    """
    Pinecone client for storing and retrieving face embeddings and indices.

    Each (model, detector backend, alignment, normalization) combination is kept in
    its own Pinecone index. Indexes are created lazily on first use.
    """

    def __init__(
        self,
        connection_details: Optional[Union[str, Dict[str, Any]]] = None,
        connection: Any = None,
    ):
        """
        Build the client from an existing connection or from an API key.

        Args:
            connection_details: Pinecone API key as a string. When omitted (or empty),
                the DEEPFACE_PINECONE_API_KEY environment variable is used instead.
            connection: an already constructed Pinecone client. When given, it is used
                as-is and connection_details is ignored.

        Raises:
            ValueError: if the pinecone package is not installed, or if no usable
                (non-empty string) API key could be resolved.
        """
        try:
            from pinecone import Pinecone, ServerlessSpec
        except (ModuleNotFoundError, ImportError) as e:
            raise ValueError(
                "pinecone is an optional dependency. Install with 'pip install pinecone'"
            ) from e

        self.pinecone = Pinecone
        self.serverless_spec = ServerlessSpec

        if connection is not None:
            self.client = connection
        else:
            self.conn_details = connection_details or os.environ.get("DEEPFACE_PINECONE_API_KEY")
            # An empty key can only come from an empty environment variable here;
            # reject it up front instead of failing later inside the SDK.
            if not isinstance(self.conn_details, str) or not self.conn_details:
                raise ValueError(
                    "Pinecone api key must be provided as a string in connection_details "
                    "or via DEEPFACE_PINECONE_API_KEY environment variable."
                )
            self.client = self.pinecone(api_key=self.conn_details)

    def initialize_database(self, **kwargs: Any) -> None:
        """
        Ensure Pinecone index exists.

        Args:
            **kwargs: optional keys model_name (default "VGG-Face"), detector_backend
                (default "opencv"), aligned (default True) and l2_normalized
                (default False). Together they select the index to check / create.

        The index is created as a serverless index whose cloud and region come from the
        DEEPFACE_PINECONE_CLOUD / DEEPFACE_PINECONE_REGION environment variables
        (defaults "aws" / "us-east-1"). The metric is "cosine" for l2-normalized
        embeddings and "euclidean" otherwise.
        """
        model_name = kwargs.get("model_name", "VGG-Face")
        detector_backend = kwargs.get("detector_backend", "opencv")
        aligned = kwargs.get("aligned", True)
        l2_normalized = kwargs.get("l2_normalized", False)

        index_name = self.__generate_index_name(
            model_name, detector_backend, aligned, l2_normalized
        )

        if self.client.has_index(index_name):
            logger.debug(f"Pinecone index '{index_name}' already exists.")
            return

        # The model is only built when the index is missing, to learn the vector size.
        model = build_model(task="facial_recognition", model_name=model_name)
        dimensions = model.output_shape
        similarity_function = "cosine" if l2_normalized else "euclidean"

        self.client.create_index(
            name=index_name,
            dimension=dimensions,
            metric=similarity_function,
            spec=self.serverless_spec(
                cloud=os.getenv("DEEPFACE_PINECONE_CLOUD", "aws"),
                region=os.getenv("DEEPFACE_PINECONE_REGION", "us-east-1"),
            ),
        )
        logger.debug(f"Created Pinecone index '{index_name}' with dimension {dimensions}.")

    def insert_embeddings(self, embeddings: List[Dict[str, Any]], batch_size: int = 100) -> int:
        """
        Insert embeddings into Pinecone database in batches.

        Args:
            embeddings: items with the keys "model_name", "detector_backend", "aligned",
                "l2_normalized", "face" (an object exposing .tolist()), "embedding"
                (a sequence of floats) and "img_name". The target index is chosen from
                the first item; all items are written to that index.
            batch_size: number of vectors sent per upsert call; must be positive.

        Returns:
            Number of vectors sent to Pinecone.

        Raises:
            ValueError: if embeddings is empty or batch_size is not positive.
        """
        if not embeddings:
            raise ValueError("No embeddings to insert.")
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer.")

        first = embeddings[0]
        index = self._get_index(
            first["model_name"],
            first["detector_backend"],
            first["aligned"],
            first["l2_normalized"],
        )

        total = 0
        for start in range(0, len(embeddings), batch_size):
            vectors = [self._build_vector(item) for item in embeddings[start : start + batch_size]]
            index.upsert(vectors=vectors)
            total += len(vectors)
        return total

    def search_by_vector(
        self,
        vector: List[float],
        model_name: str = "VGG-Face",
        detector_backend: str = "opencv",
        aligned: bool = True,
        l2_normalized: bool = False,
        limit: int = 10,
    ) -> List[Dict[str, Any]]:
        """
        ANN search using the main vector (embedding).

        Args:
            vector: query embedding.
            model_name, detector_backend, aligned, l2_normalized: select the index.
            limit: maximum number of matches requested (passed as top_k).

        Returns:
            One dict per match with the keys "id", "distance" and "img_name"
            ("img_name" is None when the match carries no metadata). Empty list when
            there are no matches.
        """
        index = self._get_index(model_name, detector_backend, aligned, l2_normalized)

        results = index.query(
            vector=vector,
            top_k=limit,
            include_metadata=True,
            include_values=False,
        )

        out: List[Dict[str, Any]] = []
        for res in results.matches or []:
            score = float(res.score)
            if l2_normalized:
                # cosine index: the score is a similarity, turn it into a distance
                distance = 1 - score
            else:
                # euclidean index: the score is treated as a squared distance;
                # clamp at zero so tiny negative rounding errors cannot break sqrt
                distance = math.sqrt(max(score, 0.0))

            # a match may come back without metadata; do not crash on it
            metadata = res.metadata or {}
            out.append(
                {
                    "id": res.id,
                    "distance": distance,
                    "img_name": metadata.get("img_name"),
                }
            )
        return out

    def fetch_all_embeddings(
        self,
        model_name: str,
        detector_backend: str,
        aligned: bool,
        l2_normalized: bool,
        batch_size: int = 1000,
    ) -> List[Dict[str, Any]]:
        """
        Fetch all embeddings from Pinecone database in batches.

        Args:
            model_name, detector_backend, aligned, l2_normalized: select the index.
            batch_size: number of ids requested per fetch call; must be positive.

        Returns:
            One dict per stored vector with the keys "id", "embedding", "img_name",
            "face_hash" and "embedding_hash". The two hashes are read from metadata
            and, when absent there, recovered from the "<face_hash>:<embedding_hash>"
            record id.

        Raises:
            ValueError: if batch_size is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer.")

        index = self._get_index(model_name, detector_backend, aligned, l2_normalized)

        # index.list() yields pages (lists) of ids; flatten them first
        ids: List[str] = []
        for page in index.list():
            ids.extend(page)

        out: List[Dict[str, Any]] = []
        for start in range(0, len(ids), batch_size):
            fetched = index.fetch(ids=ids[start : start + batch_size])
            vectors = self._read_field(fetched, "vectors") or {}
            for _id, record in vectors.items():
                metadata = self._read_field(record, "metadata") or {}
                face_hash = metadata.get("face_hash")
                embedding_hash = metadata.get("embedding_hash")

                # Records without the hashes in metadata still encode them in the id.
                if face_hash is None or embedding_hash is None:
                    id_face, separator, id_embedding = str(_id).partition(":")
                    if separator:
                        face_hash = face_hash if face_hash is not None else id_face
                        embedding_hash = (
                            embedding_hash if embedding_hash is not None else id_embedding
                        )

                out.append(
                    {
                        "id": _id,
                        "embedding": self._read_field(record, "values"),
                        "img_name": metadata.get("img_name"),
                        "face_hash": face_hash,
                        "embedding_hash": embedding_hash,
                    }
                )
        return out

    def close(self) -> None:
        """Pinecone client does not require explicit closure"""
        return

    def _get_index(
        self,
        model_name: str,
        detector_backend: str,
        aligned: bool,
        l2_normalized: bool,
    ) -> Any:
        """Make sure the index for the given configuration exists and return a handle to it."""
        self.initialize_database(
            model_name=model_name,
            detector_backend=detector_backend,
            aligned=aligned,
            l2_normalized=l2_normalized,
        )
        index_name = self.__generate_index_name(
            model_name, detector_backend, aligned, l2_normalized
        )
        return self.client.Index(index_name)

    @staticmethod
    def _build_vector(item: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one embedding item into the record dict sent to Pinecone's upsert."""
        face_json = json.dumps(item["face"].tolist())
        face_hash = hashlib.sha256(face_json.encode()).hexdigest()

        embedding = item["embedding"]
        embedding_bytes = struct.pack(f"{len(embedding)}d", *embedding)
        embedding_hash = hashlib.sha256(embedding_bytes).hexdigest()

        return {
            # content-derived id: the same face + embedding always maps to the same record
            "id": f"{face_hash}:{embedding_hash}",
            "values": embedding,
            # only the image name and the two hashes are stored; the face pixels are not
            "metadata": {
                "img_name": item["img_name"],
                "face_hash": face_hash,
                "embedding_hash": embedding_hash,
            },
        }

    @staticmethod
    def _read_field(obj: Any, name: str, default: Any = None) -> Any:
        """Read a field from a response that may be a plain dict or an attribute-style object."""
        if isinstance(obj, dict):
            return obj.get(name, default)
        return getattr(obj, name, default)

    @staticmethod
    def __generate_index_name(
        model_name: str,
        detector_backend: str,
        aligned: bool,
        l2_normalized: bool,
    ) -> str:
        """
        Generate Pinecone index name based on parameters.

        The name has the form
        "embeddings-<model>-<detector>-<aligned|unaligned>-<norm|raw>", lower-cased.
        Hyphens are removed from the model name, and any character other than
        a-z, 0-9 or "-" is dropped from the result because Pinecone rejects such
        index names (e.g. a model name containing an underscore).
        """
        index_name_attributes = [
            "embeddings",
            model_name.replace("-", ""),
            detector_backend,
            "Aligned" if aligned else "Unaligned",
            "Norm" if l2_normalized else "Raw",
        ]
        raw_name = "-".join(index_name_attributes).lower()
        # NOTE(review): Pinecone also limits index-name length; long model/detector
        # combinations are not shortened here — confirm against the current limit.
        return "".join(
            ch for ch in raw_name if ch == "-" or (ch.isascii() and ch.isalnum())
        )