Spaces:
Sleeping
Sleeping
File size: 6,413 Bytes
66c9c8a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 | """
This example simulates a convection-diffusion PDE using Discontinuous Galerkin
with upwind transport and Symmetric Interior Penalty
D phi / dt - nu d2 phi / dx^2 = 0
"""
import argparse
import warp as wp
import warp.fem as fem
from warp.sparse import bsr_axpy
# Import example utilities
# Make sure that works both when imported as module and run as standalone file
try:
from .bsr_utils import bsr_to_scipy
from .mesh_utils import gen_trimesh, gen_quadmesh
from .plot_utils import Plot
from .example_convection_diffusion import (
initial_condition,
velocity,
inertia_form,
diffusion_form,
)
except ImportError:
from bsr_utils import bsr_to_scipy
from mesh_utils import gen_trimesh, gen_quadmesh
from plot_utils import Plot
from example_convection_diffusion import (
initial_condition,
velocity,
inertia_form,
diffusion_form,
)
# Non-SPD system, solve using scipy
from scipy.sparse.linalg import factorized
# Standard transport term, on cells' interior
@fem.integrand
def transport_form(s: fem.Sample, domain: fem.Domain, phi: fem.Field, psi: fem.Field, ang_vel: float):
pos = domain(s)
vel = velocity(pos, ang_vel)
return psi(s) * wp.dot(fem.grad(phi, s), vel)
# Upwind flux, on cell sides
@fem.integrand
def upwind_transport_form(s: fem.Sample, domain: fem.Domain, phi: fem.Field, psi: fem.Field, ang_vel: float):
pos = domain(s)
vel = velocity(pos, ang_vel)
vel_n = wp.dot(vel, fem.normal(domain, s))
return fem.jump(phi, s) * (-fem.average(psi, s) * vel_n + 0.5 * fem.jump(psi, s) * wp.abs(vel_n))
# Symmetric-Interior-Penalty diffusion term (See Pietro Ern 2012)
@fem.integrand
def sip_diffusion_form(
s: fem.Sample,
domain: fem.Domain,
psi: fem.Field,
phi: fem.Field,
):
nor = fem.normal(domain, s)
penalty = fem.measure_ratio(domain, s) * float(fem.degree(psi) * fem.degree(phi))
return penalty * fem.jump(phi, s) * fem.jump(psi, s) - (
wp.dot(fem.grad_average(phi, s), nor) * fem.jump(psi, s)
+ wp.dot(fem.grad_average(psi, s), nor) * fem.jump(phi, s)
)
class Example:
parser = argparse.ArgumentParser()
parser.add_argument("--resolution", type=int, default=50)
parser.add_argument("--degree", type=int, default=2)
parser.add_argument("--num_frames", type=int, default=100)
parser.add_argument("--viscosity", type=float, default=0.001)
parser.add_argument("--ang_vel", type=float, default=1.0)
parser.add_argument("--mesh", choices=("grid", "tri", "quad"), default="grid", help="Mesh type")
def __init__(self, stage=None, quiet=False, args=None, **kwargs):
if args is None:
# Read args from kwargs, add default arg values from parser
args = argparse.Namespace(**kwargs)
args = Example.parser.parse_args(args=[], namespace=args)
self._args = args
self._quiet = quiet
res = args.resolution
self.sim_dt = 1.0 / (args.ang_vel * res)
self.current_frame = 0
if args.mesh == "tri":
positions, tri_vidx = gen_trimesh(res=wp.vec2i(args.resolution))
geo = fem.Trimesh2D(tri_vertex_indices=tri_vidx, positions=positions)
elif args.mesh == "quad":
positions, quad_vidx = gen_quadmesh(res=wp.vec2i(args.resolution))
geo = fem.Quadmesh2D(quad_vertex_indices=quad_vidx, positions=positions)
else:
geo = fem.Grid2D(res=wp.vec2i(args.resolution))
domain = fem.Cells(geometry=geo)
sides = fem.Sides(geo)
scalar_space = fem.make_polynomial_space(
geo,
discontinuous=True,
degree=args.degree,
family=fem.Polynomial.GAUSS_LEGENDRE,
)
# Assemble transport, diffusion and inertia matrices
self._test = fem.make_test(space=scalar_space, domain=domain)
trial = fem.make_trial(space=scalar_space, domain=domain)
matrix_inertia = fem.integrate(
inertia_form,
fields={"phi": trial, "psi": self._test},
values={"dt": self.sim_dt},
)
matrix_transport = fem.integrate(
transport_form,
fields={"phi": trial, "psi": self._test},
values={"ang_vel": args.ang_vel},
)
side_test = fem.make_test(space=scalar_space, domain=sides)
side_trial = fem.make_trial(space=scalar_space, domain=sides)
bsr_axpy(
fem.integrate(
upwind_transport_form,
fields={"phi": side_trial, "psi": side_test},
values={"ang_vel": args.ang_vel},
),
y=matrix_transport,
)
matrix_diffusion = fem.integrate(
diffusion_form,
fields={"u": trial, "v": self._test},
)
bsr_axpy(
fem.integrate(
sip_diffusion_form,
fields={"phi": side_trial, "psi": side_test},
),
y=matrix_diffusion,
)
self._matrix = matrix_inertia
bsr_axpy(x=matrix_transport, y=self._matrix)
bsr_axpy(x=matrix_diffusion, y=self._matrix, alpha=args.viscosity)
# Compute LU factorization of system matrix
self._solve_lu = factorized(bsr_to_scipy(self._matrix))
# Initial condition
self._phi_field = scalar_space.make_field()
fem.interpolate(initial_condition, dest=self._phi_field)
self.renderer = Plot(stage)
self.renderer.add_surface("phi", self._phi_field)
def update(self):
self.current_frame += 1
rhs = fem.integrate(
inertia_form,
fields={"phi": self._phi_field, "psi": self._test},
values={"dt": self.sim_dt},
)
self._phi_field.dof_values = self._solve_lu(rhs.numpy())
def render(self):
self.renderer.begin_frame(time = self.current_frame * self.sim_dt)
self.renderer.add_surface("phi", self._phi_field)
self.renderer.end_frame()
if __name__ == "__main__":
wp.init()
wp.set_module_options({"enable_backward": False})
args = Example.parser.parse_args()
example = Example(args=args)
for k in range(args.num_frames):
print(f"Frame {k}:")
example.update()
example.render()
example.renderer.plot()
|