|
|
|
|
|
|
|
|
import os, sys

# Cap OpenMP threads so CPU-side ops don't oversubscribe the host.
os.environ["OMP_NUM_THREADS"] = "4"

# Enable hf_transfer for faster Hugging Face downloads unless already set.
os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "1")

# Make the bundled InstantID pipeline module importable.
sys.path.insert(0, os.path.abspath("./instantid"))
|
|
|
|
|
import traceback, importlib.util |
|
|
import torch, gradio as gr |
|
|
from PIL import Image, ImageOps, ImageDraw |
|
|
from huggingface_hub import hf_hub_download |
|
|
from diffusers.models import ControlNetModel |
|
|
from insightface.app import FaceAnalysis |
|
|
|
|
|
# Inference device/dtype: fp16 on GPU, fp32 fallback on CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
DTYPE = torch.float16 if DEVICE == "cuda" else torch.float32

# InstantID assets: IdentityNet ControlNet + its dedicated IP-Adapter weights.
ASSETS_REPO = "InstantX/InstantID"
CHECKPOINTS_DIR = "./checkpoints"
CN_LOCAL_DIR = os.path.join(CHECKPOINTS_DIR, "ControlNetModel")
IP_ADAPTER_LOCAL = os.path.join(CHECKPOINTS_DIR, "ip-adapter.bin")

# Optional second IP-Adapter used purely for style conditioning.
IP_STYLE_REPO = "h94/IP-Adapter"
IP_STYLE_SUBFOLDER = "sdxl_models"
IP_STYLE_WEIGHT = "ip-adapter_sdxl.bin"

# SDXL base checkpoint the InstantID pipeline is built on.
BASE_MODEL = "stabilityai/stable-diffusion-xl-base-1.0"
|
|
|
|
|
def safe_download(repo, filename, local_dir, min_bytes, label, subfolder=None):
    """Download `filename` from HF Hub repo `repo` into `local_dir` and validate its size.

    An existing local file smaller than `min_bytes` is treated as a corrupt or
    partial download: it is removed and fetched again (force_download).

    Returns the local path of the validated file.
    Raises RuntimeError if the downloaded file is still smaller than `min_bytes`.
    """
    os.makedirs(local_dir, exist_ok=True)
    # Fix: hf_hub_download(local_dir=...) preserves the subfolder/filename
    # structure under local_dir, so the pre-existing-file check must use the
    # full relative path. The previous os.path.basename(filename) pointed at a
    # location that never exists for nested files such as
    # "ControlNetModel/config.json", breaking both the stale-file cleanup and
    # the force_download decision.
    rel_path = os.path.join(subfolder, filename) if subfolder else filename
    local_path = os.path.join(local_dir, rel_path)
    if os.path.exists(local_path) and os.path.getsize(local_path) < min_bytes:
        try:
            os.remove(local_path)  # stale partial download — re-fetch below
        except OSError:
            pass
    path = hf_hub_download(
        repo_id=repo,
        filename=filename,
        local_dir=local_dir,
        local_dir_use_symlinks=False,
        resume_download=True,
        # Only force a fresh download when no (valid-size) local copy exists.
        force_download=not os.path.exists(local_path),
        subfolder=subfolder,
    )
    size = os.path.getsize(path)
    if size < min_bytes:
        raise RuntimeError(f"Téléchargement incomplet de {label} (taille: {size} bytes).")
    print(f"✅ {label} téléchargé ({size/1e6:.1f} MB)")
    return path
|
|
|
|
|
def ensure_assets_or_download():
    """Fetch every required InstantID asset into the checkpoints directory (idempotent)."""
    for directory in (CHECKPOINTS_DIR, CN_LOCAL_DIR):
        os.makedirs(directory, exist_ok=True)
    # (repo, filename, destination, minimum size in bytes, label, subfolder)
    required_assets = (
        (ASSETS_REPO, "ControlNetModel/config.json", CHECKPOINTS_DIR, 1_000, "IdentityNet config", None),
        (ASSETS_REPO, "ControlNetModel/diffusion_pytorch_model.safetensors", CHECKPOINTS_DIR, 100_000_000, "IdentityNet weights", None),
        (ASSETS_REPO, "ip-adapter.bin", CHECKPOINTS_DIR, 100_000_000, "IP-Adapter (InstantID)", None),
        (IP_STYLE_REPO, IP_STYLE_WEIGHT, CHECKPOINTS_DIR, 20_000_000, "IP-Adapter Style (SDXL)", IP_STYLE_SUBFOLDER),
    )
    for repo, fname, dest, min_bytes, label, sub in required_assets:
        safe_download(repo, fname, dest, min_bytes, label, subfolder=sub)
|
|
|
|
|
def import_pipeline_or_fail():
    """Locate and import the InstantID SDXL pipeline class from ./instantid/.

    Returns the pipeline class; raises RuntimeError when no usable pipeline
    file or class can be found.
    """
    for candidate in (
        "./instantid/pipeline_stable_diffusion_xl_instantid_full.py",
        "./instantid/pipeline_stable_diffusion_xl_instantid.py",
    ):
        if os.path.exists(candidate):
            pipeline_file = candidate
            break
    else:
        raise RuntimeError("❌ Pipeline manquante. Place `pipeline_stable_diffusion_xl_instantid_full.py` dans ./instantid/")
    # A file below 1 KiB cannot be a real pipeline implementation.
    if os.path.getsize(pipeline_file) < 1024:
        raise RuntimeError("❌ Pipeline trop petite (vide ?). Utilise la version SDXL officielle.")
    spec = importlib.util.spec_from_file_location("instantid_pipeline", pipeline_file)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    # Pick the first class whose name mentions InstantID and that exposes
    # the diffusers-style `from_pretrained` constructor.
    classes = {attr: obj for attr, obj in vars(module).items() if isinstance(obj, type)}
    for cls_name, cls in classes.items():
        if "InstantID" in cls_name and hasattr(cls, "from_pretrained"):
            print(f"✅ Pipeline trouvée : {cls_name}")
            return cls
    raise RuntimeError("❌ Aucune classe pipeline InstantID trouvée. Classes dispo: " + ", ".join(classes))
|
|
|
|
|
def draw_kps_local(img_pil, kps):
    """Render facial keypoints as filled black dots on a white canvas.

    The canvas has the same size as `img_pil`; dot radius scales with the
    smaller image dimension (minimum 2 px).
    """
    width, height = img_pil.size
    canvas = Image.new("RGB", (width, height), "white")
    painter = ImageDraw.Draw(canvas)
    radius = max(2, min(width, height) // 100)
    for x, y in kps:
        painter.ellipse((x - radius, y - radius, x + radius, y + radius), fill="black")
    return canvas
|
|
|
|
|
# ---------------------------------------------------------------------------
# One-shot model loading at import time. Any failure is captured into
# `load_logs` and re-raised below so the full traceback reaches the operator.
# ---------------------------------------------------------------------------
load_logs = []
HAS_STYLE_ADAPTER = False
try:
    SDXLInstantID = import_pipeline_or_fail()
    ensure_assets_or_download()

    # IdentityNet ControlNet: conditions generation on the face-keypoint image.
    controlnet_identitynet = ControlNetModel.from_pretrained(CN_LOCAL_DIR, torch_dtype=DTYPE)
    pipe = SDXLInstantID.from_pretrained(
        BASE_MODEL,
        controlnet=controlnet_identitynet,
        torch_dtype=DTYPE,
        safety_checker=None,
        feature_extractor=None,
    ).to(DEVICE)

    # InstantID's own IP-Adapter (injects the face-identity embedding).
    pipe.load_ip_adapter_instantid(IP_ADAPTER_LOCAL)

    # Optional second IP-Adapter for style transfer; failure is non-fatal and
    # only downgrades the app (HAS_STYLE_ADAPTER stays False).
    try:
        pipe.load_ip_adapter(
            IP_STYLE_REPO,
            subfolder=IP_STYLE_SUBFOLDER,
            weight_name=IP_STYLE_WEIGHT,
            adapter_name="style",
        )
        load_logs.append("✅ IP-Adapter Style (SDXL) chargé (adapter_name='style').")
        HAS_STYLE_ADAPTER = True
    except Exception as e:
        load_logs.append(f"ℹ️ IP-Adapter Style non chargé: {e}")

    # Make sure the sub-modules followed the pipeline onto the GPU.
    if DEVICE == "cuda":
        if hasattr(pipe, "image_proj_model"): pipe.image_proj_model.to("cuda")
        if hasattr(pipe, "unet"): pipe.unet.to("cuda")

    load_logs.append("✅ InstantID prêt.")
except Exception:
    load_logs += ["❌ ERREUR au chargement:", traceback.format_exc()]
    pipe = None

# Fail fast: the UI below is useless without a loaded pipeline.
if pipe is None:
    raise RuntimeError("Échec de chargement du pipeline.\n" + "\n".join(load_logs))
|
|
|
|
|
def load_face_analyser():
    """Load an InsightFace analyser, preferring antelopev2 with buffalo_l as fallback.

    Returns the prepared FaceAnalysis instance; raises RuntimeError when
    neither model pack can be loaded.
    """
    failures = []
    for model_name in ("antelopev2", "buffalo_l"):
        try:
            analyser = FaceAnalysis(name=model_name, root="./models", providers=["CPUExecutionProvider"])
            analyser.prepare(ctx_id=0, det_size=(640, 640))
        except Exception as exc:
            failures.append(f"{model_name}: {exc}")
            print(f"⚠️ InsightFace échec {model_name} → {exc}")
            continue
        print(f"✅ InsightFace chargé: {model_name}")
        return analyser
    raise RuntimeError("Echec chargement InsightFace. Détails: " + " | ".join(failures))
|
|
|
|
|
# Module-level face analyser, shared by all requests.
fa = load_face_analyser()
|
|
|
|
|
def extract_face_embed_and_kps(pil_img):
    """Detect the most prominent face in `pil_img`.

    Returns (face_emb, kps_img): the identity embedding as a batched torch
    tensor on DEVICE/DTYPE, and the keypoint conditioning image rendered by
    draw_kps_local.

    Raises ValueError when no face is detected.
    """
    import numpy as np, cv2
    # InsightFace expects BGR arrays.
    img_cv2 = cv2.cvtColor(np.array(pil_img.convert("RGB")), cv2.COLOR_RGB2BGR)
    faces = fa.get(img_cv2)
    if not faces:
        raise ValueError("Aucun visage détecté dans la photo.")
    # Fix: pick the largest detected face by bounding-box area. The previous
    # faces[-1] depended on arbitrary detection order; the official InstantID
    # demo sorts by bbox area and keeps the biggest face.
    face = max(
        faces,
        key=lambda f: (f["bbox"][2] - f["bbox"][0]) * (f["bbox"][3] - f["bbox"][1]),
    )
    emb_np = face["embedding"]
    if not isinstance(emb_np, np.ndarray):
        emb_np = np.asarray(emb_np, dtype="float32")
    if emb_np.ndim == 1:
        emb_np = emb_np[None, ...]  # add a leading batch dimension
    face_emb = torch.from_numpy(emb_np).to(device=DEVICE, dtype=DTYPE)
    kps_img = draw_kps_local(pil_img, face["kps"])
    return face_emb, kps_img
|
|
|
|
|
def generate(face_image, style_image, prompt, negative_prompt,
             identity_strength, adapter_strength, style_strength,
             steps, cfg, width, height, seed):
    """Run one InstantID generation for the Gradio UI.

    Returns a (image, error_message, logs) triple; on failure image is None
    and error_message carries the user-facing explanation.
    """
    try:
        if face_image is None:
            return None, "Merci d'ajouter une photo visage.", "\n".join(load_logs)

        # seed < 0 (or None) means "random": let diffusers choose its own seed.
        gen = None if seed is None or int(seed) < 0 else torch.Generator(device=DEVICE).manual_seed(int(seed))

        from PIL import ImageOps
        # Normalise EXIF orientation, then center-crop to a 512x512 square
        # before face detection.
        face = ImageOps.exif_transpose(face_image).convert("RGB")
        ms = min(face.size); x = (face.width - ms) // 2; y = (face.height - ms) // 2
        face_sq = face.crop((x, y, x + ms, y + ms)).resize((512, 512), Image.Resampling.LANCZOS)

        face_emb, kps_img = extract_face_embed_and_kps(face_sq)

        # Per-adapter scales: dict form when both adapters are active,
        # scalar when only the InstantID adapter is loaded.
        try:
            if HAS_STYLE_ADAPTER and style_image is not None:
                pipe.set_ip_adapter_scale({"instantid": float(adapter_strength), "style": float(style_strength)})
            else:
                pipe.set_ip_adapter_scale(float(adapter_strength))
        except Exception as e:
            print(f"ℹ️ set_ip_adapter_scale ignoré: {e}")

        # Mirror the controlnet structure: lists of images/scales when the
        # pipeline wraps several controlnets, scalars for a single one.
        cn = getattr(pipe, "controlnet", None)
        if isinstance(cn, (list, tuple)):
            n_cn = len(cn)
        else:
            # Some wrappers (e.g. MultiControlNetModel) still implement len().
            try: n_cn = len(cn)
            except Exception: n_cn = 1

        image_arg = [kps_img] * n_cn if n_cn > 1 else ([kps_img] if isinstance(cn, (list, tuple)) else kps_img)
        scale_val = float(identity_strength)
        scale_arg = [scale_val] * n_cn if n_cn > 1 else ([scale_val] if isinstance(cn, (list, tuple)) else scale_val)

        # NOTE(review): the face embedding is passed under three kwarg names
        # (image_embeds / added_conditions / added_cond_kwargs) because
        # pipeline forks disagree on which one they consume — confirm which
        # the local ./instantid pipeline actually reads.
        gen_kwargs = dict(
            prompt=(prompt or "").strip(),
            negative_prompt=(negative_prompt or "").strip(),
            image=image_arg,
            image_embeds=face_emb,
            added_conditions={"image_embeds": face_emb},
            added_cond_kwargs={"image_embeds": face_emb},
            controlnet_conditioning_scale=scale_arg,
            num_inference_steps=int(steps),
            guidance_scale=float(cfg),
            width=int(width),
            height=int(height),
            generator=gen,
        )
        if HAS_STYLE_ADAPTER and style_image is not None:
            try:
                # Style reference image for the "style" IP-Adapter.
                gen_kwargs["ip_adapter_image"] = ImageOps.exif_transpose(style_image).convert("RGB")
            except Exception as e:
                print(f"ℹ️ ip_adapter_image ignoré: {e}")

        # Temporarily patch unet.forward so the identity embedding is always
        # present in added_conditions / added_cond_kwargs, regardless of what
        # the pipeline passes down.
        orig_forward = pipe.unet.forward

        def forward_patch(*args, **kwargs):
            # Copy before mutating so the caller's dict is left untouched.
            ac = kwargs.get("added_conditions")
            if ac is None:
                ac = {}
            else:
                ac = dict(ac)
            ac["image_embeds"] = face_emb
            kwargs["added_conditions"] = ac
            kwargs["added_cond_kwargs"] = ac
            return orig_forward(*args, **kwargs)

        pipe.unet.forward = forward_patch

        try:
            images = pipe(**gen_kwargs).images
        finally:
            # Always restore the original forward, even when generation fails.
            pipe.unet.forward = orig_forward

        return images[0], "", "\n".join(load_logs)

    except torch.cuda.OutOfMemoryError:
        return None, "CUDA OOM: baisse la résolution ou les steps.", "\n".join(load_logs)
    except Exception:
        import traceback
        return None, "Erreur:\n" + traceback.format_exc(), "\n".join(load_logs)
|
|
|
|
|
|
|
|
|
|
|
# Default positive prompt pre-filled in the UI (One Piece / Eiichiro Oda 2D look).
EX_PROMPT = (
    "one piece style, Eiichiro Oda style, anime portrait, upper body, pirate outfit, "
    "clean lineart, cel shading, vibrant colors, expressive eyes, dynamic composition, simple background"
)
# Default negative prompt: push away photorealism and common artefacts.
EX_NEG = (
    "realistic, photo, photorealistic, skin pores, complex lighting, "
    "low quality, worst quality, lowres, blurry, noisy, watermark, text, logo, jpeg artifacts, "
    "bad anatomy, deformed, multiple faces, nsfw"
)
|
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI: left column = inputs and tuning sliders, right column = output.
# ---------------------------------------------------------------------------
with gr.Blocks(css="footer{display:none !important}") as demo:
    gr.Markdown("# 🏴☠️ InstantID SDXL + IP-Adapter Style (2D) — visage → perso One Piece")
    with gr.Row():
        with gr.Column():
            face_image = gr.Image(type="pil", label="Photo visage (obligatoire)", height=260)
            style_image = gr.Image(type="pil", label="Image de style (optionnel)", height=260)
            gr.Markdown("Astuce : poster/planche One Piece → rendu 2D renforcé via IP-Adapter Style.")
            prompt = gr.Textbox(label="Prompt", value=EX_PROMPT, lines=3)
            negative = gr.Textbox(label="Negative Prompt", value=EX_NEG, lines=3)
            with gr.Row():
                # Slider positional args: (minimum, maximum, default, step).
                identity_strength = gr.Slider(0.2, 1.5, 0.95, 0.05, label="Fidélité visage (IdentityNet)")
                adapter_strength = gr.Slider(0.1, 1.5, 0.85, 0.05, label="Détails anime (InstantID)")
                style_strength = gr.Slider(0.1, 1.5, 0.95, 0.05, label="Force style (IP-Adapter Style)")
            steps = gr.Slider(10, 60, 30, 1, label="Steps")
            cfg = gr.Slider(0.1, 12.0, 6.5, 0.1, label="CFG")
            width = gr.Dropdown(choices=[576, 640, 704, 768, 896], value=704, label="Largeur")
            height = gr.Dropdown(choices=[704, 768, 896, 1024], value=896, label="Hauteur")
            seed = gr.Number(value=-1, label="Seed (-1 aléatoire)")
            btn = gr.Button("🎨 Générer", variant="primary")
        with gr.Column():
            out_image = gr.Image(label="Résultat", interactive=False)
            err_box = gr.Textbox(label="Erreurs", visible=False)
            log_box = gr.Textbox(label="Logs", value="\n".join(load_logs), lines=12)

    def wrap(*args):
        # Adapt generate()'s triple to Gradio updates; the error box is only
        # made visible when an error message is present.
        img, err, logs = generate(*args)
        return img, gr.update(visible=bool(err), value=err), gr.update(value=logs)

    btn.click(
        wrap,
        inputs=[face_image, style_image, prompt, negative,
                identity_strength, adapter_strength, style_strength,
                steps, cfg, width, height, seed],
        outputs=[out_image, err_box, log_box],
    )
|
|
|
|
|
# Queue requests (GPU work is serialized) and disable the public REST API.
demo.queue(api_open=False)

if __name__ == "__main__":
    # Listen on all interfaces (container-friendly) on the standard Gradio port.
    demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)
|
|
|