# peirong26's picture
# Upload 187 files
# 2571f24 verified
# ported from https://github.com/pvigier/perlin-numpy
import math
import numpy as np
import torch
import torch.nn as nn
def gradient_f(X, batched = False, delta_lst = [1., 1., 1.]):
    '''
    Compute the gradient of a torch tensor "X" along each spatial direction.
    Lower-boundaries & Non-boundaries: Forward Difference
    Upper-boundaries: Backward Difference
    if X is batched: (n_batch, ...);
    else: (...)

    X may have 1, 2 or 3 spatial axes (the leading batch axis, if any, is
    not differentiated). delta_lst[k] is the grid spacing divided out of the
    derivative along spatial axis k.

    Returns a float32 tensor on X's device:
      * 1 spatial axis  -> same shape as X;
      * 2 or 3 axes     -> X's shape plus a trailing axis of length 2 or 3,
                           entry k holding the derivative along spatial axis k.

    Raises ValueError for any other number of spatial axes, or when a
    spatial axis has fewer than two samples.
    '''
    lead = 1 if batched else 0
    n_spatial = X.dim() - lead
    if n_spatial not in (1, 2, 3):
        raise ValueError('gradient_f supports 1, 2 or 3 spatial dimensions, got %d' % n_spatial)

    # The 1D result carries no trailing component axis.
    out_shape = tuple(X.size()) if n_spatial == 1 else tuple(X.size()) + (n_spatial,)
    dX = torch.zeros(out_shape, dtype = torch.float, device = X.device)

    for k in range(n_spatial):
        axis = k + lead
        n = X.size(axis)
        if n < 2:
            raise ValueError('gradient_f needs at least 2 samples along every spatial axis')
        comp = dX if n_spatial == 1 else dX[..., k]  # view into dX
        step = X.narrow(axis, 1, n - 1) - X.narrow(axis, 0, n - 1)  # X[i+1] - X[i]
        comp.narrow(axis, 0, n - 1).copy_(step)  # Forward Difference
        comp.narrow(axis, n - 1, 1).copy_(step.narrow(axis, n - 2, 1))  # Backward Difference at the last sample
        comp /= delta_lst[k]  # in-place on the view, so dX is updated
    return dX
def gradient_b(X, batched = False, delta_lst = [1., 1., 1.]):
    '''
    Compute the gradient of a torch tensor "X" along each spatial direction.
    Non-boundaries & Upper-boundaries: Backward Difference
    Lower-boundaries: Forward Difference
    if X is batched: (n_batch, ...);
    else: (...)

    X may have 1, 2 or 3 spatial axes (the leading batch axis, if any, is
    not differentiated). delta_lst[k] is the grid spacing divided out of the
    derivative along spatial axis k.

    Returns a float32 tensor on X's device:
      * 1 spatial axis  -> same shape as X;
      * 2 or 3 axes     -> X's shape plus a trailing axis of length 2 or 3,
                           entry k holding the derivative along spatial axis k.

    Raises ValueError for any other number of spatial axes, or when a
    spatial axis has fewer than two samples.
    '''
    lead = 1 if batched else 0
    n_spatial = X.dim() - lead
    if n_spatial not in (1, 2, 3):
        raise ValueError('gradient_b supports 1, 2 or 3 spatial dimensions, got %d' % n_spatial)

    # The 1D result carries no trailing component axis.
    out_shape = tuple(X.size()) if n_spatial == 1 else tuple(X.size()) + (n_spatial,)
    dX = torch.zeros(out_shape, dtype = torch.float, device = X.device)

    for k in range(n_spatial):
        axis = k + lead
        n = X.size(axis)
        if n < 2:
            raise ValueError('gradient_b needs at least 2 samples along every spatial axis')
        comp = dX if n_spatial == 1 else dX[..., k]  # view into dX
        step = X.narrow(axis, 1, n - 1) - X.narrow(axis, 0, n - 1)  # X[i] - X[i-1] for i >= 1
        comp.narrow(axis, 1, n - 1).copy_(step)  # Backward Difference
        comp.narrow(axis, 0, 1).copy_(step.narrow(axis, 0, 1))  # Forward Difference at the first sample
        comp /= delta_lst[k]  # in-place on the view, so dX is updated
    return dX
def gradient_c(X, batched = False, delta_lst = [1., 1., 1.]):
    '''
    Compute the gradient of a torch tensor "X" along each spatial direction.
    Non-boundaries: Central Difference
    Upper-boundaries: Backward Difference
    Lower-boundaries: Forward Difference
    if X is batched: (n_batch, ...);
    else: (...)

    X may have 1, 2 or 3 spatial axes (the leading batch axis, if any, is
    not differentiated). delta_lst[k] is the grid spacing divided out of the
    derivative along spatial axis k.

    Returns a float32 tensor on X's device:
      * 1 spatial axis  -> same shape as X;
      * 2 or 3 axes     -> X's shape plus a trailing axis of length 2 or 3,
                           entry k holding the derivative along spatial axis k.

    Raises ValueError for any other number of spatial axes, or when a
    spatial axis has fewer than two samples.
    '''
    lead = 1 if batched else 0
    n_spatial = X.dim() - lead
    if n_spatial not in (1, 2, 3):
        raise ValueError('gradient_c supports 1, 2 or 3 spatial dimensions, got %d' % n_spatial)

    # The 1D result carries no trailing component axis.
    out_shape = tuple(X.size()) if n_spatial == 1 else tuple(X.size()) + (n_spatial,)
    dX = torch.zeros(out_shape, dtype = torch.float, device = X.device)

    for k in range(n_spatial):
        axis = k + lead
        n = X.size(axis)
        if n < 2:
            raise ValueError('gradient_c needs at least 2 samples along every spatial axis')
        comp = dX if n_spatial == 1 else dX[..., k]  # view into dX
        if n > 2:
            # Central Difference on the interior samples
            comp.narrow(axis, 1, n - 2).copy_((X.narrow(axis, 2, n - 2) - X.narrow(axis, 0, n - 2)) / 2)
        # Forward Difference at the first sample
        comp.narrow(axis, 0, 1).copy_(X.narrow(axis, 1, 1) - X.narrow(axis, 0, 1))
        # Backward Difference at the last sample
        comp.narrow(axis, n - 1, 1).copy_(X.narrow(axis, n - 1, 1) - X.narrow(axis, n - 2, 1))
        comp /= delta_lst[k]  # in-place on the view, so dX is updated
    return dX
def gradient_c_numpy(X, batched = False, delta_lst = [1., 1., 1.]):
    '''
    Compute the gradient of a NumPy array "X" along each spatial direction.
    Non-boundaries: Central Difference
    Upper-boundaries: Backward Difference
    Lower-boundaries: Forward Difference
    if X is batched: (n_batch, ...);
    else: (...)

    X may have 1, 2 or 3 spatial axes (the leading batch axis, if any, is
    not differentiated). delta_lst[k] is the grid spacing divided out of the
    derivative along spatial axis k. X itself is never modified.

    Returns a new float array:
      * 1 spatial axis  -> same shape as X;
      * 2 or 3 axes     -> X's shape plus a trailing axis of length 2 or 3,
                           entry k holding the derivative along spatial axis k.

    Raises ValueError for any other number of spatial axes, or when a
    spatial axis has fewer than two samples.
    '''
    lead = 1 if batched else 0
    n_spatial = len(X.shape) - lead
    if n_spatial not in (1, 2, 3):
        raise ValueError('gradient_c_numpy supports 1, 2 or 3 spatial dimensions, got %d' % n_spatial)

    # The 1D result carries no trailing component axis.
    out_shape = tuple(X.shape) if n_spatial == 1 else tuple(X.shape) + (n_spatial,)
    dX = np.zeros(out_shape, dtype = float)

    for k in range(n_spatial):
        axis = k + lead
        if X.shape[axis] < 2:
            raise ValueError('gradient_c_numpy needs at least 2 samples along every spatial axis')
        # Views with the differentiated axis moved to the front; writes go through to dX.
        comp = np.moveaxis(dX if n_spatial == 1 else dX[..., k], axis, 0)
        src = np.moveaxis(X, axis, 0)
        comp[1:-1] = (src[2:] - src[:-2]) / 2  # Central Difference
        comp[0] = src[1] - src[0]  # Forward Difference
        comp[-1] = src[-1] - src[-2]  # Backward Difference
        comp /= delta_lst[k]
    return dX
def gradient_f_numpy(X, batched = False, delta_lst = [1., 1., 1.]):
    '''
    Compute the gradient of a NumPy array "X" along each spatial direction.
    Lower-boundaries & Non-boundaries: Forward Difference
    Upper-boundaries: Backward Difference
    if X is batched: (n_batch, ...);
    else: (...)

    X may have 1, 2 or 3 spatial axes (the leading batch axis, if any, is
    not differentiated). delta_lst[k] is the grid spacing divided out of the
    derivative along spatial axis k. X itself is never modified.

    Returns a new float array:
      * 1 spatial axis  -> same shape as X;
      * 2 or 3 axes     -> X's shape plus a trailing axis of length 2 or 3,
                           entry k holding the derivative along spatial axis k.

    Raises ValueError for any other number of spatial axes, or when a
    spatial axis has fewer than two samples.
    '''
    lead = 1 if batched else 0
    n_spatial = len(X.shape) - lead
    if n_spatial not in (1, 2, 3):
        raise ValueError('gradient_f_numpy supports 1, 2 or 3 spatial dimensions, got %d' % n_spatial)

    # The 1D result carries no trailing component axis.
    out_shape = tuple(X.shape) if n_spatial == 1 else tuple(X.shape) + (n_spatial,)
    dX = np.zeros(out_shape, dtype = float)

    for k in range(n_spatial):
        axis = k + lead
        if X.shape[axis] < 2:
            raise ValueError('gradient_f_numpy needs at least 2 samples along every spatial axis')
        # Views with the differentiated axis moved to the front; writes go through to dX.
        comp = np.moveaxis(dX if n_spatial == 1 else dX[..., k], axis, 0)
        src = np.moveaxis(X, axis, 0)
        comp[:-1] = src[1:] - src[:-1]  # Forward Difference
        comp[-1] = src[-1] - src[-2]  # Backward Difference
        comp /= delta_lst[k]
    return dX
class Upwind(object):
    '''
    Upwind spatial derivative of a field U, chosen point-wise from the sign
    of a driving field:
    Backward if > 0, forward if <= 0

    U: tensor to differentiate; (n_batch, ...) when batched, else (...).
    data_spacing: grid spacing per spatial axis, forwarded to
        gradient_f / gradient_b as delta_lst.
    batched: whether U has a leading batch axis.
    '''
    def __init__(self, U, data_spacing = [1., 1, 1.], batched = True):
        self.U = U # (s, r, c)
        self.batched = batched
        self.data_spacing = data_spacing
        self.dim = len(self.U.size()) - 1 if batched else len(self.U.size())
        self.I = torch.ones(self.U.size(), dtype = torch.float, device = U.device)

    def _upwind(self, FG, axis):
        '''Blend forward/backward differences of U along spatial axis "axis" by the sign of FG.'''
        if axis >= self.dim:
            raise IndexError('spatial axis %d is out of range for a %dD field' % (axis, self.dim))
        fwd = gradient_f(self.U, batched = self.batched, delta_lst = self.data_spacing)
        bwd = gradient_b(self.U, batched = self.batched, delta_lst = self.data_spacing)
        if self.dim > 1:
            # Only multi-dimensional gradients carry a trailing component axis;
            # a 1D gradient already has U's shape and must not be indexed.
            fwd, bwd = fwd[..., axis], bwd[..., axis]
        flag = (FG > 0).float()
        return fwd * (self.I - flag) + bwd * flag

    def dX(self, FGx):
        '''Upwind derivative along spatial axis 0, driven by the sign of FGx.'''
        return self._upwind(FGx, 0)

    def dY(self, FGy):
        '''Upwind derivative along spatial axis 1, driven by the sign of FGy.'''
        return self._upwind(FGy, 1)

    def dZ(self, FGz):
        '''Upwind derivative along spatial axis 2, driven by the sign of FGz.'''
        return self._upwind(FGz, 2)
class AdvDiffPartial(nn.Module):
    '''
    Spatial right-hand-side terms of an advection-diffusion equation, built
    from the finite-difference helpers gradient_f / gradient_b / gradient_c
    and the Upwind class.

    data_spacing: grid spacing per spatial axis; its length sets self.dimension.
    device: stored on the instance (not otherwise used in this class).

    All fields handed to the methods are treated as batched
    (batch_size, (slc), row, col): every gradient call uses batched = True.
    Dlst / Vlst are mappings from names ('D', 'Dxx', 'V', 'Vx', ...) to tensors.
    '''
    def __init__(self, data_spacing, device):
        super(AdvDiffPartial, self).__init__()
        self.dimension = len(data_spacing) # (slc, row, col)
        self.device = device
        self.data_spacing = data_spacing

    @property
    def Grad_Ds(self):
        '''Dispatch table: diffusivity type name -> method computing the diffusion term.'''
        return {
            'constant': self.Grad_constantD,
            'scalar': self.Grad_scalarD,
            'diag': self.Grad_diagD,
            'full': self.Grad_fullD,
            'full_dual': self.Grad_fullD,
            'full_spectral':self.Grad_fullD,
            'full_cholesky': self.Grad_fullD,
            'full_symmetric': self.Grad_fullD
        }

    @property
    def Grad_Vs(self):
        '''Dispatch table: velocity type name -> method computing the advection term.'''
        return {
            'constant': self.Grad_constantV,
            'scalar': self.Grad_scalarV,
            'vector': self.Grad_vectorV, # For general V w/o div-free TODO self.Grad_vectorV
            'vector_div_free': self.Grad_div_free_vectorV,
            'vector_div_free_clebsch': self.Grad_div_free_vectorV,
            'vector_div_free_stream': self.Grad_div_free_vectorV,
            'vector_div_free_stream_gauge': self.Grad_div_free_vectorV,
        }

    def Grad_constantD(self, C, Dlst):
        '''Dlst['D'] times the sum of the second derivatives of C (ddXc [+ ddYc [+ ddZc]]).'''
        if self.dimension == 1:
            return Dlst['D'] * (self.ddXc(C))
        elif self.dimension == 2:
            return Dlst['D'] * (self.ddXc(C) + self.ddYc(C))
        elif self.dimension == 3:
            return Dlst['D'] * (self.ddXc(C) + self.ddYc(C) + self.ddZc(C))

    def Grad_constant_tensorD(self, C, Dlst):
        '''
        Diffusion term for tensor entries Dlst['Dxx'], Dlst['Dxy'], ... that
        are not differentiated: each second derivative of C is a backward
        difference of a forward difference. Not implemented in 1D.
        '''
        if self.dimension == 1:
            raise NotImplementedError
        elif self.dimension == 2:
            dC_f = self.df(C)
            return Dlst['Dxx'] * self.dXb(dC_f[..., 0]) +\
                Dlst['Dxy'] * self.dXb(dC_f[..., 1]) + Dlst['Dxy'] * self.dYb(dC_f[..., 0]) +\
                Dlst['Dyy'] * self.dYb(dC_f[..., 1])
        elif self.dimension == 3:
            dC_f = self.df(C)
            return Dlst['Dxx'] * self.dXb(dC_f[..., 0]) + Dlst['Dyy'] * self.dYb(dC_f[..., 1]) + Dlst['Dzz'] * self.dZb(dC_f[..., 2]) + \
                Dlst['Dxy'] * (self.dXb(dC_f[..., 1]) + self.dYb(dC_f[..., 0])) + \
                Dlst['Dyz'] * (self.dYb(dC_f[..., 2]) + self.dZb(dC_f[..., 1])) + \
                Dlst['Dxz'] * (self.dZb(dC_f[..., 0]) + self.dXb(dC_f[..., 2]))

    def Grad_scalarD(self, C, Dlst): # batch_C: (batch_size, (slc), row, col)
        '''
        Diffusion term for a spatially varying scalar diffusivity Dlst['D'].
        '''
        # Expanded version: \nabla (D \nabla C) => \nabla D \cdot \nabla C (part (a)) + D \Delta C (part (b)) #
        # NOTE: Work better than Central Differences !!! #
        # Nested Forward-Backward Difference Scheme in part (b)#
        D = Dlst['D']
        if self.dimension == 1:
            dC = gradient_c(C, batched = True, delta_lst = self.data_spacing)
            return gradient_c(D, batched = True, delta_lst = self.data_spacing) * dC + \
                D * gradient_c(dC, batched = True, delta_lst = self.data_spacing)
        else: # (dimension = 2 or 3)
            dC_c = gradient_c(C, batched = True, delta_lst = self.data_spacing)
            dC_f = gradient_f(C, batched = True, delta_lst = self.data_spacing)
            dD_c = gradient_c(D, batched = True, delta_lst = self.data_spacing)
            out = (dD_c * dC_c).sum(-1)  # part (a)
            for dim in range(dC_f.size(-1)):
                # part (b): backward difference of the forward difference along "dim"
                out += D * gradient_b(dC_f[..., dim], batched = True, delta_lst = self.data_spacing)[..., dim]
            return out

    def Grad_diagD(self, C, Dlst):
        '''Diffusion term for a diagonal tensor (Dlst['Dxx'], Dlst['Dyy'][, Dlst['Dzz']]); not supported in 1D.'''
        # Expanded version #
        if self.dimension == 1:
            raise NotImplementedError('diag_D is not supported for 1D version of diffusivity')
        elif self.dimension == 2:
            dC_c = self.dc(C)
            dC_f = self.df(C)
            return self.dXc(Dlst['Dxx']) * dC_c[..., 0] + Dlst['Dxx'] * self.dXb(dC_f[..., 0]) +\
                self.dYc(Dlst['Dyy']) * dC_c[..., 1] + Dlst['Dyy'] * self.dYb(dC_f[..., 1])
        elif self.dimension == 3:
            dC_c = self.dc(C)
            dC_f = self.df(C)
            return self.dXc(Dlst['Dxx']) * dC_c[..., 0] + Dlst['Dxx'] * self.dXb(dC_f[..., 0]) +\
                self.dYc(Dlst['Dyy']) * dC_c[..., 1] + Dlst['Dyy'] * self.dYb(dC_f[..., 1]) +\
                self.dZc(Dlst['Dzz']) * dC_c[..., 2] + Dlst['Dzz'] * self.dZb(dC_f[..., 2])

    def Grad_fullD(self, C, Dlst):
        '''
        Diffusion term for a full symmetric tensor (Dxx, Dxy, Dyy[, Dxz, Dyz, Dzz]);
        not supported in 1D.
        https://github.com/uncbiag/PIANOinD/blob/master/Doc/PIANOinD.pdf
        '''
        # Expanded version #
        if self.dimension == 1:
            raise NotImplementedError('full_D is not supported for 1D version of diffusivity')
        elif self.dimension == 2:
            dC_c = self.dc(C)
            dC_f = self.df(C)
            return self.dXc(Dlst['Dxx']) * dC_c[..., 0] + Dlst['Dxx'] * self.dXb(dC_f[..., 0]) +\
                self.dXc(Dlst['Dxy']) * dC_c[..., 1] + Dlst['Dxy'] * self.dXb(dC_f[..., 1]) +\
                self.dYc(Dlst['Dxy']) * dC_c[..., 0] + Dlst['Dxy'] * self.dYb(dC_f[..., 0]) +\
                self.dYc(Dlst['Dyy']) * dC_c[..., 1] + Dlst['Dyy'] * self.dYb(dC_f[..., 1])
        elif self.dimension == 3:
            dC_c = self.dc(C)
            dC_f = self.df(C)
            return (self.dXc(Dlst['Dxx']) + self.dYc(Dlst['Dxy']) + self.dZc(Dlst['Dxz'])) * dC_c[..., 0] + \
                (self.dXc(Dlst['Dxy']) + self.dYc(Dlst['Dyy']) + self.dZc(Dlst['Dyz'])) * dC_c[..., 1] + \
                (self.dXc(Dlst['Dxz']) + self.dYc(Dlst['Dyz']) + self.dZc(Dlst['Dzz'])) * dC_c[..., 2] + \
                Dlst['Dxx'] * self.dXb(dC_f[..., 0]) + Dlst['Dyy'] * self.dYb(dC_f[..., 1]) + Dlst['Dzz'] * self.dZb(dC_f[..., 2]) + \
                Dlst['Dxy'] * (self.dXb(dC_f[..., 1]) + self.dYb(dC_f[..., 0])) + \
                Dlst['Dyz'] * (self.dYb(dC_f[..., 2]) + self.dZb(dC_f[..., 1])) + \
                Dlst['Dxz'] * (self.dZb(dC_f[..., 0]) + self.dXb(dC_f[..., 2]))

    def Grad_constantV(self, C, Vlst):
        '''
        Advection term for a constant velocity Vlst['V'] applied along every axis.
        Backward differences are used when V is positive, forward differences
        otherwise. The sign is read from V itself when V is 1D (it must then
        hold a single value), else from its first element.
        '''
        V = Vlst['V']
        if self.dimension not in (1, 2, 3):
            return None
        probe = V if len(V.size()) == 1 else V[(0,) * (self.dimension + 1)]
        if probe > 0:
            ops = (self.dXb, self.dYb, self.dZb)
        else:
            ops = (self.dXf, self.dYf, self.dZf)
        total = ops[0](C)
        for op in ops[1:self.dimension]:
            total = total + op(C)
        return - V * total

    def Grad_constant_vectorV(self, C, Vlst):
        '''
        Advection term for constant per-axis velocities Vlst['Vx'], Vlst['Vy'][, Vlst['Vz']].
        Not implemented in 1D.
        '''
        # NOTE(review): every component multiplies (dX + dY) of C -- including
        # Vz in 3D -- rather than its own directional derivative, and the sign
        # probe [0, 0, 0] does not reach a single element of a batched 3D
        # field. This looks like unfinished copy-paste; behaviour is kept
        # as-is (the method is not registered in Grad_Vs) -- TODO confirm intent.
        def one_component(V):
            if V[0, 0, 0] > 0:
                return - V * (self.dXb(C) + self.dYb(C))
            return - V * (self.dXf(C) + self.dYf(C))

        if self.dimension == 1:
            raise NotImplementedError
        elif self.dimension == 2:
            out_x = one_component(Vlst['Vx'])
            out_y = one_component(Vlst['Vy'])
            return out_x + out_y
        elif self.dimension == 3:
            out_x = one_component(Vlst['Vx'])
            out_y = one_component(Vlst['Vy'])
            out_z = one_component(Vlst['Vz'])
            return out_x + out_y + out_z

    def Grad_SimscalarV(self, C, Vlst):
        '''- V * (sum of upwind derivatives of C), with V = Vlst['V'] selecting the upwind side.'''
        V = Vlst['V']
        Upwind_C = Upwind(C, self.data_spacing)
        if self.dimension == 1:
            C_x = Upwind_C.dX(V)
            return - V * C_x
        if self.dimension == 2:
            C_x, C_y = Upwind_C.dX(V), Upwind_C.dY(V)
            return - V * (C_x + C_y)
        if self.dimension == 3:
            C_x, C_y, C_z = Upwind_C.dX(V), Upwind_C.dY(V), Upwind_C.dZ(V)
            return - V * (C_x + C_y + C_z)

    def Grad_scalarV(self, C, Vlst):
        '''
        Same upwind term as Grad_SimscalarV, minus C times the summed
        central-difference derivatives of V = Vlst['V'].
        '''
        V = Vlst['V']
        Upwind_C = Upwind(C, self.data_spacing)
        dV = gradient_c(V, batched = True, delta_lst = self.data_spacing)
        if self.dimension == 1:
            C_x = Upwind_C.dX(V)
            return - V * C_x - C * dV
        elif self.dimension == 2:
            C_x, C_y = Upwind_C.dX(V), Upwind_C.dY(V)
            return - V * (C_x + C_y) - C * dV.sum(-1)
        elif self.dimension == 3:
            C_x, C_y, C_z = Upwind_C.dX(V), Upwind_C.dY(V), Upwind_C.dZ(V)
            return - V * (C_x + C_y + C_z) - C * dV.sum(-1)

    def Grad_div_free_vectorV(self, C, Vlst):
        ''' For divergence-free-by-definition velocity'''
        if self.dimension == 1:
            raise NotImplementedError('clebschVector is not supported for 1D version of velocity')
        Upwind_C = Upwind(C, self.data_spacing)
        C_x, C_y = Upwind_C.dX(Vlst['Vx']), Upwind_C.dY(Vlst['Vy'])
        if self.dimension == 2:
            return - (Vlst['Vx'] * C_x + Vlst['Vy'] * C_y)
        elif self.dimension == 3:
            C_z = Upwind_C.dZ(Vlst['Vz'])
            return - (Vlst['Vx'] * C_x + Vlst['Vy'] * C_y + Vlst['Vz'] * C_z)

    def Grad_vectorV(self, C, Vlst):
        ''' For general velocity'''
        if self.dimension == 1:
            raise NotImplementedError('vector is not supported for 1D version of velocity')
        Upwind_C = Upwind(C, self.data_spacing)
        C_x, C_y = Upwind_C.dX(Vlst['Vx']), Upwind_C.dY(Vlst['Vy'])
        Vx_x = self.dXc(Vlst['Vx'])
        Vy_y = self.dYc(Vlst['Vy'])
        if self.dimension == 2:
            return - (Vlst['Vx'] * C_x + Vlst['Vy'] * C_y) - C * (Vx_x + Vy_y)
        if self.dimension == 3:
            C_z = Upwind_C.dZ(Vlst['Vz'])
            Vz_z = self.dZc(Vlst['Vz'])
            return - (Vlst['Vx'] * C_x + Vlst['Vy'] * C_y + Vlst['Vz'] * C_z) - C * (Vx_x + Vy_y + Vz_z)

    ################# Utilities #################
    @staticmethod
    def _component(grad, X, k):
        '''
        Pick the derivative along spatial axis k out of a batched gradient of X.
        A batched field with one spatial axis (X.dim() == 2) yields a gradient
        without a trailing component axis, so it is returned unchanged.
        '''
        if X.dim() == 2:
            if k != 0:
                raise IndexError('spatial axis %d is out of range for a 1D field' % k)
            return grad
        return grad[..., k]

    def db(self, X):
        '''Backward-difference gradient of batched X (all components).'''
        return gradient_b(X, batched = True, delta_lst = self.data_spacing)

    def df(self, X):
        '''Forward-difference gradient of batched X (all components).'''
        return gradient_f(X, batched = True, delta_lst = self.data_spacing)

    def dc(self, X):
        '''Central-difference gradient of batched X (all components).'''
        return gradient_c(X, batched = True, delta_lst = self.data_spacing)

    def dXb(self, X):
        return self._component(self.db(X), X, 0)

    def dXf(self, X):
        return self._component(self.df(X), X, 0)

    def dXc(self, X):
        return self._component(self.dc(X), X, 0)

    def dYb(self, X):
        return self._component(self.db(X), X, 1)

    def dYf(self, X):
        return self._component(self.df(X), X, 1)

    def dYc(self, X):
        return self._component(self.dc(X), X, 1)

    def dZb(self, X):
        return self._component(self.db(X), X, 2)

    def dZf(self, X):
        return self._component(self.df(X), X, 2)

    def dZc(self, X):
        return self._component(self.dc(X), X, 2)

    def ddXc(self, X):
        '''Second derivative along axis 0: backward difference of the forward difference.'''
        return self.dXb(self.dXf(X))

    def ddYc(self, X):
        '''Second derivative along axis 1: backward difference of the forward difference.'''
        return self.dYb(self.dYf(X))

    def ddZc(self, X):
        '''Second derivative along axis 2: backward difference of the forward difference.'''
        return self.dZb(self.dZf(X))
class AdvDiffPDE(nn.Module):
    '''
    Plain advection-diffusion PDE solver for pre-set V_lst and D_lst (1D, 2D, 3D) for forward time series simulation

    data_spacing: grid spacing per spatial axis; its length sets the dimension (1, 2 or 3).
    perf_pattern: container/string tested for 'adv' and 'diff' to pick which terms forward() evaluates.
    D_type, V_type: keys into AdvDiffPartial.Grad_Ds / Grad_Vs.
    BC: 'neumann' / 'cauchy' / 'dirichlet_neumann' / 'source_neumann', or anything else for no boundary handling.
    dt: time step, used only to scale the stochastic term.
    V_dict, D_dict: velocity / diffusivity tensors handed to the selected term; a fresh dict each when omitted.
    stochastic: when True, forward() adds Sigma * sqrt(dt) * standard normal noise.
    device: forwarded to AdvDiffPartial.
    '''
    # Replication padding module per number of spatial axes.
    _REPLICATION_PADS = {
        1: torch.nn.ReplicationPad1d,
        2: torch.nn.ReplicationPad2d,
        3: torch.nn.ReplicationPad3d,
    }

    def __init__(self, data_spacing, perf_pattern, D_type='scalar', V_type='vector', BC=None, dt=0.1, V_dict=None, D_dict=None, stochastic=False, device='cpu'):
        super(AdvDiffPDE, self).__init__()
        self.BC = BC
        self.dt = dt
        self.dimension = len(data_spacing)
        self.perf_pattern = perf_pattern
        self.partials = AdvDiffPartial(data_spacing, device)
        self.D_type, self.V_type = D_type, V_type
        self.stochastic = stochastic
        # None instead of a literal {} default, so instances never share one dict.
        self.V_dict = {} if V_dict is None else V_dict
        self.D_dict = {} if D_dict is None else D_dict
        self.Sigma, self.Sigma_V, self.Sigma_D = 0., 0., 0. # Only for initialization #
        if self.dimension not in self._REPLICATION_PADS:
            raise ValueError('Unsupported dimension: %d' % self.dimension)
        self.neumann_BC = self._REPLICATION_PADS[self.dimension](1)

    @property
    def set_BC(self):
        '''
        Return a callable applying the boundary condition to X: (n_batch, spatial_shape).

        For the supported B.C. names, the one-voxel border of X is dropped and
        rebuilt by replicating the nearest interior value; the output has X's
        shape. Any other self.BC yields the identity.
        '''
        # NOTE For boundary condition of mass concentration #
        if self.BC == 'neumann' or self.BC == 'cauchy':
            if self.dimension not in self._REPLICATION_PADS:
                raise NotImplementedError('Unsupported B.C.!')
            pad, width = self.neumann_BC, 1
        elif self.BC == 'dirichlet_neumann' or self.BC == 'source_neumann':
            ctrl_wdth = 1
            if self.dimension not in self._REPLICATION_PADS:
                raise NotImplementedError('Unsupported B.C.!')
            self.dirichlet_BC = self._REPLICATION_PADS[self.dimension](ctrl_wdth)
            pad, width = self.dirichlet_BC, ctrl_wdth
        else:
            return lambda X: X
        # Keep the batch axis whole, trim "width" samples off both ends of every spatial axis.
        interior = (slice(None),) + (slice(width, -width),) * self.dimension
        # The padding modules expect a channel axis, so add one and strip it again.
        return lambda X: pad(X[interior].unsqueeze(dim=1))[:, 0]

    def forward(self, t, batch_C):
        '''
        Evaluate the right-hand side for the current state.

        t: (batch_size,)  -- not used in the computation
        batch_C: (batch_size, (slc,) row, col)

        Returns the advection term when 'diff' is absent from perf_pattern,
        the diffusion term when 'adv' is absent, otherwise their sum; plus
        the noise term when self.stochastic is set.
        '''
        batch_C = self.set_BC(batch_C)
        if 'diff' not in self.perf_pattern:
            out = self.partials.Grad_Vs[self.V_type](batch_C, self.V_dict)
        elif 'adv' not in self.perf_pattern:
            out = self.partials.Grad_Ds[self.D_type](batch_C, self.D_dict)
        else:
            out_V = self.partials.Grad_Vs[self.V_type](batch_C, self.V_dict)
            out_D = self.partials.Grad_Ds[self.D_type](batch_C, self.D_dict)
            out = out_V + out_D
        if self.stochastic:
            # randn_like already matches batch_C's dtype and device.
            out = out + self.Sigma * math.sqrt(self.dt) * torch.randn_like(batch_C)
        return out