| from diffusers import AutoencoderKL |
| from diffusers.image_processor import VaeImageProcessor |
| import torch |
| import gc |
| from PIL.Image import Image |
| from pipelines.models import TextToImageRequest |
| from torch import Generator |
| from diffusers import FluxPipeline |
| import os |
| os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:False,garbage_collection_threshold:0.01" |
|
|
|
|
| class BasicQuantization: |
| def __init__(self, bits=1): |
| self.bits = bits |
| self.qmin = -(2**(bits-1)) |
| self.qmax = 2**(bits-1) - 1 |
| def quantize_tensor(self, tensor): |
| scale = (tensor.max() - tensor.min()) / (self.qmax - self.qmin) |
| zero_point = self.qmin - torch.round(tensor.min() / scale) |
| qtensor = torch.round(tensor / scale + zero_point) |
| qtensor = torch.clamp(qtensor, self.qmin, self.qmax) |
| tensor_q = (qtensor - zero_point) * scale |
| return tensor_q, scale, zero_point |
| class SDXLQuantization: |
| def __init__(self, model, bit_number=16): |
| self.model = model |
| self.quant = BasicQuantization(bit_number) |
| def quantize_model(self, save_name=None): |
| quantized_layers_state = {} |
| for name, module in self.model.named_modules(): |
| if isinstance(module, (torch.nn.Linear)): |
| if hasattr(module, 'weight'): |
| quantized_weight, _, _ = self.quant.quantize_tensor(module.weight) |
| module.weight = torch.nn.Parameter(quantized_weight) |
| if hasattr(module, 'bias') and module.bias is not None: |
| quantized_bias, _, _ = self.quant.quantize_tensor(module.bias) |
| module.bias = torch.nn.Parameter(quantized_bias) |
|
|
| Pipeline = None |
| MODEL_ID = "black-forest-labs/FLUX.1-schnell" |
| DTYPE = torch.bfloat16 |
| def clear(): |
| gc.collect() |
| torch.cuda.empty_cache() |
| torch.cuda.reset_max_memory_allocated() |
| torch.cuda.reset_peak_memory_stats() |
|
|
| def load_pipeline() -> Pipeline: |
| clear() |
| vae = AutoencoderKL.from_pretrained( |
| MODEL_ID, subfolder="vae", torch_dtype=DTYPE |
| ) |
| instance = SDXLQuantization(vae, 8) |
| instance.quantize_model() |
|
|
| pipeline = FluxPipeline.from_pretrained(MODEL_ID,vae=vae, |
| torch_dtype=DTYPE) |
| torch.backends.cudnn.benchmark = True |
| torch.backends.cuda.matmul.allow_tf32 = True |
| torch.cuda.set_per_process_memory_fraction(0.99) |
| pipeline.text_encoder.to(memory_format=torch.channels_last) |
| pipeline.text_encoder_2.to(memory_format=torch.channels_last) |
| pipeline.transformer.to(memory_format=torch.channels_last) |
| pipeline.vae.to(memory_format=torch.channels_last) |
| pipeline.vae = torch.compile(pipeline.vae) |
| pipeline._exclude_from_cpu_offload = ["vae"] |
| pipeline.enable_sequential_cpu_offload() |
|
|
| clear() |
| for _ in range(1): |
| pipeline(prompt="unpervaded, unencumber, froggish, groundneedle, transnatural, fatherhood, outjump, cinerator", width=1024, height=1024, guidance_scale=0.1, num_inference_steps=4, max_sequence_length=256) |
| return pipeline |
|
|
| sample = True |
| @torch.inference_mode() |
| def infer(request: TextToImageRequest, pipeline: Pipeline) -> Image: |
| global sample |
| if sample: |
| clear() |
| sample = None |
| torch.cuda.reset_peak_memory_stats() |
| generator = Generator("cuda").manual_seed(request.seed) |
| image=pipeline(request.prompt,generator=generator, guidance_scale=0.0, num_inference_steps=4, max_sequence_length=256, height=request.height, width=request.width, output_type="pil").images[0] |
| return(image) |