# dev/deforum_engine4.py — Deforum-style animation engine built on the
# SDXS-512 DreamShaper img2img pipeline (repo: Deforum_Soonr).
import io
import os
import uuid
import zipfile

import cv2
import numpy as np
import torch
from diffusers import StableDiffusionImg2ImgPipeline, EulerAncestralDiscreteScheduler
from diffusers.utils import load_image
from PIL import Image

import utils
class DeforumRunner:
    """Deforum-style frame-by-frame animation driver.

    Wraps a Stable Diffusion img2img pipeline (SDXS-512 DreamShaper): each
    frame is produced by warping the previous frame (zoom / rotate /
    translate), optionally color-matching and noising it, then feeding it
    back through the pipeline.
    """

    def __init__(self, device: str = "cpu"):
        # Inference device; "cpu" by default, pass e.g. "cuda" for GPU.
        self.device = device
        # Using the requested model
        # Note: We use Img2ImgPipeline. Even for SDXS, we need 'steps' to make 'strength' work.
        self.model_id = "IDKiro/sdxs-512-dreamshaper"
        # safety_checker=None disables the NSFW filter; float32 keeps the
        # weights CPU-compatible (no half-precision ops on CPU).
        self.pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
            self.model_id,
            safety_checker=None,
            torch_dtype=torch.float32
        )
        # NOTE(review): SDXS is distilled for very few steps; Euler-Ancestral
        # is swapped in here, presumably so strength-based img2img mixing
        # works — confirm it matches the checkpoint's recommended scheduler.
        self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(self.pipe.scheduler.config)
        self.pipe.to(self.device)
        # Per-frame progress bars would flood the console during long renders.
        self.pipe.set_progress_bar_config(disable=True)
def render(self,
prompts, # Dict {0: "prompt"}
neg_prompt,
max_frames,
width, height,
zoom_str, angle_str, tx_str, ty_str,
strength_str, noise_str,
fps, steps):
# 1. Parse all schedules up front
schedules = {
'zoom': utils.parse_weight_string(zoom_str, max_frames),
'angle': utils.parse_weight_string(angle_str, max_frames),
'tx': utils.parse_weight_string(tx_str, max_frames),
'ty': utils.parse_weight_string(ty_str, max_frames),
'strength': utils.parse_weight_string(strength_str, max_frames),
'noise': utils.parse_weight_string(noise_str, max_frames)
}
# 2. Setup output
run_id = uuid.uuid4().hex[:6]
output_dir = f"output_{run_id}"
os.makedirs(output_dir, exist_ok=True)
prev_img = None
color_anchor = None # For 'Match Frame 0'
generated_frames = []
print(f"Starting Deforum Run {run_id} for {max_frames} frames...")
# 3. Main Loop
for frame_idx in range(max_frames):
# --- A. Get Parameters for this frame ---
zoom = schedules['zoom'][frame_idx]
angle = schedules['angle'][frame_idx]
tx = schedules['tx'][frame_idx]
ty = schedules['ty'][frame_idx]
noise_amt = schedules['noise'][frame_idx]
strength = schedules['strength'][frame_idx]
# Get prompt (find latest key <= current frame)
prompt_keys = sorted([k for k in prompts.keys() if k <= frame_idx])
current_prompt = prompts[prompt_keys[-1]]
# --- B. Prepare Init Image ---
if prev_img is None:
# First frame: Generate from scratch (Text2Img via Img2Img with strength=1 effectively)
# We use a blank image or random noise as base if strictly using Img2Img,
# but better to just use strength=1.0 or high strength.
init_image = Image.new("RGB", (width, height), (0,0,0)) # Dummy
# High strength tells pipe to ignore init image
current_strength = 1.0
else:
# WARPING logic
warp_args = {'angle': angle, 'zoom': zoom, 'translation_x': tx, 'translation_y': ty}
init_image = utils.anim_frame_warp_2d(prev_img, warp_args)
# COLOR MATCHING (Match Frame 0)
if color_anchor is not None:
init_image = utils.maintain_colors(init_image, color_anchor)
# NOISE INJECTION
init_image = utils.add_noise(init_image, noise_amt)
current_strength = strength
# --- C. Generation ---
# If we are strictly 1-step, strength is ignored.
# We enforce min 2 steps if strength < 1.0 to ensure mixing.
# However, user wants SDXS optimization.
# Compromise: We pass the standard strength.
# If steps=1, strength is usually moot. We rely on 'steps' from UI (recommend 3).
gen_image = self.pipe(
prompt=current_prompt,
negative_prompt=neg_prompt,
image=init_image,
num_inference_steps=int(steps),
strength=current_strength,
guidance_scale=0.0, # SDXS Requirement
width=width,
height=height
).images[0]
# Update state
prev_img = gen_image
if color_anchor is None:
color_anchor = gen_image
generated_frames.append(gen_image)
# Yield results for UI
yield gen_image, None, None
# 4. Finalize (Video & Zip)
video_path = os.path.join(output_dir, "video.mp4")
self.save_video(generated_frames, video_path, fps)
zip_path = os.path.join(output_dir, "frames.zip")
self.save_zip(generated_frames, zip_path)
yield generated_frames[-1], video_path, zip_path
def save_video(self, frames, path, fps):
if not frames: return
w, h = frames[0].size
# Use OpenCV for CPU efficiency
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(path, fourcc, fps, (w, h))
for f in frames:
# PIL RGB -> OpenCV BGR
out.write(cv2.cvtColor(np.array(f), cv2.COLOR_RGB2BGR))
out.release()
def save_zip(self, frames, path):
with zipfile.ZipFile(path, 'w') as zf:
for i, f in enumerate(frames):
name = f"{i:05d}.png"
# Save to buffer to avoid writing temp files to disk
import io
buf = io.BytesIO()
f.save(buf, format="PNG")
zf.writestr(name, buf.getvalue())