import sys
import os
import gc
import shutil

# --- 1. System Setup & Error Handling ---
# Force install detectron2 if missing
try:
    import detectron2
except ImportError:
    import importlib
    import subprocess

    print("⚠️ Detectron2 missing. Installing...")
    # Run pip through the interpreter that is executing this script so the
    # package is installed into the environment we import from; a bare `pip`
    # found on PATH may belong to a different Python installation.
    _pip_result = subprocess.run(
        [
            sys.executable, "-m", "pip", "install",
            "git+https://github.com/facebookresearch/detectron2.git",
        ],
        check=False,
    )
    if _pip_result.returncode != 0:
        # Stay best-effort here: the guarded import block below raises a
        # clear ImportError if detectron2 is still unavailable.
        print(f"❌ Detectron2 installation failed (exit code {_pip_result.returncode})")
    # Let the import system notice packages installed while we were running.
    importlib.invalidate_caches()

import requests
import gradio as gr
import spaces
from PIL import Image
import numpy as np
import torch
from torchvision import transforms
from torchvision.transforms.functional import to_pil_image
from huggingface_hub import hf_hub_download

# Make the repository root importable so the local packages below resolve.
sys.path.append('./')

# Import Local Modules
try:
    from utils_mask import get_mask_location
    from src.tryon_pipeline import StableDiffusionXLInpaintPipeline as TryonPipeline
    from src.unet_hacked_garmnet import UNet2DConditionModel as UNet2DConditionModel_ref
    from src.unet_hacked_tryon import UNet2DConditionModel
    from preprocess.humanparsing.run_parsing import Parsing
    from preprocess.openpose.run_openpose import OpenPose
    from detectron2.data.detection_utils import convert_PIL_to_numpy, _apply_exif_orientation
    import apply_net
except ImportError as e:
    raise ImportError(f"CRITICAL ERROR: Missing core modules. Error: {e}")

from transformers import (
    CLIPImageProcessor,
    CLIPVisionModelWithProjection,
    CLIPTextModel,
    CLIPTextModelWithProjection,
    AutoTokenizer
)
from diffusers import DDPMScheduler, AutoencoderKL

# ---------------------------------------------------------
# 2.
# ROBUST MODEL DOWNLOADER (The Fix)
# ---------------------------------------------------------
def download_model_robust(repo_id, filename, local_path):
    """Make sure a checkpoint file from the Hugging Face Hub exists at ``local_path``.

    An existing file larger than 1000 bytes is accepted as-is. A smaller file
    is treated as corrupt, deleted and fetched again. The download is first
    attempted with ``hf_hub_download``; if that raises, the file is streamed
    directly from the repository's ``resolve/main`` URL.

    The function is best-effort: failures are printed, never raised, so the
    caller continues and any missing file surfaces when the model is loaded.

    Args:
        repo_id: Hub repository id, e.g. ``"h94/IP-Adapter"``.
        filename: Path of the file inside the repository.
        local_path: Exact path the file must end up at.

    Returns:
        None.
    """
    # Function-scope imports keep this block self-contained.
    import tempfile
    import urllib.parse
    import urllib.request

    if os.path.exists(local_path):
        # Quick size check to ensure it's not an empty corrupt file
        if os.path.getsize(local_path) > 1000:
            print(f"✅ Found {local_path}")
            return
        print(f"⚠️ Corrupt file found at {local_path}, redownloading...")
        os.remove(local_path)

    print(f"⬇️ Downloading {filename} to {local_path}...")
    # A bare file name has an empty dirname; use the current directory then,
    # because os.makedirs("") raises.
    target_dir = os.path.dirname(local_path) or "."

    try:
        # Create directory
        os.makedirs(target_dir, exist_ok=True)

        # Download using Hugging Face Hub (Fast & Cached)
        downloaded_file = hf_hub_download(
            repo_id=repo_id,
            filename=filename,
            local_dir=target_dir,
            local_dir_use_symlinks=False,
        )

        # hf_hub_download keeps the repo-relative sub-path below local_dir,
        # so the file may not sit at the exact expected location yet.
        if os.path.abspath(downloaded_file) != os.path.abspath(local_path):
            if os.path.exists(downloaded_file):
                shutil.move(downloaded_file, local_path)

        if not os.path.exists(local_path):
            # Do not report success for a file that never arrived.
            raise FileNotFoundError(f"{local_path} is missing after download")

        print(f"✅ Successfully downloaded {local_path}")
        return
    except Exception as e:
        print(f"❌ Failed to download {filename}: {e}")

    # Manual Fallback for complex paths
    url = f"https://huggingface.co/{repo_id}/resolve/main/{urllib.parse.quote(filename)}"
    print(f"🔄 Trying direct URL fallback: {url}")
    tmp_path = None
    try:
        os.makedirs(target_dir, exist_ok=True)
        # Stream into a temporary file in the target directory and move it
        # into place only once complete, so an interrupted transfer never
        # leaves a truncated checkpoint at local_path.
        with urllib.request.urlopen(url, timeout=60) as response, \
                tempfile.NamedTemporaryFile(dir=target_dir, delete=False) as tmp:
            tmp_path = tmp.name
            shutil.copyfileobj(response, tmp)
        os.replace(tmp_path, local_path)
        tmp_path = None
        print(f"✅ Successfully downloaded {local_path}")
    except Exception as fallback_error:
        # Best-effort boundary: report the failure and let the caller go on.
        print(f"❌ Fallback download failed for {filename}: {fallback_error}")
    finally:
        if tmp_path is not None and os.path.exists(tmp_path):
            os.remove(tmp_path)


def check_and_download_models():
    """Ensure every auxiliary checkpoint used by the app is present under ``ckpt/``.

    Each entry is handed to ``download_model_robust`` in order; files that
    already exist locally are skipped by that function.
    """
    print("⏳ VALIDATING MODELS...")

    required_files = (
        # 1. Parsing & OpenPose (From Camenduru)
        ("camenduru/IDM-VTON", "humanparsing/parsing_atr.onnx", "ckpt/humanparsing/parsing_atr.onnx"),
        ("camenduru/IDM-VTON", "humanparsing/parsing_lip.onnx", "ckpt/humanparsing/parsing_lip.onnx"),
        ("camenduru/IDM-VTON", "densepose/model_final_162be9.pkl", "ckpt/densepose/model_final_162be9.pkl"),
        ("camenduru/IDM-VTON", "openpose/ckpts/body_pose_model.pth", "ckpt/openpose/ckpts/body_pose_model.pth"),
        # 2. IP Adapter (From h94)
        ("h94/IP-Adapter", "sdxl_models/ip-adapter-plus_sdxl_vit-h.bin", "ckpt/ip_adapter/ip-adapter-plus_sdxl_vit-h.bin"),
        ("h94/IP-Adapter", "models/image_encoder/config.json", "ckpt/image_encoder/config.json"),
        ("h94/IP-Adapter", "models/image_encoder/pytorch_model.bin", "ckpt/image_encoder/pytorch_model.bin"),
    )
    for repo_id, filename, local_path in required_files:
        download_model_robust(repo_id, filename, local_path)


# EXECUTE DOWNLOAD BEFORE LOADING ANYTHING
check_and_download_models()

# ---------------------------------------------------------
# 3.
# LOAD MODELS
# ---------------------------------------------------------
base_path = 'yisol/IDM-VTON'


def load_models():
    """Assemble the try-on pipeline and the two preprocessing models.

    All pipeline components are loaded from ``base_path`` with
    ``from_pretrained``. The network weights are requested as float16 and
    frozen with ``requires_grad_(False)``.

    Returns:
        A ``(pipe, openpose_model, parsing_model)`` tuple. The garment UNet is
        attached to the pipeline as ``pipe.unet_encoder``.
    """
    fp16 = torch.float16

    unet = UNet2DConditionModel.from_pretrained(base_path, subfolder="unet", torch_dtype=fp16)
    tokenizer_one = AutoTokenizer.from_pretrained(base_path, subfolder="tokenizer", use_fast=False)
    tokenizer_two = AutoTokenizer.from_pretrained(base_path, subfolder="tokenizer_2", use_fast=False)
    noise_scheduler = DDPMScheduler.from_pretrained(base_path, subfolder="scheduler")
    text_encoder_one = CLIPTextModel.from_pretrained(
        base_path, subfolder="text_encoder", torch_dtype=fp16
    )
    text_encoder_two = CLIPTextModelWithProjection.from_pretrained(
        base_path, subfolder="text_encoder_2", torch_dtype=fp16
    )
    image_encoder = CLIPVisionModelWithProjection.from_pretrained(
        base_path, subfolder="image_encoder", torch_dtype=fp16
    )
    vae = AutoencoderKL.from_pretrained(base_path, subfolder="vae", torch_dtype=fp16)
    garment_unet = UNet2DConditionModel_ref.from_pretrained(
        base_path, subfolder="unet_encoder", torch_dtype=fp16
    )

    # Initialize Preprocessors
    # NOTE(review): the 0 argument is presumably a GPU index — confirm in
    # preprocess/humanparsing and preprocess/openpose.
    human_parser = Parsing(0)
    pose_estimator = OpenPose(0)

    # Freeze Weights
    for frozen_module in (
        garment_unet,
        image_encoder,
        vae,
        unet,
        text_encoder_one,
        text_encoder_two,
    ):
        frozen_module.requires_grad_(False)

    components = dict(
        unet=unet,
        vae=vae,
        feature_extractor=CLIPImageProcessor(),
        text_encoder=text_encoder_one,
        text_encoder_2=text_encoder_two,
        tokenizer=tokenizer_one,
        tokenizer_2=tokenizer_two,
        scheduler=noise_scheduler,
        image_encoder=image_encoder,
        torch_dtype=fp16,
    )
    tryon_pipe = TryonPipeline.from_pretrained(base_path, **components)
    tryon_pipe.unet_encoder = garment_unet

    return tryon_pipe, pose_estimator, human_parser


pipe, openpose_model, parsing_model = load_models()
tensor_transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize([0.5], [0.5])])

# ---------------------------------------------------------
# 4.
# INFERENCE
# ---------------------------------------------------------
@spaces.GPU(duration=120)
def start_tryon(human_img, garm_img, garment_des, is_checked, is_checked_crop, denoise_steps, seed):
    """Generate a try-on image of the person wearing the garment.

    Steps: optional centre crop of the person image to a 3:4 region, pose
    keypoints and human parsing at 384x512, upper-body mask, DensePose
    rendering, prompt encoding, and the diffusion pipeline at 768x1024.

    Args:
        human_img: Person image; must support ``convert`` and ``resize``
            (``PIL.Image`` is used throughout this function).
        garm_img: Garment image, same kind of object as ``human_img``.
        garment_des: Text description of the garment, appended to the prompts.
        is_checked: Accepted for interface compatibility; not read by this
            function (the mask is always computed automatically).
        is_checked_crop: When true, only a centred 3:4 crop is processed and
            the result is pasted back into the full-size image.
        denoise_steps: Number of inference steps (converted with ``int``).
        seed: Seed for the CUDA generator; ``None`` leaves it unseeded.

    Returns:
        ``(final_result, mask_gray)``: the generated image and the person
        image with the masked region blanked out.

    Raises:
        gr.Error: If an input image is missing, or wrapping any exception
            raised while processing.
    """
    # Validate before any model is moved to the GPU, and outside the generic
    # handler so the message is not re-wrapped as "Error: ...".
    if human_img is None or garm_img is None:
        raise gr.Error("Please upload both Human and Garment images.")

    device = "cuda"
    garment_des = garment_des or ""

    # Pre-bind the large intermediates so the cleanup in `finally` can always
    # release them, no matter where a failure happened.
    keypoints = model_parse = mask = pose_img = prompt_embeds = garm_tensor = None

    try:
        openpose_model.preprocessor.body_estimation.model.to(device)
        pipe.to(device)
        pipe.unet_encoder.to(device)

        garm_img = garm_img.convert("RGB").resize((768, 1024))
        human_img_orig = human_img.convert("RGB")

        if is_checked_crop:
            width, height = human_img_orig.size
            target_width = int(min(width, height * (3 / 4)))
            target_height = int(min(height, width * (4 / 3)))
            # Integer offsets: PIL rounds a float crop box, while the paste
            # below needs an int position. Computing the box in integers
            # keeps the crop origin and the paste origin identical.
            left = (width - target_width) // 2
            top = (height - target_height) // 2
            right = left + target_width
            bottom = top + target_height
            cropped_img = human_img_orig.crop((left, top, right, bottom))
            crop_size = cropped_img.size
            human_img = cropped_img.resize((768, 1024))
        else:
            human_img = human_img_orig.resize((768, 1024))

        with torch.no_grad():
            keypoints = openpose_model(human_img.resize((384, 512)))
            model_parse, _ = parsing_model(human_img.resize((384, 512)))
            mask, mask_gray = get_mask_location('hd', "upper_body", model_parse, keypoints)
            mask = mask.resize((768, 1024))

            # Preview image: person pixels kept where the mask is 0.
            mask_gray = (1 - transforms.ToTensor()(mask)) * tensor_transform(human_img)
            mask_gray = to_pil_image((mask_gray + 1.0) / 2.0)

            human_img_arg = _apply_exif_orientation(human_img.resize((384, 512)))
            human_img_arg = convert_PIL_to_numpy(human_img_arg, format="BGR")

            args = apply_net.create_argument_parser().parse_args((
                'show',
                './configs/densepose_rcnn_R_50_FPN_s1x.yaml',
                './ckpt/densepose/model_final_162be9.pkl',
                'dp_segm', '-v', '--opts', 'MODEL.DEVICE', 'cuda',
            ))
            pose_img = args.func(args, human_img_arg)
            # Reverse the channel axis (input was built as BGR) before
            # handing the array to PIL.
            pose_img = Image.fromarray(pose_img[:, :, ::-1]).resize((768, 1024))

            prompt = "model is wearing " + garment_des
            negative_prompt = "monochrome, lowres, bad anatomy, worst quality, low quality"

            with torch.cuda.amp.autocast():
                (
                    prompt_embeds,
                    negative_prompt_embeds,
                    pooled_prompt_embeds,
                    negative_pooled_prompt_embeds,
                ) = pipe.encode_prompt(
                    prompt,
                    num_images_per_prompt=1,
                    do_classifier_free_guidance=True,
                    negative_prompt=negative_prompt,
                )

                prompt_c = "a photo of " + garment_des
                (prompt_embeds_c, _, _, _) = pipe.encode_prompt(
                    prompt_c,
                    num_images_per_prompt=1,
                    do_classifier_free_guidance=False,
                    negative_prompt=negative_prompt,
                )

                pose_img = tensor_transform(pose_img).unsqueeze(0).to(device, torch.float16)
                garm_tensor = tensor_transform(garm_img).unsqueeze(0).to(device, torch.float16)
                generator = (
                    torch.Generator(device).manual_seed(int(seed)) if seed is not None else None
                )

                images = pipe(
                    prompt_embeds=prompt_embeds.to(device, torch.float16),
                    negative_prompt_embeds=negative_prompt_embeds.to(device, torch.float16),
                    pooled_prompt_embeds=pooled_prompt_embeds.to(device, torch.float16),
                    negative_pooled_prompt_embeds=negative_pooled_prompt_embeds.to(device, torch.float16),
                    num_inference_steps=int(denoise_steps),
                    generator=generator,
                    strength=1.0,
                    pose_img=pose_img,
                    text_embeds_cloth=prompt_embeds_c.to(device, torch.float16),
                    cloth=garm_tensor,
                    mask_image=mask,
                    image=human_img,
                    height=1024,
                    width=768,
                    ip_adapter_image=garm_img,
                    guidance_scale=2.0,
                )[0]

        if is_checked_crop:
            # Scale the result back to the crop size and paste it where the
            # crop was taken from.
            out_img = images[0].resize(crop_size)
            human_img_orig.paste(out_img, (left, top))
            final_result = human_img_orig
        else:
            final_result = images[0]

        return final_result, mask_gray

    except gr.Error:
        # Already user-facing; do not wrap it a second time.
        raise
    except Exception as e:
        raise gr.Error(f"Error: {e}") from e
    finally:
        # Memory Cleanup: drop references to the large intermediates so the
        # collector and the CUDA cache can reclaim them even while a
        # traceback still references this frame.
        keypoints = model_parse = mask = pose_img = prompt_embeds = garm_tensor = None
        gc.collect()
        torch.cuda.empty_cache()


# ---------------------------------------------------------
# 5.
# UI
# ---------------------------------------------------------
# Two-column layout: inputs and the run button on the left, outputs on the
# right. The auto-mask checkbox and the mask preview are created but hidden.
with gr.Blocks(theme=gr.themes.Soft(), title="Tryonnix Engine") as demo:
    gr.Markdown("# ✨ Tryonnix 2D Engine (Stable)")

    with gr.Row():
        with gr.Column():
            img_human = gr.Image(label="Human", type="pil", height=400)
            img_garm = gr.Image(label="Garment", type="pil", height=400)
            desc = gr.Textbox(label="Description", value="short sleeve shirt")
            chk1 = gr.Checkbox(label="Auto-Mask", value=True, visible=False)
            chk2 = gr.Checkbox(label="Auto-Crop", value=True)
            steps = gr.Slider(label="Steps", minimum=20, maximum=50, value=30, step=1)
            seed = gr.Number(label="Seed", value=42)
            btn = gr.Button("🚀 Run", variant="primary")

        with gr.Column():
            out = gr.Image(label="Result", type="pil", height=600)
            mask_out = gr.Image(label="Mask", type="pil", visible=False)

    # Component order must match the positional parameters of start_tryon.
    tryon_inputs = [img_human, img_garm, desc, chk1, chk2, steps, seed]
    tryon_outputs = [out, mask_out]
    btn.click(
        fn=start_tryon,
        inputs=tryon_inputs,
        outputs=tryon_outputs,
        api_name="tryon",
    )


if __name__ == "__main__":
    demo.queue(max_size=10).launch()