| | """A local gradio app that detect matching images using FHE.""" |
| |
|
| | import os |
| | from pathlib import Path |
| | import shutil |
| | import time |
| | from typing import Tuple |
| | import requests |
| |
|
| | import numpy as np |
| |
|
| | import subprocess |
| | import gradio as gr |
| | from itertools import chain |
| | import matplotlib.pyplot as plt |
| | import matplotlib.image as img |
| | import numpy as np |
| | from PIL import Image |
| | import torch |
| | import torchvision.transforms as transforms |
| | import torchvision.models as models |
| | import cv2 |
| | from facenet_pytorch import InceptionResnetV1 |
| | from concrete.ml.deployment import FHEModelClient, FHEModelServer |
| | from client_server_interface import FHEClient |
| |
|
| | from common import ( |
| | CLIENT_TMP_PATH, |
| | ID_EXAMPLES, |
| | SELFIE_EXAMPLES, |
| | KEYS_PATH, |
| | MATCHERS_PATH, |
| | REPO_DIR, |
| | SERVER_TMP_PATH, |
| | SERVER_URL, |
| | ) |
| |
|
| | MODEL_PATH = "client_server" |
| | |
| |
|
| | |
| | subprocess.Popen(["uvicorn", "server:app"], cwd=REPO_DIR) |
| | time.sleep(3) |
| |
|
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| |
|
| |
|
| | def shorten_bytes_object(bytes_object, limit=500): |
| | """Shorten the input bytes object to a given length. |
| | |
| | Encrypted data is too large for displaying it in the browser using Gradio. This function |
| | provides a shorten representation of it. |
| | |
| | Args: |
| | bytes_object (bytes): The input to shorten |
| | limit (int): The length to consider. Default to 500. |
| | |
| | Returns: |
| | str: Hexadecimal string shorten representation of the input byte object. |
| | |
| | """ |
| | |
| | shift = 100 |
| | return bytes_object[shift : limit + shift].hex() |
| |
|
| |
|
| | def get_client(): |
| | """Get the client API. |
| | |
| | Args: |
| | user_id (int): The current user's ID. |
| | filter_name (str): The filter chosen by the user |
| | |
| | Returns: |
| | FHEClient: The client API. |
| | """ |
| | return FHEModelClient(MODEL_PATH) |
| |
|
| |
|
| | def get_client_file_path(name, user_id): |
| | """Get the correct temporary file path for the client. |
| | |
| | Args: |
| | name (str): The desired file name. |
| | user_id (int): The current user's ID. |
| | filter_name (str): The filter chosen by the user |
| | |
| | Returns: |
| | pathlib.Path: The file path. |
| | """ |
| | return CLIENT_TMP_PATH / f"{name}_embedding_{user_id}" |
| |
|
| |
|
| | def clean_temporary_files(n_keys=20): |
| | """Clean keys and encrypted images. |
| | |
| | A maximum of n_keys keys and associated temporary files are allowed to be stored. Once this |
| | limit is reached, the oldest files are deleted. |
| | |
| | Args: |
| | n_keys (int): The maximum number of keys and associated files to be stored. Default to 20. |
| | |
| | """ |
| | |
| | key_dirs = sorted(KEYS_PATH.iterdir(), key=os.path.getmtime) |
| |
|
| | |
| | user_ids = [] |
| | if len(key_dirs) > n_keys: |
| | n_keys_to_delete = len(key_dirs) - n_keys |
| | for key_dir in key_dirs[:n_keys_to_delete]: |
| | user_ids.append(key_dir.name) |
| | shutil.rmtree(key_dir) |
| |
|
| | |
| | client_files = Path(CLIENT_TMP_PATH).iterdir() |
| | server_files = Path(SERVER_TMP_PATH).iterdir() |
| |
|
| | |
| | for file in chain(client_files, server_files): |
| | for user_id in user_ids: |
| | if user_id in file.name: |
| | file.unlink() |
| |
|
| |
|
| | def keygen(matcher_name): |
| | """Generate the private key associated to a matcher. |
| | |
| | Args: |
| | matcher_name (str): The current matcher to consider. |
| | |
| | Returns: |
| | (user_id, True) (Tuple[int, bool]): The current user's ID and a boolean used for visual display. |
| | |
| | """ |
| | |
| | clean_temporary_files() |
| |
|
| | |
| | user_id = np.random.randint(0, 2**32) |
| | |
| |
|
| | |
| | client = get_client() |
| |
|
| | |
| | client.generate_private_and_evaluation_keys(force=True) |
| |
|
| | |
| | |
| | |
| | evaluation_key = client.get_serialized_evaluation_keys() |
| |
|
| | |
| | |
| | evaluation_key_path = get_client_file_path("evaluation_key", user_id) |
| |
|
| | with evaluation_key_path.open("wb") as evaluation_key_file: |
| | evaluation_key_file.write(evaluation_key) |
| |
|
| | return (user_id, True) |
| |
|
| |
|
| | def detect_and_crop_face( |
| | image: str, |
| | min_aspect_ratio: float = 0.5, |
| | max_aspect_ratio: float = 1.5, |
| | min_face_size: float = 0.01, |
| | max_face_size: float = 0.6, |
| | ) -> Tuple[np.ndarray, Tuple[int, int, int, int], np.ndarray]: |
| | |
| | |
| | image_path = "test" |
| | if image is None: |
| | print(f"Failed to load image: {image_path}") |
| | return None |
| |
|
| | |
| | print(f"Image Depth: {image.dtype}, Shape: {image.shape}") |
| |
|
| | |
| | if image.dtype == np.float64: |
| | print(f"Converting image from float64 to uint8 for {image_path}") |
| | image = cv2.convertScaleAbs(image) |
| |
|
| | elif image.dtype != np.uint8: |
| | print(f"Converting image from {image.dtype} to uint8 for {image_path}") |
| | image = cv2.convertScaleAbs(image) |
| |
|
| | |
| | try: |
| | gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) |
| | except cv2.error as e: |
| | print(f"Error converting image to grayscale: {e} for {image_path}") |
| | return None |
| |
|
| | |
| | face_classifier = cv2.CascadeClassifier( |
| | cv2.data.haarcascades + "haarcascade_frontalface_default.xml" |
| | ) |
| |
|
| | |
| | faces = face_classifier.detectMultiScale( |
| | gray_image, |
| | scaleFactor=1.1, |
| | minNeighbors=5, |
| | minSize=(int(image.shape[1] * 0.1), int(image.shape[0] * 0.1)), |
| | ) |
| |
|
| | valid_faces = [] |
| | for x, y, w, h in faces: |
| | aspect_ratio = w / h |
| | face_area = w * h |
| | image_area = image.shape[0] * image.shape[1] |
| | face_size_ratio = face_area / image_area |
| |
|
| | if ( |
| | min_aspect_ratio <= aspect_ratio <= max_aspect_ratio |
| | and min_face_size <= face_size_ratio <= max_face_size |
| | ): |
| | valid_faces.append((x, y, w, h)) |
| |
|
| | if not valid_faces: |
| | print(f"No suitable faces detected in {image_path}") |
| | return None |
| |
|
| | |
| | valid_faces.sort(key=lambda f: f[2] * f[3], reverse=True) |
| | (x, y, w, h) = valid_faces[0] |
| |
|
| | |
| | try: |
| | face_crop = image[ |
| | int(y - h * 0.1) : int(y + h * 1.1), int(x - w * 0.1) : int(x + w * 1.1) |
| | ] |
| | if face_crop.size == 0: |
| | print(f"Failed to crop face for {image_path}: resulting crop is empty") |
| | return None |
| | except Exception as e: |
| | print(f"Error cropping face from {image_path}: {e}") |
| | return None |
| |
|
| | |
| | try: |
| | face_crop_rgb = cv2.cvtColor(face_crop, cv2.COLOR_BGR2RGB) |
| | except cv2.error as e: |
| | print(f"Error converting cropped face to RGB: {e} for {image_path}") |
| | return None |
| |
|
| | return face_crop_rgb, (x, y, w, h), image |
| |
|
| |
|
| | def preprocess_image(input_image): |
| | |
| | model = InceptionResnetV1(pretrained="vggface2").eval() |
| | input_image = np.array(input_image) |
| | image_crop = detect_and_crop_face(image=input_image) |
| | preprocess = transforms.Compose( |
| | [ |
| | transforms.Resize((160, 160)), |
| | transforms.ToTensor(), |
| | transforms.Normalize( |
| | [0.5, 0.5, 0.5], [0.5, 0.5, 0.5] |
| | ), |
| | ] |
| | ) |
| | if image_crop[0] is not None: |
| | img_tensor = preprocess(Image.fromarray(image_crop[0])) |
| | img_tensor = img_tensor.unsqueeze(0) |
| | with torch.no_grad(): |
| | embedding = model(img_tensor) |
| | return embedding.numpy().flatten() |
| |
|
| |
|
| | def encrypt(user_id, selfie_image, id_image): |
| | """Encrypt the given image for a specific user and filter. |
| | |
| | Args: |
| | user_id (int): The current user's ID. |
| | selfie_image (np.ndarray): The image to encrypt. |
| | id_image (np.ndarray): The image to encrypt. |
| | |
| | Returns: |
| | (input_image, encrypted_image_short) (Tuple[bytes]): The encrypted image and one of its |
| | representation. |
| | |
| | """ |
| | if user_id == "": |
| | raise gr.Error("Please generate the private key first.") |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| |
|
| | selfie_image_orig = selfie_image.copy() |
| | id_image_orig = id_image.copy() |
| |
|
| | selfie_image = Image.fromarray(selfie_image).convert("RGB") |
| | id_image = Image.fromarray(id_image).convert("RGB") |
| | embeddings_selfie = preprocess_image(selfie_image) |
| | embeddings_id = preprocess_image(id_image) |
| | X = np.concatenate((embeddings_selfie, embeddings_id))[np.newaxis, ...] |
| | |
| | client: FHEModelClient = get_client() |
| |
|
| | |
| | encrypted_image = client.quantize_encrypt_serialize(X) |
| |
|
| | |
| | |
| | encrypted_embedding = get_client_file_path("encrypted_embedding", user_id) |
| |
|
| | with encrypted_embedding.open("wb") as encrypted_image_file: |
| | encrypted_image_file.write(encrypted_image) |
| |
|
| | |
| | encrypted_image_short = shorten_bytes_object(encrypted_image) |
| |
|
| | return ( |
| | encrypted_image_short, |
| | resize_img(selfie_image_orig), |
| | resize_img(id_image_orig), |
| | ) |
| |
|
| |
|
| | def send_input(user_id): |
| | """Send the encrypted input image as well as the evaluation key to the server. |
| | |
| | Args: |
| | user_id (int): The current user's ID. |
| | filter_name (str): The current filter to consider. |
| | """ |
| | |
| | evaluation_key_path = get_client_file_path("evaluation_key", user_id) |
| |
|
| | if user_id == "" or not evaluation_key_path.is_file(): |
| | raise gr.Error("Please generate the private key first.") |
| |
|
| | encrypted_input_path = get_client_file_path("encrypted_embedding", user_id) |
| |
|
| | if not encrypted_input_path.is_file(): |
| | raise gr.Error( |
| | "Please generate the private key and then encrypt an image first." |
| | ) |
| |
|
| | |
| | data = { |
| | "user_id": user_id, |
| | } |
| |
|
| | files = [ |
| | ("files", open(encrypted_input_path, "rb")), |
| | ("files", open(evaluation_key_path, "rb")), |
| | ] |
| |
|
| | |
| | url = SERVER_URL + "send_input" |
| | with requests.post( |
| | url=url, |
| | data=data, |
| | files=files, |
| | ) as response: |
| | return response.ok |
| |
|
| |
|
| | def run_fhe(user_id): |
| | """Apply the filter on the encrypted image previously sent using FHE. |
| | |
| | Args: |
| | user_id (int): The current user's ID. |
| | filter_name (str): The current filter to consider. |
| | """ |
| | data = { |
| | "user_id": user_id, |
| | } |
| |
|
| | |
| | url = SERVER_URL + "run_fhe" |
| | with requests.post( |
| | url=url, |
| | data=data, |
| | ) as response: |
| | if response.ok: |
| | return response.json() |
| | else: |
| | raise gr.Error("Please wait for the input image to be sent to the server.") |
| |
|
| |
|
| | def get_output(user_id): |
| | """Retrieve the encrypted output image. |
| | |
| | Args: |
| | user_id (int): The current user's ID. |
| | filter_name (str): The current filter to consider. |
| | |
| | Returns: |
| | encrypted_output_image_short (bytes): A representation of the encrypted result. |
| | |
| | """ |
| | data = { |
| | "user_id": user_id, |
| | } |
| |
|
| | |
| | url = SERVER_URL + "get_output" |
| | with requests.post( |
| | url=url, |
| | data=data, |
| | ) as response: |
| | if response.ok: |
| | encrypted_output = response.content |
| |
|
| | |
| | |
| | encrypted_output_path = get_client_file_path("encrypted_output", user_id) |
| |
|
| | with encrypted_output_path.open("wb") as encrypted_output_file: |
| | encrypted_output_file.write(encrypted_output) |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | encrypted_output_short = shorten_bytes_object(encrypted_output) |
| |
|
| | return encrypted_output_short |
| |
|
| | else: |
| | raise gr.Error("Please wait for the FHE execution to be completed.") |
| |
|
| |
|
| | def decrypt_output(user_id): |
| | """Decrypt the result. |
| | |
| | Args: |
| | user_id (int): The current user's ID. |
| | filter_name (str): The current filter to consider. |
| | |
| | Returns: |
| | (output_image, False, False) ((Tuple[np.ndarray, bool, bool]): The decrypted output, as |
| | well as two booleans used for resetting Gradio checkboxes |
| | |
| | """ |
| | if user_id == "": |
| | raise gr.Error("Please generate the private key first.") |
| |
|
| | |
| | encrypted_output_path = get_client_file_path("encrypted_output", user_id) |
| |
|
| | if not encrypted_output_path.is_file(): |
| | raise gr.Error("Please run the FHE execution first.") |
| |
|
| | |
| | with encrypted_output_path.open("rb") as encrypted_output_file: |
| | encrypted_output = encrypted_output_file.read() |
| |
|
| | |
| | client = get_client() |
| |
|
| | |
| | decrypted_ouput = client.deserialize_decrypt_dequantize(encrypted_output) |
| |
|
| | print(f"Decrypted output: {decrypted_ouput.shape=}") |
| | print(f"Decrypted output: {decrypted_ouput=}") |
| |
|
| | predicted_class_id = np.argmax(decrypted_ouput) |
| | print(f"{predicted_class_id=}") |
| | return "PASS" if predicted_class_id == 1 else "FAIL" |
| |
|
| |
|
| | def resize_img(img, width=256, height=256): |
| | """Resize the image.""" |
| | if img.dtype != np.uint8: |
| | img = img.astype(np.uint8) |
| | img_pil = Image.fromarray(img) |
| | |
| | resized_img_pil = img_pil.resize((width, height)) |
| | |
| | return np.array(resized_img_pil) |
| |
|
| |
|
| | |
| |
|
| |
|
| | print("Starting the demo...") |
| | with gr.Blocks(delete_cache=(60, 60)) as demo: |
| | gr.Markdown( |
| | """ |
| | <!--p align="center"> |
| | <img width=200 src="https://user-images.githubusercontent.com/5758427/197816413-d9cddad3-ba38-4793-847d-120975e1da11.png"> |
| | </p--> |
| | <h1 align="center">Verio “Privacy-Preserving Biometric Verification for Authentication”</h1> |
| | <p align="center"> |
| | #ppaihackteam14 |
| | <a href="https://github.com/zama-ai/concrete-ml"> <img style="vertical-align: middle; display:inline-block; margin-right: 3px;" width=15 src="https://user-images.githubusercontent.com/5758427/197972109-faaaff3e-10e2-4ab6-80f5-7531f7cfb08f.png">Concrete-ML</a> |
| | — |
| | <a href="https://docs.zama.ai/concrete-ml"> <img style="vertical-align: middle; display:inline-block; margin-right: 3px;" width=15 src="https://user-images.githubusercontent.com/5758427/197976802-fddd34c5-f59a-48d0-9bff-7ad1b00cb1fb.png">Documentation</a> |
| | — |
| | <a href="https://zama.ai/community"> <img style="vertical-align: middle; display:inline-block; margin-right: 3px;" width=15 src="https://user-images.githubusercontent.com/5758427/197977153-8c9c01a7-451a-4993-8e10-5a6ed5343d02.png">Community</a> |
| | — |
| | <a href="https://twitter.com/zama_fhe"> <img style="vertical-align: middle; display:inline-block; margin-right: 3px;" width=15 src="https://user-images.githubusercontent.com/5758427/197975044-bab9d199-e120-433b-b3be-abd73b211a54.png">@zama_fhe</a> |
| | </p> |
| | <!--p align="center"> |
| | <img src="https://user-images.githubusercontent.com/56846628/219605302-5baafac4-cf6f-4f06-9a96-91cef2b84a63.png" width="70%" height="70%"> |
| | </p--> |
| | """ |
| | ) |
| |
|
| | gr.Markdown("## Client side") |
| | gr.Markdown("### Step 1: Upload input images. ") |
| | |
| | |
| | |
| | |
| | |
| | gr.Markdown("The query image to certify.") |
| | with gr.Row(): |
| | input_query_img = gr.Image( |
| | value=None, |
| | label="Upload an image here.", |
| | height=256, |
| | width=256, |
| | |
| | interactive=True, |
| | ) |
| |
|
| | selfie_examples = gr.Examples( |
| | examples=SELFIE_EXAMPLES, |
| | inputs=[input_query_img], |
| | examples_per_page=5, |
| | label="Examples to use.", |
| | ) |
| | gr.Markdown("The reference image.") |
| | with gr.Row(): |
| | input_reference_img = gr.Image( |
| | value=None, |
| | label="Upload an image here.", |
| | height=256, |
| | width=256, |
| | |
| | interactive=True, |
| | ) |
| |
|
| | id_examples = gr.Examples( |
| | examples=ID_EXAMPLES, |
| | inputs=[input_reference_img], |
| | examples_per_page=5, |
| | label="Examples to use.", |
| | ) |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | gr.Markdown("### Step 3: Generate the private key.") |
| | keygen_button = gr.Button("Generate the private key.") |
| |
|
| | with gr.Row(): |
| | keygen_checkbox = gr.Checkbox(label="Private key generated:", interactive=False) |
| |
|
| | user_id = gr.Textbox(label="", max_lines=2, interactive=False, visible=False) |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | gr.Markdown("### Step 4: Encrypt the input images using FHE.") |
| | encrypt_button = gr.Button("Encrypt the images using FHE.") |
| |
|
| | with gr.Row(): |
| | encrypted_input = gr.Textbox( |
| | label="Encrypted input images representation:", |
| | max_lines=2, |
| | interactive=False, |
| | ) |
| |
|
| | gr.Markdown("## Server side") |
| | gr.Markdown( |
| | "The encrypted value is received by the server. The server can then compute the matcher " |
| | "directly over encrypted values. Once the computation is finished, the server returns " |
| | "the encrypted results to the client." |
| | ) |
| |
|
| | gr.Markdown("### Step 5: Send the encrypted images to the server.") |
| | send_input_button = gr.Button("Send the encrypted images to the server.") |
| | send_input_checkbox = gr.Checkbox(label="Encrypted images sent.", interactive=False) |
| |
|
| | gr.Markdown("### Step 6: Run FHE execution.") |
| | execute_fhe_button = gr.Button("Run FHE execution.") |
| | fhe_execution_time = gr.Textbox( |
| | label="Total FHE execution time (in seconds):", max_lines=1, interactive=False |
| | ) |
| |
|
| | gr.Markdown("### Step 7: Receive the encrypted output from the server.") |
| | gr.Markdown( |
| | "The result displayed here is the encrypted result sent by the server, which has been " |
| | "decrypted using a different private key. This is only used to visually represent an " |
| | "encrypted result." |
| | ) |
| | get_output_button = gr.Button( |
| | "Receive the encrypted output result from the server." |
| | ) |
| |
|
| | with gr.Row(): |
| | |
| | encrypted_output_representation = gr.Textbox( |
| | label="Encrypted encrypted output result:", |
| | max_lines=2, |
| | interactive=False, |
| | ) |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | gr.Markdown("## Client side") |
| | gr.Markdown( |
| | "The encrypted output is sent back to the client, who can finally decrypt it with the " |
| | "private key. Only the client is aware of the original input images and the result of the matching." |
| | ) |
| |
|
| | gr.Markdown("### Step 8: Decrypt the output.") |
| | gr.Markdown( |
| | "The images displayed on the left are the input images used during the demo. The output result " |
| | "can be seen on the right." |
| | ) |
| | decrypt_button = gr.Button("Decrypt the output") |
| |
|
| | |
| | with gr.Row(): |
| | original_query_image = gr.Image( |
| | input_query_img.value, |
| | label=f"Input query image:", |
| | interactive=False, |
| | height=256, |
| | width=256, |
| | ) |
| | original_reference_image = gr.Image( |
| | input_reference_img.value, |
| | label=f"Input reference image:", |
| | interactive=False, |
| | height=256, |
| | width=256, |
| | ) |
| | output_result = gr.Label() |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | keygen_button.click( |
| | keygen, |
| | inputs=[], |
| | outputs=[user_id, keygen_checkbox], |
| | ) |
| |
|
| | |
| | encrypt_button.click( |
| | encrypt, |
| | inputs=[user_id, input_query_img, input_reference_img], |
| | outputs=[encrypted_input, original_query_image, original_reference_image], |
| | ) |
| |
|
| | |
| | send_input_button.click(send_input, inputs=[user_id], outputs=[send_input_checkbox]) |
| |
|
| | |
| | execute_fhe_button.click(run_fhe, inputs=[user_id], outputs=[fhe_execution_time]) |
| |
|
| | |
| | get_output_button.click( |
| | get_output, |
| | inputs=[user_id], |
| | outputs=[encrypted_output_representation], |
| | ) |
| |
|
| | |
| | decrypt_button.click( |
| | decrypt_output, |
| | inputs=[user_id], |
| | |
| | outputs=[output_result], |
| | ) |
| |
|
| | gr.Markdown( |
| | "The app was built with [Concrete-ML](https://github.com/zama-ai/concrete-ml), a " |
| | "Privacy-Preserving Machine Learning (PPML) open-source set of tools by [Zama](https://zama.ai/). " |
| | "Try it yourself and don't forget to star on Github ⭐." |
| | ) |
| |
|
| | demo.launch(share=False) |
| |
|