Updated Handler to have better debugging output when encountering errors trying to generate mesh
fa524b1
verified
| from typing import Dict, List, Any | |
| from PIL import Image | |
| import torch | |
| from torch import autocast | |
| from tqdm.auto import tqdm | |
| from point_e.diffusion.configs import DIFFUSION_CONFIGS, diffusion_from_config | |
| from point_e.diffusion.sampler import PointCloudSampler | |
| from point_e.models.download import load_checkpoint | |
| from point_e.models.configs import MODEL_CONFIGS, model_from_config | |
| from point_e.util.plotting import plot_point_cloud | |
| from point_e.util.pc_to_mesh import marching_cubes_mesh | |
| from point_e.util.point_cloud import PointCloud | |
| import json | |
| import base64 | |
| import numpy as np | |
| from io import BytesIO | |
| import os | |
# Resolve the compute device once at import time. This handler is GPU-only,
# so the module refuses to load when CUDA is not available.
if not torch.cuda.is_available():
    raise ValueError("need to run on GPU")
device = torch.device('cuda')
class EndpointHandler():
    """Request handler that serves Point-E point-cloud and mesh generation.

    On construction it loads a text-conditioned base model, an
    image-conditioned base model, an upsampler and an SDF model onto the
    module-level ``device``. Instances are callable: ``__call__`` reads the
    optional ``"command"`` key of the request and dispatches to
    ``generate_point_cloud``, ``generate_mesh_from_pc`` or ``check_status``.
    """

    def __init__(self, path=""):
        """Create every model, load its checkpoint and mark the handler active.

        Args:
            path: Not used by this handler. Presumably supplied by the hosting
                framework -- TODO confirm against the deployment docs.
        """
        # NOTE(review): presumably silences the "duplicate OpenMP runtime"
        # abort some torch/numpy builds trigger -- confirm it is still needed.
        os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

        print('creating base model...')
        self.base_name = 'base40M-textvec'
        self.base_model = model_from_config(MODEL_CONFIGS[self.base_name], device)
        self.base_model.eval()
        self.base_diffusion = diffusion_from_config(DIFFUSION_CONFIGS[self.base_name])

        print('creating image model...')
        # default - base40M. use base300M or base1B for better results
        self.base_image_name = 'base40M'
        self.base_image_model = model_from_config(MODEL_CONFIGS[self.base_image_name], device)
        self.base_image_model.eval()
        # Stored under its own attribute so it no longer overwrites the text
        # model's diffusion (``self.base_diffusion``).
        self.base_image_diffusion = diffusion_from_config(DIFFUSION_CONFIGS[self.base_image_name])

        print('creating upsample model...')
        self.upsampler_model = model_from_config(MODEL_CONFIGS['upsample'], device)
        self.upsampler_model.eval()
        self.upsampler_diffusion = diffusion_from_config(DIFFUSION_CONFIGS['upsample'])

        print('downloading base checkpoint...')
        self.base_model.load_state_dict(load_checkpoint(self.base_name, device))
        self.base_image_model.load_state_dict(load_checkpoint(self.base_image_name, device))

        print('downloading upsampler checkpoint...')
        self.upsampler_model.load_state_dict(load_checkpoint('upsample', device))

        print('creating SDF model...')
        self.sdf_name = 'sdf'
        self.sdf_model = model_from_config(MODEL_CONFIGS[self.sdf_name], device)
        self.sdf_model.eval()

        print('loading SDF model...')
        self.sdf_model.load_state_dict(load_checkpoint(self.sdf_name, device))

        # Read by check_status(); only set once every model loaded successfully.
        self.active = True

    def __call__(self, input_data: Any) -> Any:
        """Dispatch one request according to its ``"command"`` key.

        Args:
            input_data: The request, either a dict or a JSON string that
                decodes to one. Recognised commands:

                * no ``"command"`` key -- legacy path: generate a point cloud
                  from the request and immediately convert it to a mesh.
                * ``"generate_pc"`` -- return the serialized point cloud.
                * ``"generate_mesh"`` -- build a mesh from ``"raw_pc"`` (a dict,
                  or a JSON string of one, shaped like the output of
                  ``generate_point_cloud``).
                * ``"status"`` -- return ``check_status()``.

        Returns:
            The result of the selected operation, or ``None`` when the request
            cannot be parsed, ``"raw_pc"`` is missing or malformed, or the
            command is not recognised.
        """
        # Check if input_data is a string and deserialize if necessary
        if isinstance(input_data, str):
            print("input_data is a string, attempting to deserialize...")
            try:
                input_data = json.loads(input_data)
            except json.JSONDecodeError as e:
                print(f"Failed to parse JSON: {e}")
                return None

        if not isinstance(input_data, dict):
            print(f"Unsupported input_data type: {type(input_data).__name__}")
            return None

        command = input_data.get("command", "null")
        print(f"the command is: {command}")

        # Older clients send no command and expect a mesh back directly, so a
        # missing command runs the full text/image -> point cloud -> mesh path.
        if command == "null":
            temp_pc = self.generate_point_cloud(input_data)
            return self.generate_mesh_from_pc(temp_pc)

        if command == "generate_pc":
            return self.generate_point_cloud(input_data)

        if command == "generate_mesh":
            print("generate_mesh command received...")
            raw_pc = input_data.get("raw_pc")
            if raw_pc is None:
                print("raw_pc not found in input_data!")
                return None
            # Check if raw_pc is a string and deserialize if necessary
            if isinstance(raw_pc, str):
                print("raw_pc is a string, attempting to deserialize...")
                try:
                    raw_pc = json.loads(raw_pc)
                except json.JSONDecodeError as e:
                    print(f"Failed to parse raw_pc JSON: {e}")
                    return None
            if not isinstance(raw_pc, dict):
                print(f"raw_pc has unsupported type: {type(raw_pc).__name__}")
                return None
            print("Calling generate_mesh_from_pc...")
            return self.generate_mesh_from_pc(raw_pc)

        if command == "status":
            return self.check_status()

        print(f"Unknown command: {command!r}")
        return None

    def check_status(self) -> bool:
        """Return ``True`` once ``__init__`` has finished loading all models."""
        # getattr guards against instances whose __init__ never completed.
        return getattr(self, "active", False)

    def generate_point_cloud(self, data: Any) -> Dict[str, str]:
        """Sample a coloured point cloud from an image or a text prompt.

        Args:
            data: Request dict. When it has an ``"image"`` key, the value is
                treated as a base64-encoded image and used for conditioning;
                otherwise ``data["inputs"]`` is used as the text prompt. The
                dict is not modified.

        Returns:
            A dict with two JSON strings: ``"data"`` holds the point
            coordinates as a nested list and ``"channels"`` holds a mapping of
            channel name to list of values.

        Raises:
            RuntimeError: If the sampler yields no samples.
        """
        print("generate pc called...")

        # An image, when provided, takes precedence over the text input.
        use_image = "image" in data
        print('image data found' if use_image else 'no image data found')

        # NOTE(review): when "inputs" is absent this falls back to the whole
        # request dict (as the original did); the text path then receives a
        # dict rather than a prompt string -- confirm callers always send it.
        inputs = data.get("inputs", data)

        if use_image:
            # Decode the base64 payload into a PIL image to condition on.
            img = Image.open(BytesIO(base64.b64decode(data["image"])))
            sampler = PointCloudSampler(
                device=device,
                models=[self.base_image_model, self.upsampler_model],
                diffusions=[self.base_image_diffusion, self.upsampler_diffusion],
                num_points=[1024, 4096 - 1024],
                aux_channels=['R', 'G', 'B'],
                guidance_scale=[3.0, 3.0],
            )
            model_kwargs = dict(images=[img])
        else:
            sampler = PointCloudSampler(
                device=device,
                models=[self.base_model, self.upsampler_model],
                diffusions=[self.base_diffusion, self.upsampler_diffusion],
                num_points=[1024, 4096 - 1024],
                aux_channels=['R', 'G', 'B'],
                guidance_scale=[3.0, 0.0],
                model_kwargs_key_filter=('texts', ''),  # Do not condition the upsampler at all
            )
            model_kwargs = dict(texts=[inputs])

        # run inference pipeline; only the last yielded value is kept
        samples = None
        with autocast(device.type):
            for x in tqdm(sampler.sample_batch_progressive(batch_size=1, model_kwargs=model_kwargs)):
                samples = x
        if samples is None:
            raise RuntimeError("sampler produced no samples")

        pc = sampler.output_to_point_clouds(samples)[0]

        # Arrays are converted to plain lists so they can be JSON-serialized.
        serializable_channels = {key: value.tolist() for key, value in pc.channels.items()}
        return {
            'data': json.dumps(pc.coords.tolist()),
            'channels': json.dumps(serializable_channels),
        }

    @staticmethod
    def _decode_json_field(value: Any) -> Any:
        """Return ``value`` parsed as JSON if it is text, otherwise unchanged."""
        if isinstance(value, (str, bytes, bytearray)):
            return json.loads(value)
        return value

    def generate_mesh_from_pc(self, pc_data: Any) -> Any:
        """Rebuild a point cloud from its serialized form and mesh it.

        Args:
            pc_data: Mapping with ``"data"`` (point coordinates) and
                ``"channels"`` (channel name -> values), each either a JSON
                string as produced by ``generate_point_cloud`` or the
                already-decoded list / dict.

        Returns:
            The mesh object returned by ``marching_cubes_mesh``. As a side
            effect the mesh is also written to ``mesh.ply`` in the current
            working directory.
        """
        # Produce a mesh (with vertex colors)
        print("generate mesh called...")

        # De-serialize both the coords and channel data
        coords_list = self._decode_json_field(pc_data['data'])
        channels_dict = self._decode_json_field(pc_data['channels'])
        print(f"reconstructing point cloud from {len(coords_list)} points...")

        # Reconstruct the PointCloud object
        point_cloud = PointCloud(
            coords=np.array(coords_list, dtype=np.float32),
            channels={
                name: np.array(values, dtype=np.float32)
                for name, values in channels_dict.items()
            },
        )

        mesh = marching_cubes_mesh(
            pc=point_cloud,
            model=self.sdf_model,
            batch_size=4096,
            grid_size=32,  # increase to 128 for resolution used in evals
            progress=True,
        )

        # Write the mesh to a PLY file to import into some other program.
        with open('mesh.ply', 'wb') as f:
            mesh.write_ply(f)

        print(mesh)
        return mesh