|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import absolute_import |
|
|
|
|
|
try: |
|
|
from collections.abc import Sequence |
|
|
except Exception: |
|
|
from collections import Sequence |
|
|
import cv2 |
|
|
import numpy as np |
|
|
import math |
|
|
import copy |
|
|
import random |
|
|
import uuid |
|
|
from numbers import Number, Integral |
|
|
|
|
|
from ...modeling.keypoint_utils import get_affine_mat_kernel, warp_affine_joints, get_affine_transform, affine_transform, get_warp_matrix |
|
|
from ppdet.core.workspace import serializable |
|
|
from ppdet.utils.logger import setup_logger |
|
|
# Module-level logger, named after this module.
logger = setup_logger(__name__)

# Class names of every operator passed through ``register_op``.
registered_ops = []

# Public names exported by ``from <module> import *``.
__all__ = [
    'CropAndFlipImages',
    'PermuteImages',
    'RandomFlipHalfBody3DTransformImages',
]
|
|
|
|
|
import matplotlib.pyplot as plt |
|
|
from PIL import Image, ImageDraw |
|
|
from mpl_toolkits.mplot3d import Axes3D |
|
|
|
|
|
|
|
|
def register_keypointop(cls):
    """Class decorator for keypoint operators.

    Passes ``cls`` through ``serializable`` and returns whatever that call
    returns.
    """
    serializable_cls = serializable(cls)
    return serializable_cls
|
|
|
|
|
|
|
|
def register_op(cls):
    """Class decorator that registers an operator on ``BaseOperator``.

    The class name is recorded in ``registered_ops``, the class is attached
    to ``BaseOperator`` as an attribute of the same name, and the result of
    ``serializable(cls)`` is returned.

    Args:
        cls (type): operator class to register.

    Returns:
        The value returned by ``serializable(cls)``.

    Raises:
        KeyError: if ``BaseOperator`` already has an attribute with the
            same name as ``cls``.
    """
    op_name = cls.__name__
    # Validate before touching any registry state, so that a rejected
    # duplicate does not leave a stray entry in ``registered_ops``.
    if hasattr(BaseOperator, op_name):
        raise KeyError("The {} class has been registered.".format(op_name))
    registered_ops.append(op_name)
    setattr(BaseOperator, op_name, cls)
    return serializable(cls)
|
|
|
|
|
|
|
|
class BaseOperator(object):
    """Base class for sample operators.

    Subclasses override ``apply``; calling the operator applies it to a
    single sample or, element by element, to a sequence of samples.
    """

    def __init__(self, name=None):
        # Fall back to the concrete class name when no name is supplied.
        op_name = self.__class__.__name__ if name is None else name
        # The last six characters of a random UUID distinguish instances.
        suffix = str(uuid.uuid4())[-6:]
        self._id = op_name + '_' + suffix

    def apply(self, sample, context=None):
        """ Process a sample.
        Args:
            sample (dict): a dict of sample, eg: {'image':xx, 'label': xxx}
            context (dict): info about this sample processing
        Returns:
            result (dict): a processed sample
        """
        # The base implementation is the identity transform.
        return sample

    def __call__(self, sample, context=None):
        """ Process a sample, or every sample of a sequence.
        Args:
            sample (dict|Sequence): one sample, or a sequence of samples
                whose items are replaced in place by their processed form
            context (dict): info about this sample processing
        Returns:
            result (dict|Sequence): the processed sample(s)
        """
        if not isinstance(sample, Sequence):
            return self.apply(sample, context)

        for idx, item in enumerate(sample):
            sample[idx] = self.apply(item, context)
        return sample

    def __str__(self):
        return str(self._id)
|
|
|
|
|
|
|
|
@register_keypointop |
|
|
class CropAndFlipImages(object):
    """Mirror a batch of images along axis 2, then crop a range of that axis.

    Args:
        crop_range (list): ``[start, end]`` slice bounds applied to axis 2 of
            the image batch after mirroring.
        flip_pairs (list, optional): pairs of joint indices whose ``kps2d``
            entries are exchanged. ``None`` (the default) or an empty list
            means no entries are exchanged.
    """

    def __init__(self, crop_range, flip_pairs=None):
        super(CropAndFlipImages, self).__init__()
        self.crop_range = crop_range
        self.flip_pairs = flip_pairs

    def __call__(self, records):
        """Transform ``records["image"]`` and, if present, ``records["kps2d"]``.

        Args:
            records (dict): must hold ``"image"``, indexed here as a 4-D
                array (presumably (N, H, W, C) -- confirm against the
                loader); may hold ``"kps2d"``, indexed as
                (N, num_joints, coords) with x at coordinate index 0.

        Returns:
            records (dict): the same dict with the transformed entries.
        """
        crop_start, crop_end = self.crop_range[0], self.crop_range[1]

        mirrored = records["image"][:, :, ::-1, :]
        records["image"] = mirrored[:, :, crop_start:crop_end]

        if "kps2d" in records:
            # np.array copies, so the caller's keypoints are not modified.
            kps2d = np.array(records["kps2d"])
            # Shift x into the coordinate frame of the cropped image.
            # NOTE(review): x is shifted but never mirrored although the
            # image is; only the paired joints are exchanged below. Confirm
            # this matches how the keypoints were produced.
            kps2d[:, :, 0] = kps2d[:, :, 0] - crop_start

            # ``flip_pairs`` defaults to None; treat that as "no pairs"
            # instead of failing when iterating over it.
            for pair in self.flip_pairs or ():
                first, second = pair[0], pair[1]
                # Advanced indexing on the right-hand side yields a copy,
                # so the two joints are exchanged safely in one assignment.
                kps2d[:, [first, second], :] = kps2d[:, [second, first], :]

            records["kps2d"] = kps2d

        return records
|
|
|
|
|
|
|
|
@register_op
class PermuteImages(BaseOperator):
    """Reorder the axes of the image batch from (0, 1, 2, 3) to (0, 3, 1, 2)."""

    def __init__(self):
        """
        Change the channel to be (batch_size, C, H, W) #(6, 3, 1080, 1920)
        """
        super(PermuteImages, self).__init__()

    def apply(self, sample, context=None):
        """Transpose ``sample["image"]`` and store the result back in ``sample``.

        Args:
            sample (dict): must hold ``"image"``, an object whose
                ``transpose`` accepts a 4-axis order.
            context (dict): unused.

        Returns:
            sample (dict): the same dict with the permuted image batch.
        """
        axis_order = (0, 3, 1, 2)
        sample["image"] = sample["image"].transpose(axis_order)
        return sample
|
|
|
|
|
|
|
|
@register_keypointop |
|
|
class RandomFlipHalfBody3DTransformImages(object):
    """Randomly flip, and optionally occlude, a batch of images with 3D joints.

    ``__call__`` mirrors the images along axis 2 with probability 0.5 when
    ``flip`` is set, updating ``kps3d``, ``kps3d_vis`` and (if present)
    ``kps2d`` to match, and then, when ``do_occlusion`` is set, paints one
    random-noise rectangle over every image with probability 0.5.
    ``halfbody_transform`` is available as a helper but is not called by
    ``__call__``.

    Args:
        trainsize (list):[w, h], Image target size; only its aspect ratio
            is used by the methods of this class
        upper_body_ids (list): The upper body joint ids
        flip_pairs (list): The left-right joints exchange order list
        pixel_std (int): The pixel std of the scale
        scale (float): The scale factor to transform the image (stored; not
            read by any method defined in this class)
        rot (int): The rotate factor to transform the image (stored; not
            read by any method defined in this class)
        num_joints_half_body (int): The joints threshold of the half body
            transform (stored; not read by any method defined in this class)
        prob_half_body (float): The threshold of the half body transform
            (stored; not read by any method defined in this class)
        flip (bool): Whether to flip the image
        rot_prob (float): stored; not read by any method defined in this
            class
        do_occlusion (bool): Whether to randomly occlude the images with a
            noise rectangle
    Returns:
        records(dict): contain the image and coords after transformed
    """

    def __init__(self,
                 trainsize,
                 upper_body_ids,
                 flip_pairs,
                 pixel_std,
                 scale=0.35,
                 rot=40,
                 num_joints_half_body=8,
                 prob_half_body=0.3,
                 flip=True,
                 rot_prob=0.6,
                 do_occlusion=False):
        super(RandomFlipHalfBody3DTransformImages, self).__init__()
        self.trainsize = trainsize
        self.upper_body_ids = upper_body_ids
        self.flip_pairs = flip_pairs
        self.pixel_std = pixel_std
        self.scale = scale
        self.rot = rot
        self.num_joints_half_body = num_joints_half_body
        self.prob_half_body = prob_half_body
        self.flip = flip
        # width / height of the training resolution
        self.aspect_ratio = trainsize[0] * 1.0 / trainsize[1]
        self.rot_prob = rot_prob
        self.do_occlusion = do_occlusion

    def halfbody_transform(self, joints, joints_vis):
        """Pick the upper or lower body and return a box around its joints.

        Args:
            joints (np.ndarray): (num_joints, coords) joint positions with
                x and y at coordinate indices 0 and 1.
            joints_vis (np.ndarray): per-joint visibility; a joint counts
                as visible when ``joints_vis[joint_id][0] > 0``.

        Returns:
            tuple: ``(center, scale)`` -- the mean x/y of the selected
            joints and the aspect-corrected box size divided by
            ``pixel_std`` and enlarged by 1.5 -- or ``(None, None)`` when
            fewer than two joints are selected.
        """
        upper_joints = []
        lower_joints = []
        for joint_id in range(joints.shape[0]):
            if joints_vis[joint_id][0] > 0:
                if joint_id in self.upper_body_ids:
                    upper_joints.append(joints[joint_id])
                else:
                    lower_joints.append(joints[joint_id])

        # Uniform draw for a 50/50 choice between the two halves. The
        # previous ``randn()`` sampled a standard normal, for which
        # ``< 0.5`` holds about 69% of the time.
        if np.random.rand() < 0.5 and len(upper_joints) > 2:
            selected_joints = upper_joints
        else:
            selected_joints = lower_joints if len(
                lower_joints) > 2 else upper_joints
        if len(selected_joints) < 2:
            return None, None

        selected_joints = np.array(selected_joints, dtype=np.float32)
        center = selected_joints.mean(axis=0)[:2]
        left_top = np.amin(selected_joints, axis=0)
        right_bottom = np.amax(selected_joints, axis=0)
        w = right_bottom[0] - left_top[0]
        h = right_bottom[1] - left_top[1]

        # Grow the shorter side so the box matches the training aspect ratio.
        if w > self.aspect_ratio * h:
            h = w * 1.0 / self.aspect_ratio
        elif w < self.aspect_ratio * h:
            w = h * self.aspect_ratio

        scale = np.array(
            [w * 1.0 / self.pixel_std, h * 1.0 / self.pixel_std],
            dtype=np.float32)
        scale = scale * 1.5

        return center, scale

    def flip_joints(self, joints, joints_vis, width, matched_parts, kps2d=None):
        """Mirror joint x-coordinates and exchange left/right joints in place.

        Args:
            joints (np.ndarray): (N, num_joints, coords) 3D joints, x at
                coordinate index 0. Modified in place.
            joints_vis (np.ndarray): (N, num_joints, ...) visibility flags.
                Modified in place.
            width (int): image width used for mirroring (``x -> width - x - 1``).
            matched_parts (list): pairs of joint indices to exchange.
            kps2d (np.ndarray, optional): (N, num_joints, coords) 2D
                keypoints, mirrored and exchanged like ``joints``.

        Returns:
            tuple: ``(joints, joints_vis, kps2d)``; ``joints`` is finally
            expressed relative to joint 0 of each sample.
        """
        joints[:, :, 0] = width - joints[:, :, 0] - 1
        if kps2d is not None:
            kps2d[:, :, 0] = width - kps2d[:, :, 0] - 1

        for pair in matched_parts:
            forward = [pair[0], pair[1]]
            backward = [pair[1], pair[0]]
            # Advanced indexing on the right-hand side yields a copy, so
            # each assignment exchanges the two joints safely.
            joints[:, forward, :] = joints[:, backward, :]
            joints_vis[:, forward, :] = joints_vis[:, backward, :]
            if kps2d is not None:
                kps2d[:, forward, :] = kps2d[:, backward, :]

        # Re-center on joint 0; ``joints[:, [0], :]`` is a copy, so the
        # in-place subtraction does not alias its own operand.
        joints -= joints[:, [0], :]

        return joints, joints_vis, kps2d

    def _random_occlusion(self, images):
        """Overwrite one random rectangle of every image with uniform noise.

        The rectangle covers up to 20% of the image area with a
        height/width ratio between 0.3 and 1/0.3, and the same noise patch
        is written into every image of the batch. ``images`` is modified
        in place.
        """
        height, width = images.shape[1], images.shape[2]
        # The rejection loop below only accepts rectangles that fit inside
        # (height - 1, width - 1); for smaller images it could never
        # terminate, so skip the occlusion instead.
        if height < 2 or width < 2:
            return

        channels = images.shape[-1]
        area_min, area_max = 0.0, 0.2
        ratio_min, ratio_max = 0.3, 1 / 0.3

        while True:
            synth_area = (random.random() *
                          (area_max - area_min) + area_min) * width * height
            synth_ratio = (random.random() *
                           (ratio_max - ratio_min) + ratio_min)

            synth_h = math.sqrt(synth_area * synth_ratio)
            synth_w = math.sqrt(synth_area / synth_ratio)
            synth_xmin = random.random() * (width - synth_w - 1)
            synth_ymin = random.random() * (height - synth_h - 1)

            fits = (synth_xmin >= 0 and synth_ymin >= 0 and
                    synth_xmin + synth_w < width and
                    synth_ymin + synth_h < height)
            if not fits:
                continue

            xmin = int(synth_xmin)
            ymin = int(synth_ymin)
            w = int(synth_w)
            h = int(synth_h)

            # Match the channel count of the batch rather than assuming 3.
            mask = np.random.rand(h, w, channels) * 255
            images[:, ymin:ymin + h, xmin:xmin + w, :] = mask[None, :, :, :]
            return

    def __call__(self, records):
        """Apply the random flip and optional occlusion to ``records``.

        Args:
            records (dict): must hold ``'image'`` (4-D batch with width on
                axis 2), ``'kps3d'`` and ``'kps3d_vis'``; may hold
                ``'kps2d'``.

        Returns:
            records (dict): the same dict with the transformed entries.
        """
        images = records['image']
        joints = records['kps3d']
        joints_vis = records['kps3d_vis']
        kps2d = records.get('kps2d')

        if self.flip and np.random.random() <= 0.5:
            images = images[:, :, ::-1, :]
            joints, joints_vis, kps2d = self.flip_joints(
                joints, joints_vis, images.shape[2], self.flip_pairs, kps2d)

        if self.do_occlusion and random.random() <= 0.5:
            self._random_occlusion(images)

        records['image'] = images
        records['kps3d'] = joints
        records['kps3d_vis'] = joints_vis
        if kps2d is not None:
            records['kps2d'] = kps2d

        return records
|
|
|