qbhf2's picture
added NvidiaWarp and GarmentCode repos
66c9c8a
"""
This example solves a 2D Navier-Stokes flow problem
Du/dt -nu D(u) + grad p = 0
Div u = 0
with (hard) velocity-Dirichlet boundary conditions
and using semi-Lagrangian advection
"""
import argparse
import warp as wp
import warp.fem as fem
import numpy as np
from warp.fem.utils import array_axpy
from warp.sparse import bsr_mm, bsr_mv, bsr_copy
try:
from .bsr_utils import bsr_to_scipy
from .plot_utils import Plot
from .mesh_utils import gen_trimesh
except ImportError:
from bsr_utils import bsr_to_scipy
from plot_utils import Plot
from mesh_utils import gen_trimesh
# need to solve a saddle-point system, use scopy for simplicity
from scipy.sparse import bmat
from scipy.sparse.linalg import factorized
import matplotlib.pyplot as plt
import matplotlib.animation as animation
@fem.integrand
def u_boundary_value(s: fem.Sample, domain: fem.Domain, v: fem.Field, top_vel: float):
# Horizontal velocity on top of domain, zero elsewhere
if domain(s)[1] == 1.0:
return wp.dot(wp.vec2f(top_vel, 0.0), v(s))
return wp.dot(wp.vec2f(0.0, 0.0), v(s))
@fem.integrand
def mass_form(
s: fem.Sample,
u: fem.Field,
v: fem.Field,
):
return wp.dot(u(s), v(s))
@fem.integrand
def inertia_form(s: fem.Sample, u: fem.Field, v: fem.Field, dt: float):
return mass_form(s, u, v) / dt
@fem.integrand
def viscosity_form(s: fem.Sample, u: fem.Field, v: fem.Field, nu: float):
return 2.0 * nu * wp.ddot(fem.D(u, s), fem.D(v, s))
@fem.integrand
def viscosity_and_inertia_form(s: fem.Sample, u: fem.Field, v: fem.Field, dt: float, nu: float):
return inertia_form(s, u, v, dt) + viscosity_form(s, u, v, nu)
@fem.integrand
def transported_inertia_form(s: fem.Sample, domain: fem.Domain, u: fem.Field, v: fem.Field, dt: float):
pos = domain(s)
vel = u(s)
conv_pos = pos - 0.5 * vel * dt
conv_s = fem.lookup(domain, conv_pos, s)
conv_vel = u(conv_s)
conv_pos = conv_pos - 0.5 * conv_vel * dt
conv_vel = u(fem.lookup(domain, conv_pos, conv_s))
return wp.dot(conv_vel, v(s)) / dt
@fem.integrand
def div_form(
s: fem.Sample,
u: fem.Field,
q: fem.Field,
):
return -q(s) * fem.div(u, s)
class Example:
parser = argparse.ArgumentParser()
parser.add_argument("--resolution", type=int, default=25)
parser.add_argument("--degree", type=int, default=2)
parser.add_argument("--num_frames", type=int, default=1000)
parser.add_argument("--top_velocity", type=float, default=1.0)
parser.add_argument("--Re", type=float, default=1000.0)
parser.add_argument("--tri_mesh", action="store_true", help="Use a triangular mesh")
def __init__(self, stage=None, quiet=False, args=None, **kwargs):
if args is None:
# Read args from kwargs, add default arg values from parser
args = argparse.Namespace(**kwargs)
args = Example.parser.parse_args(args=[], namespace=args)
self._args = args
self._quiet = quiet
res = args.resolution
self.sim_dt = 1.0 / args.resolution
self.current_frame = 0
viscosity = args.top_velocity / args.Re
if args.tri_mesh:
positions, tri_vidx = gen_trimesh(res=wp.vec2i(res))
geo = fem.Trimesh2D(tri_vertex_indices=tri_vidx, positions=positions)
else:
geo = fem.Grid2D(res=wp.vec2i(res))
domain = fem.Cells(geometry=geo)
boundary = fem.BoundarySides(geo)
# Functions spaces: Q(d)-Q(d-1)
u_degree = args.degree
u_space = fem.make_polynomial_space(geo, degree=u_degree, dtype=wp.vec2)
p_space = fem.make_polynomial_space(geo, degree=u_degree - 1)
# Viscosity and inertia
u_test = fem.make_test(space=u_space, domain=domain)
u_trial = fem.make_trial(space=u_space, domain=domain)
u_matrix = fem.integrate(
viscosity_and_inertia_form,
fields={"u": u_trial, "v": u_test},
values={"nu": viscosity, "dt": self.sim_dt},
)
# Pressure-velocity coupling
p_test = fem.make_test(space=p_space, domain=domain)
div_matrix = fem.integrate(div_form, fields={"u": u_trial, "q": p_test})
# Enforcing the Dirichlet boundary condition the hard way;
# build projector for velocity left- and right-hand-sides
u_bd_test = fem.make_test(space=u_space, domain=boundary)
u_bd_trial = fem.make_trial(space=u_space, domain=boundary)
u_bd_projector = fem.integrate(mass_form, fields={"u": u_bd_trial, "v": u_bd_test}, nodal=True)
u_bd_value = fem.integrate(
u_boundary_value,
fields={"v": u_bd_test},
values={"top_vel": args.top_velocity},
nodal=True,
output_dtype=wp.vec2d,
)
fem.normalize_dirichlet_projector(u_bd_projector, u_bd_value)
u_bd_rhs = wp.zeros_like(u_bd_value)
fem.project_linear_system(u_matrix, u_bd_rhs, u_bd_projector, u_bd_value, normalize_projector=False)
# div_bd_rhs = div_matrix * u_bd_rhs
div_bd_rhs = wp.zeros(shape=(div_matrix.nrow,), dtype=div_matrix.scalar_type)
bsr_mv(div_matrix, u_bd_rhs, y=div_bd_rhs)
# div_matrix = div_matrix - div_matrix * bd_projector
bsr_mm(x=bsr_copy(div_matrix), y=u_bd_projector, z=div_matrix, alpha=-1.0, beta=1.0)
# Assemble saddle system with Scipy
div_matrix = bsr_to_scipy(div_matrix)
u_matrix = bsr_to_scipy(u_matrix)
div_bd_rhs = div_bd_rhs.numpy()
ones = np.ones(shape=(p_space.node_count(), 1), dtype=float)
saddle_system = bmat(
[
[u_matrix, div_matrix.transpose(), None],
[div_matrix, None, ones],
[None, ones.transpose(), None],
],
)
with wp.ScopedTimer("LU factorization"):
self._solve_saddle = factorized(saddle_system)
# Save data for computing time steps rhs
self._u_bd_projector = u_bd_projector
self._u_bd_rhs = u_bd_rhs
self._u_test = u_test
self._div_bd_rhs = div_bd_rhs
# Velocitiy field
self._u_field = u_space.make_field()
self.renderer = Plot(stage)
self.renderer.add_surface_vector("velocity", self._u_field)
def update(self):
self.current_frame += 1
u_rhs = fem.integrate(
transported_inertia_form,
fields={"u": self._u_field, "v": self._u_test},
values={"dt": self.sim_dt},
output_dtype=wp.vec2d,
)
# Apply boundary conditions
# u_rhs = (I - P) * u_rhs + u_bd_rhs
bsr_mv(self._u_bd_projector, x=u_rhs, y=u_rhs, alpha=-1.0, beta=1.0)
array_axpy(x=self._u_bd_rhs, y=u_rhs, alpha=1.0, beta=1.0)
# Assemble scipy saddle system rhs
u_dof_count = self._u_bd_projector.shape[0]
p_dof_count = self._div_bd_rhs.shape[0]
tot_dof_count = u_dof_count + p_dof_count + 1
u_slice = slice(0, u_dof_count)
p_slice = slice(u_dof_count, tot_dof_count - 1)
saddle_rhs = np.zeros(tot_dof_count)
saddle_rhs[u_slice] = u_rhs.numpy().flatten()
saddle_rhs[p_slice] = self._div_bd_rhs
x = self._solve_saddle(saddle_rhs)
# Extract result
self._u_field.dof_values = x[u_slice].reshape((-1, 2))
def render(self):
self.renderer.begin_frame(time = self.current_frame * self.sim_dt)
self.renderer.add_surface_vector("velocity", self._u_field)
self.renderer.end_frame()
if __name__ == "__main__":
wp.init()
wp.set_module_options({"enable_backward": False})
args = Example.parser.parse_args()
example = Example(args=args)
for k in range(args.num_frames):
print(f"Frame {k}:")
example.update()
example.render()
example.renderer.add_surface_vector("velocity_final", example._u_field)
example.renderer.plot(streamlines=set(["velocity_final"]))