"""Gradio demo app (Hugging Face Spaces, ZeroGPU).

On import this script clones the STARFlow code/weights repositories, builds a
small text-to-image UI with ``gr.Blocks`` and wires the "Generate" button to
:func:`inference`.

The previous Z-Image-Turbo ControlNet pipeline is kept below as commented-out
reference code; :func:`inference` is currently a stub that does not generate
an image yet.

NOTE(review): this file was reconstructed from a whitespace-collapsed source.
The nesting of the layout containers (rows/columns/accordion) is a best-effort
reconstruction -- confirm against the intended layout.
"""

import json
import random
import time

import gradio as gr
import numpy as np
import torch
import spaces
# from safetensors.torch import load_file
# from diffusers import AutoencoderKL, FlowMatchEulerDiscreteScheduler
# from videox_fun.pipeline import ZImageControlPipeline
# from videox_fun.models import ZImageControlTransformer2DModel
# from transformers import AutoTokenizer, Qwen3ForCausalLM
# from diffusers import AutoencoderKL
# from controlnet_aux.processor import Processor

from utils import repo_utils, image_utils, prompt_utils

# Fetch the model code and weights at startup (helper semantics live in utils.repo_utils).
repo_utils.clone_repo_if_not_exists("https://github.com/apple/ml-starflow.git", "app/models")
repo_utils.clone_repo_if_not_exists("https://huggingface.co/apple/starflow", "app/models")

# MODEL_PATH = "models/Z-Image-Turbo/"
# CONTROLNET_PATH = "models/Z-Image-Turbo-Fun-Controlnet-Union/Z-Image-Turbo-Fun-Controlnet-Union.safetensors"
DTYPE = torch.bfloat16
MAX_SEED = np.iinfo(np.int32).max  # upper bound for the seed slider and random seeds

# # load transformer
# transformer = ZImageControlTransformer2DModel.from_pretrained(
#     MODEL_PATH,
#     subfolder="transformer",
#     transformer_additional_kwargs={
#         "control_layers_places": [0, 5, 10, 15, 20, 25],
#         "control_in_dim": 16
#     },
#     torch_dtype= DTYPE
# ).to("cuda")

# ## Load controlnet
# state_dict = load_file(CONTROLNET_PATH)
# state_dict = state_dict["state_dict"] if "state_dict" in state_dict else state_dict
# m, u = transformer.load_state_dict(state_dict, strict=False)
# print(f"missing keys: {len(m)}, unexpected keys: {len(u)}")

# # load ZImageControlPipeline
# vae = AutoencoderKL.from_pretrained(
#     MODEL_PATH,
#     subfolder="vae",
#     device_map="cuda",
#     torch_dtype= DTYPE
# )
# tokenizer = AutoTokenizer.from_pretrained(
#     MODEL_PATH,
#     subfolder="tokenizer"
# )
# text_encoder = Qwen3ForCausalLM.from_pretrained(
#     MODEL_PATH,
#     subfolder="text_encoder",
#     torch_dtype=DTYPE,
# )
# scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained(
#     MODEL_PATH,
#     subfolder="scheduler"
# )
# pipe = ZImageControlPipeline(
#     vae=vae,
#     tokenizer=tokenizer,
#     text_encoder=text_encoder,
#     transformer=transformer,
#     scheduler=scheduler,
# )
# pipe.to("cuda", DTYPE)

# def prepare(prompt, is_polish_prompt):
#     if not is_polish_prompt: return prompt, False
#     polished_prompt = prompt_utils.polish_prompt(prompt)
#     return polished_prompt, True


@spaces.GPU
def inference(
    prompt,
    negative_prompt,
    seed=42,
    randomize_seed=True,
    guidance_scale=1.5,
    num_inference_steps=8,
    progress=gr.Progress(track_tqdm=True),
):
    """Handle a click on the "Generate" button.

    The generation pipeline is not wired up yet (see the commented-out
    reference code below), so no image is produced. The function still
    resolves the seed and returns one value per Gradio output component so
    the click event completes instead of failing on a missing return value.

    Args:
        prompt: Text from the prompt textbox (unused until the pipeline is restored).
        negative_prompt: Text from the negative-prompt textbox (unused for now).
        seed: Seed value from the seed slider.
        randomize_seed: When true, ``seed`` is replaced by a random integer
            in ``[0, MAX_SEED]``.
        guidance_scale: Value of the guidance slider (unused for now).
        num_inference_steps: Value of the steps slider (unused for now).
        progress: Gradio progress tracker injected by Gradio.

    Returns:
        A ``(image, seed)`` tuple matching ``outputs=[output_image, seed]``.
        ``image`` is always ``None`` while the pipeline is disabled.
    """
    timestamp = time.time()
    print(f"timestamp: {timestamp}")

    # Honour the "Randomize seed" checkbox even while generation is disabled,
    # so the seed slider reflects the value a real run would have used.
    if randomize_seed:
        seed = random.randint(0, MAX_SEED)

    # # process image
    # print("DEBUG: process image")
    # if input_image is None:
    #     print("Error: input_image is empty.")
    #     return None

    # print("DEBUG: control_image_torch")
    # orig_width, orig_height = input_image.size
    # control_image, width, height = image_utils.rescale_image(input_image, image_scale, 16, 2048)
    # control_image_torch = image_utils.get_image_latent(control_image, sample_size=[height, width])[:, :, 0]

    # # generation
    # if randomize_seed: seed = random.randint(0, MAX_SEED)
    # generator = torch.Generator().manual_seed(seed)

    # output_image = pipe(
    #     prompt=prompt,
    #     negative_prompt = negative_prompt,
    #     width=width,
    #     height=height,
    #     generator=generator,
    #     guidance_scale=guidance_scale,
    #     control_image=control_image_torch,
    #     num_inference_steps=num_inference_steps,
    #     control_context_scale=control_context_scale,
    # ).images[0]
    # output_image = output_image.resize((orig_width * image_scale, orig_height * image_scale))

    # return output_image, seed

    # TODO: replace the None placeholder with the generated image once the
    # pipeline is ported. Two values are required because the click handler
    # declares two outputs.
    return None, seed


def read_file(path: str) -> str:
    """Return the full text content of the UTF-8 encoded file at ``path``."""
    with open(path, 'r', encoding='utf-8') as f:
        content = f.read()
    return content


# Centers the main column and caps its width.
css = """
#col-container {
    margin: 0 auto;
    max-width: 960px;
}
"""

# Loaded for the (currently commented-out) gr.Examples block below.
with open('examples/0_examples.json', 'r', encoding='utf-8') as file:
    examples = json.load(file)

with gr.Blocks() as demo:
    with gr.Column(elem_id="col-container"):
        with gr.Column():
            gr.HTML(read_file("static/header.html"))
        with gr.Row():
            # Left column: prompt, generate button and advanced settings.
            with gr.Column():
                prompt = gr.Textbox(
                    label="Prompt",
                    show_label=False,
                    lines=2,
                    placeholder="Enter your prompt",
                    value="a man in a fishing boat. high quality, detailed",
                    # container=False,
                )
                # is_polish_prompt = gr.Checkbox(label="Polish prompt", value=True)
                # control_mode = gr.Radio(
                #     choices=["Canny", "Depth", "HED", "MLSD", "Pose"],
                #     value="Canny",
                #     label="Control Mode"
                # )
                run_button = gr.Button("Generate", variant="primary")
                with gr.Accordion("Advanced Settings", open=False):
                    negative_prompt = gr.Textbox(
                        label="Negative prompt",
                        lines=2,
                        container=False,
                        placeholder="Enter your negative prompt",
                        value="blurry, ugly, bad"
                    )
                    with gr.Row():
                        num_inference_steps = gr.Slider(
                            label="Steps",
                            minimum=1,
                            maximum=30,
                            step=1,
                            value=9,
                        )
                        # Not passed to `inference` at the moment; kept so the UI is unchanged.
                        control_context_scale = gr.Slider(
                            label="Context scale",
                            minimum=0.0,
                            maximum=1.0,
                            step=0.01,
                            value=0.75,
                        )
                    with gr.Row():
                        guidance_scale = gr.Slider(
                            label="Guidance scale",
                            minimum=0.0,
                            maximum=10.0,
                            step=0.1,
                            value=1.0,
                        )
                        seed = gr.Slider(
                            label="Seed",
                            minimum=0,
                            maximum=MAX_SEED,
                            step=1,
                            value=42,
                        )
                    randomize_seed = gr.Checkbox(label="Randomize seed", value=False)
            # Right column: generated image.
            with gr.Column():
                output_image = gr.Image(label="Generated image", show_label=False)
                # polished_prompt = gr.Textbox(label="Polished prompt", interactive=False)
                # with gr.Accordion("Preprocessor output", open=False):
                #     control_image = gr.Image(label="Control image", show_label=False)
        # gr.Examples(examples=examples, inputs=[input_image])
        gr.Markdown(read_file("static/footer.md"))

    # `inference` must return one value per component listed in `outputs`.
    run_button.click(
        fn=inference,
        inputs=[
            prompt,
            negative_prompt,
            seed,
            randomize_seed,
            guidance_scale,
            num_inference_steps,
        ],
        outputs=[output_image, seed],
    )

if __name__ == "__main__":
    demo.launch(mcp_server=True, css=css)