Source captured from a Hugging Face Space (the Space was reporting a "Runtime error" status at capture time).
| import torch, os, uuid, zipfile, cv2, gc, random | |
| import numpy as np | |
| from diffusers import AutoPipelineForImage2Image, LCMScheduler, EulerAncestralDiscreteScheduler, DDIMScheduler, DPMSolverMultistepScheduler | |
| from PIL import Image | |
| import utils | |
class DeforumRunner:
    """Deforum-style 2D animation driver on top of a diffusers img2img pipeline.

    Each frame: warp the previous frame (zoom/rotate/translate per keyframed
    schedules), then either re-diffuse it (on cadence frames) or pass the
    warped frame through unchanged ("turbo" in-between frames). Results are
    streamed via a generator and finally written to an mp4 and a zip of PNGs.
    """

    def __init__(self, device="cpu"):
        self.device = device
        self.pipe = None                 # lazily loaded diffusers pipeline
        self.stop_requested = False      # set by stop() to end the render loop
        # (model_id, lora_id, scheduler_name) of the currently loaded pipeline,
        # used to skip redundant reloads.
        self.current_config = (None, None, None)

    def load_model(self, model_id, lora_id, scheduler_name):
        """Load (or reuse) the img2img pipeline for the given configuration.

        Skips work when the requested (model, LoRA, scheduler) triple matches
        the already-loaded one. LoRA failures are logged and ignored so a bad
        LoRA id does not kill the run.
        """
        if (model_id, lora_id, scheduler_name) == self.current_config and self.pipe is not None:
            return
        print(f"Loading Model: {model_id} with {scheduler_name}")
        if self.pipe is not None:
            del self.pipe
            # BUGFIX: keep the attribute valid if the load below raises;
            # the original `del` left the instance without `pipe` entirely.
            self.pipe = None
            gc.collect()
        try:
            self.pipe = AutoPipelineForImage2Image.from_pretrained(
                model_id, safety_checker=None, torch_dtype=torch.float32
            )
        except Exception as e:
            # BUGFIX: was a bare `except:` (also caught KeyboardInterrupt and
            # hid the real error). Fallback for non-safetensor repos.
            print(f"Default load failed ({e}); retrying with use_safetensors=False")
            self.pipe = AutoPipelineForImage2Image.from_pretrained(
                model_id, safety_checker=None, torch_dtype=torch.float32, use_safetensors=False
            )
        # Optional LoRA, fused for speed; best-effort by design.
        if lora_id and lora_id != "None":
            try:
                self.pipe.load_lora_weights(lora_id)
                self.pipe.fuse_lora()
            except Exception as e:
                print(f"LoRA Load Fail: {e}")
        # Scheduler dispatch table (unknown names keep the model's default).
        scheduler_classes = {
            "LCM": LCMScheduler,
            "Euler A": EulerAncestralDiscreteScheduler,
            "DDIM": DDIMScheduler,
            "DPM++ 2M": DPMSolverMultistepScheduler,
        }
        sched_cls = scheduler_classes.get(scheduler_name)
        if sched_cls is not None:
            self.pipe.scheduler = sched_cls.from_config(self.pipe.scheduler.config)
        self.pipe.to(self.device)
        self.pipe.enable_attention_slicing()  # lower peak memory on CPU
        self.current_config = (model_id, lora_id, scheduler_name)

    def stop(self):
        """Ask the render loop to stop after the current frame."""
        self.stop_requested = True

    def render(self,
               prompts, neg_prompt, max_frames, width, height,
               zoom_s, angle_s, tx_s, ty_s, strength_s, noise_s,
               fps, steps, cadence,
               color_mode, border_mode, seed_behavior, init_image,
               model_id, lora_id, scheduler_name):
        """Generator that runs the animation loop.

        Yields (preview_image, video_path_or_None, zip_path_or_None, status)
        once per rendered frame and a final tuple on completion.
        `prompts` is a mapping of keyframe index -> prompt string;
        `*_s` arguments are Deforum-style schedule strings parsed by
        utils.parse_weight_string.
        """
        self.stop_requested = False
        self.load_model(model_id, lora_id, scheduler_name)

        # 1. Parse per-frame schedules into max_frames-long value arrays.
        keys = ['z', 'a', 'tx', 'ty', 'str', 'noi']
        inputs = [zoom_s, angle_s, tx_s, ty_s, strength_s, noise_s]
        sched = {k: utils.parse_weight_string(v, max_frames) for k, v in zip(keys, inputs)}

        # 2. Per-run output directory.
        run_id = uuid.uuid4().hex[:6]
        os.makedirs(f"out_{run_id}", exist_ok=True)

        # Init canvas: supplied image, else neutral grey noise.
        if init_image:
            prev_img = init_image.resize((width, height), Image.LANCZOS)
        else:
            prev_img = Image.fromarray(np.random.randint(100, 150, (height, width, 3), dtype=np.uint8))
        color_anchor = prev_img.copy()  # reference for color coherence
        frames = []

        # BUGFIX: cadence <= 0 from the UI caused ZeroDivisionError at `i % cadence`.
        cadence = max(1, int(cadence))

        current_seed = random.randint(0, 2**32 - 1)
        print(f"Starting Run {run_id}. Cadence: {cadence}")

        # 3. Main loop.
        for i in range(max_frames):
            if self.stop_requested:
                break

            # Seed policy: "iter" increments, "random" rerolls, anything else is fixed.
            if seed_behavior == "iter":
                current_seed += 1
            elif seed_behavior == "random":
                current_seed = random.randint(0, 2**32 - 1)

            args = {'angle': sched['a'][i], 'zoom': sched['z'][i],
                    'tx': sched['tx'][i], 'ty': sched['ty'][i]}

            # --- Deforum logic ---
            # WARP: apply 2D motion to the previous result (every frame).
            warped_img = utils.anim_frame_warp_2d(prev_img, args, border_mode)

            # Cadence: diffuse on frames 0, cadence, 2*cadence, ...; the
            # in-between frames just reuse the warped image ("turbo" frames).
            if i % cadence == 0:
                # A. Color match before diffusion for temporal coherence.
                # NOTE(review): called even when color_mode == 'None' —
                # presumably utils.maintain_colors is a no-op then; confirm.
                init_for_diff = utils.maintain_colors(warped_img, color_anchor, color_mode)
                # B. Scheduled noise injection to avoid detail washout.
                init_for_diff = utils.add_noise(init_for_diff, sched['noi'][i])

                # C. Latest prompt whose keyframe is <= current frame.
                # BUGFIX: the original indexed p_keys[-1] unguarded and raised
                # IndexError when no key <= i existed (e.g. prompts starting
                # at a later frame) or when prompts was empty.
                applicable = [k for k in prompts if k <= i]
                if applicable:
                    curr_prompt = prompts[max(applicable)]
                elif prompts:
                    curr_prompt = prompts[min(prompts)]  # before first keyframe
                else:
                    curr_prompt = ""

                # D. Keep steps * strength >= 1 so the pipeline runs at least one step.
                curr_strength = sched['str'][i]
                if (steps * curr_strength) < 1.0:
                    curr_strength = 1.1 / steps

                # E. Diffuse. Low guidance for LCM-style schedulers to prevent frying.
                generator = torch.Generator(device=self.device).manual_seed(current_seed)
                cfg = 1.5 if "LCM" in scheduler_name else 7.5
                gen_image = self.pipe(
                    prompt=curr_prompt,
                    negative_prompt=neg_prompt,
                    image=init_for_diff,
                    num_inference_steps=steps,
                    strength=curr_strength,
                    guidance_scale=cfg,
                    width=width, height=height,
                    generator=generator
                ).images[0]

                # F. Color match again post-diffusion for stability.
                if color_mode != 'None':
                    gen_image = utils.maintain_colors(gen_image, color_anchor, color_mode)
            else:
                # Cadence ("turbo") frame: the warped previous frame as-is.
                gen_image = warped_img

            prev_img = gen_image
            frames.append(gen_image)
            yield gen_image, None, None, f"Rendering Frame {i+1}/{max_frames}..."

        # 4. Finalize.
        # BUGFIX: the original crashed on frames[-1] when stop was requested
        # before the first frame or max_frames == 0.
        if not frames:
            yield None, None, None, "No frames rendered"
            return
        vid_path = f"out_{run_id}/video.mp4"
        self.save_video(frames, vid_path, fps)
        zip_path = f"out_{run_id}/frames.zip"
        self.save_zip(frames, zip_path)
        yield frames[-1], vid_path, zip_path, "Generation Complete"

    def save_video(self, frames, path, fps):
        """Encode a list of PIL frames to an mp4 at *path* via OpenCV.

        No-op on an empty frame list. All frames are assumed to share the
        first frame's size.
        """
        if not frames:
            return
        w, h = frames[0].size
        out = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
        try:
            for f in frames:
                # PIL is RGB; OpenCV expects BGR.
                out.write(cv2.cvtColor(np.array(f), cv2.COLOR_RGB2BGR))
        finally:
            # BUGFIX: release the encoder even if a frame conversion raises,
            # otherwise the handle (and a partial file) leaks.
            out.release()

    def save_zip(self, frames, path):
        """Write all frames as PNGs (f_00000.png, ...) into a zip at *path*."""
        import io
        with zipfile.ZipFile(path, 'w') as zf:
            for j, f in enumerate(frames):
                name = f"f_{j:05d}.png"
                buf = io.BytesIO()
                f.save(buf, format="PNG")
                zf.writestr(name, buf.getvalue())