| import numpy as np |
| from tensorflow.keras import backend as K |
| import tensorflow.keras as keras |
| import math |
| from matplotlib import pyplot as plt |
| import cv2 |
| import time |
| import scipy |
| from os import listdir |
| from IPython.display import clear_output |
| import segmentation_models as sm |
| from PIL import Image |
| import images_toolkit as tlk |
|
|
|
|
def dice_coef(y_true, y_pred, smooth=1):
    """Sorensen-Dice coefficient between two masks.

    Both inputs are flattened, so any shape works as long as the two agree.
    ``smooth`` avoids division by zero and softens the score on tiny masks.
    """
    gt = K.flatten(y_true)
    pr = K.flatten(y_pred)
    overlap = K.sum(gt * pr)
    denom = K.sum(gt) + K.sum(pr) + smooth
    return (2.0 * overlap + smooth) / denom
|
|
|
|
def dice_loss(alpha=1):
    """Build a Dice-based loss function scaled by ``alpha``.

    Returns a closure suitable for ``model.compile(loss=...)``.
    """

    def dice_coef_loss(y_true, y_pred):
        # Loss shrinks toward 0 as the Dice coefficient approaches 1/alpha.
        scaled_dice = alpha * dice_coef(y_true, y_pred)
        return 1 - scaled_dice

    return dice_coef_loss
|
|
|
|
def categorical_loss():
    """Factory wrapping Keras categorical cross-entropy as a plain function."""

    def categorical(y_true, y_pred):
        loss_obj = keras.losses.CategoricalCrossentropy()
        return loss_obj(y_true, y_pred)

    return categorical
|
|
|
|
def bce_loss():
    """Factory wrapping Keras binary cross-entropy as a plain function."""

    def bce(y_true, y_pred):
        loss_obj = keras.losses.BinaryCrossentropy()
        return loss_obj(y_true, y_pred)

    return bce
|
|
|
|
def tversky(y_true, y_pred, smooth=1, alpha=0.7):
    """Tversky index: a generalized Dice that weights error types.

    False negatives are weighted by ``alpha`` and false positives by
    ``1 - alpha``; alpha > 0.5 therefore penalizes missed foreground harder.
    ``smooth`` keeps the ratio defined when both masks are empty.
    """
    gt = K.flatten(y_true)
    pr = K.flatten(y_pred)
    tp = K.sum(gt * pr)
    fn = K.sum(gt * (1 - pr))
    fp = K.sum((1 - gt) * pr)
    denom = tp + alpha * fn + (1 - alpha) * fp + smooth
    return (tp + smooth) / denom
|
|
|
|
def tversky_loss(y_true, y_pred):
    """Tversky loss: 1 minus the Tversky index (default smooth=1, alpha=0.7)."""
    index = tversky(y_true, y_pred)
    return 1 - index
|
|
|
|
| |
| |
| |
|
|
|
|
def categorical_focal_loss(gamma=2.0, alpha=0.25):
    """Build a categorical focal loss with the given focusing parameters.

    The original implementation assigned ``gamma``/``alpha`` attributes onto
    the shared module-level ``sm.losses.categorical_focal_loss`` object —
    mutating global state shared by every caller, and attribute assignment is
    not the documented way to configure the loss. Construct a dedicated
    ``CategoricalFocalLoss`` instance instead.

    Args:
        gamma: focusing parameter; larger values down-weight easy examples.
        alpha: class-balancing weight.

    Returns:
        A ``(y_true, y_pred) -> loss`` closure for ``model.compile``.
    """
    # Build once per factory call; each returned closure owns its own instance.
    loss_obj = sm.losses.CategoricalFocalLoss(alpha=alpha, gamma=gamma)

    def cate_focal_loss(y_true, y_pred):
        return loss_obj(y_true, y_pred)

    return cate_focal_loss
|
|
|
|
def focal_loss(gamma=2.0, alpha=0.7):
    """Build a focal Tversky loss: (1 - tversky) ** gamma.

    Bug fix: the original called ``tversky(y_true, y_pred, alpha)``, which
    passed ``alpha`` positionally into the ``smooth`` parameter (signature is
    ``tversky(y_true, y_pred, smooth=1, alpha=0.7)``), silently changing the
    smoothing term instead of the FN/FP weighting. Pass it by keyword.

    Args:
        gamma: focusing exponent applied to (1 - tversky index).
        alpha: Tversky false-negative weight, forwarded to ``tversky``.

    Returns:
        A ``(y_true, y_pred) -> loss`` closure for ``model.compile``.
    """

    def focal_tversky_loss(y_true, y_pred):
        tv = tversky(y_true, y_pred, alpha=alpha)
        return K.pow((1 - tv), gamma)

    return focal_tversky_loss
|
|
|
|
def single_iou(y_true, y_pred, label: int):
    """
    Return the Intersection over Union (IoU) for a given label.
    Args:
        y_true: the expected y values as a one-hot
        y_pred: the predicted y values as a one-hot or softmax output
        label: the label to return the IoU for
    Returns:
        the IoU for the given label (1.0 when the label is absent from
        both masks, i.e. the union is empty)
    """
    # Binarize both tensors: 1.0 where argmax over the channel axis == label.
    y_true = K.cast(K.equal(K.argmax(y_true), label), K.floatx())
    y_pred = K.cast(K.equal(K.argmax(y_pred), label), K.floatx())

    intersection = K.sum(y_true * y_pred)
    union = K.sum(y_true) + K.sum(y_pred) - intersection

    # Compute the switch once (the original evaluated it twice, once into an
    # unused variable). Empty union counts as a perfect score by convention.
    return K.switch(K.equal(union, 0), 1.0, intersection / union)
|
|
|
|
def iou(y_true, y_pred):
    """
    Return the Intersection over Union (IoU) score.
    Args:
        y_true: the expected y values as a one-hot
        y_pred: the predicted y values as a one-hot or softmax output
    Returns:
        the scalar IoU value (mean over all labels)
    """
    # Static channel count; requires the last dimension to be known at
    # graph-construction time.
    num_labels = K.int_shape(y_pred)[-1]

    # Plain accumulator. The original created a fresh K.variable(0) on every
    # call — an unnecessary TF variable allocation for a metric — and bound
    # the result to an unused local before returning.
    total_iou = 0.0
    for label in range(num_labels):
        total_iou = total_iou + single_iou(y_true, y_pred, label)

    return total_iou / num_labels
|
|
|
|
def simple_iou(gt, pred):
    """Computes IoU for a binary classified image. Input shapes: (h, w).

    Returns 0.0 when both masks are empty (empty union).

    Bug fix: the original used ``np.nan_to_num(x, 0)``, which binds 0 to the
    ``copy`` parameter (not ``nan``) — it only worked because nan->0 is the
    default, it emitted a 0/0 RuntimeWarning first, and ``copy=False`` raises
    on NumPy >= 2.0 when a copy is required. Guard the empty union explicitly.
    """
    intersection = np.sum((gt == 1) & (pred == 1))
    union = np.sum((gt == 1) | (pred == 1))
    if union == 0:
        # Both masks empty: keep the original's nan->0 convention.
        return 0.0
    return intersection / union
|
|
|
|
def simple_iou_for_multiple_classes(gt, pred, n_classes):
    """Computes per-class IoU for a categorically classified image.

    Input shapes: (h, w) integer label maps with values in [0, n_classes).
    If n_classes > 3, it additionally computes the IoU of the union of all
    "object" classes taken as one.

    NOTE(review): the original docstring said the object group is classes
    >= 3, but the code thresholds at >= 2 — the code's behavior is kept;
    confirm which was intended.

    Returns:
        array of shape (n_classes,)   if n_classes <= 3
        array of shape (n_classes+1,) if n_classes > 3
        A class absent from both images yields nan for that entry (0/0).
    """
    assert gt.shape == pred.shape and gt.ndim == 2
    assert np.max(gt) < n_classes and np.max(pred) < n_classes

    f_gt = gt.flatten()
    f_pred = pred.flatten()
    # One-hot encode both label vectors: (n_pixels, n_classes) boolean.
    gt_onehot = np.eye(n_classes, dtype=bool)[f_gt]
    pred_onehot = np.eye(n_classes, dtype=bool)[f_pred]
    intersections = np.sum(gt_onehot & pred_onehot, axis=0)
    unions = np.sum(gt_onehot | pred_onehot, axis=0)
    # Silence the 0/0 warning for classes absent from both masks; the nan
    # result itself is preserved (callers may rely on it).
    with np.errstate(divide="ignore", invalid="ignore"):
        ious = intersections / unions
    if n_classes > 3:
        gt_as_one = f_gt >= 2
        pred_as_one = f_pred >= 2
        with np.errstate(divide="ignore", invalid="ignore"):
            iou_as_one = np.sum(gt_as_one & pred_as_one) / np.sum(gt_as_one | pred_as_one)
        return np.append(ious, iou_as_one)
    else:
        return ious
|
|
|
|
def add_mask(image, mask):
    """Overlay a mask on a BGR image by boosting its green channel.

    Args:
        image: BGR image (as read by cv2).
        mask: per-pixel mask; scaled by 255 and used both as the green boost
            and as the alpha channel. Assumes values in [0, 1] —
            NOTE(review): multi-class label maps would overflow; confirm.

    Returns:
        (rgba_image, alpha_channel) — the RGBA overlay and the uint8 alpha.
    """
    blue, green, red = cv2.split(image)

    # Work in float64 so the addition cannot wrap before clipping.
    boost = (mask * 255).astype(np.float64)
    green_boosted = np.clip(boost + green, 0, 255).astype(np.uint8)

    alpha_u8 = boost.astype(np.uint8)
    bgra = cv2.merge((blue, green_boosted, red, alpha_u8))
    rgba = cv2.cvtColor(bgra, cv2.COLOR_BGRA2RGBA)

    return rgba, alpha_u8
|
|
|
|
def resolution2framesize3cha(resolution):
    """Convert a "WIDTHxHEIGHT" string to a 3-channel frame shape (H, W, 3).

    Generalizes the original hard-coded lookup table: every table entry was
    mechanically "WxH" -> (H, W, 3), so any well-formed resolution string is
    now accepted. The original also raised UnboundLocalError on an unknown
    resolution; this raises a clear ValueError for malformed input instead.

    Args:
        resolution: string of the form "<width>x<height>", e.g. "640x480".

    Returns:
        Tuple (height, width, 3).

    Raises:
        ValueError: if ``resolution`` is not "<int>x<int>".
    """
    try:
        width, height = (int(part) for part in resolution.split("x"))
    except (ValueError, TypeError) as exc:
        raise ValueError(f"invalid resolution string: {resolution!r}") from exc
    return (height, width, 3)
|
|
|
|
def resolution2framesize(resolution):
    """Convert a "WIDTHxHEIGHT" string to a frame shape (H, W).

    Generalizes the original hard-coded lookup table: every table entry was
    mechanically "WxH" -> (H, W), so any well-formed resolution string is
    now accepted. The original also raised UnboundLocalError on an unknown
    resolution; this raises a clear ValueError for malformed input instead.

    Args:
        resolution: string of the form "<width>x<height>", e.g. "640x480".

    Returns:
        Tuple (height, width).

    Raises:
        ValueError: if ``resolution`` is not "<int>x<int>".
    """
    try:
        width, height = (int(part) for part in resolution.split("x"))
    except (ValueError, TypeError) as exc:
        raise ValueError(f"invalid resolution string: {resolution!r}") from exc
    return (height, width)
|
|
|
|
def webcam_test(model):
    """Run live segmentation on webcam frames until 'q' is pressed.

    Bug fixes vs. the original:
      * ``add_mask`` returns a ``(rgba_image, alpha)`` tuple; the original
        passed the whole tuple to ``cv2.imshow``, which fails — unpack it.
      * The capture handle was never released and windows never destroyed;
        cleanup now happens in a ``finally`` block.

    Args:
        model: a compiled Keras segmentation model expecting
            (1, 480, 640, 3) float input in [0, 1].
    """
    # NOTE(review): camera device index 2 and the 640x480 frame size are
    # hard-coded — confirm they match the deployment hardware.
    cap = cv2.VideoCapture(2)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Normalize to [0, 1] and add the batch dimension.
            frame = np.array(frame) / 255.0
            x = np.reshape(frame, (1, 480, 640, 3))

            start_t = time.time()
            pred = model.predict(x)
            duration = time.time() - start_t

            # Collapse the class scores of the first (only) batch element
            # to a per-pixel label map.
            label_map = np.argmax(pred[0, :, :, :], 2)

            overlap, _ = add_mask(frame, label_map)

            print(duration)

            cv2.imshow("Overlap", overlap)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # Always release the camera, even if predict/imshow raises.
        cap.release()
        cv2.destroyAllWindows()
|
|
|
|
def image_test(model, img_dir, img_num, label_dir=None):
    """Predict on the ``img_num``-th .jpg in ``img_dir``, show and save it.

    Bug fixes vs. the original:
      * ``add_mask`` returns a ``(rgba_image, alpha)`` tuple; the original
        passed the whole tuple to ``cv2.imshow``/``cv2.imwrite`` — unpack it.
      * ``label_dir != None`` replaced with the idiomatic ``is not None``.

    Args:
        model: compiled Keras segmentation model.
        img_dir: directory containing .jpg inputs (with trailing separator).
        img_num: index into the directory's sorted-by-listdir file list.
        label_dir: optional directory of .png ground-truth labels.
    """
    # NOTE(review): output paths (Trial11 for predictions, Trial10 for
    # labels) are hard-coded and inconsistent — confirm intended directories.
    list_IDs = [f[:-4] for f in listdir(img_dir) if f[-4:] == ".jpg"]
    img_path = img_dir + list_IDs[img_num] + ".jpg"
    test_img = cv2.imread(img_path) / 255.0
    test_img = np.reshape(test_img, (1, test_img.shape[0], test_img.shape[1], 3))

    pred = model.predict(test_img)
    # Per-pixel label map of the single batch element.
    predict = np.argmax(pred[0, :, :, :], 2)

    overlapping, _ = add_mask(test_img[0, :, :, :], predict)

    cv2.imshow("Prediction", overlapping)
    cv2.imwrite(
        "./models_repo/frozen_resnet/Trial11/prediction_" + str(img_num) + ".png",
        overlapping,
    )

    if label_dir is not None:
        lab = label_dir + list_IDs[img_num] + ".png"

        # Labels are stored as {0,1}; scale up so they are visible.
        lab_img = cv2.imread(lab) * 255
        lab_img = np.array(lab_img)
        cv2.imshow("Label", lab_img)
        cv2.imwrite(
            "./models_repo/frozen_resnet/Trial10/label_" + str(img_num) + ".png",
            lab_img,
        )

    cv2.waitKey(0)
|
|
|
|
class PlotLosses(keras.callbacks.Callback):
    """Keras callback that writes live training-curve PNGs to ``out_dir``.

    Every 50 batches it saves running-average loss/IoU plots; at each epoch
    end it saves train-vs-validation loss and IoU curves. Expects the model
    to be compiled with a metric named "iou".

    Fixes vs. the original: mutable default arguments (``logs={}``) replaced
    with ``None``, and the base-class initializer is now called.
    """

    def __init__(self, out_dir):
        super().__init__()  # the original skipped the Callback initializer
        # Directory (with trailing separator) where plot images are written.
        self.out_dir = out_dir

    def on_train_begin(self, logs=None):
        """Reset all bookkeeping at the start of training."""
        self.i = 0            # epoch counter
        self.x = []           # epoch indices (x axis of the epoch curves)
        self.losses = []      # per-epoch training loss
        self.val_losses = []  # per-epoch validation loss
        self.train_iou = []   # per-epoch training IoU
        self.val_iou = []     # per-epoch validation IoU

        self.live_loss = []   # running-average loss, sampled every 50 batches
        self.fig_livel = plt.figure()

        self.live_iou = []    # running-average IoU, sampled every 50 batches
        self.fig_livei = plt.figure()

        self.logs = []
        self.live_logs = []
        self.b = 0     # batch counter within the current epoch
        self.x_b = []  # global iteration numbers (x axis of the live plots)
        self.loss = 0  # loss accumulator, reset each epoch
        self.iou = 0   # IoU accumulator, reset each epoch
        self.num = 0   # global batch counter (never reset)

    def on_batch_end(self, batch, logs=None):
        """Accumulate batch metrics; refresh the live plots every 50 batches."""
        logs = logs or {}
        # NOTE(review): assumes "iou" and "loss" are always present in logs;
        # a differently named metric would make these additions raise.
        self.iou += logs.get("iou")
        self.loss += logs.get("loss")
        self.num += 1
        if self.b % 50 == 0:
            self.x_b.append(self.num)
            # Running averages over the current epoch so far.
            self.live_loss.append(self.loss / float(self.b + 1))
            self.live_iou.append(self.iou / float(self.b + 1))
            clear_output(wait=True)
            plt.ioff()
            fig1 = plt.figure(1)
            plt.ioff()
            plt.plot(self.x_b, self.live_loss, label="Training loss")
            plt.title("Training loss")
            plt.xlabel("Iteration")
            plt.ylabel("Loss")
            plt.savefig(self.out_dir + "training_loss.png")
            plt.close(fig1)
            clear_output(wait=True)
            fig2 = plt.figure(2)
            plt.plot(self.x_b, self.live_iou, label="Training iou")
            plt.title("Training IoU")
            plt.xlabel("Iteration")
            plt.ylabel("IoU")
            plt.savefig(self.out_dir + "training_iou.png")
            plt.close(fig2)
        self.b += 1

    def on_epoch_end(self, epoch, logs=None):
        """Record epoch metrics, reset batch accumulators, save epoch curves."""
        logs = logs or {}
        self.loss = 0
        self.iou = 0
        self.b = 0
        self.logs.append(logs)
        self.x.append(self.i)
        self.losses.append(logs.get("loss"))
        self.val_losses.append(logs.get("val_loss"))
        self.i += 1
        self.train_iou.append(logs.get("iou"))
        self.val_iou.append(logs.get("val_iou"))
        plt.ioff()
        fig3 = plt.figure(3)
        clear_output(wait=True)
        plt.plot(self.x, self.losses, label="loss")
        plt.plot(self.x, self.val_losses, label="val_loss")
        plt.title("Loss curve")
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.legend()
        plt.savefig(self.out_dir + "loss_curve.png")
        plt.close(fig3)
        fig4 = plt.figure(4)
        clear_output(wait=True)
        plt.plot(self.x, self.train_iou, label="train_iou")
        plt.plot(self.x, self.val_iou, label="val_iou")
        plt.title("IoU curve")
        plt.xlabel("Epoch")
        plt.ylabel("IoU")
        plt.legend()
        plt.savefig(self.out_dir + "mean_iou_curve.png")
        plt.close(fig4)
|
|
|
|
def step_decay(epoch, *, initial_lrate=0.1, drop=0.5, epochs_drop=10.0):
    """Step-wise learning-rate schedule: multiply by ``drop`` every ``epochs_drop`` epochs.

    Generalizes the original's hard-coded constants into parameters. They are
    keyword-only on purpose: Keras' ``LearningRateScheduler`` probes
    ``schedule(epoch, lr)`` first, and a second positional parameter would
    silently receive the current learning rate; keyword-only keeps the
    single-argument call path (and the original behavior) intact.

    Args:
        epoch: current epoch index (0-based).
        initial_lrate: learning rate at epoch 0.
        drop: multiplicative decay factor per step.
        epochs_drop: number of epochs between decay steps.

    Returns:
        The learning rate for this epoch.
    """
    lrate = initial_lrate * math.pow(drop, math.floor(epoch / epochs_drop))
    return lrate
|
|
|
|
# RGB colour palette, one (R, G, B) tuple per class index, used by
# decode_labels to colourize segmentation masks. Index 0 (black) is the
# background class.
label_colours = [
    (0, 0, 0),

    (128, 0, 0),
    (0, 128, 0),
    (128, 128, 0),
    (0, 0, 128),
    (128, 0, 128),

    (0, 128, 128),
    (128, 128, 128),
    (255, 200, 180),
    (192, 0, 0),
    (192, 192, 192),

    (192, 128, 0),
    (64, 0, 128),
    (192, 0, 128),
    (255, 128, 0),
    (192, 128, 128),

    (0, 64, 0),
    (128, 64, 0),
    (0, 192, 0),
    (153, 153, 255),
    (0, 64, 128),
    (255, 255, 0),

    (250, 250, 250),
    (0, 192, 128),
    (250, 102, 250),
    (102, 250, 250),
    (44, 166, 44),
    (44, 44, 166),

    (166, 44, 44),
    (0, 250, 0),
    (250, 0, 0),
    (0, 0, 250),
    (206, 219, 156),
    (219, 156, 206),

    (156, 206, 219),
    (23, 190, 207),
    (207, 23, 190),
    (190, 207, 23),
    (153, 0, 76),
]
| |
| |
| |
| |
|
|
|
|
def decode_labels(mask, num_classes=38):
    """Colourize the first segmentation mask of a batch.

    Cleanup vs. the original: the docstring documented a nonexistent
    ``num_images`` parameter and a "batch" return value; ``n``/``c`` and the
    pre-allocated ``outputs``/``binary`` arrays were dead; and each class
    built three full-size ``ones()`` temporaries just to hold a scalar colour.
    Behavior is unchanged, including the uint8 wraparound if class maps
    overlap (they don't for one-hot/argmax-style input).

    Args:
        mask: array of shape (n, h, w, c) of per-class scores; a pixel is
            painted with class i's colour where ``mask[0, :, :, i] >= 0.5``.
            Only the first image of the batch is decoded.
        num_classes: number of classes to paint (including background).

    Returns:
        (h, w, 3) uint8 RGB image.
    """
    _, h, w, _ = mask.shape
    R = np.zeros((h, w), dtype=np.uint8)
    G = np.zeros((h, w), dtype=np.uint8)
    B = np.zeros((h, w), dtype=np.uint8)

    for i in range(num_classes):
        # Boolean map of the pixels assigned to class i.
        assigned = mask[0, :, :, i] >= 0.5
        colour = label_colours[i]
        # Accumulate the class colour into each channel (uint8, as before).
        R += (assigned * colour[0]).astype(np.uint8)
        G += (assigned * colour[1]).astype(np.uint8)
        B += (assigned * colour[2]).astype(np.uint8)

    return np.stack((R, G, B), axis=2)
|
|