| from diffusers import AutoencoderKL, AutoencoderTiny |
| from diffusers.image_processor import VaeImageProcessor |
| import torch |
| import torch._dynamo |
| import gc |
| from PIL.Image import Image |
| from pipelines.models import TextToImageRequest |
| from torch import Generator |
| from diffusers import FluxPipeline |
| from torchao.quantization import quantize_, int8_weight_only, fpx_weight_only |
| import torch.nn as nn |
| from model import Model, Decoder, Encoder |
| import torchvision |
|
|
| Pipeline = None |
| MODEL_ID = "black-forest-labs/FLUX.1-schnell" |
| DTYPE = torch.bfloat16 |
| def clear(): |
| gc.collect() |
| torch.cuda.empty_cache() |
| torch.cuda.reset_max_memory_allocated() |
| torch.cuda.reset_peak_memory_stats() |
|
|
| def load_pipeline() -> Pipeline: |
| clear() |
| |
| |
| |
|
|
| vae = AutoencoderTiny.from_pretrained("madebyollin/taef1") |
| vae.encoder = Encoder(16) |
| vae.decoder = Decoder(16) |
|
|
| encoder_path = "encoder.pth" |
| decoder_path = "decoder.pth" |
|
|
| if encoder_path is not None: |
| encoder_state_dict = torch.load(encoder_path, map_location="cpu", weights_only=True) |
| filtered_state_dict = {k.strip('encoder.'): v for k, v in encoder_state_dict.items() if k.strip('encoder.') in vae.encoder.state_dict() and v.size() == vae.encoder.state_dict()[k.strip('encoder.')].size()} |
| print(f" num of keys in filtered: {len(filtered_state_dict)} and in decoder: {len(vae.encoder.state_dict())}") |
| vae.encoder.load_state_dict(filtered_state_dict, strict=False) |
| |
| if decoder_path is not None: |
| decoder_state_dict = torch.load(decoder_path, map_location="cpu", weights_only=True) |
| filtered_state_dict = {k.strip('decoder.'): v for k, v in decoder_state_dict.items() if k.strip('decoder.') in vae.decoder.state_dict() and v.size() == vae.decoder.state_dict()[k.strip('decoder.')].size()} |
| print(f" num of keys in filtered: {len(filtered_state_dict)} and in decoder: {len(vae.decoder.state_dict())}") |
| vae.decoder.load_state_dict(filtered_state_dict, strict=False) |
|
|
| vae.decoder.requires_grad_(False) |
| vae.encoder.requires_grad_(False) |
| vae.to(dtype=DTYPE) |
| |
| pipeline = FluxPipeline.from_pretrained(MODEL_ID,vae=vae, |
| torch_dtype=DTYPE) |
| torch.backends.cudnn.benchmark = True |
| torch.backends.cuda.matmul.allow_tf32 = True |
| torch.cuda.set_per_process_memory_fraction(0.99) |
| pipeline.text_encoder.to(memory_format=torch.channels_last) |
| pipeline.text_encoder_2.to(memory_format=torch.channels_last) |
| pipeline.transformer.to(memory_format=torch.channels_last) |
| pipeline.vae.to(memory_format=torch.channels_last) |
| pipeline.vae = torch.compile(pipeline.vae) |
| pipeline._exclude_from_cpu_offload = ["vae"] |
| pipeline.enable_sequential_cpu_offload() |
|
|
| clear() |
| for _ in range(1): |
| pipeline(prompt="unpervaded, unencumber, froggish, groundneedle, transnatural, fatherhood, outjump, cinerator", width=1024, height=1024, guidance_scale=0.1, num_inference_steps=4, max_sequence_length=256) |
| return pipeline |
|
|
| sample = True |
| @torch.inference_mode() |
| def infer(request: TextToImageRequest, pipeline: Pipeline) -> Image: |
| global sample |
| if sample: |
| clear() |
| sample = None |
| torch.cuda.reset_peak_memory_stats() |
| generator = Generator("cuda").manual_seed(request.seed) |
| image = None |
| |
| |
| image=pipeline(request.prompt,generator=generator, guidance_scale=0.0, num_inference_steps=4, max_sequence_length=256, height=request.height, width=request.width, output_type="pil").images[0] |
| |
| |
| |
| |
| |
| |
| |
| return image |