| | from operator import mod |
| | import os |
| | |
| | import imageio |
| | import shutil |
| | import numpy as np |
| | import torch |
| | from tqdm import tqdm |
| |
|
| | from scipy.spatial.transform import Rotation as R |
| | from mGPT.render.renderer import get_renderer |
| | from mGPT.render.rendermotion import render_video |
| | |
| | |
| |
|
| |
|
| | def parsename(path): |
| | basebane = os.path.basename(path) |
| | base = os.path.splitext(basebane)[0] |
| | strs = base.split('_') |
| | key = strs[-2] |
| | action = strs[-1] |
| | return key, action |
| |
|
| |
|
| | def load_anim(path, timesize=None): |
| | data = np.array(imageio.mimread(path, memtest=False)) |
| | if timesize is None: |
| | return data |
| |
|
| | |
| | |
| | |
| | alldata = data |
| |
|
| | |
| | if len(data.shape) == 3 and len(alldata.shape) == 4: |
| | data = data[:, None, :, :] |
| |
|
| | |
| | lenanim = data.shape[0] |
| | alldata[:lenanim] = data[:lenanim] |
| | return alldata |
| |
|
| |
|
| | def plot_3d_motion_dico(x): |
| | motion, length, save_path, params, kargs = x |
| | plot_3d_motion(motion, length, save_path, params, **kargs) |
| |
|
| |
|
| | def plot_3d_motion(motion, |
| | length, |
| | save_path, |
| | params, |
| | title="", |
| | interval=50, |
| | pred_cam=None, |
| | imgs=None, |
| | bbox=None, |
| | side=None): |
| | |
| | |
| | if motion.shape[1] == 6890: |
| | |
| | |
| | width = 600 |
| | height = 600 |
| | if pred_cam is None: |
| | |
| | cam = (0.8, 0.8, 0, 0.1) |
| | |
| | else: |
| | assert bbox is not None |
| | assert imgs is not None |
| |
|
| | |
| | |
| | |
| | |
| | cam = np.concatenate( |
| | (pred_cam[:, [0]], pred_cam[:, [0]], pred_cam[:, 1:3]), axis=1) |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | cam_pose = np.eye(4) |
| | cam_pose[0:3, 0:3] = R.from_euler('x', -90, degrees=True).as_matrix() |
| | cam_pose[0:3, 3] = [0, 0, 0] |
| | if side: |
| | rz = np.eye(4) |
| | rz[0:3, 0:3] = R.from_euler('z', -90, degrees=True).as_matrix() |
| | cam_pose = np.matmul(rz, cam_pose) |
| |
|
| | |
| | |
| | |
| | backgrounds = imgs if imgs is not None else np.ones( |
| | (height, width, 3)) * 255 |
| | renderer = get_renderer(width, height, cam_pose) |
| |
|
| | |
| | meshes = motion |
| | key, action = parsename(save_path) |
| | render_video(meshes, |
| | key, |
| | action, |
| | renderer, |
| | save_path, |
| | backgrounds, |
| | cam_pose, |
| | cams=cam) |
| | return |
| |
|
| |
|
| | def stack_images(real, real_gens, gen, real_imgs=None): |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | nleft_cols = len(real_gens) + 1 |
| | print("Stacking frames..") |
| | allframes = np.concatenate( |
| | (real[:, None, ...], *[x[:, None, ...] for x in real_gens], gen), 1) |
| | nframes, nspa, nats, h, w, pix = allframes.shape |
| |
|
| | blackborder = np.zeros((w // 30, h * nats, pix), dtype=allframes.dtype) |
| | |
| | frames = [] |
| | for frame_idx in tqdm(range(nframes)): |
| | columns = np.vstack(allframes[frame_idx].transpose(1, 2, 3, 4, |
| | 0)).transpose( |
| | 3, 1, 0, 2) |
| | frame = np.concatenate( |
| | (*columns[0:nleft_cols], blackborder, *columns[nleft_cols:]), |
| | 0).transpose(1, 0, 2) |
| |
|
| | frames.append(frame) |
| |
|
| | if real_imgs is not None: |
| | resize_imgs = convert_img(real_imgs, h)[:nframes, ...] |
| |
|
| | for i in range(len(frames)): |
| | imgs = np.vstack(resize_imgs[i, ...]) |
| | imgs4 = np.ones( |
| | (imgs.shape[0], imgs.shape[1], 4), dtype=np.uint8) * 255 |
| | imgs4[:, :, :3] = imgs |
| | |
| | frames[i] = np.concatenate((imgs4, frames[i]), 1) |
| | return np.stack(frames) |
| |
|
| |
|
| | def stack_images_gen(gen, real_imgs=None): |
| | print("Stacking frames..") |
| | allframes = gen |
| | nframes, nspa, nats, h, w, pix = allframes.shape |
| | blackborder = np.zeros((w * nspa, h // 30, pix), dtype=allframes.dtype) |
| | blackborder = blackborder[None, ...].repeat(nats, |
| | axis=0).transpose(0, 2, 1, 3) |
| |
|
| | frames = [] |
| | for frame_idx in tqdm(range(nframes)): |
| | rows = np.vstack(allframes[frame_idx].transpose(0, 3, 2, 4, |
| | 1)).transpose( |
| | 3, 1, 0, 2) |
| | rows = np.concatenate((rows, blackborder), 1) |
| | frame = np.concatenate(rows, 0) |
| | frames.append(frame) |
| |
|
| | if real_imgs is not None: |
| | |
| | resize_imgs = convert_img(real_imgs, h)[:nframes, ...] |
| | for i in range(len(frames)): |
| | imgs = np.vstack(resize_imgs[i, ...]) |
| | |
| | frames[i] = np.concatenate((imgs, frames[i]), 1) |
| | return np.stack(frames) |
| |
|
| |
|
| | def generate_by_video(visualization, reconstructions, generation, |
| | label_to_action_name, params, nats, nspa, tmp_path): |
| | |
| | |
| | fps = params["fps"] |
| |
|
| | params = params.copy() |
| |
|
| | gen_only = False |
| | if visualization is None: |
| | gen_only = True |
| | outputkey = "output_vertices" |
| | params["pose_rep"] = "vertices" |
| | elif "output_vertices" in visualization: |
| | outputkey = "output_vertices" |
| | params["pose_rep"] = "vertices" |
| | elif "output_xyz" in visualization: |
| | outputkey = "output_xyz" |
| | params["pose_rep"] = "xyz" |
| | else: |
| | outputkey = "poses" |
| |
|
| | keep = [outputkey, 'lengths', "y"] |
| | gener = {key: generation[key].data.cpu().numpy() for key in keep} |
| | if not gen_only: |
| | visu = {key: visualization[key].data.cpu().numpy() for key in keep} |
| | recons = {} |
| | |
| | if 'vertices_hat' in reconstructions['ntf']: |
| | recons['regressor'] = { |
| | 'output_vertices': |
| | reconstructions['ntf']['vertices_hat'].data.cpu().numpy(), |
| | 'lengths': |
| | reconstructions['ntf']['lengths'].data.cpu().numpy(), |
| | 'y': |
| | reconstructions['ntf']['y'].data.cpu().numpy() |
| | } |
| |
|
| | recons['regressor_side'] = { |
| | 'output_vertices': |
| | reconstructions['ntf']['vertices_hat'].data.cpu().numpy(), |
| | 'lengths': |
| | reconstructions['ntf']['lengths'].data.cpu().numpy(), |
| | 'y': |
| | reconstructions['ntf']['y'].data.cpu().numpy(), |
| | 'side': |
| | True |
| | } |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | for mode, reconstruction in reconstructions.items(): |
| | recons[mode] = { |
| | key: reconstruction[key].data.cpu().numpy() |
| | for key in keep |
| | } |
| | recons[mode + '_side'] = { |
| | key: reconstruction[key].data.cpu().numpy() |
| | for key in keep |
| | } |
| | recons[mode + '_side']['side'] = True |
| |
|
| | |
| | |
| | lenmax = gener['lengths'].max() |
| | timesize = lenmax |
| |
|
| | import multiprocessing |
| |
|
| | def pool_job_with_desc(pool, iterator, desc, max_, save_path_format, isij): |
| | with tqdm(total=max_, desc=desc.format("Render")) as pbar: |
| | for data in iterator: |
| | plot_3d_motion_dico(data) |
| | |
| | |
| | if isij: |
| | array = np.stack([[ |
| | load_anim(save_path_format.format(i, j), timesize) |
| | for j in range(nats) |
| | ] for i in tqdm(range(nspa), desc=desc.format("Load"))]) |
| | return array.transpose(2, 0, 1, 3, 4, 5) |
| | else: |
| | array = np.stack([ |
| | load_anim(save_path_format.format(i), timesize) |
| | for i in tqdm(range(nats), desc=desc.format("Load")) |
| | ]) |
| | return array.transpose(1, 0, 2, 3, 4) |
| |
|
| | pool = None |
| | |
| | with multiprocessing.Pool() as pool: |
| | |
| | save_path_format = os.path.join(tmp_path, "gen_{}_{}.gif") |
| | iterator = ((gener[outputkey][i, j], gener['lengths'][i, j], |
| | save_path_format.format(i, j), params, { |
| | "title": |
| | f"gen: {label_to_action_name(gener['y'][i, j])}", |
| | "interval": 1000 / fps |
| | }) for j in range(nats) for i in range(nspa)) |
| | gener["frames"] = pool_job_with_desc(pool, iterator, |
| | "{} the generated samples", |
| | nats * nspa, save_path_format, |
| | True) |
| | if not gen_only: |
| | |
| | save_path_format = os.path.join(tmp_path, "real_{}.gif") |
| | iterator = ((visu[outputkey][i], visu['lengths'][i], |
| | save_path_format.format(i), params, { |
| | "title": |
| | f"real: {label_to_action_name(visu['y'][i])}", |
| | "interval": 1000 / fps |
| | }) for i in range(nats)) |
| | visu["frames"] = pool_job_with_desc(pool, iterator, |
| | "{} the real samples", nats, |
| | save_path_format, False) |
| | for mode, recon in recons.items(): |
| | |
| | save_path_format = os.path.join( |
| | tmp_path, f"reconstructed_{mode}_" + "{}.gif") |
| | if mode == 'overlap': |
| | iterator = (( |
| | recon[outputkey][i], recon['lengths'][i], |
| | save_path_format.format(i), params, { |
| | "title": |
| | f"recons: {label_to_action_name(recon['y'][i])}", |
| | "interval": 1000 / fps, |
| | "pred_cam": recon['cam'][i], |
| | "imgs": recon['imgs'][i], |
| | "bbox": recon['bbox'][i] |
| | }) for i in range(nats)) |
| | else: |
| | side = True if 'side' in recon.keys() else False |
| | iterator = (( |
| | recon[outputkey][i], recon['lengths'][i], |
| | save_path_format.format(i), params, { |
| | "title": |
| | f"recons: {label_to_action_name(recon['y'][i])}", |
| | "interval": 1000 / fps, |
| | "side": side |
| | }) for i in range(nats)) |
| | recon["frames"] = pool_job_with_desc( |
| | pool, iterator, "{} the reconstructed samples", nats, |
| | save_path_format, False) |
| | |
| | if not gen_only: |
| | input_imgs = visualization["imgs"] if visualization[ |
| | "imgs"] is not None else None |
| | vis = visu["frames"] if not gen_only else None |
| | rec = [recon["frames"] |
| | for recon in recons.values()] if not gen_only else None |
| | gen = gener["frames"] |
| | frames = stack_images(vis, rec, gen, input_imgs) |
| | else: |
| | gen = gener["frames"] |
| | frames = stack_images_gen(gen) |
| | return frames |
| |
|
| |
|
| | def viz_epoch(model, |
| | dataset, |
| | epoch, |
| | params, |
| | folder, |
| | module=None, |
| | writer=None, |
| | exps=''): |
| | """ Generate & viz samples """ |
| | module = model if module is None else module |
| |
|
| | |
| | model.outputxyz = True |
| |
|
| | print(f"Visualization of the epoch {epoch}") |
| |
|
| | noise_same_action = params["noise_same_action"] |
| | noise_diff_action = params["noise_diff_action"] |
| | duration_mode = params["duration_mode"] |
| | reconstruction_mode = params["reconstruction_mode"] |
| | decoder_test = params["decoder_test"] |
| |
|
| | fact = params["fact_latent"] |
| | figname = params["figname"].format(epoch) |
| |
|
| | nspa = params["num_samples_per_action"] |
| | nats = params["num_actions_to_sample"] |
| |
|
| | num_classes = params["num_classes"] |
| | |
| |
|
| | |
| | classes = torch.randperm(num_classes)[:nats] |
| | |
| | if nats > num_classes: |
| | classes = classes.expand(nats) |
| |
|
| | meandurations = torch.from_numpy( |
| | np.array([ |
| | round(dataset.get_mean_length_label(cl.item())) for cl in classes |
| | ])) |
| |
|
| | if duration_mode == "interpolate" or decoder_test == "diffduration": |
| | points, step = np.linspace(-nspa, nspa, nspa, retstep=True) |
| | |
| | points = np.array([5, 10, 16, 30, 60, 80]).astype(int) |
| | |
| | gendurations = torch.from_numpy(points[:, None]).expand( |
| | (nspa, 1)).repeat((1, nats)) |
| | else: |
| | gendurations = meandurations.repeat((nspa, 1)) |
| | print("Duration time: ") |
| | print(gendurations[:, 0]) |
| |
|
| | |
| | |
| | batch = dataset.get_label_sample_batch(classes.numpy()) |
| |
|
| | |
| | |
| | |
| | visualization = { |
| | "x": batch['x'].to(model.device), |
| | "y": classes.to(model.device), |
| | "mask": batch['mask'].to(model.device), |
| | 'lengths': batch['lengths'].to(model.device), |
| | "output": batch['x'].to(model.device), |
| | "theta": |
| | batch['theta'].to(model.device) if 'theta' in batch.keys() else None, |
| | "imgs": |
| | batch['imgs'].to(model.device) if 'imgs' in batch.keys() else None, |
| | "paths": batch['paths'] if 'paths' in batch.keys() else None, |
| | } |
| |
|
| | |
| | if reconstruction_mode == "both": |
| | reconstructions = { |
| | "tf": { |
| | "x": |
| | batch['x'].to(model.device), |
| | "y": |
| | classes.to(model.device), |
| | 'lengths': |
| | batch['lengths'].to(model.device), |
| | "mask": |
| | batch['mask'].to(model.device), |
| | "teacher_force": |
| | True, |
| | "theta": |
| | batch['theta'].to(model.device) |
| | if 'theta' in batch.keys() else None |
| | }, |
| | "ntf": { |
| | "x": |
| | batch['x'].to(model.device), |
| | "y": |
| | classes.to(model.device), |
| | 'lengths': |
| | batch['lengths'].to(model.device), |
| | "mask": |
| | batch['mask'].to(model.device), |
| | "theta": |
| | batch['theta'].to(model.device) |
| | if 'theta' in batch.keys() else None |
| | } |
| | } |
| | else: |
| | reconstructions = { |
| | reconstruction_mode: { |
| | "x": |
| | batch['x'].to(model.device), |
| | "y": |
| | classes.to(model.device), |
| | 'lengths': |
| | batch['lengths'].to(model.device), |
| | "mask": |
| | batch['mask'].to(model.device), |
| | "teacher_force": |
| | reconstruction_mode == "tf", |
| | "imgs": |
| | batch['imgs'].to(model.device) |
| | if 'imgs' in batch.keys() else None, |
| | "theta": |
| | batch['theta'].to(model.device) |
| | if 'theta' in batch.keys() else None, |
| | "bbox": |
| | batch['bbox'] if 'bbox' in batch.keys() else None |
| | } |
| | } |
| | print("Computing the samples poses..") |
| |
|
| | |
| | model.eval() |
| | with torch.no_grad(): |
| | |
| | for mode in reconstructions: |
| | |
| | reconstructions[mode] = model(reconstructions[mode]) |
| | reconstruction = reconstructions[list(reconstructions.keys())[0]] |
| |
|
| | if decoder_test == "gt": |
| | |
| | gt_input = { |
| | "x": batch['x'].repeat(nspa, 1, 1, 1).to(model.device), |
| | "y": classes.repeat(nspa).to(model.device), |
| | "mask": batch['mask'].repeat(nspa, 1).to(model.device), |
| | 'lengths': batch['lengths'].repeat(nspa).to(model.device) |
| | } |
| | generation = model(gt_input) |
| | if decoder_test == "new": |
| | |
| | generation = module.generate(gendurations, |
| | classes=classes, |
| | nspa=nspa, |
| | noise_same_action=noise_same_action, |
| | noise_diff_action=noise_diff_action, |
| | fact=fact) |
| | elif decoder_test == "diffaction": |
| | assert nats == nspa |
| | |
| | z = reconstruction["z"].repeat((nspa, 1)) |
| | mask = reconstruction["mask"].repeat((nspa, 1)) |
| | lengths = reconstruction['lengths'].repeat(nspa) |
| | |
| | y = classes.repeat_interleave(nspa).to(model.device) |
| | generation = {"z": z, "y": y, "mask": mask, 'lengths': lengths} |
| | model.decoder(generation) |
| |
|
| | elif decoder_test == "diffduration": |
| | z = reconstruction["z"].repeat((nspa, 1)) |
| | lengths = gendurations.reshape(-1).to(model.device) |
| | mask = model.lengths_to_mask(lengths) |
| | y = classes.repeat(nspa).to(model.device) |
| | generation = {"z": z, "y": y, "mask": mask, 'lengths': lengths} |
| | model.decoder(generation) |
| |
|
| | elif decoder_test == "interpolate_action": |
| | assert nats == nspa |
| | |
| | z_diff_action = torch.randn(1, |
| | model.latent_dim, |
| | device=model.device).repeat(nats, 1) |
| | z = z_diff_action.repeat((nspa, 1)) |
| |
|
| | |
| | y = F.one_hot(classes.to(model.device), |
| | model.num_classes).to(model.device) |
| | y_below = F.one_hot(torch.cat((classes[1:], classes[0:1])), |
| | model.num_classes).to(model.device) |
| | convex_factors = torch.linspace(0, 1, nspa, device=model.device) |
| | y_mixed = torch.einsum("nk,m->mnk", y, 1-convex_factors) + \ |
| | torch.einsum("nk,m->mnk", y_below, convex_factors) |
| | y_mixed = y_mixed.reshape(nspa * nats, y_mixed.shape[-1]) |
| |
|
| | durations = gendurations[0].to(model.device) |
| | durations_below = torch.cat((durations[1:], durations[0:1])) |
| |
|
| | gendurations = torch.einsum("l,k->kl", durations, 1-convex_factors) + \ |
| | torch.einsum("l,k->kl", durations_below, convex_factors) |
| | gendurations = gendurations.to(dtype=durations.dtype) |
| |
|
| | lengths = gendurations.to(model.device).reshape(z.shape[0]) |
| | mask = model.lengths_to_mask(lengths) |
| |
|
| | generation = { |
| | "z": z, |
| | "y": y_mixed, |
| | "mask": mask, |
| | 'lengths': lengths |
| | } |
| | generation = model.decoder(generation) |
| |
|
| | visualization = module.prepare(visualization) |
| | visualization["output_xyz"] = visualization["x_xyz"] |
| | visualization["output_vertices"] = visualization["x_vertices"] |
| | |
| | |
| | |
| | |
| | |
| |
|
| | for key, val in generation.items(): |
| | if len(generation[key].shape) == 1: |
| | generation[key] = val.reshape(nspa, nats) |
| | else: |
| | generation[key] = val.reshape(nspa, nats, *val.shape[1:]) |
| |
|
| | finalpath = os.path.join(folder, figname + exps + ".gif") |
| | tmp_path = os.path.join(folder, f"subfigures_{figname}") |
| | os.makedirs(tmp_path, exist_ok=True) |
| |
|
| | print("Generate the videos..") |
| | frames = generate_by_video(visualization, reconstructions, generation, |
| | dataset.label_to_action_name, params, nats, |
| | nspa, tmp_path) |
| |
|
| | print(f"Writing video {finalpath}") |
| | imageio.mimsave(finalpath.replace('gif', 'mp4'), frames, fps=params["fps"]) |
| | shutil.rmtree(tmp_path) |
| |
|
| | |
| | output = { |
| | "data_id": batch['id'], |
| | "paths": batch['paths'], |
| | "x": batch['x'].cpu().numpy(), |
| | "x_vertices": visualization["x_vertices"].cpu().numpy(), |
| | "output_vertices": |
| | reconstructions['ntf']["output_vertices"].cpu().numpy(), |
| | "gen_vertices": generation["output_vertices"].cpu().numpy() |
| | } |
| |
|
| | outputpath = finalpath.replace('gif', 'npy') |
| | np.save(outputpath, output) |
| |
|
| | |
| | batch_recon = reconstructions["ntf"] |
| | outputpath = finalpath.replace('gif', 'pkl') |
| | |
| |
|
| | if writer is not None: |
| | writer.add_video(f"Video/Epoch {epoch}", |
| | frames.transpose(0, 3, 1, 2)[None], |
| | epoch, |
| | fps=params["fps"]) |
| | return finalpath |
| |
|
| |
|
| | def viz_dataset(dataset, params, folder): |
| | """ Generate & viz samples """ |
| | print("Visualization of the dataset") |
| |
|
| | nspa = params["num_samples_per_action"] |
| | nats = params["num_actions_to_sample"] |
| |
|
| | num_classes = params["num_classes"] |
| |
|
| | figname = "{}_{}_numframes_{}_sampling_{}_step_{}".format( |
| | params["dataset"], params["pose_rep"], params["num_frames"], |
| | params["sampling"], params["sampling_step"]) |
| |
|
| | |
| | classes = torch.randperm(num_classes)[:nats] |
| |
|
| | allclasses = classes.repeat(nspa, 1).reshape(nspa * nats) |
| | |
| | real_samples, mask_real, real_lengths = dataset.get_label_sample_batch( |
| | allclasses.numpy()) |
| | |
| |
|
| | |
| | visualization = { |
| | "x": real_samples, |
| | "y": allclasses, |
| | "mask": mask_real, |
| | 'lengths': real_lengths, |
| | "output": real_samples |
| | } |
| |
|
| | from mGPT.models.rotation2xyz import Rotation2xyz |
| |
|
| | device = params["device"] |
| | rot2xyz = Rotation2xyz(device=device) |
| |
|
| | rot2xyz_params = { |
| | "pose_rep": params["pose_rep"], |
| | "glob_rot": params["glob_rot"], |
| | "glob": params["glob"], |
| | "jointstype": params["jointstype"], |
| | "translation": params["translation"] |
| | } |
| |
|
| | output = visualization["output"] |
| | visualization["output_xyz"] = rot2xyz(output.to(device), |
| | visualization["mask"].to(device), |
| | **rot2xyz_params) |
| |
|
| | for key, val in visualization.items(): |
| | if len(visualization[key].shape) == 1: |
| | visualization[key] = val.reshape(nspa, nats) |
| | else: |
| | visualization[key] = val.reshape(nspa, nats, *val.shape[1:]) |
| |
|
| | finalpath = os.path.join(folder, figname + ".gif") |
| | tmp_path = os.path.join(folder, f"subfigures_{figname}") |
| | os.makedirs(tmp_path, exist_ok=True) |
| |
|
| | print("Generate the videos..") |
| | frames = generate_by_video_sequences(visualization, |
| | dataset.label_to_action_name, params, |
| | nats, nspa, tmp_path) |
| |
|
| | print(f"Writing video {finalpath}..") |
| | imageio.mimsave(finalpath, frames, fps=params["fps"]) |
| |
|
| |
|
| | def generate_by_video_sequences(visualization, label_to_action_name, params, |
| | nats, nspa, tmp_path): |
| | |
| | |
| | fps = params["fps"] |
| | if "output_vetices" in visualization: |
| | outputkey = "output_vetices" |
| | params["pose_rep"] = "vertices" |
| | elif "output_xyz" in visualization: |
| | outputkey = "output_xyz" |
| | params["pose_rep"] = "xyz" |
| | else: |
| | outputkey = "poses" |
| |
|
| | keep = [outputkey, 'lengths', "y"] |
| | visu = {key: visualization[key].data.cpu().numpy() for key in keep} |
| | lenmax = visu['lengths'].max() |
| |
|
| | timesize = lenmax + 5 |
| |
|
| | |
| |
|
| | def pool_job_with_desc(pool, iterator, desc, max_, save_path_format): |
| | for data in iterator: |
| | plot_3d_motion_dico(data) |
| | |
| | |
| | |
| | array = np.stack([[ |
| | load_anim(save_path_format.format(i, j), timesize) |
| | for j in range(nats) |
| | ] for i in tqdm(range(nspa), desc=desc.format("Load"))]) |
| | return array.transpose(2, 0, 1, 3, 4, 5) |
| |
|
| | pool = None |
| | |
| | |
| | save_path_format = os.path.join(tmp_path, "real_{}_{}.gif") |
| | iterator = ((visu[outputkey][i, j], visu['lengths'][i, j], |
| | save_path_format.format(i, j), params, { |
| | "title": f"real: {label_to_action_name(visu['y'][i, j])}", |
| | "interval": 1000 / fps |
| | }) for j in range(nats) for i in range(nspa)) |
| | visu["frames"] = pool_job_with_desc(pool, iterator, "{} the real samples", |
| | nats, save_path_format) |
| | frames = stack_images_sequence(visu["frames"]) |
| | return frames |
| |
|
| |
|
| | def stack_images_sequence(visu): |
| | print("Stacking frames..") |
| | allframes = visu |
| | nframes, nspa, nats, h, w, pix = allframes.shape |
| | frames = [] |
| | for frame_idx in tqdm(range(nframes)): |
| | columns = np.vstack(allframes[frame_idx].transpose(1, 2, 3, 4, |
| | 0)).transpose( |
| | 3, 1, 0, 2) |
| | frame = np.concatenate(columns).transpose(1, 0, 2) |
| | frames.append(frame) |
| | return np.stack(frames) |
| |
|