# pixagram-neo-backup / generator.py
# primerz's picture
# Update generator.py
# ae0aa20 verified
# raw
# history blame
# 19.7 kB
"""
Generation logic for Pixagram AI Pixel Art Generator
UPDATED VERSION with InstantID pipeline integration
"""
import torch
import numpy as np
import cv2
from PIL import Image
import gc
from config import (
device, dtype, TRIGGER_WORD,
ADAPTIVE_THRESHOLDS, ADAPTIVE_PARAMS, CAPTION_CONFIG
)
from utils import (
sanitize_text, enhanced_color_match, color_match,
get_demographic_description, calculate_optimal_size, safe_image_size
)
from models import (
load_face_analysis, load_depth_detector, load_controlnets,
load_sdxl_pipeline, load_lora, setup_compel,
setup_scheduler, optimize_pipeline, load_caption_model, set_clip_skip
)
from memory_utils import MemoryManager, ModelOffloader
class RetroArtConverter:
    """Main class for retro art generation with InstantID.

    Construction loads every model component through the ``models`` helpers
    and assembles a single pipeline driven by two ControlNets (identity
    keypoints and depth). ``generate_retro_art`` is the main entry point; the
    remaining public methods are utilities that can also be called on their
    own.
    """

    def __init__(self):
        """Load all models and assemble the generation pipeline.

        Success flags returned by the loaders are recorded in
        ``self.models_loaded`` and printed once construction finishes.
        """
        self.device = device
        self.dtype = dtype
        self.models_loaded = {
            'custom_checkpoint': False,
            'lora': False,
            'instantid': False,
            'zoe_depth': False,
        }

        # Initialize memory manager
        self.memory_manager = MemoryManager(device=device, dtype=dtype, verbose=True)

        # Load face analysis (stays on CPU)
        self.face_app, self.face_detection_enabled = load_face_analysis()

        # Load depth detector (starts on CPU)
        self.zoe_depth, zoe_success = load_depth_detector()
        self.models_loaded['zoe_depth'] = zoe_success

        # Load ControlNets AS LIST: slot 0 is identity, slot 1 is depth.
        # Every per-ControlNet list passed to the pipeline later uses this order.
        controlnet_instantid, controlnet_depth = load_controlnets()
        controlnets = [controlnet_instantid, controlnet_depth]
        # NOTE(review): flagged as loaded unconditionally; load_controlnets()
        # returns no success flag that could be checked here.
        self.models_loaded['instantid'] = True
        print("Initializing InstantID pipeline with Face + Depth ControlNets")

        # Load SDXL pipeline with InstantID (handles IP-Adapter internally)
        self.pipe, checkpoint_success = load_sdxl_pipeline(controlnets)
        self.models_loaded['custom_checkpoint'] = checkpoint_success

        # Load LORA
        lora_success = load_lora(self.pipe)
        self.models_loaded['lora'] = lora_success

        # Setup Compel
        self.compel, self.use_compel = setup_compel(self.pipe)

        # Setup scheduler
        setup_scheduler(self.pipe)

        # Optimize
        optimize_pipeline(self.pipe)

        # Load caption model (starts on CPU)
        (
            self.caption_processor,
            self.caption_model,
            self.caption_enabled,
            self.caption_model_type,
        ) = load_caption_model()

        # Set CLIP skip
        set_clip_skip(self.pipe)

        # Print status
        self._print_status()

        # Initial memory cleanup
        self.memory_manager.cleanup_memory(aggressive=True)
        print(" [OK] RetroArtConverter initialized with optimized memory management!")

    def _print_status(self):
        """Print model loading status."""
        print("\n=== MODEL STATUS ===")
        for model, loaded in self.models_loaded.items():
            status = "[OK] LOADED" if loaded else "[FALLBACK/DISABLED]"
            print(f"{model}: {status}")
        print("InstantID Pipeline: [OK] ACTIVE")
        print("IP-Adapter: [OK] Built into pipeline")
        print("===================\n")

    @staticmethod
    def _empty_cuda_cache():
        """Release cached CUDA memory; does nothing when CUDA is unavailable."""
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def get_depth_map(self, image):
        """Generate a depth map for ``image`` with the Zoe depth detector.

        The detector is moved to ``self.device`` only for the duration of the
        call and is always moved back to the CPU afterwards, including when
        the detector raises.

        Args:
            image: PIL image; converted to RGB when it is in another mode.

        Returns:
            ``(depth_image, depth_array)`` on success, where ``depth_image``
            is resized to match ``image``. When the detector is missing or
            fails, ``(grayscale_rgb_image, None)`` is returned instead.
        """
        if self.zoe_depth is None:
            print("[DEPTH] Detector not available, using grayscale")
            return image.convert('L').convert('RGB'), None

        try:
            if image.mode != 'RGB':
                image = image.convert('RGB')

            # Use safe size helper to avoid numpy.int64 issues
            orig_width, orig_height = safe_image_size(image)

            # Round down to a multiple of 64, never below 64
            target_width = max(64, (int(orig_width) // 64) * 64)
            target_height = max(64, (int(orig_height) // 64) * 64)
            image_for_depth = image.resize((target_width, target_height), Image.LANCZOS)

            try:
                # Move depth model to GPU temporarily
                self.zoe_depth = self.zoe_depth.to(self.device)
                depth_array = self.zoe_depth(
                    image_for_depth, detect_resolution=512, image_resolution=1024
                )
            finally:
                # Offload on success and on failure alike so GPU memory is
                # never left occupied by the detector.
                self.zoe_depth = self.zoe_depth.to("cpu")
                self._empty_cuda_cache()

            depth_image = Image.fromarray(depth_array)
            if depth_image.size != image.size:
                depth_image = depth_image.resize(image.size, Image.LANCZOS)
            print(f"[DEPTH] Generated depth map: {depth_image.size} (model offloaded to CPU)")
            return depth_image, depth_array
        except Exception as e:
            print(f"[DEPTH] Generation failed: {e}, using grayscale")
            return image.convert('L').convert('RGB'), None

    def add_trigger_word(self, prompt):
        """Add the LoRA trigger word to ``prompt`` if it is not present.

        Args:
            prompt: User prompt. ``None`` is treated like an empty prompt.

        Returns:
            ``prompt`` unchanged when it already contains ``TRIGGER_WORD``
            (case-insensitive), ``TRIGGER_WORD`` alone for a blank prompt,
            otherwise ``"<TRIGGER_WORD>, <prompt>"``.
        """
        if prompt is None:
            # Previously None crashed on .lower() before the blank check ran.
            prompt = ""
        if TRIGGER_WORD.lower() in prompt.lower():
            return prompt
        if not prompt.strip():
            return TRIGGER_WORD
        return f"{TRIGGER_WORD}, {prompt}"

    def detect_face_quality(self, face):
        """Pick adaptive generation parameters from a detected face.

        Checks, in priority order: small bounding box, low detection
        confidence, then a large yaw angle (profile view).

        Args:
            face: Face object exposing ``bbox`` and optionally ``det_score``
                and ``pose``.

        Returns:
            A copy of the matching ``ADAPTIVE_PARAMS`` entry, or ``None`` when
            no adjustment applies or the inspection fails.
        """
        try:
            bbox = face.bbox
            face_size = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])

            score = getattr(face, 'det_score', None)
            det_score = 1.0 if score is None else float(score)

            # Small face -> boost preservation
            if face_size < ADAPTIVE_THRESHOLDS['small_face_size']:
                return ADAPTIVE_PARAMS['small_face'].copy()

            # Low confidence -> boost preservation
            if det_score < ADAPTIVE_THRESHOLDS['low_confidence']:
                return ADAPTIVE_PARAMS['low_confidence'].copy()

            # Check for profile view.
            # NOTE(review): insightface face objects are understood to yield
            # None for attributes that were not computed, so hasattr() alone
            # is not a sufficient guard before len() - confirm.
            pose = getattr(face, 'pose', None)
            if pose is not None and len(pose) > 1:
                try:
                    yaw = float(pose[1])
                except (ValueError, TypeError, IndexError):
                    return None
                if abs(yaw) > ADAPTIVE_THRESHOLDS['profile_angle']:
                    return ADAPTIVE_PARAMS['profile_view'].copy()
            return None
        except Exception as e:
            print(f"[ADAPTIVE] Quality detection failed: {e}")
            return None

    def generate_caption(self, image):
        """Generate a caption for ``image`` with the loaded caption model.

        The caption model is moved to ``self.device`` only while generating
        and is always moved back to the CPU afterwards.

        Args:
            image: Image accepted by the caption processor.

        Returns:
            The sanitized caption, or ``None`` when captioning is disabled,
            the model type is not ``'git'`` or ``'blip'``, or generation fails.
        """
        if not self.caption_enabled or self.caption_model is None:
            return None
        if self.caption_model_type not in ('git', 'blip'):
            # Unsupported type: skip the pointless GPU round trip entirely.
            return None

        try:
            try:
                # Move caption model to GPU temporarily
                self.caption_model = self.caption_model.to(self.device)
                if self.caption_model_type == 'git':
                    inputs = self.caption_processor(
                        images=image, return_tensors="pt"
                    ).to(self.device)
                    generated_ids = self.caption_model.generate(
                        **inputs, max_length=CAPTION_CONFIG['max_length']
                    )
                    caption = self.caption_processor.batch_decode(
                        generated_ids, skip_special_tokens=True
                    )[0]
                else:
                    inputs = self.caption_processor(
                        image, return_tensors="pt"
                    ).to(self.device)
                    generated_ids = self.caption_model.generate(
                        **inputs, max_length=CAPTION_CONFIG['max_length']
                    )
                    caption = self.caption_processor.decode(
                        generated_ids[0], skip_special_tokens=True
                    )
            finally:
                # Move caption model back to CPU to free GPU memory, whether
                # or not generation succeeded.
                self.caption_model = self.caption_model.to("cpu")
                self._empty_cuda_cache()
            return sanitize_text(caption)
        except Exception as e:
            print(f"[CAPTION] Generation failed: {e}")
            return None

    def _analyze_face(self, image):
        """Detect the first face in ``image`` and prepare its conditioning.

        Returns ``(face, keypoint_image, embeddings)``; all three are ``None``
        when detection is disabled, finds nothing, or fails part-way, so the
        caller never sees a half-initialised face state.
        """
        if not self.face_detection_enabled or self.face_app is None:
            return None, None, None
        try:
            image_array = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
            faces = self.face_app.get(image_array)
            if len(faces) == 0:
                print("[FACE] No faces detected")
                return None, None, None

            face = faces[0]
            # Get face embeddings (512D array)
            face_embeddings = face.normed_embedding
            # Draw keypoints
            from pipeline_stable_diffusion_xl_instantid_img2img import draw_kps
            face_kps_image = draw_kps(image, face.kps)

            print(f"[FACE] Detected face with {face.det_score:.2f} confidence")
            if face_embeddings is not None:
                print(f"[FACE] Embeddings shape: {face_embeddings.shape}")
            return face, face_kps_image, face_embeddings
        except Exception as e:
            print(f"[FACE] Detection failed: {e}")
            return None, None, None

    def _make_generator(self, seed):
        """Build a ``torch.Generator`` on ``self.device`` for ``seed``.

        ``-1`` (or ``None``) requests a fresh random seed. Any other value is
        cast to ``int`` because ``manual_seed`` rejects floats, which UI
        widgets commonly produce.
        """
        generator = torch.Generator(device=self.device)
        if seed is None or seed == -1:
            actual_seed = generator.seed()
            print(f"[SEED] Random: {actual_seed}")
        else:
            actual_seed = int(seed)
            generator.manual_seed(actual_seed)
            print(f"[SEED] Fixed: {actual_seed}")
        return generator

    def _encode_prompts(self, prompt, negative_prompt):
        """Return the prompt-related pipeline kwargs.

        Uses Compel embeddings when available; on any Compel failure it falls
        back to plain text prompts. The result contains either embeddings or
        text, never a mixture of both.
        """
        if self.use_compel and self.compel is not None:
            try:
                conditioning = self.compel(prompt)
                negative_conditioning = self.compel(negative_prompt)
                prompt_embeds = conditioning[0]
                negative_embeds = negative_conditioning[0]

                # NOTE(review): depending on how setup_compel configured
                # Compel (long-prompt truncation), the two embeddings may
                # differ in length; pad only in that case so the pipeline
                # receives matching shapes.
                mismatch = (
                    getattr(prompt_embeds, 'shape', None)
                    != getattr(negative_embeds, 'shape', None)
                )
                if mismatch and hasattr(self.compel, 'pad_conditioning_tensors_to_same_length'):
                    prompt_embeds, negative_embeds = (
                        self.compel.pad_conditioning_tensors_to_same_length(
                            [prompt_embeds, negative_embeds]
                        )
                    )

                encoded = {
                    "prompt_embeds": prompt_embeds,
                    "pooled_prompt_embeds": conditioning[1],
                    "negative_prompt_embeds": negative_embeds,
                    "negative_pooled_prompt_embeds": negative_conditioning[1],
                }
                print("[OK] Using Compel-encoded prompts")
                return encoded
            except Exception as e:
                print(f"[COMPEL] Failed, using standard prompts: {e}")
        return {"prompt": prompt, "negative_prompt": negative_prompt}

    def _controlnet_kwargs(
        self,
        face_kps_image,
        face_embeddings,
        depth_image,
        identity_control_scale,
        depth_control_scale,
        identity_preservation,
    ):
        """Return the ControlNet / IP-Adapter pipeline kwargs.

        All per-ControlNet lists are ordered ``[identity, depth]``, matching
        the order in which the ControlNets were handed to the pipeline.
        """
        kwargs = {
            # Both ControlNets active from the first to the last step.
            "control_guidance_start": [0.0, 0.0],
            "control_guidance_end": [1.0, 1.0],
        }

        if face_kps_image is None:
            print("No faces detected - using Depth ControlNet only")
            # Use depth for both ControlNet slots (identity scale = 0)
            kwargs["control_image"] = [depth_image, depth_image]
            kwargs["controlnet_conditioning_scale"] = [0.0, depth_control_scale]
            return kwargs

        print("Using InstantID (keypoints + embeddings) + Depth ControlNets")
        kwargs["control_image"] = [face_kps_image, depth_image]
        kwargs["controlnet_conditioning_scale"] = [
            identity_control_scale,
            depth_control_scale,
        ]

        if face_embeddings is not None:
            print("Adding face embeddings for IP-Adapter...")
            kwargs["image_embeds"] = face_embeddings
            # Control IP-Adapter strength
            kwargs["ip_adapter_scale"] = identity_preservation
            print(f" - Face embeddings shape: {face_embeddings.shape}")
            print(f" - IP-Adapter scale: {identity_preservation}")
            print(" [OK] Face embeddings configured")
        else:
            print(" [WARNING] No face embeddings - using keypoints only")
        return kwargs

    @staticmethod
    def _match_colors(generated_image, reference_image, face_bbox):
        """Color-match ``generated_image`` to ``reference_image``.

        Uses the face-aware matcher when a face bounding box is known and the
        standard matcher otherwise. Returns the unmodified generated image if
        matching fails.
        """
        try:
            if face_bbox is not None:
                print("Applying enhanced face-aware color matching...")
                matched = enhanced_color_match(
                    generated_image,
                    reference_image,
                    face_bbox=face_bbox,
                )
                print("[OK] Enhanced color matching applied")
            else:
                print("Applying standard color matching...")
                matched = color_match(generated_image, reference_image, mode='mkl')
                print("[OK] Standard color matching applied")
            return matched
        except Exception as e:
            print(f"[COLOR] Matching failed: {e}")
            return generated_image

    @staticmethod
    def _release_memory(pipe_kwargs):
        """Drop per-call tensors and return cached GPU memory.

        References are dropped before collecting, and the device is
        synchronized before the cache is emptied, so the memory is actually
        reclaimable by the time it is released.
        """
        pipe_kwargs.clear()
        # Force garbage collection multiple times for thorough cleanup
        for _ in range(3):
            gc.collect()
        if torch.cuda.is_available():
            torch.cuda.synchronize()  # Ensure all GPU operations complete
            torch.cuda.empty_cache()
            allocated = torch.cuda.memory_allocated() / 1024**3
            reserved = torch.cuda.memory_reserved() / 1024**3
            print(f"[MEMORY] GPU: {allocated:.2f}GB allocated, {reserved:.2f}GB reserved")

    def generate_retro_art(
        self,
        input_image,
        prompt=" ",
        negative_prompt=" ",
        num_inference_steps=12,
        guidance_scale=1.3,
        depth_control_scale=0.75,
        identity_control_scale=0.85,
        lora_scale=1.0,
        identity_preservation=1.2,
        strength=0.50,
        enable_color_matching=False,
        consistency_mode=True,
        seed=-1
    ):
        """Generate retro art with InstantID face preservation.

        Args:
            input_image: Source PIL image; converted to RGB if necessary.
            prompt: Text prompt; the trigger word is added when missing.
            negative_prompt: Negative text prompt.
            num_inference_steps: Number of denoising steps.
            guidance_scale: Classifier-free guidance scale.
            depth_control_scale: Conditioning scale of the depth ControlNet.
            identity_control_scale: Conditioning scale of the identity
                ControlNet (forced to 0 when no face is found).
            lora_scale: Weight applied to the ``"retroart"`` LoRA adapter.
            identity_preservation: IP-Adapter scale for the face embeddings.
            strength: img2img strength.
            enable_color_matching: Color-match the result to the input.
            consistency_mode: Accepted for interface compatibility; not read
                by this method.
            seed: ``-1`` for a random seed, otherwise the seed to use.

        Returns:
            The generated PIL image.

        Note:
            When a face is detected, ``identity_preservation``,
            ``identity_control_scale``, ``guidance_scale`` and ``lora_scale``
            may be overridden by ``detect_face_quality``.
        """
        # Created before the try so the finally clause can always release it.
        pipe_kwargs = {}
        try:
            # Add trigger word
            prompt = sanitize_text(self.add_trigger_word(prompt))
            negative_prompt = sanitize_text(negative_prompt)
            print(f"[PROMPT] {prompt}")

            # Normalise to RGB up front: RGBA / L / P inputs otherwise break
            # the RGB->BGR conversion used for face detection.
            if input_image.mode != 'RGB':
                input_image = input_image.convert('RGB')

            # Calculate optimal size and resize
            orig_width, orig_height = safe_image_size(input_image)
            optimal_width, optimal_height = calculate_optimal_size(orig_width, orig_height)
            resized_image = input_image.resize((optimal_width, optimal_height), Image.LANCZOS)
            print(f"[SIZE] Resized to {optimal_width}x{optimal_height}")

            # Generate depth map
            depth_image, _ = self.get_depth_map(resized_image)

            # Detect faces
            face, face_kps_image, face_embeddings = self._analyze_face(resized_image)
            face_bbox = None
            if face is not None:
                # Get bbox for color matching
                face_bbox = face.bbox
                # Adaptive parameter adjustment
                adaptive_params = self.detect_face_quality(face)
                if adaptive_params:
                    print(f"[ADAPTIVE] {adaptive_params.get('reason', 'parameters adjusted')}")
                    identity_preservation = adaptive_params.get(
                        'identity_preservation', identity_preservation
                    )
                    identity_control_scale = adaptive_params.get(
                        'identity_control_scale', identity_control_scale
                    )
                    guidance_scale = adaptive_params.get('guidance_scale', guidance_scale)
                    lora_scale = adaptive_params.get('lora_scale', lora_scale)

            # Set LORA scale
            if hasattr(self.pipe, 'set_adapters') and self.models_loaded['lora']:
                try:
                    self.pipe.set_adapters(["retroart"], adapter_weights=[lora_scale])
                    print(f"[LORA] Scale: {lora_scale}")
                except Exception as e:
                    print(f"[LORA] Could not set scale: {e}")

            # Prepare generation kwargs
            pipe_kwargs.update({
                "image": resized_image,
                "strength": strength,
                "num_inference_steps": num_inference_steps,
                "guidance_scale": guidance_scale,
                "generator": self._make_generator(seed),
            })
            # Helper results are merged without binding them to a local name
            # so that clearing pipe_kwargs later drops the last reference.
            pipe_kwargs.update(self._encode_prompts(prompt, negative_prompt))
            pipe_kwargs.update(self._controlnet_kwargs(
                face_kps_image,
                face_embeddings,
                depth_image,
                identity_control_scale,
                depth_control_scale,
                identity_preservation,
            ))

            # Generate
            print(f"Generating: Steps={num_inference_steps}, CFG={guidance_scale}, Strength={strength}")
            result = self.pipe(**pipe_kwargs)
            generated_image = result.images[0]

            # Post-processing: Color matching
            if enable_color_matching:
                generated_image = self._match_colors(
                    generated_image, resized_image, face_bbox
                )
            return generated_image
        finally:
            # Aggressive memory cleanup, on success and on failure
            self._release_memory(pipe_kwargs)
# Module-level statement: executes once on import, after the class definition above.
print("[OK] Generator class ready with InstantID support")