import os import json import uuid import shutil import threading import time from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Any, Tuple import zipfile import tempfile import gradio as gr import torch from PIL import Image import numpy as np from diffusers import ( StableDiffusionPipeline, UNet2DConditionModel, DDPMScheduler, AutoencoderKL ) from transformers import CLIPTextModel, CLIPTokenizer from peft import LoraConfig import logging from safetensors.torch import save_file # Configurar logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class LoRAImageTrainer: """Classe principal para treinamento de modelos LoRA para geração de imagens otimizada para baixo uso de GPU.""" def __init__(self): self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.training_jobs = {} self.models_cache = {} def get_available_models(self) -> List[str]: """Retorna lista de modelos base disponíveis para treinamento LoRA.""" return [ "runwayml/stable-diffusion-v1-5", "stabilityai/stable-diffusion-2-1", "CompVis/stable-diffusion-v1-4" # XL removido por ser pesado demais para Spaces gratuitos ] def load_base_model(self, model_name: str): """Carrega modelo base de difusão com otimizações para baixo uso de GPU.""" try: if model_name in self.models_cache: return self.models_cache[model_name] logger.info(f"Carregando modelo base: {model_name}") # Configurações para otimização de memória model_kwargs = { "torch_dtype": torch.float16 if torch.cuda.is_available() else torch.float32, "use_safetensors": True, "variant": "fp16" if torch.cuda.is_available() else None, "safety_checker": None, # Desativa verificador de segurança para economizar memória } # Carregar pipeline completo pipeline = StableDiffusionPipeline.from_pretrained( model_name, **model_kwargs ) if torch.cuda.is_available(): pipeline = pipeline.to(self.device) # Habilitar attention slicing para economia de memória pipeline.enable_attention_slicing() # Habilitar memory efficient attention se disponível try: pipeline.enable_xformers_memory_efficient_attention() except Exception as e: logger.warning("xformers não disponível, usando attention padrão") # Cache do modelo self.models_cache[model_name] = pipeline return pipeline except Exception as e: logger.error(f"Erro ao carregar modelo {model_name}: {str(e)}") raise e def prepare_image_dataset(self, image_files: List[str], captions: List[str], resolution: int = 512) -> List[Dict]: """Prepara dataset de imagens para treinamento.""" dataset = [] for img_path, caption in zip(image_files, captions): try: # Carregar e redimensionar imagem image = Image.open(img_path).convert("RGB") # Redimensionar mantendo aspect ratio image = self.resize_image(image, resolution) dataset.append({ "image": image, "caption": caption, "image_path": img_path }) except Exception as e: logger.error(f"Erro ao processar imagem {img_path}: {str(e)}") continue return dataset def resize_image(self, image: Image.Image, target_size: int) -> Image.Image: """Redimensiona imagem mantendo aspect ratio e fazendo crop central se necessário.""" width, height = image.size # Calcular novo tamanho mantendo aspect ratio if width > height: new_width = target_size new_height = int((height * target_size) / width) else: new_height = target_size new_width = int((width * target_size) / height) # Redimensionar image = image.resize((new_width, new_height), Image.Resampling.LANCZOS) # Crop central para obter tamanho exato if new_width != target_size or new_height != target_size: left = (new_width - target_size) // 2 top = (new_height - target_size) // 2 right = left + target_size bottom = top + target_size image = image.crop((left, top, right, bottom)) return image def real_lora_training(self, job_id: str, model_name: str, dataset: List[Dict], r: int = 16, lora_alpha: int = 32, lora_dropout: float = 0.1, num_epochs: int = 10, learning_rate: float = 1e-4, batch_size: int = 1, resolution: int = 512) -> None: """TREINAMENTO REAL DE LoRA PARA IMAGENS - CORRIGIDO PARA DIFFUSERS + PEFT.""" try: # Atualizar status self.training_jobs[job_id]["status"] = "loading_model" self.training_jobs[job_id]["progress"] = 5 self.training_jobs[job_id]["logs"].append(f"{datetime.now().strftime('%H:%M:%S')} - Carregando modelo base: {model_name}") # Carregar modelo base pipeline = self.load_base_model(model_name) unet = pipeline.unet text_encoder = pipeline.text_encoder vae = pipeline.vae tokenizer = pipeline.tokenizer scheduler = pipeline.scheduler # Congelar parâmetros unet.requires_grad_(False) text_encoder.requires_grad_(False) vae.requires_grad_(False) # ✅ ✅ ✅ CORREÇÃO: REMOVER ADAPTADOR EXISTENTE ✅ ✅ ✅ if hasattr(unet, "peft_config") and "default" in unet.peft_config: unet.delete_adapter("default") # Criar configuração LoRA lora_config = LoraConfig( r=r, lora_alpha=lora_alpha, target_modules=["to_k", "to_q", "to_v", "to_out.0"], lora_dropout=lora_dropout, bias="none" ) # Aplicar LoRA ao UNet manualmente, sem usar get_peft_model diretamente unet.add_adapter(lora_config, adapter_name="default") # Ativar o adaptador unet.set_adapter("default") unet.train() unet.to(self.device) # Otimizador optimizer = torch.optim.AdamW(unet.parameters(), lr=learning_rate) # Preparar scheduler para treinamento self.training_jobs[job_id]["status"] = "preparing_data" self.training_jobs[job_id]["progress"] = 20 # Normalização de imagem def preprocess_image(image): image = np.array(image).astype(np.float32) / 255.0 image = image.transpose(2, 0, 1) image = torch.from_numpy(image).unsqueeze(0) return image # Loop de treinamento real total_steps = num_epochs * len(dataset) current_step = 0 self.training_jobs[job_id]["status"] = "training" self.training_jobs[job_id]["logs"].append(f"{datetime.now().strftime('%H:%M:%S')} - Iniciando treinamento real...") for epoch in range(num_epochs): for item in dataset: current_step += 1 # Obter imagem e legenda image = item["image"] caption = item["caption"] # Pré-processar imagem image_tensor = preprocess_image(image).to(self.device) if torch.cuda.is_available(): image_tensor = image_tensor.half() # Codificar imagem para latentes with torch.no_grad(): latents = vae.encode(image_tensor * 2 - 1).latent_dist.sample() * 0.18215 # Tokenizar texto inputs = tokenizer(caption, padding="max_length", max_length=tokenizer.model_max_length, truncation=True, return_tensors="pt") input_ids = inputs.input_ids.to(self.device) # Gerar timesteps aleatórios timesteps = torch.randint(0, scheduler.config.num_train_timesteps, (1,), device=self.device).long() # Adicionar ruído aos latentes noise = torch.randn_like(latents) noisy_latents = scheduler.add_noise(latents, noise, timesteps) # Forward pass encoder_hidden_states = text_encoder(input_ids)[0] noise_pred = unet(noisy_latents, timesteps, encoder_hidden_states=encoder_hidden_states).sample # Calcular perda loss = torch.nn.functional.mse_loss(noise_pred, noise) # Backward pass optimizer.zero_grad() loss.backward() optimizer.step() # Atualizar progresso progress = 30 + int((current_step / total_steps) * 60) self.training_jobs[job_id]["progress"] = min(progress, 90) if current_step % max(1, len(dataset)//2) == 0: log_msg = f"Época {epoch+1}, Step {current_step} - Loss: {loss.item():.4f}" self.training_jobs[job_id]["logs"].append(f"{datetime.now().strftime('%H:%M:%S')} - {log_msg}") # Salvar LoRA self.training_jobs[job_id]["status"] = "saving" self.training_jobs[job_id]["progress"] = 95 output_dir = f"./lora_models/{job_id}" os.makedirs(output_dir, exist_ok=True) # ✅ SALVAR APENAS OS PESOS DO LORA (NÃO O MODELO BASE) adapter_state_dict = unet.state_dict() adapter_weights = {k: v for k, v in adapter_state_dict.items() if "lora_" in k} save_file(adapter_weights, f"{output_dir}/adapter_model.safetensors") # Criar adapter_config.json lora_config_dict = { "r": r, "lora_alpha": lora_alpha, "target_modules": ["to_k", "to_q", "to_v", "to_out.0"], "lora_dropout": lora_dropout, "bias": "none", "task_type": "CAUSAL_LM", "base_model_name": model_name, "training_info": { "num_epochs": num_epochs, "learning_rate": learning_rate, "batch_size": batch_size, "resolution": resolution, "num_images": len(dataset) } } with open(f"{output_dir}/adapter_config.json", "w") as f: json.dump(lora_config_dict, f, indent=2) # README readme_content = f"""# LoRA Model - {job_id} Informações do Treinamento Modelo Base: {model_name} Rank (r): {r} LoRA Alpha: {lora_alpha} Dropout: {lora_dropout} Épocas: {num_epochs} Taxa de Aprendizado: {learning_rate} Resolução: {resolution}x{resolution} Número de Imagens: {len(dataset)} Data de Treinamento: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} Como Usar 1. Baixe os arquivos adapter_config.json e adapter_model.safetensors 2. Carregue em sua ferramenta de geração de imagens favorita (ComfyUI, Automatic1111, etc.) 3. Use o trigger word ou estilo aprendido durante o treinamento """ with open(f"{output_dir}/README.md", "w") as f: f.write(readme_content) # Finalizar self.training_jobs[job_id]["status"] = "completed" self.training_jobs[job_id]["progress"] = 100 self.training_jobs[job_id]["model_path"] = output_dir self.training_jobs[job_id]["completed_at"] = datetime.now().isoformat() self.training_jobs[job_id]["logs"].append(f"{datetime.now().strftime('%H:%M:%S')} - ✅ Treinamento REAL concluído! LoRA salvo em {output_dir}") logger.info(f"Treinamento LoRA REAL concluído para job {job_id}") except Exception as e: error_msg = f"Erro REAL no treinamento: {str(e)}" logger.error(error_msg) self.training_jobs[job_id]["status"] = "error" self.training_jobs[job_id]["error"] = error_msg self.training_jobs[job_id]["logs"].append(f"{datetime.now().strftime('%H:%M:%S')} - ❌ {error_msg}") def start_training(self, model_name: str, image_files: List[str], captions: List[str], **kwargs) -> str: """Inicia treinamento LoRA assíncrono.""" job_id = str(uuid.uuid4()) # Preparar dataset dataset = self.prepare_image_dataset(image_files, captions, kwargs.get('resolution', 512)) self.training_jobs[job_id] = { "id": job_id, "status": "queued", "progress": 0, "created_at": datetime.now().isoformat(), "model_name": model_name, "num_images": len(dataset), "logs": [], "error": None, "model_path": None, "completed_at": None } # Iniciar treinamento em thread separada thread = threading.Thread( target=self.real_lora_training, args=(job_id, model_name, dataset), kwargs=kwargs ) thread.daemon = True thread.start() return job_id def get_training_status(self, job_id: str) -> Dict[str, Any]: """Retorna status do treinamento.""" return self.training_jobs.get(job_id, {"error": "Job não encontrado"}) def list_trained_models(self) -> List[Dict[str, str]]: """Lista modelos LoRA treinados.""" models = [] lora_models_dir = Path("./lora_models") if lora_models_dir.exists(): for model_dir in lora_models_dir.iterdir(): if model_dir.is_dir(): config_file = model_dir / "adapter_config.json" if config_file.exists(): try: with open(config_file, 'r') as f: config = json.load(f) models.append({ "id": model_dir.name, "path": str(model_dir), "base_model": config.get("base_model_name", "Unknown"), "r": config.get("r", "Unknown"), "created": datetime.fromtimestamp(model_dir.stat().st_mtime).isoformat() }) except Exception as e: models.append({ "id": model_dir.name, "path": str(model_dir), "base_model": "Unknown", "r": "Unknown", "created": datetime.fromtimestamp(model_dir.stat().st_mtime).isoformat() }) return models def create_download_zip(self, model_path: str) -> str: """Cria um arquivo ZIP com os arquivos do modelo LoRA para download.""" zip_path = f"{model_path}.zip" with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: model_dir = Path(model_path) for file_path in model_dir.rglob('*'): if file_path.is_file(): arcname = file_path.relative_to(model_dir) zipf.write(file_path, arcname) return zip_path # Instância global do trainer trainer = LoRAImageTrainer() def create_gradio_interface(): """Cria interface Gradio para a ferramenta LoRA de geração de imagens.""" # CSS personalizado para responsividade móvel custom_css = """ /* Mobile-first responsive design */ @media (max-width: 768px) { .gradio-container { padding: 8px !important; margin: 0 !important; } .tab-nav { flex-wrap: wrap !important; gap: 4px !important; } .tab-nav button { font-size: 14px !important; padding: 8px 12px !important; min-width: auto !important; flex: 1 1 auto !important; } .form-container { padding: 12px !important; } .btn { width: 100% !important; padding: 12px !important; font-size: 16px !important; margin-bottom: 8px !important; min-height: 44px !important; } .textbox textarea { font-size: 16px !important; min-height: 120px !important; } .dropdown select { font-size: 16px !important; padding: 12px !important; } .output-text { font-size: 14px !important; line-height: 1.5 !important; } .column { margin-bottom: 16px !important; } .file-upload { min-height: 100px !important; } } /* Enhanced visual styles */ .lora-header { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 20px; border-radius: 12px; margin-bottom: 20px; text-align: center; box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1); } .status-indicator { display: inline-block; padding: 4px 8px; border-radius: 6px; font-size: 12px; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px; margin-right: 8px; } .status-queued { background-color: #fbbf24; color: #92400e; } .status-loading_model { background-color: #60a5fa; color: #1e40af; } .status-preparing_lora { background-color: #8b5cf6; color: #5b21b6; } .status-preparing_data { background-color: #06b6d4; color: #0e7490; } .status-training { background-color: #a78bfa; color: #5b21b6; } .status-saving { background-color: #f59e0b; color: #92400e; } .status-completed { background-color: #34d399; color: #065f46; } .status-error { background-color: #f87171; color: #991b1b; } /* Touch device optimizations */ @media (hover: none) and (pointer: coarse) { .btn { min-height: 44px !important; min-width: 44px !important; } .tab-nav button { min-height: 44px !important; min-width: 44px !important; } } """ def process_images_and_captions(files, captions_text): """Processa imagens e legendas enviadas pelo usuário.""" if not files: return "❌ Erro: Nenhuma imagem foi enviada!" # Processar legendas captions = [] if captions_text.strip(): captions = [line.strip() for line in captions_text.split('\n') if line.strip()] # Se não há legendas suficientes, usar legendas padrão while len(captions) < len(files): captions.append(f"training image {len(captions) + 1}") # Truncar legendas se houver mais que imagens captions = captions[:len(files)] return files, captions def start_training_wrapper(model_name, files, captions_text, trigger_word, r, lora_alpha, lora_dropout, num_epochs, learning_rate, batch_size, resolution): """Wrapper para iniciar treinamento via Gradio.""" if not files: return "❌ Erro: Nenhuma imagem foi enviada para treinamento!" if len(files) < 3: return "❌ Erro: Forneça pelo menos 3 imagens para treinamento!" try: # Processar imagens e legendas image_files = [f.name for f in files] # Processar legendas captions = [] if captions_text.strip(): captions = [line.strip() for line in captions_text.split('\n') if line.strip()] # Se não há legendas suficientes, usar trigger word + descrição padrão while len(captions) < len(files): if trigger_word.strip(): captions.append(f"{trigger_word.strip()}, high quality photo") else: captions.append(f"training image {len(captions) + 1}, high quality photo") # Truncar legendas se houver mais que imagens captions = captions[:len(files)] job_id = trainer.start_training( model_name=model_name, image_files=image_files, captions=captions, r=int(r), lora_alpha=int(lora_alpha), lora_dropout=float(lora_dropout), num_epochs=int(num_epochs), learning_rate=float(learning_rate), batch_size=int(batch_size), resolution=int(resolution) ) return f"✅ Treinamento REAL iniciado! ID do Job: {job_id}\n\n📊 Imagens: {len(files)}\n🏷️ Trigger Word: {trigger_word or 'Nenhuma'}\n\nUse o ID acima para verificar o progresso na aba 'Status do Treinamento'." except Exception as e: return f"❌ Erro ao iniciar treinamento: {str(e)}" def check_status_wrapper(job_id): """Wrapper para verificar status via Gradio.""" if not job_id.strip(): return "❌ Erro: Forneça um ID de job válido!" status = trainer.get_training_status(job_id.strip()) if "error" in status and status["error"] == "Job não encontrado": return "❌ Job não encontrado! Verifique o ID." # Criar indicador visual de status status_class = f"status-{status['status']}" status_emoji = { 'queued': '⏳', 'loading_model': '📥', 'preparing_lora': '⚙️', 'preparing_data': '📊', 'training': '🏋️', 'saving': '💾', 'completed': '✅', 'error': '❌' }.get(status['status'], '📊') # Barra de progresso visual progress = status['progress'] progress_bar = f"""
""" status_text = f""" 📊 Status do Treinamento LoRA 🆔 Job ID: {status['id']} {status_emoji} Status: {status['status'].upper().replace('_', ' ')} ⏳ Progresso: {status['progress']}% {progress_bar} 🤖 Modelo Base: {status['model_name']} 🖼️ Imagens: {status.get('num_images', 'N/A')} 📅 Criado em: {status['created_at']} """ if status['logs']: status_text += "📝 **Logs Recentes:**\n" for log in status['logs'][-5:]: # Últimos 5 logs status_text += f"• {log}\n" if status['status'] == 'completed': status_text += f"\n✅ **Treinamento Concluído!**\n📁 **Modelo salvo em:** {status['model_path']}" status_text += f"\n⏰ **Concluído em:** {status['completed_at']}" status_text += f"\n\n💡 **Próximos passos:** Vá para a aba 'Modelos Treinados' para baixar seu LoRA!" elif status['status'] == 'error': status_text += f"\n❌ **Erro:** {status['error']}" return status_text def list_models_wrapper(): """Wrapper para listar modelos via Gradio.""" models = trainer.list_trained_models() if not models: return "📭 Nenhum modelo LoRA treinado encontrado." models_text = "📚 **Modelos LoRA Treinados:**\n\n" for model in models: models_text += f"🆔 **ID:** {model['id']}\n" models_text += f"🤖 **Modelo Base:** {model['base_model']}\n" models_text += f"📊 **Rank (r):** {model['r']}\n" models_text += f"📁 **Caminho:** {model['path']}\n" models_text += f"📅 **Criado:** {model['created']}\n\n" models_text += "---\n\n" return models_text def download_model_wrapper(job_id): """Wrapper para preparar download do modelo.""" if not job_id.strip(): return None, "❌ Erro: Forneça um ID de job válido!" status = trainer.get_training_status(job_id.strip()) if "error" in status and status["error"] == "Job não encontrado": return None, "❌ Job não encontrado! Verifique o ID." if status['status'] != 'completed': return None, f"❌ Treinamento ainda não foi concluído. Status atual: {status['status']}" try: model_path = status['model_path'] zip_path = trainer.create_download_zip(model_path) return zip_path, f"✅ Arquivo ZIP criado com sucesso! Clique no link acima para baixar." except Exception as e: return None, f"❌ Erro ao criar arquivo de download: {str(e)}" # Interface Gradio with gr.Blocks( title="🎨 LoRA Image Trainer - Criador e Treinador de LoRA para Imagens", theme=gr.themes.Soft(), css=custom_css ) as interface: gr.HTML("""Criador e Treinador de LoRA para Geração de Imagens
Ferramenta otimizada para baixo uso de GPU, compatível com dispositivos móveis