Spaces:
Running
on
Zero
Running
on
Zero
| # Copyright (c) OpenMMLab. All rights reserved. | |
| import warnings | |
| from copy import deepcopy | |
| from typing import Dict, List, Optional, Sequence, Tuple, Union | |
| import cv2 | |
| import mmcv | |
| import mmengine | |
| import numpy as np | |
| from mmcv.image import imflip | |
| from mmcv.transforms import BaseTransform | |
| from mmcv.transforms.utils import avoid_cache_randomness, cache_randomness | |
| from mmengine import is_list_of | |
| from mmengine.dist import get_dist_info | |
| from scipy.stats import truncnorm | |
| from scipy.ndimage import distance_transform_edt | |
| from mmpose.codecs import * # noqa: F401, F403 | |
| from mmpose.registry import KEYPOINT_CODECS, TRANSFORMS | |
| from mmpose.structures.bbox import bbox_xyxy2cs, flip_bbox, bbox_cs2xyxy | |
| from mmpose.structures.keypoint import flip_keypoints | |
| from mmpose.utils.typing import MultiConfig | |
| from pycocotools import mask as Mask | |
| try: | |
| import albumentations | |
| except ImportError: | |
| albumentations = None | |
| Number = Union[int, float] | |
class GetBBoxCenterScale(BaseTransform):
    """Convert bboxes from [x1, y1, x2, y2] to center and scale.

    The center is the coordinates of the bbox center, and the scale is the
    bbox width and height scaled by the ``padding`` factor. The conversion
    itself is delegated to ``bbox_xyxy2cs``.

    Required Keys:

        - bbox (optional if ``bbox_center`` and ``bbox_scale`` are given)

    Added Keys:

        - bbox_center
        - bbox_scale
        - bbox_xyxy_wrt_input (only when ``bbox`` is present)

    Args:
        padding (float): The bbox padding scale that will be multiplied to
            `bbox_scale`. Defaults to 1.25
    """

    def __init__(self, padding: float = 1.25) -> None:
        super().__init__()
        self.padding = padding

    def transform(self, results: Dict) -> Optional[dict]:
        """The transform function of :class:`GetBBoxCenterScale`.

        See ``transform()`` method of :class:`BaseTransform` for details.

        Args:
            results (dict): The result dict

        Returns:
            dict: The result dict.
        """
        # Remember the un-padded bbox w.r.t. the input image. Samples that
        # only provide ``bbox_center``/``bbox_scale`` have no ``bbox``; they
        # must still reach the branch below instead of raising ``KeyError``.
        if 'bbox' in results:
            results['bbox_xyxy_wrt_input'] = results['bbox']

        if 'bbox_center' in results and 'bbox_scale' in results:
            # Warn only on rank 0 to avoid duplicated messages in
            # distributed runs.
            rank, _ = get_dist_info()
            if rank == 0:
                warnings.warn('Use the existing "bbox_center" and "bbox_scale"'
                              '. The padding will still be applied.')
            results['bbox_scale'] = results['bbox_scale'] * self.padding
        else:
            bbox = results['bbox']
            center, scale = bbox_xyxy2cs(bbox, padding=self.padding)
            results['bbox_center'] = center
            results['bbox_scale'] = scale

        return results

    def __repr__(self) -> str:
        """print the basic information of the transform.

        Returns:
            str: Formatted string.
        """
        repr_str = self.__class__.__name__ + f'(padding={self.padding})'
        return repr_str
class MaskBackground(BaseTransform):
    """Suppress the image background using the instance segmentation mask.

    The annotation stored under ``segmentation`` is rasterized with
    pycocotools into a binary mask. The mask may be randomly perturbed
    (shrunk or grown) and is then multiplied with the image. The final image
    is the blend ``img * (1 - alpha) + masked_img * alpha``.

    Required Keys:

        - img
        - img_shape
        - segmentation (optional, see ``continue_on_failure``)

    Modified Keys:

        - img

    Args:
        continue_on_failure (bool): If ``False``, a sample without
            ``segmentation`` raises ``ValueError``; otherwise such a sample
            is returned unchanged. Defaults to ``True``
        prob (float): Probability of masking a sample that has a
            segmentation. Defaults to 1.0
        alpha (float): Blending weight of the masked image in [0, 1].
            Defaults to 1.0
        erode_prob (float): Probability of shrinking the mask before it is
            applied. Defaults to 0.0
        erode_amount (float): Upper bound in (0, 1) of the fraction of
            patches removed from the mask by the patch-based shrinking.
            Defaults to 0.5
        dilate_prob (float): Probability of growing the mask before it is
            applied. ``erode_prob + dilate_prob`` must not exceed 1.
            Defaults to 0.0
        dilate_amount (float): Upper bound in (0, 1) of the fraction of
            background patches added to the mask by the patch-based
            growing. Defaults to 0.5
    """

    def __init__(self,
                 continue_on_failure: bool = True,
                 prob: float = 1.0,
                 alpha: float = 1.0,
                 erode_prob: float = 0.0,
                 erode_amount: float = 0.5,
                 dilate_prob: float = 0.0,
                 dilate_amount: float = 0.5,
                 ) -> None:
        super().__init__()

        assert 0 <= alpha <= 1, 'alpha should be in [0, 1]'
        assert 0 <= prob <= 1, 'prob should be in [0, 1]'
        self.continue_on_failure = continue_on_failure
        self.alpha = alpha
        self.prob = prob

        assert 0 <= erode_prob <= 1, 'erode_prob should be in [0, 1]'
        assert 0 <= dilate_prob <= 1, 'dilate_prob should be in [0, 1]'
        # The amounts are checked with strict bounds, so say so.
        assert 0 < erode_amount < 1, 'erode_amount should be in (0, 1)'
        assert 0 < dilate_amount < 1, 'dilate_amount should be in (0, 1)'
        assert erode_prob + dilate_prob <= 1, (
            'erode_prob + dilate_prob should be less than or equal to 1')

        # ``noise_prob`` decides whether any perturbation happens at all;
        # the two normalized probabilities then pick its direction.
        self.noise_prob = erode_prob + dilate_prob
        if self.noise_prob > 0:
            self.erode_prob = erode_prob / self.noise_prob
            self.dilate_prob = dilate_prob / self.noise_prob
        else:
            self.erode_prob = 0
            self.dilate_prob = 0
        self.erode_amount = erode_amount
        self.dilate_amount = dilate_amount

    def _perturb_by_dilation(self, mask: np.ndarray) -> np.ndarray:
        """Grow the mask with a morphological closing and a dilation.

        Kernel sizes scale with the mask area and are at least 5 pixels.

        Args:
            mask (np.ndarray): Binary mask

        Returns:
            np.ndarray: The perturbed mask in the input shape.
        """
        mask_shape = mask.shape
        mask_area = (mask > 0).sum()

        # Close the mask to erase small holes
        k = max(mask_area // 1000, 5)
        mask = cv2.morphologyEx(
            mask, cv2.MORPH_CLOSE, np.ones((k, k), np.uint8))

        # Dilate the mask to increase it a bit
        k = max(mask_area // 3000, 5)
        mask = cv2.dilate(mask, np.ones((k, k), np.uint8), iterations=1)

        return mask.reshape(mask_shape)

    def _perturb_by_erosion(self, mask: np.ndarray) -> np.ndarray:
        """Shrink the mask with a morphological opening and an erosion.

        Kernel sizes scale with the mask area and are at least 5 pixels.

        Args:
            mask (np.ndarray): Binary mask

        Returns:
            np.ndarray: The perturbed mask in the input shape.
        """
        mask_shape = mask.shape
        mask_area = (mask > 0).sum()

        # Open the mask to remove small blobs and thin structures
        k = max(mask_area // 1000, 5)
        mask = cv2.morphologyEx(
            mask, cv2.MORPH_OPEN, np.ones((k, k), np.uint8))

        # Erode the mask to decrease it a bit and cut-off limbs
        k = max(mask_area // 3000, 5)
        mask = cv2.erode(mask, np.ones((k, k), np.uint8), iterations=1)

        return mask.reshape(mask_shape)

    def _perturb_by_patches(self,
                            mask: np.ndarray,
                            amount: float,
                            num_patches: int = 10) -> np.ndarray:
        """Remove random nearest-seed patches from the mask.

        ``num_patches`` seed pixels are drawn inside the mask, every mask
        pixel is assigned to its nearest seed, and a random subset of the
        resulting patches is cleared.

        Args:
            mask (np.ndarray): Binary mask in shape (H, W) or (H, W, 1)
            amount (float): Upper bound of the fraction of patches to
                remove. The actual fraction is sampled uniformly from
                ``[0, amount)``
            num_patches (int): Number of patches. Defaults to 10

        Returns:
            np.ndarray: The perturbed mask in the input shape.

        Raises:
            ValueError: If the mask has fewer than ``num_patches`` non-zero
                pixels (raised by ``np.random.choice``).
        """
        mask_shape = mask.shape
        mask_2d = mask.reshape(mask_shape[:2])

        # Draw the seeds uniformly among the mask pixels
        seeds = np.random.choice(
            np.flatnonzero(mask_2d), num_patches, replace=False)
        rows, cols = np.unravel_index(seeds, mask_2d.shape)

        # Label every pixel by its nearest seed. int32 keeps the labels
        # intact for any ``num_patches`` (uint8 would wrap above 255).
        seed_labels = np.zeros(mask_2d.shape, dtype=np.int32)
        seed_labels[rows, cols] = np.arange(num_patches) + 1
        _, indices = distance_transform_edt(
            seed_labels == 0, return_indices=True)
        labels = seed_labels[indices[0], indices[1]] * (mask_2d > 0)

        # Select patches for removal. Label 0 is the area outside the mask;
        # drawing it would waste a removal without changing anything, so
        # only real patch labels are candidates.
        num_removed = int(num_patches * np.random.uniform(0.0, amount))
        candidates = np.unique(labels)
        candidates = candidates[candidates > 0]
        remove_labels = np.random.choice(
            candidates, num_removed, replace=False)

        keep = np.isin(labels, remove_labels, invert=True)
        mask_2d = keep.astype(np.uint8) * mask_2d

        return mask_2d.reshape(mask_shape)

    def _coin_flip(self) -> bool:
        """Return ``True`` with probability 0.5."""
        return np.random.rand() < 0.5

    def _perturb_mask(self, mask: np.ndarray) -> np.ndarray:
        """Perturb the mask to simulate real-world detector.

        With probability ``noise_prob`` the mask is smoothed and then either
        grown or shrunk; each direction picks between a patch-based and a
        morphological perturbation by a coin flip.

        Args:
            mask (np.ndarray): Binary mask with values in {0, 1}

        Returns:
            np.ndarray: Binary uint8 mask in the input shape.
        """
        mask_shape = mask.shape

        if not np.random.rand() < self.noise_prob:
            return mask

        # Erode and dilate the mask to increase smoothness
        kernel = np.ones((5, 5), np.uint8)
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)

        increase_mask = np.random.choice(
            [False, True], p=[self.erode_prob, self.dilate_prob])

        if increase_mask:
            if self._coin_flip():
                try:
                    # Removing patches from the inverted mask adds patches
                    # to the mask itself.
                    inverted = self._perturb_by_patches(
                        mask=1 - mask,
                        amount=self.dilate_amount,
                        num_patches=50,
                    )
                    mask = 1 - inverted
                except ValueError:
                    # Best effort: too few pixels to place the seeds, keep
                    # the smoothed mask.
                    pass
            else:
                mask = self._perturb_by_dilation(mask)
        else:
            if self._coin_flip():
                try:
                    mask = self._perturb_by_patches(
                        mask=mask,
                        amount=self.erode_amount,
                        num_patches=10,
                    )
                except ValueError:
                    # Best effort: too few pixels to place the seeds, keep
                    # the smoothed mask.
                    pass
            else:
                mask = self._perturb_by_erosion(mask)

        mask = (mask > 0).astype(np.uint8)

        return mask.reshape(mask_shape)

    def _do_masking(self) -> bool:
        """Return ``True`` with probability ``prob``."""
        return np.random.rand() < self.prob

    def transform(self, results: Dict) -> Optional[dict]:
        """The transform function of :class:`MaskBackground`.

        See ``transform()`` method of :class:`BaseTransform` for details.

        Args:
            results (dict): The result dict

        Returns:
            dict: The result dict.

        Raises:
            ValueError: If the sample has no ``segmentation`` and
                ``continue_on_failure`` is ``False``.
        """
        segmentation = results.get('segmentation', None)

        if segmentation is None:
            if not self.continue_on_failure:
                raise ValueError(
                    'No mask found in the results and '
                    'self.continue_on_failure is set to False.')
            return results

        if not self._do_masking():
            return results

        # Convert the annotation (e.g. polygons) to a run-length encoding
        img_h, img_w = results['img_shape'][0], results['img_shape'][1]
        try:
            mask_rle = Mask.frPyObjects(segmentation, img_h, img_w)
        except IndexError:
            # Best effort: the annotation could not be parsed (presumably an
            # empty polygon list), so the image is left unchanged.
            return results

        # A multi-part annotation yields a list of RLEs that must be merged;
        # a single object already yields one RLE.
        if isinstance(mask_rle, list):
            mask_rle = Mask.merge(mask_rle)

        img = results['img']
        mask = Mask.decode(mask_rle).reshape((img.shape[0], img.shape[1], 1))
        binary_mask = (mask > 0).astype(np.uint8)

        # Perturb the mask to simulate real-world detector
        binary_mask = self._perturb_mask(binary_mask)

        # The multiplication allocates a new array; the input image is not
        # modified in place.
        masked_image = img * binary_mask
        results['img'] = cv2.addWeighted(
            img, 1 - self.alpha, masked_image, self.alpha, 0)

        return results

    def __repr__(self) -> str:
        """print the basic information of the transform.

        Returns:
            str: Formatted string.
        """
        repr_str = self.__class__.__name__ + f'(continue_on_failure={self.continue_on_failure})'
        return repr_str
class RandomFlip(BaseTransform):
    """Randomly flip the image, bbox and keypoints.

    Required Keys:

        - img
        - img_shape
        - flip_indices
        - input_size (optional)
        - bbox (optional)
        - bbox_xyxy_wrt_input (optional)
        - bbox_center (optional)
        - keypoints (optional)
        - keypoints_visible (optional)
        - keypoints_visibility (optional)
        - img_mask (optional)

    Modified Keys:

        - img
        - bbox (optional)
        - bbox_xyxy_wrt_input (optional)
        - bbox_center (optional)
        - keypoints (optional)
        - keypoints_visible (optional)
        - keypoints_visibility (optional)
        - img_mask (optional)

    Added Keys:

        - flip
        - flip_direction

    Args:
        prob (float | list[float]): The flipping probability. If a list is
            given, the argument `direction` should be a list with the same
            length. And each element in `prob` indicates the flipping
            probability of the corresponding one in ``direction``. Defaults
            to 0.5
        direction (str | list[str]): The flipping direction. Options are
            ``'horizontal'``, ``'vertical'`` and ``'diagonal'``. If a list
            is given, each data sample's flipping direction will be sampled
            from a distribution determined by the argument ``prob``.
            Defaults to ``'horizontal'``.
    """

    def __init__(self,
                 prob: Union[float, List[float]] = 0.5,
                 direction: Union[str, List[str]] = 'horizontal') -> None:
        super().__init__()

        if isinstance(prob, list):
            assert is_list_of(prob, float)
            assert 0 <= sum(prob) <= 1
        elif isinstance(prob, float):
            assert 0 <= prob <= 1
        else:
            # Implicit concatenation instead of a backslash continuation,
            # which would embed the source indentation in the message.
            raise ValueError('probs must be float or list of float, but '
                             f'got `{type(prob)}`.')
        self.prob = prob

        valid_directions = ['horizontal', 'vertical', 'diagonal']
        if isinstance(direction, str):
            assert direction in valid_directions
        elif isinstance(direction, list):
            assert is_list_of(direction, str)
            assert set(direction).issubset(set(valid_directions))
        else:
            raise ValueError('direction must be either str or list of str, '
                             f'but got `{type(direction)}`.')
        self.direction = direction

        if isinstance(prob, list):
            assert len(prob) == len(self.direction)

    def _choose_direction(self) -> str:
        """Choose the flip direction according to `prob` and `direction`.

        Returns:
            str | None: The sampled direction; ``None`` means non-flip.
        """
        # ``__init__`` guarantees that ``direction`` is a str or a list of
        # str. The trailing ``None`` entry means non-flip.
        if isinstance(self.direction, list):
            direction_list: list = list(self.direction) + [None]
        else:
            direction_list = [self.direction, None]

        if isinstance(self.prob, list):
            non_prob: float = 1 - sum(self.prob)
            prob_list = self.prob + [non_prob]
        elif isinstance(self.prob, float):
            non_prob = 1. - self.prob
            # A single probability is shared evenly by all directions
            # (the non-flip entry is excluded from the count).
            single_ratio = self.prob / (len(direction_list) - 1)
            prob_list = [single_ratio] * (len(direction_list) - 1) + [non_prob]

        cur_dir = np.random.choice(direction_list, p=prob_list)

        return cur_dir

    def transform(self, results: dict) -> dict:
        """The transform function of :class:`RandomFlip`.

        See ``transform()`` method of :class:`BaseTransform` for details.

        Args:
            results (dict): The result dict

        Returns:
            dict: The result dict.
        """
        flip_dir = self._choose_direction()

        if flip_dir is None:
            results['flip'] = False
            results['flip_direction'] = None
            return results

        results['flip'] = True
        results['flip_direction'] = flip_dir

        h, w = results.get('input_size', results['img_shape'])

        # flip image and mask
        if isinstance(results['img'], list):
            results['img'] = [
                imflip(img, direction=flip_dir) for img in results['img']
            ]
        else:
            results['img'] = imflip(results['img'], direction=flip_dir)

        if 'img_mask' in results:
            results['img_mask'] = imflip(
                results['img_mask'], direction=flip_dir)

        # flip bboxes (both the working bbox and the un-padded input bbox)
        for bbox_key in ('bbox', 'bbox_xyxy_wrt_input'):
            if results.get(bbox_key, None) is not None:
                results[bbox_key] = flip_bbox(
                    results[bbox_key],
                    image_size=(w, h),
                    bbox_format='xyxy',
                    direction=flip_dir)

        if results.get('bbox_center', None) is not None:
            results['bbox_center'] = flip_bbox(
                results['bbox_center'],
                image_size=(w, h),
                bbox_format='center',
                direction=flip_dir)

        # flip keypoints
        if results.get('keypoints', None) is not None:
            keypoints, keypoints_visible = flip_keypoints(
                results['keypoints'],
                results.get('keypoints_visible', None),
                image_size=(w, h),
                flip_indices=results['flip_indices'],
                direction=flip_dir)

            # The second call only serves to permute
            # ``keypoints_visibility`` the same way; the flipped keypoints
            # it returns are discarded.
            _, keypoints_visibility = flip_keypoints(
                results['keypoints'],
                results.get('keypoints_visibility', None),
                image_size=(w, h),
                flip_indices=results['flip_indices'],
                direction=flip_dir)

            results['keypoints'] = keypoints
            results['keypoints_visible'] = keypoints_visible
            results['keypoints_visibility'] = keypoints_visibility

        return results

    def __repr__(self) -> str:
        """print the basic information of the transform.

        Returns:
            str: Formatted string.
        """
        repr_str = self.__class__.__name__
        repr_str += f'(prob={self.prob}, '
        repr_str += f'direction={self.direction})'
        return repr_str
class RandomHalfBody(BaseTransform):
    """Data augmentation with half-body transform that keeps only the upper or
    lower body at random.

    Required Keys:

        - keypoints
        - keypoints_visible
        - upper_body_ids
        - lower_body_ids
        - bbox_center
        - bbox_scale
        - bbox_xyxy_wrt_input (optional)

    Modified Keys:

        - bbox_center
        - bbox_scale
        - bbox_xyxy_wrt_input (optional)

    Args:
        min_total_keypoints (int): The minimum required number of total valid
            keypoints of a person to apply half-body transform. Defaults to 9
        min_upper_keypoints (int): The minimum required number of valid
            upper-body keypoints to select the upper body. Defaults to 2
        min_lower_keypoints (int): The minimum required number of valid
            lower-body keypoints to select the lower body. Defaults to 3
        padding (float): The bbox padding scale that will be multiplied to
            `bbox_scale`. Defaults to 1.5
        prob (float): The probability to apply half-body transform when the
            keypoint number meets the requirement. Defaults to 0.3
        upper_prioritized_prob (float): The probability to pick the upper
            body when both halves have enough valid keypoints. Defaults to
            0.7
    """

    def __init__(self,
                 min_total_keypoints: int = 9,
                 min_upper_keypoints: int = 2,
                 min_lower_keypoints: int = 3,
                 padding: float = 1.5,
                 prob: float = 0.3,
                 upper_prioritized_prob: float = 0.7) -> None:
        super().__init__()
        self.min_total_keypoints = min_total_keypoints
        self.min_upper_keypoints = min_upper_keypoints
        self.min_lower_keypoints = min_lower_keypoints
        self.padding = padding
        self.prob = prob
        self.upper_prioritized_prob = upper_prioritized_prob

    def _get_half_body_bbox(self, keypoints: np.ndarray,
                            half_body_ids: List[int]
                            ) -> Tuple[np.ndarray, np.ndarray]:
        """Get half-body bbox center and scale of a single instance.

        Args:
            keypoints (np.ndarray): Keypoints in shape (K, D)
            half_body_ids (list): The list of half-body keypoint indices

        Returns:
            tuple: A tuple containing half-body bbox center and scale
            - center: Center (x, y) of the bbox
            - scale: Scale (w, h) of the bbox
        """
        # Only x and y are used, so keypoints with extra channels (D > 2)
        # do not break the min/max unpacking below.
        selected_keypoints = keypoints[half_body_ids][:, :2]
        # The center is the mean of the keypoints, not the middle of their
        # extent.
        center = selected_keypoints.mean(axis=0)

        x1, y1 = selected_keypoints.min(axis=0)
        x2, y2 = selected_keypoints.max(axis=0)
        w = x2 - x1
        h = y2 - y1
        scale = np.array([w, h], dtype=center.dtype) * self.padding

        return center, scale

    def _get_half_body_exact_bbox(self, keypoints: np.ndarray,
                                  half_body_ids: List[int],
                                  bbox: np.ndarray,
                                  ) -> np.ndarray:
        """Get the half-body bbox of a single instance in xyxy format.

        The box is built from the half-body center and scale and clipped so
        that it does not exceed the original bbox.

        Args:
            keypoints (np.ndarray): Keypoints in shape (K, D)
            half_body_ids (list): The list of half-body keypoint indices
            bbox (np.ndarray): The original bbox (x1, y1, x2, y2)

        Returns:
            np.ndarray: The half-body bbox (x1, y1, x2, y2)
        """
        center, scale = self._get_half_body_bbox(keypoints, half_body_ids)
        x1, y1 = center - scale / 2
        x2, y2 = center + scale / 2

        # Do not exceed the original bbox
        x1 = np.maximum(x1, bbox[0])
        y1 = np.maximum(y1, bbox[1])
        x2 = np.minimum(x2, bbox[2])
        y2 = np.minimum(y2, bbox[3])

        return np.array([x1, y1, x2, y2])

    def _random_select_half_body(self, keypoints_visible: np.ndarray,
                                 upper_body_ids: List[int],
                                 lower_body_ids: List[int]
                                 ) -> List[Optional[List[int]]]:
        """Randomly determine whether applying half-body transform and get the
        half-body keypoint indices of each instances.

        Args:
            keypoints_visible (np.ndarray, optional): The visibility of
                keypoints in shape (N, K, 1) or (N, K, 2).
            upper_body_ids (list): The list of upper body keypoint indices
            lower_body_ids (list): The list of lower body keypoint indices

        Returns:
            list[list[int] | None]: The selected half-body keypoint indices
            of each instance. ``None`` means not applying half-body transform.
        """
        if keypoints_visible.ndim == 3:
            keypoints_visible = keypoints_visible[..., 0]

        half_body_ids = []

        for visible in keypoints_visible:
            if visible.sum() < self.min_total_keypoints:
                indices = None
            elif np.random.rand() > self.prob:
                indices = None
            else:
                upper_valid_ids = [i for i in upper_body_ids if visible[i] > 0]
                lower_valid_ids = [i for i in lower_body_ids if visible[i] > 0]

                enough_upper = len(upper_valid_ids) >= self.min_upper_keypoints
                enough_lower = len(lower_valid_ids) >= self.min_lower_keypoints
                prefer_upper = np.random.rand() < self.upper_prioritized_prob

                if enough_upper and enough_lower:
                    indices = (
                        upper_valid_ids if prefer_upper else lower_valid_ids)
                elif enough_upper:
                    indices = upper_valid_ids
                elif enough_lower:
                    indices = lower_valid_ids
                else:
                    indices = None

            half_body_ids.append(indices)

        return half_body_ids

    def transform(self, results: Dict) -> Optional[dict]:
        """The transform function of :class:`RandomHalfBody`.

        See ``transform()`` method of :class:`BaseTransform` for details.

        Args:
            results (dict): The result dict

        Returns:
            dict: The result dict.
        """
        half_body_ids = self._random_select_half_body(
            keypoints_visible=results['keypoints_visible'],
            upper_body_ids=results['upper_body_ids'],
            lower_body_ids=results['lower_body_ids'])

        # ``bbox_xyxy_wrt_input`` is only updated when the pipeline provides
        # it, so the transform also works without that key.
        input_bboxes = results.get('bbox_xyxy_wrt_input', None)

        bbox_center = []
        bbox_scale = []
        bbox_xyxy_wrt_input = []

        for i, indices in enumerate(half_body_ids):
            if indices is None:
                bbox_center.append(results['bbox_center'][i])
                bbox_scale.append(results['bbox_scale'][i])
                if input_bboxes is not None:
                    bbox_xyxy_wrt_input.append(input_bboxes[i])
            else:
                _center, _scale = self._get_half_body_bbox(
                    results['keypoints'][i], indices)
                bbox_center.append(_center)
                bbox_scale.append(_scale)
                if input_bboxes is not None:
                    exact_bbox = self._get_half_body_exact_bbox(
                        results['keypoints'][i], indices, input_bboxes[i])
                    bbox_xyxy_wrt_input.append(exact_bbox)

        results['bbox_center'] = np.stack(bbox_center)
        results['bbox_scale'] = np.stack(bbox_scale)
        if input_bboxes is not None:
            results['bbox_xyxy_wrt_input'] = np.stack(bbox_xyxy_wrt_input)

        return results

    def __repr__(self) -> str:
        """print the basic information of the transform.

        Returns:
            str: Formatted string.
        """
        repr_str = self.__class__.__name__
        repr_str += f'(min_total_keypoints={self.min_total_keypoints}, '
        repr_str += f'min_upper_keypoints={self.min_upper_keypoints}, '
        repr_str += f'min_lower_keypoints={self.min_lower_keypoints}, '
        repr_str += f'padding={self.padding}, '
        repr_str += f'prob={self.prob}, '
        repr_str += f'upper_prioritized_prob={self.upper_prioritized_prob})'
        return repr_str
class RandomPatchesBlackout(BaseTransform):
    """Data augmentation that divide image into patches and set color of random
    patches to black. In AID paper marked as 'hide and seek'.

    Required Keys:

        - img
        - keypoints
        - keypoints_visibility
        - transformed_keypoints (optional, preferred over ``keypoints``)

    Modified Keys:

        - img
        - keypoints_visibility

    Args:
        grid_size (tuple(int, int)): Grid size of the patches as
            (rows, columns). Defaults to (8, 6)
        mask_ratio (float): Ratio of patches to blackout. Defaults to 0.3
        prob (float): The probability to apply black patches. Defaults to 0.8
    """

    def __init__(self,
                 grid_size: Tuple[int, int] = (8, 6),
                 mask_ratio: float = 0.3,
                 prob: float = 0.8) -> None:
        super().__init__()
        self.grid_size = grid_size
        self.mask_ratio = mask_ratio
        self.prob = prob

    def _get_random_patches(self, grid_h, grid_w) -> np.ndarray:
        """Sample which patches of the grid are blacked out.

        Args:
            grid_h (int): Number of grid rows
            grid_w (int): Number of grid columns

        Returns:
            np.ndarray: Boolean array in shape (grid_h, grid_w); ``True``
            marks a patch to black out. All ``False`` when the augmentation
            is not applied.
        """
        black_patches = np.zeros((grid_h, grid_w), dtype=bool)

        if np.random.rand() < self.prob:
            # Each patch is blacked out independently with probability
            # ``mask_ratio``. The sample is sized from the arguments so that
            # it always matches the reshape below.
            num_patches = int(grid_h * grid_w)
            black_patches = np.random.choice(
                [0, 1],
                num_patches,
                p=[1 - self.mask_ratio, self.mask_ratio]
            )
            black_patches = black_patches.reshape(grid_h, grid_w).astype(bool)

        return black_patches

    def transform(self, results: Dict) -> Optional[dict]:
        """The transform function of :class:`RandomPatchesBlackout`.

        See ``transform()`` method of :class:`BaseTransform` for details.

        Args:
            results (dict): The result dict

        Returns:
            dict: The result dict.
        """
        img = results['img']
        if "transformed_keypoints" in results:
            kpts = results['transformed_keypoints']
        else:
            kpts = results['keypoints']
        # Collapse the leading dimensions into one row per keypoint. Unlike
        # ``squeeze()`` this also works for a single keypoint (K == 1).
        kpts = kpts.reshape(-1, kpts.shape[-1])

        h, w = img.shape[:2]
        grid_h, grid_w = self.grid_size
        dh = np.ceil(h / grid_h).astype(int)
        dw = np.ceil(w / grid_w).astype(int)

        black_patches = self._get_random_patches(grid_h, grid_w)

        for i in range(grid_h):
            for j in range(grid_w):
                if not black_patches[i, j]:
                    continue

                top, bottom = i * dh, (i + 1) * dh
                left, right = j * dw, (j + 1) * dw

                # Set all pixel in the patch to black (in place, so
                # ``results['img']`` is updated as well)
                img[top:bottom, left:right, :] = 0

                # Set keypoints in the patch to invisible
                in_black = (
                    (kpts[:, 0] >= left) &
                    (kpts[:, 0] < right) &
                    (kpts[:, 1] >= top) &
                    (kpts[:, 1] < bottom)
                )
                results['keypoints_visibility'][:, in_black] = 0

        return results

    def __repr__(self) -> str:
        """print the basic information of the transform.

        Returns:
            str: Formatted string.
        """
        repr_str = self.__class__.__name__
        repr_str += f'(grid_size={self.grid_size}, '
        repr_str += f'mask_ratio={self.mask_ratio}, '
        repr_str += f'prob={self.prob})'
        return repr_str
| class RandomEdgesBlackout(BaseTransform): | |
| """Data augmentation that masks edged of the image with black color | |
| simulating image edge or random texture. | |
| Required Keys: | |
| - keypoints | |
| - keypoints_visible | |
| - keypoint_visibility | |
| Modified Keys: | |
| - img | |
| - keypoint_visibility | |
| Args: | |
| mask_ratio_range (tuple[float, float]): Range or mask-to-image ratio. Defaults to | |
| (0.1, 0.3) | |
| prob (float): The probability to apply black patches. Defaults to 0.8 | |
| texture_prob (float): The probability to apply texture to the blackout area. Defaults to 0.0 | |
| """ | |
| def __init__(self, | |
| mask_ratio_range: tuple[float, float] = (0.1, 0.3), | |
| prob: float = 0.8, | |
| texture_prob: float = 0.0, | |
| context_size:float = 1.25) -> None: | |
| super().__init__() | |
| self.mask_ratio_range = mask_ratio_range | |
| self.prob = prob | |
| self.texture_prob = texture_prob | |
| self.context_size = context_size | |
| def _get_random_mask(self, w, h, bbox_xyxy) -> float: | |
| """Get random mask ratio. | |
| Args: | |
| w (int): Width of the image | |
| h (int): Height of the image | |
| bbox_xyxy (tuple): Bounding box (x1, y1, x2, y2) | |
| Returns: | |
| np.array: mask (1 for blackout, 0 for keep) | |
| tuple: bounds of the blackout area (x1, y1, x2, y2) | |
| """ | |
| mask = np.zeros((h, w), dtype=bool) | |
| bbox_c, bbox_s = bbox_xyxy2cs(bbox_xyxy, padding=self.context_size) | |
| x0, y0, x1, y1 = bbox_cs2xyxy(bbox_c, bbox_s) | |
| # Clip the bounding box to the image | |
| x0 = np.maximum(x0, 0).astype(int) | |
| y0 = np.maximum(y0, 0).astype(int) | |
| x1 = np.minimum(x1, w).astype(int) | |
| y1 = np.minimum(y1, h).astype(int) | |
| # Set default values | |
| x = 0 | |
| y = 0 | |
| dw = w | |
| dh = h | |
| is_textured = False | |
| if np.random.rand() < self.prob: | |
| # Generate random rectangle to keep | |
| rh, rw = np.random.uniform( | |
| 1-self.mask_ratio_range[1], | |
| 1-self.mask_ratio_range[0], | |
| 2 | |
| ) | |
| dh = int((y1-y0) * rh) | |
| dw = int((x1-x0) * rw) | |
| x_end = x1-dw if x1-dw > x0 else x0+1 | |
| y_end = y1-dh if y1-dh > y0 else y0+1 | |
| try: | |
| x = np.random.randint(x0, x_end) | |
| y = np.random.randint(y0, y_end) | |
| except ValueError: | |
| print(x, x0, dw, x1, x1-dw, x_end) | |
| print(y, y0, dh, y1, y1-dh, y_end) | |
| raise ValueError | |
| # Set all pixel outside of the rectangle to black | |
| mask[y:y+dh, x:x+dw] = True | |
| # Invert the mask. True means blackout | |
| mask = ~mask | |
| # Add texture | |
| is_textured = np.random.rand() < self.texture_prob | |
| return mask, (x, y, dw+x, dh+y), is_textured | |
| def _get_random_color(self) -> np.ndarray: | |
| """Get random color. | |
| Returns: | |
| np.array: color | |
| """ | |
| h = np.random.randint(0, 360) | |
| s = np.random.uniform(0.75, 1) | |
| l = np.random.uniform(0.3, 0.7) | |
| hls_color = np.array([h, l, s]) | |
| rgb_color = cv2.cvtColor( | |
| np.array([[hls_color]], dtype=np.float32), | |
| cv2.COLOR_HLS2RGB | |
| ).squeeze() * 255 | |
| color = rgb_color.astype(np.uint8) | |
| return color.tolist() | |
| def _get_random_texture(self, w, h) -> np.ndarray: | |
| """Get random texture. | |
| Args: | |
| w (int): Width of the image | |
| h (int): Height of the image | |
| Returns: | |
| np.array: texture | |
| """ | |
| mode = np.random.choice([ | |
| 'lines', | |
| 'squares', | |
| 'circles', | |
| # 'noise', | |
| # 'uniform', | |
| ]) | |
| if mode == 'lines': | |
| texture = np.zeros((h, w, 3), dtype=np.uint8) | |
| texture[:, :, :] = self._get_random_color() | |
| num_lines = np.random.randint(1, 20) | |
| for _ in range(num_lines): | |
| x1, y1 = np.random.randint(0, w), np.random.randint(0, h) | |
| x2, y2 = np.random.randint(0, w), np.random.randint(0, h) | |
| line_width = np.random.randint(1, 10) | |
| color = self._get_random_color() | |
| cv2.line(texture, (x1, y1), (x2, y2), color, line_width) | |
| elif mode == 'squares': | |
| texture = np.zeros((h, w, 3), dtype=np.uint8) | |
| texture[:, :, :] = self._get_random_color() | |
| num_squares = np.random.randint(1, 20) | |
| for _ in range(num_squares): | |
| x1, y1 = np.random.randint(0, w), np.random.randint(0, h) | |
| x2, y2 = np.random.randint(0, w), np.random.randint(0, h) | |
| color = self._get_random_color() | |
| cv2.rectangle(texture, (x1, y1), (x2, y2), color, -1) | |
| elif mode == 'circles': | |
| texture = np.zeros((h, w, 3), dtype=np.uint8) | |
| texture[:, :, :] = self._get_random_color() | |
| num_circles = np.random.randint(1, 20) | |
| for _ in range(num_circles): | |
| x, y = np.random.randint(0, w), np.random.randint(0, h) | |
| r = np.random.randint(1, min(w, h) // 2) | |
| color = self._get_random_color() | |
| cv2.circle(texture, (x, y), r, color, -1) | |
| elif mode == 'noise': | |
| texture = np.random.randint(0, 256, (h, w, 3), dtype=np.uint8) | |
| elif mode == 'uniform': | |
| texture = np.zeros((h, w, 3), dtype=np.uint8) | |
| texture[:, :, :] = self._get_random_color() | |
| return texture | |
def transform(self, results: Dict) -> Optional[dict]:
    """Mask part of the image and update the annotations accordingly.

    A random mask and its associated rectangle are obtained from
    ``self._get_random_mask``. Masked pixels are either zeroed in place or
    replaced by a random texture. Keypoints lying outside the rectangle are
    marked invisible and ``bbox_xyxy_wrt_input`` is clipped to the
    rectangle.

    See ``transform()`` method of :class:`BaseTransform` for details.

    Args:
        results (dict): The result dict. Must contain ``img``,
            ``keypoints`` (or ``transformed_keypoints``),
            ``keypoints_visibility`` and ``bbox_xyxy_wrt_input``.

    Returns:
        dict: The result dict, with ``img``, ``keypoints_visibility`` and
        ``bbox_xyxy_wrt_input`` modified and ``keypoints_in_image`` added.
    """
    img = results['img']

    # Prefer the keypoints that were already mapped into the input space.
    if "transformed_keypoints" in results:
        kpts = results['transformed_keypoints'].squeeze()
    else:
        kpts = results['keypoints'].squeeze()

    img_h, img_w = img.shape[0], img.shape[1]
    mask, (x1, y1, x2, y2), is_textured = self._get_random_mask(
        img_w, img_h, results['bbox_xyxy_wrt_input'].flatten())

    if not is_textured:
        # Plain blackout: masked pixels are zeroed in the original array.
        img[mask, :] = 0
    else:
        # Textured variant: start from a random texture and copy the
        # unmasked pixels of the image on top of it.
        texture = self._get_random_texture(img_w, img_h)
        texture[~mask, :] = img[~mask, :]
        img = texture
    results['img'] = img

    # Keypoints are kept only inside the half-open rectangle
    # [x1, x2) x [y1, y2); everything else becomes invisible.
    kpt_x, kpt_y = kpts[:, 0], kpts[:, 1]
    in_rect = (kpt_x >= x1) & (kpt_x < x2) & (kpt_y >= y1) & (kpt_y < y2)
    results['keypoints_visibility'][:, ~in_rect] = 0

    # Per-keypoint 0/1 flag telling whether it lies inside the rectangle.
    results['keypoints_in_image'] = in_rect.squeeze().astype(int)

    # Clip the bbox (xyxy, w.r.t. the input) to the rectangle.
    if 'bbox_xyxy_wrt_input' in results:
        clipped = results['bbox_xyxy_wrt_input'].flatten()
        clipped[0] = np.maximum(clipped[0], x1)
        clipped[1] = np.maximum(clipped[1], y1)
        clipped[2] = np.minimum(clipped[2], x2)
        clipped[3] = np.minimum(clipped[3], y2)
        results['bbox_xyxy_wrt_input'] = clipped.reshape(-1, 4)

    return results
def __repr__(self) -> str:
    """Print the basic information of the transform.

    Returns:
        str: Formatted string of the form
        ``ClassName(mask_ratio_range=..., prob=..., texture_prob=...)``.
    """
    repr_str = self.__class__.__name__
    repr_str += f'(mask_ratio_range={self.mask_ratio_range}, '
    # No closing parenthesis here: the argument list continues with
    # ``texture_prob`` and is closed only once at the very end.
    repr_str += f'prob={self.prob}, '
    repr_str += f'texture_prob={self.texture_prob})'
    return repr_str
class RandomBBoxTransform(BaseTransform):
    r"""Randomly shift, resize and rotate the bounding boxes.

    Required Keys:

        - bbox_center
        - bbox_scale

    Modified Keys:

        - bbox_center
        - bbox_scale
        - bbox_xyxy_wrt_input (only if present)

    Added Keys:
        - bbox_rotation

    Args:
        shift_factor (float): Randomly shift the bbox in range
            :math:`[-dx, dx]` and :math:`[-dy, dy]` in X and Y directions,
            where :math:`dx(y) = x(y)_scale \cdot shift_factor` in pixels.
            Defaults to 0.16
        shift_prob (float): Probability of applying random shift. Defaults to
            0.3
        scale_factor (Tuple[float, float]): Randomly resize the bbox in range
            :math:`[scale_factor[0], scale_factor[1]]`. Defaults to (0.5, 1.5)
        scale_prob (float): Probability of applying random resizing. Defaults
            to 1.0
        rotate_factor (float): Randomly rotate the bbox in
            :math:`[-rotate_factor, rotate_factor]` in degrees. Defaults
            to 80.0
        rotate_prob (float): Probability of applying random rotation. Defaults
            to 0.6
    """

    def __init__(self,
                 shift_factor: float = 0.16,
                 shift_prob: float = 0.3,
                 scale_factor: Tuple[float, float] = (0.5, 1.5),
                 scale_prob: float = 1.0,
                 rotate_factor: float = 80.0,
                 rotate_prob: float = 0.6) -> None:
        super().__init__()

        self.shift_factor = shift_factor
        self.shift_prob = shift_prob
        self.scale_factor = scale_factor
        self.scale_prob = scale_prob
        self.rotate_factor = rotate_factor
        self.rotate_prob = rotate_prob

    # The helper takes no ``self``; it must be a static method, otherwise
    # ``self._truncnorm(size=...)`` would bind the instance to ``low``.
    @staticmethod
    def _truncnorm(low: float = -1.,
                   high: float = 1.,
                   size: tuple = ()) -> np.ndarray:
        """Sample from a truncated normal distribution.

        Args:
            low (float): Lower truncation bound. Defaults to -1.0
            high (float): Upper truncation bound. Defaults to 1.0
            size (tuple): Shape of the returned sample. Defaults to ``()``

        Returns:
            np.ndarray: Samples in ``[low, high]`` as ``float32``.
        """
        return truncnorm.rvs(low, high, size=size).astype(np.float32)

    def _get_transform_params(self, num_bboxes: int) -> Tuple:
        """Get random transform parameters.

        Args:
            num_bboxes (int): The number of bboxes

        Returns:
            tuple:
            - offset (np.ndarray): Offset factor of each bbox in shape (n, 2)
            - scale (np.ndarray): Scaling factor of each bbox in shape (n, 1)
            - rotate (np.ndarray): Rotation degree of each bbox in shape (n,)
        """
        # One draw in [-1, 1] per bbox and per parameter:
        # columns 0-1 -> shift, column 2 -> scale, column 3 -> rotation.
        random_v = self._truncnorm(size=(num_bboxes, 4))
        offset_v = random_v[:, :2]
        scale_v = random_v[:, 2:3]
        rotate_v = random_v[:, 3]

        # Get shift parameters
        offset = offset_v * self.shift_factor
        offset = np.where(
            np.random.rand(num_bboxes, 1) < self.shift_prob, offset, 0.)

        # Get scaling parameters: map [-1, 1] linearly onto
        # [scale_min, scale_max].
        scale_min, scale_max = self.scale_factor
        mu = (scale_max + scale_min) * 0.5
        sigma = (scale_max - scale_min) * 0.5
        scale = scale_v * sigma + mu
        scale = np.where(
            np.random.rand(num_bboxes, 1) < self.scale_prob, scale, 1.)

        # Get rotation parameters
        rotate = rotate_v * self.rotate_factor
        rotate = np.where(
            np.random.rand(num_bboxes) < self.rotate_prob, rotate, 0.)

        return offset, scale, rotate

    def transform(self, results: Dict) -> Optional[dict]:
        """The transform function of :class:`RandomBBoxTransform`.

        See ``transform()`` method of :class:`BaseTransform` for details.

        Args:
            results (dict): The result dict

        Returns:
            dict: The result dict.
        """
        bbox_scale = results['bbox_scale']
        num_bboxes = bbox_scale.shape[0]

        offset, scale, rotate = self._get_transform_params(num_bboxes)

        # The shift is expressed relative to the (pre-scaling) bbox size.
        results['bbox_center'] = results['bbox_center'] + offset * bbox_scale
        results['bbox_scale'] = results['bbox_scale'] * scale
        results['bbox_rotation'] = rotate

        # Apply the same shift/scale to the bbox tracked w.r.t. the input,
        # if the pipeline provides one. The result is stored flattened.
        bbox_xyxy_wrt_input = results.get('bbox_xyxy_wrt_input', None)
        if bbox_xyxy_wrt_input is not None:
            _c, _s = bbox_xyxy2cs(bbox_xyxy_wrt_input, padding=1.0)
            _c = _c + offset * _s
            _s = _s * scale
            results['bbox_xyxy_wrt_input'] = bbox_cs2xyxy(_c, _s).flatten()

        return results

    def __repr__(self) -> str:
        """Print the basic information of the transform.

        Returns:
            str: Formatted string.
        """
        repr_str = self.__class__.__name__
        repr_str += f'(shift_prob={self.shift_prob}, '
        repr_str += f'shift_factor={self.shift_factor}, '
        repr_str += f'scale_prob={self.scale_prob}, '
        repr_str += f'scale_factor={self.scale_factor}, '
        repr_str += f'rotate_prob={self.rotate_prob}, '
        repr_str += f'rotate_factor={self.rotate_factor})'
        return repr_str
class Albumentation(BaseTransform):
    """Albumentation augmentation (pixel-level transforms only).

    Wraps transforms of the Albumentations library so that they can be used
    inside the data pipeline. Please visit `https://albumentations.ai/docs/`
    to get more information.

    Note: we only support pixel-level transforms.
    Please visit `https://github.com/albumentations-team/`
    `albumentations#pixel-level-transforms`
    to get more information about pixel-level transforms.

    Required Keys:

        - img

    Modified Keys:

        - img

    Args:
        transforms (List[dict]): A list of Albumentation transforms.
            An example of ``transforms`` is as followed:
            .. code-block:: python

                [
                    dict(
                        type='RandomBrightnessContrast',
                        brightness_limit=[0.1, 0.3],
                        contrast_limit=[0.1, 0.3],
                        p=0.2),
                    dict(type='ChannelShuffle', p=0.1),
                    dict(
                        type='OneOf',
                        transforms=[
                            dict(type='Blur', blur_limit=3, p=1.0),
                            dict(type='MedianBlur', blur_limit=3, p=1.0)
                        ],
                        p=0.1),
                ]
        keymap (dict | None): key mapping from ``input key`` to
            ``albumentation-style key``.
            Defaults to None, which will use {'img': 'image'}.
    """

    def __init__(self,
                 transforms: List[dict],
                 keymap: Optional[dict] = None) -> None:
        if albumentations is None:
            raise RuntimeError('albumentations is not installed')

        self.transforms = transforms

        # Build every configured transform and chain them.
        built = [self.albu_builder(item) for item in self.transforms]
        self.aug = albumentations.Compose(built)

        # A missing or empty keymap falls back to the image-only mapping.
        self.keymap_to_albu = keymap if keymap else {'img': 'image'}

    def albu_builder(self, cfg: dict) -> albumentations:
        """Build one albumentations object from its config.

        It resembles some of :func:`build_from_cfg` logic. Nested
        ``transforms`` entries are built recursively.

        Args:
            cfg (dict): Config dict. It should at least contain the key "type".

        Returns:
            albumentations.BasicTransform: The constructed transform object
        """
        assert isinstance(cfg, dict) and 'type' in cfg

        kwargs = cfg.copy()
        kind = kwargs.pop('type')

        if mmengine.is_str(kind):
            if albumentations is None:
                raise RuntimeError('albumentations is not installed')
            # Only the rank-0 process emits the warning.
            rank, _ = get_dist_info()
            if rank == 0 and not hasattr(
                    albumentations.augmentations.transforms, kind):
                warnings.warn(
                    f'{kind} is not pixel-level transformations. '
                    'Please use with caution.')
            target_cls = getattr(albumentations, kind)
        elif isinstance(kind, type):
            target_cls = kind
        else:
            raise TypeError(f'type must be a str, but got {type(kind)}')

        if 'transforms' in kwargs:
            children = []
            for child_cfg in kwargs['transforms']:
                children.append(self.albu_builder(child_cfg))
            kwargs['transforms'] = children

        return target_cls(**kwargs)

    def transform(self, results: dict) -> dict:
        """The transform function of :class:`Albumentation` to apply
        albumentations transforms.

        See ``transform()`` method of :class:`BaseTransform` for details.

        Args:
            results (dict): Result dict from the data pipeline.

        Return:
            dict: updated result dict.
        """
        # Collect the inputs under their albumentations-style names.
        albu_inputs = {}
        for src_key, albu_key in self.keymap_to_albu.items():
            assert src_key in results, \
                f'The `{src_key}` is required to perform albumentations transforms'
            albu_inputs[albu_key] = results[src_key]

        # Run the composed augmentation.
        albu_outputs = self.aug(**albu_inputs)

        # Write the outputs back under the pipeline's own key names.
        for src_key, albu_key in self.keymap_to_albu.items():
            results[src_key] = albu_outputs[albu_key]

        return results

    def __repr__(self) -> str:
        """Print the basic information of the transform.

        Returns:
            str: Formatted string.
        """
        return self.__class__.__name__ + f'(transforms={self.transforms})'
class PhotometricDistortion(BaseTransform):
    """Apply photometric distortion to image sequentially, every transformation
    is applied with a probability of 0.5. The position of random contrast is in
    second or second to last.

    1. random brightness
    2. random contrast (mode 0)
    3. convert color from BGR to HSV
    4. random saturation
    5. random hue
    6. convert color from HSV to BGR
    7. random contrast (mode 1)
    8. randomly swap channels

    Required Keys:

        - img

    Modified Keys:

        - img

    Args:
        brightness_delta (int): delta of brightness.
        contrast_range (tuple): range of contrast.
        saturation_range (tuple): range of saturation.
        hue_delta (int): delta of hue.
    """

    def __init__(self,
                 brightness_delta: int = 32,
                 contrast_range: Sequence[Number] = (0.5, 1.5),
                 saturation_range: Sequence[Number] = (0.5, 1.5),
                 hue_delta: int = 18) -> None:
        self.brightness_delta = brightness_delta
        self.contrast_lower, self.contrast_upper = contrast_range
        self.saturation_lower, self.saturation_upper = saturation_range
        self.hue_delta = hue_delta

    def _random_flags(self) -> Sequence[Number]:
        """Generate the random flags for subsequent transforms.

        Returns:
            Sequence[Number]: a sequence of numbers that indicate whether to
                do the corresponding transforms.
        """
        # contrast_mode == 0 --> do random contrast first
        # contrast_mode == 1 --> do random contrast last
        contrast_mode = np.random.randint(2)
        # whether to apply brightness distortion
        brightness_flag = np.random.randint(2)
        # whether to apply contrast distortion
        contrast_flag = np.random.randint(2)
        # the mode to convert color from BGR to HSV
        # (0: skip, 1: saturation, 2: hue, 3: saturation and hue)
        hsv_mode = np.random.randint(4)
        # whether to apply channel swap
        swap_flag = np.random.randint(2)

        # the beta in `self._convert` to be added to image array
        # in brightness distortion
        brightness_beta = np.random.uniform(-self.brightness_delta,
                                            self.brightness_delta)
        # the alpha in `self._convert` to be multiplied to image array
        # in contrast distortion
        contrast_alpha = np.random.uniform(self.contrast_lower,
                                           self.contrast_upper)
        # the alpha in `self._convert` to be multiplied to image array
        # in saturation distortion to hsv-formatted img
        saturation_alpha = np.random.uniform(self.saturation_lower,
                                             self.saturation_upper)
        # delta of hue to add to image array in hue distortion.
        # `np.random.randint` requires low < high, so a non-positive
        # `hue_delta` simply disables the hue shift instead of raising.
        if self.hue_delta > 0:
            hue_delta = np.random.randint(-self.hue_delta, self.hue_delta)
        else:
            hue_delta = 0
        # the random permutation of channel order
        swap_channel_order = np.random.permutation(3)

        return (contrast_mode, brightness_flag, contrast_flag, hsv_mode,
                swap_flag, brightness_beta, contrast_alpha, saturation_alpha,
                hue_delta, swap_channel_order)

    def _convert(self,
                 img: np.ndarray,
                 alpha: float = 1,
                 beta: float = 0) -> np.ndarray:
        """Multiple with alpha and add beta with clip.

        Args:
            img (np.ndarray): The image array.
            alpha (float): The random multiplier.
            beta (float): The random offset.

        Returns:
            np.ndarray: The updated image array, clipped to [0, 255] and
            cast to ``uint8``.
        """
        img = img.astype(np.float32) * alpha + beta
        img = np.clip(img, 0, 255)
        return img.astype(np.uint8)

    def transform(self, results: dict) -> dict:
        """The transform function of :class:`PhotometricDistortion` to perform
        photometric distortion on images.

        See ``transform()`` method of :class:`BaseTransform` for details.

        Args:
            results (dict): Result dict from the data pipeline.

        Returns:
            dict: Result dict with images distorted.
        """
        assert 'img' in results, '`img` is not found in results'
        img = results['img']

        (contrast_mode, brightness_flag, contrast_flag, hsv_mode, swap_flag,
         brightness_beta, contrast_alpha, saturation_alpha, hue_delta,
         swap_channel_order) = self._random_flags()

        # random brightness distortion
        if brightness_flag:
            img = self._convert(img, beta=brightness_beta)

        # contrast_mode == 0 --> do random contrast first (before HSV)
        if contrast_mode == 0 and contrast_flag:
            img = self._convert(img, alpha=contrast_alpha)

        if hsv_mode:
            # random saturation/hue distortion
            img = mmcv.bgr2hsv(img)
            if hsv_mode == 1 or hsv_mode == 3:
                # apply saturation distortion to hsv-formatted img
                img[:, :, 1] = self._convert(
                    img[:, :, 1], alpha=saturation_alpha)
            if hsv_mode == 2 or hsv_mode == 3:
                # apply hue distortion to hsv-formatted img. Hue is cyclic;
                # wrap it modulo 180 (OpenCV's 8-bit hue range) so that the
                # shifted value does not over/underflow when stored back.
                # NOTE(review): assumes an 8-bit image, as `_convert` does.
                img[:, :, 0] = (img[:, :, 0].astype(int) + hue_delta) % 180
            img = mmcv.hsv2bgr(img)

        # contrast_mode == 1 --> do random contrast last (after HSV)
        if contrast_mode == 1 and contrast_flag:
            img = self._convert(img, alpha=contrast_alpha)

        # randomly swap channels
        if swap_flag:
            img = img[..., swap_channel_order]

        results['img'] = img
        return results

    def __repr__(self) -> str:
        """Print the basic information of the transform.

        Returns:
            str: Formatted string.
        """
        repr_str = self.__class__.__name__
        repr_str += (f'(brightness_delta={self.brightness_delta}, '
                     f'contrast_range=({self.contrast_lower}, '
                     f'{self.contrast_upper}), '
                     f'saturation_range=({self.saturation_lower}, '
                     f'{self.saturation_upper}), '
                     f'hue_delta={self.hue_delta})')
        return repr_str
class GenerateTarget(BaseTransform):
    """Encode keypoints into Target.

    The generated target is usually the supervision signal of the model
    learning, e.g. heatmaps or regression labels.

    Required Keys:

        - keypoints (or transformed_keypoints, which takes precedence)
        - keypoints_visible
        - dataset_keypoint_weights (if ``use_dataset_keypoint_weights``)

    Optional Keys:

        - keypoints_visibility
        - id_similarity

    Added Keys:

        - The keys of the encoded items from the codec will be updated into
            the results, e.g. ``'heatmaps'`` or ``'keypoint_weights'``. See
            the specific codec for more details.

    Args:
        encoder (dict | list[dict]): The codec config for keypoint encoding.
            Both single encoder and multiple encoders (given as a list) are
            supported
        multilevel (bool): Determine the method to handle multiple encoders.
            If ``multilevel==True``, generate multilevel targets from a group
            of encoders of the same type (e.g. multiple :class:`MSRAHeatmap`
            encoders with different sigma values); If ``multilevel==False``,
            generate combined targets from a group of different encoders. This
            argument will have no effect in case of single encoder. Defaults
            to ``False``
        use_dataset_keypoint_weights (bool): Whether use the keypoint weights
            from the dataset meta information. Defaults to ``False``
        target_type (str, deprecated): This argument is deprecated and has no
            effect. Defaults to ``None``
    """

    def __init__(self,
                 encoder: MultiConfig,
                 target_type: Optional[str] = None,
                 multilevel: bool = False,
                 use_dataset_keypoint_weights: bool = False) -> None:
        super().__init__()

        if target_type is not None:
            rank, _ = get_dist_info()
            if rank == 0:
                warnings.warn(
                    'The argument `target_type` is deprecated in'
                    ' GenerateTarget. The target type and encoded '
                    'keys will be determined by encoder(s).',
                    DeprecationWarning)

        # Keep a private copy so that later edits of the config by the
        # caller do not affect this transform.
        self.encoder_cfg = deepcopy(encoder)
        self.multilevel = multilevel
        self.use_dataset_keypoint_weights = use_dataset_keypoint_weights

        if isinstance(self.encoder_cfg, list):
            self.encoder = [
                KEYPOINT_CODECS.build(cfg) for cfg in self.encoder_cfg
            ]
        else:
            assert not self.multilevel, (
                'Need multiple encoder configs if ``multilevel==True``')
            self.encoder = KEYPOINT_CODECS.build(self.encoder_cfg)

    def transform(self, results: Dict) -> Optional[dict]:
        """The transform function of :class:`GenerateTarget`.

        See ``transform()`` method of :class:`BaseTransform` for details.

        Args:
            results (dict): The result dict

        Returns:
            dict: The result dict updated with the encoded items.

        Raises:
            ValueError: If neither ``transformed_keypoints`` nor
                ``keypoints`` is available, if multilevel encoders produce
                different keys, or if combined encoders produce overlapping
                keys other than ``keypoint_weights``.
        """

        if results.get('transformed_keypoints', None) is not None:
            # use keypoints transformed by TopdownAffine
            keypoints = results['transformed_keypoints']
        elif results.get('keypoints', None) is not None:
            # use original keypoints
            keypoints = results['keypoints']
        else:
            raise ValueError(
                'GenerateTarget requires \'transformed_keypoints\' or'
                ' \'keypoints\' in the results.')

        keypoints_visible = results['keypoints_visible']
        if keypoints_visible.ndim == 3 and keypoints_visible.shape[2] == 2:
            # A trailing axis of size 2 packs (visibility, weight); split it.
            keypoints_visible, keypoints_visible_weights = \
                keypoints_visible[..., 0], keypoints_visible[..., 1]
            results['keypoints_visible'] = keypoints_visible
            results['keypoints_visible_weights'] = keypoints_visible_weights

        id_similarity = results.get('id_similarity', np.array([0]))
        keypoints_visibility = results.get("keypoints_visibility", None)

        # Encoded items from the encoder(s) will be updated into the results.
        # Please refer to the document of the specific codec for details about
        # encoded items.
        if not isinstance(self.encoder, list):
            # For single encoding, the encoded items will be directly added
            # into results.
            auxiliary_encode_kwargs = {
                key: results[key]
                for key in self.encoder.auxiliary_encode_keys
            }
            encoded = self.encoder.encode(
                keypoints=keypoints,
                keypoints_visible=keypoints_visible,
                keypoints_visibility=keypoints_visibility,
                id_similarity=id_similarity,
                **auxiliary_encode_kwargs)

            if self.encoder.field_mapping_table:
                encoded[
                    'field_mapping_table'] = self.encoder.field_mapping_table
            if self.encoder.instance_mapping_table:
                encoded['instance_mapping_table'] = \
                    self.encoder.instance_mapping_table
            if self.encoder.label_mapping_table:
                encoded[
                    'label_mapping_table'] = self.encoder.label_mapping_table

        else:
            encoded_list = []
            _field_mapping_table = dict()
            _instance_mapping_table = dict()
            _label_mapping_table = dict()
            for _encoder in self.encoder:
                auxiliary_encode_kwargs = {
                    key: results[key]
                    for key in _encoder.auxiliary_encode_keys
                }
                encoded_list.append(
                    _encoder.encode(
                        keypoints=keypoints,
                        keypoints_visible=keypoints_visible,
                        keypoints_visibility=keypoints_visibility,
                        id_similarity=id_similarity,
                        **auxiliary_encode_kwargs))

                _field_mapping_table.update(_encoder.field_mapping_table)
                _instance_mapping_table.update(_encoder.instance_mapping_table)
                _label_mapping_table.update(_encoder.label_mapping_table)

            if self.multilevel:
                # For multilevel encoding, the encoded items from each encoder
                # should have the same keys.

                keys = encoded_list[0].keys()
                if not all(_encoded.keys() == keys
                           for _encoded in encoded_list):
                    raise ValueError(
                        'Encoded items from all encoders must have the same '
                        'keys if ``multilevel==True``.')

                encoded = {
                    k: [_encoded[k] for _encoded in encoded_list]
                    for k in keys
                }

            else:
                # For combined encoding, the encoded items from different
                # encoders should have no overlapping items, except for
                # `keypoint_weights`. If multiple `keypoint_weights` are given,
                # they are collected into a list (one entry per encoder).
                encoded = dict()
                keypoint_weights = []

                for _encoded in encoded_list:
                    for key, value in _encoded.items():
                        if key == 'keypoint_weights':
                            keypoint_weights.append(value)
                        elif key not in encoded:
                            encoded[key] = value
                        else:
                            raise ValueError(
                                f'Overlapping item "{key}" from multiple '
                                'encoders, which is not supported when '
                                '``multilevel==False``')

                if keypoint_weights:
                    encoded['keypoint_weights'] = keypoint_weights

            if _field_mapping_table:
                encoded['field_mapping_table'] = _field_mapping_table
            if _instance_mapping_table:
                encoded['instance_mapping_table'] = _instance_mapping_table
            if _label_mapping_table:
                encoded['label_mapping_table'] = _label_mapping_table

        if self.use_dataset_keypoint_weights and 'keypoint_weights' in encoded:
            dataset_weights = results['dataset_keypoint_weights']
            if isinstance(encoded['keypoint_weights'], list):
                # Build a new list: rebinding the loop variable would leave
                # the stored weights untouched.
                encoded['keypoint_weights'] = [
                    w * dataset_weights for w in encoded['keypoint_weights']
                ]
            else:
                encoded['keypoint_weights'] = (
                    encoded['keypoint_weights'] * dataset_weights)

        results.update(encoded)

        return results

    def __repr__(self) -> str:
        """Print the basic information of the transform.

        Returns:
            str: Formatted string.
        """
        repr_str = self.__class__.__name__
        repr_str += (f'(encoder={str(self.encoder_cfg)}, ')
        repr_str += ('use_dataset_keypoint_weights='
                     f'{self.use_dataset_keypoint_weights})')
        return repr_str
class YOLOXHSVRandomAug(BaseTransform):
    """Apply HSV augmentation to image sequentially. It is referenced from
    https://github.com/Megvii-
    BaseDetection/YOLOX/blob/main/yolox/data/data_augment.py#L21.

    Required Keys:

        - img

    Modified Keys:

        - img

    Args:
        hue_delta (int): delta of hue. Defaults to 5.
        saturation_delta (int): delta of saturation. Defaults to 30.
        value_delta (int): delta of value. Defaults to 30.
    """

    def __init__(self,
                 hue_delta: int = 5,
                 saturation_delta: int = 30,
                 value_delta: int = 30) -> None:
        self.hue_delta = hue_delta
        self.saturation_delta = saturation_delta
        self.value_delta = value_delta

    def _get_hsv_gains(self):
        """Draw the random integer gains for the H, S and V channels.

        Returns:
            np.ndarray: Three ``int16`` gains in (hue, saturation, value)
            order; each one is independently zeroed with probability 0.5.
        """
        max_gains = [self.hue_delta, self.saturation_delta, self.value_delta]
        gains = np.random.uniform(-1, 1, 3) * max_gains
        # random on/off switch for each of h, s, v
        gains *= np.random.randint(0, 2, 3)
        # int16 leaves headroom when added to 8-bit channel values
        return gains.astype(np.int16)

    def transform(self, results: dict) -> dict:
        """Shift the HSV channels of ``results['img']`` by random gains.

        The converted image is written back into the original image buffer.

        Args:
            results (dict): Result dict from the data pipeline.

        Returns:
            dict: Result dict with the augmented image.
        """
        img = results['img']
        hue_gain, sat_gain, val_gain = self._get_hsv_gains()

        hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV).astype(np.int16)
        # hue is cyclic, saturation and value saturate at the 8-bit limits
        hsv[..., 0] = (hsv[..., 0] + hue_gain) % 180
        hsv[..., 1] = np.clip(hsv[..., 1] + sat_gain, 0, 255)
        hsv[..., 2] = np.clip(hsv[..., 2] + val_gain, 0, 255)

        cv2.cvtColor(hsv.astype(img.dtype), cv2.COLOR_HSV2BGR, dst=img)
        results['img'] = img
        return results

    def __repr__(self):
        """Print the basic information of the transform."""
        parts = (
            f'(hue_delta={self.hue_delta}, ',
            f'saturation_delta={self.saturation_delta}, ',
            f'value_delta={self.value_delta})',
        )
        return self.__class__.__name__ + ''.join(parts)
class FilterAnnotations(BaseTransform):
    """Eliminate undesirable annotations based on specific conditions.

    This class is designed to sift through annotations by examining multiple
    factors such as the size of the bounding box, the visibility of keypoints,
    and the overall area. Users can fine-tune the criteria to filter out
    instances that have excessively small bounding boxes, insufficient area,
    or an inadequate number of visible keypoints.

    Required Keys:

    - keypoints (np.ndarray)
    - bbox (np.ndarray) (optional)
    - area (np.int64) (optional)
    - keypoints_visible (np.ndarray) (required if ``by_kpt``)

    Modified Keys:

    - bbox (optional)
    - bbox_score (optional)
    - category_id (optional)
    - keypoints (optional)
    - keypoints_visible (optional)
    - area (optional)

    Args:
        min_gt_bbox_wh (tuple[float]): Minimum width and height of ground
            truth boxes. Default: (1., 1.)
        min_gt_area (int): Minimum foreground area of instances.
            Default: 1
        min_kpt_vis (int): Minimum number of visible keypoints. Default: 1
        by_box (bool): Filter instances with bounding boxes not meeting the
            min_gt_bbox_wh threshold. Default: False
        by_area (bool): Filter instances with area less than min_gt_area
            threshold. Default: False
        by_kpt (bool): Filter instances with keypoints_visible not meeting the
            min_kpt_vis threshold. Default: True
        keep_empty (bool): Whether to return None when it
            becomes an empty bbox after filtering. Defaults to True.
    """

    def __init__(self,
                 min_gt_bbox_wh: Tuple[int, int] = (1, 1),
                 min_gt_area: int = 1,
                 min_kpt_vis: int = 1,
                 by_box: bool = False,
                 by_area: bool = False,
                 by_kpt: bool = True,
                 keep_empty: bool = True) -> None:

        assert by_box or by_kpt or by_area
        self.min_gt_bbox_wh = min_gt_bbox_wh
        self.min_gt_area = min_gt_area
        self.min_kpt_vis = min_kpt_vis
        self.by_box = by_box
        self.by_area = by_area
        self.by_kpt = by_kpt
        self.keep_empty = keep_empty

    def transform(self, results: dict) -> Union[dict, None]:
        """Transform function to filter annotations.

        Args:
            results (dict): Result dict.

        Returns:
            dict | None: Updated result dict, or ``None`` when no instance
            passes the filters and ``keep_empty`` is True.
        """

        assert 'keypoints' in results
        kpts = results['keypoints']
        if kpts.shape[0] == 0:
            return results

        tests = []
        if self.by_box and 'bbox' in results:
            bbox = results['bbox']
            tests.append(
                ((bbox[..., 2] - bbox[..., 0] > self.min_gt_bbox_wh[0]) &
                 (bbox[..., 3] - bbox[..., 1] > self.min_gt_bbox_wh[1])))
        if self.by_area and 'area' in results:
            area = results['area']
            tests.append(area >= self.min_gt_area)
        if self.by_kpt:
            kpts_vis = results['keypoints_visible']
            if kpts_vis.ndim == 3:
                # A trailing axis holds extra per-keypoint data; only its
                # first channel is used as the visibility flag.
                kpts_vis = kpts_vis[..., 0]
            tests.append(kpts_vis.sum(axis=1) >= self.min_kpt_vis)

        # Every enabled criterion may have been skipped because its key is
        # absent (e.g. ``by_box`` without ``bbox``); then there is nothing
        # to filter on and the results are returned untouched.
        if not tests:
            return results

        keep = tests[0]
        for t in tests[1:]:
            keep = keep & t

        if not keep.any():
            if self.keep_empty:
                return None

        keys = ('bbox', 'bbox_score', 'category_id', 'keypoints',
                'keypoints_visible', 'area')
        for key in keys:
            if key in results:
                results[key] = results[key][keep]

        return results

    def __repr__(self):
        """Print the basic information of the transform."""
        return (f'{self.__class__.__name__}('
                f'min_gt_bbox_wh={self.min_gt_bbox_wh}, '
                f'min_gt_area={self.min_gt_area}, '
                f'min_kpt_vis={self.min_kpt_vis}, '
                f'by_box={self.by_box}, '
                f'by_area={self.by_area}, '
                f'by_kpt={self.by_kpt}, '
                f'keep_empty={self.keep_empty})')
def compute_paddings(bbox, bbox_s, kpts):
    """Compute how far each keypoint lies outside of the bbox.

    For every keypoint the horizontal and vertical distances (in the units
    of ``bbox``) to the box are computed; a keypoint inside the box has
    distance 0 on that axis. The larger of the two is returned.

    Args:
        bbox (array-like): A single box as ``(x0, y0, x1, y1)``; any shape
            holding exactly four values.
        bbox_s (array-like): Bbox scale. Unused; kept so that existing
            callers keep working.
        kpts (array-like): Keypoints with 2 or 3 values per point. When the
            array has at least two dimensions the trailing axis gives the
            number of values per point; a flat array is read as
            ``(x, y)`` pairs if its size is even and as triplets otherwise.

    Returns:
        np.ndarray: 1-D array with one distance per keypoint.
    """
    bbox = np.array(bbox).flatten()
    kpts = np.asarray(kpts)

    # Trust the trailing axis when it is informative: an even number of
    # 3-valued keypoints would otherwise be mistaken for (x, y) pairs.
    if kpts.ndim >= 2 and kpts.shape[-1] in (2, 3):
        kpts = kpts.reshape(-1, kpts.shape[-1])
    elif kpts.size % 2 == 0:
        kpts = kpts.reshape(-1, 2)
    else:
        kpts = kpts.reshape(-1, 3)

    x0, y0, x1, y1 = bbox
    kpt_x, kpt_y = kpts[:, 0], kpts[:, 1]

    # Distance to the nearest box edge on each axis, 0 when inside.
    x_bbox_distances = np.maximum(
        np.clip(x0 - kpt_x, a_min=0, a_max=None),
        np.clip(kpt_x - x1, a_min=0, a_max=None))
    y_bbox_distances = np.maximum(
        np.clip(y0 - kpt_y, a_min=0, a_max=None),
        np.clip(kpt_y - y1, a_min=0, a_max=None))

    # NOTE: a scale-relative padding (1 + 2 * distance / bbox_s) used to be
    # computed here but was immediately overwritten, so the value returned
    # has always been the plain distance below.
    padding = np.maximum(x_bbox_distances, y_bbox_distances)
    return padding.flatten()