|
|
import torch |
|
|
from diffusers import StableDiffusionControlNetPipeline, ControlNetModel |
|
|
from controlnet_aux import OpenposeDetector |
|
|
from PIL import Image |
|
|
import random |
|
|
import cv2 |
|
|
import numpy as np |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
class ControlNetProgressCallback: |
|
|
def __init__(self, progress, total_steps): |
|
|
self.progress = progress |
|
|
self.total_steps = total_steps |
|
|
self.current_step = 0 |
|
|
|
|
|
def __call__(self, pipe, step_index, timestep, callback_kwargs): |
|
|
self.current_step = step_index + 1 |
|
|
progress_percentage = self.current_step / self.total_steps |
|
|
|
|
|
|
|
|
if self.progress is not None: |
|
|
self.progress(progress_percentage, desc=f"ControlNet: Schritt {self.current_step}/{self.total_steps}") |
|
|
|
|
|
print(f"ControlNet Fortschritt: {self.current_step}/{self.total_steps} ({progress_percentage:.1%})") |
|
|
return callback_kwargs |
|
|
|
|
|
|
|
|
class ControlNetProcessor: |
|
|
def __init__(self, device="cuda", torch_dtype=torch.float32): |
|
|
self.device = device |
|
|
self.torch_dtype = torch_dtype |
|
|
self.pose_detector = None |
|
|
self.controlnet_openpose = None |
|
|
self.controlnet_canny = None |
|
|
self.controlnet_depth = None |
|
|
self.pipe_openpose = None |
|
|
self.pipe_canny = None |
|
|
self.pipe_depth = None |
|
|
self.pipe_multi_inside = None |
|
|
self.pipe_multi_outside = None |
|
|
self.conditioning_maps = None |
|
|
self.controlnet_type = None |
|
|
self.controlnet_scales = None |
|
|
|
|
|
def load_pose_detector(self): |
|
|
"""Lädt nur den Pose-Detector""" |
|
|
if self.pose_detector is None: |
|
|
print("Loading Pose Detector...") |
|
|
try: |
|
|
self.pose_detector = OpenposeDetector.from_pretrained("lllyasviel/ControlNet") |
|
|
except Exception as e: |
|
|
print(f"Warnung: Pose-Detector konnte nicht geladen werden: {e}") |
|
|
return self.pose_detector |
|
|
|
|
|
def extract_pose_simple(self, image): |
|
|
"""Einfache Pose-Extraktion ohne komplexe Abhängigkeiten""" |
|
|
try: |
|
|
img_array = np.array(image.convert("RGB")) |
|
|
edges = cv2.Canny(img_array, 100, 200) |
|
|
pose_image = Image.fromarray(edges).convert("RGB") |
|
|
print("⚠️ Verwende Kanten-basierte Pose-Approximation") |
|
|
return pose_image |
|
|
except Exception as e: |
|
|
print(f"Fehler bei einfacher Pose-Extraktion: {e}") |
|
|
return image.convert("RGB").resize((512, 512)) |
|
|
|
|
|
def extract_pose(self, image): |
|
|
"""Extrahiert Pose-Map aus Bild mit Fallback""" |
|
|
try: |
|
|
detector = self.load_pose_detector() |
|
|
if detector is None: |
|
|
return self.extract_pose_simple(image) |
|
|
|
|
|
pose_image = detector(image, hand_and_face=True) |
|
|
return pose_image |
|
|
except Exception as e: |
|
|
print(f"Fehler bei Pose-Extraktion: {e}") |
|
|
return self.extract_pose_simple(image) |
|
|
|
|
|
def extract_canny_edges(self, image): |
|
|
"""Extrahiert Canny Edges für Umgebungserhaltung""" |
|
|
try: |
|
|
img_array = np.array(image.convert("RGB")) |
|
|
|
|
|
|
|
|
gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) |
|
|
edges = cv2.Canny(gray, 100, 200) |
|
|
|
|
|
|
|
|
edges_rgb = cv2.cvtColor(edges, cv2.COLOR_GRAY2RGB) |
|
|
edges_image = Image.fromarray(edges_rgb) |
|
|
|
|
|
print("✅ Canny Edge Map erstellt") |
|
|
return edges_image |
|
|
except Exception as e: |
|
|
print(f"Fehler bei Canny Edge Extraction: {e}") |
|
|
return image.convert("RGB").resize((512, 512)) |
|
|
|
|
|
def extract_depth_map(self, image): |
|
|
"""Extrahiert Depth Map für räumliche Konsistenz""" |
|
|
try: |
|
|
|
|
|
|
|
|
img_array = np.array(image.convert("RGB")) |
|
|
gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY) |
|
|
|
|
|
|
|
|
depth_map = cv2.GaussianBlur(gray, (5, 5), 0) |
|
|
depth_rgb = cv2.cvtColor(depth_map, cv2.COLOR_GRAY2RGB) |
|
|
depth_image = Image.fromarray(depth_rgb) |
|
|
|
|
|
print("✅ Depth Map erstellt (Grayscale Approximation)") |
|
|
return depth_image |
|
|
except Exception as e: |
|
|
print(f"Fehler bei Depth Map Extraction: {e}") |
|
|
return image.convert("RGB").resize((512, 512)) |
|
|
|
|
|
def load_controlnet_pipeline(self, controlnet_type="openpose"): |
|
|
"""Lädt die passende ControlNet Pipeline""" |
|
|
if controlnet_type == "openpose": |
|
|
if self.pipe_openpose is None: |
|
|
print("Loading OpenPose ControlNet pipeline...") |
|
|
try: |
|
|
self.controlnet_openpose = ControlNetModel.from_pretrained( |
|
|
"lllyasviel/sd-controlnet-openpose", |
|
|
torch_dtype=self.torch_dtype |
|
|
) |
|
|
self.pipe_openpose = StableDiffusionControlNetPipeline.from_pretrained( |
|
|
"runwayml/stable-diffusion-v1-5", |
|
|
controlnet=self.controlnet_openpose, |
|
|
torch_dtype=self.torch_dtype, |
|
|
safety_checker=None, |
|
|
requires_safety_checker=False |
|
|
).to(self.device) |
|
|
|
|
|
from diffusers import EulerAncestralDiscreteScheduler |
|
|
self.pipe_openpose.scheduler = EulerAncestralDiscreteScheduler.from_config(self.pipe_openpose.scheduler.config) |
|
|
self.pipe_openpose.enable_attention_slicing() |
|
|
print("✅ OpenPose ControlNet pipeline loaded successfully!") |
|
|
except Exception as e: |
|
|
print(f"Fehler beim Laden von OpenPose ControlNet: {e}") |
|
|
raise |
|
|
return self.pipe_openpose |
|
|
|
|
|
elif controlnet_type == "canny": |
|
|
if self.pipe_canny is None: |
|
|
print("Loading Canny ControlNet pipeline...") |
|
|
try: |
|
|
self.controlnet_canny = ControlNetModel.from_pretrained( |
|
|
"lllyasviel/sd-controlnet-canny", |
|
|
torch_dtype=self.torch_dtype |
|
|
) |
|
|
self.pipe_canny = StableDiffusionControlNetPipeline.from_pretrained( |
|
|
"runwayml/stable-diffusion-v1-5", |
|
|
controlnet=self.controlnet_canny, |
|
|
torch_dtype=self.torch_dtype, |
|
|
safety_checker=None, |
|
|
requires_safety_checker=False |
|
|
).to(self.device) |
|
|
|
|
|
from diffusers import EulerAncestralDiscreteScheduler |
|
|
self.pipe_canny.scheduler = EulerAncestralDiscreteScheduler.from_config(self.pipe_canny.scheduler.config) |
|
|
self.pipe_canny.enable_attention_slicing() |
|
|
print("✅ Canny ControlNet pipeline loaded successfully!") |
|
|
except Exception as e: |
|
|
print(f"Fehler beim Laden von Canny ControlNet: {e}") |
|
|
raise |
|
|
return self.pipe_canny |
|
|
|
|
|
elif controlnet_type == "depth": |
|
|
if self.pipe_depth is None: |
|
|
print("Loading Depth ControlNet pipeline...") |
|
|
try: |
|
|
self.controlnet_depth = ControlNetModel.from_pretrained( |
|
|
"lllyasviel/sd-controlnet-depth", |
|
|
torch_dtype=self.torch_dtype |
|
|
) |
|
|
self.pipe_depth = StableDiffusionControlNetPipeline.from_pretrained( |
|
|
"runwayml/stable-diffusion-v1-5", |
|
|
controlnet=self.controlnet_depth, |
|
|
torch_dtype=self.torch_dtype, |
|
|
safety_checker=None, |
|
|
requires_safety_checker=False |
|
|
).to(self.device) |
|
|
|
|
|
from diffusers import EulerAncestralDiscreteScheduler |
|
|
self.pipe_depth.scheduler = EulerAncestralDiscreteScheduler.from_config(self.pipe_depth.scheduler.config) |
|
|
self.pipe_depth.enable_attention_slicing() |
|
|
print("✅ Depth ControlNet pipeline loaded successfully!") |
|
|
except Exception as e: |
|
|
print(f"Fehler beim Laden von Depth ControlNet: {e}") |
|
|
raise |
|
|
return self.pipe_depth |
|
|
|
|
|
elif controlnet_type == "multi_inside": |
|
|
if self.pipe_multi_inside is None: |
|
|
print("Loading Multi-ControlNet pipeline für Inside-Box...") |
|
|
try: |
|
|
if self.controlnet_openpose is None: |
|
|
self.controlnet_openpose = ControlNetModel.from_pretrained( |
|
|
"lllyasviel/sd-controlnet-openpose", |
|
|
torch_dtype=self.torch_dtype |
|
|
) |
|
|
if self.controlnet_canny is None: |
|
|
self.controlnet_canny = ControlNetModel.from_pretrained( |
|
|
"lllyasviel/sd-controlnet-canny", |
|
|
torch_dtype=self.torch_dtype |
|
|
) |
|
|
|
|
|
self.pipe_multi_inside = StableDiffusionControlNetPipeline.from_pretrained( |
|
|
"runwayml/stable-diffusion-v1-5", |
|
|
controlnet=[self.controlnet_openpose, self.controlnet_canny], |
|
|
torch_dtype=self.torch_dtype, |
|
|
safety_checker=None, |
|
|
requires_safety_checker=False |
|
|
).to(self.device) |
|
|
|
|
|
from diffusers import EulerAncestralDiscreteScheduler |
|
|
self.pipe_multi_inside.scheduler = EulerAncestralDiscreteScheduler.from_config(self.pipe_multi_inside.scheduler.config) |
|
|
self.pipe_multi_inside.enable_attention_slicing() |
|
|
print("✅ Multi-ControlNet (Inside) pipeline loaded successfully!") |
|
|
except Exception as e: |
|
|
print(f"Fehler beim Laden von Multi-ControlNet Inside: {e}") |
|
|
raise |
|
|
return self.pipe_multi_inside |
|
|
|
|
|
elif controlnet_type == "multi_outside": |
|
|
if self.pipe_multi_outside is None: |
|
|
print("Loading Multi-ControlNet pipeline für Outside-Box...") |
|
|
try: |
|
|
if self.controlnet_depth is None: |
|
|
self.controlnet_depth = ControlNetModel.from_pretrained( |
|
|
"lllyasviel/sd-controlnet-depth", |
|
|
torch_dtype=self.torch_dtype |
|
|
) |
|
|
if self.controlnet_canny is None: |
|
|
self.controlnet_canny = ControlNetModel.from_pretrained( |
|
|
"lllyasviel/sd-controlnet-canny", |
|
|
torch_dtype=self.torch_dtype |
|
|
) |
|
|
|
|
|
self.pipe_multi_outside = StableDiffusionControlNetPipeline.from_pretrained( |
|
|
"runwayml/stable-diffusion-v1-5", |
|
|
controlnet=[self.controlnet_depth, self.controlnet_canny], |
|
|
torch_dtype=self.torch_dtype, |
|
|
safety_checker=None, |
|
|
requires_safety_checker=False |
|
|
).to(self.device) |
|
|
|
|
|
from diffusers import EulerAncestralDiscreteScheduler |
|
|
self.pipe_multi_outside.scheduler = EulerAncestralDiscreteScheduler.from_config(self.pipe_multi_outside.scheduler.config) |
|
|
self.pipe_multi_outside.enable_attention_slicing() |
|
|
print("✅ Multi-ControlNet (Outside) pipeline loaded successfully!") |
|
|
except Exception as e: |
|
|
print(f"Fehler beim Laden von Multi-ControlNet Outside: {e}") |
|
|
raise |
|
|
return self.pipe_multi_outside |
|
|
|
|
|
|
|
|
def prepare_conditioning_maps(self, image, keep_environment=False): |
|
|
""" |
|
|
NEUE METHODE: Erstellt und speichert Conditioning-Maps basierend auf Bild und Modus. |
|
|
Wird von app.py EINMAL am Anfang aufgerufen. |
|
|
""" |
|
|
print("🎯 ControlNet: Erstelle und speichere Conditioning-Maps...") |
|
|
|
|
|
if keep_environment: |
|
|
|
|
|
print(" Modus: Depth + Canny") |
|
|
self.conditioning_maps = [ |
|
|
self.extract_depth_map(image), |
|
|
self.extract_canny_edges(image) |
|
|
] |
|
|
self.controlnet_type = "multi_outside" |
|
|
self.controlnet_scales = [0.6, 0.4] |
|
|
else: |
|
|
|
|
|
print(" Modus: OpenPose + Canny") |
|
|
self.conditioning_maps = [ |
|
|
self.extract_pose(image), |
|
|
self.extract_canny_edges(image) |
|
|
] |
|
|
self.controlnet_type = "multi_inside" |
|
|
self.controlnet_scales = [0.7, 0.3] |
|
|
|
|
|
print(f"✅ {len(self.conditioning_maps)} Conditioning-Maps gespeichert.") |
|
|
return self.conditioning_maps |
|
|
|
|
|
def get_controlnet_conditioning(self, noisy_latents, timestep, prompt_embeds, controlnet_strength=1.0): |
|
|
""" |
|
|
NEUE KERNMETHODE: Berechnet Steuersignale für einen spezifischen Denoising-Step. |
|
|
Wird von app.py in JEDEM Denoising-Schritt aufgerufen. |
|
|
|
|
|
Args: |
|
|
noisy_latents: Aktuelle verrauschte Latents (Shape: [1, 4, 64, 64]) |
|
|
timestep: Aktueller Timestep (z.B. tensor([818])) |
|
|
prompt_embeds: Embeddings des Prompts |
|
|
controlnet_strength: Globale Stärke der ControlNet-Wirkung |
|
|
|
|
|
Returns: |
|
|
controlnet_outputs: Steuersignale, die an UNet übergeben werden |
|
|
""" |
|
|
if not hasattr(self, 'conditioning_maps') or self.conditioning_maps is None: |
|
|
raise ValueError("Conditioning-Maps nicht vorhanden. Rufen Sie zuerst prepare_conditioning_maps() auf.") |
|
|
|
|
|
|
|
|
pipe = self.load_controlnet_pipeline(self.controlnet_type) |
|
|
|
|
|
|
|
|
scaled_conditioning_scales = [scale * controlnet_strength for scale in self.controlnet_scales] |
|
|
|
|
|
|
|
|
|
|
|
with torch.no_grad(): |
|
|
controlnet_outputs = pipe.controlnet( |
|
|
noisy_latents, |
|
|
timestep, |
|
|
encoder_hidden_states=prompt_embeds, |
|
|
controlnet_cond=self.conditioning_maps, |
|
|
conditioning_scale=scaled_conditioning_scales, |
|
|
return_dict=True, |
|
|
) |
|
|
|
|
|
|
|
|
return controlnet_outputs |
|
|
|
|
|
|
|
|
|
|
|
def prepare_inpaint_input(self, image, keep_environment=False): |
|
|
""" |
|
|
Bereitet das Input-Bild für Inpaint vor |
|
|
Rückgabe: (image_für_inpaint, conditioning_info) |
|
|
|
|
|
HINWEIS: Diese Funktion wird nicht direkt von app.py verwendet, |
|
|
da die Logik in generate_with_controlnet enthalten ist. |
|
|
""" |
|
|
if keep_environment: |
|
|
|
|
|
print("🎯 Inpaint: Übergebe Depth+Canny Info (Outside-Box ändern)") |
|
|
depth_image = self.extract_depth_map(image) |
|
|
canny_image = self.extract_canny_edges(image) |
|
|
|
|
|
combined_map = Image.blend(depth_image.convert("RGB"), canny_image.convert("RGB"), alpha=0.5) |
|
|
return combined_map, {"type": "depth_canny", "image": combined_map} |
|
|
else: |
|
|
|
|
|
print("🎯 Inpaint: Übergebe Originalbild (Inside-Box ändern)") |
|
|
return image, {"type": "original", "image": image} |
|
|
|
|
|
|
|
|
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
torch_dtype = torch.float16 if device == "cuda" else torch.float32 |
|
|
controlnet_processor = ControlNetProcessor(device=device, torch_dtype=torch_dtype) |