# dev/deforum_engine3.py — Deforum-style 2D animation engine (DeforumRunner).
# Originally from the Deforum_Soonr space (renamed from deforum_engine3.py).
import torch, os, uuid, zipfile, cv2, gc, random
import numpy as np
from diffusers import AutoPipelineForImage2Image, LCMScheduler, EulerAncestralDiscreteScheduler, DDIMScheduler, DPMSolverMultistepScheduler
from PIL import Image
import utils
class DeforumRunner:
    """Frame-by-frame Deforum-style 2D animation engine.

    Built on a diffusers image-to-image pipeline: each frame is a 2D warp
    (zoom / rotate / translate) of the previous frame, optionally re-diffused
    through the pipeline ("cadence" controls how often). ``render`` is a
    generator that yields progress tuples; ``stop`` cancels cooperatively.
    """

    def __init__(self, device="cpu"):
        self.device = device
        # Pipeline is loaded lazily by load_model(); None until then.
        self.pipe = None
        # Cooperative cancel flag checked once per frame in render().
        self.stop_requested = False
        # (model_id, lora_id, scheduler_name) of the currently loaded pipe,
        # used to skip redundant reloads.
        self.current_config = (None, None, None)

    def load_model(self, model_id, lora_id, scheduler_name):
        """Load (or reuse) the img2img pipeline for the given configuration.

        Skips all work when (model, LoRA, scheduler) matches the already
        loaded pipeline. Falls back to ``use_safetensors=False`` for repos
        that ship no safetensors weights. LoRA loading is best-effort: a
        broken LoRA logs a message instead of killing the run.
        """
        # Avoid reloading if nothing changed
        if (model_id, lora_id, scheduler_name) == self.current_config and self.pipe is not None:
            return
        print(f"Loading Model: {model_id} with {scheduler_name}")
        if self.pipe is not None:
            # Drop the reference (not `del self.pipe`, which would remove the
            # attribute and break later `self.pipe is not None` checks if the
            # reload below fails) and let GC reclaim the old weights.
            self.pipe = None
            gc.collect()
        try:
            self.pipe = AutoPipelineForImage2Image.from_pretrained(
                model_id, safety_checker=None, torch_dtype=torch.float32
            )
        except Exception:
            # Fallback for non-safetensor repos
            self.pipe = AutoPipelineForImage2Image.from_pretrained(
                model_id, safety_checker=None, torch_dtype=torch.float32, use_safetensors=False
            )
        # Load LoRA (best-effort)
        if lora_id and lora_id != "None":
            try:
                self.pipe.load_lora_weights(lora_id)
                self.pipe.fuse_lora()
            except Exception as e:
                print(f"LoRA Load Fail: {e}")
        # Set scheduler; unknown names keep the pipeline's default scheduler.
        s_config = self.pipe.scheduler.config
        scheduler_classes = {
            "LCM": LCMScheduler,
            "Euler A": EulerAncestralDiscreteScheduler,
            "DDIM": DDIMScheduler,
            "DPM++ 2M": DPMSolverMultistepScheduler,
        }
        sched_cls = scheduler_classes.get(scheduler_name)
        if sched_cls is not None:
            self.pipe.scheduler = sched_cls.from_config(s_config)
        self.pipe.to(self.device)
        self.pipe.enable_attention_slicing()
        self.current_config = (model_id, lora_id, scheduler_name)

    def stop(self):
        """Request cooperative cancellation of the render loop."""
        self.stop_requested = True

    def render(self,
               prompts, neg_prompt, max_frames, width, height,
               zoom_s, angle_s, tx_s, ty_s, strength_s, noise_s,
               fps, steps, cadence,
               color_mode, border_mode, seed_behavior, init_image,
               model_id, lora_id, scheduler_name):
        """Generator producing the animation frame by frame.

        Yields ``(image, video_path, zip_path, status)`` tuples: per-frame
        progress yields carry ``None`` paths; the final yield carries the
        saved mp4 and zip paths.

        ``prompts`` maps frame index -> prompt text; ``*_s`` arguments are
        schedule strings parsed by ``utils.parse_weight_string``;
        ``seed_behavior`` is one of "iter", "random", or fixed (anything else).
        """
        self.stop_requested = False
        self.load_model(model_id, lora_id, scheduler_name)
        # Guard against cadence <= 0, which would crash `i % cadence`.
        cadence = max(1, int(cadence))
        # 1. Parse schedules into per-frame value lists
        keys = ['z', 'a', 'tx', 'ty', 'str', 'noi']
        inputs = [zoom_s, angle_s, tx_s, ty_s, strength_s, noise_s]
        sched = {k: utils.parse_weight_string(v, max_frames) for k, v in zip(keys, inputs)}
        # 2. Set up the run directory
        run_id = uuid.uuid4().hex[:6]
        os.makedirs(f"out_{run_id}", exist_ok=True)
        # Init image & canvas
        if init_image:
            prev_img = init_image.resize((width, height), Image.LANCZOS)
        else:
            # Start with neutral grey noise if no init image was supplied
            prev_img = Image.fromarray(np.random.randint(100, 150, (height, width, 3), dtype=np.uint8))
        # Color anchor: every generated frame is color-matched back to this
        # to prevent drift over long runs.
        color_anchor = prev_img.copy()
        frames = []
        current_seed = random.randint(0, 2**32 - 1)
        print(f"Starting Run {run_id}. Cadence: {cadence}")
        # 3. Main loop
        for i in range(max_frames):
            if self.stop_requested:
                break
            # Update seed per seed_behavior ("iter" / "random" / fixed)
            if seed_behavior == "iter":
                current_seed += 1
            elif seed_behavior == "random":
                current_seed = random.randint(0, 2**32 - 1)
            # Motion parameters for this frame
            args = {'angle': sched['a'][i], 'zoom': sched['z'][i],
                    'tx': sched['tx'][i], 'ty': sched['ty'][i]}
            # --- Deforum logic ---
            # 1. WARP the previous result (happens every frame)
            warped_img = utils.anim_frame_warp_2d(prev_img, args, border_mode)
            # 2. DECIDE: generate or skip (cadence).
            #    Cadence=1 generates every frame; cadence=2 generates on
            #    0, 2, 4... and only warps on 1, 3, 5...
            if i % cadence == 0:
                # --- GENERATION STEP ---
                # A. Color match (pre-diffusion)
                init_for_diff = utils.maintain_colors(warped_img, color_anchor, color_mode)
                # B. Add noise
                init_for_diff = utils.add_noise(init_for_diff, sched['noi'][i])
                # C. Prompt: latest key <= current frame. If the schedule
                # starts after frame 0, fall back to the earliest prompt
                # instead of crashing on an empty key list.
                p_keys = sorted(k for k in prompts if k <= i)
                curr_prompt = prompts[p_keys[-1]] if p_keys else prompts[min(prompts)]
                # D. Strength: keep steps*strength >= 1 to prevent a
                # zero-step crash inside the pipeline.
                curr_strength = sched['str'][i]
                if (steps * curr_strength) < 1.0:
                    curr_strength = 1.1 / steps
                # E. Diffuse (low guidance for LCM/SDXS to prevent frying)
                generator = torch.Generator(device=self.device).manual_seed(current_seed)
                cfg = 1.5 if "LCM" in scheduler_name else 7.5
                gen_image = self.pipe(
                    prompt=curr_prompt,
                    negative_prompt=neg_prompt,
                    image=init_for_diff,
                    num_inference_steps=steps,
                    strength=curr_strength,
                    guidance_scale=cfg,
                    width=width, height=height,
                    generator=generator
                ).images[0]
                # F. Color match again (post-diffusion stability)
                if color_mode != 'None':
                    gen_image = utils.maintain_colors(gen_image, color_anchor, color_mode)
            else:
                # --- CADENCE (turbo) STEP ---
                # Use the warped image as the in-between frame. True Deforum
                # may blend with the next generation, but for real-time/CPU
                # the warped frame is the standard "turbo" behavior.
                gen_image = warped_img
            # Update state
            prev_img = gen_image
            frames.append(gen_image)
            yield gen_image, None, None, f"Rendering Frame {i+1}/{max_frames}..."
        # 4. Finalize. If stopped before any frame rendered, there is nothing
        # to encode — yield without touching frames[-1].
        if not frames:
            yield prev_img, None, None, "Generation Complete"
            return
        vid_path = f"out_{run_id}/video.mp4"
        self.save_video(frames, vid_path, fps)
        zip_path = f"out_{run_id}/frames.zip"
        self.save_zip(frames, zip_path)
        yield frames[-1], vid_path, zip_path, "Generation Complete"

    def save_video(self, frames, path, fps):
        """Encode PIL frames into an mp4 at `path` via OpenCV (no-op if empty)."""
        if not frames:
            return
        w, h = frames[0].size
        out = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
        for f in frames:
            # PIL gives RGB; OpenCV expects BGR
            out.write(cv2.cvtColor(np.array(f), cv2.COLOR_RGB2BGR))
        out.release()

    def save_zip(self, frames, path):
        """Write each PIL frame as a numbered PNG into a zip at `path`."""
        import io
        with zipfile.ZipFile(path, 'w') as zf:
            for j, f in enumerate(frames):
                name = f"f_{j:05d}.png"
                buf = io.BytesIO()
                f.save(buf, format="PNG")
                zf.writestr(name, buf.getvalue())