codewraith / data /source_files /messy /21bbf74d2d8b.py
slenk's picture
Upload folder using huggingface_hub
eeef81e verified
# sys
import os
import sys
import numpy as np
import random
import pickle
import json
# torch
import torch
import torch.nn as nn
from torchvision import datasets, transforms
# operation
from . import tools
class Feeder_UCF(torch.utils.data.Dataset):
    """ Feeder for skeleton-based action recognition from per-video JSON files

    Every file found in ``data_path`` is treated as one sample: a JSON document
    with a ``data`` list of frames (each frame has a ``frame_index`` and a
    ``skeleton`` list whose entries carry ``pose`` and ``score``) plus a
    ``label_index``. ``label_path`` is a JSON file that maps each sample id
    (the file name up to its first ``.``) to ``label_index`` and
    ``has_skeleton``.

    Arguments:
        data_path: directory that contains one JSON file per sample
        label_path: the path to the JSON label file
        ignore_empty_sample: If true, drop the samples whose ``has_skeleton`` is false
        random_choose: If true, randomly choose a portion of the input sequence
            (delegated to ``tools.random_choose``)
        random_shift: If true, randomly pad zeros at the beginning or end of sequence
            (delegated to ``tools.random_shift``)
        random_move: If true, perform randomly but continuously changed transformation
            to input sequence (delegated to ``tools.random_move``)
        window_size: The length of the output sequence
        pose_matching: If true, match the pose between two frames
            (delegated to ``tools.openpose_match``)
        num_person_in: The number of people the feeder can observe in the input sequence
        num_person_out: The number of people the feeder in the output sequence
        debug: If true, only use the first 2 entries of the directory listing
        max_frame: Length of the frame axis of the buffer a sample is decoded
            into; a ``frame_index`` that is not smaller than this raises IndexError
    """

    def __init__(self,
                 data_path,
                 label_path,
                 ignore_empty_sample=True,
                 random_choose=False,
                 random_shift=False,
                 random_move=False,
                 window_size=-1,
                 pose_matching=False,
                 num_person_in=5,
                 num_person_out=2,
                 debug=False,
                 max_frame=90000):
        self.debug = debug
        self.data_path = data_path
        self.label_path = label_path
        self.random_choose = random_choose
        self.random_shift = random_shift
        self.random_move = random_move
        self.window_size = window_size
        self.num_person_in = num_person_in
        self.num_person_out = num_person_out
        self.pose_matching = pose_matching
        self.ignore_empty_sample = ignore_empty_sample
        self.max_frame = max_frame

        self.load_data()

    def load_data(self):
        """List the sample files and load their labels from ``label_path``."""
        # load file list
        self.sample_name = os.listdir(self.data_path)

        if self.debug:
            self.sample_name = self.sample_name[0:2]

        # load label
        with open(self.label_path) as f:
            label_info = json.load(f)

        sample_ids = [name.split('.')[0] for name in self.sample_name]
        self.label = np.array(
            [label_info[sample_id]['label_index'] for sample_id in sample_ids])
        # Force a boolean mask: without the explicit dtype an empty listing
        # gives a float array and 0/1 flags give an integer array, and either
        # one breaks (or silently corrupts) the label selection below.
        has_skeleton = np.array(
            [bool(label_info[sample_id]['has_skeleton'])
             for sample_id in sample_ids],
            dtype=bool)

        # ignore the samples which do not have a skeleton sequence
        if self.ignore_empty_sample:
            self.sample_name = [
                s for h, s in zip(has_skeleton, self.sample_name) if h
            ]
            self.label = self.label[has_skeleton]

        # output data shape (N, C, T, V, M)
        self.N = len(self.sample_name)  # sample
        self.C = 3  # channel
        self.T = self.max_frame  # frame
        self.V = 18  # joint
        self.M = self.num_person_out  # person

    def __len__(self):
        """Return the number of samples kept by ``load_data``."""
        return len(self.sample_name)

    def __iter__(self):
        """Yield ``self[index]`` for every sample, in index order."""
        for index in range(len(self)):
            yield self[index]

    def __getitem__(self, index):
        """Decode one sample.

        Returns:
            ``(data_numpy, label)`` where ``data_numpy`` has axes (C, T, V, M):
            channel 0 and 1 hold the even and odd entries of ``pose`` shifted
            by -0.5, channel 2 holds ``score``; ``label`` is the
            ``label_index`` stored in the sample file.

        Raises:
            AssertionError: if the label stored in the sample file differs
                from the one loaded from the label file.
        """
        # get data
        sample_name = self.sample_name[index]
        sample_path = os.path.join(self.data_path, sample_name)
        with open(sample_path, 'r') as f:
            video_info = json.load(f)

        # fill data_numpy
        data_numpy = np.zeros((self.C, self.T, self.V, self.num_person_in))
        for frame_info in video_info['data']:
            frame_index = int(frame_info['frame_index'])
            # only the first num_person_in skeletons of a frame are kept
            skeletons = frame_info["skeleton"][:self.num_person_in]
            for m, skeleton_info in enumerate(skeletons):
                pose = skeleton_info['pose']
                score = skeleton_info['score']
                data_numpy[0, frame_index, :, m] = pose[0::2]
                data_numpy[1, frame_index, :, m] = pose[1::2]
                data_numpy[2, frame_index, :, m] = score

        # centralization; coordinates of joints with zero score are reset to
        # 0 so that empty slots do not end up at -0.5
        data_numpy[0:2] -= 0.5
        missing = data_numpy[2] == 0
        data_numpy[0:2, missing] = 0

        # get & check label index
        label = video_info['label_index']
        assert self.label[index] == label, \
            'label of {} disagrees with the label file'.format(sample_name)

        # data augmentation
        if self.random_shift:
            data_numpy = tools.random_shift(data_numpy)
        if self.random_choose:
            data_numpy = tools.random_choose(data_numpy, self.window_size)
        elif self.window_size > 0:
            data_numpy = tools.auto_pading(data_numpy, self.window_size)
        if self.random_move:
            data_numpy = tools.random_move(data_numpy)

        # sort by score: within every frame order the persons by descending
        # summed joint score and keep the first num_person_out of them
        person_score = data_numpy[2].sum(axis=1)  # (T, M)
        order = (-person_score).argsort(axis=1)[:, :self.num_person_out]
        data_numpy = np.take_along_axis(
            data_numpy, order[np.newaxis, :, np.newaxis, :], axis=3)

        # match poses between 2 frames
        if self.pose_matching:
            data_numpy = tools.openpose_match(data_numpy)

        return data_numpy, label

    def top_k(self, score, top_k):
        """Return the fraction of samples whose label is among the ``top_k``
        highest entries of the matching row of ``score``."""
        assert (all(self.label >= 0))

        rank = score.argsort()
        hit_top_k = [l in rank[i, -top_k:] for i, l in enumerate(self.label)]
        return sum(hit_top_k) * 1.0 / len(hit_top_k)

    def top_k_by_category(self, score, top_k):
        """Forward the labels, ``score`` and ``top_k`` to
        ``tools.top_k_by_category`` and return its result."""
        assert (all(self.label >= 0))
        return tools.top_k_by_category(self.label, score, top_k)

    def calculate_recall_precision(self, score):
        """Forward the labels and ``score`` to
        ``tools.calculate_recall_precision`` and return its result."""
        assert (all(self.label >= 0))
        return tools.calculate_recall_precision(self.label, score)