Add _ForestDiffusion/utils/utils_diffusion.py
Browse files
_ForestDiffusion/utils/utils_diffusion.py
ADDED
|
@@ -0,0 +1,45 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import numpy as np
|
| 2 |
+
from _ForestDiffusion.utils.diffusion import VPSDE
|
| 3 |
+
|
| 4 |
+
# Build the dataset of x(t) at multiple values of t
|
| 5 |
+
def build_data_xt(x0, x1, n_t=101, diffusion_type='flow', eps=1e-3, sde=None):
|
| 6 |
+
b, c = x1.shape
|
| 7 |
+
|
| 8 |
+
# Expand x0, x1
|
| 9 |
+
x0 = np.expand_dims(x0, axis=0) # [1, b, c]
|
| 10 |
+
x1 = np.expand_dims(x1, axis=0) # [1, b, c]
|
| 11 |
+
|
| 12 |
+
# t and expand
|
| 13 |
+
t = np.linspace(eps, 1, num=n_t)
|
| 14 |
+
t_expand = np.expand_dims(t, axis=(1,2)) # [t, 1, 1]
|
| 15 |
+
|
| 16 |
+
if diffusion_type == 'vp': # Forward diffusion from x0 to x1
|
| 17 |
+
mean, std = sde.marginal_prob(x1, t_expand)
|
| 18 |
+
x_t = mean + std*x0
|
| 19 |
+
else: # Interpolation between x0 and x1
|
| 20 |
+
x_t = t_expand * x1 + (1 - t_expand) * x0 # [t, b, c]
|
| 21 |
+
x_t = x_t.reshape(-1,c) # [t*b, c]
|
| 22 |
+
|
| 23 |
+
X = x_t
|
| 24 |
+
|
| 25 |
+
# Output to predict
|
| 26 |
+
if diffusion_type == 'vp':
|
| 27 |
+
alpha_, sigma_ = sde.marginal_prob_coef(x1, t_expand)
|
| 28 |
+
y = x0.reshape(b, c)
|
| 29 |
+
else:
|
| 30 |
+
y = x1.reshape(b, c) - x0.reshape(b, c) # [b, c]
|
| 31 |
+
|
| 32 |
+
return X, y
|
| 33 |
+
|
| 34 |
+
#### Below is for Flow-Matching Sampling ####
|
| 35 |
+
|
| 36 |
+
# Euler solver
|
| 37 |
+
def euler_solve(y0, my_model, N=101):
|
| 38 |
+
h = 1 / (N-1)
|
| 39 |
+
y = y0
|
| 40 |
+
t = 0
|
| 41 |
+
# from t=0 to t=1
|
| 42 |
+
for i in range(N-1):
|
| 43 |
+
y = y + h*my_model(t=t, y=y)
|
| 44 |
+
t = t + h
|
| 45 |
+
return y
|