Hunsain Mazhar
Remove monkey patch for diffusers compatibility; revert huggingface_hub and gradio versions for stability
5ed2da6 | import sys | |
| import os | |
| import gc | |
| import shutil | |
| # --- 1. System Setup & Error Handling --- | |
| # Force install detectron2 if missing | |
| try: | |
| import detectron2 | |
| except ImportError: | |
| print("β οΈ Detectron2 missing. Installing...") | |
| os.system('pip install git+https://github.com/facebookresearch/detectron2.git') | |
| import requests | |
| import gradio as gr | |
| import spaces | |
| from PIL import Image | |
| import numpy as np | |
| import torch | |
| from torchvision import transforms | |
| from torchvision.transforms.functional import to_pil_image | |
| from huggingface_hub import hf_hub_download | |
| sys.path.append('./') | |
| # Import Local Modules | |
| try: | |
| from utils_mask import get_mask_location | |
| from src.tryon_pipeline import StableDiffusionXLInpaintPipeline as TryonPipeline | |
| from src.unet_hacked_garmnet import UNet2DConditionModel as UNet2DConditionModel_ref | |
| from src.unet_hacked_tryon import UNet2DConditionModel | |
| from preprocess.humanparsing.run_parsing import Parsing | |
| from preprocess.openpose.run_openpose import OpenPose | |
| from detectron2.data.detection_utils import convert_PIL_to_numpy, _apply_exif_orientation | |
| import apply_net | |
| except ImportError as e: | |
| raise ImportError(f"CRITICAL ERROR: Missing core modules. Error: {e}") | |
| from transformers import ( | |
| CLIPImageProcessor, CLIPVisionModelWithProjection, CLIPTextModel, | |
| CLIPTextModelWithProjection, AutoTokenizer | |
| ) | |
| from diffusers import DDPMScheduler, AutoencoderKL | |
| # --------------------------------------------------------- | |
| # 2. ROBUST MODEL DOWNLOADER (The Fix) | |
| # --------------------------------------------------------- | |
| def download_model_robust(repo_id, filename, local_path): | |
| if os.path.exists(local_path): | |
| # Quick size check to ensure it's not an empty corrupt file | |
| if os.path.getsize(local_path) > 1000: | |
| print(f"β Found {local_path}") | |
| return | |
| else: | |
| print(f"β οΈ Corrupt file found at {local_path}, redownloading...") | |
| os.remove(local_path) | |
| print(f"β¬οΈ Downloading {filename} to {local_path}...") | |
| try: | |
| # Create directory | |
| os.makedirs(os.path.dirname(local_path), exist_ok=True) | |
| # Download using Hugging Face Hub (Fast & Cached) | |
| downloaded_file = hf_hub_download( | |
| repo_id=repo_id, | |
| filename=filename, | |
| local_dir=os.path.dirname(local_path), | |
| local_dir_use_symlinks=False | |
| ) | |
| # If the filename in repo is different from target, rename it | |
| # (hf_hub_download saves to local_dir/filename) | |
| actual_download_path = os.path.join(os.path.dirname(local_path), filename) | |
| if actual_download_path != local_path: | |
| # Move it to the exact expected path if different | |
| if os.path.exists(actual_download_path): | |
| shutil.move(actual_download_path, local_path) | |
| print(f"β Successfully downloaded {local_path}") | |
| except Exception as e: | |
| print(f"β Failed to download {filename}: {e}") | |
| # Manual Fallback for complex paths | |
| try: | |
| url = f"https://huggingface.co/{repo_id}/resolve/main/{filename}" | |
| print(f"π Trying direct URL fallback: {url}") | |
| os.system(f"wget -O {local_path} {url}") | |
| except: | |
| pass | |
| def check_and_download_models(): | |
| print("β³ VALIDATING MODELS...") | |
| # 1. Parsing & OpenPose (From Camenduru) | |
| download_model_robust("camenduru/IDM-VTON", "humanparsing/parsing_atr.onnx", "ckpt/humanparsing/parsing_atr.onnx") | |
| download_model_robust("camenduru/IDM-VTON", "humanparsing/parsing_lip.onnx", "ckpt/humanparsing/parsing_lip.onnx") | |
| download_model_robust("camenduru/IDM-VTON", "densepose/model_final_162be9.pkl", "ckpt/densepose/model_final_162be9.pkl") | |
| download_model_robust("camenduru/IDM-VTON", "openpose/ckpts/body_pose_model.pth", "ckpt/openpose/ckpts/body_pose_model.pth") | |
| # 2. IP Adapter (From h94) | |
| download_model_robust("h94/IP-Adapter", "sdxl_models/ip-adapter-plus_sdxl_vit-h.bin", "ckpt/ip_adapter/ip-adapter-plus_sdxl_vit-h.bin") | |
| download_model_robust("h94/IP-Adapter", "models/image_encoder/config.json", "ckpt/image_encoder/config.json") | |
| download_model_robust("h94/IP-Adapter", "models/image_encoder/pytorch_model.bin", "ckpt/image_encoder/pytorch_model.bin") | |
| # EXECUTE DOWNLOAD BEFORE LOADING ANYTHING | |
| check_and_download_models() | |
| # --------------------------------------------------------- | |
| # 3. LOAD MODELS | |
| # --------------------------------------------------------- | |
| base_path = 'yisol/IDM-VTON' | |
| def load_models(): | |
| unet = UNet2DConditionModel.from_pretrained(base_path, subfolder="unet", torch_dtype=torch.float16) | |
| tokenizer_one = AutoTokenizer.from_pretrained(base_path, subfolder="tokenizer", use_fast=False) | |
| tokenizer_two = AutoTokenizer.from_pretrained(base_path, subfolder="tokenizer_2", use_fast=False) | |
| noise_scheduler = DDPMScheduler.from_pretrained(base_path, subfolder="scheduler") | |
| text_encoder_one = CLIPTextModel.from_pretrained(base_path, subfolder="text_encoder", torch_dtype=torch.float16) | |
| text_encoder_two = CLIPTextModelWithProjection.from_pretrained(base_path, subfolder="text_encoder_2", torch_dtype=torch.float16) | |
| image_encoder = CLIPVisionModelWithProjection.from_pretrained(base_path, subfolder="image_encoder", torch_dtype=torch.float16) | |
| vae = AutoencoderKL.from_pretrained(base_path, subfolder="vae", torch_dtype=torch.float16) | |
| UNet_Encoder = UNet2DConditionModel_ref.from_pretrained(base_path, subfolder="unet_encoder", torch_dtype=torch.float16) | |
| # Initialize Preprocessors | |
| parsing_model = Parsing(0) | |
| openpose_model = OpenPose(0) | |
| # Freeze Weights | |
| UNet_Encoder.requires_grad_(False) | |
| image_encoder.requires_grad_(False) | |
| vae.requires_grad_(False) | |
| unet.requires_grad_(False) | |
| text_encoder_one.requires_grad_(False) | |
| text_encoder_two.requires_grad_(False) | |
| pipe = TryonPipeline.from_pretrained( | |
| base_path, unet=unet, vae=vae, feature_extractor=CLIPImageProcessor(), | |
| text_encoder=text_encoder_one, text_encoder_2=text_encoder_two, | |
| tokenizer=tokenizer_one, tokenizer_2=tokenizer_two, | |
| scheduler=noise_scheduler, image_encoder=image_encoder, torch_dtype=torch.float16, | |
| ) | |
| pipe.unet_encoder = UNet_Encoder | |
| return pipe, openpose_model, parsing_model | |
| pipe, openpose_model, parsing_model = load_models() | |
| tensor_transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize([0.5], [0.5])]) | |
| # --------------------------------------------------------- | |
| # 4. INFERENCE | |
| # --------------------------------------------------------- | |
| def start_tryon(human_img, garm_img, garment_des, is_checked, is_checked_crop, denoise_steps, seed): | |
| device = "cuda" | |
| try: | |
| openpose_model.preprocessor.body_estimation.model.to(device) | |
| pipe.to(device) | |
| pipe.unet_encoder.to(device) | |
| if not human_img or not garm_img: | |
| raise gr.Error("Please upload both Human and Garment images.") | |
| garm_img = garm_img.convert("RGB").resize((768, 1024)) | |
| human_img_orig = human_img.convert("RGB") | |
| if is_checked_crop: | |
| width, height = human_img_orig.size | |
| target_width = int(min(width, height * (3 / 4))) | |
| target_height = int(min(height, width * (4 / 3))) | |
| left = (width - target_width) / 2 | |
| top = (height - target_height) / 2 | |
| right = (width + target_width) / 2 | |
| bottom = (height + target_height) / 2 | |
| cropped_img = human_img_orig.crop((left, top, right, bottom)) | |
| crop_size = cropped_img.size | |
| human_img = cropped_img.resize((768, 1024)) | |
| else: | |
| human_img = human_img_orig.resize((768, 1024)) | |
| with torch.no_grad(): | |
| keypoints = openpose_model(human_img.resize((384, 512))) | |
| model_parse, _ = parsing_model(human_img.resize((384, 512))) | |
| mask, mask_gray = get_mask_location('hd', "upper_body", model_parse, keypoints) | |
| mask = mask.resize((768, 1024)) | |
| mask_gray = (1 - transforms.ToTensor()(mask)) * tensor_transform(human_img) | |
| mask_gray = to_pil_image((mask_gray + 1.0) / 2.0) | |
| human_img_arg = _apply_exif_orientation(human_img.resize((384, 512))) | |
| human_img_arg = convert_PIL_to_numpy(human_img_arg, format="BGR") | |
| args = apply_net.create_argument_parser().parse_args(('show', './configs/densepose_rcnn_R_50_FPN_s1x.yaml', './ckpt/densepose/model_final_162be9.pkl', 'dp_segm', '-v', '--opts', 'MODEL.DEVICE', 'cuda')) | |
| pose_img = args.func(args, human_img_arg) | |
| pose_img = Image.fromarray(pose_img[:, :, ::-1]).resize((768, 1024)) | |
| prompt = "model is wearing " + garment_des | |
| negative_prompt = "monochrome, lowres, bad anatomy, worst quality, low quality" | |
| with torch.cuda.amp.autocast(): | |
| (prompt_embeds, negative_prompt_embeds, pooled_prompt_embeds, negative_pooled_prompt_embeds) = pipe.encode_prompt( | |
| prompt, num_images_per_prompt=1, do_classifier_free_guidance=True, negative_prompt=negative_prompt | |
| ) | |
| prompt_c = "a photo of " + garment_des | |
| (prompt_embeds_c, _, _, _) = pipe.encode_prompt( | |
| prompt_c, num_images_per_prompt=1, do_classifier_free_guidance=False, negative_prompt=negative_prompt | |
| ) | |
| pose_img = tensor_transform(pose_img).unsqueeze(0).to(device, torch.float16) | |
| garm_tensor = tensor_transform(garm_img).unsqueeze(0).to(device, torch.float16) | |
| generator = torch.Generator(device).manual_seed(int(seed)) if seed is not None else None | |
| images = pipe( | |
| prompt_embeds=prompt_embeds.to(device, torch.float16), | |
| negative_prompt_embeds=negative_prompt_embeds.to(device, torch.float16), | |
| pooled_prompt_embeds=pooled_prompt_embeds.to(device, torch.float16), | |
| negative_pooled_prompt_embeds=negative_pooled_prompt_embeds.to(device, torch.float16), | |
| num_inference_steps=int(denoise_steps), generator=generator, strength=1.0, | |
| pose_img=pose_img.to(device, torch.float16), | |
| text_embeds_cloth=prompt_embeds_c.to(device, torch.float16), | |
| cloth=garm_tensor.to(device, torch.float16), | |
| mask_image=mask, image=human_img, height=1024, width=768, | |
| ip_adapter_image=garm_img.resize((768, 1024)), guidance_scale=2.0, | |
| )[0] | |
| if is_checked_crop: | |
| out_img = images[0].resize(crop_size) | |
| human_img_orig.paste(out_img, (int(left), int(top))) | |
| final_result = human_img_orig | |
| else: | |
| final_result = images[0] | |
| return final_result, mask_gray | |
| except Exception as e: | |
| raise gr.Error(f"Error: {e}") | |
| finally: | |
| # Memory Cleanup | |
| try: | |
| del keypoints, model_parse, mask, pose_img, prompt_embeds, garm_tensor | |
| except: | |
| pass | |
| gc.collect() | |
| torch.cuda.empty_cache() | |
| # --------------------------------------------------------- | |
| # 5. UI | |
| # --------------------------------------------------------- | |
| with gr.Blocks(theme=gr.themes.Soft(), title="Tryonnix Engine") as demo: | |
| gr.Markdown("# β¨ Tryonnix 2D Engine (Stable)") | |
| with gr.Row(): | |
| with gr.Column(): | |
| img_human = gr.Image(label="Human", type="pil", height=400) | |
| img_garm = gr.Image(label="Garment", type="pil", height=400) | |
| desc = gr.Textbox(label="Description", value="short sleeve shirt") | |
| chk1 = gr.Checkbox(label="Auto-Mask", value=True, visible=False) | |
| chk2 = gr.Checkbox(label="Auto-Crop", value=True) | |
| steps = gr.Slider(label="Steps", minimum=20, maximum=50, value=30, step=1) | |
| seed = gr.Number(label="Seed", value=42) | |
| btn = gr.Button("π Run", variant="primary") | |
| with gr.Column(): | |
| out = gr.Image(label="Result", type="pil", height=600) | |
| mask_out = gr.Image(label="Mask", type="pil", visible=False) | |
| btn.click(fn=start_tryon, inputs=[img_human, img_garm, desc, chk1, chk2, steps, seed], outputs=[out, mask_out], api_name="tryon") | |
| if __name__ == "__main__": | |
| demo.queue(max_size=10).launch() |