pixagram-backup / models.py
primerz's picture
Update models.py
75da5ce verified
"""
Model loading and initialization for Pixagram AI Pixel Art Generator
FIXED VERSION with proper IP-Adapter and BLIP-2 support
"""
import torch
import time
from diffusers import (
StableDiffusionXLControlNetImg2ImgPipeline,
ControlNetModel,
AutoencoderKL,
LCMScheduler
)
from diffusers.models.attention_processor import AttnProcessor2_0
from transformers import CLIPVisionModelWithProjection
from insightface.app import FaceAnalysis
from controlnet_aux import ZoeDetector
from huggingface_hub import hf_hub_download
from compel import Compel, ReturnedEmbeddingsType
# Use reference implementation's attention processor
from attention_processor import IPAttnProcessor2_0, AttnProcessor
from resampler import Resampler
from config import (
device, dtype, MODEL_REPO, MODEL_FILES, HUGGINGFACE_TOKEN,
FACE_DETECTION_CONFIG, CLIP_SKIP, DOWNLOAD_CONFIG
)
def download_model_with_retry(repo_id, filename, max_retries=None):
"""Download model with retry logic and proper token handling."""
if max_retries is None:
max_retries = DOWNLOAD_CONFIG['max_retries']
for attempt in range(max_retries):
try:
print(f" Attempting to download {filename} (attempt {attempt + 1}/{max_retries})...")
kwargs = {"repo_type": "model"}
if HUGGINGFACE_TOKEN:
kwargs["token"] = HUGGINGFACE_TOKEN
path = hf_hub_download(
repo_id=repo_id,
filename=filename,
**kwargs
)
print(f" [OK] Downloaded: {filename}")
return path
except Exception as e:
print(f" [WARNING] Download attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
print(f" Retrying in {DOWNLOAD_CONFIG['retry_delay']} seconds...")
time.sleep(DOWNLOAD_CONFIG['retry_delay'])
else:
print(f" [ERROR] Failed to download {filename} after {max_retries} attempts")
raise
return None
def load_face_analysis():
"""Load face analysis model with proper error handling."""
print("Loading face analysis model...")
try:
face_app = FaceAnalysis(
name=FACE_DETECTION_CONFIG['model_name'],
root='./models/insightface',
providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
face_app.prepare(
ctx_id=FACE_DETECTION_CONFIG['ctx_id'],
det_size=FACE_DETECTION_CONFIG['det_size']
)
print(" [OK] Face analysis model loaded successfully")
return face_app, True
except Exception as e:
print(f" [WARNING] Face detection not available: {e}")
return None, False
def load_depth_detector():
"""Load Zoe Depth detector."""
print("Loading Zoe Depth detector...")
try:
zoe_depth = ZoeDetector.from_pretrained("lllyasviel/Annotators")
zoe_depth.to(device)
print(" [OK] Zoe Depth loaded successfully")
return zoe_depth, True
except Exception as e:
print(f" [WARNING] Zoe Depth not available: {e}")
return None, False
def load_controlnets():
"""Load ControlNet models."""
print("Loading ControlNet Zoe Depth model...")
controlnet_depth = ControlNetModel.from_pretrained(
"diffusers/controlnet-zoe-depth-sdxl-1.0",
torch_dtype=dtype
).to(device)
print(" [OK] ControlNet Depth loaded")
print("Loading InstantID ControlNet...")
try:
controlnet_instantid = ControlNetModel.from_pretrained(
"InstantX/InstantID",
subfolder="ControlNetModel",
torch_dtype=dtype
).to(device)
print(" [OK] InstantID ControlNet loaded successfully")
return controlnet_depth, controlnet_instantid, True
except Exception as e:
print(f" [WARNING] InstantID ControlNet not available: {e}")
return controlnet_depth, None, False
def load_image_encoder():
"""Load CLIP Image Encoder for IP-Adapter."""
print("Loading CLIP Image Encoder for IP-Adapter...")
try:
image_encoder = CLIPVisionModelWithProjection.from_pretrained(
"h94/IP-Adapter",
subfolder="models/image_encoder",
torch_dtype=dtype
).to(device)
print(" [OK] CLIP Image Encoder loaded successfully")
return image_encoder
except Exception as e:
print(f" [ERROR] Could not load image encoder: {e}")
return None
def load_sdxl_pipeline(controlnets):
"""Load SDXL checkpoint from HuggingFace Hub."""
print("Loading SDXL checkpoint (horizon) with bundled VAE from HuggingFace Hub...")
try:
model_path = download_model_with_retry(MODEL_REPO, MODEL_FILES['checkpoint'])
pipe = StableDiffusionXLControlNetImg2ImgPipeline.from_single_file(
model_path,
controlnet=controlnets,
torch_dtype=dtype,
use_safetensors=True
).to(device)
print(" [OK] Custom checkpoint loaded successfully (VAE bundled)")
return pipe, True
except Exception as e:
print(f" [WARNING] Could not load custom checkpoint: {e}")
print(" Using default SDXL base model")
pipe = StableDiffusionXLControlNetImg2ImgPipeline.from_pretrained(
"stabilityai/stable-diffusion-xl-base-1.0",
controlnet=controlnets,
torch_dtype=dtype,
use_safetensors=True
).to(device)
return pipe, False
def load_lora(pipe):
"""Load LORA from HuggingFace Hub."""
print("Loading LORA (retroart) from HuggingFace Hub...")
try:
lora_path = download_model_with_retry(MODEL_REPO, MODEL_FILES['lora'])
# **FIX 2: Add adapter_name="retroart"**
pipe.load_lora_weights(lora_path, adapter_name="retroart")
print(f" [OK] LORA loaded successfully")
return True
except Exception as e:
print(f" [WARNING] Could not load LORA: {e}")
return False
def setup_ip_adapter(pipe, image_encoder):
"""
Setup IP-Adapter for InstantID face embeddings - PROPER IMPLEMENTATION.
Based on the reference InstantID pipeline.
"""
if image_encoder is None:
return None, False
print("Setting up IP-Adapter for InstantID face embeddings (proper implementation)...")
try:
# Download InstantID weights
ip_adapter_path = download_model_with_retry(
"InstantX/InstantID",
"ip-adapter.bin"
)
# Load full state dict
state_dict = torch.load(ip_adapter_path, map_location="cpu")
# Extract image_proj and ip_adapter weights
image_proj_state_dict = {}
ip_adapter_state_dict = {}
for key, value in state_dict.items():
if key.startswith("image_proj."):
image_proj_state_dict[key.replace("image_proj.", "")] = value
elif key.startswith("ip_adapter."):
ip_adapter_state_dict[key.replace("ip_adapter.", "")] = value
# Create Resampler (image projection model) with CORRECT parameters from reference
print("Creating Resampler (Perceiver architecture)...")
image_proj_model = Resampler(
dim=1280, # Hidden dimension
depth=4, # IMPORTANT: 4 layers (not 8!)
dim_head=64, # Dimension per head
heads=20, # Number of heads
num_queries=16, # Number of output tokens
embedding_dim=512, # InsightFace embedding dim
output_dim=pipe.unet.config.cross_attention_dim, # SDXL cross-attention dim (2048)
ff_mult=4 # Feedforward multiplier
)
image_proj_model.eval()
image_proj_model = image_proj_model.to(device, dtype=dtype)
# Load image_proj weights
if image_proj_state_dict:
try:
image_proj_model.load_state_dict(image_proj_state_dict, strict=True)
print(" [OK] Resampler loaded with pretrained weights")
except Exception as e:
print(f" [WARNING] Could not load Resampler weights: {e}")
print(" Using randomly initialized Resampler")
else:
print(" [WARNING] No image_proj weights found, using random initialization")
# Setup IP-Adapter attention processors
print("Setting up IP-Adapter attention processors...")
attn_procs = {}
num_tokens = 16 # Match Resampler num_queries
for name in pipe.unet.attn_processors.keys():
cross_attention_dim = None if name.endswith("attn1.processor") else pipe.unet.config.cross_attention_dim
if name.startswith("mid_block"):
hidden_size = pipe.unet.config.block_out_channels[-1]
elif name.startswith("up_blocks"):
block_id = int(name[len("up_blocks.")])
hidden_size = list(reversed(pipe.unet.config.block_out_channels))[block_id]
elif name.startswith("down_blocks"):
block_id = int(name[len("down_blocks.")])
hidden_size = pipe.unet.config.block_out_channels[block_id]
else:
hidden_size = pipe.unet.config.block_out_channels[-1]
if cross_attention_dim is None:
attn_procs[name] = AttnProcessor2_0()
else:
attn_procs[name] = IPAttnProcessor2_0(
hidden_size=hidden_size,
cross_attention_dim=cross_attention_dim,
scale=1.0,
num_tokens=num_tokens
).to(device, dtype=dtype)
# Set attention processors
pipe.unet.set_attn_processor(attn_procs)
# Load IP-Adapter weights into attention processors
if ip_adapter_state_dict:
try:
ip_layers = torch.nn.ModuleList(pipe.unet.attn_processors.values())
ip_layers.load_state_dict(ip_adapter_state_dict, strict=False)
print(" [OK] IP-Adapter attention weights loaded")
except Exception as e:
print(f" [WARNING] Could not load IP-Adapter weights: {e}")
else:
print(" [WARNING] No ip_adapter weights found")
# Store image encoder and projection model
pipe.image_encoder = image_encoder
print(" [OK] IP-Adapter fully loaded with InstantID architecture")
print(f" - Resampler: 4 layers, 20 heads, 16 output tokens")
print(f" - Face embeddings: 512D → 16x2048D")
return image_proj_model, True
except Exception as e:
print(f" [ERROR] Could not setup IP-Adapter: {e}")
import traceback
traceback.print_exc()
return None, False
def setup_compel(pipe):
"""Setup Compel for better SDXL prompt handling."""
print("Setting up Compel for enhanced prompt processing...")
try:
compel = Compel(
tokenizer=[pipe.tokenizer, pipe.tokenizer_2],
text_encoder=[pipe.text_encoder, pipe.text_encoder_2],
returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NON_NORMALIZED,
requires_pooled=[False, True]
)
print(" [OK] Compel loaded successfully")
return compel, True
except Exception as e:
print(f" [WARNING] Compel not available: {e}")
return None, False
def setup_scheduler(pipe):
"""Setup LCM scheduler."""
print("Setting up LCM scheduler...")
pipe.scheduler = LCMScheduler.from_config(pipe.scheduler.config)
print(" [OK] LCM scheduler configured")
def optimize_pipeline(pipe):
"""Apply optimizations to pipeline."""
# Try to enable xformers
if device == "cuda":
try:
pipe.enable_xformers_memory_efficient_attention()
print(" [OK] xformers enabled")
except Exception as e:
print(f" [INFO] xformers not available: {e}")
def load_caption_model():
"""
Load caption model with proper error handling.
Tries multiple models in order of quality.
"""
print("Loading caption model...")
# Try GIT-Large first (good balance of quality and compatibility)
try:
from transformers import AutoProcessor, AutoModelForCausalLM
print(" Attempting GIT-Large (recommended)...")
caption_processor = AutoProcessor.from_pretrained("microsoft/git-large-coco")
caption_model = AutoModelForCausalLM.from_pretrained(
"microsoft/git-large-coco",
torch_dtype=dtype
).to(device)
print(" [OK] GIT-Large model loaded (produces detailed captions)")
return caption_processor, caption_model, True, 'git'
except Exception as e1:
print(f" [INFO] GIT-Large not available: {e1}")
# Try BLIP base as fallback
try:
from transformers import BlipProcessor, BlipForConditionalGeneration
print(" Attempting BLIP base (fallback)...")
caption_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
caption_model = BlipForConditionalGeneration.from_pretrained(
"Salesforce/blip-image-captioning-base",
torch_dtype=dtype
).to(device)
print(" [OK] BLIP base model loaded (standard captions)")
return caption_processor, caption_model, True, 'blip'
except Exception as e2:
print(f" [WARNING] Caption models not available: {e2}")
print(" Caption generation will be disabled")
return None, None, False, 'none'
def set_clip_skip(pipe):
"""Set CLIP skip value."""
if hasattr(pipe, 'text_encoder'):
print(f" [OK] CLIP skip set to {CLIP_SKIP}")
print("[OK] Model loading functions ready")