| import json |
| import multiprocessing |
| import subprocess |
| import time |
| from dataclasses import dataclass |
| import os |
| import tyro |
| import concurrent.futures |
@dataclass
class Args:
    """CLI arguments for the distributed Blender rendering launcher."""

    # NOTE: fields without defaults must come before fields with defaults,
    # otherwise @dataclass raises TypeError at import time (the original
    # ordering put input_dir/save_dir after num_gpus=8, which is invalid).
    workers_per_gpu: int
    """number of workers per gpu"""
    input_dir: str
    """root directory containing the input .vrm files"""
    save_dir: str
    """root directory where rendered outputs are written"""
    num_gpus: int = 8
    """number of gpus to use. -1 means all available gpus"""
    engine: str = "BLENDER_EEVEE"
    """blender render engine passed through to the render script"""
|
|
|
|
def check_already_rendered(save_path):
    """Return True if *save_path* already contains the completion marker file.

    Args:
        save_path: output directory of one render job.

    Returns:
        bool: whether '02419_semantic.png' exists under ``save_path``.

    NOTE(review): assumes '02419_semantic.png' is the final file the Blender
    script emits, so its presence implies the whole job finished -- confirm
    against blender_lrm_script.py if its output set changes.
    """
    return os.path.exists(os.path.join(save_path, '02419_semantic.png'))
|
|
def process_file(file):
    """Return *file* when it still needs rendering, else None.

    *file* is a [data_path, save_path] pair; the save path is probed for
    the completion marker.
    """
    _, save_path = file
    return None if check_already_rendered(save_path) else file
|
|
def worker(queue, count, gpu):
    """Consume render jobs from *queue* and run each through Blender on *gpu*.

    Args:
        queue: multiprocessing.JoinableQueue of
            (data_path, save_path, engine, log_name) tuples; ``None`` is the
            shutdown sentinel (one per worker).
        count: multiprocessing.Value('i') counting completed renders across
            all worker processes.
        gpu: integer GPU index used for DISPLAY and CUDA_VISIBLE_DEVICES.

    Every item taken from the queue is matched with exactly one
    ``queue.task_done()`` call so ``queue.join()`` can terminate.
    """
    while True:
        try:
            item = queue.get()
            if item is None:
                # Shutdown sentinel: acknowledge and exit the loop.
                queue.task_done()
                break
            data_path, save_path, engine, log_name = item
            print(f"Processing: {data_path} on GPU {gpu}")
            start = time.time()
            if check_already_rendered(save_path):
                queue.task_done()
                print('========', item, 'rendered', '========')
                continue
            else:
                os.makedirs(save_path, exist_ok=True)
            # NOTE(review): paths are interpolated into a shell string; they
            # are trusted local dataset paths here, but would need quoting
            # for untrusted input.
            command = (f"export DISPLAY=:0.{gpu} &&"
                       f" CUDA_VISIBLE_DEVICES={gpu} "
                       f" blender -b -P blender_lrm_script.py --"
                       f" --object_path {data_path} --output_dir {save_path} --engine {engine}")

            try:
                subprocess.run(command, shell=True, timeout=3600, check=True)
                # '+=' on a shared Value is not atomic across processes;
                # take its lock so concurrent workers don't lose increments.
                with count.get_lock():
                    count.value += 1
                end = time.time()
                with open(log_name, 'a') as f:
                    f.write(f'{end - start}\n')
            except subprocess.CalledProcessError as e:
                print(f"Subprocess error processing {item}: {e}")
            except subprocess.TimeoutExpired as e:
                print(f"Timeout expired processing {item}: {e}")
            except Exception as e:
                print(f"Error processing {item}: {e}")
            finally:
                # Exactly one acknowledgement per dequeued work item.
                queue.task_done()

        except Exception as e:
            # Failure outside the inner try (e.g. tuple unpacking): still
            # acknowledge the item so queue.join() cannot hang.
            print(f"Error processing {item}: {e}")
            queue.task_done()
|
|
|
|
| if __name__ == "__main__": |
| args = tyro.cli(Args) |
| queue = multiprocessing.JoinableQueue() |
| count = multiprocessing.Value("i", 0) |
| log_name = f'time_log_{args.workers_per_gpu}_{args.num_gpus}_{args.engine}.txt' |
|
|
| if args.num_gpus == -1: |
| result = subprocess.run(['nvidia-smi', '--list-gpus'], stdout=subprocess.PIPE) |
| output = result.stdout.decode('utf-8') |
| args.num_gpus = output.count('GPU') |
|
|
| files = [] |
|
|
| for group in [ str(i) for i in range(10) ]: |
| for folder in os.listdir(f'{args.input_dir}/{group}'): |
| filename = f'{args.input_dir}/{group}/{folder}/{folder}.vrm' |
| outputdir = f'{args.save_dir}/{group}/{folder}' |
| files.append([filename, outputdir]) |
|
|
| |
| files = sorted(files, key=lambda x: x[0]) |
|
|
| |
| with concurrent.futures.ThreadPoolExecutor() as executor: |
| |
| results = list(executor.map(process_file, files)) |
|
|
| |
| unprocess_files = [file for file in results if file is not None] |
|
|
| |
| print(f'Unprocessed files: {len(unprocess_files)}') |
|
|
| |
| for gpu_i in range(args.num_gpus): |
| for worker_i in range(args.workers_per_gpu): |
| worker_i = gpu_i * args.workers_per_gpu + worker_i |
| process = multiprocessing.Process( |
| target=worker, args=(queue, count, gpu_i) |
| ) |
| process.daemon = True |
| process.start() |
|
|
| for file in unprocess_files: |
| queue.put((file[0], file[1], args.engine, log_name)) |
|
|
| |
| for i in range(args.num_gpus * args.workers_per_gpu * 10): |
| queue.put(None) |
| |
| queue.join() |
| end = time.time() |
|
|