| "Filter definitions, with pre-processing, post-processing and compilation methods." |
|
|
| import json |
|
|
| import numpy as np |
| import torch |
| from common import AVAILABLE_FILTERS |
| from concrete.numpy.compilation.compiler import Compiler |
| from torch import nn |
|
|
| from concrete.ml.common.debugging.custom_assert import assert_true |
| from concrete.ml.common.utils import generate_proxy_function |
| from concrete.ml.onnx.convert import get_equivalent_numpy_forward |
| from concrete.ml.torch.numpy_module import NumpyModule |
| from concrete.ml.version import __version__ as CML_VERSION |
|
|
| |
| |
|
|
|
|
| class _TorchIdentity(nn.Module): |
| """Torch identity model.""" |
|
|
| def forward(self, x): |
| """Identity forward pass. |
| |
| Args: |
| x (torch.Tensor): The input image. |
| |
| Returns: |
| x (torch.Tensor): The input image. |
| """ |
| return x |
|
|
|
|
| class _TorchInverted(nn.Module): |
| """Torch inverted model.""" |
|
|
| def forward(self, x): |
| """Forward pass for inverting an image's colors. |
| |
| Args: |
| x (torch.Tensor): The input image. |
| |
| Returns: |
| torch.Tensor: The (color) inverted image. |
| """ |
| return 255 - x |
|
|
|
|
| class _TorchRotate(nn.Module): |
| """Torch rotated model.""" |
|
|
| def forward(self, x): |
| """Forward pass for rotating an image. |
| |
| Args: |
| x (torch.Tensor): The input image. |
| |
| Returns: |
| torch.Tensor: The rotated image. |
| """ |
| return x.transpose(2, 3) |
|
|
|
|
| class _TorchConv2D(nn.Module): |
| """Torch model for applying a single 2D convolution operator on images.""" |
|
|
| def __init__(self, kernel, n_in_channels=3, n_out_channels=3, groups=1): |
| """Initializing the filter |
| |
| Args: |
| kernel (np.ndarray): The convolution kernel to consider. |
| """ |
| super().__init__() |
| self.kernel = kernel |
| self.n_out_channels = n_out_channels |
| self.n_in_channels = n_in_channels |
| self.groups = groups |
|
|
| def forward(self, x): |
| """Forward pass for filtering the image using a 2D kernel. |
| |
| Args: |
| x (torch.Tensor): The input image. |
| |
| Returns: |
| torch.Tensor: The filtered image. |
| |
| """ |
| |
| stride = 1 |
| kernel_shape = self.kernel.shape |
|
|
| |
| |
| if len(kernel_shape) == 1: |
| kernel = self.kernel.reshape( |
| self.n_out_channels, |
| self.n_in_channels // self.groups, |
| 1, |
| 1, |
| ) |
|
|
| |
| elif len(kernel_shape) == 2: |
| kernel = self.kernel.expand( |
| self.n_out_channels, |
| self.n_in_channels // self.groups, |
| kernel_shape[0], |
| kernel_shape[1], |
| ) |
| else: |
| raise ValueError( |
| "Wrong kernel shape, only 1D or 2D kernels are accepted. Got kernel of shape " |
| f"{kernel_shape}" |
| ) |
|
|
| return nn.functional.conv2d(x, kernel, stride=stride, groups=self.groups) |
|
|
|
|
class Filter:
    """Filter class used in the app.

    A filter bundles a torch model implementing an image transformation together with the
    pre-processing, compilation, post-processing and JSON (de)serialization steps around it.

    Attributes:
        filter (str): Name of the selected filter.
        divide (Optional[int]): Value the decrypted output is floor-divided by in
            post_processing, or None if no division is needed.
        repeat_out_channels (bool): Whether post_processing repeats the output's last axis
            three times.
        torch_model (nn.Module): The torch model implementing the filter.
        onnx_model: The ONNX model used for the last compilation, None before compile is called.
        fhe_circuit: The circuit returned by the last compilation, None before compile is
            called.
    """

    def __init__(self, image_filter="inverted"):
        """Initializing the filter class using a given filter.

        Most filters can be found at https://en.wikipedia.org/wiki/Kernel_(image_processing).

        Args:
            image_filter (str): The filter to consider. Default to "inverted".
        """
        assert_true(
            image_filter in AVAILABLE_FILTERS,
            f"Unsupported image filter or transformation. Expected one of {*AVAILABLE_FILTERS,}, "
            f"but got {image_filter}",
        )

        self.filter = image_filter
        self.divide = None
        self.repeat_out_channels = False

        if image_filter == "identity":
            self.torch_model = _TorchIdentity()

        elif image_filter == "inverted":
            self.torch_model = _TorchInverted()

        elif image_filter == "rotate":
            self.torch_model = _TorchRotate()

        elif image_filter == "black and white":
            # Integer weights 299/587/114: the usual 0.299/0.587/0.114 RGB-to-grayscale
            # coefficients scaled by 1000 so that the kernel only holds integers
            kernel = torch.tensor([299, 587, 114])

            self.torch_model = _TorchConv2D(kernel, n_out_channels=1, groups=1)

            # Undo the x1000 scaling of the kernel once the output is decrypted
            self.divide = 1000

            # The model outputs a single channel, which is repeated three times afterwards
            self.repeat_out_channels = True

        elif image_filter == "blur":
            kernel = torch.ones((3, 3), dtype=torch.int64)

            # groups=3 with 3 input channels: each channel is convolved on its own
            self.torch_model = _TorchConv2D(kernel, n_out_channels=3, groups=3)

            # The all-ones 3x3 kernel sums 9 values: dividing by 9 turns the sum into a mean
            self.divide = 9

        elif image_filter == "sharpen":
            kernel = torch.tensor(
                [
                    [0, -1, 0],
                    [-1, 5, -1],
                    [0, -1, 0],
                ]
            )

            self.torch_model = _TorchConv2D(kernel, n_out_channels=3, groups=3)

        elif image_filter == "ridge detection":
            kernel = torch.tensor(
                [
                    [-1, -1, -1],
                    [-1, 9, -1],
                    [-1, -1, -1],
                ]
            )

            self.torch_model = _TorchConv2D(kernel, n_out_channels=1, groups=1)

            # The model outputs a single channel, which is repeated three times afterwards
            self.repeat_out_channels = True

        self.onnx_model = None
        self.fhe_circuit = None

    def compile(self, inputset, onnx_model=None):
        """Compile the model using an inputset.

        Args:
            inputset (List[np.ndarray]): The set of images to use for compilation
            onnx_model (onnx.ModelProto): The loaded onnx model to consider. If None, it will be
                generated automatically using a NumpyModule. Default to None.

        Returns:
            The compiled circuit, which is also stored in the fhe_circuit attribute.
        """
        # Give the calibration images the exact layout used at inference time by reusing
        # the pre-processing step
        inputset = tuple(self.pre_processing(image) for image in inputset)

        # If no onnx model was given, generate one by tracing the torch model on the first
        # pre-processed image
        if onnx_model is None:
            numpy_module = NumpyModule(
                self.torch_model,
                dummy_input=torch.from_numpy(inputset[0]),
            )

            onnx_model = numpy_module.onnx_model

        # Keep the onnx model and build the numpy function equivalent to it
        self.onnx_model = onnx_model
        numpy_filter = get_equivalent_numpy_forward(onnx_model)

        numpy_filter_proxy, parameters_mapping = generate_proxy_function(numpy_filter, ["inputs"])

        # The single input of the filter is marked as encrypted
        compiler = Compiler(
            numpy_filter_proxy,
            {parameters_mapping["inputs"]: "encrypted"},
        )

        self.fhe_circuit = compiler.compile(inputset)

        return self.fhe_circuit

    def pre_processing(self, input_image):
        """Processing that needs to be applied before encryption.

        The last axis of the 3D image is moved first (assumed to be (H, W, C) -> (C, H, W)),
        a leading axis of size 1 is added and the values are cast to 64-bit integers.

        Args:
            input_image (np.ndarray): The image to pre-process

        Returns:
            input_image (np.ndarray): The pre-processed image
        """
        channels_first = input_image.transpose(2, 0, 1)
        return np.expand_dims(channels_first, axis=0).astype(np.int64)

    def post_processing(self, output_image):
        """Processing that needs to be applied after decryption.

        The given array is left untouched: all steps work on new arrays.

        Args:
            output_image (np.ndarray): The decrypted image to post-process

        Returns:
            output_image (np.ndarray): The post-processed image
        """
        # Undo the kernel's scaling if needed. A regular floor division is used instead of an
        # in-place one so that the caller's array is not modified
        if self.divide is not None:
            output_image = output_image // self.divide

        # Clip the image's values to proper RGB standards as filters don't handle such
        # constraints
        output_image = output_image.clip(0, 255)

        # Move axis 1 last and drop the leading axis of size 1, reverting what
        # pre_processing did to the layout
        output_image = output_image.transpose(0, 2, 3, 1).squeeze(0)

        # Single-channel outputs are repeated three times along the last axis
        if self.repeat_out_channels:
            output_image = output_image.repeat(3, axis=2)

        return output_image

    @classmethod
    def from_json(cls, json_path):
        """Instantiate a filter using a json file.

        Args:
            json_path (Union[str, pathlib.Path]): Path to the json file.

        Returns:
            model (Filter): The instantiated filter class.
        """
        with open(json_path, "r", encoding="utf-8") as f:
            serialized_processing = json.load(f)

        # Make sure the file was saved with the Concrete ML version currently in use
        assert_true(
            serialized_processing["cml_version"] == CML_VERSION,
            f"The version of Concrete ML library ({CML_VERSION}) is different "
            f"from the one used to save the model ({serialized_processing['cml_version']}). "
            "Please update to the proper Concrete ML version.",
        )

        # The filter's name is all that is needed to rebuild the instance
        model = cls(image_filter=serialized_processing["model_filter"])

        return model

    def to_json(self, path_dir, file_name="serialized_processing"):
        """Export the parameters to a json file.

        Args:
            path_dir (Union[str, pathlib.Path]): The path to consider when saving the file.
            file_name (str): The file name, without the ".json" extension.
        """
        # Imported locally so that this method does not depend on a module-level import
        from pathlib import Path

        serialized_processing = {
            "model_filter": self.filter,
        }
        serialized_processing = self._clean_dict_types_for_json(serialized_processing)

        # Add the version of the current Concrete ML library, checked again in from_json
        serialized_processing["cml_version"] = CML_VERSION

        # Wrapping in Path makes plain string directories work with the "/" operator
        json_path = Path(path_dir) / f"{file_name}.json"

        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(serialized_processing, f)

    def _clean_dict_types_for_json(self, d: dict) -> dict:
        """Clean all values in the dict to be json serializable.

        Numpy scalars and arrays are converted with ``tolist``. Nested dicts, as well as
        non-empty lists whose first element is a dict, are cleaned recursively. The dict is
        modified in place and also returned.

        Args:
            d (Dict): The dict to clean

        Returns:
            Dict: The cleaned dict
        """
        # Only existing keys are reassigned, so the dict's size is stable while iterating
        for key, value in d.items():
            if isinstance(value, list) and len(value) > 0 and isinstance(value[0], dict):
                d[key] = [self._clean_dict_types_for_json(v) for v in value]
            elif isinstance(value, dict):
                d[key] = self._clean_dict_types_for_json(value)
            elif isinstance(value, (np.generic, np.ndarray)):
                d[key] = value.tolist()

        return d
|
|