Hugging Face Spaces commit: "Create deforum_engine.py" (file deforum_engine.py ADDED, +150 lines, diff hunk @@ -0,0 +1,150 @@). Space build status at time of capture: Runtime error.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import io
import os
import uuid
import zipfile

import cv2
import numpy as np
import torch
from PIL import Image
from diffusers import StableDiffusionImg2ImgPipeline, EulerAncestralDiscreteScheduler
from diffusers.utils import load_image

import utils
| 10 |
+
|
| 11 |
+
class DeforumRunner:
    """Minimal Deforum-style animation loop on top of a diffusers img2img pipeline.

    Renders ``max_frames`` frames by repeatedly warping the previous frame
    (zoom / rotate / translate via ``utils.anim_frame_warp_2d``), optionally
    color-matching it to frame 0, injecting noise, and feeding it back through
    Stable Diffusion img2img. Frames are streamed to the UI via ``yield`` and
    finally assembled into an MP4 and a ZIP of PNGs.
    """

    def __init__(self, device="cpu"):
        """Load the SDXS img2img pipeline onto *device* (default CPU).

        Note: we use Img2ImgPipeline. Even for SDXS, we need 'steps' to make
        'strength' work.
        """
        self.device = device
        # Using the requested model.
        self.model_id = "IDKiro/sdxs-512-dreamshaper"
        self.pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
            self.model_id,
            safety_checker=None,          # skip NSFW checker for speed
            torch_dtype=torch.float32
        )
        self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(self.pipe.scheduler.config)
        self.pipe.to(self.device)
        self.pipe.set_progress_bar_config(disable=True)

    def render(self,
               prompts,          # Dict {frame_index: "prompt"} keyframes
               neg_prompt,
               max_frames,
               width, height,
               zoom_str, angle_str, tx_str, ty_str,
               strength_str, noise_str,
               fps, steps):
        """Generator driving the animation loop.

        Yields ``(latest_frame, video_path, zip_path)`` once per frame
        (``video_path``/``zip_path`` are ``None`` until the final yield).

        Raises:
            ValueError: if *prompts* is empty.
        """
        if not prompts:
            raise ValueError("prompts must contain at least one keyframe")

        # 1. Parse all schedules up front (one value per frame).
        schedules = {
            'zoom': utils.parse_weight_string(zoom_str, max_frames),
            'angle': utils.parse_weight_string(angle_str, max_frames),
            'tx': utils.parse_weight_string(tx_str, max_frames),
            'ty': utils.parse_weight_string(ty_str, max_frames),
            'strength': utils.parse_weight_string(strength_str, max_frames),
            'noise': utils.parse_weight_string(noise_str, max_frames)
        }

        # 2. Setup output directory (unique per run).
        run_id = uuid.uuid4().hex[:6]
        output_dir = f"output_{run_id}"
        os.makedirs(output_dir, exist_ok=True)

        prev_img = None
        color_anchor = None  # For 'Match Frame 0' color coherence

        generated_frames = []

        print(f"Starting Deforum Run {run_id} for {max_frames} frames...")

        # 3. Main Loop
        for frame_idx in range(max_frames):

            # --- A. Get Parameters for this frame ---
            zoom = schedules['zoom'][frame_idx]
            angle = schedules['angle'][frame_idx]
            tx = schedules['tx'][frame_idx]
            ty = schedules['ty'][frame_idx]
            noise_amt = schedules['noise'][frame_idx]
            strength = schedules['strength'][frame_idx]

            # Get prompt: latest keyframe <= current frame; if the schedule
            # starts after frame 0, fall back to the earliest keyframe instead
            # of crashing with an IndexError.
            prompt_keys = sorted(k for k in prompts.keys() if k <= frame_idx)
            if prompt_keys:
                current_prompt = prompts[prompt_keys[-1]]
            else:
                current_prompt = prompts[min(prompts.keys())]

            # --- B. Prepare Init Image ---
            if prev_img is None:
                # First frame: generate from scratch (Text2Img via Img2Img).
                # A dummy black canvas is used as the base; strength=1.0
                # tells the pipeline to ignore the init image content.
                init_image = Image.new("RGB", (width, height), (0, 0, 0))
                current_strength = 1.0
            else:
                # WARPING: apply this frame's 2D motion to the previous frame.
                warp_args = {'angle': angle, 'zoom': zoom, 'translation_x': tx, 'translation_y': ty}
                init_image = utils.anim_frame_warp_2d(prev_img, warp_args)

                # COLOR MATCHING (Match Frame 0) to prevent color drift.
                if color_anchor is not None:
                    init_image = utils.maintain_colors(init_image, color_anchor)

                # NOISE INJECTION keeps img2img from converging to mush.
                init_image = utils.add_noise(init_image, noise_amt)

                current_strength = strength

            # --- C. Generation ---
            # If we are strictly 1-step, strength is ignored; we enforce
            # nothing here and rely on 'steps' from the UI (recommend 3).
            # NOTE: Img2Img derives the output size from `image`; passing
            # width/height kwargs raises TypeError on current diffusers, so
            # they are intentionally omitted (init_image is already WxH).
            gen_image = self.pipe(
                prompt=current_prompt,
                negative_prompt=neg_prompt,
                image=init_image,
                num_inference_steps=int(steps),
                strength=current_strength,
                guidance_scale=0.0,  # SDXS Requirement
            ).images[0]

            # Update state
            prev_img = gen_image
            if color_anchor is None:
                color_anchor = gen_image

            generated_frames.append(gen_image)

            # Yield results for UI (video/zip not ready yet)
            yield gen_image, None, None

        # 4. Finalize (Video & Zip)
        video_path = os.path.join(output_dir, "video.mp4")
        self.save_video(generated_frames, video_path, fps)

        zip_path = os.path.join(output_dir, "frames.zip")
        self.save_zip(generated_frames, zip_path)

        yield generated_frames[-1], video_path, zip_path

    def save_video(self, frames, path, fps):
        """Encode *frames* (PIL images, all same size) to an mp4v video at *path*.

        No-op when *frames* is empty. Uses OpenCV for CPU efficiency.
        """
        if not frames:
            return
        w, h = frames[0].size
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(path, fourcc, fps, (w, h))
        for f in frames:
            # PIL RGB -> OpenCV BGR
            out.write(cv2.cvtColor(np.array(f), cv2.COLOR_RGB2BGR))
        out.release()

    def save_zip(self, frames, path):
        """Write *frames* into a ZIP at *path* as 00000.png, 00001.png, ...

        Frames are encoded to in-memory PNG buffers to avoid writing
        temporary files to disk.
        """
        with zipfile.ZipFile(path, 'w') as zf:
            for i, f in enumerate(frames):
                name = f"{i:05d}.png"
                buf = io.BytesIO()
                f.save(buf, format="PNG")
                zf.writestr(name, buf.getvalue())
|