import os import argparse import ntpath from tqdm import tqdm import multiprocessing from pathlib import Path from glob import glob import trimesh from trimesh.sample import sample_surface from plyfile import PlyData, PlyElement import numpy as np def write_ply(points, filename, text=False): """ input: Nx3, write points to filename as PLY format. """ points = [(points[i,0], points[i,1], points[i,2]) for i in range(points.shape[0])] vertex = np.array(points, dtype=[('x', 'f4'), ('y', 'f4'),('z', 'f4')]) el = PlyElement.describe(vertex, 'vertex', comments=['vertices']) with open(filename, mode='wb') as f: PlyData([el], text=text).write(f) def find_files(folder, extension): return sorted([Path(os.path.join(folder, f)) for f in os.listdir(folder) if f.endswith(extension)]) class SamplePoints: """ Perform sampleing of points. """ def __init__(self): """ Constructor. """ parser = self.get_parser() self.options = parser.parse_args() def get_parser(self): """ Get parser of tool. :return: parser """ parser = argparse.ArgumentParser(description='Scale a set of meshes stored as OFF files.') parser.add_argument('--in_dir', type=str, help='Path to input directory.') parser.add_argument('--out_dir', type=str, help='Path to output directory; files within are overwritten!') parser.add_argument("--single-file", action='store_true', default=False) return parser def run_parallel(self, project_folder): out_folder = os.path.join(project_folder, self.options.out_dir) if not os.path.exists(out_folder): os.makedirs(out_folder) files = find_files(project_folder, 'final.stl') for filepath in files: N_POINTS = 2000 try: out_mesh = trimesh.load(str(filepath)) out_pc, _ = sample_surface(out_mesh, N_POINTS) save_path = os.path.join(out_folder, ntpath.basename(filepath)[:-4]+'_pcd.ply') write_ply(out_pc, save_path) except Exception as ex: return project_folder return def run(self): """ Run simplification. """ if self.options.single_file: self.run_parallel(self.options.in_dir) else: project_folders = sorted(glob(self.options.in_dir+'/*/')) num_cpus = multiprocessing.cpu_count() convert_iter = multiprocessing.Pool(num_cpus).imap(self.run_parallel, project_folders) for _ in tqdm(convert_iter, total=len(project_folders)): pass if __name__ == '__main__': app = SamplePoints() app.run()