"""Gradio demo comparing two OpenVINO raw-image denoisers.

Importing this module decrypts the model files listed in ``CONFIG`` and loads
them on the CPU; ``main()`` then builds and launches the web UI.
"""

import os

import gradio as gr
import torch
import numpy as np
from openvino.inference_engine import IECore
from cryptography.fernet import Fernet

from customs.utils import rgb2rggb, rggb2rgb, CV72fillCurve, rggb2rgb_np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

CONFIG = {
    "noise_levels": [4, 6, 8, 10, 12],
    "raw_images": [
        "data/4Card_Gain160_E30.npy",
        "data/4Card_Gain180_E30.npy",
        "data/4Card_Gain200_E30.npy",
        "data/4Card_Gain220_E30.npy",
    ],
    "weights": [
        "customs/weights/model_ir_0.xml.encrypted",
        "customs/weights/model_ir_1.xml.encrypted",
        "customs/weights/model_ir_2.xml.encrypted",
        "customs/weights/model_ir_3.xml.encrypted",
    ],
    "SIDD_model_weights": "customs/weights/model_ir_SIDD.xml.encrypted",
}

# Suffix shared by every encrypted IR topology file listed in CONFIG.
_ENCRYPTED_XML_SUFFIX = ".xml.encrypted"


def main():
    """Build the Gradio UI, wire both "Inference" buttons and launch the app."""
    with gr.Blocks() as demo:
        create_text("Raw Image Denoiser", size=10)
        create_text(
            "Data Detail : Collect images of imx678 image sensor and analyze the noise composition and distribution",
            size=5,
        )
        create_text("Model Detail : ", size=5)
        create_text(
            "Synthesis Data : We have the technology to analyze and apply noise", size=3
        )
        create_text(
            "Our Denoiser : Our model architecture is trained on synthesize noise with SD images",
            size=3,
        )
        create_text(
            "SIDD Denoiser : Our model architecture is trained on SIDD dataset", size=3
        )
        create_text(
            "Community : Any questions please contact us (tim.liu@liteon.com)", size=3
        )

        with gr.Tab("Synthesis"):
            with gr.Column():
                with gr.Row():
                    image1 = gr.Image(label="Your Input Image")
                    with gr.Column():
                        noise_level1 = create_slider("noise level")
                        denoise_level1 = create_slider("denoise level")
                        use_synthesis = gr.Checkbox(label="Use synthesis", value=True)
                        image_button1 = gr.Button("Inference")
                # Order must match the parameters of denoise_synthesis().
                image_input1 = [
                    image1,
                    noise_level1,
                    denoise_level1,
                    use_synthesis,
                ]
                with gr.Row():
                    SynthesisNoise1 = gr.Image(label="Synthesis noise")
                    OurDenoise1 = gr.Image(label="Our denoiser result")
                with gr.Row():
                    SIDDDenoise1 = gr.Image(label="SIDD denoiser result")
                # NOTE(review): each example row supplies one value while
                # `inputs` lists four components -- confirm Gradio accepts this.
                gr.Examples(
                    examples=[
                        ["data/4Card.png"],
                        ["data/Color.png"],
                        ["data/Focus.png"],
                    ],
                    inputs=image_input1,
                )
                image_output1 = [SynthesisNoise1, OurDenoise1, SIDDDenoise1]

        with gr.Tab("Real"):
            with gr.Column():
                with gr.Row():
                    with gr.Column():
                        noise_level2 = create_slider("noise level")
                        denoise_level2 = create_slider("denoise level")
                        image_button2 = gr.Button("Inference")
                    # Order must match the parameters of denoise_real().
                    image_input2 = [noise_level2, denoise_level2]
                    RealRow = gr.Image(label="Real noise")
                with gr.Row():
                    OurDenoise2 = gr.Image(label="Our denoiser result")
                    SIDDDenoise2 = gr.Image(label="SIDD denoiser result")
                image_output2 = [RealRow, OurDenoise2, SIDDDenoise2]

        create_text(
            "Our model SNR is about 5dB higher than the SIDD model, as shown in the figure below",
            size=3,
        )
        with gr.Row():
            with gr.Column():
                gr.Image(label="SNR", value="customs/SNR.png")

        image_button1.click(
            denoise_synthesis, inputs=image_input1, outputs=image_output1
        )
        image_button2.click(denoise_real, inputs=image_input2, outputs=image_output2)

    demo.launch()


def decrypt_model(encrypted_file_path, decrypted_file_path):
    """Decrypt a Fernet-encrypted model file and write the plaintext to disk.

    The key is read from ``IRModelKey.txt`` in the working directory when that
    file exists, otherwise from the ``IRModelKey`` environment variable.

    Args:
        encrypted_file_path: Path of the encrypted input file.
        decrypted_file_path: Path the decrypted bytes are written to.

    Raises:
        RuntimeError: If neither the key file nor the environment variable
            provides a key.
    """
    if os.path.exists("IRModelKey.txt"):
        with open("IRModelKey.txt", "rb") as key_file:
            key = key_file.read()
    else:
        key = os.getenv("IRModelKey")
    if not key:
        raise RuntimeError(
            "No model key found: provide IRModelKey.txt or set the "
            "IRModelKey environment variable"
        )

    cipher_suite = Fernet(key)
    with open(encrypted_file_path, "rb") as src:
        encrypted_data = src.read()
    decrypted_data = cipher_suite.decrypt(encrypted_data)
    # NOTE(review): the plaintext model stays on disk after this call.
    with open(decrypted_file_path, "wb") as dst:
        dst.write(decrypted_data)


class IEModel:
    """Class for inference of models in the Inference Engine format"""

    def __init__(self, exec_net, inputs_info, input_key, output_key, switch_rb=True):
        self.net = exec_net
        self.inputs_info = inputs_info
        self.input_key = input_key
        self.output_key = output_key
        # Ids of asynchronous requests started but not yet collected.
        self.reqs_ids = []
        # Stored but not read anywhere in this class.
        self.switch_rb = switch_rb

    def _preprocess(self, img):
        """Move the last axis of ``img`` to the front and add a leading axis.

        No resizing is performed; the caller must supply an array that already
        matches the network input.
        """
        return np.expand_dims(img.transpose(2, 0, 1), axis=0)

    def forward(self, img):
        """Performs forward pass of the wrapped IE model"""
        res = self.net.infer(inputs={self.input_key: self._preprocess(img)})
        return np.copy(res[self.output_key])

    def forward_async(self, img):
        """Start an asynchronous request for ``img`` and remember its id."""
        req_id = len(self.reqs_ids)
        self.net.start_async(
            request_id=req_id, inputs={self.input_key: self._preprocess(img)}
        )
        self.reqs_ids.append(req_id)

    def grab_all_async(self):
        """Wait for every pending request and return copies of their outputs."""
        outputs = []
        for req_id in self.reqs_ids:
            request = self.net.requests[req_id]
            request.wait(-1)
            outputs.append(np.copy(request.output_blobs[self.output_key].buffer))
        self.reqs_ids = []
        return outputs

    def get_input_shape(self):
        """Returns an input shape of the wrapped IE model"""
        return self.inputs_info[self.input_key].input_data.shape


def load_ie_model(
    model_xml, device, plugin_dir, cpu_extension="", num_reqs=1, **kwargs
):
    """Loads a model in the Inference Engine format

    Args:
        model_xml: Path to the IR ``.xml`` file; the ``.bin`` file is expected
            next to it with the same stem.
        device: Inference Engine device name, e.g. ``"CPU"``.
        plugin_dir: Unused; kept so existing call sites keep working.
        cpu_extension: Optional CPU extension library, registered only when
            ``device`` contains ``"CPU"``.
        num_reqs: Number of infer requests to create.
        **kwargs: Forwarded to ``IEModel``.

    Returns:
        An ``IEModel`` wrapping the loaded executable network.

    Raises:
        ValueError: If the network does not have 1 or 2 inputs, or does not
            have 1, 4 or 5 outputs.
    """
    # One core is shared by every step so that an extension registered here is
    # still present when the network is read and loaded.
    ie = IECore()
    if cpu_extension and "CPU" in device:
        ie.add_extension(cpu_extension, "CPU")

    # Read IR
    net = ie.read_network(model_xml, os.path.splitext(model_xml)[0] + ".bin")

    if len(net.input_info) not in (1, 2):
        raise ValueError("Supports topologies with only 1 or 2 inputs")
    if len(net.outputs) not in (1, 4, 5):
        raise ValueError("Supports topologies with only 1, 4 or 5 outputs")

    # Only the first input and the first output are used by IEModel.
    input_blob = next(iter(net.input_info))
    out_blob = next(iter(net.outputs))
    net.batch_size = 1

    # Loading model to the plugin
    exec_net = ie.load_network(
        network=net, device_name=device, num_requests=num_reqs
    )
    return IEModel(exec_net, net.input_info, input_blob, out_blob, **kwargs)


def _model_stem(encrypted_xml_path):
    """Return ``encrypted_xml_path`` without its ``.xml.encrypted`` suffix."""
    if not encrypted_xml_path.endswith(_ENCRYPTED_XML_SUFFIX):
        raise ValueError(
            f"Expected a path ending in {_ENCRYPTED_XML_SUFFIX!r}: "
            f"{encrypted_xml_path!r}"
        )
    return encrypted_xml_path[: -len(_ENCRYPTED_XML_SUFFIX)]


# Decrypt the topology (.xml) and weights (.bin) of every configured model.
for _weight in [*CONFIG["weights"], CONFIG["SIDD_model_weights"]]:
    _stem = _model_stem(_weight)
    for _ext in ("xml", "bin"):
        decrypt_model(f"{_stem}.{_ext}.encrypted", f"{_stem}_decrypted.{_ext}")

denoiseModelList = [
    load_ie_model(_model_stem(weight) + "_decrypted.xml", "CPU", None, "")
    for weight in CONFIG["weights"]
]
SIDD_model = load_ie_model(
    _model_stem(CONFIG["SIDD_model_weights"]) + "_decrypted.xml", "CPU", None, ""
)


def denoise_synthesis(image, noise_level=1, denoise_level=1, use_synthesis=True):
    """Optionally add synthetic noise to ``image`` and denoise it with both models.

    Args:
        image: Input picture as an array with the channel axis last.
        noise_level: 1-based index into ``CONFIG["noise_levels"]``.
        denoise_level: 1-based index into ``denoiseModelList``.
        use_synthesis: When true, noise is applied with ``CV72fillCurve``.

    Returns:
        A tuple ``(noisy_rgb, our_result, sidd_result)``.

    Raises:
        ValueError: If ``image`` is ``None``.
    """
    if image is None:
        raise ValueError("An input image is required")
    # Slider values are used as list indices, so force them to int.
    noise_level = int(noise_level)
    denoise_level = int(denoise_level)

    rgb = torch.tensor(image).permute(2, 0, 1).unsqueeze(0)
    # The input is not necessarily 1080 x 1920, so resize it.
    # NOTE(review): bilinear interpolation of an integer tensor is not
    # supported by every torch version -- confirm the dtype of `image`.
    rgb = torch.nn.functional.interpolate(
        rgb, size=(1080, 1920), mode="bilinear", align_corners=False
    )
    rggb = rgb2rggb(rgb.squeeze(0).permute(1, 2, 0)) / 255  # Normalize to [0, 1]

    if use_synthesis:
        level = CONFIG["noise_levels"][noise_level - 1]
        rggb = CV72fillCurve(rggb, level, level + 1)

    rgb = rggb2rgb(rggb)
    rgb = rgb.clamp_(0, 1).cpu().numpy()  # In-place clipping

    noiseImage = rggb.numpy()
    output = denoiseModelList[denoise_level - 1].forward(noiseImage)
    SIDDOutput = SIDD_model.forward(noiseImage)
    return (
        rgb,
        RGGB2RGBNumpy(output.squeeze().transpose(1, 2, 0)),
        RGGB2RGBNumpy(SIDDOutput.squeeze().transpose(1, 2, 0)),
    )


def denoise_real(noise_level=1, denoise_level=1):
    """Denoise one of the stored raw captures with both models.

    Args:
        noise_level: 1-based index into ``CONFIG["raw_images"]``.
        denoise_level: 1-based index into ``denoiseModelList``.

    Returns:
        A tuple ``(noisy_rgb, our_result, sidd_result)``.
    """
    # Slider values are used as list indices, so force them to int.
    noise_level = int(noise_level)
    denoise_level = int(denoise_level)

    # Scale the stored values by 65535 to get floats.
    noiseImage = (
        np.load(CONFIG["raw_images"][noise_level - 1]).astype(np.float32) / 65535.0
    )
    output = denoiseModelList[denoise_level - 1].forward(noiseImage)
    SIDDOutput = SIDD_model.forward(noiseImage)
    return (
        RGGB2RGBNumpy(noiseImage),
        RGGB2RGBNumpy(output.squeeze().transpose(1, 2, 0)),
        RGGB2RGBNumpy(SIDDOutput.squeeze().transpose(1, 2, 0)),
    )


def create_slider(label):
    """Return an interactive integer slider ranging from 1 to 4."""
    return gr.Slider(
        minimum=1, maximum=4, value=1, step=1, interactive=True, label=label
    )


def create_text(text, size=3, color="black"):
    """Render ``text`` as a Markdown component.

    NOTE(review): ``size`` and ``color`` are accepted but not used by the
    current body -- confirm whether styling markup is missing here.
    """
    gr.Markdown(
        "" + str(text) + ""
    )


def RGGB2RGBNumpy(numpyInput):
    """Convert ``numpyInput`` with ``rggb2rgb_np`` and clip the result to [0, 1]."""
    return np.clip(rggb2rgb_np(numpyInput), 0, 1)


if __name__ == "__main__":
    main()