Spaces:
Runtime error
Runtime error
| import torch, os, uuid, cv2, gc, random, io, zipfile | |
| import numpy as np | |
| from PIL import Image | |
| from diffusers import AutoPipelineForImage2Image, AutoPipelineForText2Image | |
| from diffusers import LCMScheduler, EulerAncestralDiscreteScheduler, DDIMScheduler, DPMSolverMultistepScheduler | |
| import deforum_data as d_data | |
| import deforum_warp as d_warp | |
def match_colors(img, ref, mode):
    """Shift the per-channel mean of *img* to match the per-channel mean of *ref*.

    Args:
        img: Image to correct (PIL image, or anything ``np.array`` accepts).
        ref: Reference image supplying the target channel means, or ``None``.
        mode: Colour-space selector. ``'None'`` disables matching; a string
            containing ``"LAB"`` or ``"HSV"`` matches in that colour space;
            any other value matches directly in RGB.

    Returns:
        *img* unchanged when ``mode == 'None'`` or *ref* is ``None``;
        otherwise a new RGB ``PIL.Image`` with shifted channel means.
    """
    if mode == 'None' or ref is None:
        return img

    def _as_rgb_array(picture):
        # The RGB2LAB / RGB2HSV conversions need exactly three channels, so
        # normalise RGBA / L / P inputs (e.g. a user-supplied init image).
        if isinstance(picture, Image.Image):
            picture = picture.convert("RGB")
        return np.array(picture).astype(np.uint8)

    img_np = _as_rgb_array(img)
    ref_np = _as_rgb_array(ref)

    use_lab = "LAB" in mode
    use_hsv = (not use_lab) and "HSV" in mode

    if use_lab:
        src = cv2.cvtColor(img_np, cv2.COLOR_RGB2LAB)
        tgt = cv2.cvtColor(ref_np, cv2.COLOR_RGB2LAB)
    elif use_hsv:
        src = cv2.cvtColor(img_np, cv2.COLOR_RGB2HSV)
        tgt = cv2.cvtColor(ref_np, cv2.COLOR_RGB2HSV)
    else:
        src, tgt = img_np, ref_np

    # Work in float so the shift is rounded (not truncated) when it is
    # converted back to uint8; truncation darkens the result slightly on
    # every call.
    src_f = src.astype(np.float64)
    tgt_f = tgt.astype(np.float64)
    shifted = np.rint(src_f - src_f.mean(axis=(0, 1)) + tgt_f.mean(axis=(0, 1)))

    if use_hsv:
        # OpenCV stores 8-bit hue as 0..179 and hue is circular, so an
        # out-of-range hue must wrap around instead of being clipped.
        # NOTE(review): the hue means themselves are still plain arithmetic
        # means, not circular means.
        shifted[:, :, 0] = np.mod(shifted[:, :, 0], 180.0)

    matched = np.clip(shifted, 0, 255).astype(np.uint8)

    if use_lab:
        out = cv2.cvtColor(matched, cv2.COLOR_LAB2RGB)
    elif use_hsv:
        out = cv2.cvtColor(matched, cv2.COLOR_HSV2RGB)
    else:
        out = matched
    return Image.fromarray(out)
def add_noise(image, amt):
    """Add zero-mean Gaussian pixel noise to *image*.

    The noise is applied to the input image before it is handed to the
    pipeline for encoding.

    Args:
        image: Source image (PIL image, or anything ``np.array`` accepts).
        amt: Noise amount; the standard deviation is ``amt * 255`` pixel
            levels. Values ``<= 0`` disable the noise.

    Returns:
        *image* itself when ``amt <= 0``; otherwise a new ``PIL.Image``
        holding the noisy uint8 pixels.
    """
    if amt <= 0:
        return image

    pixels = np.array(image).astype(np.float32)
    # Drawn from NumPy's global RNG, so seeding np.random makes it repeatable.
    noise = np.random.normal(0, amt * 255, pixels.shape)
    # Round before casting: a bare astype(uint8) truncates, which biases every
    # pixel downward by about half a level each time noise is applied.
    noisy = np.clip(np.rint(pixels + noise), 0, 255).astype(np.uint8)
    return Image.fromarray(noisy)
class DeforumRunner:
    """Render a frame-by-frame img2img animation with a diffusers pipeline.

    Each frame after the first is produced by warping the previous frame
    (``d_warp.anim_frame_warp``) and, on cadence frames, re-diffusing it.
    ``render`` is a generator so a UI can display frames as they finish.
    """

    def __init__(self, device="cpu"):
        """Create an idle runner; no model is loaded until ``load_model``."""
        self.device = device
        self.pipe = None
        self.stop_req = False
        self.current_model = None
        # (model_id, lora, scheduler) the current pipeline was built with.
        self._loaded_key = None

    def load_model(self, model_id, lora, scheduler):
        """Load (or reuse) the img2img pipeline for the requested settings.

        Args:
            model_id: Model identifier passed to ``from_pretrained``.
            lora: Optional LoRA identifier; blank or ``None`` means no LoRA.
            scheduler: One of ``"LCM"``, ``"Euler A"``, ``"DDIM"``,
                ``"DPM++ 2M"``; any other value keeps the pipeline's own
                scheduler.

        Raises:
            Exception: whatever ``from_pretrained`` raises when both load
                attempts fail. The runner is left with ``pipe`` set to
                ``None`` so a later call can retry.
        """
        lora = (lora or "").strip()
        key = (model_id, lora, scheduler)
        # Cache on the full configuration: the LoRA is fused into the weights
        # and the scheduler is swapped at load time, so a change in either
        # requires a rebuild, not just a change of model_id.
        if self.pipe is not None and getattr(self, "_loaded_key", None) == key:
            return

        print(f"Loading: {model_id}")
        # Rebind instead of `del self.pipe`: deleting the attribute made every
        # later call raise AttributeError once a load had failed.
        self.pipe = None
        self.current_model = None
        self._loaded_key = None
        gc.collect()

        # Robust load: retry with use_safetensors=False if the default fails.
        try:
            pipe = AutoPipelineForImage2Image.from_pretrained(
                model_id, safety_checker=None, torch_dtype=torch.float32
            )
        except Exception as first_err:
            print(f"Default load failed ({first_err}); retrying with use_safetensors=False")
            pipe = AutoPipelineForImage2Image.from_pretrained(
                model_id, safety_checker=None, torch_dtype=torch.float32,
                use_safetensors=False
            )

        if lora:
            # LoRA stays best-effort, but the failure is reported.
            try:
                pipe.load_lora_weights(lora)
                pipe.fuse_lora()
            except Exception as err:
                print(f"LoRA load failed for {lora!r}: {err}")

        # Scheduler
        scheduler_classes = {
            "LCM": LCMScheduler,
            "Euler A": EulerAncestralDiscreteScheduler,
            "DDIM": DDIMScheduler,
            "DPM++ 2M": DPMSolverMultistepScheduler,
        }
        scheduler_cls = scheduler_classes.get(scheduler)
        if scheduler_cls is not None:
            pipe.scheduler = scheduler_cls.from_config(pipe.scheduler.config)

        pipe.to(self.device)
        try:
            pipe.enable_attention_slicing()
        except Exception as err:
            # Optional memory optimisation; continue without it.
            print(f"Attention slicing unavailable: {err}")

        self.pipe = pipe
        self.current_model = model_id
        self._loaded_key = key

    @staticmethod
    def _prompt_for(prompts, frame):
        """Return the prompt of the latest keyframe at or before *frame*.

        Falls back to the earliest keyframe when none is at or before
        *frame*. Assumes *prompts* is a mapping of frame index to prompt
        text, as returned by ``d_data.parse_prompts`` (TODO confirm).
        """
        if not prompts:
            raise ValueError("no prompts were provided")
        eligible = [k for k in prompts if k <= frame]
        key = max(eligible) if eligible else min(prompts)
        return prompts[key]

    def render(self, args):
        """Render the animation, yielding progress after every frame.

        Args:
            args: Settings mapping. Keys read: ``model``, ``lora``, ``sched``,
                ``max_frames``, ``zoom``, ``angle``, ``tx``, ``ty``,
                ``strength``, ``noise``, ``prompts``, ``init_image``, ``W``,
                ``H``, ``seed_beh``, ``neg``, ``steps``, ``cfg``, ``border``,
                ``cadence``, ``color``, ``fps``.

        Yields:
            ``(frame, video_path, zip_path, status)`` tuples. The two paths
            are ``None`` until the final tuple, and stay ``None`` there if
            the corresponding file could not be written.
        """
        self.stop_req = False
        try:
            self.load_model(args['model'], args['lora'], args['sched'])
        except Exception as e:
            yield None, None, None, f"Model Error: {e}"
            return

        # Parse Schedules
        mf = int(args['max_frames'])
        s_z = d_data.parse_weight_schedule(args['zoom'], mf)
        s_a = d_data.parse_weight_schedule(args['angle'], mf)
        s_tx = d_data.parse_weight_schedule(args['tx'], mf)
        s_ty = d_data.parse_weight_schedule(args['ty'], mf)
        s_str = d_data.parse_weight_schedule(args['strength'], mf)
        s_noi = d_data.parse_weight_schedule(args['noise'], mf)
        prompts = d_data.parse_prompts(args['prompts'])

        # Loop-invariant settings, coerced once. UI widgets may deliver
        # floats; PIL and NumPy need integer sizes.
        width, height = int(args['W']), int(args['H'])
        steps = max(1, int(args['steps']))      # 0 steps would divide by zero below
        cfg = float(args['cfg'])
        cadence = max(1, int(args['cadence']))  # 0 would make `i % cadence` raise

        run_id = uuid.uuid4().hex[:6]
        os.makedirs(f"out_{run_id}", exist_ok=True)

        # Init State
        prev_img = None
        color_ref = None
        # If Init Image exists, load it
        if args['init_image']:
            # Force RGB so later colour conversion and video encoding always
            # see three channels.
            prev_img = args['init_image'].convert("RGB").resize((width, height), Image.LANCZOS)
            color_ref = prev_img.copy()

        frames = []
        seed_space = 2**32
        base_seed = random.randint(0, seed_space - 1)
        print(f"Run {run_id} Started.")

        for i in range(mf):
            if self.stop_req:
                break

            # Seed Management
            if args['seed_beh'] == "fixed":
                s_val = base_seed
            elif args['seed_beh'] == "random":
                s_val = random.randint(0, seed_space - 1)
            else:
                # Wrap: np.random.seed rejects values above 2**32 - 1.
                s_val = (base_seed + i) % seed_space
            random.seed(s_val)
            np.random.seed(s_val)
            torch.manual_seed(s_val)
            gen_seed = torch.Generator(self.device).manual_seed(s_val)

            # --- FRAME 0 ---
            if i == 0:
                if prev_img is None:
                    # Generate pure noise for start (upper bound is exclusive,
                    # so 256 covers the full 0..255 range).
                    dummy = Image.fromarray(
                        np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
                    )
                    # High strength to ignore dummy noise
                    prev_img = self.pipe(
                        prompt=self._prompt_for(prompts, 0), negative_prompt=args['neg'],
                        image=dummy, strength=1.0,
                        num_inference_steps=steps,
                        guidance_scale=cfg,
                        generator=gen_seed
                    ).images[0]
                    color_ref = prev_img.copy()
                frames.append(prev_img)
                yield prev_img, None, None, "Frame 0 Ready"
                continue

            # --- FRAME 1+ LOOP ---
            # 1. WARP: transform the previous frame with this frame's
            # angle / zoom / translation schedule values.
            warped = d_warp.anim_frame_warp(prev_img, s_a[i], s_z[i], s_tx[i], s_ty[i], args['border'])

            # 2. DIFFUSE (only on cadence frames)
            if i % cadence == 0:
                # Color
                inp = match_colors(warped, color_ref, args['color'])
                # Noise
                inp = add_noise(inp, s_noi[i])
                curr_prompt = self._prompt_for(prompts, i)
                # Strength Guard: make sure steps * strength is at least 1.
                st = s_str[i]
                if steps * st < 1:
                    st = min(1.0, 1.1 / steps)
                gen = self.pipe(
                    prompt=curr_prompt, negative_prompt=args['neg'],
                    image=inp, strength=st,
                    num_inference_steps=steps,
                    guidance_scale=cfg,
                    generator=gen_seed
                ).images[0]
                # Coherence
                if args['color'] != "None":
                    gen = match_colors(gen, color_ref, args['color'])
                prev_img = gen
            else:
                # Turbo: off-cadence frames reuse the warped frame as-is.
                gen = warped
                prev_img = warped

            frames.append(gen)
            yield gen, None, None, f"Frame {i+1}/{mf}"

        # Finalize
        if not frames:
            # max_frames <= 0, or stopped before any frame was produced.
            yield None, None, None, "No frames rendered"
            return
        v_p = f"out_{run_id}/video.mp4"
        video_ok = self.save_vid(frames, v_p, int(args['fps']))
        z_p = f"out_{run_id}/frames.zip"
        zip_ok = self.save_zip(frames, z_p)
        # Only report paths for files that were actually written.
        yield frames[-1], (v_p if video_ok else None), (z_p if zip_ok else None), "Done"

    def stop(self):
        """Ask a running ``render`` loop to stop before its next frame."""
        self.stop_req = True

    def save_vid(self, frames, path, fps):
        """Encode *frames* to an mp4 file at *path*.

        Args:
            frames: Sequence of PIL images.
            path: Output file path.
            fps: Frames per second for the video.

        Returns:
            ``True`` if the video was written, ``False`` if *frames* is empty
            or writing failed (the error is printed, not raised).
        """
        if not frames:
            return False
        writer = None
        try:
            w, h = frames[0].size
            # 'mp4v' is widely supported for CPU/OpenCV
            writer = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
            if not writer.isOpened():
                print(f"Video Save Error: could not open writer for {path}")
                return False
            for f in frames:
                # The writer was opened for (w, h); resize any frame whose
                # size differs (e.g. pipeline output vs. a resized init image).
                if f.size != (w, h):
                    f = f.resize((w, h), Image.LANCZOS)
                writer.write(cv2.cvtColor(np.array(f.convert("RGB")), cv2.COLOR_RGB2BGR))
            return True
        except Exception as e:
            print(f"Video Save Error: {e}")
            return False
        finally:
            if writer is not None:
                writer.release()

    def save_zip(self, frames, path):
        """Write *frames* as ``frame_00000.png``, ... into a zip at *path*.

        Returns:
            ``True`` on success, ``False`` if writing failed (the error is
            printed, not raised).
        """
        try:
            with zipfile.ZipFile(path, 'w') as zf:
                for j, f in enumerate(frames):
                    buf = io.BytesIO()
                    f.save(buf, format="PNG")
                    zf.writestr(f"frame_{j:05d}.png", buf.getvalue())
            return True
        except Exception as e:
            print(f"Zip Save Error: {e}")
            return False