AEmotionStudio commited on
Commit
e84b148
·
verified ·
1 Parent(s): e71d4cf

Mirror apg_guidance.py from ACE-Step/acestep-v15-base

Browse files
checkpoints/acestep-v15-base/apg_guidance.py ADDED
@@ -0,0 +1,220 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import torch
2
+ import torch.nn.functional as F
3
+
4
+
5
+ class MomentumBuffer:
6
+
7
+ def __init__(self, momentum: float = -0.75):
8
+ self.momentum = momentum
9
+ self.running_average = 0
10
+
11
+ def update(self, update_value: torch.Tensor):
12
+ new_average = self.momentum * self.running_average
13
+ self.running_average = update_value + new_average
14
+
15
+
16
+ def project(
17
+ v0: torch.Tensor, # [B, C, T]
18
+ v1: torch.Tensor, # [B, C, T]
19
+ dims=[-1],
20
+ ):
21
+ dtype = v0.dtype
22
+ device_type = v0.device.type
23
+ if device_type == "mps":
24
+ v0, v1 = v0.cpu(), v1.cpu()
25
+
26
+ v0, v1 = v0.double(), v1.double()
27
+ v1 = torch.nn.functional.normalize(v1, dim=dims)
28
+ v0_parallel = (v0 * v1).sum(dim=dims, keepdim=True) * v1
29
+ v0_orthogonal = v0 - v0_parallel
30
+ return v0_parallel.to(dtype).to(device_type), v0_orthogonal.to(dtype).to(device_type)
31
+
32
+
33
+ def apg_forward(
34
+ pred_cond: torch.Tensor, # [B, C, T]
35
+ pred_uncond: torch.Tensor, # [B, C, T]
36
+ guidance_scale: float,
37
+ momentum_buffer: MomentumBuffer = None,
38
+ eta: float = 0.0,
39
+ norm_threshold: float = 2.5,
40
+ dims=[-1],
41
+ ):
42
+ diff = pred_cond - pred_uncond
43
+ if momentum_buffer is not None:
44
+ momentum_buffer.update(diff)
45
+ diff = momentum_buffer.running_average
46
+
47
+ if norm_threshold > 0:
48
+ ones = torch.ones_like(diff)
49
+ diff_norm = diff.norm(p=2, dim=dims, keepdim=True)
50
+ scale_factor = torch.minimum(ones, norm_threshold / diff_norm)
51
+ diff = diff * scale_factor
52
+
53
+ diff_parallel, diff_orthogonal = project(diff, pred_cond, dims)
54
+ normalized_update = diff_orthogonal + eta * diff_parallel
55
+ pred_guided = pred_cond + (guidance_scale - 1) * normalized_update
56
+ return pred_guided
57
+
58
+
59
+ def cfg_forward(cond_output, uncond_output, cfg_strength):
60
+ return uncond_output + cfg_strength * (cond_output - uncond_output)
61
+
62
+
63
+ def call_cos_tensor(tensor1, tensor2):
64
+ """
65
+ Calculate cosine similarity between two normalized tensors.
66
+
67
+ Args:
68
+ tensor1: First tensor [B, ...]
69
+ tensor2: Second tensor [B, ...]
70
+
71
+ Returns:
72
+ Cosine similarity value [B, 1]
73
+ """
74
+ tensor1 = tensor1 / torch.linalg.norm(tensor1, dim=1, keepdim=True)
75
+ tensor2 = tensor2 / torch.linalg.norm(tensor2, dim=1, keepdim=True)
76
+ cosvalue = torch.sum(tensor1 * tensor2, dim=1, keepdim=True)
77
+ return cosvalue
78
+
79
+
80
+ def compute_perpendicular_component(latent_diff, latent_hat_uncond):
81
+ """
82
+ Decompose latent_diff into parallel and perpendicular components relative to latent_hat_uncond.
83
+
84
+ Args:
85
+ latent_diff: Difference tensor [B, C, ...]
86
+ latent_hat_uncond: Unconditional prediction tensor [B, C, ...]
87
+
88
+ Returns:
89
+ projection: Parallel component
90
+ perpendicular_component: Perpendicular component
91
+ """
92
+ n, t, c = latent_diff.shape
93
+ latent_diff = latent_diff.view(n * t, c).float()
94
+ latent_hat_uncond = latent_hat_uncond.view(n * t, c).float()
95
+
96
+ if latent_diff.size() != latent_hat_uncond.size():
97
+ raise ValueError("latent_diff and latent_hat_uncond must have the same shape [n, d].")
98
+
99
+ dot_product = torch.sum(latent_diff * latent_hat_uncond, dim=1, keepdim=True) # [n, 1]
100
+ norm_square = torch.sum(latent_hat_uncond * latent_hat_uncond, dim=1, keepdim=True) # [n, 1]
101
+ projection = (dot_product / (norm_square + 1e-8)) * latent_hat_uncond
102
+ perpendicular_component = latent_diff - projection
103
+
104
+ return projection.view(n, t, c), perpendicular_component.reshape(n, t, c)
105
+
106
+
107
+ def adg_forward(
108
+ latents: torch.Tensor,
109
+ noise_pred_cond: torch.Tensor,
110
+ noise_pred_uncond: torch.Tensor,
111
+ sigma: torch.Tensor,
112
+ guidance_scale: float,
113
+ angle_clip: float = 3.14 / 6, # pi/6 by default
114
+ apply_norm: bool = False,
115
+ apply_clip: bool = True,
116
+ ):
117
+ """
118
+ ADG (Angle-based Dynamic Guidance) forward pass for Flow Matching.
119
+
120
+ In flow matching (including SD3), sigma represents the current timestep t_curr.
121
+ The predictions are velocity fields v(x_t, t).
122
+
123
+ Args:
124
+ latents: Current state x_t [N, T, d] where d=64
125
+ noise_pred_cond: Conditional velocity prediction v_cond [N, T, d]
126
+ noise_pred_uncond: Unconditional velocity prediction v_uncond [N, T, d]
127
+ sigma: Current timestep t_curr (not t_prev!)
128
+ guidance_scale: Guidance strength
129
+ angle_clip: Maximum angle for clipping (default: pi/6)
130
+ apply_norm: Whether to normalize the result (ADG_w_norm variant)
131
+ apply_clip: Whether to clip the angle (ADG_wo_clip when False)
132
+
133
+ Returns:
134
+ Guided velocity prediction [N, T, d]
135
+ """
136
+ # Get batch size
137
+ n = noise_pred_cond.shape[0]
138
+ noise_pred_text = noise_pred_cond
139
+ n, t, c = noise_pred_text.shape
140
+
141
+ # Ensure sigma/t has the right shape for broadcasting [N, 1, 1]
142
+ if isinstance(sigma, (int, float)):
143
+ sigma = torch.tensor(sigma, device=latents.device, dtype=latents.dtype)
144
+ sigma = sigma.view(1, 1, 1).expand(n, 1, 1)
145
+ elif torch.is_tensor(sigma):
146
+ if sigma.numel() == 1:
147
+ sigma = sigma.view(1, 1, 1).expand(n, 1, 1)
148
+ elif sigma.numel() == n:
149
+ sigma = sigma.view(n, 1, 1)
150
+ else:
151
+ raise ValueError(f"sigma has incompatible shape. Expected scalar or size {n}, got {sigma.shape}")
152
+ else:
153
+ raise TypeError(f"sigma must be a number or tensor, got {type(sigma)}")
154
+
155
+ # Adjust guidance weight
156
+ weight = guidance_scale - 1
157
+ weight = weight * (weight > 0) + 1e-3
158
+
159
+ latent_hat_text = latents - sigma * noise_pred_text
160
+ latent_hat_uncond = latents - sigma * noise_pred_uncond
161
+ latent_diff = latent_hat_text - latent_hat_uncond
162
+
163
+ # Calculate angle between conditional and unconditional predicted data
164
+ latent_theta = torch.acos(
165
+ call_cos_tensor(latent_hat_text.view(-1, c).to(float),
166
+ latent_hat_uncond.reshape(-1, c).contiguous().to(float)))
167
+ latent_theta_new = torch.clip(weight * latent_theta, -angle_clip, angle_clip) if apply_clip else weight * latent_theta
168
+ proj, perp = compute_perpendicular_component(latent_diff, latent_hat_uncond)
169
+ latent_v_new = torch.cos(latent_theta_new) * latent_hat_text
170
+
171
+ latent_p_new = perp * torch.sin(latent_theta_new) / torch.sin(latent_theta) * (
172
+ torch.sin(latent_theta) > 1e-3) + perp * weight * (torch.sin(latent_theta) <= 1e-3)
173
+ latent_new = latent_v_new + latent_p_new
174
+ if apply_norm:
175
+ latent_new = latent_new * torch.linalg.norm(latent_hat_text, dim=1, keepdim=True) / torch.linalg.norm(
176
+ latent_new, dim=1, keepdim=True)
177
+
178
+ noise_pred = (latents - latent_new) / sigma
179
+ noise_pred = noise_pred.reshape(n, t, c).to(latents.dtype)
180
+ return noise_pred
181
+
182
+
183
+ def adg_w_norm_forward(
184
+ latents: torch.Tensor,
185
+ noise_pred_cond: torch.Tensor,
186
+ noise_pred_uncond: torch.Tensor,
187
+ sigma: float,
188
+ guidance_scale: float,
189
+ angle_clip: float = 3.14 / 3,
190
+ ):
191
+ """
192
+ ADG with normalization - preserves the magnitude of latent predictions.
193
+
194
+ This variant normalizes the final latent to maintain the same norm as the
195
+ conditional prediction, which can help preserve image quality.
196
+ """
197
+ return adg_forward(latents,
198
+ noise_pred_cond,
199
+ noise_pred_uncond,
200
+ sigma,
201
+ guidance_scale,
202
+ angle_clip=angle_clip,
203
+ apply_norm=True,
204
+ apply_clip=True)
205
+
206
+
207
+ def adg_wo_clip_forward(
208
+ latents: torch.Tensor,
209
+ noise_pred_cond: torch.Tensor,
210
+ noise_pred_uncond: torch.Tensor,
211
+ sigma: float,
212
+ guidance_scale: float,
213
+ ):
214
+ """
215
+ ADG without angle clipping - allows unbounded angle adjustments.
216
+
217
+ This variant doesn't clip the angle, which may result in more aggressive
218
+ guidance but could be less stable.
219
+ """
220
+ return adg_forward(latents, noise_pred_cond, noise_pred_uncond, sigma, guidance_scale, apply_norm=False, apply_clip=False)