# VISTA/ip_adapter/ip_adapter.py
# -*- coding: utf-8 -*-
import os
from typing import List
import torch
import torch.nn as nn
import torch.nn.functional as F
from diffusers import StableDiffusionPipeline
from diffusers.pipelines.controlnet import MultiControlNetModel
from PIL import Image, ImageOps
from safetensors import safe_open
from transformers import CLIPImageProcessor, CLIPVisionModelWithProjection, CLIPTokenizer, CLIPTextModelWithProjection
from .utils import is_torch2_available, get_generator
if is_torch2_available():
    from .attention_processor import (
        AttnProcessor2_0 as AttnProcessor,
        CNAttnProcessor2_0 as CNAttnProcessor,
        IPAttnProcessor2_0 as IPAttnProcessor,
    )
else:
    from .attention_processor import AttnProcessor, CNAttnProcessor, IPAttnProcessor
from .resampler import Resampler
import math
import re
import numpy as np
def _cosine(a: torch.Tensor, b: torch.Tensor, eps: float = 1e-12) -> float:
    """Cosine similarity between two flat tensors; NaN if either norm is ~0."""
    a = a.float()
    b = b.float()
    na = a.norm()
    nb = b.norm()
    if na.item() < eps or nb.item() < eps:
        return float("nan")
    return float((a @ b) / (na * nb))
def verify_style_content_embeddings(adapter, sim_threshold: float = 0.999):
    """Verify that content layers consumed 'tail' IP tokens, style layers consumed
    'override' tokens, and the two groups' mean embeddings are not near-identical."""
content_fps, style_fps = [], []
wrong_source = []
for name, proc in adapter.attn_procs.items():
group = getattr(proc, "group", "off")
mu = getattr(proc, "last_ip_mu", None)
src = getattr(proc, "last_ip_source", None)
if group not in ("content", "style"):
continue
if mu is None:
continue
if group == "content" and src != "tail":
wrong_source.append((name, group, src))
if group == "style" and src != "override":
wrong_source.append((name, group, src))
if group == "content":
content_fps.append((name, mu))
else:
style_fps.append((name, mu))
print("\n[Verify] token source check")
if wrong_source:
for name, grp, src in wrong_source:
print(f" - !! {name}: group={grp} but last_ip_source={src}")
else:
print(" - OK: content uses 'tail', style uses 'override'")
if not content_fps or not style_fps:
return False
content_mu = torch.stack([mu for _, mu in content_fps], dim=0).mean(dim=0)
style_mu = torch.stack([mu for _, mu in style_fps], dim=0).mean(dim=0)
cos = _cosine(content_mu, style_mu)
print(f"\n[Verify] group-wise cosine(content, style) = {cos:.6f}")
print("\n[Verify] layer-wise cosine to content-mean (lower is more different)")
for name, mu in style_fps:
cs = _cosine(content_mu, mu)
print(f" - {name:<60} cos={cs:.6f}")
    ok = (not wrong_source) and (not math.isnan(cos)) and (cos < sim_threshold)
    return ok
def _split_bounds(size, parts):
    # Evenly partition [0, size] into `parts` segments; returns parts + 1 integer bounds.
    bounds = np.linspace(0, size, parts + 1)
    return [int(round(b)) for b in bounds]
class ImageProjModel(torch.nn.Module):
"""Projection Model"""
def __init__(self, cross_attention_dim=1024, clip_embeddings_dim=1024, clip_extra_context_tokens=4):
super().__init__()
self.generator = None
self.cross_attention_dim = cross_attention_dim
self.clip_extra_context_tokens = clip_extra_context_tokens
self.proj = torch.nn.Linear(clip_embeddings_dim, self.clip_extra_context_tokens * cross_attention_dim)
self.norm = torch.nn.LayerNorm(cross_attention_dim)
def forward(self, image_embeds):
embeds = image_embeds
clip_extra_context_tokens = self.proj(embeds).reshape(
-1, self.clip_extra_context_tokens, self.cross_attention_dim
)
clip_extra_context_tokens = self.norm(clip_extra_context_tokens)
return clip_extra_context_tokens
class MLPProjModel(torch.nn.Module):
"""SD model with image prompt"""
def __init__(self, cross_attention_dim=1024, clip_embeddings_dim=1024):
super().__init__()
self.proj = torch.nn.Sequential(
torch.nn.Linear(clip_embeddings_dim, clip_embeddings_dim),
torch.nn.GELU(),
torch.nn.Linear(clip_embeddings_dim, cross_attention_dim),
torch.nn.LayerNorm(cross_attention_dim)
)
def forward(self, image_embeds):
clip_extra_context_tokens = self.proj(image_embeds)
return clip_extra_context_tokens
class IPAdapter:
def __init__(
self,
sd_pipe,
image_encoder_path,
ip_ckpt,
device,
mask=None,
sketch=None,
num_tokens=4,
target_blocks=None,
# NEW: block groups & scales
content_blocks=None,
style_blocks=None,
content_scale: float = 0.5,
style_scale: float = 0.5,
garment_images = None,
garment_mask = None,
):
self.device = device
self.image_encoder_path = image_encoder_path
self.ip_ckpt = ip_ckpt
self.num_tokens = num_tokens
self.target_blocks = target_blocks or []
self.pipe = sd_pipe.to(self.device)
self.mask = mask
self.sketch = sketch
self.garment_images = garment_images
self.garment_mask = garment_mask
        # Honor the constructor arguments; fall back to this repo's defaults otherwise.
        self.content_blocks = content_blocks or [
            "down_blocks.2.attentions.1",
        ]
        self.style_blocks = style_blocks or [
            "up_blocks.0.attentions.1",
        ]
self.content_scale = float(content_scale)
self.style_scale = float(style_scale)
self.attn_procs = {}
self.set_ip_adapter()
self.clip_tokenizer = CLIPTokenizer.from_pretrained("openai/clip-vit-large-patch14")
self.text_encoder = CLIPTextModelWithProjection.from_pretrained(
"openai/clip-vit-large-patch14"
).to(self.device)
self.image_encoder = CLIPVisionModelWithProjection.from_pretrained(
self.image_encoder_path
).to(self.device, dtype=torch.float32)
self.clip_image_processor = CLIPImageProcessor()
self.image_proj_model = self.init_proj()
self.load_ip_adapter()
# --- utils ---
def _parse_block_id(self, name: str, prefix: str) -> int:
# "up_blocks.0.attentions.1.processor" -> 0
return int(name[len(prefix):].split(".")[0])
def init_proj(self):
image_proj_model = ImageProjModel(
cross_attention_dim=self.pipe.unet.config.cross_attention_dim,
clip_embeddings_dim=self.image_encoder.config.projection_dim,
clip_extra_context_tokens=self.num_tokens,
).to(self.device, dtype=torch.float32)
return image_proj_model
def _apply_group_scales(self):
for name, proc in self.attn_procs.items():
if not isinstance(proc, IPAttnProcessor):
continue
if any(b in name for b in self.content_blocks):
proc.skip = False
proc.scale = float(self.content_scale)
elif any(b in name for b in self.style_blocks):
proc.skip = False
proc.scale = float(self.style_scale)
else:
proc.skip = True
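    def set_scale(self, scale):
        # Minimal restoration of the upstream IP-Adapter setter: generate() below calls
        # self.set_scale(), which this file otherwise never defines. It applies a uniform
        # scale to every IP processor, overriding the per-group content/style scales;
        # call _apply_group_scales() afterwards to reinstate the group split if needed.
        for attn_processor in self.pipe.unet.attn_processors.values():
            if isinstance(attn_processor, IPAttnProcessor):
                attn_processor.scale = scale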
def _which_group(self, name: str) -> str:
if any(b in name for b in self.content_blocks):
return "content"
if any(b in name for b in self.style_blocks):
return "style"
return "off"
def _get_proc_tokens(self, proc):
for key in ("image_prompt_embeds", "ip_tokens", "image_prompts"):
t = getattr(proc, key, None)
if t is not None:
return t
return None
def print_block_scales(self, verbose: bool = True):
rows = []
for name, proc in self.attn_procs.items():
scale = getattr(proc, "scale", None)
skip = getattr(proc, "skip", None)
group = getattr(proc, "group", "self" if name.endswith("attn1.processor") else "off")
rows.append((name, group, scale, skip))
def _key(t):
n = t[0]
if n.startswith("down_blocks"): p = 0
elif n.startswith("mid_block"): p = 1
elif n.startswith("up_blocks"): p = 2
else: p = 3
            # Extract numeric block indices from the layer name for a natural sort.
            m = re.findall(r"\d+", n)
idx = tuple(int(x) for x in m) if m else (999,)
return (p, idx, n)
rows.sort(key=_key)
if verbose:
print("\n[IPAdapter] Block-scale report")
for name, group, scale, skip in rows:
tag = "ATTN2" if name.endswith("attn2.processor") else "ATTN1"
print(f" - {name:<60} [{tag}] group={group:<7} scale={scale} skip={skip}")
return rows
def set_ip_adapter(self):
unet = self.pipe.unet
attn_procs = {}
self.attn_procs = {}
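        # attn1 processors are self-attention (no cross_attention_dim) and get a plain
        # AttnProcessor; attn2 processors are cross-attention and get an IPAttnProcessor
        # sized to the channel width of the block they live in.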
for name in unet.attn_processors.keys():
cross_attention_dim = None if name.endswith("attn1.processor") else unet.config.cross_attention_dim
if name.startswith("mid_block"):
hidden_size = unet.config.block_out_channels[-1]
elif name.startswith("up_blocks"):
block_id = self._parse_block_id(name, "up_blocks.")
hidden_size = list(reversed(unet.config.block_out_channels))[block_id]
elif name.startswith("down_blocks"):
block_id = self._parse_block_id(name, "down_blocks.")
hidden_size = unet.config.block_out_channels[block_id]
else:
hidden_size = unet.config.block_out_channels[0]
if cross_attention_dim is None:
proc = AttnProcessor()
setattr(proc, "layer_name", name)
else:
is_content = any(b in name for b in self.content_blocks)
is_style = any(b in name for b in self.style_blocks)
selected = is_content or is_style or any(b in name for b in self.target_blocks)
init_skip = not selected
init_scale = 1.0
if is_content:
init_scale = float(self.content_scale)
elif is_style:
init_scale = float(self.style_scale)
proc = IPAttnProcessor(
hidden_size=hidden_size,
cross_attention_dim=cross_attention_dim,
scale=init_scale,
num_tokens=self.num_tokens,
skip=init_skip,
).to(self.device, dtype=torch.float32)
setattr(proc, "layer_name", name)
setattr(proc, "group", "content" if is_content else ("style" if is_style else "off"))
attn_procs[name] = proc
self.attn_procs[name] = proc
unet.set_attn_processor(attn_procs)
if hasattr(self.pipe, "controlnet"):
if isinstance(self.pipe.controlnet, MultiControlNetModel):
for controlnet in self.pipe.controlnet.nets:
controlnet.set_attn_processor(CNAttnProcessor(num_tokens=self.num_tokens))
else:
self.pipe.controlnet.set_attn_processor(CNAttnProcessor(num_tokens=self.num_tokens))
def load_ip_adapter(self):
if os.path.splitext(self.ip_ckpt)[-1] == ".safetensors":
state_dict = {"image_proj": {}, "ip_adapter": {}}
with safe_open(self.ip_ckpt, framework="pt", device="cpu") as f:
for key in f.keys():
if key.startswith("image_proj."):
state_dict["image_proj"][key.replace("image_proj.", "")] = f.get_tensor(key)
elif key.startswith("ip_adapter."):
state_dict["ip_adapter"][key.replace("ip_adapter.", "")] = f.get_tensor(key)
else:
state_dict = torch.load(self.ip_ckpt, map_location="cpu")
self.image_proj_model.load_state_dict(state_dict["image_proj"])
ip_layers = torch.nn.ModuleList(self.pipe.unet.attn_processors.values())
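        # strict=False tolerates mismatches between the checkpoint's per-layer keys and
        # this UNet's processor layout (plain self-attention processors carry no weights).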
ip_layers.load_state_dict(state_dict["ip_adapter"], strict=False)
@torch.inference_mode()
def get_image_embeds(self, pil_image=None, clip_image_embeds=None, content_prompt_embeds=None):
if pil_image is not None:
if isinstance(pil_image, Image.Image):
pil_image = [pil_image]
clip_image = self.clip_image_processor(images=pil_image, return_tensors="pt").pixel_values
clip_image_embeds = self.image_encoder(clip_image.to(self.device, dtype=torch.float32)).image_embeds
        else:
            clip_image_embeds = clip_image_embeds.to(self.device, dtype=torch.float32)
        if content_prompt_embeds is not None:
            # Restored from upstream InstantStyle, which this parameter comes from:
            # subtract the (scaled) pooled negative-content text embedding so the IP
            # tokens carry the image minus the named content.
            clip_image_embeds = clip_image_embeds - content_prompt_embeds
        image_prompt_embeds = self.image_proj_model(clip_image_embeds)  # [B, num_tokens, D]
uncond_image_prompt_embeds = self.image_proj_model(torch.zeros_like(clip_image_embeds))
return image_prompt_embeds, uncond_image_prompt_embeds
def generate(
self,
pil_image=None,
clip_image_embeds=None,
prompt=None,
negative_prompt=None,
scale=1.0,
num_samples=4,
seed=None,
guidance_scale=7.5,
num_inference_steps=30,
neg_content_emb=None,
**kwargs,
):
if scale is not None:
self.set_scale(scale)
if pil_image is not None:
num_prompts = 1 if isinstance(pil_image, Image.Image) else len(pil_image)
else:
num_prompts = clip_image_embeds.size(0)
if prompt is None:
prompt = "best quality, high quality"
if negative_prompt is None:
negative_prompt = "monochrome, lowres, bad anatomy, worst quality, low quality"
if not isinstance(prompt, List):
prompt = [prompt] * num_prompts
if not isinstance(negative_prompt, List):
negative_prompt = [negative_prompt] * num_prompts
image_prompt_embeds, uncond_image_prompt_embeds = self.get_image_embeds(
pil_image=pil_image, clip_image_embeds=clip_image_embeds, content_prompt_embeds=neg_content_emb
)
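        # Tile the image tokens so each of the num_samples images per prompt sees them.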
bs_embed, seq_len, _ = image_prompt_embeds.shape
image_prompt_embeds = image_prompt_embeds.repeat(1, num_samples, 1)
image_prompt_embeds = image_prompt_embeds.view(bs_embed * num_samples, seq_len, -1)
uncond_image_prompt_embeds = uncond_image_prompt_embeds.repeat(1, num_samples, 1)
uncond_image_prompt_embeds = uncond_image_prompt_embeds.view(bs_embed * num_samples, seq_len, -1)
with torch.inference_mode():
prompt_embeds_, negative_prompt_embeds_ = self.pipe.encode_prompt(
prompt,
device=self.device,
num_images_per_prompt=num_samples,
do_classifier_free_guidance=True,
negative_prompt=negative_prompt,
)
prompt_embeds = torch.cat([prompt_embeds_, image_prompt_embeds], dim=1)
negative_prompt_embeds = torch.cat([negative_prompt_embeds_, uncond_image_prompt_embeds], dim=1)
generator = get_generator(seed, self.device)
images = self.pipe(
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
guidance_scale=guidance_scale,
num_inference_steps=num_inference_steps,
generator=generator,
**kwargs,
).images
return images
class IPAdapterXL(IPAdapter):
"""SDXL"""
def generate(
self,
pil_image,
prompt=None,
shape_prompt=None,
negative_prompt=None,
scale=1.0,
num_samples=4,
seed=None,
num_inference_steps=30,
neg_content_emb=None,
neg_content_prompt=None,
neg_content_scale=1.0,
**kwargs,
):
if scale is not None:
self.set_scale(scale)
num_prompts = 1 if isinstance(pil_image, Image.Image) else len(pil_image)
if prompt is None:
prompt = "best quality, high quality"
if negative_prompt is None:
negative_prompt = "monochrome, lowres, bad anatomy, worst quality, low quality"
if not isinstance(prompt, List):
prompt = [prompt] * num_prompts
if not isinstance(negative_prompt, List):
negative_prompt = [negative_prompt] * num_prompts
        if neg_content_emb is not None:
            # An explicitly provided negative-content embedding takes precedence.
            pooled_prompt_embeds_ = neg_content_emb
        elif neg_content_prompt is not None:
            with torch.inference_mode():
                (
                    prompt_embeds_,  # [B, 77, 2048]
                    negative_prompt_embeds_,
                    pooled_prompt_embeds_,  # [B, 1280]
                    negative_pooled_prompt_embeds_,
                ) = self.pipe.encode_prompt(
                    neg_content_prompt,
                    num_images_per_prompt=num_samples,
                    do_classifier_free_guidance=True,
                    negative_prompt=negative_prompt,
                )
                pooled_prompt_embeds_ *= neg_content_scale
        else:
            pooled_prompt_embeds_ = None
content_ip_tokens, uncond_content_ip_tokens = self.get_image_embeds(
pil_image=pil_image,
content_prompt_embeds=pooled_prompt_embeds_
)
bs_embed, seq_len, _ = content_ip_tokens.shape
content_ip_tokens = content_ip_tokens.repeat(1, num_samples, 1).view(bs_embed * num_samples, seq_len, -1)
uncond_content_ip_tokens = uncond_content_ip_tokens.repeat(1, num_samples, 1).view(bs_embed * num_samples, seq_len, -1)
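        # Style tokens are currently derived from the same image and embedding as the
        # content tokens; they reach the style layers via the override path set below.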
style_ip_tokens, uncond_style_ip_tokens = self.get_image_embeds(
pil_image=pil_image,
content_prompt_embeds=pooled_prompt_embeds_
)
bs_embed, seq_len, _ = style_ip_tokens.shape
style_ip_tokens = style_ip_tokens.repeat(1, num_samples, 1).view(bs_embed * num_samples, seq_len, -1)
style_ip_tokens_uncond = uncond_style_ip_tokens.repeat(1, num_samples, 1).view(bs_embed * num_samples, seq_len, -1)
with torch.inference_mode():
(
prompt_embeds,
negative_prompt_embeds,
pooled_prompt_embeds,
negative_pooled_prompt_embeds,
) = self.pipe.encode_prompt(
prompt,
device=self.device,
num_images_per_prompt=num_samples,
do_classifier_free_guidance=True,
negative_prompt=negative_prompt,
)
        # Append only the "content" IP tokens to the text embeddings here; the style
        # tokens are delivered to style layers separately via per-layer overrides.
prompt_embeds = torch.cat([prompt_embeds, content_ip_tokens], dim=1)
negative_prompt_embeds = torch.cat([negative_prompt_embeds, uncond_content_ip_tokens], dim=1)
with torch.inference_mode():
(
shape_prompt_embeds,
shape_negative_prompt_embeds,
shape_pooled_prompt_embeds,
shape_negative_pooled_prompt_embeds,
) = self.pipe.encode_prompt(
shape_prompt,
device=self.device,
num_images_per_prompt=num_samples,
do_classifier_free_guidance=True,
negative_prompt=negative_prompt,
)
shape_prompt_embeds = torch.cat([shape_prompt_embeds, content_ip_tokens], dim=1)
shape_negative_prompt_embeds = torch.cat([shape_negative_prompt_embeds, uncond_content_ip_tokens], dim=1)
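        # Route the style tokens: style-group processors read ip_tokens_override instead
        # of the trailing num_tokens of encoder_hidden_states, while all other processors
        # have their overrides cleared so they keep using the appended (tail) tokens.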
for name, proc in self.attn_procs.items():
if getattr(proc, "group", "off") == "style":
proc.ip_tokens_override = style_ip_tokens.to(self.device, dtype=torch.float32)
proc.ip_tokens_override_uncond = style_ip_tokens_uncond.to(self.device, dtype=torch.float32)
else:
if hasattr(proc, "ip_tokens_override"):
proc.ip_tokens_override = None
if hasattr(proc, "ip_tokens_override_uncond"):
proc.ip_tokens_override_uncond = None
self.generator = get_generator(seed, self.device)
images = self.pipe(
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
pooled_prompt_embeds=pooled_prompt_embeds,
negative_pooled_prompt_embeds=negative_pooled_prompt_embeds,
shape_prompt_embeds=shape_prompt_embeds,
shape_negative_prompt_embeds=shape_negative_prompt_embeds,
shape_pooled_prompt_embeds=shape_pooled_prompt_embeds,
shape_negative_pooled_prompt_embeds=shape_negative_pooled_prompt_embeds,
num_inference_steps=num_inference_steps,
generator=self.generator,
mask_image=self.mask,
sketch_image=self.sketch,
garment_images=self.garment_images,
garment_mask=self.garment_mask,
**kwargs,
).images
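        # Clear the overrides after generation so later calls start from a clean state.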
for name, proc in self.attn_procs.items():
if hasattr(proc, "ip_tokens_override"):
proc.ip_tokens_override = None
if hasattr(proc, "ip_tokens_override_uncond"):
proc.ip_tokens_override_uncond = None
return images
class IPAdapterPlus(IPAdapter):
"""IP-Adapter with fine-grained features"""
def init_proj(self):
image_proj_model = Resampler(
dim=self.pipe.unet.config.cross_attention_dim,
depth=4,
dim_head=64,
heads=12,
num_queries=self.num_tokens,
embedding_dim=self.image_encoder.config.hidden_size,
output_dim=self.pipe.unet.config.cross_attention_dim,
ff_mult=4,
).to(self.device, dtype=torch.float32)
return image_proj_model
@torch.inference_mode()
def get_image_embeds(self, pil_image=None, clip_image_embeds=None):
if isinstance(pil_image, Image.Image):
pil_image = [pil_image]
clip_image = self.clip_image_processor(images=pil_image, return_tensors="pt").pixel_values
clip_image = clip_image.to(self.device, dtype=torch.float32)
clip_image_embeds = self.image_encoder(clip_image, output_hidden_states=True).hidden_states[-2]
image_prompt_embeds = self.image_proj_model(clip_image_embeds)
uncond_clip_image_embeds = self.image_encoder(
torch.zeros_like(clip_image), output_hidden_states=True
).hidden_states[-2]
uncond_image_prompt_embeds = self.image_proj_model(uncond_clip_image_embeds)
return image_prompt_embeds, uncond_image_prompt_embeds
class IPAdapterFull(IPAdapterPlus):
"""IP-Adapter with full features"""
def init_proj(self):
image_proj_model = MLPProjModel(
cross_attention_dim=self.pipe.unet.config.cross_attention_dim,
clip_embeddings_dim=self.image_encoder.config.hidden_size,
).to(self.device, dtype=torch.float32)
return image_proj_model
class IPAdapterPlusXL(IPAdapter):
"""SDXL"""
def init_proj(self):
image_proj_model = Resampler(
dim=1280,
depth=4,
dim_head=64,
heads=20,
num_queries=self.num_tokens,
embedding_dim=self.image_encoder.config.hidden_size,
output_dim=self.pipe.unet.config.cross_attention_dim,
ff_mult=4,
).to(self.device, dtype=torch.float32)
return image_proj_model
@torch.inference_mode()
def get_image_embeds(self, pil_image):
if isinstance(pil_image, Image.Image):
pil_image = [pil_image]
clip_image = self.clip_image_processor(images=pil_image, return_tensors="pt").pixel_values
clip_image = clip_image.to(self.device, dtype=torch.float32)
clip_image_embeds = self.image_encoder(clip_image, output_hidden_states=True).hidden_states[-2]
image_prompt_embeds = self.image_proj_model(clip_image_embeds)
uncond_clip_image_embeds = self.image_encoder(
torch.zeros_like(clip_image), output_hidden_states=True
).hidden_states[-2]
uncond_image_prompt_embeds = self.image_proj_model(uncond_clip_image_embeds)
return image_prompt_embeds, uncond_image_prompt_embeds
def generate(
self,
pil_image,
prompt=None,
negative_prompt=None,
scale=1.0,
num_samples=4,
seed=None,
num_inference_steps=30,
**kwargs,
):
self.set_scale(scale)
num_prompts = 1 if isinstance(pil_image, Image.Image) else len(pil_image)
if prompt is None:
prompt = "best quality, high quality"
if negative_prompt is None:
negative_prompt = "monochrome, lowres, bad anatomy, worst quality, low quality"
if not isinstance(prompt, List):
prompt = [prompt] * num_prompts
if not isinstance(negative_prompt, List):
negative_prompt = [negative_prompt] * num_prompts
image_prompt_embeds, uncond_image_prompt_embeds = self.get_image_embeds(pil_image)
bs_embed, seq_len, _ = image_prompt_embeds.shape
image_prompt_embeds = image_prompt_embeds.repeat(1, num_samples, 1)
image_prompt_embeds = image_prompt_embeds.view(bs_embed * num_samples, seq_len, -1)
uncond_image_prompt_embeds = uncond_image_prompt_embeds.repeat(1, num_samples, 1)
uncond_image_prompt_embeds = uncond_image_prompt_embeds.view(bs_embed * num_samples, seq_len, -1)
with torch.inference_mode():
(
prompt_embeds,
negative_prompt_embeds,
pooled_prompt_embeds,
negative_pooled_prompt_embeds,
) = self.pipe.encode_prompt(
prompt,
device=self.device,
num_images_per_prompt=num_samples,
do_classifier_free_guidance=True,
negative_prompt=negative_prompt,
)
prompt_embeds = torch.cat([prompt_embeds, image_prompt_embeds], dim=1)
negative_prompt_embeds = torch.cat([negative_prompt_embeds, uncond_image_prompt_embeds], dim=1)
generator = get_generator(seed, self.device)
images = self.pipe(
prompt_embeds=prompt_embeds,
negative_prompt_embeds=negative_prompt_embeds,
pooled_prompt_embeds=pooled_prompt_embeds,
negative_pooled_prompt_embeds=negative_pooled_prompt_embeds,
num_inference_steps=num_inference_steps,
generator=generator,
**kwargs,
).images
return images
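if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module). All paths below are
    # placeholders; the IP-Adapter checkpoint must match the pipeline's UNet. The base
    # IPAdapter forwards only standard kwargs, so a stock SD 1.5 pipeline suffices.
    pipe = StableDiffusionPipeline.from_pretrained("path/to/sd15")  # placeholder path
    adapter = IPAdapter(
        pipe,
        image_encoder_path="path/to/image_encoder",  # placeholder path
        ip_ckpt="path/to/ip-adapter_sd15.bin",       # placeholder path
        device="cuda" if torch.cuda.is_available() else "cpu",
    )
    images = adapter.generate(
        pil_image=Image.open("reference.jpg"),  # placeholder reference image
        scale=None,  # keep the per-group content/style scales set in __init__
        num_samples=1,
        seed=42,
    )
    images[0].save("out.png")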