| | import os |
| | from diffusers import FluxPipeline, AutoencoderKL, FluxTransformer2DModel |
| | from diffusers.image_processor import VaeImageProcessor |
| | from transformers import T5EncoderModel, T5TokenizerFast, CLIPTokenizer, CLIPTextModel, CLIPTextConfig, T5Config |
| | import torch |
| | import gc |
| | from PIL.Image import Image |
| | from pipelines.models import TextToImageRequest |
| | from torch import Generator |
| | from torchao.quantization import quantize_, int8_weight_only, int8_dynamic_activation_int8_weight |
| | from time import perf_counter |
| |
|
| |
|
# Home directory of the current user. Indexing (rather than .get) means a
# missing HOME variable raises KeyError when this module is imported.
HOME = os.environ["HOME"]

# Allocator setting for PyTorch's CUDA caching allocator; it is written to the
# process environment at import time.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# Hub repository every component in this module is loaded from.
FLUX_CHECKPOINT = "black-forest-labs/FLUX.1-schnell"

# NOTE(review): not referenced in the visible code; presumably the component
# names that have a matching get_<name>() loader below - confirm against callers.
QUANTIZED_MODEL = [
    "transformer",
    "text_encoder_2",
    "text_encoder",
    "vae",
]

# Default torchao quantization recipe (int8, weight-only).
QUANT_CONFIG = int8_weight_only()

# Dtype used for every model loaded by this module.
DTYPE = torch.bfloat16

# Number of denoising steps requested per generated image.
NUM_STEPS = 4
| |
|
def get_transformer(quantize: bool = True, quant_config=int8_weight_only(), quant_ckpt: str = None):
    """Build the FLUX transformer, either from the hub or from a saved checkpoint.

    Args:
        quantize: When loading from the hub, apply ``quantize_`` with
            ``quant_config`` to the loaded model. Ignored if ``quant_ckpt``
            is given.
        quant_config: Configuration forwarded to ``quantize_``.
        quant_ckpt: Optional path to a state dict saved with ``torch.save``.
            When given, the model is built from its config only and the
            state dict is loaded into it.

    Returns:
        The ``FluxTransformer2DModel`` instance.
    """
    if quant_ckpt is None:
        # Regular path: pretrained weights in DTYPE, quantized on request.
        transformer = FluxTransformer2DModel.from_pretrained(
            FLUX_CHECKPOINT,
            subfolder="transformer",
            torch_dtype=DTYPE,
        )
        if quantize:
            quantize_(transformer, quant_config)
        return transformer

    # Checkpoint path: instantiate the architecture from its config, then
    # swap in the saved tensors (assign=True keeps the checkpoint's tensor
    # objects instead of copying them into the existing parameters).
    transformer_config = FluxTransformer2DModel.load_config(
        FLUX_CHECKPOINT, subfolder="transformer"
    )
    transformer = FluxTransformer2DModel.from_config(transformer_config).to(DTYPE)
    weights = torch.load(quant_ckpt, map_location="cpu")
    transformer.load_state_dict(weights, assign=True)
    print(f"Loaded {quant_ckpt}")
    return transformer
| |
|
| |
|
def get_text_encoder(quantize: bool = True, quant_config=int8_weight_only(), quant_ckpt: str = None):
    """Build the CLIP text encoder, either from the hub or from a saved checkpoint.

    Args:
        quantize: When loading from the hub, apply ``quantize_`` with
            ``quant_config`` to the loaded model. Ignored if ``quant_ckpt``
            is given.
        quant_config: Configuration forwarded to ``quantize_``.
        quant_ckpt: Optional path to a state dict saved with ``torch.save``.
            When given, the model is built from its config only and the
            state dict is loaded into it.

    Returns:
        The ``CLIPTextModel`` instance.
    """
    if quant_ckpt is None:
        # Regular path: pretrained weights in DTYPE, quantized on request.
        encoder = CLIPTextModel.from_pretrained(
            FLUX_CHECKPOINT,
            subfolder="text_encoder",
            torch_dtype=DTYPE,
        )
        if quantize:
            quantize_(encoder, quant_config)
        return encoder

    # Checkpoint path: build the architecture from its config and replace
    # its parameters with the tensors stored in the checkpoint.
    clip_config = CLIPTextConfig.from_pretrained(FLUX_CHECKPOINT, subfolder="text_encoder")
    encoder = CLIPTextModel(clip_config).to(DTYPE)
    weights = torch.load(quant_ckpt, map_location="cpu")
    encoder.load_state_dict(weights, assign=True)
    print(f"Loaded {quant_ckpt}")
    return encoder
| |
|
| |
|
def get_text_encoder_2(quantize: bool = True, quant_config=int8_weight_only(), quant_ckpt: str = None):
    """Build the T5 text encoder, either from the hub or from a saved checkpoint.

    Args:
        quantize: When loading from the hub, apply ``quantize_`` with
            ``quant_config`` to the loaded model. Ignored if ``quant_ckpt``
            is given.
        quant_config: Configuration forwarded to ``quantize_``.
        quant_ckpt: Optional path to a state dict saved with ``torch.save``.
            When given, the model is built from its config only and the
            state dict is loaded into it.

    Returns:
        The ``T5EncoderModel`` instance.
    """
    if quant_ckpt is not None:
        # Build the architecture from its config and replace its parameters
        # with the tensors stored in the checkpoint.
        config = T5Config.from_pretrained(FLUX_CHECKPOINT, subfolder="text_encoder_2")
        model = T5EncoderModel(config).to(DTYPE)
        state_dict = torch.load(quant_ckpt, map_location="cpu")
        model.load_state_dict(state_dict, assign=True)
        # Report success only after the state dict has actually been applied,
        # matching the other loaders in this module.
        print(f"Loaded {quant_ckpt}")
        return model

    model = T5EncoderModel.from_pretrained(
        FLUX_CHECKPOINT, subfolder="text_encoder_2", torch_dtype=DTYPE
    )
    if quantize:
        quantize_(model, quant_config)
    return model
| |
|
| |
|
def get_vae(quantize: bool = True, quant_config=int8_weight_only(), quant_ckpt: str = None):
    """Build the VAE, either from the hub or from a saved checkpoint.

    Args:
        quantize: When loading from the hub, apply ``quantize_`` with
            ``quant_config`` to the loaded model. Ignored if ``quant_ckpt``
            is given.
        quant_config: Configuration forwarded to ``quantize_``.
        quant_ckpt: Optional path to a state dict saved with ``torch.save``.
            When given, the model is built from its config only and the
            state dict is loaded into it.

    Returns:
        The ``AutoencoderKL`` instance.
    """
    if quant_ckpt is None:
        # Regular path: pretrained weights in DTYPE, quantized on request.
        vae = AutoencoderKL.from_pretrained(
            FLUX_CHECKPOINT,
            subfolder="vae",
            torch_dtype=DTYPE,
        )
        if quantize:
            quantize_(vae, quant_config)
        return vae

    # Checkpoint path: build the architecture from its config and replace
    # its parameters with the tensors stored in the checkpoint.
    vae_config = AutoencoderKL.load_config(FLUX_CHECKPOINT, subfolder="vae")
    vae = AutoencoderKL.from_config(vae_config).to(DTYPE)
    weights = torch.load(quant_ckpt, map_location="cpu")
    vae.load_state_dict(weights, assign=True)
    print(f"Loaded {quant_ckpt}")
    return vae
| |
|
| |
|
def empty_cache():
    """Release unreferenced Python objects and reset CUDA memory bookkeeping.

    Runs the garbage collector first so tensors that are no longer referenced
    can be freed, then returns cached CUDA blocks to the driver and resets
    the peak-memory counters. On hosts without CUDA only the garbage
    collection step runs.
    """
    gc.collect()
    if not torch.cuda.is_available():
        # The CUDA statistics calls raise when no device can be initialized.
        return
    torch.cuda.empty_cache()
    # reset_max_memory_allocated() is a deprecated alias that forwards to
    # reset_peak_memory_stats(), so a single call covers both.
    torch.cuda.reset_peak_memory_stats()
| |
|
| |
|
def load_pipeline() -> FluxPipeline:
    """Load the FLUX pipeline, configure it, and run one warm-up generation.

    Returns:
        The configured ``FluxPipeline`` with sequential CPU offload enabled
        and a compiled VAE.
    """
    empty_cache()

    pipe = FluxPipeline.from_pretrained(FLUX_CHECKPOINT, torch_dtype=DTYPE)

    # Switch the heavy modules to channels-last memory format in place.
    for module in (pipe.text_encoder_2, pipe.transformer, pipe.vae):
        module.to(memory_format=torch.channels_last)
    pipe.vae = torch.compile(pipe.vae)

    # NOTE(review): presumably keeps the VAE out of the offload hooks so it
    # stays on the accelerator - confirm against the diffusers version in use.
    pipe._exclude_from_cpu_offload = ["vae"]
    pipe.enable_sequential_cpu_offload()

    empty_cache()
    # Warm-up call with the same settings infer() uses, so the step count
    # cannot drift from the NUM_STEPS constant.
    pipe("cat", guidance_scale=0., max_sequence_length=256, num_inference_steps=NUM_STEPS)
    return pipe
| |
|
@torch.inference_mode()
def infer(request: TextToImageRequest, _pipeline: FluxPipeline) -> Image:
    """Generate one image for ``request`` using ``_pipeline``.

    Args:
        request: Supplies ``prompt``, ``width``, ``height`` and an optional
            ``seed``.
        _pipeline: The pipeline to call.

    Returns:
        The first image of the pipeline output.
    """
    # A seeded CUDA generator is created only when the request carries a seed;
    # otherwise the pipeline receives generator=None.
    generator = (
        None
        if request.seed is None
        else Generator(device="cuda").manual_seed(request.seed)
    )

    empty_cache()
    output = _pipeline(
        prompt=request.prompt,
        width=request.width,
        height=request.height,
        guidance_scale=0.0,
        generator=generator,
        output_type="pil",
        max_sequence_length=256,
        num_inference_steps=NUM_STEPS,
    )
    return output.images[0]
| |
|