Spaces:
Running
Running
| # built-in dependencies | |
| import os | |
| import json | |
| import struct | |
| import hashlib | |
| from typing import Any, Dict, Optional, List, Union | |
| # 3rd party dependencies | |
| import numpy as np | |
| # project dependencies | |
| from deepface.modules.modeling import build_model | |
| from deepface.modules.database.types import Database | |
| from deepface.modules.exceptions import DuplicateEntryError | |
| from deepface.commons.logger import Logger | |
| logger = Logger() | |
| _SCHEMA_CHECKED: Dict[str, bool] = {} | |
| # pylint: disable=too-many-positional-arguments | |
| class PGVectorClient(Database): | |
| def __init__( | |
| self, | |
| connection_details: Optional[Union[Dict[str, Any], str]] = None, | |
| connection: Any = None, | |
| ) -> None: | |
| # Import here to avoid mandatory dependency | |
| try: | |
| import psycopg | |
| from psycopg import errors | |
| except (ModuleNotFoundError, ImportError) as e: | |
| raise ValueError( | |
| "psycopg is an optional dependency, ensure the library is installed." | |
| "Please install using 'pip install \"psycopg[binary]\"' " | |
| ) from e | |
| try: | |
| from pgvector.psycopg import register_vector | |
| except (ModuleNotFoundError, ImportError) as e: | |
| raise ValueError( | |
| "pgvector is an optional dependency, ensure the library is installed." | |
| "Please install using 'pip install pgvector' " | |
| ) from e | |
| self.psycopg = psycopg | |
| self.errors = errors | |
| if connection is not None: | |
| self.conn = connection | |
| else: | |
| # Retrieve connection details from parameter or environment variable | |
| self.conn_details = connection_details or os.environ.get("DEEPFACE_POSTGRES_URI") | |
| if not self.conn_details: | |
| raise ValueError( | |
| "PostgreSQL connection information not found. " | |
| "Please provide connection_details or set the DEEPFACE_POSTGRES_URI" | |
| " environment variable." | |
| ) | |
| if isinstance(self.conn_details, str): | |
| self.conn = self.psycopg.connect(self.conn_details) | |
| elif isinstance(self.conn_details, dict): | |
| self.conn = self.psycopg.connect(**self.conn_details) | |
| else: | |
| raise ValueError("connection_details must be either a string or a dict.") | |
| # is pgvector extension installed? | |
| try: | |
| if not _SCHEMA_CHECKED.get("pgvector_extension"): | |
| with self.conn.cursor() as cur: | |
| cur.execute("CREATE EXTENSION IF NOT EXISTS vector;") | |
| self.conn.commit() | |
| _SCHEMA_CHECKED["pgvector_extension"] = True | |
| except Exception as e: | |
| raise ValueError( | |
| "Ensure pgvector extension is installed properly by running: " | |
| "'CREATE EXTENSION IF NOT EXISTS vector;'" | |
| ) from e | |
| register_vector(self.conn) | |
| def close(self) -> None: | |
| """Close the database connection.""" | |
| self.conn.close() | |
| def initialize_database(self, **kwargs: Any) -> None: | |
| """ | |
| Initialize PostgreSQL database schema for storing embeddings. | |
| Args: | |
| model_name (str): Name of the facial recognition model. | |
| detector_backend (str): Name of the face detector backend. | |
| aligned (bool): Whether the faces are aligned. | |
| l2_normalized (bool): Whether the embeddings are L2 normalized. | |
| """ | |
| model_name = kwargs.get("model_name", "VGG-Face") | |
| detector_backend = kwargs.get("detector_backend", "opencv") | |
| aligned = kwargs.get("aligned", True) | |
| l2_normalized = kwargs.get("l2_normalized", False) | |
| model = build_model(task="facial_recognition", model_name=model_name) | |
| dimensions = model.output_shape | |
| table_name = self.__generate_table_name( | |
| model_name, detector_backend, aligned, l2_normalized | |
| ) | |
| if _SCHEMA_CHECKED.get(table_name): | |
| logger.debug("PostgreSQL schema already checked, skipping.") | |
| return | |
| create_table_stmt = f""" | |
| CREATE TABLE IF NOT EXISTS {table_name} ( | |
| id SERIAL PRIMARY KEY, | |
| img_name TEXT NOT NULL, | |
| face BYTEA NOT NULL, | |
| face_shape INT[] NOT NULL, | |
| model_name TEXT NOT NULL, | |
| detector_backend TEXT NOT NULL, | |
| aligned BOOLEAN DEFAULT true, | |
| l2_normalized BOOLEAN DEFAULT false, | |
| embedding vector({dimensions}) NOT NULL, | |
| created_at TIMESTAMPTZ DEFAULT now(), | |
| face_hash TEXT NOT NULL, | |
| embedding_hash TEXT NOT NULL, | |
| UNIQUE (face_hash, embedding_hash) | |
| ); | |
| """ | |
| index_name = f"{table_name}_{'cosine' if l2_normalized else 'euclidean'}_idx" | |
| create_index_stmt = f""" | |
| CREATE INDEX {index_name} | |
| ON {table_name} | |
| USING hnsw (embedding {'vector_cosine_ops' if l2_normalized else 'vector_l2_ops'}); | |
| """ | |
| try: | |
| with self.conn.cursor() as cur: | |
| # create table if not exists | |
| cur.execute(create_table_stmt) | |
| # create index if not exists | |
| try: | |
| cur.execute(create_index_stmt) | |
| except Exception as e: # pylint: disable=broad-except | |
| # unfortunately if not exists is not supported for index creation | |
| if getattr(e, "sqlstate", None) == "42P07": | |
| self.conn.rollback() | |
| else: | |
| raise | |
| self.conn.commit() | |
| except Exception as e: | |
| if getattr(e, "sqlstate", None) == "42501": # permission denied | |
| raise ValueError( | |
| "The PostgreSQL user does not have permission to create " | |
| f"the required table {table_name}. " | |
| "Please ask your database administrator to grant CREATE privileges " | |
| "on the schema." | |
| ) from e | |
| raise | |
| _SCHEMA_CHECKED[table_name] = True | |
| def insert_embeddings(self, embeddings: List[Dict[str, Any]], batch_size: int = 100) -> int: | |
| """ | |
| Insert multiple embeddings into PostgreSQL. | |
| Args: | |
| embeddings (List[Dict[str, Any]]): List of embeddings to insert. | |
| batch_size (int): Number of embeddings to insert per batch. | |
| Returns: | |
| int: Number of embeddings inserted. | |
| """ | |
| if not embeddings: | |
| raise ValueError("No embeddings to insert.") | |
| self.initialize_database( | |
| model_name=embeddings[0]["model_name"], | |
| detector_backend=embeddings[0]["detector_backend"], | |
| aligned=embeddings[0]["aligned"], | |
| l2_normalized=embeddings[0]["l2_normalized"], | |
| ) | |
| table_name = self.__generate_table_name( | |
| model_name=embeddings[0]["model_name"], | |
| detector_backend=embeddings[0]["detector_backend"], | |
| aligned=embeddings[0]["aligned"], | |
| l2_normalized=embeddings[0]["l2_normalized"], | |
| ) | |
| query = f""" | |
| INSERT INTO {table_name} ( | |
| img_name, | |
| face, | |
| face_shape, | |
| model_name, | |
| detector_backend, | |
| aligned, | |
| l2_normalized, | |
| embedding, | |
| face_hash, | |
| embedding_hash | |
| ) | |
| VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s); | |
| """ | |
| values = [] | |
| for e in embeddings: | |
| face = e["face"] | |
| face_shape = list(face.shape) | |
| face_bytes = face.astype(np.float32).tobytes() | |
| face_json = json.dumps(face.tolist()) | |
| embedding_bytes = struct.pack(f'{len(e["embedding"])}d', *e["embedding"]) | |
| # uniqueness is guaranteed by face hash and embedding hash | |
| face_hash = hashlib.sha256(face_json.encode()).hexdigest() | |
| embedding_hash = hashlib.sha256(embedding_bytes).hexdigest() | |
| values.append( | |
| ( | |
| e["img_name"], | |
| face_bytes, | |
| face_shape, | |
| e["model_name"], | |
| e["detector_backend"], | |
| e["aligned"], | |
| e["l2_normalized"], | |
| e["embedding"], | |
| face_hash, | |
| embedding_hash, | |
| ) | |
| ) | |
| try: | |
| with self.conn.cursor() as cur: | |
| for i in range(0, len(values), batch_size): | |
| cur.executemany(query, values[i : i + batch_size]) | |
| # commit for every batch | |
| self.conn.commit() | |
| return len(values) | |
| except self.psycopg.errors.UniqueViolation as e: | |
| self.conn.rollback() | |
| if len(values) == 1: | |
| logger.warn("Duplicate detected for extracted face and embedding.") | |
| return 0 | |
| raise DuplicateEntryError( | |
| f"Duplicate detected for extracted face and embedding columns in {i}-th batch" | |
| ) from e | |
| def fetch_all_embeddings( | |
| self, | |
| model_name: str, | |
| detector_backend: str, | |
| aligned: bool, | |
| l2_normalized: bool, | |
| batch_size: int = 1000, | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Fetch all embeddings from PostgreSQL. | |
| Args: | |
| model_name (str): Name of the facial recognition model. | |
| detector_backend (str): Name of the face detector backend. | |
| aligned (bool): Whether the faces are aligned. | |
| l2_normalized (bool): Whether the embeddings are L2 normalized. | |
| batch_size (int): Number of embeddings to fetch per batch. | |
| Returns: | |
| List[Dict[str, Any]]: List of embeddings. | |
| """ | |
| table_name = self.__generate_table_name( | |
| model_name, detector_backend, aligned, l2_normalized | |
| ) | |
| query = f""" | |
| SELECT id, img_name, embedding | |
| FROM {table_name} | |
| ORDER BY id ASC; | |
| """ | |
| embeddings: List[Dict[str, Any]] = [] | |
| with self.conn.cursor(name="embeddings_cursor") as cur: | |
| cur.execute(query) | |
| while True: | |
| batch = cur.fetchmany(batch_size) | |
| if not batch: | |
| break | |
| for r in batch: | |
| embeddings.append( | |
| { | |
| "id": r[0], | |
| "img_name": r[1], | |
| "embedding": list(r[2]), | |
| "model_name": model_name, | |
| "detector_backend": detector_backend, | |
| "aligned": aligned, | |
| "l2_normalized": l2_normalized, | |
| } | |
| ) | |
| return embeddings | |
| def search_by_vector( | |
| self, | |
| vector: List[float], | |
| model_name: str = "VGG-Face", | |
| detector_backend: str = "opencv", | |
| aligned: bool = True, | |
| l2_normalized: bool = False, | |
| limit: int = 10, | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Search for similar embeddings in PostgreSQL using a given vector. | |
| Args: | |
| vector (List[float]): The query embedding vector. | |
| model_name (str): Name of the facial recognition model. | |
| detector_backend (str): Name of the face detector backend. | |
| aligned (bool): Whether the faces are aligned. | |
| l2_normalized (bool): Whether the embeddings are L2 normalized. | |
| limit (int): Number of similar embeddings to return. | |
| Returns: | |
| List[Dict[str, Any]]: List of similar embeddings with distances. | |
| """ | |
| table_name = self.__generate_table_name( | |
| model_name, detector_backend, aligned, l2_normalized | |
| ) | |
| self.initialize_database( | |
| model_name=model_name, | |
| detector_backend=detector_backend, | |
| aligned=aligned, | |
| l2_normalized=l2_normalized, | |
| ) | |
| op = "<=>" if l2_normalized else "<->" | |
| query = f""" | |
| SELECT id, img_name, (embedding {op} (%s::vector)) AS distance | |
| FROM {table_name} | |
| ORDER BY embedding {op} (%s::vector) | |
| LIMIT %s; | |
| """ | |
| results: List[Dict[str, Any]] = [] | |
| with self.conn.cursor() as cur: | |
| cur.execute(query, (vector, vector, limit)) | |
| rows = cur.fetchall() | |
| for r in rows: | |
| results.append( | |
| { | |
| "id": r[0], | |
| "img_name": r[1], | |
| # "embedding": list(r[2]), # embedding dropped in select query | |
| "distance": r[2], | |
| "model_name": model_name, | |
| "detector_backend": detector_backend, | |
| "aligned": aligned, | |
| "l2_normalized": l2_normalized, | |
| } | |
| ) | |
| return results | |
| def __generate_table_name( | |
| model_name: str, | |
| detector_backend: str, | |
| aligned: bool, | |
| l2_normalized: bool, | |
| ) -> str: | |
| """ | |
| Generate postgres table name based on parameters. | |
| """ | |
| class_name_attributes = [ | |
| model_name.replace("-", ""), | |
| detector_backend, | |
| "Aligned" if aligned else "Unaligned", | |
| "Norm" if l2_normalized else "Raw", | |
| ] | |
| return "Embeddings_" + "_".join(class_name_attributes).lower() | |