import torch, os, uuid, cv2, gc, random, io, zipfile import numpy as np from PIL import Image from diffusers import AutoPipelineForImage2Image, AutoPipelineForText2Image from diffusers import LCMScheduler, EulerAncestralDiscreteScheduler, DDIMScheduler, DPMSolverMultistepScheduler import deforum_data as d_data import deforum_warp as d_warp def match_colors(img, ref, mode): if mode == 'None' or ref is None: return img img_np = np.array(img).astype(np.uint8) ref_np = np.array(ref).astype(np.uint8) if "LAB" in mode: c1, c2 = cv2.cvtColor(img_np, cv2.COLOR_RGB2LAB), cv2.cvtColor(ref_np, cv2.COLOR_RGB2LAB) elif "HSV" in mode: c1, c2 = cv2.cvtColor(img_np, cv2.COLOR_RGB2HSV), cv2.cvtColor(ref_np, cv2.COLOR_RGB2HSV) else: c1, c2 = img_np, ref_np for i in range(3): c1[:,:,i] = np.clip(c1[:,:,i] - c1[:,:,i].mean() + c2[:,:,i].mean(), 0, 255) if "LAB" in mode: out = cv2.cvtColor(c1, cv2.COLOR_LAB2RGB) elif "HSV" in mode: out = cv2.cvtColor(c1, cv2.COLOR_HSV2RGB) else: out = c1 return Image.fromarray(out) def add_noise(image, amt): if amt <= 0: return image # Deforum Noise is added to the INPUT image before encoding arr = np.array(image).astype(np.float32) noise = np.random.normal(0, amt * 255, arr.shape) noisy = np.clip(arr + noise, 0, 255).astype(np.uint8) return Image.fromarray(noisy) class DeforumRunner: def __init__(self, device="cpu"): self.device = device self.pipe = None self.stop_req = False self.current_model = None def load_model(self, model_id, lora, scheduler): if model_id == self.current_model and self.pipe: return print(f"Loading: {model_id}") if self.pipe: del self.pipe; gc.collect() # Robust Load try: self.pipe = AutoPipelineForImage2Image.from_pretrained(model_id, safety_checker=None, torch_dtype=torch.float32) except: self.pipe = AutoPipelineForImage2Image.from_pretrained(model_id, safety_checker=None, torch_dtype=torch.float32, use_safetensors=False) if lora and lora.strip(): try: self.pipe.load_lora_weights(lora); self.pipe.fuse_lora() except: pass # Scheduler conf = self.pipe.scheduler.config if scheduler == "LCM": self.pipe.scheduler = LCMScheduler.from_config(conf) elif scheduler == "Euler A": self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(conf) elif scheduler == "DDIM": self.pipe.scheduler = DDIMScheduler.from_config(conf) elif scheduler == "DPM++ 2M": self.pipe.scheduler = DPMSolverMultistepScheduler.from_config(conf) self.pipe.to(self.device) try: self.pipe.enable_attention_slicing() except: pass self.current_model = model_id def render(self, args): self.stop_req = False try: self.load_model(args['model'], args['lora'], args['sched']) except Exception as e: yield None, None, None, f"Model Error: {e}"; return # Parse Schedules mf = int(args['max_frames']) s_z = d_data.parse_weight_schedule(args['zoom'], mf) s_a = d_data.parse_weight_schedule(args['angle'], mf) s_tx = d_data.parse_weight_schedule(args['tx'], mf) s_ty = d_data.parse_weight_schedule(args['ty'], mf) s_str = d_data.parse_weight_schedule(args['strength'], mf) s_noi = d_data.parse_weight_schedule(args['noise'], mf) prompts = d_data.parse_prompts(args['prompts']) run_id = uuid.uuid4().hex[:6] os.makedirs(f"out_{run_id}", exist_ok=True) # Init State prev_img = None color_ref = None # If Init Image exists, load it if args['init_image']: prev_img = args['init_image'].resize((args['W'], args['H']), Image.LANCZOS) color_ref = prev_img.copy() frames = [] base_seed = random.randint(0, 2**32-1) print(f"Run {run_id} Started.") for i in range(mf): if self.stop_req: break # Seed Management if args['seed_beh'] == "fixed": s_val = base_seed elif args['seed_beh'] == "random": s_val = random.randint(0, 2**32-1) else: s_val = base_seed + i random.seed(s_val); np.random.seed(s_val); torch.manual_seed(s_val) gen_seed = torch.Generator(self.device).manual_seed(s_val) # --- FRAME 0 --- if i == 0: if prev_img is None: # Generate pure noise for start dummy = Image.fromarray(np.random.randint(0, 255, (args['H'], args['W'], 3), dtype=np.uint8)) curr_prompt = prompts[0] # High strength to ignore dummy noise prev_img = self.pipe( prompt=curr_prompt, negative_prompt=args['neg'], image=dummy, strength=1.0, num_inference_steps=int(args['steps']), guidance_scale=float(args['cfg']), generator=gen_seed ).images[0] color_ref = prev_img.copy() frames.append(prev_img) yield prev_img, None, None, "Frame 0 Ready" continue # --- FRAME 1+ LOOP --- # 1. WARP (Perspective Transform) # The matrix math in d_warp now simulates depth via zoom scaling warped = d_warp.anim_frame_warp(prev_img, s_a[i], s_z[i], s_tx[i], s_ty[i], args['border']) # 2. DIFFUSE if i % int(args['cadence']) == 0: # Color inp = match_colors(warped, color_ref, args['color']) # Noise inp = add_noise(inp, s_noi[i]) curr_prompt = prompts[max(k for k in prompts.keys() if k <= i)] # Strength Guard st = s_str[i] if int(args['steps']) * st < 1: st = min(1.0, 1.1/int(args['steps'])) gen = self.pipe( prompt=curr_prompt, negative_prompt=args['neg'], image=inp, strength=st, num_inference_steps=int(args['steps']), guidance_scale=float(args['cfg']), generator=gen_seed ).images[0] # Coherence if args['color'] != "None": gen = match_colors(gen, color_ref, args['color']) prev_img = gen else: # Turbo gen = warped prev_img = warped frames.append(gen) yield gen, None, None, f"Frame {i+1}/{mf}" # Finalize v_p = f"out_{run_id}/video.mp4" self.save_vid(frames, v_p, int(args['fps'])) z_p = f"out_{run_id}/frames.zip" self.save_zip(frames, z_p) yield frames[-1], v_p, z_p, "Done" def stop(self): self.stop_req = True def save_vid(self, frames, path, fps): if not frames: return try: w, h = frames[0].size # 'mp4v' is widely supported for CPU/OpenCV out = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h)) for f in frames: out.write(cv2.cvtColor(np.array(f), cv2.COLOR_RGB2BGR)) out.release() except Exception as e: print(f"Video Save Error: {e}") def save_zip(self, frames, path): try: with zipfile.ZipFile(path, 'w') as zf: for j, f in enumerate(frames): buf = io.BytesIO() f.save(buf, format="PNG") zf.writestr(f"frame_{j:05d}.png", buf.getvalue()) except Exception as e: print(f"Zip Save Error: {e}")