import io
import os
import uuid
import zipfile

import cv2
import numpy as np
import torch
from diffusers import StableDiffusionImg2ImgPipeline, EulerAncestralDiscreteScheduler
from diffusers.utils import load_image
from PIL import Image

import utils


class DeforumRunner:
    """Frame-by-frame Deforum-style animation loop on top of a diffusers
    Img2Img pipeline (2D warp -> color match -> noise -> re-diffuse)."""

    def __init__(self, device="cpu"):
        self.device = device
        # Using the requested model.
        # Note: We use Img2ImgPipeline. Even for SDXS, we need 'steps' to make 'strength' work.
        self.model_id = "IDKiro/sdxs-512-dreamshaper"
        self.pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
            self.model_id,
            safety_checker=None,
            torch_dtype=torch.float32,
        )
        self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
            self.pipe.scheduler.config
        )
        self.pipe.to(self.device)
        self.pipe.set_progress_bar_config(disable=True)

    @staticmethod
    def _prompt_for_frame(prompts, frame_idx):
        """Return the prompt scheduled at the latest keyframe <= frame_idx.

        FIX: falls back to the earliest keyframe instead of raising
        IndexError when every keyframe starts after `frame_idx`
        (e.g. prompts = {10: "..."} at frame 0).
        """
        active = [k for k in prompts if k <= frame_idx]
        if active:
            return prompts[max(active)]
        return prompts[min(prompts)]

    def render(self, prompts,  # Dict {0: "prompt"}
               neg_prompt, max_frames, width, height,
               zoom_str, angle_str, tx_str, ty_str,
               strength_str, noise_str, fps, steps):
        """Run the animation loop as a generator.

        Args:
            prompts: dict mapping a start-frame index to a prompt string.
            neg_prompt: negative prompt applied to every frame.
            max_frames: total number of frames to generate.
            width, height: canvas size for the first (seed) frame.
            zoom_str/angle_str/tx_str/ty_str: Deforum-style schedule strings
                for the 2D warp, parsed by utils.parse_weight_string.
            strength_str/noise_str: schedules for img2img strength and
                per-frame noise injection.
            fps: frame rate of the final mp4.
            steps: inference steps per frame (clamped to >= 1).

        Yields:
            (latest PIL image, video_path, zip_path) — the paths are None on
            intermediate yields and filled in on the final yield.
        """
        # 1. Parse all schedules up front.
        schedules = {
            'zoom': utils.parse_weight_string(zoom_str, max_frames),
            'angle': utils.parse_weight_string(angle_str, max_frames),
            'tx': utils.parse_weight_string(tx_str, max_frames),
            'ty': utils.parse_weight_string(ty_str, max_frames),
            'strength': utils.parse_weight_string(strength_str, max_frames),
            'noise': utils.parse_weight_string(noise_str, max_frames),
        }

        # 2. Setup output.
        run_id = uuid.uuid4().hex[:6]
        output_dir = f"output_{run_id}"
        os.makedirs(output_dir, exist_ok=True)

        prev_img = None
        color_anchor = None  # First generated frame, for 'Match Frame 0'.
        generated_frames = []

        # FIX: UI may hand us 0 / negative / fractional steps; the pipeline
        # needs at least one denoising step to run at all.
        steps = max(1, int(steps))

        print(f"Starting Deforum Run {run_id} for {max_frames} frames...")

        # 3. Main Loop
        for frame_idx in range(max_frames):
            # --- A. Get parameters for this frame ---
            zoom = schedules['zoom'][frame_idx]
            angle = schedules['angle'][frame_idx]
            tx = schedules['tx'][frame_idx]
            ty = schedules['ty'][frame_idx]
            noise_amt = schedules['noise'][frame_idx]
            strength = schedules['strength'][frame_idx]
            current_prompt = self._prompt_for_frame(prompts, frame_idx)

            # --- B. Prepare init image ---
            if prev_img is None:
                # First frame: black dummy image; strength=1.0 tells the
                # Img2Img pipeline to ignore it (effectively text2img).
                init_image = Image.new("RGB", (width, height), (0, 0, 0))
                current_strength = 1.0
            else:
                # Warp previous frame according to the motion schedule.
                warp_args = {'angle': angle, 'zoom': zoom,
                             'translation_x': tx, 'translation_y': ty}
                init_image = utils.anim_frame_warp_2d(prev_img, warp_args)
                # COLOR MATCHING (Match Frame 0) keeps the palette stable
                # across frames.
                if color_anchor is not None:
                    init_image = utils.maintain_colors(init_image, color_anchor)
                # NOISE INJECTION counteracts progressive smoothing.
                init_image = utils.add_noise(init_image, noise_amt)
                current_strength = strength

            # --- C. Generation ---
            # If steps=1, strength is effectively moot; recommend >= 3 so
            # that int(steps * strength) leaves at least one denoise step.
            # FIX: removed width=/height= kwargs — the Img2Img pipeline
            # derives output size from `image` and rejects them; init_image
            # is already (width, height) sized.
            gen_image = self.pipe(
                prompt=current_prompt,
                negative_prompt=neg_prompt,
                image=init_image,
                num_inference_steps=steps,
                strength=current_strength,
                guidance_scale=0.0,  # SDXS Requirement
            ).images[0]

            # Update state.
            prev_img = gen_image
            if color_anchor is None:
                color_anchor = gen_image
            generated_frames.append(gen_image)

            # Yield results for UI.
            yield gen_image, None, None

        # 4. Finalize (Video & Zip)
        video_path = os.path.join(output_dir, "video.mp4")
        self.save_video(generated_frames, video_path, fps)
        zip_path = os.path.join(output_dir, "frames.zip")
        self.save_zip(generated_frames, zip_path)
        # FIX: guard max_frames == 0 — there is no last frame to show.
        last_frame = generated_frames[-1] if generated_frames else None
        yield last_frame, video_path, zip_path

    def save_video(self, frames, path, fps):
        """Encode PIL frames into an mp4 at `path` using OpenCV (CPU-cheap).

        No-op when `frames` is empty.
        """
        if not frames:
            return
        w, h = frames[0].size
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(path, fourcc, fps, (w, h))
        try:
            for f in frames:
                # PIL RGB -> OpenCV BGR
                out.write(cv2.cvtColor(np.array(f), cv2.COLOR_RGB2BGR))
        finally:
            # FIX: release the writer even if a frame conversion raises,
            # so the partial mp4 is finalized and the handle is freed.
            out.release()

    def save_zip(self, frames, path):
        """Write all frames into a zip at `path` as zero-padded PNGs.

        Frames are encoded into in-memory buffers to avoid temp files on
        disk.  (FIX: `import io` hoisted to module level instead of being
        re-executed inside the loop.)
        """
        with zipfile.ZipFile(path, 'w') as zf:
            for i, f in enumerate(frames):
                name = f"{i:05d}.png"
                buf = io.BytesIO()
                f.save(buf, format="PNG")
                zf.writestr(name, buf.getvalue())