| import random |
| import cv2 |
| import numpy as np |
| from PIL import Image |
|
|
| import torch |
| import torchvision.transforms as TF |
| import dataloaders.image_transforms as IT |
|
|
| # Turn off OpenCV's internal multithreading (0 = run OpenCV routines sequentially). |
| # NOTE(review): presumably done to avoid thread oversubscription when these |
| # transforms run inside parallel data-loading workers - confirm. |
| cv2.setNumThreads(0) |
|
|
|
|
class Resize(object):
    """Bring every image and label of a sample to a fixed ``(height, width)``.

    Two modes are available:

    * ``use_padding=False`` (default): each array is resized with
      ``cv2.resize`` (bicubic for images, nearest-neighbour for everything
      else). The aspect ratio is NOT preserved.
    * ``use_padding=True``: each array receives a constant border; the
      padding of each axis is split randomly between its two sides.

    The size of ``sample['prev_img']`` decides whether anything has to be
    done. The sample dict is modified in place and returned.

    Args:
        output_size (int, tuple or list): Target size. An int ``s`` means the
            square ``(s, s)``; a pair is read as ``(height, width)``.
        use_padding (bool): Pad instead of rescaling.
    """

    # Keys holding colour frames; every other non-meta key is treated as a
    # label map.
    IMAGE_KEYS = ('prev_img', 'curr_img', 'ref_img')
    # Keys whose value is a list of arrays rather than a single array.
    SEQUENCE_KEYS = ('curr_img', 'curr_label')
    # Border colour for padded images: about 255 x the normalization mean
    # (0.485, 0.456, 0.406) used by ToTensor in this file.
    IMAGE_PAD_VALUE = (124, 116, 104)
    # Border value for padded label maps.
    LABEL_PAD_VALUE = 0

    def __init__(self, output_size, use_padding=False):
        assert isinstance(output_size, (int, tuple, list))
        if isinstance(output_size, int):
            self.output_size = (output_size, output_size)
        else:
            assert len(output_size) == 2
            # Stored as a tuple so that the ``== (h, w)`` shortcut can match
            # even when the caller passed a list.
            self.output_size = tuple(output_size)
        self.use_padding = use_padding

    def __call__(self, sample):
        if self.use_padding:
            return self.padding(sample)
        return self.rescale(sample)

    def rescale(self, sample):
        """Resize all arrays of ``sample`` to ``output_size``.

        Returns the sample untouched when ``prev_img`` already has the
        target size.
        """
        height, width = sample['prev_img'].shape[:2]
        if self.output_size == (height, width):
            return sample
        new_h, new_w = self.output_size

        def resize(array, is_image):
            # Nearest-neighbour keeps label ids intact (no blended values).
            mode = cv2.INTER_CUBIC if is_image else cv2.INTER_NEAREST
            return cv2.resize(array, dsize=(new_w, new_h), interpolation=mode)

        return self._apply(sample, resize)

    def padding(self, sample):
        """Pad all arrays of ``sample`` up to ``output_size``.

        The same randomly chosen border is applied to every array.

        Raises:
            ValueError: If ``prev_img`` is larger than ``output_size`` along
                either axis (padding cannot shrink an image).
        """
        height, width = sample['prev_img'].shape[:2]
        if self.output_size == (height, width):
            return sample
        new_h, new_w = self.output_size

        extra_h = new_h - height
        extra_w = new_w - width
        if extra_h < 0 or extra_w < 0:
            raise ValueError(
                'Cannot pad an image of size (%d, %d) to the smaller size '
                '(%d, %d).' % (height, width, new_h, new_w))

        # Split the missing rows/columns randomly between the two sides.
        top_pad = np.random.randint(0, extra_h + 1)
        bottom_pad = extra_h - top_pad
        left_pad = np.random.randint(0, extra_w + 1)
        right_pad = extra_w - left_pad

        def pad(array, is_image):
            fill = self.IMAGE_PAD_VALUE if is_image else self.LABEL_PAD_VALUE
            return cv2.copyMakeBorder(array,
                                      top_pad,
                                      bottom_pad,
                                      left_pad,
                                      right_pad,
                                      cv2.BORDER_CONSTANT,
                                      value=fill)

        return self._apply(sample, pad)

    def _apply(self, sample, func):
        """Run ``func(array, is_image)`` on every non-meta entry, in place."""
        for key in sample.keys():
            if 'meta' in key:
                continue
            is_image = key in self.IMAGE_KEYS
            value = sample[key]
            if key in self.SEQUENCE_KEYS:
                sample[key] = [func(item, is_image) for item in value]
            else:
                sample[key] = func(value, is_image)
        return sample
|
|
|
|
class BalancedRandomCrop(object):
    """Randomly crop a sample, retrying until the crop contains objects.

    A crop window is drawn uniformly at random. It is accepted when the
    cropped ``ref_label`` holds at least one non-zero id covering more than
    ``min_obj_pixel_num`` pixels. Otherwise a new window is drawn, for at
    most ``max_step`` attempts; the last window is used even if it does not
    qualify, in which case every label becomes background.

    The accepted ids are renumbered ``1..K`` in all label maps; every other
    id is mapped to 0. The sample is modified in place, and
    ``sample['meta']['obj_num']`` is set to ``K``.

    Args:
        output_size (tuple or int): Desired crop size ``(height, width)``.
            If int, a square crop is made. Each side is clipped to the
            size of ``prev_img``.
        max_step (int): Maximum number of crop attempts. Values below 1 are
            treated as 1.
        max_obj_num (int): Maximum number of objects kept; a random subset
            is chosen when the crop holds more.
        min_obj_pixel_num (int): An object must cover strictly more pixels
            than this in the cropped ``ref_label`` to be kept.
    """

    def __init__(self,
                 output_size,
                 max_step=5,
                 max_obj_num=5,
                 min_obj_pixel_num=100):
        assert isinstance(output_size, (int, tuple))
        if isinstance(output_size, int):
            self.output_size = (output_size, output_size)
        else:
            assert len(output_size) == 2
            self.output_size = output_size
        self.max_step = max_step
        self.max_obj_num = max_obj_num
        self.min_obj_pixel_num = min_obj_pixel_num

    def __call__(self, sample):
        h, w = sample['prev_img'].shape[:2]
        # Never ask for a crop larger than the image itself.
        new_h = min(self.output_size[0], h)
        new_w = min(self.output_size[1], w)

        labels = [sample["ref_label"], sample["prev_label"]]
        labels += list(sample["curr_label"])

        # Always try at least once so that the crop window exists even for
        # max_step <= 0.
        for _ in range(max(1, self.max_step)):
            top = np.random.randint(0, h - new_h + 1)
            left = np.random.randint(0, w - new_w + 1)
            cropped = [
                label[top:top + new_h, left:left + new_w] for label in labels
            ]
            kept_ids = self._select_objects(cropped[0])
            if kept_ids:
                break

        if len(kept_ids) > self.max_obj_num:
            random.shuffle(kept_ids)
            kept_ids = kept_ids[:self.max_obj_num]

        remapped = [self._relabel(crop, kept_ids) for crop in cropped]
        sample["ref_label"] = remapped[0]
        sample["prev_label"] = remapped[1]
        # Built from the labels that were actually cropped instead of relying
        # on len(sample['curr_img']) matching the number of labels.
        sample["curr_label"] = remapped[2:]

        for key in sample.keys():
            if 'meta' in key or 'label' in key:
                continue
            if key == 'curr_img':
                sample[key] = [
                    frame[top:top + new_h, left:left + new_w]
                    for frame in sample[key]
                ]
            else:
                sample[key] = sample[key][top:top + new_h,
                                          left:left + new_w]

        sample['meta']['obj_num'] = len(kept_ids)
        return sample

    def _select_objects(self, ref_crop):
        """Return the sorted non-zero ids of ``ref_crop`` that are big enough."""
        ids, counts = np.unique(ref_crop, return_counts=True)
        return [
            obj_id for obj_id, count in zip(ids, counts)
            if obj_id != 0 and count > self.min_obj_pixel_num
        ]

    @staticmethod
    def _relabel(crop, kept_ids):
        """Map ``kept_ids`` to ``1..K`` and everything else to 0 (uint8)."""
        relabeled = np.zeros(crop.shape, dtype=np.uint8)
        for new_id, old_id in enumerate(kept_ids, start=1):
            relabeled[crop == old_id] = new_id
        return relabeled
|
|
|
|
class RandomScale(object):
    """Resize every image and label of a sample by one random factor.

    On each call a factor is drawn with
    ``np.random.uniform(min_scale, max_scale)`` and applied to both axes, so
    the aspect ratio is kept. When ``short_edge`` is given, the factor is
    additionally multiplied by ``short_edge / min(height, width)`` of
    ``sample['prev_img']``.

    Args:
        min_scale (float): Lower bound of the random factor.
        max_scale (float): Upper bound of the random factor.
        short_edge (number or None): Optional reference length for the
            shorter image side.
    """

    def __init__(self, min_scale=1., max_scale=1.3, short_edge=None):
        self.min_scale = min_scale
        self.max_scale = max_scale
        self.short_edge = short_edge

    def __call__(self, sample):
        factor = np.random.uniform(self.min_scale, self.max_scale)

        if self.short_edge is not None:
            height, width = sample['prev_img'].shape[:2]
            factor *= float(self.short_edge) / min(height, width)

        def scaled(array, mode):
            return cv2.resize(array,
                              None,
                              fx=factor,
                              fy=factor,
                              interpolation=mode)

        image_keys = ('prev_img', 'curr_img', 'ref_img')
        list_keys = ('curr_img', 'curr_label')
        for key in sample.keys():
            if 'meta' in key:
                continue
            # Bicubic for colour frames, nearest-neighbour for the rest.
            mode = cv2.INTER_CUBIC if key in image_keys else cv2.INTER_NEAREST
            entry = sample[key]
            if key in list_keys:
                sample[key] = [scaled(item, mode) for item in entry]
            else:
                sample[key] = scaled(entry, mode)

        return sample
|
|
|
|
class RandomScaleV2(object):
    """Resize a sample to a random area and a random aspect ratio.

    A target ``(height, width)`` is drawn by :meth:`get_params` and turned
    into separate x/y scale factors, which are applied to every image
    (bicubic) and label (nearest-neighbour) of the sample. When
    ``short_edge`` is given, both factors are additionally multiplied by
    ``short_edge / min(height, width)`` of ``sample['prev_img']``.

    Args:
        min_scale (float): Lower bound of the side-length scale; the sampled
            area fraction is drawn from ``[min_scale**2, max_scale**2]``.
        max_scale (float): Upper bound of the side-length scale.
        short_edge (number or None): Optional reference length for the
            shorter image side.
        ratio (sequence of two floats): Lower and upper bound of the sampled
            width/height aspect ratio (sampled log-uniformly).
    """

    def __init__(self,
                 min_scale=0.36,
                 max_scale=1.0,
                 short_edge=None,
                 ratio=(3. / 4., 4. / 3.)):
        self.min_scale = min_scale
        self.max_scale = max_scale
        self.short_edge = short_edge
        self.ratio = ratio

    def __call__(self, sample):
        h, w = sample['prev_img'].shape[:2]

        new_h, new_w = self.get_params(h, w)
        sc_x = float(new_w) / w
        sc_y = float(new_h) / h

        if self.short_edge is not None:
            edge_factor = float(self.short_edge) / min(h, w)
            sc_x *= edge_factor
            sc_y *= edge_factor

        image_keys = ('prev_img', 'curr_img', 'ref_img')
        list_keys = ('curr_img', 'curr_label')

        def scaled(array, mode):
            return cv2.resize(array,
                              None,
                              fx=sc_x,
                              fy=sc_y,
                              interpolation=mode)

        for key in sample.keys():
            if 'meta' in key:
                continue
            # Nearest-neighbour keeps label ids intact.
            mode = cv2.INTER_CUBIC if key in image_keys else cv2.INTER_NEAREST
            value = sample[key]
            if key in list_keys:
                sample[key] = [scaled(item, mode) for item in value]
            else:
                sample[key] = scaled(value, mode)

        return sample

    def get_params(self, height, width):
        """Draw a target size that fits inside ``(height, width)``.

        Up to 10 random (area, aspect ratio) pairs are tried. If none fits,
        the result falls back to the full image, clipped so that its aspect
        ratio lies within ``self.ratio``.

        Returns:
            tuple: ``(h, w)`` as ints.
        """
        area = height * width
        log_low, log_high = (np.log(item) for item in self.ratio)

        for _ in range(10):
            target_area = area * np.random.uniform(self.min_scale**2,
                                                   self.max_scale**2)
            aspect_ratio = np.exp(np.random.uniform(log_low, log_high))

            w = int(round(np.sqrt(target_area * aspect_ratio)))
            h = int(round(np.sqrt(target_area / aspect_ratio)))

            if 0 < w <= width and 0 < h <= height:
                return h, w

        # Fallback: use as much of the image as the ratio bounds allow.
        in_ratio = float(width) / float(height)
        if in_ratio < min(self.ratio):
            w = width
            h = int(round(w / min(self.ratio)))
        elif in_ratio > max(self.ratio):
            h = height
            w = int(round(h * max(self.ratio)))
        else:
            w = width
            h = height

        return h, w
|
|
class RestrictSize(object):
    """Resize a sample so that one of its edges respects a size limit.

    Exactly one kind of limit is used, based on ``sample['ref_img']``:

    * ``max_short_edge`` set: if the shorter side is smaller than this
      value, the sample is scaled up so that it reaches it.
    * otherwise, ``max_long_edge`` set: if the longer side exceeds this
      value, the sample is scaled down so that it matches it.

    The resulting height and width are then reduced so that ``size - 1`` is
    a multiple of 4. Labels are resized with nearest-neighbour
    interpolation, everything else with bicubic interpolation. The sample is
    modified in place.

    Args:
        max_short_edge (number or None): Target for the shorter side.
        max_long_edge (number or None): Limit for the longer side.

    At least one of the two arguments must be ``None``; because
    ``max_long_edge`` has a non-``None`` default, it has to be passed as
    ``None`` explicitly whenever ``max_short_edge`` is used.
    """

    def __init__(self, max_short_edge=None, max_long_edge=800 * 1.3):
        self.max_short_edge = max_short_edge
        self.max_long_edge = max_long_edge
        assert ((max_short_edge is None)) or ((max_long_edge is None))

    def __call__(self, sample):
        h, w = sample['ref_img'].shape[:2]

        sc = None
        if self.max_short_edge is not None:
            short_edge = min(h, w)
            if short_edge < self.max_short_edge:
                sc = float(self.max_short_edge) / short_edge
        elif self.max_long_edge is not None:
            # The explicit None check avoids comparing a number with None
            # when both limits are disabled.
            long_edge = max(h, w)
            if long_edge > self.max_long_edge:
                sc = float(self.max_long_edge) / long_edge

        if sc is None:
            new_h, new_w = h, w
        else:
            new_h, new_w = int(sc * h), int(sc * w)
        # Round down so that (size - 1) is divisible by 4.
        new_h -= (new_h - 1) % 4
        new_w -= (new_w - 1) % 4
        if new_h == h and new_w == w:
            return sample

        for key in sample.keys():
            if 'meta' in key:
                continue
            mode = cv2.INTER_NEAREST if 'label' in key else cv2.INTER_CUBIC
            value = sample[key]
            if isinstance(value, list):
                # List-valued entries (e.g. 'curr_img' / 'curr_label' as
                # handled by the other transforms in this file) are resized
                # element by element.
                sample[key] = [
                    cv2.resize(item, dsize=(new_w, new_h), interpolation=mode)
                    for item in value
                ]
            else:
                sample[key] = cv2.resize(value,
                                         dsize=(new_w, new_h),
                                         interpolation=mode)

        return sample
|
|
|
|
class RandomHorizontalFlip(object):
    """Mirror all images and labels of a sample left-right with probability ``prob``.

    One random draw decides for the whole sample, so every entry is either
    flipped or left untouched. Entries whose key contains ``'meta'`` are
    never modified. The sample is changed in place and returned.

    Args:
        prob (float): Probability of flipping.
    """

    def __init__(self, prob):
        self.p = prob

    def __call__(self, sample):
        if not random.random() < self.p:
            return sample

        list_keys = ('curr_img', 'curr_label')
        for key in sample.keys():
            if 'meta' in key:
                continue
            entry = sample[key]
            if key in list_keys:
                # flipCode=1 flips around the vertical axis.
                sample[key] = [cv2.flip(item, flipCode=1) for item in entry]
            else:
                sample[key] = cv2.flip(entry, flipCode=1)

        return sample
|
|
|
|
class RandomVerticalFlip(object):
    """Mirror all images and labels of a sample top-bottom with probability ``prob``.

    One random draw decides for the whole sample, so every entry is either
    flipped or left untouched. Entries whose key contains ``'meta'`` are
    never modified. The sample is changed in place and returned.

    Args:
        prob (float): Probability of flipping (default 0.3).
    """

    def __init__(self, prob=0.3):
        self.p = prob

    def __call__(self, sample):
        if not random.random() < self.p:
            return sample

        list_keys = ('curr_img', 'curr_label')
        for key in sample.keys():
            if 'meta' in key:
                continue
            entry = sample[key]
            if key in list_keys:
                # flipCode=0 flips around the horizontal axis.
                sample[key] = [cv2.flip(item, flipCode=0) for item in entry]
            else:
                sample[key] = cv2.flip(entry, flipCode=0)

        return sample
|
|
|
|
class RandomGaussianBlur(object):
    """Apply ``self.aug`` (a randomly applied Gaussian blur) to the images of a sample.

    Entries whose key contains ``'meta'`` or ``'label'`` are left untouched.
    ``self.aug`` is invoked separately for every image array, including each
    element of the ``'curr_img'`` list. The sample is modified in place.

    Subclasses reuse ``__call__`` / ``apply_augmentation`` and only replace
    ``self.aug``.

    Args:
        prob (float): Probability passed to ``TF.RandomApply``.
        sigma (list): Passed unchanged to ``IT.GaussianBlur``.
            NOTE(review): presumably the [min, max] blur sigma range -
            confirm against dataloaders.image_transforms.
    """

    def __init__(self, prob=0.3, sigma=[0.1, 2.]):
        blur = IT.GaussianBlur(sigma)
        self.aug = TF.RandomApply([blur], p=prob)

    def __call__(self, sample):
        for key in sample.keys():
            is_skipped = 'meta' in key or 'label' in key
            if is_skipped:
                continue

            entry = sample[key]
            if key == 'curr_img':
                sample[key] = [
                    self.apply_augmentation(frame) for frame in entry
                ]
            else:
                sample[key] = self.apply_augmentation(entry)
        return sample

    def apply_augmentation(self, x):
        """Run ``self.aug`` on one array and return the result as float32.

        The array is cast to uint8 and wrapped in a PIL image before the
        augmentation is applied.
        """
        pil_image = Image.fromarray(np.uint8(x))
        augmented = self.aug(pil_image)
        return np.array(augmented, dtype=np.float32)
|
|
|
|
class RandomGrayScale(RandomGaussianBlur):
    """Randomly convert the images of a sample to grayscale.

    Uses the per-image application logic inherited from
    ``RandomGaussianBlur``; only the augmentation stored in ``self.aug``
    differs.

    Args:
        prob (float): Probability passed to ``TF.RandomGrayscale``.
    """

    def __init__(self, prob=0.2):
        # The parent constructor is not invoked; ``aug`` is the only
        # attribute the inherited methods read.
        grayscale = TF.RandomGrayscale(p=prob)
        self.aug = grayscale
|
|
|
|
class RandomColorJitter(RandomGaussianBlur):
    """Randomly apply colour jitter to the images of a sample.

    Uses the per-image application logic inherited from
    ``RandomGaussianBlur``; only the augmentation stored in ``self.aug``
    differs.

    Args:
        prob (float): Probability passed to ``TF.RandomApply``.
        brightness, contrast, saturation, hue: Passed positionally, in this
            order, to ``TF.ColorJitter``.
    """

    def __init__(self,
                 prob=0.8,
                 brightness=0.4,
                 contrast=0.4,
                 saturation=0.2,
                 hue=0.1):
        # The parent constructor is not invoked; ``aug`` is the only
        # attribute the inherited methods read.
        jitter = TF.ColorJitter(brightness, contrast, saturation, hue)
        self.aug = TF.RandomApply([jitter], p=prob)
|
|
|
|
class SubtractMeanImage(object):
    """Subtract a mean value from every entry whose key contains ``'image'``.

    Only keys containing the substring ``'image'`` are touched; all other
    entries (including keys such as ``'prev_img'``) pass through unchanged.
    The sample is modified in place and returned.

    Args:
        mean: Value(s) to subtract; converted to a float32 array and
            broadcast against the image.
        change_channels (bool): If True, the last axis is reordered with
            indices ``[2, 1, 0]`` before the subtraction.
    """

    def __init__(self, mean, change_channels=False):
        self.mean = mean
        self.change_channels = change_channels

    def __call__(self, sample):
        for key in sample.keys():
            if 'image' not in key:
                continue
            image = sample[key]
            if self.change_channels:
                # Reverse the order of the three channels.
                image = image[:, :, [2, 1, 0]]
                sample[key] = image
            offset = np.array(self.mean, dtype=np.float32)
            sample[key] = np.subtract(image, offset)
        return sample

    def __str__(self):
        return 'SubtractMeanImage' + str(self.mean)
|
|
|
|
class ToTensor(object):
    """Convert the ndarrays of a sample to torch tensors.

    * 2-D arrays get a leading channel axis and are converted to int
      tensors of shape ``(1, H, W)``.
    * All other arrays are divided by 255, normalized per channel with mean
      ``(0.485, 0.456, 0.406)`` and std ``(0.229, 0.224, 0.225)`` and moved
      from ``(H, W, C)`` to ``(C, H, W)``.

    ``'curr_img'`` and ``'curr_label'`` are lists and are converted element
    by element; keys containing ``'meta'`` are skipped. The sample is
    modified in place and returned.
    """

    def __call__(self, sample):
        list_keys = ('curr_img', 'curr_label')
        for key in sample.keys():
            if 'meta' in key:
                continue
            entry = sample[key]
            if key in list_keys:
                sample[key] = [self._convert(item) for item in entry]
            else:
                sample[key] = self._convert(entry)

        return sample

    @staticmethod
    def _convert(array):
        """Turn one array into a tensor (2-D: int mask, else normalized image)."""
        if array.ndim == 2:
            with_channel = array[:, :, np.newaxis]
            chw = with_channel.transpose((2, 0, 1))
            return torch.from_numpy(chw).int()

        # The division creates a new array, so the in-place operations below
        # do not modify the caller's data.
        normalized = array / 255.
        normalized -= (0.485, 0.456, 0.406)
        normalized /= (0.229, 0.224, 0.225)
        chw = normalized.transpose((2, 0, 1))
        return torch.from_numpy(chw)
|
|
|
|
class MultiRestrictSize(object):
    """Build multi-scale (and optionally flipped) copies of a sample.

    The size of ``sample['current_img']`` is first restricted: scaled down
    if its shorter side exceeds ``max_short_edge``, then scaled down again
    if the longer side still exceeds ``max_long_edge``. For every entry of
    ``multi_scale`` that restricted size is multiplied by the scale and
    rounded to the nearest size compatible with ``max_stride``:
    ``k * max_stride + 1`` when ``align_corners`` is True, otherwise
    ``k * max_stride``.

    Entries whose key contains ``'label'`` or ``'meta'`` are never resized;
    all other entries are resized with bicubic interpolation. ``None``
    entries are passed through unchanged.

    Args:
        max_short_edge (number or None): Upper bound for the shorter side.
        max_long_edge (number or None): Upper bound for the longer side.
        flip (bool): If True, a horizontally mirrored copy follows every
            scaled sample; its meta dict is a shallow copy with
            ``'flip'`` set to True.
        multi_scale (sequence of float): Scales to generate.
        align_corners (bool): Selects the stride-rounding rule (see above).
        max_stride (int): Stride the output size has to be compatible with.

    Returns (from ``__call__``):
        list of dict: One sample per scale (two per scale with ``flip``).
        When the computed size equals the input size, the input dict itself
        is appended rather than a copy.
    """

    def __init__(self,
                 max_short_edge=None,
                 max_long_edge=800,
                 flip=False,
                 multi_scale=(1.3, ),
                 align_corners=True,
                 max_stride=16):
        self.max_short_edge = max_short_edge
        self.max_long_edge = max_long_edge
        self.multi_scale = multi_scale
        self.flip = flip
        self.align_corners = align_corners
        self.max_stride = max_stride

    def __call__(self, sample):
        h, w = sample['current_img'].shape[:2]
        # The edge restriction does not depend on the scale: compute it once.
        base_h, base_w = self._restricted_size(h, w)

        samples = []
        for scale in self.multi_scale:
            new_h = self._align(int(base_h * scale))
            new_w = self._align(int(base_w * scale))

            if new_h == h and new_w == w:
                samples.append(sample)
            else:
                samples.append(self._resized(sample, new_h, new_w))

            if self.flip:
                samples.append(self._flipped(samples[-1]))

        return samples

    def _restricted_size(self, h, w):
        """Return the (float) size after applying the short/long edge limits."""
        sc = 1.
        if self.max_short_edge is not None:
            short_edge = min(h, w)
            if short_edge > self.max_short_edge:
                sc *= float(self.max_short_edge) / short_edge
        new_h, new_w = sc * h, sc * w

        sc = 1.
        if self.max_long_edge is not None:
            long_edge = max(new_h, new_w)
            if long_edge > self.max_long_edge:
                sc *= float(self.max_long_edge) / long_edge
        return sc * new_h, sc * new_w

    def _align(self, size):
        """Round ``size`` to the nearest value compatible with ``max_stride``."""
        stride = self.max_stride
        if self.align_corners:
            if (size - 1) % stride != 0:
                size = int(np.around((size - 1) / stride) * stride + 1)
        elif size % stride != 0:
            size = int(np.around(size / stride) * stride)
        return size

    @staticmethod
    def _resized(sample, new_h, new_w):
        """Return a new dict with all non-label, non-meta entries resized."""
        resized = {}
        for key in sample.keys():
            value = sample[key]
            if 'meta' in key or 'label' in key or value is None:
                resized[key] = value
                continue
            resized[key] = cv2.resize(value,
                                      dsize=(new_w, new_h),
                                      interpolation=cv2.INTER_CUBIC)
        return resized

    @staticmethod
    def _flipped(sample):
        """Return a horizontally mirrored copy of ``sample``."""
        flipped = {}
        for key in sample.keys():
            value = sample[key]
            if 'meta' in key:
                flipped[key] = value.copy()
                flipped[key]['flip'] = True
            elif value is None:
                # Missing entries stay missing instead of raising.
                flipped[key] = None
            else:
                flipped[key] = value[:, ::-1].copy()
        return flipped
|
|
|
|
class MultiToTensor(object):
    """Convert the ndarrays of every sample in a list to torch tensors.

    For each sample dict, keys containing ``'meta'`` and entries that are
    ``None`` are skipped. 2-D arrays get a leading channel axis and become
    int tensors of shape ``(1, H, W)``. All other arrays are divided by 255,
    normalized per channel with mean ``(0.485, 0.456, 0.406)`` and std
    ``(0.229, 0.224, 0.225)`` and moved from ``(H, W, C)`` to ``(C, H, W)``.

    The dicts are modified in place and the same list is returned.
    """

    def __call__(self, samples):
        for sample in samples:
            for key in sample.keys():
                if 'meta' in key:
                    continue
                array = sample[key]
                if array is None:
                    continue

                if array.ndim != 2:
                    # Division creates a new array, so the in-place steps
                    # below leave the input data untouched.
                    image = array / 255.
                    image -= (0.485, 0.456, 0.406)
                    image /= (0.229, 0.224, 0.225)
                    sample[key] = torch.from_numpy(image.transpose((2, 0, 1)))
                else:
                    mask = array[:, :, np.newaxis].transpose((2, 0, 1))
                    sample[key] = torch.from_numpy(mask).int()

        return samples
|
|