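"""Gradio demo for Z-Image-Turbo text-to-image generation with an optional LoRA.

Configuration comes from environment variables (MODEL_PATH, LORA_PATH,
ENABLE_COMPILE, ENABLE_AOTI, ...); the app targets Hugging Face ZeroGPU
Spaces via the `spaces` package.
"""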
import os
import importlib.util
import random
import re
import threading
import warnings
import inspect
from typing import List, Tuple

import gradio as gr
import spaces
import torch
from diffusers import AutoencoderKL, FlowMatchEulerDiscreteScheduler, ZImagePipeline
from diffusers.models.transformers.transformer_z_image import ZImageTransformer2DModel
from PIL import Image
from transformers import AutoModelForCausalLM, AutoTokenizer

MODEL_PATH = os.environ.get("MODEL_PATH", "Tongyi-MAI/Z-Image-Turbo")
LORA_PATH = os.environ.get("LORA_PATH", os.path.join("lora", "zit-mystic-xxx.safetensors"))
HF_TOKEN = os.environ.get("HF_TOKEN")
ENABLE_COMPILE = os.environ.get("ENABLE_COMPILE", "true").lower() == "true"
ENABLE_WARMUP = os.environ.get("ENABLE_WARMUP", "false").lower() == "true"
ATTENTION_BACKEND = os.environ.get("ATTENTION_BACKEND", "flash_3")
OFFLOAD_TO_CPU_AFTER_RUN = os.environ.get("OFFLOAD_TO_CPU_AFTER_RUN", "false").lower() == "true"
ENABLE_AOTI = os.environ.get("ENABLE_AOTI", "true").lower() == "true"
AOTI_REPO = os.environ.get("AOTI_REPO", "zerogpu-aoti/Z-Image")
AOTI_VARIANT = os.environ.get("AOTI_VARIANT", "fa3")
AOTI_ALLOW_LORA = os.environ.get("AOTI_ALLOW_LORA", "false").lower() == "true"
DEFAULT_CFG = float(os.environ.get("DEFAULT_CFG", "0.0"))
SHOW_STATUS_PANEL = os.environ.get("DEBUG", "false").lower() == "true"


def resolve_model_dtype() -> torch.dtype:
    override = os.environ.get("MODEL_DTYPE")
    if override:
        key = override.strip().lower()
        if key in {"bf16", "bfloat16"}:
            return torch.bfloat16
        if key in {"fp16", "float16", "half"}:
            return torch.float16
        if key in {"fp32", "float32"}:
            return torch.float32
        print(f"Unknown MODEL_DTYPE={override!r}; falling back to auto.")

    if torch.cuda.is_available():
        is_bf16_supported = getattr(torch.cuda, "is_bf16_supported", None)
        if callable(is_bf16_supported) and is_bf16_supported():
            return torch.bfloat16
        return torch.float16
    return torch.float32


def dtype_label(dtype: torch.dtype) -> str:
    if dtype == torch.bfloat16:
        return "bf16"
    if dtype == torch.float16:
        return "fp16"
    if dtype == torch.float32:
        return "fp32"
    return str(dtype).replace("torch.", "")


def get_gpu_summary() -> str:
    if not torch.cuda.is_available():
        return "CPU"
    try:
        name = torch.cuda.get_device_name(0)
        major, minor = torch.cuda.get_device_capability(0)
        return f"{name} (cc {major}.{minor})"
    except Exception:
        return "CUDA"


MODEL_DTYPE = resolve_model_dtype()
MODEL_DTYPE_LABEL = dtype_label(MODEL_DTYPE)
GPU_SUMMARY = get_gpu_summary()

if torch.cuda.is_available():
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.set_float32_matmul_precision("high")

warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"

RES_CHOICES = {
    "1024": [
        "1024x1024 ( 1:1 )",
        "1152x896 ( 9:7 )",
        "896x1152 ( 7:9 )",
        "1152x864 ( 4:3 )",
        "864x1152 ( 3:4 )",
        "1248x832 ( 3:2 )",
        "832x1248 ( 2:3 )",
        "1280x720 ( 16:9 )",
        "720x1280 ( 9:16 )",
        "1344x576 ( 21:9 )",
        "576x1344 ( 9:21 )",
    ],
    "1280": [
        "1280x1280 ( 1:1 )",
        "1440x1120 ( 9:7 )",
        "1120x1440 ( 7:9 )",
        "1472x1104 ( 4:3 )",
        "1104x1472 ( 3:4 )",
        "1536x1024 ( 3:2 )",
        "1024x1536 ( 2:3 )",
        "1536x864 ( 16:9 )",
        "864x1536 ( 9:16 )",
        "1680x720 ( 21:9 )",
        "720x1680 ( 9:21 )",
    ],
    "1536": [
        "1536x1536 ( 1:1 )",
        "1728x1344 ( 9:7 )",
        "1344x1728 ( 7:9 )",
        "1728x1296 ( 4:3 )",
        "1296x1728 ( 3:4 )",
        "1872x1248 ( 3:2 )",
        "1248x1872 ( 2:3 )",
        "2048x1152 ( 16:9 )",
        "1152x2048 ( 9:16 )",
        "2016x864 ( 21:9 )",
        "864x2016 ( 9:21 )",
    ],
}

RESOLUTION_SET: List[str] = []
for resolutions in RES_CHOICES.values():
    RESOLUTION_SET.extend(resolutions)

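# Example prompts; most are intentionally in Chinese, and several exercise
# CJK text rendering in the generated image, so they are kept untranslated.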
EXAMPLE_PROMPTS = [
    ["一位男士和他的贵宾犬穿着配套的服装参加狗狗秀,室内灯光,背景中有观众。"],
    [
        "极具氛围感的暗调人像,一位优雅的中国美女在黑暗的房间里。一束强光通过遮光板,在她的脸上投射出一个清晰的闪电形状的光影,正好照亮一只眼睛。高对比度,明暗交界清晰,神秘感,莱卡相机色调。"
    ],
    [
        "一张中景手机自拍照片拍摄了一位留着长黑发的年轻东亚女子在灯光明亮的电梯内对着镜子自拍。她穿着一件带有白色花朵图案的黑色露肩短上衣和深色牛仔裤。她的头微微倾斜,嘴唇嘟起做亲吻状,非常可爱俏皮。她右手拿着一部深灰色智能手机,遮住了部分脸,后置摄像头镜头对着镜子"
    ],
    [
        "Young Chinese woman in red Hanfu, intricate embroidery. Impeccable makeup, red floral forehead pattern. Elaborate high bun, golden phoenix headdress, red flowers, beads. Holds round folding fan with lady, trees, bird. Neon lightning-bolt lamp (⚡️), bright yellow glow, above extended left palm. Soft-lit outdoor night background, silhouetted tiered pagoda (西安大雁塔), blurred colorful distant lights."
    ],
    [
        '''A vertical digital illustration depicting a serene and majestic Chinese landscape, rendered in a style reminiscent of traditional Shanshui painting but with a modern, clean aesthetic. The scene is dominated by towering, steep cliffs in various shades of blue and teal, which frame a central valley. In the distance, layers of mountains fade into a light blue and white mist, creating a strong sense of atmospheric perspective and depth. A calm, turquoise river flows through the center of the composition, with a small, traditional Chinese boat, possibly a sampan, navigating its waters. The boat has a bright yellow canopy and a red hull, and it leaves a gentle wake behind it. It carries several indistinct figures of people. Sparse vegetation, including green trees and some bare-branched trees, clings to the rocky ledges and peaks. The overall lighting is soft and diffused, casting a tranquil glow over the entire scene. Centered in the image is overlaid text. At the top of the text block is a small, red, circular seal-like logo containing stylized characters. Below it, in a smaller, black, sans-serif font, are the words "Zao-Xiang * East Beauty & West Fashion * Z-Image". Directly beneath this, in a larger, elegant black serif font, is the word "SHOW & SHARE CREATIVITY WITH THE WORLD". Among them, there are "SHOW & SHARE", "CREATIVITY", and "WITH THE WORLD"'''
    ],
    [
        """一张虚构的英语电影《回忆之味》(The Taste of Memory)的电影海报。场景设置在一个质朴的19世纪风格厨房里。画面中央,一位红棕色头发、留着小胡子的中年男子(演员阿瑟·彭哈利根饰)站在一张木桌后,他身穿白色衬衫、黑色马甲和米色围裙,正看着一位女士,手中拿着一大块生红肉,下方是一个木制切菜板。在他的右边,一位梳着高髻的黑发女子(演员埃莉诺·万斯饰)倚靠在桌子上,温柔地对他微笑。她穿着浅色衬衫和一条上白下蓝的长裙。桌上除了放有切碎的葱和卷心菜丝的切菜板外,还有一个白色陶瓷盘、新鲜香草,左侧一个木箱上放着一串深色葡萄。背景是一面粗糙的灰白色抹灰墙,墙上挂着一幅风景画。最右边的一个台面上放着一盏复古油灯。海报上有大量的文字信息。左上角是白色的无衬线字体"ARTISAN FILMS PRESENTS",其下方是"ELEANOR VANCE"和"ACADEMY AWARD® WINNER"。右上角写着"ARTHUR PENHALIGON"和"GOLDEN GLOBE® AWARD WINNER"。顶部中央是圣丹斯电影节的桂冠标志,下方写着"SUNDANCE FILM FESTIVAL GRAND JURY PRIZE 2024"。主标题"THE TASTE OF MEMORY"以白色的大号衬线字体醒目地显示在下半部分。标题下方注明了"A FILM BY Tongyi Interaction Lab"。底部区域用白色小字列出了完整的演职员名单,包括"SCREENPLAY BY ANNA REID"、"CULINARY DIRECTION BY JAMES CARTER"以及Artisan Films、Riverstone Pictures和Heritage Media等众多出品公司标志。整体风格是写实主义,采用温暖柔和的灯光方案,营造出一种亲密的氛围。色调以棕色、米色和柔和的绿色等大地色系为主。两位演员的身体都在腰部被截断。"""
    ],
    [
        """一张方形构图的特写照片,主体是一片巨大的、鲜绿色的植物叶片,并叠加了文字,使其具有海报或杂志封面的外观。主要拍摄对象是一片厚实、有蜡质感的叶子,从左下角到右上角呈对角线弯曲穿过画面。其表面反光性很强,捕捉到一个明亮的直射光源,形成了一道突出的高光,亮面下显露出平行的精细叶脉。背景由其他深绿色的叶子组成,这些叶子轻微失焦,营造出浅景深效果,突出了前景的主叶片。整体风格是写实摄影,明亮的叶片与黑暗的阴影背景之间形成高对比度。图像上有多处渲染文字。左上角是白色的衬线字体文字"PIXEL-PEEPERS GUILD Presents"。右上角同样是白色衬线字体的文字"[Instant Noodle] 泡面调料包"。左侧垂直排列着标题"Render Distance: Max",为白色衬线字体。左下角是五个硕大的白色宋体汉字"显卡在...燃烧"。右下角是较小的白色衬线字体文字"Leica Glow™ Unobtanium X-1",其正上方是用白色宋体字书写的名字"蔡几"。识别出的核心实体包括品牌像素偷窥者协会、其产品线泡面调料包、相机型号买不到™ X-1以及摄影师名字造相。"""
    ],
]

pipe: ZImagePipeline | None = None
lora_loaded: bool = False
lora_error: str | None = None
lora_adapter_name: str | None = None
pipe_lock = threading.Lock()
pipe_on_gpu: bool = False
aoti_loaded: bool = False
applied_attention_backend: str | None = None
attention_backend_error: str | None = None
aoti_error: str | None = None
transformer_compiled: bool = False
transformer_compile_attempted: bool = False
compile_error: str | None = None
inductor_configured: bool = False

SCHEDULERS = {"FlowMatch Euler": FlowMatchEulerDiscreteScheduler}
try:
    from diffusers import FlowMatchHeunDiscreteScheduler  # type: ignore

    SCHEDULERS["FlowMatch Heun"] = FlowMatchHeunDiscreteScheduler
except Exception:
    pass

def module_available(module_name: str) -> bool:
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        return False


def summarize_error(message: str, *, max_len: int = 120) -> str:
    one_line = " ".join(str(message).split())
    if len(one_line) <= max_len:
        return one_line
    return one_line[: max_len - 1] + "…"


def parse_resolution(resolution: str) -> Tuple[int, int]:
    match = re.search(r"(\d+)\s*[×x]\s*(\d+)", resolution)
    if match:
        return int(match.group(1)), int(match.group(2))
    return 1024, 1024


def set_attention_backend_safe(transformer, backend: str) -> str:
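    """Apply the requested attention backend, falling back through variants.

    Tries `backend`, an underscore-prefixed/stripped variant, then the
    generic "flash", "xformers", and "native" backends. Returns the name
    that was actually applied and records why the requested backend failed
    (if it did) in the module-level `attention_backend_error`.
    """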
    global attention_backend_error
    candidates: List[str] = []
    if backend:
        candidates.append(backend)
        if backend.startswith("_"):
            candidates.append(backend.lstrip("_"))
        else:
            candidates.append(f"_{backend}")
    candidates.extend(["flash", "xformers", "native"])

    attention_backend_error = None
    errors: dict[str, Exception] = {}
    last_exc: Exception | None = None
    for name in candidates:
        if not name:
            continue
        try:
            transformer.set_attention_backend(name)
            if backend and name != backend:
                for key in (backend, backend.lstrip("_"), f"_{backend}"):
                    if key in errors:
                        attention_backend_error = str(errors[key])
                        break
                if attention_backend_error is None and last_exc is not None:
                    attention_backend_error = str(last_exc)
                if attention_backend_error:
                    print(
                        f"Requested attention backend {backend!r} failed; using {name!r} instead. "
                        f"Reason: {attention_backend_error}"
                    )
            return name
        except Exception as exc:  # noqa: BLE001
            last_exc = exc
            errors[name] = exc
            continue

    raise RuntimeError(f"Failed to set attention backend (tried {candidates}): {last_exc}")


def attach_lora(pipeline: ZImagePipeline) -> Tuple[bool, str | None]:
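    """Load the LoRA from LORA_PATH onto the pipeline.

    Returns (loaded, error_message). Requires the `peft` backend; on adapter
    name mismatches, retries with whatever adapter name diffusers reports
    as present.
    """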
    global lora_adapter_name
    if not LORA_PATH or not os.path.isfile(LORA_PATH):
        return False, "LoRA file not found"
    if not module_available("peft"):
        return False, "PEFT backend is required for LoRA. Install `peft` and restart."

    def extract_present_adapter_names(exc: Exception) -> List[str]:
        msg = str(exc)
        match = re.search(r"present adapters:\s*(\{[^}]*\})", msg)
        if not match:
            return []
        return re.findall(r"'([^']+)'", match.group(1))

    try:
        folder, weight_name = os.path.split(LORA_PATH)
        folder = folder or "."
        preferred_adapter = os.environ.get("LORA_ADAPTER_NAME", "default")
        lora_adapter_name = preferred_adapter
        try:
            pipeline.load_lora_weights(folder, weight_name=weight_name, adapter_name=preferred_adapter)
        except TypeError:
            pipeline.load_lora_weights(folder, weight_name=weight_name)

        try:
            set_lora_scale(pipeline, 1.0)
        except Exception as exc:  # noqa: BLE001
            adapter_names = extract_present_adapter_names(exc)
            if adapter_names:
                lora_adapter_name = adapter_names[0]
                set_lora_scale(pipeline, 1.0)
            else:
                raise
        return True, None
    except Exception as exc:  # noqa: BLE001
        lora_adapter_name = None
        return False, f"Failed to load LoRA: {exc}"


def set_lora_scale(pipeline: ZImagePipeline, scale: float) -> None:
    global lora_adapter_name
    weight = max(float(scale), 0.0)
    adapter = lora_adapter_name or "default"
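    # If the transformer has been wrapped by torch.compile, temporarily swap
    # in the original module (`_orig_mod`) so peft can locate the adapters,
    # then restore the compiled wrapper in the `finally` block.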
    compiled_transformer = None
    try:
        transformer = getattr(pipeline, "transformer", None)
        if transformer is not None and hasattr(transformer, "_orig_mod"):
            compiled_transformer = transformer
            pipeline.transformer = transformer._orig_mod

        try:
            pipeline.set_adapters([adapter], adapter_weights=[weight])
        except TypeError:
            pipeline.set_adapters([adapter], weights=[weight])
        except ValueError as exc:
            msg = str(exc)
            present_match = re.search(r"present adapters:\s*(\{[^}]*\}|set\([^)]*\))", msg)
            if present_match:
                present_names = re.findall(r"'([^']+)'", present_match.group(1))
            else:
                present_names = []
            if present_names:
                lora_adapter_name = present_names[0]
                adapter = lora_adapter_name
                try:
                    pipeline.set_adapters([adapter], adapter_weights=[weight])
                except TypeError:
                    pipeline.set_adapters([adapter], weights=[weight])
            else:
                raise
    finally:
        if compiled_transformer is not None:
            pipeline.transformer = compiled_transformer


def load_models() -> Tuple[ZImagePipeline, bool, str | None]:
    global pipe, lora_loaded, lora_error, pipe_on_gpu, applied_attention_backend, transformer_compiled, transformer_compile_attempted
    if pipe is not None and getattr(pipe, "transformer", None) is not None:
        return pipe, lora_loaded, lora_error

    transformer_compiled = False
    transformer_compile_attempted = False

    token = HF_TOKEN if HF_TOKEN else None
    hf_kwargs = {"token": token} if token else {}
    print(f"Loading Z-Image from {MODEL_PATH}...")
    print(f"GPU: {GPU_SUMMARY} | dtype: {MODEL_DTYPE_LABEL}")

    if not torch.cuda.is_available():
        raise RuntimeError("CUDA is not available. This app requires a GPU.")

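    # MODEL_PATH may be a Hub repo id (no local directory) or a local
    # checkout; the existence check below picks the matching loading path.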
    if not os.path.exists(MODEL_PATH):
        vae = AutoencoderKL.from_pretrained(
            MODEL_PATH,
            subfolder="vae",
            torch_dtype=MODEL_DTYPE,
            **hf_kwargs,
        ).to("cuda", MODEL_DTYPE)
        text_encoder = AutoModelForCausalLM.from_pretrained(
            MODEL_PATH,
            subfolder="text_encoder",
            torch_dtype=MODEL_DTYPE,
            **hf_kwargs,
        ).to("cuda", MODEL_DTYPE).eval()
        tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, subfolder="tokenizer", **hf_kwargs)
    else:
        vae = AutoencoderKL.from_pretrained(os.path.join(MODEL_PATH, "vae"), torch_dtype=MODEL_DTYPE).to(
            "cuda", MODEL_DTYPE
        )
        text_encoder = AutoModelForCausalLM.from_pretrained(
            os.path.join(MODEL_PATH, "text_encoder"),
            torch_dtype=MODEL_DTYPE,
        ).to("cuda", MODEL_DTYPE).eval()
        tokenizer = AutoTokenizer.from_pretrained(os.path.join(MODEL_PATH, "tokenizer"))

    tokenizer.padding_side = "left"

    pipeline = ZImagePipeline(scheduler=None, vae=vae, text_encoder=text_encoder, tokenizer=tokenizer, transformer=None)

    if not os.path.exists(MODEL_PATH):
        transformer = ZImageTransformer2DModel.from_pretrained(
            MODEL_PATH,
            subfolder="transformer",
            torch_dtype=MODEL_DTYPE,
            **hf_kwargs,
        )
    else:
        transformer = ZImageTransformer2DModel.from_pretrained(
            os.path.join(MODEL_PATH, "transformer"),
            torch_dtype=MODEL_DTYPE,
        )

    applied_attention_backend = set_attention_backend_safe(transformer, ATTENTION_BACKEND)
    print(f"Attention backend: {applied_attention_backend}")

    pipeline.transformer = transformer.to("cuda", MODEL_DTYPE)
    pipeline.to("cuda", MODEL_DTYPE)

    loaded, error = attach_lora(pipeline)
    lora_loaded, lora_error = loaded, error
    if lora_error:
        print(lora_error)
    else:
        print(f"LoRA loaded: {lora_loaded} ({LORA_PATH})")

    pipe = pipeline
    pipe_on_gpu = True
    return pipe, lora_loaded, lora_error


def ensure_models_loaded() -> Tuple[ZImagePipeline, bool, str | None]:
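    # Double-checked locking: cheap check without the lock first, then
    # re-check under pipe_lock before rebuilding the pipeline.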
    global pipe, pipe_on_gpu, transformer_compiled, transformer_compile_attempted
    if pipe is not None and getattr(pipe, "transformer", None) is not None:
        return pipe, lora_loaded, lora_error
    with pipe_lock:
        if pipe is not None and getattr(pipe, "transformer", None) is not None:
            return pipe, lora_loaded, lora_error
        pipe = None
        pipe_on_gpu = False
        transformer_compiled = False
        transformer_compile_attempted = False
        return load_models()


def configure_inductor_for_compile() -> None:
    global inductor_configured
    if inductor_configured:
        return
    try:
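        # Inductor autotuning knobs matching the "max-autotune-no-cudagraphs"
        # compile mode used below; any failure here is logged and ignored.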
        torch._inductor.config.conv_1x1_as_mm = True
        torch._inductor.config.coordinate_descent_tuning = True
        torch._inductor.config.epilogue_fusion = False
        torch._inductor.config.coordinate_descent_check_all_directions = True
        torch._inductor.config.max_autotune_gemm = True
        torch._inductor.config.max_autotune_gemm_backends = "TRITON,ATEN"
        torch._inductor.config.triton.cudagraphs = False
    except Exception as exc:  # noqa: BLE001
        print(f"torch.compile inductor config failed (continuing): {exc}")
    inductor_configured = True


def maybe_compile_transformer() -> None:
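    """Compile the transformer once per process; fall back to eager on failure."""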
    global transformer_compiled, transformer_compile_attempted, compile_error
    if not ENABLE_COMPILE or transformer_compile_attempted:
        return
    if pipe is None or getattr(pipe, "transformer", None) is None:
        return

    transformer_compile_attempted = True
    compile_error = None
    configure_inductor_for_compile()
    try:
        torch._dynamo.config.suppress_errors = True
    except Exception:  # noqa: BLE001
        pass

    try:
        if getattr(pipe, "vae", None) is not None and hasattr(pipe.vae, "disable_tiling"):
            pipe.vae.disable_tiling()
    except Exception:  # noqa: BLE001
        pass

    try:
        print("Compiling transformer (torch.compile)...")
        pipe.transformer = torch.compile(pipe.transformer, mode="max-autotune-no-cudagraphs", fullgraph=False)
        transformer_compiled = True
    except Exception as exc:  # noqa: BLE001
        transformer_compiled = False
        compile_error = str(exc)
        print(f"torch.compile failed (continuing without compile): {exc}")


def ensure_on_gpu() -> None:
    global pipe_on_gpu
    if pipe is None:
        raise gr.Error("Model not loaded.")
    if getattr(pipe, "transformer", None) is None:
        raise gr.Error("Model init failed (transformer missing). Check startup logs.")
    if not torch.cuda.is_available():
        raise gr.Error("CUDA is not available. This Space requires a GPU.")
    if not pipe_on_gpu:
        print("Moving model to GPU...")
        pipe.to("cuda", MODEL_DTYPE)
        pipe_on_gpu = True
    maybe_compile_transformer()


def offload_to_cpu() -> None:
    global pipe_on_gpu
    if pipe is None:
        return
    if not pipe_on_gpu:
        return
    print("Offloading model to CPU...")
    pipe.to("cpu")
    pipe_on_gpu = False
    if torch.cuda.is_available():
        torch.cuda.empty_cache()


def make_scheduler(scheduler_cls, **kwargs):
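    # Keep only the kwargs this scheduler's __init__ actually accepts, so one
    # call site can construct schedulers with differing signatures.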
    sig = inspect.signature(scheduler_cls.__init__)
    accepted = set(sig.parameters.keys())
    accepted.discard("self")
    filtered = {k: v for k, v in kwargs.items() if k in accepted and v is not None}
    return scheduler_cls(**filtered)


def set_scheduler(
    pipeline: ZImagePipeline,
    scheduler_name: str,
    *,
    num_train_timesteps: int,
    shift: float,
    use_dynamic_shifting: bool,
    base_shift: float,
    max_shift: float,
) -> None:
    scheduler_cls = SCHEDULERS.get(scheduler_name, FlowMatchEulerDiscreteScheduler)
    scheduler = make_scheduler(
        scheduler_cls,
        num_train_timesteps=int(num_train_timesteps),
        shift=float(shift),
        use_dynamic_shifting=bool(use_dynamic_shifting),
        base_shift=float(base_shift),
        max_shift=float(max_shift),
    )
    pipeline.scheduler = scheduler


def generate_image(
    pipeline: ZImagePipeline,
    prompt: str,
    resolution: str,
    seed: int,
    steps: int,
    shift: float,
    guidance_scale: float,
    use_lora: bool,
    lora_scale: float,
    max_sequence_length: int,
    scheduler_name: str,
    num_train_timesteps: int,
    use_dynamic_shifting: bool,
    base_shift: float,
    max_shift: float,
) -> Tuple[Image.Image, int]:
    width, height = parse_resolution(resolution)
    set_scheduler(
        pipeline,
        str(scheduler_name),
        num_train_timesteps=int(num_train_timesteps),
        shift=float(shift),
        use_dynamic_shifting=bool(use_dynamic_shifting),
        base_shift=float(base_shift),
        max_shift=float(max_shift),
    )

    if lora_loaded:
        try:
            if use_lora:
                set_lora_scale(pipeline, float(lora_scale))
            else:
                set_lora_scale(pipeline, 0.0)
        except Exception as exc:  # noqa: BLE001
            print(f"LoRA scale update failed (continuing without changing LoRA state): {exc}")

    def run_pipeline() -> Image.Image:
        generator = torch.Generator("cuda").manual_seed(seed)
        return pipeline(
            prompt=prompt,
            height=height,
            width=width,
            guidance_scale=float(guidance_scale),
            num_inference_steps=int(steps),
            generator=generator,
            max_sequence_length=int(max_sequence_length),
        ).images[0]

    try:
        with torch.inference_mode():
            image = run_pipeline()
    except Exception as exc:  # noqa: BLE001
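        # A torch.compile'd transformer can fail at runtime with Dynamo
        # errors; if so, unwrap to the eager module and retry once.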
        transformer = getattr(pipeline, "transformer", None)
        message = str(exc)
        is_dynamo_error = isinstance(exc, AssertionError) or "torch._dynamo" in message or "ConstantVariable" in message
        if transformer is not None and hasattr(transformer, "_orig_mod") and is_dynamo_error:
            global transformer_compiled, transformer_compile_attempted, compile_error
            compile_error = message
            transformer_compiled = False
            transformer_compile_attempted = True
            pipeline.transformer = transformer._orig_mod
            print(f"torch.compile runtime failed; falling back to eager: {exc}")
            with torch.inference_mode():
                image = run_pipeline()
        else:
            raise
    return image, seed


def warmup_model(pipeline: ZImagePipeline, resolutions: List[str]) -> None:
    print("Warmup started...")
    dummy_prompt = "warmup"
    for res_str in resolutions:
        try:
            generate_image(
                pipeline,
                prompt=dummy_prompt,
                resolution=res_str,
                seed=42,
                steps=9,
                shift=3.0,
                guidance_scale=0.0,
                use_lora=False,
                lora_scale=0.0,
                max_sequence_length=512,
                scheduler_name="FlowMatch Euler",
                num_train_timesteps=1000,
                use_dynamic_shifting=False,
                base_shift=0.5,
                max_shift=3.0,
            )
        except Exception as exc:  # noqa: BLE001
            print(f"Warmup failed for {res_str}: {exc}")
    print("Warmup done.")


def init_app() -> None:
    global aoti_loaded, aoti_error, applied_attention_backend
    try:
        ensure_models_loaded()
        if ENABLE_COMPILE and pipe is not None:
            ensure_on_gpu()
        if ENABLE_AOTI and not aoti_loaded and pipe is not None and getattr(pipe, "transformer", None) is not None:
            if lora_loaded and not AOTI_ALLOW_LORA:
                aoti_loaded = False
                aoti_error = "disabled: AoTI blocks are incompatible with LoRA adapters"
                print("AoTI disabled: LoRA adapters are loaded (AoTI blocks are incompatible with LoRA).")
            else:
                try:
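                    # Tag the repeated transformer block class so
                    # spaces.aoti_blocks_load can substitute ahead-of-time
                    # compiled (AoTI) kernels for every block instance.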
                    pipe.transformer.layers._repeated_blocks = ["ZImageTransformerBlock"]
                    spaces.aoti_blocks_load(pipe.transformer.layers, AOTI_REPO, variant=AOTI_VARIANT)
                    aoti_loaded = True
                    aoti_error = None
                    print(f"AoTI loaded: {AOTI_REPO} (variant={AOTI_VARIANT})")
                except Exception as exc:  # noqa: BLE001
                    aoti_loaded = False
                    aoti_error = str(exc)
                    print(f"AoTI load failed (continuing without AoTI): {exc}")
                try:
                    applied_attention_backend = set_attention_backend_safe(pipe.transformer, ATTENTION_BACKEND)
                    print(f"Attention backend (post-AoTI): {applied_attention_backend}")
                except Exception as exc:  # noqa: BLE001
                    print(f"Attention backend update failed (continuing): {exc}")
        if ENABLE_WARMUP and pipe is not None:
            ensure_on_gpu()
            try:
                all_resolutions: List[str] = []
                for cat in RES_CHOICES.values():
                    all_resolutions.extend(cat)
                warmup_model(pipe, all_resolutions)
            finally:
                if OFFLOAD_TO_CPU_AFTER_RUN:
                    offload_to_cpu()
    except Exception as exc:  # noqa: BLE001
        print(f"Model init failed: {exc}")


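# ZeroGPU entry point: the `spaces.GPU` decorator attaches a GPU for the
# duration of each call.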
@spaces.GPU
def generate(
    prompt: str,
    resolution: str = "1024x1024 ( 1:1 )",
    seed: int = 42,
    steps: int = 8,
    shift: float = 3.0,
    cfg: float = DEFAULT_CFG,
    scheduler_name: str = "FlowMatch Euler",
    num_train_timesteps: int = 1000,
    use_dynamic_shifting: bool = False,
    base_shift: float = 0.5,
    max_shift: float = 3.0,
    random_seed: bool = True,
    use_lora: bool = True,
    lora_scale: float = 1.0,
    max_sequence_length: int = 512,
    gallery_images=None,
    progress=gr.Progress(track_tqdm=True),
):
    ensure_models_loaded()
    ensure_on_gpu()

    new_seed = random.randint(1, 1_000_000) if random_seed or seed == -1 else int(seed)

    try:
        image = generate_image(
            pipeline=pipe,
            prompt=str(prompt),
            resolution=str(resolution),
            seed=new_seed,
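            # One step is added on top of the UI value; this appears
            # deliberate (warmup also runs 9 steps against the UI default
            # of 8).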
            steps=int(steps) + 1,
            shift=float(shift),
            guidance_scale=float(cfg),
            use_lora=use_lora,
            lora_scale=float(lora_scale),
            max_sequence_length=int(max_sequence_length),
            scheduler_name=str(scheduler_name),
            num_train_timesteps=int(num_train_timesteps),
            use_dynamic_shifting=bool(use_dynamic_shifting),
            base_shift=float(base_shift),
            max_shift=float(max_shift),
        )[0]
    finally:
        if OFFLOAD_TO_CPU_AFTER_RUN:
            offload_to_cpu()

    if gallery_images is None:
        gallery_images = []
    gallery_images = [image] + gallery_images
    return gallery_images, str(new_seed), int(new_seed)


init_app()

with gr.Blocks(title="Z-Image + LoRA") as demo:
    pipe_status = "loaded (GPU)" if pipe and pipe_on_gpu else "loaded (CPU)" if pipe else "not loaded"
    lora_file_status = "found" if os.path.isfile(LORA_PATH) else "missing"
    if lora_loaded:
        adapter = lora_adapter_name or "default"
        lora_status = f"LoRA: loaded ({LORA_PATH}, adapter={adapter})"
    elif lora_error:
        lora_status = f"LoRA: not loaded ({lora_error})"
    else:
        lora_status = f"LoRA file: {LORA_PATH} ({lora_file_status})"

    attention_status = applied_attention_backend or "unknown"
    if attention_backend_error and ATTENTION_BACKEND and attention_status != ATTENTION_BACKEND:
        attention_status = f"{attention_status} ({ATTENTION_BACKEND} unavailable: {summarize_error(attention_backend_error)})"

    if aoti_loaded:
        aoti_status = "loaded"
    elif aoti_error:
        lower = aoti_error.lower()
        if "disabled" in lower:
            label = "disabled"
        elif "kernels" in lower:
            label = "unavailable"
        else:
            label = "failed"
        aoti_status = f"{label} ({summarize_error(aoti_error)})"
    else:
        aoti_status = "not loaded"
    if not ENABLE_COMPILE:
        compile_status = "off"
    elif transformer_compiled:
        compile_status = "on"
    elif transformer_compile_attempted:
        compile_status = "failed"
    else:
        compile_status = "pending"

    gr.Markdown(
        """<div align="center">

# Z-Image with LoRA
LoRA: zit-mystic-xxx

</div>"""
    )

    status_md = (
        f"Model: `{MODEL_PATH}` | {pipe_status}  \n"
        f"GPU: `{GPU_SUMMARY}` | dtype: `{MODEL_DTYPE_LABEL}`  \n"
        f"Attention: `{attention_status}`  \n"
        f"AoTI: `{aoti_status}`  \n"
        f"torch.compile: `{compile_status}`  \n"
        f"{lora_status}"
    )

    details_md_blocks: List[str] = []
    if attention_backend_error:
        details_md_blocks.append(f"**Attention backend error**\n```\n{attention_backend_error}\n```")
    if aoti_error:
        details_md_blocks.append(f"**AoTI error**\n```\n{aoti_error}\n```")
    if compile_error:
        details_md_blocks.append(f"**torch.compile error**\n```\n{compile_error}\n```")

    if SHOW_STATUS_PANEL:
        with gr.Column(elem_id="floating_status_panel"):
            with gr.Accordion("Status / Debug", open=False):
                gr.Markdown(status_md)
                if details_md_blocks:
                    gr.Markdown("\n\n".join(details_md_blocks))

    with gr.Row():
        with gr.Column(scale=1):
            prompt_input = gr.Textbox(label="Prompt", lines=3, placeholder="Enter your prompt here...")

            with gr.Row():
                choices = [int(k) for k in RES_CHOICES.keys()]
                res_cat = gr.Dropdown(value=1024, choices=choices, label="Resolution Category")
                resolution = gr.Dropdown(
                    value=RES_CHOICES["1024"][0],
                    choices=RESOLUTION_SET,
                    label="Width x Height (Ratio)",
                )

            with gr.Row():
                seed = gr.Number(label="Seed", value=42, precision=0)
                random_seed = gr.Checkbox(label="Random Seed", value=True)

            with gr.Row():
                steps = gr.Slider(label="Steps", minimum=1, maximum=100, value=8, step=1)
                shift = gr.Slider(label="Time Shift", minimum=1.0, maximum=10.0, value=3.0, step=0.1)

            with gr.Accordion("KSampler / Advanced", open=False):
                cfg = gr.Slider(label="CFG", minimum=0.0, maximum=10.0, value=DEFAULT_CFG, step=0.1)

                with gr.Row():
                    scheduler_name = gr.Dropdown(
                        label="Scheduler",
                        choices=list(SCHEDULERS.keys()),
                        value="FlowMatch Euler",
                    )
                    num_train_timesteps = gr.Slider(
                        label="num_train_timesteps",
                        minimum=100,
                        maximum=2000,
                        value=1000,
                        step=10,
                    )

                with gr.Row():
                    use_dynamic_shifting = gr.Checkbox(label="use_dynamic_shifting", value=False)
                    max_seq = gr.Slider(label="Max Sequence Length", minimum=256, maximum=1024, value=512, step=16)

                with gr.Row():
                    base_shift = gr.Slider(label="base_shift", minimum=0.0, maximum=10.0, value=0.5, step=0.1)
                    max_shift = gr.Slider(label="max_shift", minimum=0.0, maximum=10.0, value=3.0, step=0.1)

            with gr.Row():
                lora_controls_enabled = bool(lora_loaded)
                use_lora = gr.Checkbox(label="Use LoRA", value=lora_controls_enabled, interactive=lora_controls_enabled)
                lora_strength = gr.Slider(
                    label="LoRA Strength",
                    minimum=0.0,
                    maximum=1.5,
                    value=1.0,
                    step=0.05,
                    interactive=lora_controls_enabled,
                )

            generate_btn = gr.Button("Generate", variant="primary")

            gr.Markdown("### 📝 Example Prompts")
            gr.Examples(examples=EXAMPLE_PROMPTS, inputs=prompt_input, label=None)

        with gr.Column(scale=1):
            output_gallery = gr.Gallery(
                label="Generated Images",
                columns=2,
                rows=2,
                height=600,
                object_fit="contain",
                format="png",
                interactive=False,
            )
            used_seed = gr.Textbox(label="Seed Used", interactive=False)

    def update_res_choices(_res_cat):
        if str(_res_cat) in RES_CHOICES:
            res_choices = RES_CHOICES[str(_res_cat)]
        else:
            res_choices = RES_CHOICES["1024"]
        return gr.update(value=res_choices[0], choices=res_choices)

    res_cat.change(update_res_choices, inputs=res_cat, outputs=resolution, api_visibility="private")

    generate_btn.click(
        generate,
        inputs=[
            prompt_input,
            resolution,
            seed,
            steps,
            shift,
            cfg,
            scheduler_name,
            num_train_timesteps,
            use_dynamic_shifting,
            base_shift,
            max_shift,
            random_seed,
            use_lora,
            lora_strength,
            max_seq,
            output_gallery,
        ],
        outputs=[output_gallery, used_seed, seed],
        api_visibility="public",
    )

css = """
.fillable{max-width: 1230px !important}

#floating_status_panel{
  position: fixed !important;
  right: 16px;
  bottom: 16px;
  width: min(420px, 92vw);
  max-height: 70vh;
  overflow: auto;
  z-index: 1000;
  background: rgba(255,255,255,0.9);
  border: 1px solid rgba(0,0,0,0.12);
  border-radius: 12px;
  box-shadow: 0 10px 30px rgba(0,0,0,0.18);
  padding: 8px 10px;
  backdrop-filter: blur(8px);
}
"""

if __name__ == "__main__":
    demo.launch(css=css)