import os
import json
import logging
import random
from typing import Dict

import cv2
import torch
from torch.utils.data import Dataset
from torchvision import transforms
import numpy as np
import transformers
from pycocotools.coco import COCO

from .constants import COCO_KEYPOINT_NAME, KeypointLocationDescription, KeypointLocationQuestion
from .constants import COCO_KEYPOINT_NAME_TOKEN

# NOTE(review): every marker/token constant below is an empty string in this
# file and none of them is referenced in this module - confirm the values are
# intentional before relying on them elsewhere.
DEFAULT_IMAGE_PATCH_TOKEN = ""
PREFIX_IMAGE = "Image: "
PREFIX_NO_IMAGE = "Image: N/A"
BEGIN_DESCRIPTION = ""
END_DESCRIPTION = ""
IGNORE_INDEX = -100
DEFAULT_EOS_TOKEN = ""
BEGIN_OPTIONS = ""
END_OPTIONS = ""
BEGIN_LOC = ""
END_LOC = ""
BEGIN_QUESTION = ""
END_QUESTION = ""


class PoseHICODetDataset(Dataset):
    """Dataset for supervised fine-tuning.

    Each sample pairs one person instance from the full-body pose annotation
    file (loaded through the COCO API) with a HOI instance from the
    instance-level HICO-DET JSON whose human box overlaps it (IoU >= 0.9).
    ``__getitem__`` returns the person crop warped to a square of side
    ``multimodal_cfg['image_size']`` together with the sample's metadata.
    """

    def __init__(self,
                 data_path: str,
                 multimodal_cfg: dict,
                 ):
        """Load and index all annotations found under ``data_path``.

        Args:
            data_path: Root directory that contains the ``Annotation`` folder.
            multimodal_cfg: Configuration dict; this class reads the keys
                ``'image_size'`` and ``'image_folder'``.
        """
        super(PoseHICODetDataset, self).__init__()
        logging.warning("Loading data...")

        self.multimodal_cfg = multimodal_cfg
        self.mllm_image_size = multimodal_cfg['image_size']
        self.aspect_ratio = 1.0
        self.pixel_std = 200
        self.num_joints = 17
        self.num_joints_full_body = 136

        self.list_data_dict = self._load_data(data_path)

    def _iou(self, a, b):
        """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes."""
        x1, y1, x2, y2 = a
        X1, Y1, X2, Y2 = b
        iw = max(0, min(x2, X2) - max(x1, X1))
        ih = max(0, min(y2, Y2) - max(y1, Y1))
        inter = iw * ih
        # The epsilon keeps the division finite for degenerate (zero-area) boxes.
        union = (x2 - x1) * (y2 - y1) + (X2 - X1) * (Y2 - Y1) - inter
        return inter / (union + 1e-9)

    def _match_pose_hoi_objs(self, pose_objs, hoi_objs):
        """Pair pose annotations with HOI annotations describing the same person.

        A pair is kept when the IoU between the pose box (``xywh``, converted
        to corners) and the HOI ``human_bbox`` is at least 0.9 and the HOI
        object carries ``'action_labels'``. One pose object may be paired with
        several HOI objects; every pair is emitted.

        Returns:
            Two parallel lists ``(matched_pose_objs, matched_hoi_objs)``.
        """
        matched_pose_objs = []
        matched_hoi_objs = []
        if not hoi_objs:
            # Nothing to match against; avoids touching pose boxes needlessly.
            return matched_pose_objs, matched_hoi_objs

        for pose_obj in pose_objs:
            # The pose box does not depend on the HOI object: convert it once.
            X1, Y1, W, H = pose_obj['bbox']
            pose_box = [X1, Y1, X1 + W, Y1 + H]
            for hoi_obj in hoi_objs:
                if self._iou(hoi_obj['human_bbox'], pose_box) < 0.9:
                    continue
                if 'action_labels' not in hoi_obj:
                    continue
                matched_pose_objs.append(pose_obj)
                matched_hoi_objs.append(hoi_obj)
        return matched_pose_objs, matched_hoi_objs

    def _parse_keypoints(self, keypoints):
        """Convert a flat ``[x, y, v, ...]`` keypoint list into joint arrays.

        The list must hold at least ``num_joints_full_body`` triplets
        (a shorter list raises ``ValueError`` from the reshape). Only the
        first ``num_joints`` keypoints are returned.

        Returns:
            ``(joints_3d, joints_3d_vis)``, both float32 of shape
            ``(num_joints, 3)``. ``joints_3d`` holds ``(x, y, 0)``;
            ``joints_3d_vis`` holds ``(v, v, 0)`` with ``v`` capped at 1.
        """
        full_body = np.asarray(
            keypoints[:self.num_joints_full_body * 3], dtype=np.float32
        ).reshape(self.num_joints_full_body, 3)
        body = full_body[:self.num_joints]

        # Fresh arrays (not views) so the full-body buffer can be released.
        joints_3d = np.zeros((self.num_joints, 3), dtype=np.float32)
        joints_3d[:, :2] = body[:, :2]

        vis = np.minimum(body[:, 2], 1)
        joints_3d_vis = np.zeros((self.num_joints, 3), dtype=np.float32)
        joints_3d_vis[:, 0] = vis
        joints_3d_vis[:, 1] = vis
        return joints_3d, joints_3d_vis

    def _load_data(self, data_path):
        """Build the list of per-instance sample dicts.

        Args:
            data_path: Root directory that contains the ``Annotation`` folder.

        Returns:
            A list of dicts with keys ``file_name``, ``image_id``, ``center``,
            ``scale``, ``joints_3d``, ``joints_3d_vis``, ``instance_id`` and
            ``hoi_obj``.

        Raises:
            KeyError: If an image listed in the pose annotations has no entry
                in the HOI annotation file.
        """
        # load pose annotation via coco api
        coco_path = os.path.join(data_path, 'Annotation/hico-fullbody-pose/halpe_train_v1.json')
        coco = COCO(coco_path)

        # load instance-level hoi+part state annotation via json
        json_path = os.path.join(data_path, "Annotation/hico-det-instance-level/hico-det-training-set-instance-level.json")
        with open(json_path, "r", encoding="utf-8") as f:
            # Indexed by image file name below, so the JSON root must be a mapping.
            hoi_data = json.load(f)

        instance_id = 0
        list_data_dict = []
        for index in coco.getImgIds():
            # load pose data per image id
            im_ann = coco.loadImgs(index)[0]
            annIds = coco.getAnnIds(imgIds=index, iscrowd=False)
            pose_objs = coco.loadAnns(annIds)

            # load hoi data per image id
            file_name = im_ann['file_name']
            hoi_objs = hoi_data[file_name]['labels']

            # Every returned hoi_obj is guaranteed to contain 'action_labels'.
            pose_objs, hoi_objs = self._match_pose_hoi_objs(pose_objs, hoi_objs)

            for pose_obj, hoi_obj in zip(pose_objs, hoi_objs):
                if pose_obj['category_id'] != 1:
                    continue

                # ignore objs without keypoints annotation
                if max(pose_obj['keypoints']) == 0:
                    continue

                # the first 17 keypoints are aligned with COCO's 17 keypoints definition.
                joints_3d, joints_3d_vis = self._parse_keypoints(pose_obj['keypoints'])
                center, scale = self._box2cs(pose_obj['bbox'][:4])

                list_data_dict.append({
                    'file_name': file_name,
                    'image_id': index,
                    'center': center,
                    'scale': scale,
                    'joints_3d': joints_3d,
                    'joints_3d_vis': joints_3d_vis,
                    'instance_id': instance_id,
                    'hoi_obj': hoi_obj,
                })
                instance_id += 1

        logging.warning("The number of training samples is {}".format(len(list_data_dict)))
        logging.warning("Formatting inputs...Skip in lazy mode")
        return list_data_dict

    def __len__(self):
        """Return the number of matched person instances."""
        return len(self.list_data_dict)

    def __getitem__(self, i):
        """Return ``{'image', 'has_image', 'meta'}`` for sample ``i``."""
        sources = self.list_data_dict[i]
        # Only the warped crop is used here; the joints travel inside `meta`.
        image = self._get_image_item(sources)[0]
        return {
            "image": image,
            "has_image": True,
            "meta": sources,
        }

    def _get_image_item(self, sources):
        """Read the sample's image and warp the person box to a square crop.

        Args:
            sources: One entry of ``self.list_data_dict``.

        Returns:
            ``(image, joints, joints_vis, c, s)`` where ``image`` is the RGB
            crop of side ``mllm_image_size``. ``joints`` and ``joints_vis`` are
            returned exactly as stored, i.e. they are NOT mapped into the
            crop's coordinate frame.

        Raises:
            FileNotFoundError: If the image cannot be read from disk.
        """
        image_file = os.path.join(self.multimodal_cfg['image_folder'], sources['file_name'])
        image = cv2.imread(
            image_file, cv2.IMREAD_COLOR | cv2.IMREAD_IGNORE_ORIENTATION
        )
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError("Failed to read image: {}".format(image_file))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # process image
        joints = sources['joints_3d']
        joints_vis = sources['joints_3d_vis']
        c = sources['center']
        s = sources['scale']
        r = 0  # no rotation augmentation

        out_size = (int(self.mllm_image_size), int(self.mllm_image_size))
        trans = get_affine_transform(c, s, r, out_size)
        image = cv2.warpAffine(image, trans, out_size, flags=cv2.INTER_LINEAR)

        return image, joints, joints_vis, c, s

    def _box2cs(self, box):
        """Convert an ``(x, y, w, h)`` box into ``(center, scale)``."""
        x, y, w, h = box[:4]
        return self._xywh2cs(x, y, w, h)

    def _xywh2cs(self, x, y, w, h):
        """Return the box center and its aspect-ratio-corrected scale.

        The shorter side is grown so that ``w / h == self.aspect_ratio``; the
        scale is the resulting ``(w, h)`` divided by ``self.pixel_std``.
        """
        center = np.zeros((2), dtype=np.float32)
        center[0] = x + w * 0.5
        center[1] = y + h * 0.5

        if w > self.aspect_ratio * h:
            h = w * 1.0 / self.aspect_ratio
        elif w < self.aspect_ratio * h:
            w = h * self.aspect_ratio
        scale = np.array(
            [w * 1.0 / self.pixel_std, h * 1.0 / self.pixel_std],
            dtype=np.float32)
        if center[0] != -1:
            # Padding factor is currently 1.0, i.e. the box is not enlarged.
            scale = scale * 1.0

        return center, scale

    def _generate_target(self, joints, joints_vis):
        '''
        Render one Gaussian heatmap per joint.

        NOTE(review): this method reads self.heatmap_size, self.sigma and
        self.vitpose_image_size, none of which is assigned in __init__ of this
        class; unless they are set elsewhere the call raises AttributeError.
        The division below presumably expects both sizes to be numpy arrays -
        confirm before use.

        :param joints:  [num_joints, 3]
        :param joints_vis: [num_joints, 3]
        :return: target, target_weight(1: visible, 0: invisible)
        '''
        target_weight = np.ones((self.num_joints, 1), dtype=np.float32)
        target_weight[:, 0] = joints_vis[:, 0]

        target = np.zeros((self.num_joints,
                           self.heatmap_size[1],
                           self.heatmap_size[0]),
                          dtype=np.float32)

        tmp_size = self.sigma * 3

        # Loop-invariant pieces: the stride and the (unnormalised) Gaussian patch.
        feat_stride = self.vitpose_image_size / self.heatmap_size
        size = 2 * tmp_size + 1
        x = np.arange(0, size, 1, np.float32)
        y = x[:, np.newaxis]
        x0 = y0 = size // 2
        # The gaussian is not normalized, we want the center value to equal 1
        g = np.exp(- ((x - x0) ** 2 + (y - y0) ** 2) / (2 * self.sigma ** 2))

        for joint_id in range(self.num_joints):
            mu_x = int(joints[joint_id][0] / feat_stride[0] + 0.5)
            mu_y = int(joints[joint_id][1] / feat_stride[1] + 0.5)
            # Check that any part of the gaussian is in-bounds
            ul = [int(mu_x - tmp_size), int(mu_y - tmp_size)]
            br = [int(mu_x + tmp_size + 1), int(mu_y + tmp_size + 1)]
            if ul[0] >= self.heatmap_size[0] or ul[1] >= self.heatmap_size[1] \
                    or br[0] < 0 or br[1] < 0:
                # Entirely outside the heatmap: mark the joint as unusable.
                target_weight[joint_id] = 0
                continue

            # Usable gaussian range
            g_x = max(0, -ul[0]), min(br[0], self.heatmap_size[0]) - ul[0]
            g_y = max(0, -ul[1]), min(br[1], self.heatmap_size[1]) - ul[1]
            # Image range
            img_x = max(0, ul[0]), min(br[0], self.heatmap_size[0])
            img_y = max(0, ul[1]), min(br[1], self.heatmap_size[1])

            v = target_weight[joint_id]
            if v > 0.5:
                target[joint_id][img_y[0]:img_y[1], img_x[0]:img_x[1]] = \
                    g[g_y[0]:g_y[1], g_x[0]:g_x[1]]

        return target, target_weight


def fliplr_joints(joints, joints_vis, width, matched_parts):
    """Mirror joints horizontally and swap left/right joint pairs.

    Both ``joints`` and ``joints_vis`` are modified in place.

    Args:
        joints: Array indexed as ``[joint, coord]`` with x in column 0.
        joints_vis: Visibility array with the same leading layout.
        width: Image width used for the mirror (``x -> width - x - 1``).
        matched_parts: Iterable of ``(left_idx, right_idx)`` pairs to swap.

    Returns:
        ``(joints * joints_vis, joints_vis)``.
    """
    # Flip horizontal
    joints[:, 0] = width - joints[:, 0] - 1

    # Change left-right parts
    for pair in matched_parts:
        # The .copy() keeps the old left row alive while it is overwritten.
        joints[pair[0], :], joints[pair[1], :] = \
            joints[pair[1], :], joints[pair[0], :].copy()
        joints_vis[pair[0], :], joints_vis[pair[1], :] = \
            joints_vis[pair[1], :], joints_vis[pair[0], :].copy()

    return joints*joints_vis, joints_vis


def transform_preds(coords, center, scale, output_size):
    """Map ``coords`` from the cropped output frame back to the source image.

    Uses the inverse of the transform built by ``get_affine_transform`` with
    zero rotation; only the first two columns of ``coords`` are transformed.
    """
    target_coords = np.zeros(coords.shape)
    trans = get_affine_transform(center, scale, 0, output_size, inv=1)
    for p in range(coords.shape[0]):
        target_coords[p, 0:2] = affine_transform(coords[p, 0:2], trans)
    return target_coords


def get_affine_transform(
        center, scale, rot, output_size, shift=None, inv=0
):
    """Build the 2x3 affine matrix that maps a box onto ``output_size``.

    Args:
        center: Box center ``(x, y)`` in source-image pixels.
        scale: Box size divided by 200; a scalar (used for both axes) or a
            length-2 array / list / tuple.
        rot: Rotation in degrees.
        output_size: ``(width, height)`` of the destination image.
        shift: Optional offset as a fraction of the box size; defaults to
            ``(0, 0)``.
        inv: If truthy, return the destination-to-source transform instead.

    Returns:
        The matrix produced by ``cv2.getAffineTransform``.
    """
    if shift is None:
        shift = np.array([0, 0], dtype=np.float32)

    if isinstance(scale, (list, tuple)):
        # A plain sequence cannot be multiplied by a float; make it an array.
        scale = np.asarray(scale, dtype=np.float32)
    elif not isinstance(scale, np.ndarray):
        scale = np.array([scale, scale])

    # 200.0 mirrors the pixel_std used when the scale was computed.
    scale_tmp = scale * 200.0
    src_w = scale_tmp[0]
    dst_w = output_size[0]
    dst_h = output_size[1]

    rot_rad = np.pi * rot / 180
    src_dir = get_dir([0, src_w * -0.5], rot_rad)
    dst_dir = np.array([0, dst_w * -0.5], np.float32)

    # Three point correspondences: center, a point above it, and a third
    # point perpendicular to those two.
    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + scale_tmp * shift
    src[1, :] = center + src_dir + scale_tmp * shift
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir

    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])

    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))

    return trans


def affine_transform(pt, t):
    """Apply the 2x3 affine matrix ``t`` to the 2-D point ``pt``."""
    new_pt = np.array([pt[0], pt[1], 1.]).T
    new_pt = np.dot(t, new_pt)
    return new_pt[:2]


def get_3rd_point(a, b):
    """Return ``b`` plus the vector ``a - b`` rotated by 90 degrees."""
    direct = a - b
    return b + np.array([-direct[1], direct[0]], dtype=np.float32)


def get_dir(src_point, rot_rad):
    """Rotate ``src_point`` by ``rot_rad`` radians; returns a 2-element list."""
    sn, cs = np.sin(rot_rad), np.cos(rot_rad)

    src_result = [0, 0]
    src_result[0] = src_point[0] * cs - src_point[1] * sn
    src_result[1] = src_point[0] * sn + src_point[1] * cs

    return src_result