Spaces:
Runtime error
Runtime error
| import os | |
| import gradio as gr | |
| import numpy as np | |
| import random | |
| import spaces | |
| from diffusers import DiffusionPipeline | |
| import torch | |
| import json | |
| import logging | |
| from diffusers import DiffusionPipeline, AutoencoderTiny, AutoencoderKL | |
| from live_preview_helpers import calculate_shift, retrieve_timesteps, flux_pipe_call_that_returns_an_iterable_of_images | |
| from huggingface_hub import login | |
| from huggingface_hub import hf_hub_download, HfFileSystem, ModelCard, snapshot_download | |
| import copy | |
| import random | |
| import time | |
| import boto3 | |
| from io import BytesIO | |
| from datetime import datetime | |
| from transformers import AutoTokenizer | |
| from diffusers import UNet2DConditionModel | |
| HF_TOKEN = os.environ.get("HF_TOKEN") | |
| login(token=HF_TOKEN) | |
| # init | |
| dtype = torch.bfloat16 | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| base_model = "black-forest-labs/FLUX.1-dev" | |
| # unet = UNet2DConditionModel.from_pretrained( | |
| # base_model, | |
| # torch_dtype=torch.float16, | |
| # use_safetensors=True, | |
| # variant="fp16", | |
| # subfolder="unet", | |
| # # ).to("cuda") | |
| # tokenizer = AutoTokenizer.from_pretrained("openai/clip-vit-base-patch32") | |
| pipe = DiffusionPipeline.from_pretrained(base_model, torch_dtype=dtype).to(device) | |
| MAX_SEED = 2**32-1 | |
| class calculateDuration: | |
| def __init__(self, activity_name=""): | |
| self.activity_name = activity_name | |
| def __enter__(self): | |
| self.start_time = time.time() | |
| return self | |
| def __exit__(self, exc_type, exc_value, traceback): | |
| self.end_time = time.time() | |
| self.elapsed_time = self.end_time - self.start_time | |
| if self.activity_name: | |
| print(f"Elapsed time for {self.activity_name}: {self.elapsed_time:.6f} seconds") | |
| else: | |
| print(f"Elapsed time: {self.elapsed_time:.6f} seconds") | |
| def upload_image_to_r2(image, account_id, access_key, secret_key, bucket_name): | |
| print("upload_image_to_r2", account_id, access_key, secret_key, bucket_name) | |
| connectionUrl = f"https://{account_id}.r2.cloudflarestorage.com" | |
| s3 = boto3.client( | |
| 's3', | |
| endpoint_url=connectionUrl, | |
| region_name='auto', | |
| aws_access_key_id=access_key, | |
| aws_secret_access_key=secret_key | |
| ) | |
| current_time = datetime.now().strftime("%Y/%m/%d/%H%M%S") | |
| image_file = f"generated_images/{current_time}_{random.randint(0, MAX_SEED)}.png" | |
| buffer = BytesIO() | |
| image.save(buffer, "PNG") | |
| buffer.seek(0) | |
| s3.upload_fileobj(buffer, bucket_name, image_file) | |
| print("upload finish", image_file) | |
| return image_file | |
| def generate_image(prompt, steps, seed, cfg_scale, width, height, progress): | |
| pipe.to("cuda") | |
| text_inputs = pipe.tokenizer(prompt, return_tensors="pt").to("cuda") | |
| input_ids = text_inputs.input_ids[0] | |
| # 获取每个主体对应的令牌 ID | |
| boy_token_id = pipe.tokenizer.convert_tokens_to_ids("boy_asia_05") | |
| print(boy_token_id) | |
| girl_token_id = pipe.tokenizer.convert_tokens_to_ids("girl_asia_04") | |
| print(girl_token_id) | |
| # 找到每个主体在输入中的索引位置 | |
| boy_indices = (input_ids == boy_token_id).nonzero(as_tuple=True)[0] | |
| girl_indices = (input_ids == girl_token_id).nonzero(as_tuple=True)[0] | |
| # 准备 cross_attention_kwargs | |
| def attention_control(attention_probs, adapter_name): | |
| # 根据 adapter_name 和令牌索引控制注意力 | |
| print("attention_control", adapter_name) | |
| if adapter_name == "boy_asia_05": | |
| # 对女孩的令牌注意力设为零 | |
| attention_probs[:, :, :, girl_indices] = 0 | |
| elif adapter_name == "girl_asia_04": | |
| # 对男孩的令牌注意力设为零 | |
| attention_probs[:, :, :, boy_indices] = 0 | |
| return attention_probs | |
| joint_attention_kwargs = {"attention_control": attention_control} | |
| generator = torch.Generator(device="cuda").manual_seed(seed) | |
| with calculateDuration("Generating image"): | |
| # Generate image | |
| generate_image = pipe( | |
| prompt=prompt, | |
| num_inference_steps=steps, | |
| guidance_scale=cfg_scale, | |
| width=width, | |
| height=height, | |
| generator=generator, | |
| joint_attention_kwargs=joint_attention_kwargs | |
| ).images[0] | |
| progress(99, "Generate success!") | |
| return generate_image | |
| # 在 Transformer 中,自定义注意力处理器 | |
| class CustomAttentionProcessor(torch.nn.Module): | |
| def __init__(self, attention_control, adapter_name): | |
| super().__init__() | |
| self.attention_control = attention_control | |
| self.adapter_name = adapter_name | |
| def forward(self, attention_probs): | |
| # 调用自定义的注意力控制函数 | |
| attention_probs = self.attention_control(attention_probs, self.adapter_name) | |
| return attention_probs | |
| def run_lora(prompt, cfg_scale, steps, lora_strings, randomize_seed, seed, width, height, lora_scale, upload_to_r2, account_id, access_key, secret_key, bucket, progress=gr.Progress(track_tqdm=True)): | |
| # Load LoRA weights | |
| if lora_strings: | |
| with calculateDuration(f"Loading LoRA weights for {lora_strings}"): | |
| pipe.unload_lora_weights() | |
| lora_array = lora_strings.split(',') | |
| adapter_names = [] | |
| for lora_string in lora_array: | |
| parts = lora_string.split(':') | |
| if len(parts) == 3: | |
| lora_repo, weights, adapter_name = parts | |
| # 调用 pipe.load_lora_weights() 方法加载权重 | |
| pipe.load_lora_weights(lora_repo, weight_name=weights, adapter_name=adapter_name) | |
| adapter_names.append(adapter_name) | |
| else: | |
| print(f"Invalid format for lora_string: {lora_string}") | |
| adapter_weights = [lora_scale] * len(adapter_names) | |
| # 调用 pipeline.set_adapters 方法设置 adapter 和对应权重 | |
| pipe.set_adapters(adapter_names, adapter_weights=adapter_weights) | |
| # Set random seed for reproducibility | |
| if randomize_seed: | |
| seed = random.randint(0, MAX_SEED) | |
| final_image = generate_image(prompt, steps, seed, cfg_scale, width, height, progress) | |
| if upload_to_r2: | |
| with calculateDuration("upload r2"): | |
| url = upload_image_to_r2(final_image, account_id, access_key, secret_key, bucket) | |
| result = {"status": "success", "url": url} | |
| else: | |
| result = {"status": "success", "message": "Image generated but not uploaded"} | |
| progress(100, "Completed!") | |
| yield final_image, seed, json.dumps(result) | |
| css=""" | |
| #col-container { | |
| margin: 0 auto; | |
| max-width: 640px; | |
| } | |
| """ | |
| with gr.Blocks(css=css) as demo: | |
| gr.Markdown("Flux with lora") | |
| with gr.Row(): | |
| with gr.Column(): | |
| prompt = gr.Text(label="Prompt", show_label=False, max_lines=1, placeholder="Enter your prompt", container=False) | |
| lora_strings = gr.Text( label="lora_strings", max_lines=1, placeholder="Enter a lora strings", visible=True) | |
| run_button = gr.Button("Run", scale=0) | |
| with gr.Accordion("Advanced Settings", open=False): | |
| with gr.Row(): | |
| seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=0, randomize=True) | |
| randomize_seed = gr.Checkbox(label="Randomize seed", value=True) | |
| lora_scale = gr.Slider(label="LoRA Scale", minimum=0, maximum=1, step=0.01, value=0.5) | |
| with gr.Row(): | |
| width = gr.Slider(label="Width", minimum=256, maximum=1536, step=64, value=1024) | |
| height = gr.Slider(label="Height", minimum=256, maximum=1536, step=64, value=1024) | |
| with gr.Row(): | |
| cfg_scale = gr.Slider(label="CFG Scale", minimum=1, maximum=20, step=0.5, value=3.5) | |
| steps = gr.Slider(label="Steps", minimum=1, maximum=50, step=1, value=28) | |
| upload_to_r2 = gr.Checkbox(label="Upload to R2", value=False) | |
| account_id = gr.Textbox(label="Account Id", placeholder="Enter R2 account id") | |
| access_key = gr.Textbox(label="Access Key", placeholder="Enter R2 access key here") | |
| secret_key = gr.Textbox(label="Secret Key", placeholder="Enter R2 secret key here") | |
| bucket = gr.Textbox(label="Bucket Name", placeholder="Enter R2 bucket name here") | |
| with gr.Column(): | |
| result = gr.Image(label="Result", show_label=False) | |
| json_text = gr.Text() | |
| gr.on( | |
| triggers=[run_button.click, prompt.submit], | |
| fn = run_lora, | |
| inputs = [prompt, cfg_scale, steps, lora_strings, randomize_seed, seed, width, height, lora_scale, upload_to_r2, account_id, access_key, secret_key, bucket], | |
| outputs=[result, seed, json_text] | |
| ) | |
| demo.queue().launch() |