# NOTE: page-header residue captured with this file ("Spaces: Runtime error"); not part of the program.
| import torch | |
| from PIL import Image | |
| import numpy as np | |
| from transformers import BlipProcessor, BlipForConditionalGeneration, CLIPProcessor, CLIPModel | |
| from diffusers import StableDiffusionPipeline, ControlNetModel, StableDiffusionControlNetPipeline, EulerAncestralDiscreteScheduler | |
| import os | |
| import logging | |
| import time | |
| import random | |
| import gc | |
| from functools import lru_cache | |
# Configure the root logger at import time so INFO-level messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ModelManager:
    """Load, hold and run the captioning, style and diffusion models.

    On construction the manager selects a device (CUDA when available,
    otherwise CPU) and eagerly loads four components:

    * BLIP for image captioning (``generate_caption``),
    * CLIP for zero-shot style scoring (``analyze_style``),
    * a Stable Diffusion text-to-image pipeline (``generate_image``),
    * a ControlNet pipeline on the same checkpoint
      (``generate_controlnet_image``).

    Every loader swallows its own failure and leaves the matching attributes
    set to ``None``. The inference helpers retry the load lazily and
    otherwise return a canned caption, canned style scores or a flat-colour
    placeholder image instead of raising.
    """

    # Directory handed to every ``from_pretrained`` call as ``cache_dir``.
    CACHE_DIR = "/tmp/models"
    # Caption returned when BLIP is unavailable or captioning fails.
    DEFAULT_CAPTION = "时尚服装设计作品"
    # (display name, English CLIP prompt) pairs; kept together so the two
    # lists cannot drift out of step.
    STYLE_PROMPTS = (
        ("商务正装", "business formal suit professional attire"),
        ("休闲风", "casual comfortable everyday wear"),
        ("运动风", "athletic sportswear activewear"),
        ("时尚潮流", "fashion trendy modern stylish"),
        ("复古风", "vintage retro classic style"),
        ("街头风", "streetwear urban contemporary"),
        ("优雅风", "elegant sophisticated refined"),
    )
    # (width, height) used for the ControlNet conditioning image and for the
    # placeholder returned when ControlNet generation fails.
    CONTROL_SIZE = (512, 768)
    # Attributes that expose ``.to(device)`` and are moved between devices.
    _MOVABLE_ATTRS = (
        "caption_model",
        "clip_model",
        "sd_pipeline",
        "controlnet_pipeline",
        "controlnet",
    )

    def __init__(self):
        """Pick the device, set defaults and load every model immediately."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info(f"使用设备: {self.device}")

        # Hub identifiers of the checkpoints to load.
        self.model_config = {
            "caption_model": "Salesforce/blip-image-captioning-large",
            "clip_model": "openai/clip-vit-large-patch14",
            "sd_model": "runwayml/stable-diffusion-v1-5",
            "controlnet_model": "lllyasviel/control_v11p_sd15_openpose",
        }

        # Model containers; ``None`` means "not loaded / failed to load".
        self.caption_processor = None
        self.caption_model = None
        self.clip_processor = None
        self.clip_model = None
        self.sd_pipeline = None
        self.controlnet = None
        self.controlnet_pipeline = None

        # Half precision on GPU, full precision on CPU.
        self.torch_dtype = torch.float16 if self.device == "cuda" else torch.float32
        self.enable_attention_slicing = True
        # NOTE(review): this flag is stored but never read in this class.
        self.enable_cpu_offload = False

        self.load_all_models()

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _empty_cuda_cache():
        """Release cached CUDA blocks; does nothing without CUDA."""
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    @staticmethod
    def _default_style_scores():
        """Return a fresh copy of the fallback style scores."""
        return {"时尚潮流": 0.8, "现代风格": 0.6}

    @staticmethod
    def _snap_to_multiple(value, base=8):
        """Round ``value`` down to a multiple of ``base``, never below ``base``."""
        return max(base, (int(value) // base) * base)

    @staticmethod
    def _normalize_to_uint8(values):
        """Min-max scale ``values`` to 0..255 and return a ``uint8`` array.

        A constant (or non-finite) input has no usable range; it maps to an
        all-zero array instead of dividing by zero.
        """
        values = np.asarray(values, dtype=np.float64)
        low = values.min()
        span = values.max() - low
        if not np.isfinite(span) or span == 0:
            return np.zeros(values.shape, dtype=np.uint8)
        return ((values - low) / span * 255).astype(np.uint8)

    @staticmethod
    def _prepare_image(image, max_side):
        """Return an RGB image no larger than ``max_side`` on either side.

        The caller's image is never modified: ``thumbnail`` resizes in place,
        so it is applied to a copy.
        """
        if image.mode != 'RGB':
            image = image.convert('RGB')
        if image.width > max_side or image.height > max_side:
            image = image.copy()
            image.thumbnail((max_side, max_side), Image.Resampling.LANCZOS)
        return image

    def _to_model_inputs(self, batch):
        """Move processor output to the model device and match its dtype.

        Only ``pixel_values`` is cast; token ids and masks must stay integer.
        """
        batch = batch.to(self.device)
        if "pixel_values" in batch:
            batch["pixel_values"] = batch["pixel_values"].to(self.torch_dtype)
        return batch

    def _random_generator(self):
        """Return a torch generator on ``self.device`` with a random 32-bit seed."""
        seed = random.randint(0, 2**32 - 1)
        return torch.Generator(device=self.device).manual_seed(seed)

    def _tune_pipeline(self, pipeline, xformers_on_msg, xformers_off_msg):
        """Apply the scheduler and memory settings shared by both pipelines."""
        pipeline.scheduler = EulerAncestralDiscreteScheduler.from_config(
            pipeline.scheduler.config
        )
        if self.enable_attention_slicing:
            pipeline.enable_attention_slicing()
        # xformers is optional; any failure falls back to default attention.
        try:
            pipeline.enable_xformers_memory_efficient_attention()
            logger.info(xformers_on_msg)
        except Exception:
            logger.info(xformers_off_msg)
        pipeline.enable_vae_slicing()

    def _move_models(self, target, verb, label):
        """Move every loaded model to ``target``, logging per-model results."""
        for model_name in self._MOVABLE_ATTRS:
            model = getattr(self, model_name, None)
            if model is None or not hasattr(model, 'to'):
                continue
            try:
                model.to(target)
                logger.info(f"{model_name} 已{verb}{label}")
            except Exception as e:
                logger.warning(f"移动 {model_name} 到{label}失败: {e}")

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    def optimize_memory_usage(self):
        """Enable cuDNN autotuning and TF32 matmul/conv when CUDA is available."""
        if torch.cuda.is_available():
            torch.backends.cudnn.benchmark = True
            torch.backends.cuda.matmul.allow_tf32 = True
            torch.backends.cudnn.allow_tf32 = True

    def load_all_models(self):
        """Load all four components in order and report the outcome.

        The individual loaders do not raise; a failed component is left as
        ``None`` and is listed in a warning here.

        Raises:
            Exception: anything unexpected raised outside the loaders'
                own handlers is logged and re-raised.
        """
        self.optimize_memory_usage()
        try:
            self.load_caption_model()
            self.load_clip_model()
            self.load_sd_pipeline()
            self.load_controlnet_pipeline()

            loaded = {
                "caption_model": self.caption_model,
                "clip_model": self.clip_model,
                "sd_pipeline": self.sd_pipeline,
                "controlnet_pipeline": self.controlnet_pipeline,
            }
            missing = [name for name, obj in loaded.items() if obj is None]
            if missing:
                logger.warning(f"以下模型未能加载: {', '.join(missing)}")
            else:
                logger.info("所有模型加载完成")

            if torch.cuda.is_available():
                logger.info(f"GPU显存使用: {torch.cuda.memory_allocated()/1024**3:.2f}GB / {torch.cuda.max_memory_allocated()/1024**3:.2f}GB")
        except Exception as e:
            logger.error(f"模型加载过程中出错: {e}")
            raise

    def load_caption_model(self):
        """Load the BLIP processor and model; on failure leave both ``None``."""
        try:
            logger.info("加载 BLIP 图像描述模型...")
            self.caption_processor = BlipProcessor.from_pretrained(
                self.model_config["caption_model"],
                cache_dir=self.CACHE_DIR,
            )
            self.caption_model = BlipForConditionalGeneration.from_pretrained(
                self.model_config["caption_model"],
                cache_dir=self.CACHE_DIR,
                torch_dtype=self.torch_dtype,
                low_cpu_mem_usage=True,
            ).to(self.device)
            self.caption_model.eval()
            logger.info("BLIP 模型加载完成")
        except Exception as e:
            logger.exception(f"BLIP 模型加载失败: {e}")
            self.caption_model = None
            self.caption_processor = None

    def load_clip_model(self):
        """Load the CLIP processor and model; on failure leave both ``None``."""
        try:
            logger.info("加载 CLIP 模型...")
            self.clip_processor = CLIPProcessor.from_pretrained(
                self.model_config["clip_model"],
                cache_dir=self.CACHE_DIR,
            )
            self.clip_model = CLIPModel.from_pretrained(
                self.model_config["clip_model"],
                cache_dir=self.CACHE_DIR,
                torch_dtype=self.torch_dtype,
            ).to(self.device)
            self.clip_model.eval()
            logger.info("CLIP 模型加载完成")
        except Exception as e:
            logger.exception(f"CLIP 模型加载失败: {e}")
            self.clip_model = None
            self.clip_processor = None

    def load_sd_pipeline(self):
        """Load the Stable Diffusion pipeline; on failure leave it ``None``."""
        try:
            logger.info("加载 Stable Diffusion Pipeline...")
            pipeline = StableDiffusionPipeline.from_pretrained(
                self.model_config["sd_model"],
                torch_dtype=self.torch_dtype,
                cache_dir=self.CACHE_DIR,
                safety_checker=None,
                requires_safety_checker=False,
                use_safetensors=True,
                low_cpu_mem_usage=True,
            )
            self.sd_pipeline = pipeline.to(self.device)
            self._tune_pipeline(
                self.sd_pipeline,
                "启用了xformers内存优化",
                "xformers不可用,使用默认attention",
            )
            logger.info("Stable Diffusion Pipeline 加载完成")
        except Exception as e:
            logger.exception(f"Stable Diffusion Pipeline 加载失败: {e}")
            self.sd_pipeline = None

    def load_controlnet_pipeline(self):
        """Load the ControlNet model and its pipeline; on failure leave both ``None``."""
        try:
            logger.info("加载 ControlNet 模型和 Pipeline...")
            self.controlnet = ControlNetModel.from_pretrained(
                self.model_config["controlnet_model"],
                cache_dir=self.CACHE_DIR,
                torch_dtype=self.torch_dtype,
                low_cpu_mem_usage=True,
            ).to(self.device)
            self.controlnet_pipeline = StableDiffusionControlNetPipeline.from_pretrained(
                self.model_config["sd_model"],
                controlnet=self.controlnet,
                cache_dir=self.CACHE_DIR,
                torch_dtype=self.torch_dtype,
                safety_checker=None,
                requires_safety_checker=False,
                low_cpu_mem_usage=True,
            ).to(self.device)
            self._tune_pipeline(
                self.controlnet_pipeline,
                "ControlNet启用了xformers内存优化",
                "ControlNet使用默认attention",
            )
            logger.info("ControlNet Pipeline 加载完成")
        except Exception as e:
            logger.exception(f"ControlNet Pipeline 加载失败: {e}")
            self.controlnet = None
            self.controlnet_pipeline = None

    # ------------------------------------------------------------------
    # Inference
    # ------------------------------------------------------------------
    def generate_caption(self, image):
        """Caption ``image`` with BLIP.

        Args:
            image: a PIL image; it is not modified.

        Returns:
            The decoded caption, or ``DEFAULT_CAPTION`` when the model is
            unavailable or captioning raises.
        """
        if self.caption_model is None or self.caption_processor is None:
            self.load_caption_model()
        if self.caption_model is None:
            return self.DEFAULT_CAPTION

        try:
            # Downscale a copy to at most 512 px per side to limit memory use.
            image = self._prepare_image(image, 512)
            inputs = self._to_model_inputs(
                self.caption_processor(images=image, return_tensors="pt")
            )
            # Inference only: no autograd graph is needed.
            with torch.no_grad():
                outputs = self.caption_model.generate(
                    **inputs,
                    max_length=50,
                    num_beams=4,
                    temperature=0.7,
                    do_sample=True,
                )
            caption = self.caption_processor.decode(outputs[0], skip_special_tokens=True)
            del inputs, outputs
            return caption
        except Exception as e:
            logger.exception(f"图像描述生成失败: {e}")
            return self.DEFAULT_CAPTION
        finally:
            self._empty_cuda_cache()

    def analyze_style(self, image):
        """Score ``image`` against the fixed style prompts with CLIP.

        Args:
            image: a PIL image; it is not modified.

        Returns:
            A dict mapping each style display name to its softmax
            probability, or the fallback scores when the model is
            unavailable or scoring raises.
        """
        if self.clip_model is None or self.clip_processor is None:
            self.load_clip_model()
        if self.clip_model is None:
            return self._default_style_scores()

        try:
            style_names = [name for name, _ in self.STYLE_PROMPTS]
            # English prompts are used for the CLIP text encoder.
            style_labels = [prompt for _, prompt in self.STYLE_PROMPTS]

            image = self._prepare_image(image, 224)
            inputs = self._to_model_inputs(
                self.clip_processor(
                    text=style_labels,
                    images=image,
                    return_tensors="pt",
                    padding=True,
                    truncation=True,
                    max_length=77,  # CLIP's maximum text length
                )
            )
            # Without no_grad the logits stay attached to the autograd graph
            # and the ``.numpy()`` call below raises.
            with torch.no_grad():
                outputs = self.clip_model(**inputs)
            logits_per_image = outputs.logits_per_image
            # Softmax in float32 so half-precision logits do not lose accuracy.
            probs = logits_per_image.float().softmax(dim=1).cpu().numpy()[0]
            style_scores = {name: float(prob) for name, prob in zip(style_names, probs)}
            del inputs, outputs
            return style_scores
        except Exception as e:
            logger.exception(f"风格分析失败: {e}")
            return self._default_style_scores()
        finally:
            self._empty_cuda_cache()

    def generate_image(self, prompt, negative_prompt=None, num_inference_steps=25, guidance_scale=7.5, width=512, height=512, **kwargs):
        """Generate one image from ``prompt`` with Stable Diffusion.

        Args:
            prompt: text prompt.
            negative_prompt: optional negative prompt; a default is used when ``None``.
            num_inference_steps: number of denoising steps.
            guidance_scale: classifier-free guidance scale.
            width, height: requested size; rounded down to a multiple of 8
                and never below 8.
            **kwargs: accepted for call compatibility and ignored.

        Returns:
            The first generated PIL image, or a placeholder image of the
            (rounded) requested size when the pipeline is unavailable or
            generation raises.
        """
        # The pipeline requires dimensions that are multiples of 8.
        width = self._snap_to_multiple(width)
        height = self._snap_to_multiple(height)

        if self.sd_pipeline is None:
            self.load_sd_pipeline()
        if self.sd_pipeline is None:
            logger.error("无法生成图像:Stable Diffusion 模型未加载")
            return self.create_placeholder_image(width, height)

        try:
            if negative_prompt is None:
                negative_prompt = "blurry, low quality, distorted, text, watermark, ugly, deformed"
            result = self.sd_pipeline(
                prompt=prompt,
                negative_prompt=negative_prompt,
                num_inference_steps=num_inference_steps,
                guidance_scale=guidance_scale,
                height=height,
                width=width,
                generator=self._random_generator(),
            )
            return result.images[0]
        except Exception as e:
            logger.exception(f"图像生成失败: {e}")
            return self.create_placeholder_image(width, height)
        finally:
            self._empty_cuda_cache()

    def generate_controlnet_image(self, image, prompt, reference_image=None, negative_prompt=None, num_inference_steps=30, guidance_scale=8.0, **kwargs):
        """Generate a try-on style image conditioned on ``image`` via ControlNet.

        Args:
            image: PIL image used to derive the conditioning map.
            prompt: text prompt.
            reference_image: when not ``None`` the prompt is suffixed with
                ", based on reference design"; the image itself is not used.
            negative_prompt: optional negative prompt; a default is used when ``None``.
            num_inference_steps: number of denoising steps.
            guidance_scale: classifier-free guidance scale.
            **kwargs: accepted for call compatibility and ignored.

        Returns:
            The first generated PIL image, or a 512x768 placeholder when the
            pipeline is unavailable or generation raises.
        """
        if self.controlnet_pipeline is None:
            self.load_controlnet_pipeline()
        if self.controlnet_pipeline is None:
            logger.error("无法生成3D试穿:ControlNet 模型未加载")
            return self.create_placeholder_image(*self.CONTROL_SIZE)

        try:
            if image.mode != 'RGB':
                image = image.convert('RGB')
            # ``resize`` returns a new image, so the caller's image is untouched.
            control_image = image.resize(self.CONTROL_SIZE, Image.Resampling.LANCZOS)
            control_image = self.create_pose_control_image(control_image)

            if negative_prompt is None:
                negative_prompt = "blurry, distorted, low quality, unrealistic, extra limbs, deformed, bad anatomy, multiple people"
            if reference_image is not None:
                prompt = f"{prompt}, based on reference design"

            result = self.controlnet_pipeline(
                prompt=prompt,
                image=control_image,
                negative_prompt=negative_prompt,
                num_inference_steps=num_inference_steps,
                guidance_scale=guidance_scale,
                controlnet_conditioning_scale=1.0,
                generator=self._random_generator(),
            )
            return result.images[0]
        except Exception as e:
            logger.exception(f"ControlNet图像生成失败: {e}")
            return self.create_placeholder_image(*self.CONTROL_SIZE)
        finally:
            self._empty_cuda_cache()

    def create_pose_control_image(self, image):
        """Build a simplified conditioning image from Sobel edges.

        This is an edge map used as a stand-in for a pose map, not a real
        pose estimate.

        Args:
            image: PIL image.

        Returns:
            An RGB PIL image of the normalised edge response; if anything
            fails (for example SciPy is missing) a plain grayscale version
            of ``image`` converted back to RGB.
        """
        try:
            # SciPy is imported lazily so its absence only triggers the fallback.
            from scipy import ndimage

            gray = np.asarray(image.convert('RGB'), dtype=np.float64).mean(axis=2)
            edges = self._normalize_to_uint8(ndimage.sobel(gray))
            # A 2-D uint8 array is interpreted as mode 'L'.
            return Image.fromarray(edges).convert('RGB')
        except Exception as e:
            logger.warning(f"创建姿态控制图失败: {e}")
            return image.convert('L').convert('RGB')

    def create_placeholder_image(self, width, height):
        """Return a flat RGB image in one of four randomly chosen pale colours."""
        colors = [(220, 220, 220), (200, 220, 240), (240, 220, 200), (220, 240, 200)]
        return Image.new('RGB', (width, height), color=random.choice(colors))

    # ------------------------------------------------------------------
    # Memory management
    # ------------------------------------------------------------------
    def cleanup(self):
        """Free cached GPU memory while keeping every model loaded."""
        logger.info("清理GPU显存缓存...")
        try:
            if torch.cuda.is_available():
                gc.collect()
                torch.cuda.empty_cache()
                torch.cuda.ipc_collect()
                allocated = torch.cuda.memory_allocated() / 1024**3
                cached = torch.cuda.memory_reserved() / 1024**3
                logger.info(f"显存使用: {allocated:.2f}GB (分配) / {cached:.2f}GB (缓存)")
            logger.info("显存清理完成")
        except Exception as e:
            logger.error(f"显存清理失败: {e}")

    def move_models_to_cpu(self):
        """Move every loaded model to the CPU and release cached GPU memory."""
        try:
            logger.info("将所有模型移至CPU...")
            self._move_models('cpu', "移至", "CPU")
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                torch.cuda.ipc_collect()
                allocated = torch.cuda.memory_allocated() / 1024**3
                logger.info(f"移至CPU后GPU显存使用: {allocated:.2f}GB")
            logger.info("所有模型已移至CPU")
        except Exception as e:
            logger.error(f"移动模型到CPU失败: {e}")

    def move_models_to_gpu(self):
        """Move every loaded model back to ``self.device``."""
        try:
            logger.info("将所有模型移回GPU...")
            self._move_models(self.device, "移回", "GPU")
            if torch.cuda.is_available():
                allocated = torch.cuda.memory_allocated() / 1024**3
                logger.info(f"移回GPU后显存使用: {allocated:.2f}GB")
            logger.info("所有模型已移回GPU")
        except Exception as e:
            logger.error(f"移动模型到GPU失败: {e}")

    def force_reload_all_models(self):
        """Drop every loaded model, free memory, and load everything again.

        Raises:
            Exception: any error during release or reload is logged and re-raised.
        """
        logger.info("开始强制重新加载所有模型...")
        try:
            for model_name in (
                'caption_model', 'caption_processor',
                'clip_model', 'clip_processor',
                'sd_pipeline', 'controlnet', 'controlnet_pipeline',
            ):
                if getattr(self, model_name, None) is not None:
                    # Clearing the attribute drops this object's reference.
                    setattr(self, model_name, None)
                    logger.info(f"释放 {model_name}")

            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                torch.cuda.ipc_collect()

            logger.info("开始重新加载模型...")
            self.load_all_models()
            logger.info("所有模型重新加载完成")
        except Exception as e:
            logger.error(f"强制重新加载模型失败: {e}")
            raise

    # ------------------------------------------------------------------
    # Introspection and tuning
    # ------------------------------------------------------------------
    def get_model_status(self):
        """Return which components are loaded, the device, and GPU memory figures.

        The ``gpu_memory`` entry is present only when CUDA is available.
        """
        status = {
            "caption_model": self.caption_model is not None,
            "clip_model": self.clip_model is not None,
            "sd_pipeline": self.sd_pipeline is not None,
            "controlnet_pipeline": self.controlnet_pipeline is not None,
            "device": self.device,
        }
        if torch.cuda.is_available():
            status["gpu_memory"] = {
                "allocated": f"{torch.cuda.memory_allocated() / 1024**3:.2f}GB",
                "cached": f"{torch.cuda.memory_reserved() / 1024**3:.2f}GB",
                "max_allocated": f"{torch.cuda.max_memory_allocated() / 1024**3:.2f}GB",
            }
        return status

    def optimize_for_inference(self):
        """Compile the BLIP and CLIP models when supported and set eval mode.

        The compiled module replaces the attribute on this instance so later
        calls actually use it. Compilation is deferred by ``torch.compile``,
        so a backend problem may only show up on the first forward pass.
        """
        logger.info("优化模型推理性能...")
        try:
            can_compile = hasattr(torch, 'compile')
            for attr in ("caption_model", "clip_model"):
                model = getattr(self, attr)
                if model is None:
                    continue
                if can_compile:
                    try:
                        model = torch.compile(model)
                        setattr(self, attr, model)
                        logger.info(f"模型编译成功")
                    except Exception as e:
                        logger.info(f"模型编译跳过: {e}")
                model.eval()
            logger.info("模型优化完成")
        except Exception as e:
            logger.warning(f"模型优化失败: {e}")

    def benchmark_models(self):
        """Time one call of each loaded model on a synthetic gray image.

        Returns:
            A dict with any of ``caption_time``, ``clip_time`` and
            ``sd_time`` (seconds), or an empty dict if benchmarking raises.
        """
        logger.info("开始模型性能基准测试...")
        try:
            test_image = Image.new('RGB', (512, 512), color=(128, 128, 128))
            results = {}

            if self.caption_model is not None:
                start_time = time.time()
                self.generate_caption(test_image)
                results['caption_time'] = time.time() - start_time

            if self.clip_model is not None:
                start_time = time.time()
                self.analyze_style(test_image)
                results['clip_time'] = time.time() - start_time

            if self.sd_pipeline is not None:
                start_time = time.time()
                self.generate_image("test fashion design", num_inference_steps=5)
                results['sd_time'] = time.time() - start_time

            logger.info(f"基准测试结果: {results}")
            return results
        except Exception as e:
            logger.error(f"基准测试失败: {e}")
            return {}