| import copy |
| import os |
| import pickle |
| import random |
| import sys |
| from pathlib import Path |
|
|
| import numpy as np |
| import open3d as o3 |
|
|
| _REPO_ROOT = Path(__file__).resolve().parents[1] |
| if str(_REPO_ROOT) not in sys.path: |
| sys.path.insert(0, str(_REPO_ROOT)) |
|
|
| from tools import augmentation, data, transformations |
|
|
| _SIM_DATA = _REPO_ROOT / "data" / "simulators" |
| ''' |
| This module provides functions to generate a dataset of point clouds with random transformations, with options for noise, outliers, and occlusions. |
| It also includes functions to check the shape of the data and to generate a data dictionary for training and testing, |
| and a function to combine multiple dataset dictionaries. |
| ''' |
|
|
| def generate_dataset(pcd, pcdPath, cadPath, num_transformation, angles, translation_range, index, noise_level = 0, outlier_level = 0, outlier_bounds = (-10, 10), occ_level = 0, save_dir=None): |
| ''' |
| A function to generate a dataset of point clouds with random transformations. |
| |
| Args: |
| pcd (open3d.geometry.PointCloud): The source point cloud |
| pcdPath (str): The path to the source point cloud |
| cadPath (str): The path to the target point cloud |
| num_transformation (int): The number of transformations to generate |
| angles (numpy.ndarray): The range of angles for the random transformations |
| translation_range (tuple): The range of translations for the random transformations |
| index (int): The index to start saving the generated dataset |
| noise_level (float): The level of noise to add to the point clouds |
| outlier_level (float): The level of outliers to add to the point clouds |
| occ_level (float): The level of occlusions to add to the point clouds |
| save (bool): A flag to save the generated dataset |
| |
| Returns: |
| None |
| ''' |
| np.random.seed(42) |
| target_list = [] |
| gt_transformation_list = [] |
|
|
| for i in range(num_transformation): |
| |
| x_angle= np.random.uniform(angles[0], angles[-1], size=1) |
| y_angle= np.random.uniform(angles[0], angles[-1], size=1) |
| z_angle= np.random.uniform(angles[0], angles[-1], size=1) |
| gt_transformation = transformations.create_transformation(x_angle, y_angle, z_angle, translation_range) |
|
|
| target = copy.deepcopy(pcd) |
| target.transform(gt_transformation) |
|
|
| if noise_level != 0: |
| target = augmentation.apply_noise(target, noise_level) |
| print('Noise applied') |
|
|
| if outlier_level != 0 or occ_level != 0: |
| _, another_cad = data.load_data(pcdPath, cadPath, every_k_points=1) |
| target = copy.deepcopy(another_cad).transform(gt_transformation) |
| if occ_level != 0: |
| target, _ = augmentation.apply_occlusion(target, occ_level) |
| print('Occlusion applied') |
| if outlier_level != 0: |
| target = augmentation.add_outliers(target, outlier_level, outlier_lowerbound=outlier_bounds[0], outlier_upperbound=outlier_bounds[1]) |
| print('Outliers applied') |
|
|
| |
| if len(target.points) >= len(pcd.points): |
| np.random.seed(42) |
| target_points = np.asarray(target.points) |
| indices = np.random.choice(len(target_points), 1441, replace=False) |
| sampled_points = target_points[indices] |
| target.points = o3.utility.Vector3dVector(sampled_points) |
| else: |
| print('Target has fewer points than source and can\'t be downsampled to the same length.') |
|
|
| print(f'size of source and target: {len(pcd.points)}, {len(target.points)}') |
| target_list.append(target) |
| gt_transformation_list.append(gt_transformation) |
|
|
| |
| if save_dir is not None: |
| if not os.path.exists(save_dir): |
| os.makedirs(save_dir) |
|
|
| for i, (target, transformation) in enumerate(zip(target_list, gt_transformation_list)): |
| target_path = os.path.join(save_dir, f"target_{i+index}.pcd") |
| transformation_path = os.path.join(save_dir, f"transformation_{i+index}.npy") |
| o3.io.write_point_cloud(target_path, target) |
| np.save(transformation_path, transformation) |
|
|
| def check_shape(data, expected_shape_3d, expected_shape_6d): |
| return data.shape == expected_shape_3d or data.shape == expected_shape_6d |
|
|
| def generate_dataset_dict(source, dataset_size, index, output_train_file_path, output_test_file_path, source_normals = None): |
| ''' |
| This function shuffles the dataset and generates a data_dict for the training and testing data following the pattern acceptable to Learning3D. |
| |
| Args: |
| source (open3d.geometry.PointCloud): The source point cloud |
| dataset_size (int): The size of the dataset |
| |
| Returns: |
| None |
| ''' |
| np.random.seed(42) |
| transformed_pcds = [] |
| gt_transformations = [] |
|
|
| |
| for i in range(index,index+dataset_size): |
| transformed_pcd = o3.io.read_point_cloud(str(_SIM_DATA / f"target_{i}.pcd")) |
| gt_transformation = np.load(str(_SIM_DATA / f"transformation_{i}.npy")) |
|
|
| if source_normals is not None: |
| M = np.linalg.inv(gt_transformation).T |
| target_normals = np.dot(source_normals, M[:3,:3]) |
| transformed_points = np.concatenate((np.asarray(transformed_pcd.points), target_normals), axis=1) |
| else: |
| transformed_points = np.asarray(transformed_pcd.points).astype(np.float32) |
|
|
| transformed_pcds.append(transformed_points) |
| gt_transformations.append(gt_transformation) |
|
|
| |
| temp = list(zip(transformed_pcds, gt_transformations)) |
| random.shuffle(temp) |
| transformed_pcds, gt_transformations = zip(*temp) |
|
|
| |
| transformed_pcds_np = np.array(transformed_pcds) |
| gt_transformations_np = np.array(gt_transformations) |
|
|
| if source_normals is not None: |
| source = np.concatenate((np.asarray(source.points), source_normals), axis=1) |
| else: |
| source = np.asarray(source.points).astype(np.float32) |
|
|
| data_dict = { |
| 'template': np.tile(source, (dataset_size, 1, 1)), |
| 'source': transformed_pcds_np, |
| 'transformation': gt_transformations_np |
| } |
|
|
| |
| train_size = int(0.8 * dataset_size) |
| test_size = dataset_size - train_size |
| num_points = len(source) |
|
|
| data_dict_train = {} |
| data_dict_test = {} |
| for key in data_dict.keys(): |
| data_dict_train[key] = data_dict[key][0:train_size] |
| data_dict_test[key] = data_dict[key][train_size:] |
| |
| assert set(data_dict_train.keys()) == {'template', 'source', 'transformation'} |
| assert set(data_dict_test.keys()) == {'template', 'source', 'transformation'} |
|
|
| expected_shape_3d_train = (train_size, num_points, 3) |
| expected_shape_6d_train = (train_size, num_points, 6) |
|
|
| assert check_shape(data_dict_train['template'], expected_shape_3d_train, expected_shape_6d_train), f"Expected shape: {expected_shape_3d_train} or {expected_shape_6d_train}, but got {data_dict_train['template'].shape}" |
| assert check_shape(data_dict_train['source'], expected_shape_3d_train, expected_shape_6d_train), f"Expected shape: {expected_shape_3d_train} or {expected_shape_6d_train}, but got {data_dict_train['source'].shape}" |
| assert data_dict_train['transformation'].shape == (train_size, 4, 4), f"Expected shape: {(train_size, 4, 4)}, but got {data_dict_train['transformation'].shape}" |
|
|
| expected_shape_3d_test = (test_size, num_points, 3) |
| expected_shape_6d_test = (test_size, num_points, 6) |
|
|
| assert check_shape(data_dict_test['template'], expected_shape_3d_test, expected_shape_6d_test), f"Expected shape: {expected_shape_3d_test} or {expected_shape_6d_test}, but got {data_dict_test['template'].shape}" |
| assert check_shape(data_dict_test['source'], expected_shape_3d_test, expected_shape_6d_test), f"Expected shape: {expected_shape_3d_test} or {expected_shape_6d_test}, but got {data_dict_test['source'].shape}" |
| assert data_dict_test['transformation'].shape == (test_size, 4, 4), f"Expected shape: {(test_size, 4, 4)}, but got {data_dict_test['transformation'].shape}" |
| |
| with open(output_train_file_path, 'wb') as f: |
| pickle.dump(data_dict_train, f) |
| print(f"train_dict saved to {output_train_file_path}") |
|
|
| with open(output_test_file_path, 'wb') as f: |
| pickle.dump(data_dict_test, f) |
| print(f"test_dict saved to {output_test_file_path}") |
|
|
|
|
| def combine_dataset_dict(train_files, test_files, output_train_file_path, output_test_file_path): |
| ''' |
| Combine and shuffle dictionaries from multiple files. |
| |
| Args: |
| train_files (list of str): List of file paths to training dictionaries. |
| test_files (list of str): List of file paths to testing dictionaries. |
| output_train_file (str): Output file path for the combined training dictionary. |
| output_test_file (str): Output file path for the combined testing dictionary. |
| ''' |
| |
| |
| train_dicts = [pickle.load(open(file, 'rb')) for file in train_files] |
| test_dicts = [pickle.load(open(file, 'rb')) for file in test_files] |
|
|
| |
| combined_train_dict = {} |
| combined_test_dict = {} |
|
|
| for key in train_dicts[0].keys(): |
| combined_train_dict[key] = np.concatenate([d[key] for d in train_dicts], axis=0) |
| combined_test_dict[key] = np.concatenate([d[key] for d in test_dicts], axis=0) |
|
|
| |
| train_combined_list = list(zip(combined_train_dict['template'], combined_train_dict['source'], combined_train_dict['transformation'])) |
| test_combined_list = list(zip(combined_test_dict['template'], combined_test_dict['source'], combined_test_dict['transformation'])) |
|
|
| random.shuffle(train_combined_list) |
| random.shuffle(test_combined_list) |
|
|
| combined_train_dict['template'], combined_train_dict['source'], combined_train_dict['transformation'] = zip(*train_combined_list) |
| combined_test_dict['template'], combined_test_dict['source'], combined_test_dict['transformation'] = zip(*test_combined_list) |
|
|
| |
| combined_train_dict['template'] = np.array(combined_train_dict['template']) |
| combined_train_dict['source'] = np.array(combined_train_dict['source']) |
| combined_train_dict['transformation'] = np.array(combined_train_dict['transformation']) |
|
|
| combined_test_dict['template'] = np.array(combined_test_dict['template']) |
| combined_test_dict['source'] = np.array(combined_test_dict['source']) |
| combined_test_dict['transformation'] = np.array(combined_test_dict['transformation']) |
|
|
| |
| train_size = len(combined_train_dict['source']) |
| test_size = len(combined_test_dict['source']) |
| num_points = combined_train_dict['source'].shape[1] |
| |
| assert set(combined_train_dict.keys()) == {'template', 'source', 'transformation'} |
| assert set(combined_test_dict.keys()) == {'template', 'source', 'transformation'} |
|
|
| expected_shape_3d_train = (train_size, num_points, 3) |
| expected_shape_6d_train = (train_size, num_points, 6) |
|
|
| assert check_shape(combined_train_dict['template'], expected_shape_3d_train, expected_shape_6d_train), f"Expected shape: {expected_shape_3d_train} or {expected_shape_6d_train}, but got {combined_train_dict['template'].shape}" |
| assert check_shape(combined_train_dict['source'], expected_shape_3d_train, expected_shape_6d_train), f"Expected shape: {expected_shape_3d_train} or {expected_shape_6d_train}, but got {combined_train_dict['source'].shape}" |
| assert combined_train_dict['transformation'].shape == (train_size, 4, 4), f"Expected shape: {(train_size, 4, 4)}, but got {combined_train_dict['transformation'].shape}" |
|
|
| expected_shape_3d_test = (test_size, num_points, 3) |
| expected_shape_6d_test = (test_size, num_points, 6) |
|
|
| assert check_shape(combined_test_dict['template'], expected_shape_3d_test, expected_shape_6d_test), f"Expected shape: {expected_shape_3d_test} or {expected_shape_6d_test}, but got {combined_test_dict['template'].shape}" |
| assert check_shape(combined_test_dict['source'], expected_shape_3d_test, expected_shape_6d_test), f"Expected shape: {expected_shape_3d_test} or {expected_shape_6d_test}, but got {combined_test_dict['source'].shape}" |
| assert combined_test_dict['transformation'].shape == (test_size, 4, 4), f"Expected shape: {(test_size, 4, 4)}, but got {combined_test_dict['transformation'].shape}" |
| |
| |
| with open(output_train_file_path, 'wb') as f: |
| pickle.dump(combined_train_dict, f) |
| print(f"combined_train_dict saved to {output_train_file_path}") |
|
|
| with open(output_test_file_path, 'wb') as f: |
| pickle.dump(combined_test_dict, f) |
| print(f"combined_test_dict saved to {output_train_file_path}") |
|
|