# sys import os import sys import numpy as np import random import pickle import json # torch import torch import torch.nn as nn from torchvision import datasets, transforms # operation from . import tools class Feeder_UCF(torch.utils.data.Dataset): """ Feeder for skeleton-based action recognition in kinetics-skeleton dataset Arguments: data_path: the path to '.npy' data, the shape of data should be (N, C, T, V, M) label_path: the path to label random_choose: If true, randomly choose a portion of the input sequence random_shift: If true, randomly pad zeros at the begining or end of sequence random_move: If true, perform randomly but continuously changed transformation to input sequence window_size: The length of the output sequence pose_matching: If ture, match the pose between two frames num_person_in: The number of people the feeder can observe in the input sequence num_person_out: The number of people the feeder in the output sequence debug: If true, only use the first 100 samples """ def __init__(self, data_path, label_path, ignore_empty_sample=True, random_choose=False, random_shift=False, random_move=False, window_size=-1, pose_matching=False, num_person_in=5, num_person_out=2, debug=False): self.debug = debug self.data_path = data_path self.label_path = label_path self.random_choose = random_choose self.random_shift = random_shift self.random_move = random_move self.window_size = window_size self.num_person_in = num_person_in self.num_person_out = num_person_out self.pose_matching = pose_matching self.ignore_empty_sample = ignore_empty_sample self.load_data() def load_data(self): # load file list self.sample_name = os.listdir(self.data_path) if self.debug: self.sample_name = self.sample_name[0:2] # load label label_path = self.label_path with open(label_path) as f: label_info = json.load(f) sample_id = [name.split('.')[0] for name in self.sample_name] self.label = np.array( [label_info[id]['label_index'] for id in sample_id]) has_skeleton = np.array( [label_info[id]['has_skeleton'] for id in sample_id]) # ignore the samples which does not has skeleton sequence if self.ignore_empty_sample: self.sample_name = [ s for h, s in zip(has_skeleton, self.sample_name) if h ] self.label = self.label[has_skeleton] # output data shape (N, C, T, V, M) self.N = len(self.sample_name) #sample self.C = 3 #channel self.T = 90000 #frame self.V = 18 #joint self.M = self.num_person_out #person def __len__(self): return len(self.sample_name) def __iter__(self): return self def __getitem__(self, index): # output shape (C, T, V, M) # get data sample_name = self.sample_name[index] sample_path = os.path.join(self.data_path, sample_name) with open(sample_path, 'r') as f: video_info = json.load(f) # fill data_numpy data_numpy = np.zeros((self.C, self.T, self.V, self.num_person_in)) count = 0 for frame_info in video_info['data']: frame_index = frame_info['frame_index'] for m, skeleton_info in enumerate(frame_info["skeleton"]): if m >= self.num_person_in: break pose = skeleton_info['pose'] score = skeleton_info['score'] frame_index = int(frame_index) # print(frame_index) data_numpy[0, frame_index, :, m] = pose[0::2] data_numpy[1, frame_index, :, m] = pose[1::2] data_numpy[2, frame_index, :, m] = score # count += 1 # print(" ",count, " ") # centralization data_numpy[0:2] = data_numpy[0:2] - 0.5 data_numpy[0][data_numpy[2] == 0] = 0 data_numpy[1][data_numpy[2] == 0] = 0 # get & check label index label = video_info['label_index'] assert (self.label[index] == label) # data augmentation if self.random_shift: data_numpy = tools.random_shift(data_numpy) if self.random_choose: data_numpy = tools.random_choose(data_numpy, self.window_size) elif self.window_size > 0: data_numpy = tools.auto_pading(data_numpy, self.window_size) if self.random_move: data_numpy = tools.random_move(data_numpy) # sort by score sort_index = (-data_numpy[2, :, :, :].sum(axis=1)).argsort(axis=1) for t, s in enumerate(sort_index): data_numpy[:, t, :, :] = data_numpy[:, t, :, s].transpose((1, 2, 0)) data_numpy = data_numpy[:, :, :, 0:self.num_person_out] # match poses between 2 frames if self.pose_matching: data_numpy = tools.openpose_match(data_numpy) return data_numpy, label def top_k(self, score, top_k): assert (all(self.label >= 0)) rank = score.argsort() hit_top_k = [l in rank[i, -top_k:] for i, l in enumerate(self.label)] return sum(hit_top_k) * 1.0 / len(hit_top_k) def top_k_by_category(self, score, top_k): assert (all(self.label >= 0)) return tools.top_k_by_category(self.label, score, top_k) def calculate_recall_precision(self, score): assert (all(self.label >= 0)) return tools.calculate_recall_precision(self.label, score)