Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| import cv2 | |
| import numpy as np | |
| import torch | |
| from s3d_floorplan_eval.S3DLoader.s3d_utils import generate_floorplan, parse_floor_plan_polys | |
| from torch.utils.data import DataLoader, Dataset | |
class S3DLoader(object):
    """Build an ``S3DDataset`` and wrap it in a ``DataLoader`` for one run mode.

    After construction the instance exposes ``mode``, ``seed``,
    ``function_mode``, ``batch_size``, ``device``, ``dataset``, ``augment``,
    ``sample_n`` and ``data`` (the ``DataLoader``).
    """

    # mode -> (sub-directory of ``args.dataset_path``, maximum number of scenes)
    _SPLITS = {
        "train": ("train", 3000),
        "online_eval": ("val", 250),
        "test": ("test", 250),
    }

    def __init__(self, args, mode, generate_input_candidates=False):
        """
        :param args: options object; ``dataset_path`` is required, while
            ``network_mode`` and ``batch_size`` are optional
        :param mode: one of ``"train"``, ``"online_eval"`` or ``"test"``
        :param generate_input_candidates: forwarded to ``S3DDataset``
        :raises ValueError: if ``mode`` is not a supported mode
        """
        self.mode = mode
        self.seed = 8978
        # Seeds NumPy's *global* RNG so dataset-side randomness is repeatable.
        np.random.seed(seed=self.seed)

        self.function_mode = getattr(args, "network_mode", "S")
        self.batch_size = getattr(args, "batch_size", 1)

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print("Selected device is:", device)
        self.device = device

        if mode not in self._SPLITS:
            # Previously this was only printed, leaving ``dataset``/``data``
            # unset and deferring the failure to the first attribute access.
            raise ValueError("mode should be one of 'train, test, online_eval'. Got {}".format(mode))

        is_train = mode == "train"
        self.dataset = self.create_dataset(args, mode, generate_input_candidates)
        self.augment = is_train
        self.sample_n = len(self.dataset)
        # "online_eval" and "test" share the same non-shuffled loader. The old
        # dedicated "test" branch could never be reached and has been removed.
        self.data = DataLoader(
            self.dataset,
            self.batch_size,
            shuffle=is_train,
            drop_last=True,
            collate_fn=self.collate_fn,
        )

    def collate_fn(self, samples):
        """
        Stack per-scene samples into batched tensors on ``self.device``.

        NOTE(review): the samples built by ``S3DDataset.load_scene`` in this
        file do not appear to contain ``"input_map"`` or ``"score"``; confirm
        where those keys are expected to come from.

        :param samples: list of dicts holding NumPy arrays under the keys
            ``"room_map"``, ``"input_map"`` and ``"score"``
        :return: dict with the same three keys, each concatenated along dim 0
        :rtype: dict
        """
        # room_map gains a leading batch axis and a trailing axis; the other
        # two entries only gain the leading batch axis.
        room_maps = [torch.tensor(s["room_map"][None, :, :, None], device=self.device) for s in samples]
        input_maps = [torch.tensor(s["input_map"][None], device=self.device) for s in samples]
        scores = [torch.tensor(s["score"][None], device=self.device) for s in samples]

        torch_sample = {
            "room_map": torch.cat(room_maps, dim=0),
            "input_map": torch.cat(input_maps, dim=0),
            "score": torch.cat(scores, dim=0),
        }

        # Sanity check: reject NaN and +/-inf in any batched tensor.
        for key, value in torch_sample.items():
            assert torch.all(torch.isfinite(value)), "%s contains NaN or inf" % key

        return torch_sample

    def create_dataset(self, args, mode, generate_input_candidates=False):
        """
        Create the ``S3DDataset`` for the split that corresponds to ``mode``.

        :param args: options object providing ``dataset_path``
        :param mode: one of ``"train"``, ``"online_eval"`` or ``"test"``
        :param generate_input_candidates: forwarded to ``S3DDataset``
        :return: the dataset instance
        :raises ValueError: if ``mode`` is not a supported mode
        """
        self.args = args
        try:
            split_dir, num_scenes = self._SPLITS[mode]
        except KeyError:
            raise ValueError(
                "mode should be one of 'train, test, online_eval'. Got {}".format(mode)
            ) from None

        scenes_path = os.path.join(args.dataset_path, split_dir)
        return S3DDataset(
            args,
            scenes_path,
            None,
            num_scenes=num_scenes,
            generate_input_candidates=generate_input_candidates,
            mode=mode,
        )

    def load_sample(self, sample_batch):
        """
        Identity function. Everything is already loaded in Dataset class for Structured 3D

        :param sample_batch: batch as produced by the data loader
        :return: ``sample_batch`` unchanged
        """
        return sample_batch
class S3DDataset(Dataset):
    """Map-style dataset that loads one scene directory per item.

    Each scene directory is expected to hold ``density.png`` and
    ``annotation_3d.json`` (see ``load_density_map`` / ``load_annotation``).
    """

    def __init__(self, options, scenes_path, score_gen, num_scenes, generate_input_candidates, mode):
        """
        :param options: options object, stored as ``self.options``
        :param scenes_path: directory whose entries are scene folders
        :param score_gen: accepted for interface compatibility; the stored
            ``self.score_gen`` is always ``None``
            (NOTE(review): confirm that ignoring this argument is intended)
        :param num_scenes: maximum number of scenes kept, after sorting and
            filtering
        :param generate_input_candidates: enables the random density
            perturbation and is passed as ``shuffle`` to ``generate_floorplan``
        :param mode: run mode string, e.g. ``"train"`` or ``"test"``
        """
        print("Creating Structured3D Dataset with %d scenes..." % num_scenes)
        self.options = options
        self.score_gen = None
        self.mode = mode
        self.scenes_path = scenes_path
        self.floor_data_folder_name = ""

        # Sorted listing minus entries that are explicitly excluded.
        inv_scenes = [".DS_Store", "scene_01155", "scene_01852", "scene_01192", "scene_01816"]
        scene_names = sorted(os.listdir(scenes_path))
        self.scenes_list = [s for s in scene_names if s not in inv_scenes][:num_scenes]

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.gen_input_candidates = generate_input_candidates

    def __getitem__(self, item):
        """Return the sample dict of the ``item``-th scene."""
        return self.load_scene(self.scenes_list[item])

    def __len__(self):
        """Return the number of scenes in the dataset."""
        return len(self.scenes_list)

    def load_density_map(self, sp):
        """
        Load density map

        :param sp: path of the scene directory
        :return: density image divided by 255, as ``float32``
        :raises FileNotFoundError: if ``density.png`` cannot be read
        """
        density_path = os.path.join(sp, self.floor_data_folder_name, "density.png")
        raw = cv2.imread(density_path, cv2.IMREAD_ANYCOLOR | cv2.IMREAD_ANYDEPTH)
        if raw is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError("Could not read density map: %s" % density_path)
        density_map = raw / 255.0

        if self.gen_input_candidates:
            # Random saturation: clip at a level in [0.8, 1) and rescale.
            thresh = np.maximum(np.random.random(), 0.8)
            density_map = np.minimum(density_map, thresh) / thresh

            # NOTE(review): the indentation of the source was lost; this step
            # is assumed to be nested under ``gen_input_candidates`` - confirm.
            if self.mode != "test":
                # Random exponent in [1.0, 1.5).
                exponent = np.random.random()
                exponent = (1.5 - 1.0) * (exponent - 1) + 1.5
                density_map = density_map**exponent

        return density_map.astype(np.float32)

    def load_annotation(self, sp):
        """
        Load annotation dict

        :param sp: path of the scene directory
        :return: parsed content of ``annotation_3d.json``
        :rtype: dict
        """
        anno_path = os.path.join(sp, self.floor_data_folder_name, "annotation_3d.json")
        with open(anno_path, "r") as f:
            return json.load(f)

    def load_scene(self, scene_name):
        """
        Load scene

        :param scene_name: name of a scene folder inside ``self.scenes_path``
        :return: dict with ``"scene_name"``, ``"density_map"`` and the entries
            added by ``generate_room_map``
        :rtype: dict
        """
        sp = os.path.join(self.scenes_path, scene_name)
        sample = {"scene_name": scene_name}

        scene_anno = self.load_annotation(sp)
        density_map = self.load_density_map(sp)

        # Fills ``sample`` in place.
        self.generate_room_map(sample, scene_anno, density_map)
        sample["density_map"] = density_map

        # NaN is the only value that differs from itself.
        for key, value in sample.items():
            assert np.all(value == value), "%s contains NaN" % key

        return sample

    def generate_room_map(self, sample, annos, density_map):
        """
        Rasterise the annotated floor plan and store the results in ``sample``.

        :param density_map: 2-D array; only its height and width are used
        :param sample: dict that is updated in place
        :param annos: annotation dict of the scene
        :return: None
        """
        h, w = density_map.shape
        polys = parse_floor_plan_polys(annos)

        room_map, polygons_list, polygons_type_list = generate_floorplan(
            annos,
            polys,
            h,
            w,
            ignore_types=["outwall", "door", "window"],
            constant_color=False,
            shuffle=self.gen_input_candidates,
        )
        room_map = cv2.dilate(room_map, np.ones((5, 5)))

        wall_map, _, _ = generate_floorplan(
            annos, polys, h, w, ignore_types=[], include_types=["outwall"], constant_color=True
        )
        # Keep wall pixels only where the dilated room map is empty.
        wall_map *= room_map == 0

        # The rasterised door/window map itself is not stored, only the lists.
        _, window_doors_list, window_doors_type_list = generate_floorplan(
            annos,
            polys,
            h,
            w,
            ignore_types=[],
            include_types=["door", "window"],
            fillpoly=False,
            constant_color=True,
            shuffle=self.gen_input_candidates,
        )

        sample["room_map"] = room_map.astype(np.float32)
        sample["wall_map"] = wall_map.astype(np.float32)
        sample["polygons_list"] = polygons_list
        sample["polygons_type_list"] = polygons_type_list
        sample["window_doors_list"] = window_doors_list
        sample["window_doors_type_list"] = window_doors_type_list

    def generate_density(self, points, width=256, height=256):
        """
        Accumulate points into per-batch 2-D histograms scaled to a peak of 1.

        :param points: 3-D tensor indexed as ``points[b, n, :2]`` = (x, y);
            the coordinates are multiplied by (width, height), so they are
            presumably normalised to [0, 1] - TODO confirm
        :param width: number of columns of the output map
        :param height: number of rows of the output map
        :return: float tensor of shape (batch, height, width) on ``self.device``
        """
        points = points.to(self.device)
        # ``batch_size`` is not set by this class; fall back to the batch
        # dimension of the input when nobody has assigned it.
        batch_size = getattr(self, "batch_size", points.shape[0])

        resolution = torch.tensor([width, height], device=self.device, dtype=points.dtype).reshape(1, 1, 2)
        coordinates = torch.round(points[:, :, :2] * resolution)
        # Clamp to valid pixel indices, then cast without assuming CUDA.
        coordinates = torch.minimum(
            torch.maximum(coordinates, torch.zeros_like(resolution)), resolution - 1
        ).long()

        density = torch.zeros((batch_size, height, width), dtype=torch.float, device=self.device)
        for i in range(batch_size):
            if coordinates[i].shape[0] == 0:
                # No points: keep an all-zero map instead of dividing by zero.
                continue
            unique_coordinates, counts = torch.unique(coordinates[i], return_counts=True, dim=0)
            density[i, unique_coordinates[:, 1], unique_coordinates[:, 0]] = counts.to(density.dtype)
            peak = torch.max(density[i])
            if peak > 0:
                density[i] = density[i] / peak

        return density