| import os
|
| import shutil
|
| import uuid
|
| import torch
|
| import gradio as gr
|
| import numpy as np
|
| import ray
|
| import time
|
|
|
| from pathlib import Path
|
|
|
| from diffusion.utils import export_edges
|
| from construct_brep import construct_brep_from_datanpz
|
| from app.DataProcessor import DataProcessor
|
| from app.ModelDirector import ModelDirector
|
|
|
| _EDGE_FILE = 0
|
| _SOLID_FILE = 1
|
| _STEP_FILE = 2
|
|
|
| class ConditionedGeneratingMethod():
|
| def __init__(
|
| self,
|
| model_building_director: ModelDirector,
|
| dataprocessor: DataProcessor,
|
| model_num_to_return: int,
|
| model_seed: int = 0,
|
| output_main_dir: Path | str = Path('./outputs')
|
| ):
|
| self.director = model_building_director
|
| self.dataprocessor = dataprocessor
|
| self.model_num_to_return = model_num_to_return
|
| self.model_seed = model_seed
|
| self.output_main_dir = output_main_dir
|
|
|
| def generate(self):
|
| def generating_method(browser_state: dict, *inputs):
|
| try:
|
|
|
| assert len(inputs) > 0
|
| self._user_state_check(browser_state)
|
| self._empty_input_check(inputs)
|
|
|
|
|
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
|
|
|
|
| tensor_data = self.dataprocessor.process(inputs)
|
|
|
|
|
| self.director.config_setup()
|
| model_builder = self.director.buider
|
|
|
|
|
| diffusion_output_dir = self._get_diffusion_output_dir(browser_state, self.director.get_generating_condition())
|
| postprocess_output_dir = self._get_postprocess_output_dir(browser_state, self.director.get_generating_condition())
|
|
|
| model_builder.setup_output_dir(diffusion_output_dir)
|
| model_builder.setup_seed(self.model_seed)
|
|
|
| model_builder.make_model(device)
|
| model = model_builder.model
|
|
|
|
|
|
|
|
|
| gr.Info("Start diffusing", title="Runtime Info")
|
| with torch.no_grad():
|
| pred_results = model.inference(self.dataprocessor.NUM_PROPOSALS, device, v_data=tensor_data, v_log=True)
|
|
|
|
|
| for i, result in enumerate(pred_results):
|
| diffusion_output_subdir = diffusion_output_dir / f"00_{i:02d}"
|
| diffusion_output_subdir.mkdir(parents=True, exist_ok=True)
|
|
|
| export_edges(result["pred_edge"], (diffusion_output_subdir / "edge.obj").as_posix())
|
|
|
| np.savez_compressed(
|
| file = (diffusion_output_subdir / "data.npz").as_posix(),
|
| pred_face_adj_prob = result["pred_face_adj_prob"],
|
| pred_face_adj = result["pred_face_adj"].cpu().numpy(),
|
| pred_face = result["pred_face"],
|
| pred_edge = result["pred_edge"],
|
| pred_edge_face_connectivity = result["pred_edge_face_connectivity"],
|
| )
|
| gr.Info("Finished diffusing", title="Runtime Info")
|
|
|
|
|
|
|
|
|
|
|
| gr.Info("Start post-processing!", title="Runtime Info")
|
| if not ray.is_initialized():
|
| ray.init(
|
| num_cpus=2,
|
| )
|
|
|
| construct_brep_from_datanpz_ray = ray.remote(num_cpus=1, max_retries=0)(construct_brep_from_datanpz)
|
| diffusion_results = sorted(os.listdir(diffusion_output_dir))
|
|
|
| tasks = [
|
| construct_brep_from_datanpz_ray.remote(
|
| data_root=diffusion_output_dir,
|
| out_root=postprocess_output_dir,
|
| folder_name=model_number,
|
| v_drop_num=1,
|
| use_cuda=False,
|
| from_scratch=True,
|
| is_log=False,
|
| is_ray=True,
|
| is_optimize_geom=True,
|
| isdebug=False,
|
| is_save_data=True
|
| )
|
| for model_number in diffusion_results
|
| ]
|
|
|
| results = []
|
| success_count = 0
|
| while tasks and success_count < self.model_num_to_return:
|
| done_ids, tasks = ray.wait(tasks, num_returns=1, timeout=30)
|
| for done_id in done_ids:
|
| try:
|
| result = ray.get(done_id)
|
| results.append(result)
|
|
|
|
|
| time.sleep(0.2)
|
|
|
|
|
| for done_folder in postprocess_output_dir.iterdir():
|
| output_files = os.listdir(done_folder)
|
| if 'success.txt' in output_files:
|
| success_count += 1
|
|
|
| except Exception as e:
|
| print(f"Task failed or timed out: {e}")
|
| results.append(None)
|
|
|
| if success_count >= self.model_num_to_return:
|
|
|
| time.sleep(5.0)
|
| break
|
| time.sleep(5.0)
|
| gr.Info("Finished post-processing!", title="Runtime Info")
|
|
|
| valid_models = self._get_valid_models(postprocess_output_dir)
|
|
|
|
|
|
|
|
|
| browser_state = self._update_user_state(browser_state, postprocess_output_dir, valid_models)
|
|
|
|
|
| self._postprocess_output_check(valid_models)
|
|
|
|
|
|
|
| gr.Info(f"{len(valid_models) if len(valid_models) < 4 else 4} valid models generated!", title="Finish generating")
|
| condition = self.director.get_generating_condition()
|
|
|
|
|
| edge_file = browser_state[condition][0][_EDGE_FILE]
|
| solid_file = browser_state[condition][0][_SOLID_FILE]
|
| step_file = browser_state[condition][0][_STEP_FILE]
|
|
|
| return browser_state, edge_file, solid_file, step_file, browser_state[condition][0]
|
|
|
| except EmptyInputException as input_e:
|
| gr.Warning(str(input_e), title="Empty Input")
|
|
|
| except GeneraingException as generating_e:
|
| gr.Warning(str(generating_e), title="No Valid Generation")
|
|
|
| except UnicodeEncodeError as uni_error:
|
| gr.Warning("We sincerely apologize, but we currently only support English.", title="English Support Only")
|
|
|
| except FileNotFoundError as file_e:
|
| gr.Warning("The operation is too frequent!", title="Frequent Operation")
|
|
|
| except Exception as e:
|
| print(e)
|
| gr.Warning("Something bad happened. Please try some other models", title="Unknown Error")
|
|
|
| return browser_state, gr.update(), gr.update(), gr.update(), gr.update()
|
|
|
| return generating_method
|
|
|
| def _update_user_state(self, browser_state, postprocess_output_dir, valid_model):
|
|
|
| condition = self.director.get_generating_condition()
|
| browser_state[condition] = list()
|
| for i, model_number in enumerate(valid_model):
|
| if (postprocess_output_dir / model_number / 'debug_face_loop' / 'optimized_edge.obj').exists():
|
| edge = (postprocess_output_dir / model_number / 'debug_face_loop' / 'optimized_edge.obj').as_posix()
|
| else:
|
| edge = (postprocess_output_dir / model_number / 'debug_face_loop' / 'edge.obj').as_posix()
|
| solid = (postprocess_output_dir / model_number / 'recon_brep.stl').as_posix()
|
| step = (postprocess_output_dir / model_number / 'recon_brep.step').as_posix()
|
| browser_state[condition].append([edge, solid, step])
|
| return browser_state
|
|
|
| def _postprocess_output_check(self, valid_model):
|
| if len(valid_model) <= 0:
|
| raise GeneraingException("No Valid Model Generated!")
|
|
|
| def _empty_input_check(self, inputs):
|
| for input_component in inputs:
|
| if input_component is None:
|
| raise EmptyInputException("Empty input exists!")
|
|
|
| def _user_state_check(self, state_dict):
|
| if state_dict['user_id'] is None:
|
| state_dict['user_id'] = uuid.uuid4()
|
| if state_dict['user_output_dir'] is None:
|
| state_dict['user_output_dir'] = Path(self.output_main_dir) / f"user_{state_dict['user_id']}"
|
| os.makedirs(state_dict['user_output_dir'], exist_ok=True)
|
|
|
| def _get_valid_models(self, postprocess_output: Path):
|
|
|
| output_folders = [model_folder for model_folder in os.listdir(postprocess_output) if 'success.txt' in os.listdir(postprocess_output / model_folder)]
|
| return output_folders
|
|
|
| def _get_diffusion_output_dir(self, state_dict, condition):
|
|
|
| diffusion_output_dir = Path(state_dict['user_output_dir']) / condition
|
| os.makedirs(diffusion_output_dir, exist_ok=True)
|
| if len(os.listdir(diffusion_output_dir)) > 0:
|
| shutil.rmtree(diffusion_output_dir)
|
| return diffusion_output_dir
|
|
|
| def _get_postprocess_output_dir(self, state_dict, condition):
|
|
|
| postprocess_output_dir = Path(state_dict['user_output_dir']) / f'{condition}_post'
|
| os.makedirs(postprocess_output_dir, exist_ok=True)
|
| if len(os.listdir(postprocess_output_dir)) > 0:
|
| shutil.rmtree(postprocess_output_dir)
|
| return postprocess_output_dir
|
|
|
| class GeneraingException(Exception):
|
| """Custom exception if generating failed."""
|
| pass
|
|
|
| class EmptyInputException(Exception):
|
| """Custom exception if the input is empty."""
|
| pass
|
|
|