DaoManhDuc2004
Deploy DATN face AI server
b5d3a91
# built-in dependencies
import os
import json
import struct
import hashlib
from typing import Any, Dict, Optional, List, Union
# 3rd party dependencies
import numpy as np
# project dependencies
from deepface.modules.modeling import build_model
from deepface.modules.database.types import Database
from deepface.modules.exceptions import DuplicateEntryError
from deepface.commons.logger import Logger
logger = Logger()
_SCHEMA_CHECKED: Dict[str, bool] = {}
# pylint: disable=too-many-positional-arguments
class PGVectorClient(Database):
def __init__(
self,
connection_details: Optional[Union[Dict[str, Any], str]] = None,
connection: Any = None,
) -> None:
# Import here to avoid mandatory dependency
try:
import psycopg
from psycopg import errors
except (ModuleNotFoundError, ImportError) as e:
raise ValueError(
"psycopg is an optional dependency, ensure the library is installed."
"Please install using 'pip install \"psycopg[binary]\"' "
) from e
try:
from pgvector.psycopg import register_vector
except (ModuleNotFoundError, ImportError) as e:
raise ValueError(
"pgvector is an optional dependency, ensure the library is installed."
"Please install using 'pip install pgvector' "
) from e
self.psycopg = psycopg
self.errors = errors
if connection is not None:
self.conn = connection
else:
# Retrieve connection details from parameter or environment variable
self.conn_details = connection_details or os.environ.get("DEEPFACE_POSTGRES_URI")
if not self.conn_details:
raise ValueError(
"PostgreSQL connection information not found. "
"Please provide connection_details or set the DEEPFACE_POSTGRES_URI"
" environment variable."
)
if isinstance(self.conn_details, str):
self.conn = self.psycopg.connect(self.conn_details)
elif isinstance(self.conn_details, dict):
self.conn = self.psycopg.connect(**self.conn_details)
else:
raise ValueError("connection_details must be either a string or a dict.")
# is pgvector extension installed?
try:
if not _SCHEMA_CHECKED.get("pgvector_extension"):
with self.conn.cursor() as cur:
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
self.conn.commit()
_SCHEMA_CHECKED["pgvector_extension"] = True
except Exception as e:
raise ValueError(
"Ensure pgvector extension is installed properly by running: "
"'CREATE EXTENSION IF NOT EXISTS vector;'"
) from e
register_vector(self.conn)
def close(self) -> None:
"""Close the database connection."""
self.conn.close()
def initialize_database(self, **kwargs: Any) -> None:
"""
Initialize PostgreSQL database schema for storing embeddings.
Args:
model_name (str): Name of the facial recognition model.
detector_backend (str): Name of the face detector backend.
aligned (bool): Whether the faces are aligned.
l2_normalized (bool): Whether the embeddings are L2 normalized.
"""
model_name = kwargs.get("model_name", "VGG-Face")
detector_backend = kwargs.get("detector_backend", "opencv")
aligned = kwargs.get("aligned", True)
l2_normalized = kwargs.get("l2_normalized", False)
model = build_model(task="facial_recognition", model_name=model_name)
dimensions = model.output_shape
table_name = self.__generate_table_name(
model_name, detector_backend, aligned, l2_normalized
)
if _SCHEMA_CHECKED.get(table_name):
logger.debug("PostgreSQL schema already checked, skipping.")
return
create_table_stmt = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id SERIAL PRIMARY KEY,
img_name TEXT NOT NULL,
face BYTEA NOT NULL,
face_shape INT[] NOT NULL,
model_name TEXT NOT NULL,
detector_backend TEXT NOT NULL,
aligned BOOLEAN DEFAULT true,
l2_normalized BOOLEAN DEFAULT false,
embedding vector({dimensions}) NOT NULL,
created_at TIMESTAMPTZ DEFAULT now(),
face_hash TEXT NOT NULL,
embedding_hash TEXT NOT NULL,
UNIQUE (face_hash, embedding_hash)
);
"""
index_name = f"{table_name}_{'cosine' if l2_normalized else 'euclidean'}_idx"
create_index_stmt = f"""
CREATE INDEX {index_name}
ON {table_name}
USING hnsw (embedding {'vector_cosine_ops' if l2_normalized else 'vector_l2_ops'});
"""
try:
with self.conn.cursor() as cur:
# create table if not exists
cur.execute(create_table_stmt)
# create index if not exists
try:
cur.execute(create_index_stmt)
except Exception as e: # pylint: disable=broad-except
# unfortunately if not exists is not supported for index creation
if getattr(e, "sqlstate", None) == "42P07":
self.conn.rollback()
else:
raise
self.conn.commit()
except Exception as e:
if getattr(e, "sqlstate", None) == "42501": # permission denied
raise ValueError(
"The PostgreSQL user does not have permission to create "
f"the required table {table_name}. "
"Please ask your database administrator to grant CREATE privileges "
"on the schema."
) from e
raise
_SCHEMA_CHECKED[table_name] = True
def insert_embeddings(self, embeddings: List[Dict[str, Any]], batch_size: int = 100) -> int:
"""
Insert multiple embeddings into PostgreSQL.
Args:
embeddings (List[Dict[str, Any]]): List of embeddings to insert.
batch_size (int): Number of embeddings to insert per batch.
Returns:
int: Number of embeddings inserted.
"""
if not embeddings:
raise ValueError("No embeddings to insert.")
self.initialize_database(
model_name=embeddings[0]["model_name"],
detector_backend=embeddings[0]["detector_backend"],
aligned=embeddings[0]["aligned"],
l2_normalized=embeddings[0]["l2_normalized"],
)
table_name = self.__generate_table_name(
model_name=embeddings[0]["model_name"],
detector_backend=embeddings[0]["detector_backend"],
aligned=embeddings[0]["aligned"],
l2_normalized=embeddings[0]["l2_normalized"],
)
query = f"""
INSERT INTO {table_name} (
img_name,
face,
face_shape,
model_name,
detector_backend,
aligned,
l2_normalized,
embedding,
face_hash,
embedding_hash
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
"""
values = []
for e in embeddings:
face = e["face"]
face_shape = list(face.shape)
face_bytes = face.astype(np.float32).tobytes()
face_json = json.dumps(face.tolist())
embedding_bytes = struct.pack(f'{len(e["embedding"])}d', *e["embedding"])
# uniqueness is guaranteed by face hash and embedding hash
face_hash = hashlib.sha256(face_json.encode()).hexdigest()
embedding_hash = hashlib.sha256(embedding_bytes).hexdigest()
values.append(
(
e["img_name"],
face_bytes,
face_shape,
e["model_name"],
e["detector_backend"],
e["aligned"],
e["l2_normalized"],
e["embedding"],
face_hash,
embedding_hash,
)
)
try:
with self.conn.cursor() as cur:
for i in range(0, len(values), batch_size):
cur.executemany(query, values[i : i + batch_size])
# commit for every batch
self.conn.commit()
return len(values)
except self.psycopg.errors.UniqueViolation as e:
self.conn.rollback()
if len(values) == 1:
logger.warn("Duplicate detected for extracted face and embedding.")
return 0
raise DuplicateEntryError(
f"Duplicate detected for extracted face and embedding columns in {i}-th batch"
) from e
def fetch_all_embeddings(
self,
model_name: str,
detector_backend: str,
aligned: bool,
l2_normalized: bool,
batch_size: int = 1000,
) -> List[Dict[str, Any]]:
"""
Fetch all embeddings from PostgreSQL.
Args:
model_name (str): Name of the facial recognition model.
detector_backend (str): Name of the face detector backend.
aligned (bool): Whether the faces are aligned.
l2_normalized (bool): Whether the embeddings are L2 normalized.
batch_size (int): Number of embeddings to fetch per batch.
Returns:
List[Dict[str, Any]]: List of embeddings.
"""
table_name = self.__generate_table_name(
model_name, detector_backend, aligned, l2_normalized
)
query = f"""
SELECT id, img_name, embedding
FROM {table_name}
ORDER BY id ASC;
"""
embeddings: List[Dict[str, Any]] = []
with self.conn.cursor(name="embeddings_cursor") as cur:
cur.execute(query)
while True:
batch = cur.fetchmany(batch_size)
if not batch:
break
for r in batch:
embeddings.append(
{
"id": r[0],
"img_name": r[1],
"embedding": list(r[2]),
"model_name": model_name,
"detector_backend": detector_backend,
"aligned": aligned,
"l2_normalized": l2_normalized,
}
)
return embeddings
def search_by_vector(
self,
vector: List[float],
model_name: str = "VGG-Face",
detector_backend: str = "opencv",
aligned: bool = True,
l2_normalized: bool = False,
limit: int = 10,
) -> List[Dict[str, Any]]:
"""
Search for similar embeddings in PostgreSQL using a given vector.
Args:
vector (List[float]): The query embedding vector.
model_name (str): Name of the facial recognition model.
detector_backend (str): Name of the face detector backend.
aligned (bool): Whether the faces are aligned.
l2_normalized (bool): Whether the embeddings are L2 normalized.
limit (int): Number of similar embeddings to return.
Returns:
List[Dict[str, Any]]: List of similar embeddings with distances.
"""
table_name = self.__generate_table_name(
model_name, detector_backend, aligned, l2_normalized
)
self.initialize_database(
model_name=model_name,
detector_backend=detector_backend,
aligned=aligned,
l2_normalized=l2_normalized,
)
op = "<=>" if l2_normalized else "<->"
query = f"""
SELECT id, img_name, (embedding {op} (%s::vector)) AS distance
FROM {table_name}
ORDER BY embedding {op} (%s::vector)
LIMIT %s;
"""
results: List[Dict[str, Any]] = []
with self.conn.cursor() as cur:
cur.execute(query, (vector, vector, limit))
rows = cur.fetchall()
for r in rows:
results.append(
{
"id": r[0],
"img_name": r[1],
# "embedding": list(r[2]), # embedding dropped in select query
"distance": r[2],
"model_name": model_name,
"detector_backend": detector_backend,
"aligned": aligned,
"l2_normalized": l2_normalized,
}
)
return results
@staticmethod
def __generate_table_name(
model_name: str,
detector_backend: str,
aligned: bool,
l2_normalized: bool,
) -> str:
"""
Generate postgres table name based on parameters.
"""
class_name_attributes = [
model_name.replace("-", ""),
detector_backend,
"Aligned" if aligned else "Unaligned",
"Norm" if l2_normalized else "Raw",
]
return "Embeddings_" + "_".join(class_name_attributes).lower()