| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import itertools |
| | import time |
| | from typing import Dict, List |
| |
|
| | import torch |
| | import torch.profiler |
| | from diffusers import AutoencoderKL |
| | from torch import nn |
| |
|
| |
|
class VAEGenerator:
    """
    Generate and search different Variational Autoencoder (VAE) configurations.

    Given an input resolution and a spatial compression ratio, this class
    enumerates candidate VAE architectures (a grid over down/up block types,
    channel widths, depth, and latent channels), profiles each candidate's
    trainable-parameter count and peak CUDA memory, and returns the candidate
    that best matches the caller's parameter and/or memory budget.

    NOTE(review): profiling requires a CUDA device; only the 1024-resolution
    search spaces (8x and 16x) are implemented so far.
    """

    def __init__(self, input_resolution: int = 1024, compression_ratio: int = 16) -> None:
        """
        Args:
            input_resolution: Side length of the square input image (1024 or 2048).
            compression_ratio: Spatial downsampling factor of the VAE.

        Raises:
            NotImplementedError: If ``input_resolution`` is above 2048 (or otherwise unsupported).
            ValueError: If ``compression_ratio`` is not valid for the given resolution.
        """
        # Explicit raises instead of `assert`: asserts are stripped under `python -O`,
        # which would silently accept invalid configurations.
        supported_ratios = {1024: (8, 16), 2048: (8, 16, 32)}
        if input_resolution not in supported_ratios:
            raise NotImplementedError("Higher resolution than 2048 is not implemented yet!")
        if compression_ratio not in supported_ratios[input_resolution]:
            raise ValueError(
                f"Compression ratio {compression_ratio} is not supported for "
                f"input resolution {input_resolution}; choose from {supported_ratios[input_resolution]}."
            )

        self._input_resolution = input_resolution
        self._compression_ratio = compression_ratio

    def _generate_input(self) -> torch.Tensor:
        """
        Generate a random input tensor with the specified input resolution.

        Returns:
            torch.Tensor: A (1, 3, H, W) half-precision tensor on CUDA,
            where H == W == ``self.input_resolution``.
        """
        random_tensor = torch.rand(1, 3, self.input_resolution, self.input_resolution)
        return random_tensor.to(dtype=torch.float16, device="cuda")

    def _count_parameters(self, model: nn.Module = None) -> int:
        """
        Count the number of trainable parameters in a given model.

        Args:
            model: The model for which to count parameters.

        Returns:
            int: The number of trainable parameters.

        Raises:
            ValueError: If ``model`` is None.
        """
        if model is None:
            raise ValueError("Please provide a nn.Module to count the parameters.")
        return sum(p.numel() for p in model.parameters() if p.requires_grad)

    def _load_base_json_skeleton(self) -> Dict:
        """
        Load a base configuration skeleton for the VAE.

        Fields set to -1 or [] are placeholders filled in by
        :meth:`_assign_attributes` for each search-space candidate.

        Returns:
            dict: A dictionary representing the base configuration JSON skeleton.
        """
        return {
            "_class_name": "AutoencoderKL",
            "_diffusers_version": "0.20.0.dev0",
            "_name_or_path": "../sdxl-vae/",
            "act_fn": "silu",
            "block_out_channels": [],
            "down_block_types": [],
            "force_upcast": False,
            "in_channels": 3,
            "latent_channels": -1,
            "layers_per_block": -1,
            "norm_num_groups": 32,
            "out_channels": 3,
            "sample_size": 1024,
            "scaling_factor": 0.13025,
            "up_block_types": [],
        }

    def _generate_all_combinations(self, attr: Dict) -> List[Dict]:
        """
        Generate all possible combinations from a search-space dictionary.

        Args:
            attr: A dictionary where each key maps to a list of possible values.

        Returns:
            List[Dict]: One dict per unique combination of attribute values
            (the Cartesian product over all keys).
        """
        keys = list(attr.keys())
        value_lists = [attr[key] for key in keys]
        return [dict(zip(keys, combo)) for combo in itertools.product(*value_lists)]

    def _assign_attributes(self, choice: Dict) -> Dict:
        """
        Assign a chosen set of attributes to the base VAE configuration skeleton.

        Args:
            choice: A dictionary of attributes to assign to the skeleton.

        Returns:
            dict: A dictionary representing the updated VAE configuration.
        """
        config = self._load_base_json_skeleton()
        for key in (
            "down_block_types",
            "up_block_types",
            "block_out_channels",
            "layers_per_block",
            "latent_channels",
        ):
            config[key] = choice[key]
        return config

    def _search_space_16x1024(self) -> Dict:
        """
        Define the search space for a 16x compression ratio at 1024 resolution.

        Five down/up stages give the 16x spatial reduction.

        Returns:
            dict: A dictionary defining lists of possible attribute values.
        """
        return {
            "down_block_types": [["DownEncoderBlock2D"] * 5],
            "up_block_types": [["UpDecoderBlock2D"] * 5],
            "block_out_channels": [
                [128, 256, 512, 512, 512],
                [128, 256, 512, 512, 1024],
                [128, 256, 512, 1024, 2048],
                [64, 128, 256, 512, 512],
            ],
            "layers_per_block": [1, 2, 3],
            "latent_channels": [4, 16, 32, 64],
        }

    def _search_space_8x1024(self) -> Dict:
        """
        Define the search space for an 8x compression ratio at 1024 resolution.

        Four down/up stages give the 8x spatial reduction.

        Returns:
            dict: A dictionary defining lists of possible attribute values.
        """
        return {
            "down_block_types": [["DownEncoderBlock2D"] * 4],
            "up_block_types": [["UpDecoderBlock2D"] * 4],
            "block_out_channels": [
                [128, 256, 512, 512],
                [128, 256, 512, 1024],
                [64, 128, 256, 512],
            ],
            "layers_per_block": [1, 2, 3],
            "latent_channels": [4, 16, 32, 64],
        }

    def _sort_data_in_place(self, data: List[Dict], mode: str) -> None:
        """
        Sort the list of design configurations in place based on a chosen mode.

        Args:
            data: A list of dictionaries representing design configurations;
                each must contain 'param_diff' and 'cuda_mem_diff'.
            mode: The sorting criterion. One of 'abs_param_diff',
                'abs_cuda_mem_diff', or 'mse'.

        Raises:
            ValueError: If ``mode`` is not one of the supported criteria.
        """
        key_funcs = {
            "abs_param_diff": lambda x: abs(x["param_diff"]),
            "abs_cuda_mem_diff": lambda x: abs(x["cuda_mem_diff"]),
            # NOTE(review): mixes units (M params vs MB) — kept as designed.
            "mse": lambda x: (x["param_diff"] ** 2 + x["cuda_mem_diff"] ** 2) / 2,
        }
        if mode not in key_funcs:
            raise ValueError("Invalid mode. Choose from 'abs_param_diff', 'abs_cuda_mem_diff', 'mse'.")
        data.sort(key=key_funcs[mode])

    def _print_table(self, data: List[Dict], headers: List[str], col_widths: List[int]) -> None:
        """
        Print a formatted table of the design choices.

        Args:
            data: The data to print, each entry a design configuration.
            headers: Column headers.
            col_widths: Widths for each column; should be at least as wide as
                the corresponding header for proper alignment.
        """
        header_row = "".join(f"{header:<{width}}" for header, width in zip(headers, col_widths))
        print(header_row)
        print("-" * sum(col_widths))

        for item in data:
            row = f"{item['param_diff']:<{col_widths[0]}}"
            row += f"{item['cuda_mem_diff']:<{col_widths[1]}}"
            print(row)

    def _profile_design(self, design_json: Dict, inp_tensor: torch.Tensor):
        """
        Build a VAE from ``design_json`` and profile one encode pass.

        Args:
            design_json: A complete AutoencoderKL configuration dict.
            inp_tensor: The half-precision CUDA input tensor to encode.

        Returns:
            tuple: (total_params_in_millions, max_memory_allocated_mb,
            execution_time_ms, profiler) for the encode pass.
        """
        vae = AutoencoderKL.from_config(design_json)
        vae = vae.to(dtype=torch.float16, device="cuda")
        total_params = self._count_parameters(vae) / 10**6

        torch.cuda.reset_peak_memory_stats()
        torch.cuda.synchronize()

        with torch.profiler.profile(
            activities=[
                torch.profiler.ProfilerActivity.CPU,
                torch.profiler.ProfilerActivity.CUDA,
            ],
            profile_memory=True,
            record_shapes=True,
            with_stack=True,
        ) as prof:
            start_time = time.perf_counter()
            with torch.no_grad():
                _ = vae.encode(inp_tensor).latent_dist.sample()
            torch.cuda.synchronize()
            end_time = time.perf_counter()

        total_execution_time_ms = (end_time - start_time) * 1000
        max_memory_allocated = torch.cuda.max_memory_allocated() / (1024**2)

        # Release the candidate model so successive candidates don't
        # accumulate on the GPU and skew the peak-memory measurements.
        del vae
        torch.cuda.empty_cache()

        return total_params, max_memory_allocated, total_execution_time_ms, prof

    def search_for_target_vae(self, parameters_budget: float = 0, cuda_max_mem: float = 0):
        """
        Search through available VAE design choices to find one that best matches
        the given parameter and memory budgets.

        Args:
            parameters_budget: The target number of parameters (in millions).
            cuda_max_mem: The target maximum GPU memory usage (in MB).

        Returns:
            AutoencoderKL: The chosen VAE configuration that best matches the
            provided budgets (freshly instantiated from its config, on CPU).

        Raises:
            ValueError: If neither budget is positive.
            NotImplementedError: If no search space is implemented for the
                configured (resolution, compression ratio) pair.
        """
        if parameters_budget <= 0 and cuda_max_mem <= 0:
            raise ValueError("Please specify a valid parameter budget or cuda max memory budget")

        if self.input_resolution == 1024 and self.compression_ratio == 8:
            search_space = self._search_space_8x1024()
        elif self.input_resolution == 1024 and self.compression_ratio == 16:
            search_space = self._search_space_16x1024()
        else:
            # Previously fell through with an empty candidate list and crashed
            # later with IndexError; fail fast with a clear message instead.
            raise NotImplementedError(
                f"No search space implemented for resolution {self.input_resolution} "
                f"with compression ratio {self.compression_ratio}."
            )
        search_space_choices = self._generate_all_combinations(search_space)

        # _generate_input already returns a float16 CUDA tensor.
        inp_tensor = self._generate_input()
        design_choices = []

        for choice in search_space_choices:
            curt_design_json = self._assign_attributes(choice)
            print("-" * 20)
            print(choice)

            total_params, max_memory_allocated, total_execution_time_ms, prof = self._profile_design(
                curt_design_json, inp_tensor
            )

            design_choices.append(
                {
                    "param_diff": parameters_budget - total_params,
                    "cuda_mem_diff": cuda_max_mem - max_memory_allocated,
                    "design": curt_design_json,
                }
            )

            print(f" Total params: {total_params}")
            print(f" Max GPU Memory Usage: {max_memory_allocated} MB")
            print(f" Total Execution Time: {total_execution_time_ms:.2f} ms")
            print(prof.key_averages().table(sort_by="cuda_time_total", row_limit=10))

        print("-" * 20)
        # Pick the ranking criterion from which budgets were actually given.
        if parameters_budget == 0:
            sort_mode = "abs_cuda_mem_diff"
        elif cuda_max_mem == 0:
            sort_mode = "abs_param_diff"
        else:
            sort_mode = "mse"

        print("#" * 20)
        self._sort_data_in_place(design_choices, sort_mode)
        headers = ["param_diff (M)", "cuda_mem_diff (MB)"]
        # Widths must cover the header strings (14 and 18 chars) to align.
        col_widths = [18, 22]
        self._print_table(design_choices, headers, col_widths)

        return AutoencoderKL.from_config(design_choices[0]["design"])

    @property
    def input_resolution(self) -> int:
        """
        Get the input resolution for the VAE.

        Returns:
            int: The input resolution.
        """
        return self._input_resolution

    @property
    def compression_ratio(self) -> float:
        """
        Get the compression ratio for the VAE.

        Returns:
            float: The compression ratio.
        """
        return self._compression_ratio