# -*- coding: utf-8 -*-
import os
from typing import List

import torch
import torch.nn as nn
import torch.nn.functional as F
from diffusers import StableDiffusionPipeline
from diffusers.pipelines.controlnet import MultiControlNetModel
from PIL import Image, ImageOps
from safetensors import safe_open
from transformers import CLIPImageProcessor, CLIPVisionModelWithProjection, CLIPTokenizer, CLIPTextModelWithProjection

from .utils import is_torch2_available, get_generator

if is_torch2_available():
    from .attention_processor import (
        AttnProcessor2_0 as AttnProcessor,
    )
    from .attention_processor import (
        CNAttnProcessor2_0 as CNAttnProcessor,
    )
    from .attention_processor import (
        IPAttnProcessor2_0 as IPAttnProcessor,
    )
else:
    from .attention_processor import AttnProcessor, CNAttnProcessor, IPAttnProcessor
from .resampler import Resampler

import numpy as np, random
import math
import torch
import torch
import torch.nn.functional as F
import numpy as np
import cv2
from PIL import Image


def _cosine(a: torch.Tensor, b: torch.Tensor, eps: float = 1e-12) -> float:
    """Return the cosine similarity between two tensors as a Python float.

    Both inputs are cast to float32 and flattened, so tensors of any (equal)
    shape are compared element-wise as vectors. For 1-D inputs this is
    identical to the plain ``a @ b`` formulation.

    Args:
        a: First tensor.
        b: Second tensor; must have the same number of elements as ``a``.
        eps: Norms below this value are treated as zero.

    Returns:
        The cosine similarity, or ``nan`` when either norm is below ``eps``.
    """
    a = a.float().flatten()
    b = b.float().flatten()
    na = a.norm()
    nb = b.norm()
    # A (near-)zero vector has no direction; report NaN instead of dividing.
    if na.item() < eps or nb.item() < eps:
        return float("nan")
    return float(torch.dot(a, b) / (na * nb))


def verify_style_content_embeddings(adapter, sim_threshold: float = 0.999):
    """Print a diagnostic report on the IP tokens seen by content/style layers.

    Walks ``adapter.attn_procs`` and, for every processor whose ``group``
    attribute is ``"content"`` or ``"style"`` and whose ``last_ip_mu``
    attribute is set, checks that ``last_ip_source`` is ``"tail"`` for content
    layers and ``"override"`` for style layers. It then compares the mean
    ``last_ip_mu`` of both groups by cosine similarity.

    NOTE(review): ``last_ip_mu`` / ``last_ip_source`` are presumably recorded
    by the attention processors during a forward pass -- confirm against the
    processor implementation.

    Args:
        adapter: Object exposing an ``attn_procs`` mapping of name -> processor.
        sim_threshold: The group-wise cosine must be strictly below this value
            for the check to pass.

    Returns:
        ``True`` when no layer reported an unexpected token source and the
        group-wise cosine is a number below ``sim_threshold``; ``False``
        otherwise (including when either group has no recorded embeddings).
    """
    content_fps, style_fps = [], []
    wrong_source = []
    expected_source = {"content": "tail", "style": "override"}

    for name, proc in adapter.attn_procs.items():
        group = getattr(proc, "group", "off")
        mu = getattr(proc, "last_ip_mu", None)
        src = getattr(proc, "last_ip_source", None)
        if group not in expected_source:
            continue
        if mu is None:
            continue
        if src != expected_source[group]:
            wrong_source.append((name, group, src))
        if group == "content":
            content_fps.append((name, mu))
        else:
            style_fps.append((name, mu))

    print("\n[Verify] token source check")
    if wrong_source:
        for name, grp, src in wrong_source:
            print(f" - !! {name}: group={grp} but last_ip_source={src}")
    else:
        print(" - OK: content uses 'tail', style uses 'override'")

    # Without at least one sample per group there is nothing to compare.
    if not content_fps or not style_fps:
        return False

    content_mu = torch.stack([mu for _, mu in content_fps], dim=0).mean(dim=0)
    style_mu = torch.stack([mu for _, mu in style_fps], dim=0).mean(dim=0)
    cos = _cosine(content_mu, style_mu)
    print(f"\n[Verify] group-wise cosine(content, style) = {cos:.6f}")

    print("\n[Verify] layer-wise cosine to content-mean (lower is more different)")
    for name, mu in style_fps:
        cs = _cosine(content_mu, mu)
        print(f" - {name:<60} cos={cs:.6f}")

    ok = (not wrong_source) and (not math.isnan(cos)) and (cos < sim_threshold)
    # The verdict was previously computed but never returned, so the success
    # path yielded None while the early exit above yielded False.
    return ok


def _split_bounds(size, parts):
    """Return ``parts + 1`` integer boundaries splitting ``[0, size]`` evenly.

    Boundaries come from ``np.linspace`` and are rounded to the nearest int.
    """
    bounds = np.linspace(0, size, parts + 1)
    return [int(round(b)) for b in bounds]


class ImageProjModel(torch.nn.Module):
    """Project a pooled image embedding into a few cross-attention tokens.

    A single linear layer maps ``clip_embeddings_dim`` features to
    ``clip_extra_context_tokens * cross_attention_dim`` features, which are
    reshaped into a token sequence and layer-normalised.
    """

    def __init__(self, cross_attention_dim=1024, clip_embeddings_dim=1024, clip_extra_context_tokens=4):
        super().__init__()

        self.generator = None
        self.cross_attention_dim = cross_attention_dim
        self.clip_extra_context_tokens = clip_extra_context_tokens
        self.proj = torch.nn.Linear(clip_embeddings_dim, self.clip_extra_context_tokens * cross_attention_dim)
        self.norm = torch.nn.LayerNorm(cross_attention_dim)

    def forward(self, image_embeds):
        """Return tokens shaped ``(-1, clip_extra_context_tokens, cross_attention_dim)``."""
        embeds = image_embeds
        clip_extra_context_tokens = self.proj(embeds).reshape(
            -1, self.clip_extra_context_tokens, self.cross_attention_dim
        )
        clip_extra_context_tokens = self.norm(clip_extra_context_tokens)
        return clip_extra_context_tokens


class MLPProjModel(torch.nn.Module):
    """Two-layer MLP projection of image embeddings.

    Applies Linear -> GELU -> Linear -> LayerNorm, mapping the last dimension
    from ``clip_embeddings_dim`` to ``cross_attention_dim``.
    """

    def __init__(self, cross_attention_dim=1024, clip_embeddings_dim=1024):
        super().__init__()

        self.proj = torch.nn.Sequential(
            torch.nn.Linear(clip_embeddings_dim, clip_embeddings_dim),
            torch.nn.GELU(),
            torch.nn.Linear(clip_embeddings_dim, cross_attention_dim),
            torch.nn.LayerNorm(cross_attention_dim),
        )

    def forward(self, image_embeds):
        """Return the projected embeddings; leading dimensions are preserved."""
        clip_extra_context_tokens = self.proj(image_embeds)
        return clip_extra_context_tokens
class IPAdapter:
    """Attach image-prompt attention processors to a diffusion pipeline.

    Cross-attention layers of the UNet are replaced by ``IPAttnProcessor``
    instances. Each one is tagged with a ``group`` attribute -- ``"content"``,
    ``"style"`` or ``"off"`` -- by substring match of the layer name against
    ``content_blocks`` / ``style_blocks``. Layers that match neither list nor
    ``target_blocks`` are created with ``skip=True``.
    """

    def __init__(
        self,
        sd_pipe,
        image_encoder_path,
        ip_ckpt,
        device,
        mask=None,
        sketch=None,
        num_tokens=4,
        target_blocks=None,
        # NEW: block groups & scales
        content_blocks=None,
        style_blocks=None,
        content_scale: float = 0.5,
        style_scale: float = 0.5,
        garment_images=None,
        garment_mask=None,
    ):
        """Build the adapter, install the processors and load the checkpoint.

        Args:
            sd_pipe: Diffusion pipeline; moved to ``device``.
            image_encoder_path: Path or hub id of the CLIP vision encoder.
            ip_ckpt: Adapter checkpoint path (``.safetensors`` or a file
                readable by ``torch.load``).
            device: Device for the pipeline, encoders and projection model.
            mask: Stored and forwarded as ``mask_image`` by
                ``IPAdapterXL.generate``.
            sketch: Stored and forwarded as ``sketch_image`` by
                ``IPAdapterXL.generate``.
            num_tokens: Number of image-prompt tokens per image.
            target_blocks: Extra layer-name substrings whose processors are
                enabled without belonging to a group.
            content_blocks: Layer-name substrings forming the content group.
                Defaults to ``["down_blocks.2.attentions.1"]``.
            style_blocks: Layer-name substrings forming the style group.
                Defaults to ``["up_blocks.0.attentions.1"]``.
            content_scale: Initial scale of content-group processors.
            style_scale: Initial scale of style-group processors.
            garment_images: Stored and forwarded by ``IPAdapterXL.generate``.
            garment_mask: Stored and forwarded by ``IPAdapterXL.generate``.
        """
        self.device = device
        self.image_encoder_path = image_encoder_path
        self.ip_ckpt = ip_ckpt
        self.num_tokens = num_tokens
        self.target_blocks = target_blocks or []

        self.pipe = sd_pipe.to(self.device)
        self.mask = mask
        self.sketch = sketch
        self.garment_images = garment_images
        self.garment_mask = garment_mask

        # Honour caller-supplied groups; previously these arguments were
        # accepted but silently replaced by the hard-coded defaults.
        self.content_blocks = self._as_block_list(content_blocks, ["down_blocks.2.attentions.1"])
        self.style_blocks = self._as_block_list(style_blocks, ["up_blocks.0.attentions.1"])
        self.content_scale = float(content_scale)
        self.style_scale = float(style_scale)

        self.attn_procs = {}
        self.set_ip_adapter()

        self.clip_tokenizer = CLIPTokenizer.from_pretrained("openai/clip-vit-large-patch14")
        self.text_encoder = CLIPTextModelWithProjection.from_pretrained(
            "openai/clip-vit-large-patch14"
        ).to(self.device)

        self.image_encoder = CLIPVisionModelWithProjection.from_pretrained(
            self.image_encoder_path
        ).to(self.device, dtype=torch.float32)
        self.clip_image_processor = CLIPImageProcessor()

        # init_proj reads self.image_encoder.config, so it must come after
        # the encoder is loaded; the checkpoint is loaded last.
        self.image_proj_model = self.init_proj()
        self.load_ip_adapter()

    # --- utils ---
    @staticmethod
    def _as_block_list(blocks, default):
        """Normalise a block spec (``None``, a string or an iterable) to a list."""
        if blocks is None:
            return list(default)
        if isinstance(blocks, str):
            return [blocks]
        return list(blocks)

    @staticmethod
    def _tile_for_samples(tokens, num_samples):
        """Repeat each batch entry's tokens ``num_samples`` times along the batch axis."""
        bs_embed, seq_len, _ = tokens.shape
        return tokens.repeat(1, num_samples, 1).view(bs_embed * num_samples, seq_len, -1)

    def _parse_block_id(self, name: str, prefix: str) -> int:
        """Return the integer that directly follows ``prefix`` in ``name``.

        Example: ``"up_blocks.0.attentions.1.processor"`` with prefix
        ``"up_blocks."`` gives ``0``.
        """
        return int(name[len(prefix):].split(".")[0])

    def init_proj(self):
        """Create the image projection model on ``self.device`` in float32."""
        image_proj_model = ImageProjModel(
            cross_attention_dim=self.pipe.unet.config.cross_attention_dim,
            clip_embeddings_dim=self.image_encoder.config.projection_dim,
            clip_extra_context_tokens=self.num_tokens,
        ).to(self.device, dtype=torch.float32)
        return image_proj_model

    def _apply_group_scales(self):
        """Re-apply group scales and skip flags to every ``IPAttnProcessor``.

        Content/style layers are enabled with their group scale; every other
        image-prompt layer gets ``skip = True``.
        """
        for name, proc in self.attn_procs.items():
            if not isinstance(proc, IPAttnProcessor):
                continue
            group = self._which_group(name)
            if group == "content":
                proc.skip = False
                proc.scale = float(self.content_scale)
            elif group == "style":
                proc.skip = False
                proc.scale = float(self.style_scale)
            else:
                proc.skip = True

    def _which_group(self, name: str) -> str:
        """Classify a layer name as ``"content"``, ``"style"`` or ``"off"``.

        Content takes precedence when a name matches both lists.
        """
        if any(b in name for b in self.content_blocks):
            return "content"
        if any(b in name for b in self.style_blocks):
            return "style"
        return "off"

    def _get_proc_tokens(self, proc):
        """Return the first non-``None`` token attribute found on ``proc``, else ``None``."""
        for key in ("image_prompt_embeds", "ip_tokens", "image_prompts"):
            t = getattr(proc, key, None)
            if t is not None:
                return t
        return None

    def print_block_scales(self, verbose: bool = True):
        """Collect, and optionally print, the group/scale/skip of every processor.

        Rows are ordered down blocks, mid block, up blocks, then anything
        else, and within each section by the integers found in the name.

        Returns:
            A list of ``(name, group, scale, skip)`` tuples in that order.
        """
        import re

        digits = re.compile(r"\d+")
        section_order = (("down_blocks", 0), ("mid_block", 1), ("up_blocks", 2))

        def _key(row):
            n = row[0]
            p = 3
            for prefix, rank in section_order:
                if n.startswith(prefix):
                    p = rank
                    break
            # Extract the numbers so that e.g. block 10 sorts after block 2.
            m = digits.findall(n)
            idx = tuple(int(x) for x in m) if m else (999,)
            return (p, idx, n)

        rows = []
        for name, proc in self.attn_procs.items():
            scale = getattr(proc, "scale", None)
            skip = getattr(proc, "skip", None)
            group = getattr(proc, "group", "self" if name.endswith("attn1.processor") else "off")
            rows.append((name, group, scale, skip))
        rows.sort(key=_key)

        if verbose:
            print("\n[IPAdapter] Block-scale report")
            for name, group, scale, skip in rows:
                tag = "ATTN2" if name.endswith("attn2.processor") else "ATTN1"
                print(f" - {name:<60} [{tag}] group={group:<7} scale={scale} skip={skip}")
        return rows

    def set_scale(self, scale):
        """Apply a global image-prompt strength to every ``IPAttnProcessor``.

        ``scale`` acts as a multiplier on the configured group scales:
        content layers get ``content_scale * scale``, style layers get
        ``style_scale * scale``, and ungrouped layers get ``scale`` itself.
        The ``generate`` default of ``scale=1.0`` therefore leaves the group
        scales as configured. ``skip`` flags are not touched.

        Args:
            scale: Global multiplier; converted with ``float``.
        """
        scale = float(scale)
        for proc in self.attn_procs.values():
            if not isinstance(proc, IPAttnProcessor):
                continue
            group = getattr(proc, "group", "off")
            if group == "content":
                proc.scale = float(self.content_scale) * scale
            elif group == "style":
                proc.scale = float(self.style_scale) * scale
            else:
                proc.scale = scale

    def set_ip_adapter(self):
        """Create attention processors for the UNet (and ControlNets) and install them.

        Layers whose name ends with ``attn1.processor`` get a plain
        ``AttnProcessor``; all others get an ``IPAttnProcessor`` tagged with
        its group. Every processor is also recorded in ``self.attn_procs``.
        """
        unet = self.pipe.unet
        attn_procs = {}
        self.attn_procs = {}

        for name in unet.attn_processors.keys():
            cross_attention_dim = None if name.endswith("attn1.processor") else unet.config.cross_attention_dim

            # Channel width of the UNet stage this layer belongs to.
            if name.startswith("mid_block"):
                hidden_size = unet.config.block_out_channels[-1]
            elif name.startswith("up_blocks"):
                block_id = self._parse_block_id(name, "up_blocks.")
                hidden_size = list(reversed(unet.config.block_out_channels))[block_id]
            elif name.startswith("down_blocks"):
                block_id = self._parse_block_id(name, "down_blocks.")
                hidden_size = unet.config.block_out_channels[block_id]
            else:
                hidden_size = unet.config.block_out_channels[0]

            if cross_attention_dim is None:
                proc = AttnProcessor()
                proc.layer_name = name
            else:
                group = self._which_group(name)
                selected = group != "off" or any(b in name for b in self.target_blocks)

                if group == "content":
                    init_scale = float(self.content_scale)
                elif group == "style":
                    init_scale = float(self.style_scale)
                else:
                    init_scale = 1.0

                proc = IPAttnProcessor(
                    hidden_size=hidden_size,
                    cross_attention_dim=cross_attention_dim,
                    scale=init_scale,
                    num_tokens=self.num_tokens,
                    skip=not selected,
                ).to(self.device, dtype=torch.float32)
                proc.layer_name = name
                proc.group = group

            attn_procs[name] = proc
            self.attn_procs[name] = proc

        unet.set_attn_processor(attn_procs)

        if hasattr(self.pipe, "controlnet"):
            if isinstance(self.pipe.controlnet, MultiControlNetModel):
                for controlnet in self.pipe.controlnet.nets:
                    controlnet.set_attn_processor(CNAttnProcessor(num_tokens=self.num_tokens))
            else:
                self.pipe.controlnet.set_attn_processor(CNAttnProcessor(num_tokens=self.num_tokens))

    def load_ip_adapter(self):
        """Load projection and attention-processor weights from ``self.ip_ckpt``.

        ``.safetensors`` files are split into ``image_proj`` / ``ip_adapter``
        sub-dicts by key prefix; any other file is read with ``torch.load``
        and is expected to already contain those two keys.
        """
        if os.path.splitext(self.ip_ckpt)[-1] == ".safetensors":
            state_dict = {"image_proj": {}, "ip_adapter": {}}
            with safe_open(self.ip_ckpt, framework="pt", device="cpu") as f:
                for key in f.keys():
                    for section in ("image_proj", "ip_adapter"):
                        prefix = section + "."
                        if key.startswith(prefix):
                            # Strip only the leading prefix; str.replace would
                            # also remove later occurrences inside the key.
                            state_dict[section][key[len(prefix):]] = f.get_tensor(key)
                            break
        else:
            # NOTE(security): torch.load unpickles the file -- only load
            # checkpoints from trusted sources (or prefer .safetensors).
            state_dict = torch.load(self.ip_ckpt, map_location="cpu")

        self.image_proj_model.load_state_dict(state_dict["image_proj"])
        ip_layers = torch.nn.ModuleList(self.pipe.unet.attn_processors.values())
        # strict=False: plain self-attention processors have no adapter weights.
        ip_layers.load_state_dict(state_dict["ip_adapter"], strict=False)

    @torch.inference_mode()
    def get_image_embeds(self, pil_image=None, clip_image_embeds=None, content_prompt_embeds=None):
        """Return ``(image_prompt_embeds, uncond_image_prompt_embeds)``.

        Args:
            pil_image: A PIL image or list of images; when given, it is
                encoded with the CLIP image encoder.
            clip_image_embeds: Precomputed CLIP image embeddings, used only
                when ``pil_image`` is ``None``.
            content_prompt_embeds: Accepted for interface compatibility; not
                used by this implementation.

        Returns:
            Projected tokens for the image(s), and tokens projected from an
            all-zero embedding of the same shape (the unconditional branch).
        """
        if pil_image is not None:
            if isinstance(pil_image, Image.Image):
                pil_image = [pil_image]
            clip_image = self.clip_image_processor(images=pil_image, return_tensors="pt").pixel_values
            clip_image_embeds = self.image_encoder(clip_image.to(self.device, dtype=torch.float32)).image_embeds
        else:
            clip_image_embeds = clip_image_embeds.to(self.device, dtype=torch.float32)

        # presumably [B, num_tokens, cross_attention_dim], e.g. [1, 4, 2048] -- TODO confirm
        image_prompt_embeds = self.image_proj_model(clip_image_embeds)
        uncond_image_prompt_embeds = self.image_proj_model(torch.zeros_like(clip_image_embeds))
        return image_prompt_embeds, uncond_image_prompt_embeds

    def generate(
        self,
        pil_image=None,
        clip_image_embeds=None,
        prompt=None,
        negative_prompt=None,
        scale=1.0,
        num_samples=4,
        seed=None,
        guidance_scale=7.5,
        num_inference_steps=30,
        neg_content_emb=None,
        **kwargs,
    ):
        """Generate images conditioned on a text prompt and an image prompt.

        Args:
            pil_image: PIL image or list of images used as the image prompt.
            clip_image_embeds: Precomputed CLIP embeddings, used when
                ``pil_image`` is ``None``.
            prompt: Text prompt (string or list); a generic quality prompt is
                used when ``None``.
            negative_prompt: Negative prompt (string or list); a generic one
                is used when ``None``.
            scale: Global image-prompt strength passed to ``set_scale``;
                ``None`` leaves the current scales untouched.
            num_samples: Images generated per prompt.
            seed: Seed forwarded to ``get_generator``.
            guidance_scale: Classifier-free guidance scale for the pipeline.
            num_inference_steps: Number of denoising steps.
            neg_content_emb: Forwarded to ``get_image_embeds`` as
                ``content_prompt_embeds``.
            **kwargs: Forwarded to the pipeline call.

        Returns:
            The ``images`` attribute of the pipeline output.

        Raises:
            ValueError: If both ``pil_image`` and ``clip_image_embeds`` are
                ``None``.
        """
        if pil_image is None and clip_image_embeds is None:
            raise ValueError("Either pil_image or clip_image_embeds must be provided.")

        if scale is not None:
            self.set_scale(scale)

        if pil_image is not None:
            num_prompts = 1 if isinstance(pil_image, Image.Image) else len(pil_image)
        else:
            num_prompts = clip_image_embeds.size(0)

        if prompt is None:
            prompt = "best quality, high quality"
        if negative_prompt is None:
            negative_prompt = "monochrome, lowres, bad anatomy, worst quality, low quality"

        if not isinstance(prompt, list):
            prompt = [prompt] * num_prompts
        if not isinstance(negative_prompt, list):
            negative_prompt = [negative_prompt] * num_prompts

        image_prompt_embeds, uncond_image_prompt_embeds = self.get_image_embeds(
            pil_image=pil_image, clip_image_embeds=clip_image_embeds, content_prompt_embeds=neg_content_emb
        )
        image_prompt_embeds = self._tile_for_samples(image_prompt_embeds, num_samples)
        uncond_image_prompt_embeds = self._tile_for_samples(uncond_image_prompt_embeds, num_samples)

        with torch.inference_mode():
            prompt_embeds_, negative_prompt_embeds_ = self.pipe.encode_prompt(
                prompt,
                device=self.device,
                num_images_per_prompt=num_samples,
                do_classifier_free_guidance=True,
                negative_prompt=negative_prompt,
            )
            # Image tokens are appended after the text tokens.
            prompt_embeds = torch.cat([prompt_embeds_, image_prompt_embeds], dim=1)
            negative_prompt_embeds = torch.cat([negative_prompt_embeds_, uncond_image_prompt_embeds], dim=1)

        generator = get_generator(seed, self.device)

        images = self.pipe(
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            guidance_scale=guidance_scale,
            num_inference_steps=num_inference_steps,
            generator=generator,
            **kwargs,
        ).images

        return images


class IPAdapterXL(IPAdapter):
    """SDXL variant with separate content and style image tokens.

    Content tokens are appended to the prompt embeddings; style tokens are
    handed to style-group processors through their ``ip_tokens_override``
    attributes for the duration of one pipeline call.
    """

    def _clear_token_overrides(self):
        """Reset any ``ip_tokens_override*`` attributes on the processors to ``None``."""
        for proc in self.attn_procs.values():
            if hasattr(proc, "ip_tokens_override"):
                proc.ip_tokens_override = None
            if hasattr(proc, "ip_tokens_override_uncond"):
                proc.ip_tokens_override_uncond = None

    def generate(
        self,
        pil_image,
        prompt=None,
        shape_prompt=None,
        negative_prompt=None,
        scale=1.0,
        num_samples=4,
        seed=None,
        num_inference_steps=30,
        neg_content_emb=None,
        neg_content_prompt=None,
        neg_content_scale=1.0,
        **kwargs,
    ):
        """Generate images with content/style image prompts and a shape prompt.

        Args:
            pil_image: PIL image or list of images used as the image prompt.
            prompt: Text prompt; a generic quality prompt is used when ``None``.
            shape_prompt: Second text prompt, encoded separately and passed to
                the pipeline as ``shape_*`` embeddings.
                NOTE(review): passed to ``encode_prompt`` as-is, even when
                ``None`` -- confirm the pipeline accepts that.
            negative_prompt: Negative prompt; a generic one is used when ``None``.
            scale: Global image-prompt strength passed to ``set_scale``;
                ``None`` leaves the current scales untouched.
            num_samples: Images generated per prompt.
            seed: Seed forwarded to ``get_generator``.
            num_inference_steps: Number of denoising steps.
            neg_content_emb: See the note in the body; when given, ``None`` is
                forwarded to ``get_image_embeds``.
            neg_content_prompt: Text whose pooled embedding (times
                ``neg_content_scale``) is forwarded to ``get_image_embeds``.
            neg_content_scale: Multiplier for that pooled embedding.
            **kwargs: Forwarded to the pipeline call.

        Returns:
            The ``images`` attribute of the pipeline output.
        """
        if scale is not None:
            self.set_scale(scale)

        num_prompts = 1 if isinstance(pil_image, Image.Image) else len(pil_image)

        if prompt is None:
            prompt = "best quality, high quality"
        if negative_prompt is None:
            negative_prompt = "monochrome, lowres, bad anatomy, worst quality, low quality"

        if not isinstance(prompt, list):
            prompt = [prompt] * num_prompts
        if not isinstance(negative_prompt, list):
            negative_prompt = [negative_prompt] * num_prompts

        # NOTE(review): branch structure kept exactly as found. A supplied
        # neg_content_emb results in None being forwarded, which looks
        # inverted -- confirm the intent before changing it.
        if neg_content_emb is None:
            if neg_content_prompt is not None:
                with torch.inference_mode():
                    (
                        prompt_embeds_,  # presumably [B, 77, 2048] -- TODO confirm
                        negative_prompt_embeds_,
                        pooled_prompt_embeds_,  # presumably [B, 1280] -- TODO confirm
                        negative_pooled_prompt_embeds_,
                    ) = self.pipe.encode_prompt(
                        neg_content_prompt,
                        num_images_per_prompt=num_samples,
                        do_classifier_free_guidance=True,
                        negative_prompt=negative_prompt,
                    )
                    pooled_prompt_embeds_ *= neg_content_scale
            else:
                pooled_prompt_embeds_ = neg_content_emb
        else:
            pooled_prompt_embeds_ = None

        content_ip_tokens, uncond_content_ip_tokens = self.get_image_embeds(
            pil_image=pil_image, content_prompt_embeds=pooled_prompt_embeds_
        )
        content_ip_tokens = self._tile_for_samples(content_ip_tokens, num_samples)
        uncond_content_ip_tokens = self._tile_for_samples(uncond_content_ip_tokens, num_samples)

        style_ip_tokens, uncond_style_ip_tokens = self.get_image_embeds(
            pil_image=pil_image, content_prompt_embeds=pooled_prompt_embeds_
        )
        style_ip_tokens = self._tile_for_samples(style_ip_tokens, num_samples)
        style_ip_tokens_uncond = self._tile_for_samples(uncond_style_ip_tokens, num_samples)

        with torch.inference_mode():
            (
                prompt_embeds,
                negative_prompt_embeds,
                pooled_prompt_embeds,
                negative_pooled_prompt_embeds,
            ) = self.pipe.encode_prompt(
                prompt,
                device=self.device,
                num_images_per_prompt=num_samples,
                do_classifier_free_guidance=True,
                negative_prompt=negative_prompt,
            )
            # Only the *content* IP tokens are appended here; style tokens
            # reach the style layers through the overrides set below.
            prompt_embeds = torch.cat([prompt_embeds, content_ip_tokens], dim=1)
            negative_prompt_embeds = torch.cat([negative_prompt_embeds, uncond_content_ip_tokens], dim=1)

        with torch.inference_mode():
            (
                shape_prompt_embeds,
                shape_negative_prompt_embeds,
                shape_pooled_prompt_embeds,
                shape_negative_pooled_prompt_embeds,
            ) = self.pipe.encode_prompt(
                shape_prompt,
                device=self.device,
                num_images_per_prompt=num_samples,
                do_classifier_free_guidance=True,
                negative_prompt=negative_prompt,
            )
            shape_prompt_embeds = torch.cat([shape_prompt_embeds, content_ip_tokens], dim=1)
            shape_negative_prompt_embeds = torch.cat([shape_negative_prompt_embeds, uncond_content_ip_tokens], dim=1)

        for proc in self.attn_procs.values():
            if getattr(proc, "group", "off") == "style":
                proc.ip_tokens_override = style_ip_tokens.to(self.device, dtype=torch.float32)
                proc.ip_tokens_override_uncond = style_ip_tokens_uncond.to(self.device, dtype=torch.float32)
            else:
                if hasattr(proc, "ip_tokens_override"):
                    proc.ip_tokens_override = None
                if hasattr(proc, "ip_tokens_override_uncond"):
                    proc.ip_tokens_override_uncond = None

        self.generator = get_generator(seed, self.device)

        try:
            images = self.pipe(
                prompt_embeds=prompt_embeds,
                negative_prompt_embeds=negative_prompt_embeds,
                pooled_prompt_embeds=pooled_prompt_embeds,
                negative_pooled_prompt_embeds=negative_pooled_prompt_embeds,
                shape_prompt_embeds=shape_prompt_embeds,
                shape_negative_prompt_embeds=shape_negative_prompt_embeds,
                shape_pooled_prompt_embeds=shape_pooled_prompt_embeds,
                shape_negative_pooled_prompt_embeds=shape_negative_pooled_prompt_embeds,
                num_inference_steps=num_inference_steps,
                generator=self.generator,
                mask_image=self.mask,
                sketch_image=self.sketch,
                garment_images=self.garment_images,
                garment_mask=self.garment_mask,
                **kwargs,
            ).images
        finally:
            # Always drop the overrides, even if the pipeline raised, so stale
            # style tokens cannot leak into a later call.
            self._clear_token_overrides()

        return images


class IPAdapterPlus(IPAdapter):
    """IP-Adapter with fine-grained features"""

    def init_proj(self):
        """Create the ``Resampler`` projection model on ``self.device`` in float32."""
        image_proj_model = Resampler(
            dim=self.pipe.unet.config.cross_attention_dim,
            depth=4,
            dim_head=64,
            heads=12,
            num_queries=self.num_tokens,
            embedding_dim=self.image_encoder.config.hidden_size,
            output_dim=self.pipe.unet.config.cross_attention_dim,
            ff_mult=4,
        ).to(self.device, dtype=torch.float32)
        return image_proj_model

    @torch.inference_mode()
    def get_image_embeds(self, pil_image=None, clip_image_embeds=None, content_prompt_embeds=None):
        """Return ``(image_prompt_embeds, uncond_image_prompt_embeds)``.

        Embeddings come from the penultimate hidden state of the image
        encoder; the unconditional branch encodes an all-zero image.

        Args:
            pil_image: PIL image or list of images to encode.
            clip_image_embeds: Ignored; embeddings are always recomputed from
                ``pil_image``.
            content_prompt_embeds: Accepted because the inherited
                ``IPAdapter.generate`` passes this keyword; not used.
        """
        if isinstance(pil_image, Image.Image):
            pil_image = [pil_image]
        clip_image = self.clip_image_processor(images=pil_image, return_tensors="pt").pixel_values
        clip_image = clip_image.to(self.device, dtype=torch.float32)
        clip_image_embeds = self.image_encoder(clip_image, output_hidden_states=True).hidden_states[-2]
        image_prompt_embeds = self.image_proj_model(clip_image_embeds)
        uncond_clip_image_embeds = self.image_encoder(
            torch.zeros_like(clip_image), output_hidden_states=True
        ).hidden_states[-2]
        uncond_image_prompt_embeds = self.image_proj_model(uncond_clip_image_embeds)
        return image_prompt_embeds, uncond_image_prompt_embeds


class IPAdapterFull(IPAdapterPlus):
    """IP-Adapter with full features"""

    def init_proj(self):
        """Create the ``MLPProjModel`` projection model on ``self.device`` in float32."""
        image_proj_model = MLPProjModel(
            cross_attention_dim=self.pipe.unet.config.cross_attention_dim,
            clip_embeddings_dim=self.image_encoder.config.hidden_size,
        ).to(self.device, dtype=torch.float32)
        return image_proj_model


class IPAdapterPlusXL(IPAdapter):
    """SDXL"""

    def init_proj(self):
        """Create the ``Resampler`` projection model (``dim=1280``, 20 heads) in float32."""
        image_proj_model = Resampler(
            dim=1280,
            depth=4,
            dim_head=64,
            heads=20,
            num_queries=self.num_tokens,
            embedding_dim=self.image_encoder.config.hidden_size,
            output_dim=self.pipe.unet.config.cross_attention_dim,
            ff_mult=4,
        ).to(self.device, dtype=torch.float32)
        return image_proj_model

    @torch.inference_mode()
    def get_image_embeds(self, pil_image):
        """Return ``(image_prompt_embeds, uncond_image_prompt_embeds)`` for ``pil_image``.

        Embeddings come from the penultimate hidden state of the image
        encoder; the unconditional branch encodes an all-zero image.
        """
        if isinstance(pil_image, Image.Image):
            pil_image = [pil_image]
        clip_image = self.clip_image_processor(images=pil_image, return_tensors="pt").pixel_values
        clip_image = clip_image.to(self.device, dtype=torch.float32)
        clip_image_embeds = self.image_encoder(clip_image, output_hidden_states=True).hidden_states[-2]
        image_prompt_embeds = self.image_proj_model(clip_image_embeds)
        uncond_clip_image_embeds = self.image_encoder(
            torch.zeros_like(clip_image), output_hidden_states=True
        ).hidden_states[-2]
        uncond_image_prompt_embeds = self.image_proj_model(uncond_clip_image_embeds)
        return image_prompt_embeds, uncond_image_prompt_embeds

    def generate(
        self,
        pil_image,
        prompt=None,
        negative_prompt=None,
        scale=1.0,
        num_samples=4,
        seed=None,
        num_inference_steps=30,
        **kwargs,
    ):
        """Generate images conditioned on a text prompt and an image prompt.

        Args:
            pil_image: PIL image or list of images used as the image prompt.
            prompt: Text prompt; a generic quality prompt is used when ``None``.
            negative_prompt: Negative prompt; a generic one is used when ``None``.
            scale: Global image-prompt strength passed to ``set_scale``;
                ``None`` leaves the current scales untouched.
            num_samples: Images generated per prompt.
            seed: Seed forwarded to ``get_generator``.
            num_inference_steps: Number of denoising steps.
            **kwargs: Forwarded to the pipeline call.

        Returns:
            The ``images`` attribute of the pipeline output.
        """
        if scale is not None:
            self.set_scale(scale)

        num_prompts = 1 if isinstance(pil_image, Image.Image) else len(pil_image)

        if prompt is None:
            prompt = "best quality, high quality"
        if negative_prompt is None:
            negative_prompt = "monochrome, lowres, bad anatomy, worst quality, low quality"

        if not isinstance(prompt, list):
            prompt = [prompt] * num_prompts
        if not isinstance(negative_prompt, list):
            negative_prompt = [negative_prompt] * num_prompts

        image_prompt_embeds, uncond_image_prompt_embeds = self.get_image_embeds(pil_image)
        image_prompt_embeds = self._tile_for_samples(image_prompt_embeds, num_samples)
        uncond_image_prompt_embeds = self._tile_for_samples(uncond_image_prompt_embeds, num_samples)

        with torch.inference_mode():
            (
                prompt_embeds,
                negative_prompt_embeds,
                pooled_prompt_embeds,
                negative_pooled_prompt_embeds,
            ) = self.pipe.encode_prompt(
                prompt,
                device=self.device,
                num_images_per_prompt=num_samples,
                do_classifier_free_guidance=True,
                negative_prompt=negative_prompt,
            )
            # Image tokens are appended after the text tokens.
            prompt_embeds = torch.cat([prompt_embeds, image_prompt_embeds], dim=1)
            negative_prompt_embeds = torch.cat([negative_prompt_embeds, uncond_image_prompt_embeds], dim=1)

        generator = get_generator(seed, self.device)

        images = self.pipe(
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            pooled_prompt_embeds=pooled_prompt_embeds,
            negative_pooled_prompt_embeds=negative_pooled_prompt_embeds,
            num_inference_steps=num_inference_steps,
            generator=generator,
            **kwargs,
        ).images

        return images