|
|
|
|
|
|
|
|
import math |
|
|
|
|
|
import numpy as np |
|
|
|
|
|
import torch |
|
|
import torch.nn as nn |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def gradient_f(X, batched = False, delta_lst = [1., 1., 1.]): |
|
|
''' |
|
|
Compute gradient of a torch tensor "X" in each direction |
|
|
Upper-boundaries: Backward Difference |
|
|
Non-boundaries & Upper-boundaries: Forward Difference |
|
|
if X is batched: (n_batch, ...); |
|
|
else: (...) |
|
|
''' |
|
|
device = X.device |
|
|
dim = len(X.size()) - 1 if batched else len(X.size()) |
|
|
|
|
|
|
|
|
if dim == 1: |
|
|
|
|
|
dX = torch.zeros(X.size(), dtype = torch.float, device = device) |
|
|
X = X.permute(1, 0) if batched else X |
|
|
dX = dX.permute(1, 0) if batched else dX |
|
|
dX[-1] = X[-1] - X[-2] |
|
|
dX[:-1] = X[1:] - X[:-1] |
|
|
|
|
|
dX = dX.permute(1, 0) if batched else dX |
|
|
dX /= delta_lst[0] |
|
|
elif dim == 2: |
|
|
|
|
|
dX = torch.zeros(X.size() + tuple([2]), dtype = torch.float, device = device) |
|
|
X = X.permute(1, 2, 0) if batched else X |
|
|
dX = dX.permute(1, 2, 3, 0) if batched else dX |
|
|
dX[-1, :, 0] = X[-1, :] - X[-2, :] |
|
|
dX[:-1, :, 0] = X[1:] - X[:-1] |
|
|
|
|
|
dX[:, -1, 1] = X[:, -1] - X[:, -2] |
|
|
dX[:, :-1, 1] = X[:, 1:] - X[:, :-1] |
|
|
|
|
|
dX = dX.permute(3, 0, 1, 2) if batched else dX |
|
|
dX[..., 0] /= delta_lst[0] |
|
|
dX[..., 1] /= delta_lst[1] |
|
|
elif dim == 3: |
|
|
|
|
|
dX = torch.zeros(X.size() + tuple([3]), dtype = torch.float, device = device) |
|
|
X = X.permute(1, 2, 3, 0) if batched else X |
|
|
dX = dX.permute(1, 2, 3, 4, 0) if batched else dX |
|
|
dX[-1, :, :, 0] = X[-1, :, :] - X[-2, :, :] |
|
|
dX[:-1, :, :, 0] = X[1:] - X[:-1] |
|
|
|
|
|
dX[:, -1, :, 1] = X[:, -1] - X[:, -2] |
|
|
dX[:, :-1, :, 1] = X[:, 1:] - X[:, :-1] |
|
|
|
|
|
dX[:, :, -1, 2] = X[:, :, -1] - X[:, :, -2] |
|
|
dX[:, :, :-1, 2] = X[:, :, 1:] - X[:, :, :-1] |
|
|
|
|
|
dX = dX.permute(4, 0, 1, 2, 3) if batched else dX |
|
|
dX[..., 0] /= delta_lst[0] |
|
|
dX[..., 1] /= delta_lst[1] |
|
|
dX[..., 2] /= delta_lst[2] |
|
|
return dX |
|
|
|
|
|
|
|
|
def gradient_b(X, batched = False, delta_lst = [1., 1., 1.]): |
|
|
''' |
|
|
Compute gradient of a torch tensor "X" in each direction |
|
|
Non-boundaries & Upper-boundaries: Backward Difference |
|
|
Lower-boundaries: Forward Difference |
|
|
if X is batched: (n_batch, ...); |
|
|
else: (...) |
|
|
''' |
|
|
device = X.device |
|
|
dim = len(X.size()) - 1 if batched else len(X.size()) |
|
|
|
|
|
|
|
|
if dim == 1: |
|
|
|
|
|
dX = torch.zeros(X.size(), dtype = torch.float, device = device) |
|
|
X = X.permute(1, 0) if batched else X |
|
|
dX = dX.permute(1, 0) if batched else dX |
|
|
dX[1:] = X[1:] - X[:-1] |
|
|
dX[0] = X[1] - X[0] |
|
|
|
|
|
dX = dX.permute(1, 0) if batched else dX |
|
|
dX /= delta_lst[0] |
|
|
elif dim == 2: |
|
|
|
|
|
dX = torch.zeros(X.size() + tuple([2]), dtype = torch.float, device = device) |
|
|
X = X.permute(1, 2, 0) if batched else X |
|
|
dX = dX.permute(1, 2, 3, 0) if batched else dX |
|
|
dX[1:, :, 0] = X[1:, :] - X[:-1, :] |
|
|
dX[0, :, 0] = X[1] - X[0] |
|
|
|
|
|
dX[:, 1:, 1] = X[:, 1:] - X[:, :-1] |
|
|
dX[:, 0, 1] = X[:, 1] - X[:, 0] |
|
|
|
|
|
dX = dX.permute(3, 0, 1, 2) if batched else dX |
|
|
dX[..., 0] /= delta_lst[0] |
|
|
dX[..., 1] /= delta_lst[1] |
|
|
elif dim == 3: |
|
|
|
|
|
dX = torch.zeros(X.size() + tuple([3]), dtype = torch.float, device = device) |
|
|
X = X.permute(1, 2, 3, 0) if batched else X |
|
|
dX = dX.permute(1, 2, 3, 4, 0) if batched else dX |
|
|
dX[1:, :, :, 0] = X[1:, :, :] - X[:-1, :, :] |
|
|
dX[0, :, :, 0] = X[1] - X[0] |
|
|
|
|
|
dX[:, 1:, :, 1] = X[:, 1:] - X[:, :-1] |
|
|
dX[:, 0, :, 1] = X[:, 1] - X[:, 0] |
|
|
|
|
|
dX[:, :, 1:, 2] = X[:, :, 1:] - X[:, :, :-1] |
|
|
dX[:, :, 0, 2] = X[:, :, 1] - X[:, :, 0] |
|
|
|
|
|
dX = dX.permute(4, 0, 1, 2, 3) if batched else dX |
|
|
dX[..., 0] /= delta_lst[0] |
|
|
dX[..., 1] /= delta_lst[1] |
|
|
dX[..., 2] /= delta_lst[2] |
|
|
return dX |
|
|
|
|
|
|
|
|
def gradient_c(X, batched = False, delta_lst = [1., 1., 1.]): |
|
|
''' |
|
|
Compute gradient of a torch tensor "X" in each direction |
|
|
Non-boundaries: Central Difference |
|
|
Upper-boundaries: Backward Difference |
|
|
Lower-boundaries: Forward Difference |
|
|
if X is batched: (n_batch, ...); |
|
|
else: (...) |
|
|
''' |
|
|
device = X.device |
|
|
dim = len(X.size()) - 1 if batched else len(X.size()) |
|
|
|
|
|
|
|
|
|
|
|
if dim == 1: |
|
|
|
|
|
dX = torch.zeros(X.size(), dtype = torch.float, device = device) |
|
|
X = X.permute(1, 0) if batched else X |
|
|
dX = dX.permute(1, 0) if batched else dX |
|
|
dX[1:-1] = (X[2:] - X[:-2]) / 2 |
|
|
dX[0] = X[1] - X[0] |
|
|
dX[-1] = X[-1] - X[-2] |
|
|
|
|
|
dX = dX.permute(1, 0) if batched else dX |
|
|
dX /= delta_lst[0] |
|
|
elif dim == 2: |
|
|
|
|
|
dX = torch.zeros(X.size() + tuple([2]), dtype = torch.float, device = device) |
|
|
X = X.permute(1, 2, 0) if batched else X |
|
|
dX = dX.permute(1, 2, 3, 0) if batched else dX |
|
|
dX[1:-1, :, 0] = (X[2:, :] - X[:-2, :]) / 2 |
|
|
dX[0, :, 0] = X[1] - X[0] |
|
|
dX[-1, :, 0] = X[-1] - X[-2] |
|
|
dX[:, 1:-1, 1] = (X[:, 2:] - X[:, :-2]) / 2 |
|
|
dX[:, 0, 1] = X[:, 1] - X[:, 0] |
|
|
dX[:, -1, 1] = X[:, -1] - X[:, -2] |
|
|
|
|
|
dX = dX.permute(3, 0, 1, 2) if batched else dX |
|
|
dX[..., 0] /= delta_lst[0] |
|
|
dX[..., 1] /= delta_lst[1] |
|
|
elif dim == 3: |
|
|
|
|
|
dX = torch.zeros(X.size() + tuple([3]), dtype = torch.float, device = device) |
|
|
X = X.permute(1, 2, 3, 0) if batched else X |
|
|
dX = dX.permute(1, 2, 3, 4, 0) if batched else dX |
|
|
dX[1:-1, :, :, 0] = (X[2:, :, :] - X[:-2, :, :]) / 2 |
|
|
dX[0, :, :, 0] = X[1] - X[0] |
|
|
dX[-1, :, :, 0] = X[-1] - X[-2] |
|
|
dX[:, 1:-1, :, 1] = (X[:, 2:, :] - X[:, :-2, :]) / 2 |
|
|
dX[:, 0, :, 1] = X[:, 1] - X[:, 0] |
|
|
dX[:, -1, :, 1] = X[:, -1] - X[:, -2] |
|
|
dX[:, :, 1:-1, 2] = (X[:, :, 2:] - X[:, :, :-2]) / 2 |
|
|
dX[:, :, 0, 2] = X[:, :, 1] - X[:, :, 0] |
|
|
dX[:, :, -1, 2] = X[:, :, -1] - X[:, :, -2] |
|
|
|
|
|
dX = dX.permute(4, 0, 1, 2, 3) if batched else dX |
|
|
dX[..., 0] /= delta_lst[0] |
|
|
dX[..., 1] /= delta_lst[1] |
|
|
dX[..., 2] /= delta_lst[2] |
|
|
return dX |
|
|
|
|
|
|
|
|
def gradient_c_numpy(X, batched = False, delta_lst = [1., 1., 1.]): |
|
|
''' |
|
|
Compute gradient of a Numpy array "X" in each direction |
|
|
Non-boundaries: Central Difference |
|
|
Upper-boundaries: Backward Difference |
|
|
Lower-boundaries: Forward Difference |
|
|
if X is batched: (n_batch, ...); |
|
|
else: (...) |
|
|
''' |
|
|
dim = len(X.shape) - 1 if batched else len(X.shape) |
|
|
|
|
|
if dim == 1: |
|
|
|
|
|
X = np.transpose(X, (1, 0)) if batched else X |
|
|
dX = np.zeros(X.shapee).astype(float) |
|
|
dX[1:-1] = (X[2:] - X[:-2]) / 2 |
|
|
dX[0] = X[1] - X[0] |
|
|
dX[-1] = X[-1] - X[-2] |
|
|
|
|
|
dX = np.transpose(X, (1, 0)) if batched else dX |
|
|
dX /= delta_lst[0] |
|
|
elif dim == 2: |
|
|
|
|
|
dX = np.zeros(X.shape + tuple([2])).astype(float) |
|
|
X = np.transpose(X, (1, 2, 0)) if batched else X |
|
|
dX = np.transpose(dX, (1, 2, 3, 0)) if batched else dX |
|
|
dX[1:-1, :, 0] = (X[2:, :] - X[:-2, :]) / 2 |
|
|
dX[0, :, 0] = X[1] - X[0] |
|
|
dX[-1, :, 0] = X[-1] - X[-2] |
|
|
dX[:, 1:-1, 1] = (X[:, 2:] - X[:, :-2]) / 2 |
|
|
dX[:, 0, 1] = X[:, 1] - X[:, 0] |
|
|
dX[:, -1, 1] = X[:, -1] - X[:, -2] |
|
|
|
|
|
dX = np.transpose(dX, (3, 0, 1, 2)) if batched else dX |
|
|
dX[..., 0] /= delta_lst[0] |
|
|
dX[..., 1] /= delta_lst[1] |
|
|
elif dim == 3: |
|
|
|
|
|
dX = np.zeros(X.shape + tuple([3])).astype(float) |
|
|
X = np.transpose(X, (1, 2, 3, 0)) if batched else X |
|
|
dX = np.transpose(dX, (1, 2, 3, 4, 0)) if batched else dX |
|
|
dX[1:-1, :, :, 0] = (X[2:, :, :] - X[:-2, :, :]) / 2 |
|
|
dX[0, :, :, 0] = X[1] - X[0] |
|
|
dX[-1, :, :, 0] = X[-1] - X[-2] |
|
|
dX[:, 1:-1, :, 1] = (X[:, 2:, :] - X[:, :-2, :]) / 2 |
|
|
dX[:, 0, :, 1] = X[:, 1] - X[:, 0] |
|
|
dX[:, -1, :, 1] = X[:, -1] - X[:, -2] |
|
|
dX[:, :, 1:-1, 2] = (X[:, :, 2:] - X[:, :, :-2]) / 2 |
|
|
dX[:, :, 0, 2] = X[:, :, 1] - X[:, :, 0] |
|
|
dX[:, :, -1, 2] = X[:, :, -1] - X[:, :, -2] |
|
|
|
|
|
dX = np.transpose(dX, (4, 0, 1, 2, 3)) if batched else dX |
|
|
dX[..., 0] /= delta_lst[0] |
|
|
dX[..., 1] /= delta_lst[1] |
|
|
dX[..., 2] /= delta_lst[2] |
|
|
return dX |
|
|
|
|
|
|
|
|
def gradient_f_numpy(X, batched = False, delta_lst = [1., 1., 1.]): |
|
|
''' |
|
|
Compute gradient of a torch tensor "X" in each direction |
|
|
Upper-boundaries: Backward Difference |
|
|
Non-boundaries & Upper-boundaries: Forward Difference |
|
|
if X is batched: (n_batch, ...); |
|
|
else: (...) |
|
|
''' |
|
|
dim = len(X.shape) - 1 if batched else len(X.shape) |
|
|
|
|
|
if dim == 1: |
|
|
|
|
|
X = np.transpose(X, (1, 0)) if batched else X |
|
|
dX = np.zeros(X.shapee).astype(float) |
|
|
dX[-1] = X[-1] - X[-2] |
|
|
dX[:-1] = X[1:] - X[:-1] |
|
|
|
|
|
dX = np.transpose(X, (1, 0)) if batched else dX |
|
|
dX /= delta_lst[0] |
|
|
elif dim == 2: |
|
|
|
|
|
dX = np.zeros(X.shape + tuple([2])).astype(float) |
|
|
X = np.transpose(X, (1, 2, 0)) if batched else X |
|
|
dX = np.transpose(dX, (1, 2, 3, 0)) if batched else dX |
|
|
dX[-1, :, 0] = X[-1, :] - X[-2, :] |
|
|
dX[:-1, :, 0] = X[1:] - X[:-1] |
|
|
|
|
|
dX[:, -1, 1] = X[:, -1] - X[:, -2] |
|
|
dX[:, :-1, 1] = X[:, 1:] - X[:, :-1] |
|
|
|
|
|
dX = np.transpose(dX, (3, 0, 1, 2)) if batched else dX |
|
|
dX[..., 0] /= delta_lst[0] |
|
|
dX[..., 1] /= delta_lst[1] |
|
|
elif dim == 3: |
|
|
|
|
|
dX = np.zeros(X.shape + tuple([3])).astype(float) |
|
|
X = np.transpose(X, (1, 2, 3, 0)) if batched else X |
|
|
dX = np.transpose(dX, (1, 2, 3, 4, 0)) if batched else dX |
|
|
dX[-1, :, :, 0] = X[-1, :, :] - X[-2, :, :] |
|
|
dX[:-1, :, :, 0] = X[1:] - X[:-1] |
|
|
|
|
|
dX[:, -1, :, 1] = X[:, -1] - X[:, -2] |
|
|
dX[:, :-1, :, 1] = X[:, 1:] - X[:, :-1] |
|
|
|
|
|
dX[:, :, -1, 2] = X[:, :, -1] - X[:, :, -2] |
|
|
dX[:, :, :-1, 2] = X[:, :, 1:] - X[:, :, :-1] |
|
|
|
|
|
dX = np.transpose(dX, (4, 0, 1, 2, 3)) if batched else dX |
|
|
dX[..., 0] /= delta_lst[0] |
|
|
dX[..., 1] /= delta_lst[1] |
|
|
dX[..., 2] /= delta_lst[2] |
|
|
return dX |
|
|
|
|
|
|
|
|
class Upwind(object): |
|
|
''' |
|
|
Backward if > 0, forward if <= 0 |
|
|
''' |
|
|
def __init__(self, U, data_spacing = [1., 1, 1.], batched = True): |
|
|
self.U = U |
|
|
self.batched = batched |
|
|
self.data_spacing = data_spacing |
|
|
self.dim = len(self.U.size()) - 1 if batched else len(self.U.size()) |
|
|
self.I = torch.ones(self.U.size(), dtype = torch.float, device = U.device) |
|
|
|
|
|
def dX(self, FGx): |
|
|
dXf = gradient_f(self.U, batched = self.batched, delta_lst = self.data_spacing)[..., 0] |
|
|
dXb = gradient_b(self.U, batched = self.batched, delta_lst = self.data_spacing)[..., 0] |
|
|
Xflag = (FGx > 0).float() |
|
|
return dXf * (self.I - Xflag) + dXb * Xflag |
|
|
|
|
|
def dY(self, FGy): |
|
|
dYf = gradient_f(self.U, batched = self.batched, delta_lst = self.data_spacing)[..., 1] |
|
|
dYb = gradient_b(self.U, batched = self.batched, delta_lst = self.data_spacing)[..., 1] |
|
|
Yflag = (FGy > 0).float() |
|
|
return dYf * (self.I - Yflag) + dYb * Yflag |
|
|
|
|
|
def dZ(self, FGz): |
|
|
dZf = gradient_f(self.U, batched = self.batched, delta_lst = self.data_spacing)[..., 2] |
|
|
dZb = gradient_b(self.U, batched = self.batched, delta_lst = self.data_spacing)[..., 2] |
|
|
Zflag = (FGz > 0).float() |
|
|
return dZf * (self.I - Zflag) + dZb * Zflag |
|
|
|
|
|
|
|
|
class AdvDiffPartial(nn.Module): |
|
|
def __init__(self, data_spacing, device): |
|
|
super(AdvDiffPartial, self).__init__() |
|
|
self.dimension = len(data_spacing) |
|
|
self.device = device |
|
|
self.data_spacing = data_spacing |
|
|
|
|
|
@property |
|
|
def Grad_Ds(self): |
|
|
return { |
|
|
'constant': self.Grad_constantD, |
|
|
'scalar': self.Grad_scalarD, |
|
|
'diag': self.Grad_diagD, |
|
|
'full': self.Grad_fullD, |
|
|
'full_dual': self.Grad_fullD, |
|
|
'full_spectral':self.Grad_fullD, |
|
|
'full_cholesky': self.Grad_fullD, |
|
|
'full_symmetric': self.Grad_fullD |
|
|
} |
|
|
@property |
|
|
def Grad_Vs(self): |
|
|
return { |
|
|
'constant': self.Grad_constantV, |
|
|
'scalar': self.Grad_scalarV, |
|
|
'vector': self.Grad_vectorV, |
|
|
'vector_div_free': self.Grad_div_free_vectorV, |
|
|
'vector_div_free_clebsch': self.Grad_div_free_vectorV, |
|
|
'vector_div_free_stream': self.Grad_div_free_vectorV, |
|
|
'vector_div_free_stream_gauge': self.Grad_div_free_vectorV, |
|
|
} |
|
|
|
|
|
def Grad_constantD(self, C, Dlst): |
|
|
if self.dimension == 1: |
|
|
return Dlst['D'] * (self.ddXc(C)) |
|
|
elif self.dimension == 2: |
|
|
return Dlst['D'] * (self.ddXc(C) + self.ddYc(C)) |
|
|
elif self.dimension == 3: |
|
|
return Dlst['D'] * (self.ddXc(C) + self.ddYc(C) + self.ddZc(C)) |
|
|
|
|
|
def Grad_constant_tensorD(self, C, Dlst): |
|
|
if self.dimension == 1: |
|
|
raise NotImplementedError |
|
|
elif self.dimension == 2: |
|
|
dC_c = self.dc(C) |
|
|
dC_f = self.df(C) |
|
|
return Dlst['Dxx'] * self.dXb(dC_f[..., 0]) +\ |
|
|
Dlst['Dxy'] * self.dXb(dC_f[..., 1]) + Dlst['Dxy'] * self.dYb(dC_f[..., 0]) +\ |
|
|
Dlst['Dyy'] * self.dYb(dC_f[..., 1]) |
|
|
elif self.dimension == 3: |
|
|
dC_c = self.dc(C) |
|
|
dC_f = self.df(C) |
|
|
return Dlst['Dxx'] * self.dXb(dC_f[..., 0]) + Dlst['Dyy'] * self.dYb(dC_f[..., 1]) + Dlst['Dzz'] * self.dZb(dC_f[..., 2]) + \ |
|
|
Dlst['Dxy'] * (self.dXb(dC_f[..., 1]) + self.dYb(dC_f[..., 0])) + \ |
|
|
Dlst['Dyz'] * (self.dYb(dC_f[..., 2]) + self.dZb(dC_f[..., 1])) + \ |
|
|
Dlst['Dxz'] * (self.dZb(dC_f[..., 0]) + self.dXb(dC_f[..., 2])) |
|
|
|
|
|
def Grad_scalarD(self, C, Dlst): |
|
|
|
|
|
|
|
|
|
|
|
if self.dimension == 1: |
|
|
dC = gradient_c(C, batched = True, delta_lst = self.data_spacing) |
|
|
return gradient_c(Dlst['D'], batched = True, delta_lst = self.data_spacing) * dC + \ |
|
|
Dlst['D'] * gradient_c(dC, batched = True, delta_lst = self.data_spacing) |
|
|
else: |
|
|
dC_c = gradient_c(C, batched = True, delta_lst = self.data_spacing) |
|
|
dC_f = gradient_f(C, batched = True, delta_lst = self.data_spacing) |
|
|
dD_c = gradient_c(Dlst['D'], batched = True, delta_lst = self.data_spacing) |
|
|
out = (dD_c * dC_c).sum(-1) |
|
|
for dim in range(dC_f.size(-1)): |
|
|
out += Dlst['D'] * gradient_b(dC_f[..., dim], batched = True, delta_lst = self.data_spacing)[..., dim] |
|
|
return out |
|
|
|
|
|
def Grad_diagD(self, C, Dlst): |
|
|
|
|
|
if self.dimension == 1: |
|
|
raise NotImplementedError('diag_D is not supported for 1D version of diffusivity') |
|
|
elif self.dimension == 2: |
|
|
dC_c = self.dc(C) |
|
|
dC_f = self.df(C) |
|
|
return self.dXc(Dlst['Dxx']) * dC_c[..., 0] + Dlst['Dxx'] * self.dXb(dC_f[..., 0]) +\ |
|
|
self.dYc(Dlst['Dyy']) * dC_c[..., 1] + Dlst['Dyy'] * self.dYb(dC_f[..., 1]) |
|
|
elif self.dimension == 3: |
|
|
dC_c = self.dc(C) |
|
|
dC_f = self.df(C) |
|
|
return self.dXc(Dlst['Dxx']) * dC_c[..., 0] + Dlst['Dxx'] * self.dXb(dC_f[..., 0]) +\ |
|
|
self.dYc(Dlst['Dyy']) * dC_c[..., 1] + Dlst['Dyy'] * self.dYb(dC_f[..., 1]) +\ |
|
|
self.dZc(Dlst['Dzz']) * dC_c[..., 2] + Dlst['Dzz'] * self.dZb(dC_f[..., 2]) |
|
|
|
|
|
def Grad_fullD(self, C, Dlst): |
|
|
|
|
|
'''https://github.com/uncbiag/PIANOinD/blob/master/Doc/PIANOinD.pdf''' |
|
|
if self.dimension == 1: |
|
|
raise NotImplementedError('full_D is not supported for 1D version of diffusivity') |
|
|
elif self.dimension == 2: |
|
|
dC_c = self.dc(C) |
|
|
dC_f = self.df(C) |
|
|
return self.dXc(Dlst['Dxx']) * dC_c[..., 0] + Dlst['Dxx'] * self.dXb(dC_f[..., 0]) +\ |
|
|
self.dXc(Dlst['Dxy']) * dC_c[..., 1] + Dlst['Dxy'] * self.dXb(dC_f[..., 1]) +\ |
|
|
self.dYc(Dlst['Dxy']) * dC_c[..., 0] + Dlst['Dxy'] * self.dYb(dC_f[..., 0]) +\ |
|
|
self.dYc(Dlst['Dyy']) * dC_c[..., 1] + Dlst['Dyy'] * self.dYb(dC_f[..., 1]) |
|
|
elif self.dimension == 3: |
|
|
dC_c = self.dc(C) |
|
|
dC_f = self.df(C) |
|
|
return (self.dXc(Dlst['Dxx']) + self.dYc(Dlst['Dxy']) + self.dZc(Dlst['Dxz'])) * dC_c[..., 0] + \ |
|
|
(self.dXc(Dlst['Dxy']) + self.dYc(Dlst['Dyy']) + self.dZc(Dlst['Dyz'])) * dC_c[..., 1] + \ |
|
|
(self.dXc(Dlst['Dxz']) + self.dYc(Dlst['Dyz']) + self.dZc(Dlst['Dzz'])) * dC_c[..., 2] + \ |
|
|
Dlst['Dxx'] * self.dXb(dC_f[..., 0]) + Dlst['Dyy'] * self.dYb(dC_f[..., 1]) + Dlst['Dzz'] * self.dZb(dC_f[..., 2]) + \ |
|
|
Dlst['Dxy'] * (self.dXb(dC_f[..., 1]) + self.dYb(dC_f[..., 0])) + \ |
|
|
Dlst['Dyz'] * (self.dYb(dC_f[..., 2]) + self.dZb(dC_f[..., 1])) + \ |
|
|
Dlst['Dxz'] * (self.dZb(dC_f[..., 0]) + self.dXb(dC_f[..., 2])) |
|
|
|
|
|
def Grad_constantV(self, C, Vlst): |
|
|
if len(Vlst['V'].size()) == 1: |
|
|
if self.dimension == 1: |
|
|
return - Vlst['V'] * self.dXb(C) if Vlst['V'] > 0 else - Vlst['V'] * self.dXf(C) |
|
|
elif self.dimension == 2: |
|
|
return - Vlst['V'] * (self.dXb(C) + self.dYb(C)) if Vlst['V'] > 0 else - Vlst['V'] * (self.dXf(C) + self.dYf(C)) |
|
|
elif self.dimension == 3: |
|
|
return - Vlst['V'] * (self.dXb(C) + self.dYb(C) + self.dZb(C)) if Vlst['V'] > 0 else - Vlst['V'] * (self.dXf(C) + self.dYf(C) + self.dZf(C)) |
|
|
else: |
|
|
if self.dimension == 1: |
|
|
return - Vlst['V'] * self.dXb(C) if Vlst['V'][0, 0] > 0 else - Vlst['V'] * self.dXf(C) |
|
|
elif self.dimension == 2: |
|
|
return - Vlst['V'] * (self.dXb(C) + self.dYb(C)) if Vlst['V'][0, 0, 0] > 0 else - Vlst['V'] * (self.dXf(C) + self.dYf(C)) |
|
|
elif self.dimension == 3: |
|
|
return - Vlst['V'] * (self.dXb(C) + self.dYb(C) + self.dZb(C)) if Vlst['V'][0, 0, 0, 0] > 0 else - Vlst['V'] * (self.dXf(C) + self.dYf(C) + self.dZf(C)) |
|
|
|
|
|
def Grad_constant_vectorV(self, C, Vlst): |
|
|
if self.dimension == 1: |
|
|
raise NotImplementedError |
|
|
elif self.dimension == 2: |
|
|
out_x = - Vlst['Vx'] * (self.dXb(C) + self.dYb(C)) if Vlst['Vx'][0, 0, 0] > 0 else - Vlst['Vx'] * (self.dXf(C) + self.dYf(C)) |
|
|
out_y = - Vlst['Vy'] * (self.dXb(C) + self.dYb(C)) if Vlst['Vy'][0, 0, 0] > 0 else - Vlst['Vy'] * (self.dXf(C) + self.dYf(C)) |
|
|
return out_x + out_y |
|
|
elif self.dimension == 3: |
|
|
out_x = - Vlst['Vx'] * (self.dXb(C) + self.dYb(C)) if Vlst['Vx'][0, 0, 0] > 0 else - Vlst['Vx'] * (self.dXf(C) + self.dYf(C)) |
|
|
out_y = - Vlst['Vy'] * (self.dXb(C) + self.dYb(C)) if Vlst['Vy'][0, 0, 0] > 0 else - Vlst['Vy'] * (self.dXf(C) + self.dYf(C)) |
|
|
out_z = - Vlst['Vz'] * (self.dXb(C) + self.dYb(C)) if Vlst['Vz'][0, 0, 0] > 0 else - Vlst['Vz'] * (self.dXf(C) + self.dYf(C)) |
|
|
return out_x + out_y + out_z |
|
|
|
|
|
def Grad_SimscalarV(self, C, Vlst): |
|
|
V = Vlst['V'] |
|
|
Upwind_C = Upwind(C, self.data_spacing) |
|
|
if self.dimension == 1: |
|
|
C_x = Upwind_C.dX(V) |
|
|
return - V * C_x |
|
|
if self.dimension == 2: |
|
|
C_x, C_y = Upwind_C.dX(V), Upwind_C.dY(V) |
|
|
return - V * (C_x + C_y) |
|
|
if self.dimension == 3: |
|
|
C_x, C_y, C_z = Upwind_C.dX(V), Upwind_C.dY(V), Upwind_C.dZ(V) |
|
|
return - V * (C_x + C_y + C_z) |
|
|
|
|
|
def Grad_scalarV(self, C, Vlst): |
|
|
V = Vlst['V'] |
|
|
Upwind_C = Upwind(C, self.data_spacing) |
|
|
dV = gradient_c(V, batched = True, delta_lst = self.data_spacing) |
|
|
if self.dimension == 1: |
|
|
C_x = Upwind_C.dX(V) |
|
|
return - V * C_x - C * dV |
|
|
elif self.dimension == 2: |
|
|
C_x, C_y = Upwind_C.dX(V), Upwind_C.dY(V) |
|
|
return - V * (C_x + C_y) - C * dV.sum(-1) |
|
|
elif self.dimension == 3: |
|
|
C_x, C_y, C_z = Upwind_C.dX(V), Upwind_C.dY(V), Upwind_C.dZ(V) |
|
|
return - V * (C_x + C_y + C_z) - C * dV.sum(-1) |
|
|
|
|
|
def Grad_div_free_vectorV(self, C, Vlst): |
|
|
''' For divergence-free-by-definition velocity''' |
|
|
if self.dimension == 1: |
|
|
raise NotImplementedError('clebschVector is not supported for 1D version of velocity') |
|
|
Upwind_C = Upwind(C, self.data_spacing) |
|
|
C_x, C_y = Upwind_C.dX(Vlst['Vx']), Upwind_C.dY(Vlst['Vy']) |
|
|
if self.dimension == 2: |
|
|
return - (Vlst['Vx'] * C_x + Vlst['Vy'] * C_y) |
|
|
elif self.dimension == 3: |
|
|
C_z = Upwind_C.dZ(Vlst['Vz']) |
|
|
return - (Vlst['Vx'] * C_x + Vlst['Vy'] * C_y + Vlst['Vz'] * C_z) |
|
|
|
|
|
def Grad_vectorV(self, C, Vlst): |
|
|
''' For general velocity''' |
|
|
if self.dimension == 1: |
|
|
raise NotImplementedError('vector is not supported for 1D version of velocity') |
|
|
Upwind_C = Upwind(C, self.data_spacing) |
|
|
C_x, C_y = Upwind_C.dX(Vlst['Vx']), Upwind_C.dY(Vlst['Vy']) |
|
|
Vx_x = self.dXc(Vlst['Vx']) |
|
|
Vy_y = self.dYc(Vlst['Vy']) |
|
|
if self.dimension == 2: |
|
|
return - (Vlst['Vx'] * C_x + Vlst['Vy'] * C_y) - C * (Vx_x + Vy_y) |
|
|
if self.dimension == 3: |
|
|
C_z = Upwind_C.dZ(Vlst['Vz']) |
|
|
Vz_z = self.dZc(Vlst['Vz']) |
|
|
return - (Vlst['Vx'] * C_x + Vlst['Vy'] * C_y + Vlst['Vz'] * C_z) - C * (Vx_x + Vy_y + Vz_z) |
|
|
|
|
|
|
|
|
def db(self, X): |
|
|
return gradient_b(X, batched = True, delta_lst = self.data_spacing) |
|
|
def df(self, X): |
|
|
return gradient_f(X, batched = True, delta_lst = self.data_spacing) |
|
|
def dc(self, X): |
|
|
return gradient_c(X, batched = True, delta_lst = self.data_spacing) |
|
|
def dXb(self, X): |
|
|
return gradient_b(X, batched = True, delta_lst = self.data_spacing)[..., 0] |
|
|
def dXf(self, X): |
|
|
return gradient_f(X, batched = True, delta_lst = self.data_spacing)[..., 0] |
|
|
def dXc(self, X): |
|
|
return gradient_c(X, batched = True, delta_lst = self.data_spacing)[..., 0] |
|
|
def dYb(self, X): |
|
|
return gradient_b(X, batched = True, delta_lst = self.data_spacing)[..., 1] |
|
|
def dYf(self, X): |
|
|
return gradient_f(X, batched = True, delta_lst = self.data_spacing)[..., 1] |
|
|
def dYc(self, X): |
|
|
return gradient_c(X, batched = True, delta_lst = self.data_spacing)[..., 1] |
|
|
def dZb(self, X): |
|
|
return gradient_b(X, batched = True, delta_lst = self.data_spacing)[..., 2] |
|
|
def dZf(self, X): |
|
|
return gradient_f(X, batched = True, delta_lst = self.data_spacing)[..., 2] |
|
|
def dZc(self, X): |
|
|
return gradient_c(X, batched = True, delta_lst = self.data_spacing)[..., 2] |
|
|
def ddXc(self, X): |
|
|
return gradient_b(gradient_f(X, batched = True, delta_lst = self.data_spacing)[..., 0], |
|
|
batched = True, delta_lst = self.data_spacing)[..., 0] |
|
|
def ddYc(self, X): |
|
|
return gradient_b(gradient_f(X, batched = True, delta_lst = self.data_spacing)[..., 1], |
|
|
batched = True, delta_lst = self.data_spacing)[..., 1] |
|
|
def ddZc(self, X): |
|
|
return gradient_b(gradient_f(X, batched = True, delta_lst = self.data_spacing)[..., 2], |
|
|
batched = True, delta_lst = self.data_spacing)[..., 2] |
|
|
|
|
|
|
|
|
|
|
|
class AdvDiffPDE(nn.Module): |
|
|
''' |
|
|
Plain advection-diffusion PDE solver for pre-set V_lst and D_lst (1D, 2D, 3D) for forward time series simulation |
|
|
''' |
|
|
def __init__(self, data_spacing, perf_pattern, D_type='scalar', V_type='vector', BC=None, dt=0.1, V_dict={}, D_dict={}, stochastic=False, device='cpu'): |
|
|
super(AdvDiffPDE, self).__init__() |
|
|
self.BC = BC |
|
|
self.dt = dt |
|
|
self.dimension = len(data_spacing) |
|
|
self.perf_pattern = perf_pattern |
|
|
self.partials = AdvDiffPartial(data_spacing, device) |
|
|
self.D_type, self.V_type = D_type, V_type |
|
|
self.stochastic = stochastic |
|
|
self.V_dict, self.D_dict = V_dict, D_dict |
|
|
self.Sigma, self.Sigma_V, self.Sigma_D = 0., 0., 0. |
|
|
if self.dimension == 1: |
|
|
self.neumann_BC = torch.nn.ReplicationPad1d(1) |
|
|
elif self.dimension == 2: |
|
|
self.neumann_BC = torch.nn.ReplicationPad2d(1) |
|
|
elif self.dimension == 3: |
|
|
self.neumann_BC = torch.nn.ReplicationPad3d(1) |
|
|
else: |
|
|
raise ValueError('Unsupported dimension: %d' % self.dimension) |
|
|
|
|
|
@property |
|
|
def set_BC(self): |
|
|
|
|
|
'''X: (n_batch, spatial_shape)''' |
|
|
if self.BC == 'neumann' or self.BC == 'cauchy': |
|
|
if self.dimension == 1: |
|
|
return lambda X: self.neumann_BC(X[:, 1:-1].unsqueeze(dim=1))[:,0] |
|
|
elif self.dimension == 2: |
|
|
return lambda X: self.neumann_BC(X[:, 1:-1, 1:-1].unsqueeze(dim=1))[:,0] |
|
|
elif self.dimension == 3: |
|
|
return lambda X: self.neumann_BC(X[:, 1:-1, 1:-1, 1:-1].unsqueeze(dim=1))[:,0] |
|
|
else: |
|
|
raise NotImplementedError('Unsupported B.C.!') |
|
|
elif self.BC == 'dirichlet_neumann' or self.BC == 'source_neumann': |
|
|
ctrl_wdth = 1 |
|
|
if self.dimension == 1: |
|
|
self.dirichlet_BC = torch.nn.ReplicationPad1d(ctrl_wdth) |
|
|
return lambda X: self.dirichlet_BC(X[:, ctrl_wdth : -ctrl_wdth].unsqueeze(dim=1))[:,0] |
|
|
elif self.dimension == 2: |
|
|
self.dirichlet_BC = torch.nn.ReplicationPad2d(ctrl_wdth) |
|
|
return lambda X: self.dirichlet_BC(X[:, ctrl_wdth : -ctrl_wdth, ctrl_wdth : -ctrl_wdth].unsqueeze(dim=1))[:,0] |
|
|
elif self.dimension == 3: |
|
|
self.dirichlet_BC = torch.nn.ReplicationPad3d(ctrl_wdth) |
|
|
return lambda X: self.neumann_dirichlet_BCBC(X[:, ctrl_wdth : -ctrl_wdth, ctrl_wdth : -ctrl_wdth, ctrl_wdth : -ctrl_wdth].unsqueeze(dim=1))[:,0] |
|
|
else: |
|
|
raise NotImplementedError('Unsupported B.C.!') |
|
|
else: |
|
|
return lambda X: X |
|
|
|
|
|
def forward(self, t, batch_C): |
|
|
''' |
|
|
t: (batch_size,) |
|
|
batch_C: (batch_size, (slc,) row, col) |
|
|
''' |
|
|
batch_size = batch_C.size(0) |
|
|
batch_C = self.set_BC(batch_C) |
|
|
if 'diff' not in self.perf_pattern: |
|
|
out = self.partials.Grad_Vs[self.V_type](batch_C, self.V_dict) |
|
|
if self.stochastic: |
|
|
out = out + self.Sigma * math.sqrt(self.dt) * torch.randn_like(batch_C).to(batch_C) |
|
|
elif 'adv' not in self.perf_pattern: |
|
|
out = self.partials.Grad_Ds[self.D_type](batch_C, self.D_dict) |
|
|
if self.stochastic: |
|
|
out = out + self.Sigma * math.sqrt(self.dt) * torch.randn_like(batch_C).to(batch_C) |
|
|
else: |
|
|
if self.stochastic: |
|
|
out_D = self.partials.Grad_Ds[self.D_type](batch_C, self.D_dict) |
|
|
out_V = self.partials.Grad_Vs[self.V_type](batch_C, self.V_dict) |
|
|
out = out_D + out_V + self.Sigma * math.sqrt(self.dt) * torch.randn_like(batch_C).to(batch_C) |
|
|
else: |
|
|
out_V = self.partials.Grad_Vs[self.V_type](batch_C, self.V_dict) |
|
|
out_D = self.partials.Grad_Ds[self.D_type](batch_C, self.D_dict) |
|
|
out = out_V + out_D |
|
|
return out |
|
|
|
|
|
|
|
|
|
|
|
|