File size: 6,022 Bytes
986c65b
 
 
 
 
da0ff6a
986c65b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import torch
import numpy as np
from diffusers import StableDiffusionImg2ImgPipeline, EulerAncestralDiscreteScheduler
from diffusers.utils import load_image
import utils
from PIL import Image
import os
import uuid
import zipfile
import cv2

class DeforumRunner:
    """Frame-by-frame Deforum-style animation driver on top of an SDXS img2img pipeline.

    Each frame is produced by warping the previous frame (zoom/rotate/translate),
    optionally color-matching it against frame 0, injecting noise, and feeding it
    back through the diffusion pipeline.
    """

    def __init__(self, device="cpu"):
        """Load the SDXS model and configure the img2img pipeline on `device`."""
        self.device = device
        # We use Img2ImgPipeline even for SDXS because 'strength' (init-image
        # mixing) only exists there; it needs num_inference_steps >= 1.
        self.model_id = "IDKiro/sdxs-512-dreamshaper"
        self.pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
            self.model_id,
            safety_checker=None,
            torch_dtype=torch.float32,
        )
        self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(self.pipe.scheduler.config)
        self.pipe.to(self.device)
        self.pipe.set_progress_bar_config(disable=True)

    def render(self,
               prompts,         # Dict {frame_index: "prompt"}
               neg_prompt,
               max_frames,
               width, height,
               zoom_str, angle_str, tx_str, ty_str,
               strength_str, noise_str,
               fps, steps):
        """Generate an animation, yielding progress for a UI.

        Yields (latest_frame, video_path, zip_path); the paths are None until
        the final yield, after the video and frame archive are written.

        Raises:
            ValueError: if `prompts` is empty.
        """
        if not prompts:
            raise ValueError("prompts must contain at least one keyframe")

        # 1. Parse all motion/strength schedules up front (one value per frame).
        schedules = {
            name: utils.parse_weight_string(spec, max_frames)
            for name, spec in (
                ('zoom', zoom_str),
                ('angle', angle_str),
                ('tx', tx_str),
                ('ty', ty_str),
                ('strength', strength_str),
                ('noise', noise_str),
            )
        }

        # Prompt keyframe order is loop-invariant; compute it once.
        sorted_prompt_keys = sorted(prompts.keys())

        # 2. Per-run output directory (short random id to avoid collisions).
        run_id = uuid.uuid4().hex[:6]
        output_dir = f"output_{run_id}"
        os.makedirs(output_dir, exist_ok=True)

        prev_img = None       # last generated frame (PIL image)
        color_anchor = None   # frame 0, used for 'Match Frame 0' color coherence

        generated_frames = []

        print(f"Starting Deforum Run {run_id} for {max_frames} frames...")

        # 3. Main loop
        for frame_idx in range(max_frames):

            # --- A. Get parameters for this frame ---
            zoom = schedules['zoom'][frame_idx]
            angle = schedules['angle'][frame_idx]
            tx = schedules['tx'][frame_idx]
            ty = schedules['ty'][frame_idx]
            noise_amt = schedules['noise'][frame_idx]
            strength = schedules['strength'][frame_idx]

            # Active prompt = latest keyframe <= current frame. If the first
            # keyframe starts after frame 0, fall back to the earliest one
            # instead of crashing with IndexError.
            active_keys = [k for k in sorted_prompt_keys if k <= frame_idx]
            if active_keys:
                current_prompt = prompts[active_keys[-1]]
            else:
                current_prompt = prompts[sorted_prompt_keys[0]]

            # --- B. Prepare init image ---
            if prev_img is None:
                # First frame: effectively Text2Img. The black placeholder is
                # ignored because strength=1.0 replaces it entirely.
                init_image = Image.new("RGB", (width, height), (0, 0, 0))
                current_strength = 1.0
            else:
                # 2D warp of the previous frame per the motion schedules.
                warp_args = {'angle': angle, 'zoom': zoom, 'translation_x': tx, 'translation_y': ty}
                init_image = utils.anim_frame_warp_2d(prev_img, warp_args)

                # Color matching ('Match Frame 0') to fight hue drift.
                if color_anchor is not None:
                    init_image = utils.maintain_colors(init_image, color_anchor)

                # Noise injection keeps the feedback loop from going static.
                init_image = utils.add_noise(init_image, noise_amt)

                current_strength = strength

            # --- C. Generation ---
            # With steps=1 strength is effectively moot; we rely on the UI's
            # 'steps' value (recommend 3) so strength-based mixing works.
            # NOTE: Img2Img pipelines take their output size from `image`
            # (already width x height); the __call__ signature has no
            # width/height kwargs, so passing them raised TypeError.
            gen_image = self.pipe(
                prompt=current_prompt,
                negative_prompt=neg_prompt,
                image=init_image,
                num_inference_steps=int(steps),
                strength=current_strength,
                guidance_scale=0.0,  # SDXS is distilled for CFG-free sampling
            ).images[0]

            # Update feedback state; first generated frame becomes the color anchor.
            prev_img = gen_image
            if color_anchor is None:
                color_anchor = gen_image

            generated_frames.append(gen_image)

            # Yield intermediate result for the UI (no video/zip yet).
            yield gen_image, None, None

        # 4. Finalize (video & zip)
        video_path = os.path.join(output_dir, "video.mp4")
        self.save_video(generated_frames, video_path, fps)

        zip_path = os.path.join(output_dir, "frames.zip")
        self.save_zip(generated_frames, zip_path)

        yield generated_frames[-1], video_path, zip_path

    def save_video(self, frames, path, fps):
        """Encode `frames` (PIL images, all the same size) to an mp4 at `path`.

        No-op when `frames` is empty.
        """
        if not frames:
            return
        w, h = frames[0].size
        # OpenCV writer: cheap on CPU, no ffmpeg subprocess needed.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(path, fourcc, fps, (w, h))
        for f in frames:
            # PIL RGB -> OpenCV BGR
            out.write(cv2.cvtColor(np.array(f), cv2.COLOR_RGB2BGR))
        out.release()

    def save_zip(self, frames, path):
        """Write each frame as NNNNN.png into a zip archive at `path`."""
        import io  # hoisted out of the per-frame loop; kept local to the method

        with zipfile.ZipFile(path, 'w') as zf:
            for i, f in enumerate(frames):
                name = f"{i:05d}.png"
                # Encode into an in-memory buffer to avoid temp files on disk.
                buf = io.BytesIO()
                f.save(buf, format="PNG")
                zf.writestr(name, buf.getvalue())