| from torch import optim | |
| from tqdm.auto import tqdm | |
| from helper import * | |
| from model.generator import SkipEncoderDecoder, input_noise | |
| def remove_watermark(image_path, mask_path, max_dim, reg_noise, input_depth, lr, show_step, training_steps, tqdm_length=100): | |
| DTYPE = torch.FloatTensor | |
| has_set_device = False | |
| if torch.cuda.is_available(): | |
| device = 'cuda' | |
| has_set_device = True | |
| print("Setting Device to CUDA...") | |
| try: | |
| if torch.backends.mps.is_available(): | |
| device = 'mps' | |
| has_set_device = True | |
| print("Setting Device to MPS...") | |
| except Exception as e: | |
| print(f"Your version of pytorch might be too old, which does not support MPS. Error: \n{e}") | |
| pass | |
| if not has_set_device: | |
| device = 'cpu' | |
| print('\nSetting device to "cpu", since torch is not built with "cuda" or "mps" support...') | |
| print('It is recommended to use GPU if possible...') | |
| image_np, mask_np = preprocess_images(image_path, mask_path, max_dim) | |
| print('Building the model...') | |
| generator = SkipEncoderDecoder( | |
| input_depth, | |
| num_channels_down = [128] * 5, | |
| num_channels_up = [128] * 5, | |
| num_channels_skip = [128] * 5 | |
| ).type(DTYPE).to(device) | |
| objective = torch.nn.MSELoss().type(DTYPE).to(device) | |
| optimizer = optim.Adam(generator.parameters(), lr) | |
| image_var = np_to_torch_array(image_np).type(DTYPE).to(device) | |
| mask_var = np_to_torch_array(mask_np).type(DTYPE).to(device) | |
| generator_input = input_noise(input_depth, image_np.shape[1:]).type(DTYPE).to(device) | |
| generator_input_saved = generator_input.detach().clone() | |
| noise = generator_input.detach().clone() | |
| print('\nStarting training...\n') | |
| progress_bar = tqdm(range(training_steps), desc='Completed', ncols=tqdm_length) | |
| for step in progress_bar: | |
| optimizer.zero_grad() | |
| generator_input = generator_input_saved | |
| if reg_noise > 0: | |
| generator_input = generator_input_saved + (noise.normal_() * reg_noise) | |
| output = generator(generator_input) | |
| loss = objective(output * mask_var, image_var * mask_var) | |
| loss.backward() | |
| if step % show_step == 0: | |
| output_image = torch_to_np_array(output) | |
| visualize_sample(image_np, output_image, nrow = 2, size_factor = 10) | |
| progress_bar.set_postfix(Loss = loss.item()) | |
| optimizer.step() | |
| output_image = torch_to_np_array(output) | |
| visualize_sample(output_image, nrow = 1, size_factor = 10) | |
| pil_image = Image.fromarray((output_image.transpose(1, 2, 0) * 255.0).astype('uint8')) | |
| output_path = image_path.split('/')[-1].split('.')[-2] + '-output.jpg' | |
| print(f'\nSaving final output image to: "{output_path}"\n') | |
| pil_image.save(output_path) | |