import os
import subprocess
import sys
from typing import Union

from huggingface_hub import whoami

is_spaces = bool(os.environ.get("SPACE_ID"))

if is_spaces:
    # Pass a copy of the current environment so pip and CUDA stay on PATH;
    # replacing the environment outright can make the install fail.
    subprocess.run(
        'pip install flash-attn --no-build-isolation',
        env={**os.environ, 'FLASH_ATTENTION_SKIP_CUDA_BUILD': "TRUE"},
        shell=True,
    )
    import spaces

os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "1"

from dotenv import load_dotenv

load_dotenv()

sys.path.insert(0, os.getcwd())
|
|
import gradio as gr
from PIL import Image
import torch
import uuid
import shutil
import json
import yaml
from slugify import slugify
from transformers import AutoProcessor, AutoModelForCausalLM

if not is_spaces:
    sys.path.insert(0, "ai-toolkit")
    from toolkit.job import get_job
    # Stub out the OAuth types so the type hints below still resolve when running locally
    gr.OAuthProfile = None
    gr.OAuthToken = None

MAX_IMAGES = 150
|
|
|
|
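# load_captioning returns a flat list of gr.update objects that must match the
# order of `output_components` below: the captioning area, then one
# (row, image, caption) triple per possible image slot, then the sample-prompt
# accordion and its three prompt textboxes.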
def load_captioning(uploaded_files, concept_sentence):
    uploaded_images = [file for file in uploaded_files if not file.endswith('.txt')]
    txt_files = [file for file in uploaded_files if file.endswith('.txt')]
    txt_files_dict = {os.path.splitext(os.path.basename(txt_file))[0]: txt_file for txt_file in txt_files}
    updates = []
    if len(uploaded_images) <= 1:
        raise gr.Error(
            "Please upload at least 2 images to train your model (the ideal number with default settings is between 4 and 30)"
        )
    elif len(uploaded_images) > MAX_IMAGES:
        raise gr.Error(f"For now, only up to {MAX_IMAGES} images are allowed for training")

    # Make the captioning area visible
    updates.append(gr.update(visible=True))

    for i in range(1, MAX_IMAGES + 1):
        # Show a row only for slots that actually have an uploaded image
        visible = i <= len(uploaded_images)
        updates.append(gr.update(visible=visible))

        image_value = uploaded_images[i - 1] if visible else None
        updates.append(gr.update(value=image_value, visible=visible))

        # If a .txt file with the same base name was uploaded, use it as the caption
        corresponding_caption = False
        if image_value:
            base_name = os.path.splitext(os.path.basename(image_value))[0]
            if base_name in txt_files_dict:
                with open(txt_files_dict[base_name], 'r') as file:
                    corresponding_caption = file.read()

        text_value = corresponding_caption if visible and corresponding_caption else "[trigger]" if visible and concept_sentence else None
        updates.append(gr.update(value=text_value, visible=visible))

    # Show the sample-prompt accordion and seed it with example prompts
    updates.append(gr.update(visible=True))
    updates.append(gr.update(placeholder=f'A portrait of a person in a bustling cafe {concept_sentence}', value=f'A person in a bustling cafe {concept_sentence}'))
    updates.append(gr.update(placeholder=f"A mountainous landscape in the style of {concept_sentence}"))
    updates.append(gr.update(placeholder=f"A {concept_sentence} in a mall"))
    return updates
|
|
def hide_captioning():
    return gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False)
|
|
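# create_dataset copies the uploaded images into a fresh folder and writes a
# metadata.jsonl file, one JSON object per line, e.g.:
#   {"file_name": "photo1.jpg", "prompt": "A person in a bustling cafe [trigger]"}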
def create_dataset(*inputs):
    print("Creating dataset")
    images = inputs[0]
    destination_folder = f"datasets/{uuid.uuid4()}"
    os.makedirs(destination_folder, exist_ok=True)

    jsonl_file_path = os.path.join(destination_folder, "metadata.jsonl")
    with open(jsonl_file_path, "a") as jsonl_file:
        for index, image in enumerate(images):
            new_image_path = shutil.copy(image, destination_folder)

            # The captions arrive as the remaining positional inputs, in image order
            original_caption = inputs[index + 1]
            file_name = os.path.basename(new_image_path)

            data = {"file_name": file_name, "prompt": original_caption}
            jsonl_file.write(json.dumps(data) + "\n")

    return destination_folder
|
|
|
|
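# run_captioning loads Florence-2 and generates a detailed caption for every
# uploaded image, yielding the caption list after each image so the UI textboxes
# update progressively while the model works through the batch.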
def run_captioning(images, concept_sentence, *captions):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    # float16 generation is not reliably supported on CPU, so fall back to float32 there
    torch_dtype = torch.float16 if device == "cuda" else torch.float32
    model = AutoModelForCausalLM.from_pretrained(
        "microsoft/Florence-2-large", torch_dtype=torch_dtype, trust_remote_code=True
    ).to(device)
    processor = AutoProcessor.from_pretrained("microsoft/Florence-2-large", trust_remote_code=True)

    captions = list(captions)
    for i, image_path in enumerate(images):
        if not isinstance(image_path, str):
            # Defensively skip anything that isn't a plain file path
            continue
        image = Image.open(image_path).convert("RGB")

        # Florence-2 uses task tokens as prompts; <DETAILED_CAPTION> asks for a long caption
        prompt = "<DETAILED_CAPTION>"
        inputs = processor(text=prompt, images=image, return_tensors="pt").to(device, torch_dtype)

        generated_ids = model.generate(
            input_ids=inputs["input_ids"], pixel_values=inputs["pixel_values"], max_new_tokens=1024, num_beams=3
        )

        generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
        parsed_answer = processor.post_process_generation(
            generated_text, task=prompt, image_size=(image.width, image.height)
        )
        caption_text = parsed_answer["<DETAILED_CAPTION>"].replace("The image shows ", "")
        if concept_sentence:
            caption_text = f"{caption_text} [trigger]"
        captions[i] = caption_text

        yield captions
    # Free the GPU once captioning is done
    model.to("cpu")
    del model
    del processor


if is_spaces:
    # On Spaces, wrap the function so it is allocated a GPU while it runs
    run_captioning = spaces.GPU()(run_captioning)
|
|
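# recursive_update deep-merges dict `u` into dict `d`, so user-provided YAML only
# overrides the keys it mentions. For example:
#   recursive_update({"a": {"b": 1, "c": 2}}, {"a": {"b": 9}}) -> {"a": {"b": 9, "c": 2}}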
def recursive_update(d, u):
    for k, v in u.items():
        if isinstance(v, dict) and v:
            d[k] = recursive_update(d.get(k, {}), v)
        else:
            d[k] = v
    return d
|
|
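# start_training builds an ai-toolkit config from the example YAML, applies the UI
# settings (and any advanced YAML overrides), writes it to tmp/, then either
# launches an AutoTrain spacerunner job (on Spaces) or runs the job locally.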
def start_training(
    lora_name,
    concept_sentence,
    which_model,
    steps,
    lr,
    rank,
    dataset_folder,
    sample_1,
    sample_2,
    sample_3,
    use_more_advanced_options,
    more_advanced_options,
    profile: Union[gr.OAuthProfile, None],
    oauth_token: Union[gr.OAuthToken, None],
):
    if not lora_name:
        raise gr.Error("You forgot to insert your LoRA name! This name has to be unique.")

    if not is_spaces:
        try:
            token_info = whoami()["auth"]["accessToken"]
            if token_info["role"] == "write" or "repo.write" in token_info["fineGrained"]["scoped"][0]["permissions"]:
                gr.Info(f"Starting training locally {whoami()['name']}. Your LoRA will be available locally and on Hugging Face after it finishes.")
            else:
                raise gr.Error("Your Hugging Face token doesn't have enough permissions: you need a token that allows writing to your profile.")
        except Exception:
            raise gr.Error("Your Hugging Face token doesn't have enough permissions: you need a token that allows writing to your profile.")

    print("Started training")
    slugged_lora_name = slugify(lora_name)
|
|
    # Load the example config that ships with ai-toolkit
    with open("train_lora_flux_24gb.yaml" if is_spaces else "ai-toolkit/config/examples/train_lora_flux_24gb.yaml", "r") as f:
        config = yaml.safe_load(f)

    # Apply the UI settings on top of the example config
    config["config"]["name"] = slugged_lora_name
    config["config"]["process"][0]["model"]["low_vram"] = True
    config["config"]["process"][0]["train"]["skip_first_sample"] = True
    config["config"]["process"][0]["train"]["steps"] = int(steps)
    config["config"]["process"][0]["train"]["lr"] = float(lr)
    config["config"]["process"][0]["network"]["linear"] = int(rank)
    config["config"]["process"][0]["network"]["linear_alpha"] = int(rank)
    config["config"]["process"][0]["datasets"][0]["folder_path"] = dataset_folder
    config["config"]["process"][0]["save"]["push_to_hub"] = True
    try:
        username = whoami()["name"] if not is_spaces else profile.username
    except Exception:
        raise gr.Error("Error trying to retrieve your username. Are you sure you are logged in with Hugging Face?")
    config["config"]["process"][0]["save"]["hf_repo_id"] = f"{username}/{slugged_lora_name}"
    config["config"]["process"][0]["save"]["hf_private"] = True
    if concept_sentence:
        config["config"]["process"][0]["trigger_word"] = concept_sentence

    if sample_1 or sample_2 or sample_3:
        config["config"]["process"][0]["train"]["disable_sampling"] = False
        # Only sample at the very end of training
        config["config"]["process"][0]["sample"]["sample_every"] = steps
        config["config"]["process"][0]["sample"]["sample_steps"] = 28
        config["config"]["process"][0]["sample"]["prompts"] = []
        if sample_1:
            config["config"]["process"][0]["sample"]["prompts"].append(sample_1)
        if sample_2:
            config["config"]["process"][0]["sample"]["prompts"].append(sample_2)
        if sample_3:
            config["config"]["process"][0]["sample"]["prompts"].append(sample_3)
    else:
        config["config"]["process"][0]["train"]["disable_sampling"] = True

    if which_model == "[schnell] (4 step fast model)":
        config["config"]["process"][0]["model"]["name_or_path"] = "black-forest-labs/FLUX.1-schnell"
        config["config"]["process"][0]["model"]["assistant_lora_path"] = "ostris/FLUX.1-schnell-training-adapter"
        config["config"]["process"][0]["sample"]["sample_steps"] = 4

    if use_more_advanced_options:
        more_advanced_options_dict = yaml.safe_load(more_advanced_options)
        config["config"]["process"][0] = recursive_update(config["config"]["process"][0], more_advanced_options_dict)
    print(config)

    # Write the final config to a uniquely named file
    random_config_name = str(uuid.uuid4())
    os.makedirs("tmp", exist_ok=True)
    config_path = f"tmp/{random_config_name}-{slugged_lora_name}.yaml"
    with open(config_path, "w") as f:
        yaml.dump(config, f)
    if is_spaces:
        # Bundle the config, training script and requirements with the dataset,
        # then hand everything to an AutoTrain spacerunner job on a paid L4 Space
        shutil.copy(config_path, dataset_folder + "/config.yaml")
        script_location = os.path.dirname(os.path.abspath(__file__))
        shutil.copy(script_location + "/script.py", dataset_folder)
        shutil.copy(script_location + "/requirements.autotrain", dataset_folder + "/requirements.txt")

        cmd = f"autotrain spacerunner --project-name {slugged_lora_name} --script-path {dataset_folder}"
        cmd += f" --username {profile.username} --token {oauth_token.token} --backend spaces-l4x1"
        outcome = subprocess.run(cmd.split())
        if outcome.returncode == 0:
            return f"""# Your training has started.
## - Training Status: <a href='https://huggingface.co/spaces/{profile.username}/autotrain-{slugged_lora_name}?logs=container'>{profile.username}/autotrain-{slugged_lora_name}</a> <small>(in the logs tab)</small>
## - Model page: <a href='https://huggingface.co/{profile.username}/{slugged_lora_name}'>{profile.username}/{slugged_lora_name}</a> <small>(will be available when training finishes)</small>"""
        else:
            print("Error: autotrain spacerunner exited with code", outcome.returncode)
            raise gr.Error("Something went wrong. Make sure the name of your LoRA is unique and try again")
    else:
        # Run the ai-toolkit job in-process when training locally
        job = get_job(config_path)
        job.run()
        job.cleanup()

    return f"Training completed successfully. Model saved as {slugged_lora_name}"
|
|
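# The login gate works via CSS: the .main_ui_logged_out class (defined in `css`
# below) dims the main column and disables pointer events until the user signs in.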
def swap_visibility(profile: Union[gr.OAuthProfile, None]):
    if is_spaces:
        if profile is None:
            return gr.update(elem_classes=["main_ui_logged_out"])
        else:
            return gr.update(elem_classes=["main_ui_logged_in"])
    else:
        return gr.update(elem_classes=["main_ui_logged_in"])
|
|
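# Cost estimate: total time is roughly steps * 7.54 s/it plus ~240 s of setup, billed
# at $0.80/hour for the L4 backend. E.g. 1000 steps -> 7780 s (~2.2 h) -> about $1.73.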
def update_pricing(steps, oauth_token: Union[gr.OAuthToken, None]):
    if oauth_token and is_spaces:
        user = whoami(oauth_token.token)
        seconds_per_iteration = 7.54
        total_seconds = (steps * seconds_per_iteration) + 240
        cost_per_second = 0.80 / 60 / 60
        cost = round(cost_per_second * total_seconds, 2)
        cost_preview = f'''To train this LoRA, a paid L4 GPU will be provisioned under the hood for the duration of the training and released once it finishes.
### Estimated to cost <b>< US$ {cost}</b> for {round(int(total_seconds) / 60, 2)} minutes with your current training settings <small>({int(steps)} iterations at {seconds_per_iteration}s/it)</small>'''
        if user["canPay"]:
            return gr.update(visible=True), cost_preview, gr.update(visible=False), gr.update(visible=True)
        else:
            pay_disclaimer = f'''## ⚠️ {user["name"]}, your account doesn't have a payment method. Set one up <a href='https://huggingface.co/settings/billing/payment' target='_blank'>here</a> and come back to train your LoRA<br><br>'''
            return gr.update(visible=True), pay_disclaimer + cost_preview, gr.update(visible=True), gr.update(visible=False)
    else:
        return gr.update(visible=False), "", gr.update(visible=False), gr.update(visible=True)
|
|
def swap_base_model(model):
    # Show the license warning only when the [dev] model is selected
    return gr.update(visible=True) if model.startswith("[dev]") else gr.update(visible=False)
|
|
config_yaml = '''
device: cuda:0
model:
  is_flux: true
  quantize: true
network:
  linear: 16 # overrides the 'Rank' set in the UI above
  linear_alpha: 16 # alpha can differ from the rank if you'd like
  type: lora
sample:
  guidance_scale: 3.5
  height: 1024
  neg: '' # negative prompts don't work for FLUX
  sample_every: 1000
  sample_steps: 28
  sampler: flowmatch
  seed: 42
  walk_seed: true
  width: 1024
save:
  dtype: float16
  hf_private: true
  max_step_saves_to_keep: 4
  push_to_hub: true
  save_every: 10000
train:
  batch_size: 1
  dtype: bf16
  ema_config:
    ema_decay: 0.99
    use_ema: true
  gradient_accumulation_steps: 1
  gradient_checkpointing: true
  noise_scheduler: flowmatch
  optimizer: adamw8bit # options: prodigy, dadaptation, adamw, adamw8bit, lion, lion8bit
  train_text_encoder: false # probably doesn't work for FLUX
  train_unet: true
'''
|
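# This YAML seeds the "Even more advanced options" editor in the UI; whatever the
# user edits there is deep-merged into the training config via recursive_update.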
|
theme = gr.themes.Monochrome(
    text_size=gr.themes.Size(lg="18px", md="15px", sm="13px", xl="22px", xs="12px", xxl="24px", xxs="9px"),
    font=[gr.themes.GoogleFont("Source Sans Pro"), "ui-sans-serif", "system-ui", "sans-serif"],
)
css = """
h1{font-size: 2em}
h3{margin-top: 0}
#component-1{text-align:center}
.main_ui_logged_out{opacity: 0.3; pointer-events: none}
.tabitem{border: 0px}
.group_padding{padding: .55em}
#space_model .wrap > label:last-child{opacity: 0.3; pointer-events:none}
"""
with gr.Blocks(theme=theme, css=css) as demo:
    gr.Markdown(
        """# LoRA Ease for FLUX 🧞‍♂️
### Train a high quality FLUX LoRA in a breeze ༄ using [Ostris' AI Toolkit](https://github.com/ostris/ai-toolkit) and [AutoTrain Advanced](https://github.com/huggingface/autotrain-advanced)"""
    )
    if is_spaces:
        gr.LoginButton("Sign in with Hugging Face to train your LoRA on Spaces", visible=is_spaces)
    with gr.Tab("Train on Spaces" if is_spaces else "Train locally"):
        with gr.Column() as main_ui:
            with gr.Group():
                with gr.Row():
                    lora_name = gr.Textbox(
                        label="The name of your LoRA",
                        info="This has to be a unique name",
                        placeholder="e.g.: Persian Miniature Painting style, Cat Toy",
                    )
                    concept_sentence = gr.Textbox(
                        label="Trigger word/sentence",
                        info="Trigger word or sentence to be used",
                        placeholder="uncommon word like p3rs0n or trtcrd, or sentence like 'in the style of CNSTLL'",
                        interactive=True,
                    )
                which_model = gr.Radio(
                    ["[schnell] (4 step fast model)", "[dev] (high quality model, non-commercial license - available when training locally)"],
                    label="Which base model to train?",
                    elem_id="space_model" if is_spaces else "local_model",
                    value="[schnell] (4 step fast model)",
                )
                model_warning = gr.Markdown(
                    """> The [dev] model license is non-commercial. By choosing to fine-tune [dev], you must agree to [its license](https://huggingface.co/black-forest-labs/FLUX.1-dev/blob/main/LICENSE.md) and make sure that the LoRA you train and your training process do not violate it.
""", visible=False)
            with gr.Group(visible=True) as image_upload:
                with gr.Row():
                    images = gr.File(
                        file_types=["image", ".txt"],
                        label="Upload your images",
                        file_count="multiple",
                        interactive=True,
                        visible=True,
                        scale=1,
                    )
                    with gr.Column(scale=3, visible=False) as captioning_area:
                        with gr.Column():
                            gr.Markdown(
                                """# Custom captioning
<p style="margin-top:0">You can optionally add a custom caption for each image (or use an AI model for this). [trigger] will represent your concept sentence/trigger word.</p>
""", elem_classes="group_padding")
                            do_captioning = gr.Button("Add AI captions with Florence-2")
                            output_components = [captioning_area]
                            caption_list = []
                            # Pre-build one hidden (row, image, caption) triple per possible image;
                            # load_captioning toggles their visibility after upload
                            for i in range(1, MAX_IMAGES + 1):
                                locals()[f"captioning_row_{i}"] = gr.Row(visible=False)
                                with locals()[f"captioning_row_{i}"]:
                                    locals()[f"image_{i}"] = gr.Image(
                                        type="filepath",
                                        width=111,
                                        height=111,
                                        min_width=111,
                                        interactive=False,
                                        scale=2,
                                        show_label=False,
                                        show_share_button=False,
                                        show_download_button=False,
                                    )
                                    locals()[f"caption_{i}"] = gr.Textbox(
                                        label=f"Caption {i}", scale=15, interactive=True
                                    )

                                output_components.append(locals()[f"captioning_row_{i}"])
                                output_components.append(locals()[f"image_{i}"])
                                output_components.append(locals()[f"caption_{i}"])
                                caption_list.append(locals()[f"caption_{i}"])
|
|
            with gr.Accordion("Advanced options", open=False):
                steps = gr.Number(label="Steps", value=1000, minimum=1, maximum=10000, step=1)
                lr = gr.Number(label="Learning Rate", value=4e-4, minimum=1e-6, maximum=1e-3, step=1e-6)
                rank = gr.Number(label="LoRA Rank", value=16, minimum=4, maximum=128, step=4)
                with gr.Accordion("Even more advanced options", open=False):
                    if is_spaces:
                        gr.Markdown("Attention: changing these parameters may make your training fail or run out of memory on Spaces. Only change settings here if you know what you are doing. Beware that training runs on an L4 GPU with 24GB of VRAM.")
                    use_more_advanced_options = gr.Checkbox(label="Use more advanced options", value=False)
                    more_advanced_options = gr.Code(config_yaml, language="yaml")

            with gr.Accordion("Sample prompts (optional)", visible=False) as sample:
                gr.Markdown(
                    "Include sample prompts to test out your trained model. Don't forget to include your trigger word/sentence (optional)"
                )
                sample_1 = gr.Textbox(label="Test prompt 1")
                sample_2 = gr.Textbox(label="Test prompt 2")
                sample_3 = gr.Textbox(label="Test prompt 3")
            with gr.Group(visible=False) as cost_preview:
                cost_preview_info = gr.Markdown(elem_id="cost_preview_info", elem_classes="group_padding")
                payment_update = gr.Button("I have set up a payment method", visible=False)
            output_components.append(sample)
            output_components.append(sample_1)
            output_components.append(sample_2)
            output_components.append(sample_3)
            start = gr.Button("Start training", visible=False)
            progress_area = gr.Markdown("")
|
|
    with gr.Tab("Train on your device" if is_spaces else "Instructions"):
        gr.Markdown("""To use FLUX LoRA Ease locally with this UI, you can clone this repository (yes, HF Spaces are git repos!). You'll need ~23GB of VRAM.
```bash
git clone https://huggingface.co/spaces/autotrain-projects/flux-lora-ease
cd flux-lora-ease
## Optional: create and activate a venv ##
python3 -m venv venv
source venv/bin/activate
# .\\venv\\Scripts\\activate on Windows
## End of optional ##
pip install -r requirements_local.txt
```

Then you can install ai-toolkit
```bash
git clone https://github.com/ostris/ai-toolkit.git
cd ai-toolkit
git submodule update --init --recursive
pip3 install torch
pip3 install -r requirements.txt
cd ..
```

Log in with Hugging Face to access FLUX.1 [dev], choosing a token with `write` permissions so your LoRAs can be pushed to the HF Hub
```bash
huggingface-cli login
```

Finally, you can run FLUX LoRA Ease locally with a UI by running
```bash
python app.py
```

If you prefer the command line, you can run Ostris' [AI Toolkit](https://github.com/ostris/ai-toolkit) yourself directly.
"""
        )
|
|
    dataset_folder = gr.State()
|
|
    images.upload(
        load_captioning,
        inputs=[images, concept_sentence],
        outputs=output_components
    ).then(
        update_pricing,
        inputs=[steps],
        outputs=[cost_preview, cost_preview_info, payment_update, start]
    )

    images.clear(
        hide_captioning,
        outputs=[captioning_area, cost_preview, sample, start]
    )

    images.delete(
        load_captioning,
        inputs=[images, concept_sentence],
        outputs=output_components
    ).then(
        update_pricing,
        inputs=[steps],
        outputs=[cost_preview, cost_preview_info, payment_update, start]
    )

    gr.on(
        triggers=[steps.change, payment_update.click],
        fn=update_pricing,
        inputs=[steps],
        outputs=[cost_preview, cost_preview_info, payment_update, start]
    )
|
|
    which_model.change(
        fn=swap_base_model,
        inputs=which_model,
        outputs=model_warning
    )
    start.click(fn=create_dataset, inputs=[images] + caption_list, outputs=dataset_folder).then(
        fn=start_training,
        inputs=[
            lora_name,
            concept_sentence,
            which_model,
            steps,
            lr,
            rank,
            dataset_folder,
            sample_1,
            sample_2,
            sample_3,
            use_more_advanced_options,
            more_advanced_options
        ],
        outputs=progress_area,
    )
|
|
    do_captioning.click(fn=run_captioning, inputs=[images, concept_sentence] + caption_list, outputs=caption_list)
    demo.load(fn=swap_visibility, outputs=main_ui)
|
|
if __name__ == "__main__":
    demo.launch(share=True, show_error=True)
|
|