| import numpy as np |
| from _ForestDiffusion.utils.diffusion import VPSDE |
|
|
| |
| def build_data_xt(x0, x1, n_t=101, diffusion_type='flow', eps=1e-3, sde=None): |
| b, c = x1.shape |
|
|
| |
| x0 = np.expand_dims(x0, axis=0) |
| x1 = np.expand_dims(x1, axis=0) |
|
|
| |
| t = np.linspace(eps, 1, num=n_t) |
| t_expand = np.expand_dims(t, axis=(1,2)) |
|
|
| if diffusion_type == 'vp': |
| mean, std = sde.marginal_prob(x1, t_expand) |
| x_t = mean + std*x0 |
| else: |
| x_t = t_expand * x1 + (1 - t_expand) * x0 |
| x_t = x_t.reshape(-1,c) |
|
|
| X = x_t |
|
|
| |
| if diffusion_type == 'vp': |
| alpha_, sigma_ = sde.marginal_prob_coef(x1, t_expand) |
| y = x0.reshape(b, c) |
| else: |
| y = x1.reshape(b, c) - x0.reshape(b, c) |
|
|
| return X, y |
|
|
| |
|
|
| |
| def euler_solve(y0, my_model, N=101): |
| h = 1 / (N-1) |
| y = y0 |
| t = 0 |
| |
| for i in range(N-1): |
| y = y + h*my_model(t=t, y=y) |
| t = t + h |
| return y |
|
|