| from diffusers import FluxPipeline, AutoencoderKL | |
| from diffusers.image_processor import VaeImageProcessor | |
| from transformers import T5EncoderModel, T5TokenizerFast, CLIPTokenizer, CLIPTextModel | |
| import torch | |
| import gc | |
| from PIL.Image import Image | |
| from pipelines.models import TextToImageRequest | |
| from torch import Generator | |
| Pipeline = None | |
| CHECKPOINT = "black-forest-labs/FLUX.1-schnell" | |
| def empty_cache(): | |
| gc.collect() | |
| torch.cuda.empty_cache() | |
| torch.cuda.reset_max_memory_allocated() | |
| torch.cuda.reset_peak_memory_stats() | |
| def load_pipeline() -> Pipeline: | |
| infer(TextToImageRequest(prompt=""), Pipeline) | |
| return Pipeline | |
| def encode_prompt(prompt: str): | |
| text_encoder = CLIPTextModel.from_pretrained( | |
| CHECKPOINT, | |
| subfolder="text_encoder", | |
| torch_dtype=torch.bfloat16, | |
| ) | |
| text_encoder_2 = T5EncoderModel.from_pretrained( | |
| CHECKPOINT, | |
| subfolder="text_encoder_2", | |
| torch_dtype=torch.bfloat16, | |
| ) | |
| tokenizer = CLIPTokenizer.from_pretrained(CHECKPOINT, subfolder="tokenizer") | |
| tokenizer_2 = T5TokenizerFast.from_pretrained(CHECKPOINT, subfolder="tokenizer_2") | |
| pipeline = FluxPipeline.from_pretrained( | |
| CHECKPOINT, | |
| text_encoder=text_encoder, | |
| text_encoder_2=text_encoder_2, | |
| tokenizer=tokenizer, | |
| tokenizer_2=tokenizer_2, | |
| transformer=None, | |
| vae=None, | |
| ).to("cuda") | |
| with torch.no_grad(): | |
| return pipeline.encode_prompt( | |
| prompt=prompt, | |
| prompt_2=None, | |
| max_sequence_length=256, | |
| ) | |
| def infer_latents(prompt_embeds, pooled_prompt_embeds, width: int | None, height: int | None, seed: int | None): | |
| pipeline = FluxPipeline.from_pretrained( | |
| CHECKPOINT, | |
| text_encoder=None, | |
| text_encoder_2=None, | |
| tokenizer=None, | |
| tokenizer_2=None, | |
| vae=None, | |
| torch_dtype=torch.bfloat16, | |
| ).to("cuda") | |
| if seed is None: | |
| generator = None | |
| else: | |
| generator = Generator(pipeline.device).manual_seed(seed) | |
| return pipeline( | |
| prompt_embeds=prompt_embeds, | |
| pooled_prompt_embeds=pooled_prompt_embeds, | |
| num_inference_steps=4, | |
| guidance_scale=0.0, | |
| width=width, | |
| height=height, | |
| generator=generator, | |
| output_type="latent", | |
| ).images | |
| def infer(request: TextToImageRequest, _pipeline: Pipeline) -> Image: | |
| empty_cache() | |
| prompt_embeds, pooled_prompt_embeds, text_ids = encode_prompt(request.prompt) | |
| empty_cache() | |
| latents = infer_latents(prompt_embeds, pooled_prompt_embeds, request.width, request.height, request.seed) | |
| empty_cache() | |
| vae = AutoencoderKL.from_pretrained( | |
| CHECKPOINT, | |
| subfolder="vae", | |
| torch_dtype=torch.bfloat16, | |
| ).to("cuda") | |
| vae_scale_factor = 2 ** (len(vae.config.block_out_channels)) | |
| image_processor = VaeImageProcessor(vae_scale_factor=vae_scale_factor) | |
| height = request.height or 64 * vae_scale_factor | |
| width = request.width or 64 * vae_scale_factor | |
| with torch.no_grad(): | |
| latents = FluxPipeline._unpack_latents(latents, height, width, vae_scale_factor) | |
| latents = (latents / vae.config.scaling_factor) + vae.config.shift_factor | |
| image = vae.decode(latents, return_dict=False)[0] | |
| return image_processor.postprocess(image, output_type="pil")[0] | |