import gc
import json
import time

import diffusers
import numpy as np
import torch
import torch._dynamo
import torch.nn.utils.prune as prune
import transformers
from diffusers import FluxPipeline, AutoencoderKL, AutoencoderTiny
from diffusers import FluxTransformer2DModel, DiffusionPipeline
from diffusers.image_processor import VaeImageProcessor
from diffusers.schedulers import FlowMatchEulerDiscreteScheduler
from huggingface_hub import hf_hub_download
from optimum.quanto import requantize
from PIL import Image as img
from PIL.Image import Image
from safetensors.torch import load_file
from torch import Generator
from torchao.quantization import quantize_, int8_weight_only
from tqdm import tqdm
from transformers import T5EncoderModel, T5TokenizerFast, CLIPTokenizer, CLIPTextModel

from pipelines.models import TextToImageRequest
|
|
def load_quanto_transformer(repo_path):
    """Load a quanto-quantized Flux transformer from a Hugging Face Hub repo.

    Three files are fetched from the ``transformer/`` folder of the repo:
    the quantization map, the model config and the safetensors weights.

    Args:
        repo_path: Hub repository id that hosts the quantized transformer.

    Returns:
        The ``FluxTransformer2DModel`` after ``requantize`` has loaded the
        checkpoint into it with ``device=cuda``.
    """
    map_path = hf_hub_download(repo_path, "transformer/quantization_map.json")
    with open(map_path, "r", encoding="utf-8") as f:
        quantization_map = json.load(f)

    config_path = hf_hub_download(repo_path, "transformer/config.json")
    # Build the architecture on the meta device so no real weight memory is
    # allocated before the quantized checkpoint is applied.
    with torch.device("meta"):
        transformer = diffusers.FluxTransformer2DModel.from_config(config_path).to(torch.bfloat16)

    weights_path = hf_hub_download(repo_path, "transformer/diffusion_pytorch_model.safetensors")
    state_dict = load_file(weights_path)
    requantize(transformer, state_dict, quantization_map, device=torch.device("cuda"))
    return transformer
|
|
|
|
def load_quanto_text_encoder_2(repo_path):
    """Load a quanto-quantized T5 text encoder from a Hugging Face Hub repo.

    Three files are fetched from the ``text_encoder_2/`` folder of the repo:
    the quantization map, the T5 config and the safetensors weights.

    Args:
        repo_path: Hub repository id that hosts the quantized text encoder.

    Returns:
        The ``T5EncoderModel`` after ``requantize`` has loaded the checkpoint
        into it with ``device=cuda``.
    """
    map_path = hf_hub_download(repo_path, "text_encoder_2/quantization_map.json")
    with open(map_path, "r", encoding="utf-8") as f:
        quantization_map = json.load(f)

    config_path = hf_hub_download(repo_path, "text_encoder_2/config.json")
    with open(config_path, encoding="utf-8") as f:
        t5_config = transformers.T5Config(**json.load(f))

    # Build the architecture on the meta device so no real weight memory is
    # allocated before the quantized checkpoint is applied.
    with torch.device("meta"):
        text_encoder_2 = transformers.T5EncoderModel(t5_config).to(torch.bfloat16)

    weights_path = hf_hub_download(repo_path, "text_encoder_2/model.safetensors")
    state_dict = load_file(weights_path)
    requantize(text_encoder_2, state_dict, quantization_map, device=torch.device("cuda"))
    return text_encoder_2
| |
# Let TorchDynamo swallow compilation errors instead of raising them.
torch._dynamo.config.suppress_errors = True

# Placeholder that is only referenced in the annotations of load_pipeline()
# and infer(); no concrete pipeline class is bound to it.
Pipeline = None
|
|
def weight_svd_prune(module, threshold_ratio=0.5):
    """Replace ``module.weight`` in place with a truncated-SVD approximation.

    The weight is flattened to ``(weight.shape[0], -1)``, decomposed with SVD,
    and rebuilt from only its largest singular values. The result is written
    back with the weight's original shape, dtype and device.

    Args:
        module: Object exposing a tensor-valued ``weight`` parameter.
        threshold_ratio: Fraction of singular values to discard. ``0`` keeps
            all of them and ``1`` zeroes the weight. Values outside ``[0, 1]``
            are clamped to that range.

    Returns:
        None. ``module.weight.data`` is reassigned.
    """
    weight = module.weight.data
    # Remember the real shape: the matrix view below is 2-D, and the pruned
    # weight must go back in the original layout (e.g. 4-D for convolutions).
    original_shape = tuple(weight.shape)

    # NumPy cannot represent bfloat16, so decompose a float32 copy on the CPU.
    matrix = weight.cpu().float().numpy().reshape(original_shape[0], -1)
    u, s, vt = np.linalg.svd(matrix, full_matrices=False)

    keep = int(len(s) * (1 - threshold_ratio))
    # Clamp so an out-of-range ratio cannot turn into a negative slice bound.
    keep = max(0, min(len(s), keep))

    # Multiplying only the retained components equals U @ diag(masked S) @ Vt
    # without materialising the dense diagonal matrix.
    low_rank = (u[:, :keep] * s[:keep]) @ vt[:keep]

    module.weight.data = torch.tensor(
        low_rank.reshape(original_shape), dtype=weight.dtype
    ).to(weight.device)
| |
# Hub repo id handed to AutoPipelineForText2Image.from_pretrained() in
# load_pipeline() as the base checkpoint.
# NOTE(review): load_pipeline() swaps in a transformer and T5 encoder from
# "Disty0/FLUX.1-dev-qint8" - confirm that mixing these repos is intended.
ckpt_id = "black-forest-labs/FLUX.1-schnell"
def empty_cache():
    """Free unreferenced Python objects and reset CUDA memory bookkeeping.

    Runs the garbage collector first, then empties the CUDA caching allocator
    and resets the CUDA peak-memory statistics. The CUDA steps are skipped
    when no CUDA device is available.

    Returns:
        None.
    """
    gc.collect()
    if not torch.cuda.is_available():
        return
    torch.cuda.empty_cache()
    # reset_peak_memory_stats() supersedes the deprecated
    # reset_max_memory_allocated(), so a single call is enough.
    torch.cuda.reset_peak_memory_stats()
|
|
def load_pipeline() -> Pipeline:
    """Assemble the text-to-image pipeline and warm it up.

    The base pipeline is loaded from ``ckpt_id`` without its transformer and
    second text encoder. Those two components are then filled in with the
    quantized versions from ``Disty0/FLUX.1-dev-qint8``. Sequential CPU
    offload is enabled and two warm-up generations are run.

    Returns:
        The assembled pipeline object.
    """
    empty_cache()
    dtype = torch.bfloat16
    quantized_repo = "Disty0/FLUX.1-dev-qint8"

    # transformer / text_encoder_2 are passed as None so the stock weights are
    # not loaded; the quantized replacements are attached right after.
    pipeline = diffusers.AutoPipelineForText2Image.from_pretrained(
        ckpt_id,
        transformer=None,
        text_encoder_2=None,
        torch_dtype=dtype,
    )
    pipeline.transformer = load_quanto_transformer(quantized_repo)
    pipeline.text_encoder_2 = load_quanto_text_encoder_2(quantized_repo)
    pipeline = pipeline.to(dtype=dtype)

    pipeline.enable_sequential_cpu_offload()

    # Warm-up: run the same settings infer() uses twice and discard the output.
    for _ in range(2):
        empty_cache()
        pipeline(
            prompt="onomancy, aftergo, spirantic, Platyhelmia, modificator, drupaceous, jobbernowl, hereness",
            width=1024,
            height=1024,
            guidance_scale=0.0,
            num_inference_steps=4,
            max_sequence_length=256,
        )

    return pipeline
|
|
|
|
| from datetime import datetime |
@torch.inference_mode()
def infer(request: TextToImageRequest, pipeline: Pipeline) -> Image:
    """Generate one image for ``request`` with the given pipeline.

    Args:
        request: Supplies ``prompt``, ``seed``, ``height`` and ``width``.
        pipeline: Callable pipeline, as returned by ``load_pipeline()``.

    Returns:
        The first image of the pipeline output, requested as a PIL image.
    """
    empty_cache()

    # A CUDA generator seeded from the request is handed to the pipeline.
    seeded_rng = Generator("cuda").manual_seed(request.seed)

    output = pipeline(
        request.prompt,
        generator=seeded_rng,
        guidance_scale=0.0,
        num_inference_steps=4,
        max_sequence_length=256,
        height=request.height,
        width=request.width,
        output_type="pil",
    )
    return output.images[0]