# NOTE(review): the lines below are page-scrape artifacts from the hosting
# page (Hugging Face Spaces file listing: status banner, file size, commit
# hashes, and a run of rendered line numbers). They are not Python source;
# commented out so the file parses.
#   Spaces: Runtime error / Runtime error
#   File size: 6,022 Bytes
#   986c65b da0ff6a 986c65b
import io
import os
import uuid
import zipfile

import cv2
import numpy as np
import torch
from diffusers import StableDiffusionImg2ImgPipeline, EulerAncestralDiscreteScheduler
from diffusers.utils import load_image
from PIL import Image

import utils
class DeforumRunner:
    """Frame-by-frame Deforum-style animation loop on top of a Stable
    Diffusion img2img pipeline (SDXS checkpoint, CPU-friendly).

    Each frame is produced by warping the previous frame in 2D
    (zoom / rotate / translate via ``utils.anim_frame_warp_2d``), optionally
    color-matching it against frame 0, injecting noise, and re-diffusing it
    with a per-frame strength schedule.
    """

    def __init__(self, device="cpu"):
        """Load the SDXS checkpoint into an img2img pipeline on *device*.

        Img2ImgPipeline is used even for SDXS because ``strength`` only has
        an effect when there are diffusion steps to partially run.
        """
        self.device = device
        self.model_id = "IDKiro/sdxs-512-dreamshaper"
        self.pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
            self.model_id,
            safety_checker=None,
            torch_dtype=torch.float32,
        )
        self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
            self.pipe.scheduler.config
        )
        self.pipe.to(self.device)
        self.pipe.set_progress_bar_config(disable=True)

    @staticmethod
    def _current_prompt(prompts, frame_idx):
        """Return the prompt whose keyframe is the latest one <= *frame_idx*.

        BUGFIX: the original indexed ``sorted(...)[-1]`` and raised
        ``IndexError`` when no key preceded *frame_idx* (e.g. a prompt
        schedule starting at frame 5). Fall back to the earliest keyframe
        in that case.
        """
        past_keys = [k for k in prompts if k <= frame_idx]
        key = max(past_keys) if past_keys else min(prompts)
        return prompts[key]

    def render(self,
               prompts,      # Dict {frame_index: "prompt"}
               neg_prompt,
               max_frames,
               width, height,
               zoom_str, angle_str, tx_str, ty_str,
               strength_str, noise_str,
               fps, steps):
        """Generator driving the animation loop.

        Yields ``(latest_frame, video_path, zip_path)`` once per frame with
        the paths set to ``None``, then a final tuple with the finished
        video and zip paths.
        """
        # 1. Parse all schedules up front: one numeric value per frame.
        schedules = {
            name: utils.parse_weight_string(spec, max_frames)
            for name, spec in (
                ('zoom', zoom_str),
                ('angle', angle_str),
                ('tx', tx_str),
                ('ty', ty_str),
                ('strength', strength_str),
                ('noise', noise_str),
            )
        }

        # 2. Per-run output directory; short random id keeps runs separate.
        run_id = uuid.uuid4().hex[:6]
        output_dir = f"output_{run_id}"
        os.makedirs(output_dir, exist_ok=True)

        prev_img = None
        color_anchor = None   # frame 0 reference for 'Match Frame 0' coherence
        generated_frames = []
        print(f"Starting Deforum Run {run_id} for {max_frames} frames...")

        # 3. Main loop
        for frame_idx in range(max_frames):
            current_prompt = self._current_prompt(prompts, frame_idx)

            if prev_img is None:
                # First frame: strength=1.0 makes img2img ignore the init
                # image entirely, so a black dummy behaves like txt2img.
                init_image = Image.new("RGB", (width, height), (0, 0, 0))
                current_strength = 1.0
            else:
                # Warp the previous frame by this frame's scheduled motion.
                warp_args = {
                    'angle': schedules['angle'][frame_idx],
                    'zoom': schedules['zoom'][frame_idx],
                    'translation_x': schedules['tx'][frame_idx],
                    'translation_y': schedules['ty'][frame_idx],
                }
                init_image = utils.anim_frame_warp_2d(prev_img, warp_args)
                # Color matching ('Match Frame 0') keeps hues from drifting.
                if color_anchor is not None:
                    init_image = utils.maintain_colors(init_image, color_anchor)
                # Noise injection gives the sampler texture to re-diffuse.
                init_image = utils.add_noise(
                    init_image, schedules['noise'][frame_idx]
                )
                current_strength = schedules['strength'][frame_idx]

            # BUGFIX: StableDiffusionImg2ImgPipeline.__call__ accepts no
            # width/height kwargs (output size is derived from the init
            # image), so passing them raised TypeError at runtime. Resize
            # the init image instead to enforce the requested geometry.
            if init_image.size != (width, height):
                init_image = init_image.resize((width, height), Image.LANCZOS)

            # If steps==1, 'strength' is effectively moot; we rely on the
            # UI-supplied steps (3 recommended) to make strength meaningful.
            gen_image = self.pipe(
                prompt=current_prompt,
                negative_prompt=neg_prompt,
                image=init_image,
                num_inference_steps=int(steps),
                strength=current_strength,
                guidance_scale=0.0,   # SDXS requirement: CFG-free sampling
            ).images[0]

            # Update state for the next iteration.
            prev_img = gen_image
            if color_anchor is None:
                color_anchor = gen_image   # lock the color reference to frame 0
            generated_frames.append(gen_image)

            # Stream the latest frame to the UI; video/zip not ready yet.
            yield gen_image, None, None

        # 4. Finalize: encode video and zip all frames.
        video_path = os.path.join(output_dir, "video.mp4")
        self.save_video(generated_frames, video_path, fps)
        zip_path = os.path.join(output_dir, "frames.zip")
        self.save_zip(generated_frames, zip_path)
        yield generated_frames[-1], video_path, zip_path

    def save_video(self, frames, path, fps):
        """Encode PIL *frames* to an mp4 at *path* with OpenCV (CPU-efficient).

        No-op on an empty frame list. The writer is released even if a
        write fails (the original leaked it on error).
        """
        if not frames:
            return
        w, h = frames[0].size
        writer = cv2.VideoWriter(
            path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h)
        )
        try:
            for frame in frames:
                # PIL frames are RGB; OpenCV expects BGR.
                writer.write(cv2.cvtColor(np.array(frame), cv2.COLOR_RGB2BGR))
        finally:
            writer.release()

    def save_zip(self, frames, path):
        """Zip *frames* as zero-padded PNGs at *path*.

        Frames are encoded into an in-memory buffer so no temporary image
        files touch the disk. (``import io`` hoisted out of the loop to the
        module import block.)
        """
        with zipfile.ZipFile(path, 'w') as zf:
            for i, frame in enumerate(frames):
                buf = io.BytesIO()
                frame.save(buf, format="PNG")
                zf.writestr(f"{i:05d}.png", buf.getvalue())