# NOTE(review): the lines that previously appeared here ("Spaces:", "Runtime
# error" banners, file size, commit hash, and a line-number gutter) were
# Hugging Face Spaces page-viewer residue from a web scrape, not source code.
# They have been reduced to this comment so the module parses.
import torch, os, uuid, zipfile, cv2, gc, random
import numpy as np
from diffusers import AutoPipelineForImage2Image, LCMScheduler, EulerAncestralDiscreteScheduler, DDIMScheduler, DPMSolverMultistepScheduler
from PIL import Image
import utils
class DeforumRunner:
    """Frame-by-frame img2img animation renderer (Deforum-style).

    Caches a diffusers ``AutoPipelineForImage2Image`` and renders an animation
    by repeatedly warping the previous frame (2D zoom/rotate/translate via
    ``utils.anim_frame_warp_2d``) and feeding it back as the init image.
    ``render`` is a generator so a UI can stream frames and progress text.
    """

    def __init__(self, device="cpu"):
        self.device = device
        # Lazily loaded diffusers pipeline; None until load_model() succeeds.
        self.pipe = None
        # Set by stop() to abort an in-progress render() loop.
        self.stop_requested = False
        # (model_id, lora_id, scheduler_name) of the currently loaded pipe,
        # used to skip redundant reloads.
        self.current_config = (None, None, None)

    def load_model(self, model_id, lora_id, scheduler_name):
        """Load (or reuse) the img2img pipeline for the given configuration.

        No-op when the requested (model, LoRA, scheduler) triple matches the
        pipeline already in memory. LoRA failures are logged and ignored so a
        bad LoRA id does not kill the run.
        """
        if (model_id, lora_id, scheduler_name) == self.current_config and self.pipe is not None:
            return  # exact configuration already loaded
        print(f"Loading Model: {model_id}")
        if self.pipe is not None:
            # Rebind to None (not `del`) so the attribute survives if the
            # reload below raises; then collect to limit peak RAM before the
            # new pipeline is allocated.
            self.pipe = None
            gc.collect()
        try:
            self.pipe = AutoPipelineForImage2Image.from_pretrained(
                model_id, safety_checker=None, torch_dtype=torch.float32
            )
        except Exception:
            # Some repos ship only .bin weights; retry without safetensors.
            self.pipe = AutoPipelineForImage2Image.from_pretrained(
                model_id, safety_checker=None, torch_dtype=torch.float32,
                use_safetensors=False
            )
        if lora_id and lora_id != "None":
            try:
                self.pipe.load_lora_weights(lora_id)
                self.pipe.fuse_lora()
            except Exception as e:
                print(f"LoRA Error: {e}")
        # Swap the scheduler in place, preserving the model's scheduler config.
        scheduler_map = {
            "LCM": LCMScheduler,
            "Euler A": EulerAncestralDiscreteScheduler,
            "DDIM": DDIMScheduler,
            "DPM++ 2M": DPMSolverMultistepScheduler,
        }
        sched_cls = scheduler_map.get(scheduler_name)
        if sched_cls is not None:
            self.pipe.scheduler = sched_cls.from_config(self.pipe.scheduler.config)
        self.pipe.to(self.device)
        self.pipe.enable_attention_slicing()
        self.current_config = (model_id, lora_id, scheduler_name)

    def stop(self):
        """Request that the current render() loop stop at the next frame."""
        self.stop_requested = True

    def render(self,
               prompts, neg_prompt, max_frames, width, height,
               zoom_s, angle_s, tx_s, ty_s, strength_s, noise_s,
               fps, steps, cfg_scale, cadence,
               color_mode, border_mode, seed_behavior, init_image,
               model_id, lora_id, scheduler_name):
        """Generate an animation, yielding (image, video_path, zip_path, status).

        Intermediate yields carry (frame, None, None, progress-text); the final
        yield carries the last frame plus the mp4/zip paths. ``prompts`` is a
        dict mapping a start-frame index to a prompt string; the schedule
        strings (zoom/angle/tx/ty/strength/noise) are parsed per-frame by
        ``utils.parse_weight_string``.
        """
        self.stop_requested = False
        self.load_model(model_id, lora_id, scheduler_name)

        # Parse the keyframed schedule strings into per-frame value arrays.
        keys = ['z', 'a', 'tx', 'ty', 'str', 'noi']
        inputs = [zoom_s, angle_s, tx_s, ty_s, strength_s, noise_s]
        sched = {k: utils.parse_weight_string(v, max_frames)
                 for k, v in zip(keys, inputs)}

        # A cadence < 1 would crash the modulo below; clamp to "every frame".
        cadence = max(1, int(cadence))

        run_id = uuid.uuid4().hex[:6]
        os.makedirs(f"out_{run_id}", exist_ok=True)

        # Seed frame: the supplied init image, or random noise.
        if init_image:
            prev_img = init_image.resize((width, height), Image.LANCZOS)
        else:
            prev_img = Image.fromarray(
                np.random.randint(0, 255, (height, width, 3), dtype=np.uint8))
        # Reference image for color matching across the whole run.
        color_anchor = prev_img.copy()

        frames = []
        base_seed = random.randint(0, 2**32 - 1)
        print(f"Run {run_id} Started. Seed: {base_seed}")

        for i in range(max_frames):
            if self.stop_requested:
                break

            # --- SEED MANAGEMENT (crucial for stability) ---
            if seed_behavior == "fixed":
                frame_seed = base_seed
            elif seed_behavior == "random":
                frame_seed = random.randint(0, 2**32 - 1)
            else:  # "iter": deterministic per-frame progression
                frame_seed = base_seed + i
            # Lock ALL RNGs for this frame so warps/noise/diffusion reproduce.
            random.seed(frame_seed)
            np.random.seed(frame_seed)
            torch.manual_seed(frame_seed)

            # --- 1. WARP: transform the RESULT of the previous frame ---
            args = {'angle': sched['a'][i], 'zoom': sched['z'][i],
                    'tx': sched['tx'][i], 'ty': sched['ty'][i]}
            warped_img = utils.anim_frame_warp_2d(prev_img, args, border_mode)

            # --- 2. CADENCE CHECK: diffuse only every `cadence` frames ---
            if i % cadence == 0:
                # A. Color-match against the anchor to fight drift.
                init_for_diff = utils.maintain_colors(warped_img, color_anchor, color_mode)
                # B. Noise injection (seeded by np.random above).
                init_for_diff = utils.add_noise(init_for_diff, sched['noi'][i])
                # C. Pick the prompt whose start frame is the latest one <= i;
                #    if the schedule starts after frame i, fall back to the
                #    earliest scheduled prompt instead of crashing.
                active = [k for k in prompts if k <= i]
                curr_prompt = prompts[max(active)] if active else prompts[min(prompts)]
                # D. Strength safety: with few steps (SDXS/LCM), a low strength
                #    can round the effective step count to zero — bump it.
                strength = sched['str'][i]
                if int(steps * strength) < 1:
                    strength = min(1.0, 1.1 / steps)
                # E. Generate.
                gen_image = self.pipe(
                    prompt=curr_prompt,
                    negative_prompt=neg_prompt,
                    image=init_for_diff,
                    num_inference_steps=steps,
                    strength=strength,
                    guidance_scale=cfg_scale,
                    width=width, height=height
                ).images[0]
                # F. Post-color stability.
                if color_mode != 'None':
                    gen_image = utils.maintain_colors(gen_image, color_anchor, color_mode)
                # G. The diffused frame seeds the NEXT warp.
                prev_img = gen_image
            else:
                # --- TURBO STEP (cadence): show the warped image AND use it
                # as the base for the next warp ---
                gen_image = warped_img
                prev_img = warped_img

            frames.append(gen_image)
            yield gen_image, None, None, f"Frame {i+1}/{max_frames}"

        # Finalize. Guard against zero rendered frames (immediate stop or
        # max_frames == 0): frames[-1] would raise IndexError.
        if not frames:
            yield None, None, None, "Stopped (no frames rendered)"
            return
        vid_p = f"out_{run_id}/video.mp4"
        self.save_video(frames, vid_p, fps)
        zip_p = f"out_{run_id}/frames.zip"
        self.save_zip(frames, zip_p)
        yield frames[-1], vid_p, zip_p, "Done"

    def save_video(self, frames, path, fps):
        """Encode the PIL frames to an mp4 at `path` via OpenCV (no-op if empty)."""
        if not frames:
            return
        w, h = frames[0].size
        out = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
        try:
            for f in frames:
                # PIL is RGB; OpenCV expects BGR.
                out.write(cv2.cvtColor(np.array(f), cv2.COLOR_RGB2BGR))
        finally:
            out.release()

    def save_zip(self, frames, path):
        """Write every frame as a zero-padded PNG into a zip archive at `path`."""
        import io
        with zipfile.ZipFile(path, 'w') as zf:
            for j, f in enumerate(frames):
                buf = io.BytesIO()
                f.save(buf, format="PNG")
                zf.writestr(f"f_{j:05d}.png", buf.getvalue())