| | """A local gradio app that filters images using FHE.""" |
| |
|
| | import os |
| | import shutil |
| | import subprocess |
| | import time |
| |
|
| | import gradio as gr |
| | import numpy |
| | import requests |
| | from common import ( |
| | AVAILABLE_FILTERS, |
| | CLIENT_TMP_PATH, |
| | EXAMPLES, |
| | FILTERS_PATH, |
| | INPUT_SHAPE, |
| | KEYS_PATH, |
| | WRONG_KEYS_PATH, |
| | REPO_DIR, |
| | SERVER_URL, |
| | ) |
| | from custom_client_server import CustomFHEClient |
| |
|
| | |
| | subprocess.Popen(["uvicorn", "server:app"], cwd=REPO_DIR) |
| | time.sleep(3) |
| |
|
| |
|
| | def decrypt_output_with_wrong_key(encrypted_image, filter_name): |
| | """Decrypt the encrypted output using a different private key. |
| | """ |
| | |
| | filter_path = FILTERS_PATH / f"{filter_name}/deployment" |
| |
|
| | |
| | wrong_client = CustomFHEClient(filter_path, WRONG_KEYS_PATH) |
| | wrong_client.generate_private_and_evaluation_keys(force=True) |
| |
|
| | |
| | output_image = wrong_client.deserialize_decrypt_dequantize(encrypted_image) |
| |
|
| | return output_image |
| |
|
| |
|
| | def shorten_bytes_object(bytes_object, limit=500): |
| | """Shorten the input bytes object to a given length. |
| | |
| | Encrypted data is too large for displaying it in the browser using Gradio. This function |
| | provides a shorten representation of it. |
| | |
| | Args: |
| | bytes_object (bytes): The input to shorten |
| | limit (int): The length to consider. Default to 500. |
| | |
| | Returns: |
| | Any: The fitted model. |
| | |
| | """ |
| | |
| | shift = 100 |
| | return bytes_object[shift : limit + shift].hex() |
| |
|
| |
|
| | def get_client(user_id, filter_name): |
| | """Get the client API. |
| | |
| | Args: |
| | user_id (int): The current user's ID. |
| | filter_name (str): The filter chosen by the user |
| | |
| | Returns: |
| | CustomFHEClient: The client API. |
| | """ |
| | return CustomFHEClient( |
| | FILTERS_PATH / f"{filter_name}/deployment", KEYS_PATH / f"{filter_name}_{user_id}" |
| | ) |
| |
|
| |
|
| | def get_client_file_path(name, user_id, filter_name): |
| | """Get the correct temporary file path for the client. |
| | |
| | Args: |
| | name (str): The desired file name. |
| | user_id (int): The current user's ID. |
| | filter_name (str): The filter chosen by the user |
| | |
| | Returns: |
| | pathlib.Path: The file path. |
| | """ |
| | return CLIENT_TMP_PATH / f"{name}_{filter_name}_{user_id}" |
| |
|
| |
|
| | def clean_temporary_files(n_keys=20): |
| | """Clean keys and encrypted images. |
| | |
| | A maximum of n_keys keys are allowed to be stored. Once this limit is reached, the oldest are |
| | deleted. |
| | |
| | Args: |
| | n_keys (int): The maximum number of keys to be stored. Default to 20. |
| | |
| | """ |
| | |
| | list_files = sorted(KEYS_PATH.iterdir(), key=os.path.getmtime) |
| |
|
| | |
| | user_ids = [] |
| | if len(list_files) > n_keys: |
| | n_files_to_delete = len(list_files) - n_keys |
| | for p in list_files[:n_files_to_delete]: |
| | user_ids.append(p.name) |
| | shutil.rmtree(p) |
| |
|
| | |
| | list_files_tmp = CLIENT_TMP_PATH.iterdir() |
| |
|
| | |
| | for file in list_files_tmp: |
| | for user_id in user_ids: |
| | if file.name.endswith(f"{user_id}.npy"): |
| | file.unlink() |
| |
|
| |
|
| | def keygen(filter_name): |
| | """Generate the private key associated to a filter. |
| | |
| | Args: |
| | filter_name (str): The current filter to consider. |
| | |
| | Returns: |
| | (user_id, True) (Tuple[int, bool]): The current user's ID and a boolean used for visual display. |
| | |
| | """ |
| | |
| | clean_temporary_files() |
| |
|
| | |
| | user_id = numpy.random.randint(0, 2**32) |
| |
|
| | |
| | client = get_client(user_id, filter_name) |
| |
|
| | |
| | client.generate_private_and_evaluation_keys(force=True) |
| |
|
| | |
| | |
| | |
| | evaluation_key = client.get_serialized_evaluation_keys() |
| |
|
| | |
| | private_key_path = next(client.key_dir.iterdir()) / "0_0/secretKey_big" |
| | private_key_size = private_key_path.stat().st_size / 1000 |
| |
|
| | |
| | |
| | evaluation_key_path = get_client_file_path("evaluation_key", user_id, filter_name) |
| |
|
| | with evaluation_key_path.open("wb") as evaluation_key_file: |
| | evaluation_key_file.write(evaluation_key) |
| |
|
| | return (user_id, True, private_key_size) |
| |
|
| |
|
| | def encrypt(user_id, input_image, filter_name): |
| | """Encrypt the given image for a specific user and filter. |
| | |
| | Args: |
| | user_id (int): The current user's ID. |
| | input_image (numpy.ndarray): The image to encrypt. |
| | filter_name (str): The current filter to consider. |
| | |
| | Returns: |
| | (input_image, encrypted_image_short) (Tuple[bytes]): The encrypted image and one of its |
| | representation. |
| | |
| | """ |
| | if user_id == "": |
| | raise gr.Error("Please generate the private key first.") |
| |
|
| | if input_image is None: |
| | raise gr.Error("Please choose an image first.") |
| |
|
| | |
| | client = get_client(user_id, filter_name) |
| |
|
| | |
| | preprocessed_input_image = client.model.pre_processing(input_image) |
| |
|
| | |
| | encrypted_image = client.quantize_encrypt_serialize(preprocessed_input_image) |
| |
|
| | |
| | encrypted_input_size = len(encrypted_image) / 1000000 |
| |
|
| | |
| | |
| | encrypted_image_path = get_client_file_path("encrypted_image", user_id, filter_name) |
| |
|
| | with encrypted_image_path.open("wb") as encrypted_image_file: |
| | encrypted_image_file.write(encrypted_image) |
| |
|
| | |
| | encrypted_image_short = shorten_bytes_object(encrypted_image) |
| |
|
| | return (input_image, encrypted_image_short, encrypted_input_size) |
| |
|
| |
|
| | def send_input(user_id, filter_name): |
| | """Send the encrypted input image as well as the evaluation key to the server. |
| | |
| | Args: |
| | user_id (int): The current user's ID. |
| | filter_name (str): The current filter to consider. |
| | """ |
| | |
| | evaluation_key_path = get_client_file_path("evaluation_key", user_id, filter_name) |
| |
|
| | if user_id == "" or not evaluation_key_path.is_file(): |
| | raise gr.Error("Please generate the private key first.") |
| |
|
| | encrypted_input_path = get_client_file_path("encrypted_image", user_id, filter_name) |
| |
|
| | if not encrypted_input_path.is_file(): |
| | raise gr.Error("Please generate the private key and then encrypt an image first.") |
| |
|
| | |
| | data = { |
| | "user_id": user_id, |
| | "filter": filter_name, |
| | } |
| |
|
| | files = [ |
| | ("files", open(encrypted_input_path, "rb")), |
| | ("files", open(evaluation_key_path, "rb")), |
| | ] |
| |
|
| | |
| | url = SERVER_URL + "send_input" |
| | with requests.post( |
| | url=url, |
| | data=data, |
| | files=files, |
| | ) as response: |
| | return response.ok |
| |
|
| |
|
| | def run_fhe(user_id, filter_name): |
| | """Apply the filter on the encrypted image previously sent using FHE. |
| | |
| | Args: |
| | user_id (int): The current user's ID. |
| | filter_name (str): The current filter to consider. |
| | """ |
| | data = { |
| | "user_id": user_id, |
| | "filter": filter_name, |
| | } |
| |
|
| | |
| | url = SERVER_URL + "run_fhe" |
| | with requests.post( |
| | url=url, |
| | data=data, |
| | ) as response: |
| | if response.ok: |
| | return response.json() |
| | else: |
| | raise gr.Error("Please wait for the input image to be sent to the server.") |
| |
|
| |
|
| | def get_output(user_id, filter_name): |
| | """Retrieve the encrypted output image. |
| | |
| | Args: |
| | user_id (int): The current user's ID. |
| | filter_name (str): The current filter to consider. |
| | |
| | Returns: |
| | encrypted_output_image_short (bytes): A representation of the encrypted result. |
| | |
| | """ |
| | data = { |
| | "user_id": user_id, |
| | "filter": filter_name, |
| | } |
| |
|
| | |
| | url = SERVER_URL + "get_output" |
| | with requests.post( |
| | url=url, |
| | data=data, |
| | ) as response: |
| | if response.ok: |
| | encrypted_output = response.content |
| |
|
| | |
| | encrypted_output_size = len(encrypted_output) / 1000000 |
| |
|
| | |
| | |
| | encrypted_output_path = get_client_file_path("encrypted_output", user_id, filter_name) |
| |
|
| | with encrypted_output_path.open("wb") as encrypted_output_file: |
| | encrypted_output_file.write(encrypted_output) |
| |
|
| | |
| | output_image_representation = decrypt_output_with_wrong_key(encrypted_output, filter_name) |
| |
|
| | return output_image_representation, encrypted_output_size |
| | else: |
| | raise gr.Error("Please wait for the FHE execution to be completed.") |
| |
|
| |
|
| | def decrypt_output(user_id, filter_name): |
| | """Decrypt the result. |
| | |
| | Args: |
| | user_id (int): The current user's ID. |
| | filter_name (str): The current filter to consider. |
| | |
| | Returns: |
| | (output_image, False, False) ((Tuple[numpy.ndarray, bool, bool]): The decrypted output, as |
| | well as two booleans used for resetting Gradio checkboxes |
| | |
| | """ |
| | if user_id == "": |
| | raise gr.Error("Please generate the private key first.") |
| |
|
| | |
| | encrypted_output_path = get_client_file_path("encrypted_output", user_id, filter_name) |
| |
|
| | if not encrypted_output_path.is_file(): |
| | raise gr.Error("Please run the FHE execution first.") |
| |
|
| | |
| | with encrypted_output_path.open("rb") as encrypted_output_file: |
| | encrypted_output_image = encrypted_output_file.read() |
| |
|
| | |
| | client = get_client(user_id, filter_name) |
| |
|
| | |
| | output_image = client.deserialize_decrypt_dequantize(encrypted_output_image) |
| |
|
| | return output_image, False, False |
| |
|
| |
|
| | demo = gr.Blocks() |
| |
|
| |
|
| | print("Starting the demo...") |
| | with demo: |
| | gr.Markdown( |
| | """ |
| | <p align="center"> |
| | </p> |
| | <p align="center"> |
| | </p> |
| | """ |
| | ) |
| |
|
| | gr.Markdown("## Client side") |
| | gr.Markdown("### Step 1. Upload an image. ") |
| | gr.Markdown( |
| | f"The image will automatically be resized to shape ({INPUT_SHAPE[0]}x{INPUT_SHAPE[1]})." |
| | "The image displayed here is however using its original resolution. The true image used " |
| | "in this demo can be seen in Step. 8." |
| | ) |
| | with gr.Row(): |
| | input_image = gr.Image( |
| | label="Upload an image here.", shape=INPUT_SHAPE, source="upload", interactive=True |
| | ) |
| |
|
| | examples = gr.Examples( |
| | examples=EXAMPLES, inputs=[input_image], examples_per_page=5, label="Examples to use." |
| | ) |
| |
|
| | gr.Markdown("### Step 2. Choose your filter") |
| | filter_name = gr.Dropdown( |
| | choices=AVAILABLE_FILTERS, value="inverted", label="Choose your filter", interactive=True |
| | ) |
| |
|
| | gr.Markdown("#### Notes") |
| | gr.Markdown( |
| | """ |
| | - The private key is used to encrypt and decrypt the data and shall never be shared. |
| | - No public key are required for these filter operators. |
| | """ |
| | ) |
| | |
| | gr.Markdown("### Step 3. Generate the private key.") |
| | keygen_button = gr.Button("Generate the private key.") |
| |
|
| | with gr.Row(): |
| | keygen_checkbox = gr.Checkbox(label="Private key generated:", interactive=False) |
| |
|
| | private_key_size = gr.Number( |
| | label="Private key size (in kB):", value=0, precision=1, interactive=False |
| | ) |
| |
|
| | user_id = gr.Textbox(label="", max_lines=2, interactive=False, visible=False) |
| | |
| | gr.Markdown("### Step 4. Encrypt the image using FHE.") |
| | encrypt_button = gr.Button("Encrypt the image using FHE.") |
| |
|
| | with gr.Row(): |
| | encrypted_input = gr.Textbox( |
| | label="Encrypted input representation:", max_lines=2, interactive=False |
| | ) |
| |
|
| | encrypted_input_size = gr.Number( |
| | label="Encrypted input size (in MB):", value=0, precision=1, interactive=False |
| | ) |
| |
|
| | gr.Markdown("## Server side") |
| | gr.Markdown( |
| | "The encrypted value is received by the server. The server can then compute the filter " |
| | "directly over encrypted values. Once the computation is finished, the server returns " |
| | "the encrypted results to the client." |
| | ) |
| |
|
| | gr.Markdown("### Step 5. Send the encrypted image to the server.") |
| | send_input_button = gr.Button("Send the encrypted image to the server.") |
| | send_input_checkbox = gr.Checkbox(label="Encrypted image sent.", interactive=False) |
| |
|
| | gr.Markdown("### Step 6. Run FHE execution.") |
| | execute_fhe_button = gr.Button("Run FHE execution.") |
| | fhe_execution_time = gr.Textbox( |
| | label="Total FHE execution time (in seconds):", max_lines=1, interactive=False |
| | ) |
| |
|
| | gr.Markdown("### Step 7. Receive the encrypted output image from the server.") |
| | gr.Markdown( |
| | "The image displayed here is the encrypted result sent by the server which has been " |
| | "decrypted using a different private key. This is only used to visually represent an " |
| | "encrypted image." |
| | ) |
| | get_output_button = gr.Button("Receive the encrypted output image from the server.") |
| |
|
| | with gr.Row(): |
| | encrypted_output_representation = gr.Image( |
| | label=f"Encrypted output representation ({INPUT_SHAPE[0]}x{INPUT_SHAPE[1]}):", interactive=False |
| | ) |
| | encrypted_output_representation.style(height=256, width=256) |
| |
|
| | encrypted_output_size = gr.Number( |
| | label="Encrypted output size (in MB):", value=0, precision=1, interactive=False |
| | ) |
| |
|
| | gr.Markdown("## Client side") |
| | gr.Markdown( |
| | "The encrypted output is sent back to client, who can finally decrypt it with its " |
| | "private key. Only the client is aware of the original image and its transformed version." |
| | ) |
| |
|
| | gr.Markdown("### Step 8. Decrypt the output") |
| | gr.Markdown( |
| | "The image displayed on the left is the input image used during the demo. The output image " |
| | "can be seen on the right." |
| | ) |
| | decrypt_button = gr.Button("Decrypt the output") |
| |
|
| | |
| | with gr.Row(): |
| | original_image = gr.Image( |
| | input_image.value, |
| | label=f"Input image ({INPUT_SHAPE[0]}x{INPUT_SHAPE[1]}):", |
| | interactive=False, |
| | ) |
| | original_image.style(height=256, width=256) |
| |
|
| | output_image = gr.Image( |
| | label=f"Output image ({INPUT_SHAPE[0]}x{INPUT_SHAPE[1]}):", interactive=False |
| | ) |
| | output_image.style(height=256, width=256) |
| |
|
| | |
| | keygen_button.click( |
| | keygen, |
| | inputs=[filter_name], |
| | outputs=[user_id, keygen_checkbox, private_key_size], |
| | ) |
| |
|
| | |
| | encrypt_button.click( |
| | encrypt, |
| | inputs=[user_id, input_image, filter_name], |
| | outputs=[original_image, encrypted_input, encrypted_input_size], |
| | ) |
| |
|
| | |
| | send_input_button.click( |
| | send_input, inputs=[user_id, filter_name], outputs=[send_input_checkbox] |
| | ) |
| |
|
| | |
| | execute_fhe_button.click(run_fhe, inputs=[user_id, filter_name], outputs=[fhe_execution_time]) |
| |
|
| | |
| | get_output_button.click( |
| | get_output, |
| | inputs=[user_id, filter_name], |
| | outputs=[encrypted_output_representation, encrypted_output_size] |
| | ) |
| |
|
| | |
| | decrypt_button.click( |
| | decrypt_output, |
| | inputs=[user_id, filter_name], |
| | outputs=[output_image, keygen_checkbox, send_input_checkbox], |
| | ) |
| |
|
| | gr.Markdown( |
| | "The app was built with [Concrete-ML](https://github.com/zama-ai/concrete-ml), a " |
| | "Privacy-Preserving Machine Learning (PPML) open-source set of tools by [Zama](https://zama.ai/). " |
| | "Try it yourself and don't forget to star on Github ⭐." |
| | ) |
| |
|
| | demo.launch(share=False) |
| |
|