import gc
import io
import os
import random
import uuid
import zipfile

import cv2
import numpy as np
import torch
from diffusers import (
    AutoPipelineForImage2Image,
    DDIMScheduler,
    DPMSolverMultistepScheduler,
    EulerAncestralDiscreteScheduler,
    LCMScheduler,
)
from PIL import Image

import utils


class DeforumRunner:
    """Render a 2D-warp + img2img animation, one frame at a time.

    The runner keeps a single diffusers image-to-image pipeline cached in
    ``self.pipe`` and reloads it only when the (model, LoRA, scheduler)
    triple changes.  ``render`` is a generator so a UI can display frames as
    they are produced; ``stop`` asks a running render to finish early.
    """

    # Seeds are kept in [0, 2**32 - 1] because np.random.seed rejects
    # anything outside that range.
    _SEED_MODULUS = 2**32

    # Scheduler name (as passed to load_model) -> diffusers scheduler class.
    # Names not listed here leave the pipeline's default scheduler in place.
    _SCHEDULERS = {
        "LCM": LCMScheduler,
        "Euler A": EulerAncestralDiscreteScheduler,
        "DDIM": DDIMScheduler,
        "DPM++ 2M": DPMSolverMultistepScheduler,
    }

    def __init__(self, device="cpu"):
        """Create an idle runner that will place its pipeline on *device*."""
        self.device = device
        self.pipe = None
        self.stop_requested = False
        # (model_id, lora_id, scheduler_name) of the pipeline in self.pipe.
        self.current_config = (None, None, None)

    def load_model(self, model_id, lora_id, scheduler_name):
        """Load (or reuse) the img2img pipeline for the given configuration.

        Does nothing when the requested triple matches the pipeline that is
        already loaded.  Otherwise the old pipeline is dropped, the model is
        loaded in float32 with the safety checker disabled, an optional LoRA
        is fused in, the scheduler is swapped if *scheduler_name* is one of
        the known names, and the pipeline is moved to ``self.device``.

        A LoRA that fails to load is reported and skipped (best effort).
        Errors from the model load itself propagate to the caller; in that
        case ``self.pipe`` is left as ``None``.
        """
        config = (model_id, lora_id, scheduler_name)
        if config == self.current_config and self.pipe is not None:
            return

        print(f"Loading Model: {model_id}")
        if self.pipe is not None:
            # Rebind rather than `del self.pipe`: if the load below fails the
            # attribute must still exist for the next call's cache check.
            self.pipe = None
            gc.collect()

        try:
            pipe = AutoPipelineForImage2Image.from_pretrained(
                model_id,
                safety_checker=None,
                torch_dtype=torch.float32,
            )
        except Exception as err:
            # Retry once allowing non-safetensors weights.
            print(f"Model load failed ({err}); retrying with use_safetensors=False")
            pipe = AutoPipelineForImage2Image.from_pretrained(
                model_id,
                safety_checker=None,
                torch_dtype=torch.float32,
                use_safetensors=False,
            )

        if lora_id and lora_id != "None":
            try:
                pipe.load_lora_weights(lora_id)
                pipe.fuse_lora()
            except Exception as e:
                # Deliberately best effort: continue without the LoRA.
                print(f"LoRA Error: {e}")

        scheduler_cls = self._SCHEDULERS.get(scheduler_name)
        if scheduler_cls is not None:
            pipe.scheduler = scheduler_cls.from_config(pipe.scheduler.config)

        pipe.to(self.device)
        pipe.enable_attention_slicing()

        # Publish only once the pipeline is fully configured.
        self.pipe = pipe
        self.current_config = config

    def stop(self):
        """Ask the render loop to exit before starting its next frame."""
        self.stop_requested = True

    @staticmethod
    def _frame_seed(seed_behavior, base_seed, frame_index):
        """Return the RNG seed for one frame.

        "fixed" reuses *base_seed*, "random" draws a fresh seed from the
        ``random`` module, and any other value ("iter") uses
        ``base_seed + frame_index`` wrapped into the range numpy accepts.
        """
        if seed_behavior == "fixed":
            return base_seed
        if seed_behavior == "random":
            return random.randint(0, DeforumRunner._SEED_MODULUS - 1)
        return (base_seed + frame_index) % DeforumRunner._SEED_MODULUS

    @staticmethod
    def _prompt_for_frame(prompts, frame_index):
        """Return the prompt of the latest keyframe at or before *frame_index*.

        If every keyframe lies after *frame_index*, the earliest keyframe's
        prompt is used instead of failing.

        Raises:
            ValueError: if *prompts* is empty.
        """
        if not prompts:
            raise ValueError("prompts must contain at least one keyframe")
        reached = [k for k in prompts if k <= frame_index]
        key = max(reached) if reached else min(prompts)
        return prompts[key]

    @staticmethod
    def _safe_strength(strength, steps):
        """Return a denoising strength that yields at least one step.

        When ``int(steps * strength)`` would be below 1 the strength is
        raised to ``1.1 / steps``.  The result is capped at 1.0, the largest
        strength the img2img pipeline accepts.
        """
        if steps >= 1 and int(steps * strength) < 1:
            strength = 1.1 / steps
        return min(1.0, strength)

    def render(self, prompts, neg_prompt, max_frames, width, height,
               zoom_s, angle_s, tx_s, ty_s, strength_s, noise_s,
               fps, steps, cfg_scale, cadence, color_mode, border_mode,
               seed_behavior, init_image, model_id, lora_id, scheduler_name):
        """Generate the animation, yielding progress after every frame.

        Args:
            prompts: mapping of frame number -> prompt text (keyframes).
            neg_prompt: negative prompt passed to every diffusion call.
            max_frames: number of frames to render.
            width, height: frame size in pixels.
            zoom_s, angle_s, tx_s, ty_s, strength_s, noise_s: schedule
                strings handed to ``utils.parse_weight_string``; the parsed
                result is indexed by frame number.
            fps: frame rate of the output video.
            steps: ``num_inference_steps`` for each diffusion call.
            cfg_scale: ``guidance_scale`` for each diffusion call.
            cadence: run diffusion on every *cadence*-th frame; the frames in
                between are only warped.  Values below 1 are treated as 1.
            color_mode, border_mode: passed through to the ``utils`` color
                matching and warp helpers.
            seed_behavior: "fixed", "random", or anything else for "iter".
            init_image: optional PIL image used as the first frame's input;
                random pixels are used when it is ``None``.
            model_id, lora_id, scheduler_name: forwarded to ``load_model``.

        Yields:
            ``(image, video_path, zip_path, status)``.  While rendering the
            two paths are ``None``; the final tuple carries the last frame,
            the written video and zip paths, and the status ``"Done"``.  If
            no frame was rendered, a single tuple with ``None`` image and
            paths is yielded instead and nothing is written.
        """
        self.stop_requested = False
        self.load_model(model_id, lora_id, scheduler_name)

        # A cadence of 0 would make `i % cadence` divide by zero.
        cadence = max(1, int(cadence or 1))

        # Parse Schedules
        schedule_inputs = {
            'z': zoom_s,
            'a': angle_s,
            'tx': tx_s,
            'ty': ty_s,
            'str': strength_s,
            'noi': noise_s,
        }
        sched = {
            key: utils.parse_weight_string(text, max_frames)
            for key, text in schedule_inputs.items()
        }

        run_id = uuid.uuid4().hex[:6]
        out_dir = f"out_{run_id}"
        os.makedirs(out_dir, exist_ok=True)

        # Init Image logic
        if init_image is not None:
            # Normalise to RGB so the pipeline and the color anchor see the
            # same 3-channel layout as the random fallback below.
            prev_img = init_image.convert("RGB").resize(
                (width, height), Image.LANCZOS
            )
        else:
            # randint's upper bound is exclusive, so 256 covers 0..255.
            random_pixels = np.random.randint(
                0, 256, (height, width, 3), dtype=np.uint8
            )
            prev_img = Image.fromarray(random_pixels)

        # Reference image every color-matching call is anchored to.
        color_anchor = prev_img.copy()
        frames = []

        # Global Seed Init
        base_seed = random.randint(0, self._SEED_MODULUS - 1)
        print(f"Run {run_id} Started. Seed: {base_seed}")

        for i in range(max_frames):
            if self.stop_requested:
                break

            # --- SEED MANAGEMENT ---
            # Re-seed every RNG the frame may touch so a frame's result
            # depends only on its seed.
            frame_seed = self._frame_seed(seed_behavior, base_seed, i)
            random.seed(frame_seed)
            np.random.seed(frame_seed)
            torch.manual_seed(frame_seed)

            # --- 1. WARP ---
            # Apply the transform to the RESULT of the previous frame.
            args = {
                'angle': sched['a'][i],
                'zoom': sched['z'][i],
                'tx': sched['tx'][i],
                'ty': sched['ty'][i],
            }
            warped_img = utils.anim_frame_warp_2d(prev_img, args, border_mode)

            # --- 2. CADENCE CHECK ---
            if i % cadence == 0:
                # --- ACTIVE DIFFUSION STEP ---
                # A. Color Match
                init_for_diff = utils.maintain_colors(
                    warped_img, color_anchor, color_mode
                )

                # B. Noise Injection
                # NOTE(review): presumably add_noise draws from np.random,
                # which was seeded above -- confirm in utils.
                init_for_diff = utils.add_noise(init_for_diff, sched['noi'][i])

                # C. Prepare Generation
                curr_prompt = self._prompt_for_frame(prompts, i)

                # D. Strength Safety: with very few steps a small strength
                # would round down to zero denoising steps.
                strength = self._safe_strength(sched['str'][i], steps)

                # E. Generate
                gen_image = self.pipe(
                    prompt=curr_prompt,
                    negative_prompt=neg_prompt,
                    image=init_for_diff,
                    num_inference_steps=steps,
                    strength=strength,
                    guidance_scale=cfg_scale,
                    width=width,
                    height=height,
                ).images[0]

                # F. Post-Color Stability
                if color_mode != 'None':
                    gen_image = utils.maintain_colors(
                        gen_image, color_anchor, color_mode
                    )
            else:
                # --- TURBO STEP (Cadence) ---
                # Show the warped image as this frame.
                gen_image = warped_img

            # G. Update State for NEXT frame: whatever was shown is the
            # base of the next warp.
            prev_img = gen_image

            frames.append(gen_image)
            yield gen_image, None, None, f"Frame {i+1}/{max_frames}"

        # Finalize
        if not frames:
            # Nothing to encode (max_frames was 0 or stop came immediately).
            yield None, None, None, "No frames rendered"
            return

        vid_p = f"{out_dir}/video.mp4"
        self.save_video(frames, vid_p, fps)
        zip_p = f"{out_dir}/frames.zip"
        self.save_zip(frames, zip_p)
        yield frames[-1], vid_p, zip_p, "Done"

    def save_video(self, frames, path, fps):
        """Encode *frames* (PIL images) to an mp4v video at *path*.

        The video size is taken from the first frame.  Does nothing when
        *frames* is empty.
        """
        if not frames:
            return
        w, h = frames[0].size
        out = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
        try:
            for f in frames:
                # OpenCV expects BGR channel order; the frames are RGB.
                out.write(cv2.cvtColor(np.array(f), cv2.COLOR_RGB2BGR))
        finally:
            # Always release so the writer is closed even if a write fails.
            out.release()

    def save_zip(self, frames, path):
        """Write every frame as ``f_00000.png``, ``f_00001.png``, ... into a zip at *path*."""
        with zipfile.ZipFile(path, 'w') as zf:
            for j, f in enumerate(frames):
                buf = io.BytesIO()
                f.save(buf, format="PNG")
                zf.writestr(f"f_{j:05d}.png", buf.getvalue())