# NOTE: the original paste carried "Spaces: / Runtime error / Runtime error"
# header lines — residue scraped from a Hugging Face Spaces error page, not code.
import io
import os
import uuid
import zipfile

import cv2
import numpy as np
import torch
from diffusers import StableDiffusionImg2ImgPipeline, EulerAncestralDiscreteScheduler
from diffusers.utils import load_image
from PIL import Image

import utils
class DeforumRunner:
    """Minimal Deforum-style animation loop built on the SDXS img2img pipeline.

    Each frame is produced by warping the previous frame (zoom / rotate /
    translate via ``utils.anim_frame_warp_2d``), optionally color-matching it
    against frame 0, injecting noise, and re-diffusing it through
    ``StableDiffusionImg2ImgPipeline``. Results are streamed to the UI via
    ``render`` (a generator) and finalized as an MP4 plus a zip of PNG frames.
    """

    def __init__(self, device="cpu"):
        """Load the SDXS pipeline onto ``device`` (CPU by default).

        Img2ImgPipeline is used (rather than text2img) so that ``strength``
        controls how much of the warped previous frame survives each step.
        """
        self.device = device
        self.model_id = "IDKiro/sdxs-512-dreamshaper"
        self.pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
            self.model_id,
            safety_checker=None,
            torch_dtype=torch.float32,
        )
        self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
            self.pipe.scheduler.config
        )
        self.pipe.to(self.device)
        self.pipe.set_progress_bar_config(disable=True)

    @staticmethod
    def _prompt_for_frame(prompts, frame_idx):
        """Return the prompt whose keyframe is the latest one <= ``frame_idx``.

        Falls back to the earliest keyframe when every key is greater than
        ``frame_idx`` (the previous implementation raised IndexError in that
        case, e.g. for a schedule starting at frame 5).
        """
        past = [k for k in prompts if k <= frame_idx]
        key = max(past) if past else min(prompts)
        return prompts[key]

    def render(self, prompts, neg_prompt, max_frames, width, height,
               zoom_str, angle_str, tx_str, ty_str,
               strength_str, noise_str, fps, steps):
        """Generate an animation frame by frame.

        Args:
            prompts: dict mapping keyframe index -> prompt string, e.g. {0: "a cat"}.
            neg_prompt: negative prompt applied to every frame.
            max_frames: number of frames to render.
            width, height: output resolution.
            zoom_str, angle_str, tx_str, ty_str: Deforum-style schedule strings
                for the 2D warp, parsed by ``utils.parse_weight_string``.
            strength_str, noise_str: schedule strings for denoise strength and
                pre-diffusion noise injection.
            fps: frames per second of the final video.
            steps: diffusion steps per frame.

        Yields:
            ``(frame, video_path, zip_path)`` — intermediate yields carry
            ``(image, None, None)``; the final yield carries the finished
            video and zip paths.
        """
        # 1. Parse all schedules up front — one value per frame index.
        schedules = {
            'zoom': utils.parse_weight_string(zoom_str, max_frames),
            'angle': utils.parse_weight_string(angle_str, max_frames),
            'tx': utils.parse_weight_string(tx_str, max_frames),
            'ty': utils.parse_weight_string(ty_str, max_frames),
            'strength': utils.parse_weight_string(strength_str, max_frames),
            'noise': utils.parse_weight_string(noise_str, max_frames),
        }

        # 2. Per-run output directory so concurrent runs never collide.
        run_id = uuid.uuid4().hex[:6]
        output_dir = f"output_{run_id}"
        os.makedirs(output_dir, exist_ok=True)

        prev_img = None
        color_anchor = None  # frame 0, kept as the 'Match Frame 0' color target
        generated_frames = []
        print(f"Starting Deforum Run {run_id} for {max_frames} frames...")

        # 3. Main loop
        for frame_idx in range(max_frames):
            # --- A. Parameters for this frame ---
            zoom = schedules['zoom'][frame_idx]
            angle = schedules['angle'][frame_idx]
            tx = schedules['tx'][frame_idx]
            ty = schedules['ty'][frame_idx]
            noise_amt = schedules['noise'][frame_idx]
            strength = schedules['strength'][frame_idx]
            current_prompt = self._prompt_for_frame(prompts, frame_idx)

            # --- B. Init image ---
            if prev_img is None:
                # First frame: black dummy image + strength 1.0 makes the
                # img2img call behave like text2img (init is fully replaced).
                init_image = Image.new("RGB", (width, height), (0, 0, 0))
                current_strength = 1.0
            else:
                # Warp the previous frame in 2D (zoom/rotate/translate).
                warp_args = {'angle': angle, 'zoom': zoom,
                             'translation_x': tx, 'translation_y': ty}
                init_image = utils.anim_frame_warp_2d(prev_img, warp_args)
                # Color matching against frame 0 to prevent hue drift.
                if color_anchor is not None:
                    init_image = utils.maintain_colors(init_image, color_anchor)
                # Noise injection keeps the feedback loop from smearing.
                init_image = utils.add_noise(init_image, noise_amt)
                current_strength = strength

            # --- C. Generation ---
            # BUGFIX: StableDiffusionImg2ImgPipeline.__call__ takes no
            # width/height kwargs — output size comes from `image`. Resize the
            # init image instead of passing the (rejected) kwargs.
            if init_image.size != (width, height):
                init_image = init_image.resize((width, height))

            # The pipeline runs int(steps * strength) effective steps; keep
            # that product >= 1 or it errors out (relevant for SDXS with very
            # few steps and a low strength schedule).
            eff_steps = max(1, int(steps))
            current_strength = min(1.0, max(current_strength, 1.0 / eff_steps))

            gen_image = self.pipe(
                prompt=current_prompt,
                negative_prompt=neg_prompt,
                image=init_image,
                num_inference_steps=eff_steps,
                strength=current_strength,
                guidance_scale=0.0,  # SDXS is distilled for CFG-free sampling
            ).images[0]

            # Update feedback state.
            prev_img = gen_image
            if color_anchor is None:
                color_anchor = gen_image
            generated_frames.append(gen_image)

            # Stream the frame to the UI.
            yield gen_image, None, None

        # 4. Finalize (video & zip)
        video_path = os.path.join(output_dir, "video.mp4")
        self.save_video(generated_frames, video_path, fps)
        zip_path = os.path.join(output_dir, "frames.zip")
        self.save_zip(generated_frames, zip_path)
        yield generated_frames[-1], video_path, zip_path

    def save_video(self, frames, path, fps):
        """Encode ``frames`` (PIL images) to an mp4 at ``path`` via OpenCV.

        No-op on an empty frame list.
        """
        if not frames:
            return
        w, h = frames[0].size
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        # float(fps) guards against a string fps coming from the UI widget.
        out = cv2.VideoWriter(path, fourcc, float(fps), (w, h))
        for f in frames:
            # PIL is RGB; OpenCV expects BGR.
            out.write(cv2.cvtColor(np.array(f), cv2.COLOR_RGB2BGR))
        out.release()

    def save_zip(self, frames, path):
        """Write ``frames`` to ``path`` as a zip of zero-padded PNGs.

        Frames are encoded into in-memory buffers, so no temp files touch disk.
        """
        import io  # local import keeps this method self-contained
        with zipfile.ZipFile(path, 'w') as zf:
            for i, f in enumerate(frames):
                name = f"{i:05d}.png"
                buf = io.BytesIO()
                f.save(buf, format="PNG")
                zf.writestr(name, buf.getvalue())