""" This example computes a 2D weakly-compressible Stokes flow around a moving object, including: - defining active cells from a mask, and restricting the computation domain to those - utilizing the PicQuadrature to integrate over unstructured particles """ import math import warp as wp import numpy as np import warp.fem as fem from warp.utils import array_cast from warp.fem.utils import array_axpy from warp.sparse import bsr_transposed, bsr_mm, bsr_axpy, bsr_mv # Import example utilities # Make sure that works both when imported as module and run as standalone file try: from .bsr_utils import bsr_cg from .plot_utils import Plot except ImportError: from bsr_utils import bsr_cg from plot_utils import Plot @fem.integrand def vel_from_particles_form(s: fem.Sample, particle_vel: wp.array(dtype=wp.vec2), v: fem.Field): vel = particle_vel[s.qp_index] return wp.dot(vel, v(s)) @fem.integrand def viscosity_form(s: fem.Sample, u: fem.Field, v: fem.Field, nu: float): return nu * wp.ddot(fem.D(u, s), fem.D(v, s)) @fem.integrand def mass_form( s: fem.Sample, u: fem.Field, v: fem.Field, ): return wp.dot(u(s), v(s)) @fem.integrand def scalar_mass_form( s: fem.Sample, p: fem.Field, q: fem.Field, ): return p(s) * q(s) @fem.integrand def div_form( s: fem.Sample, u: fem.Field, q: fem.Field, ): return q(s) * fem.div(u, s) @fem.integrand def cell_activity(s: fem.Sample, domain: fem.Domain, c1: wp.vec2, c2: wp.vec2, radius: float): pos = domain(s) if wp.length(pos - c1) < radius: return 0.0 if wp.length(pos - c2) < radius: return 0.0 return 1.0 @wp.kernel def inverse_array_kernel(m: wp.array(dtype=wp.float64)): m[wp.tid()] = wp.float64(1.0) / m[wp.tid()] class Example: def __init__(self, stage=None, quiet=False): self._quiet = quiet self.res = 50 self.cell_size = 1.0 / self.res self.vel = 1.0 self.viscosity = 100.0 self.compliance = 0.01 self.bd_strength = 100000.0 geo = fem.Grid2D(res=wp.vec2i(self.res)) # Displacement boundary conditions are defined by two circles going in opposite directions # Sample particles along those circle_radius = 0.15 c1_center = wp.vec2(0.25, 0.5) c2_center = wp.vec2(0.75, 0.5) particles, particle_areas, particle_velocities = self._gen_particles(circle_radius, c1_center, c2_center) # Disable cells that are interior to the circles cell_space = fem.make_polynomial_space(geo, degree=0) activity = cell_space.make_field() fem.interpolate( cell_activity, dest=activity, values={"c1": c1_center, "c2": c2_center, "radius": circle_radius - self.cell_size}, ) # Explicitly define the active geometry partition from those cells self._active_partition = fem.ExplicitGeometryPartition(geo, wp.array(activity.dof_values.numpy(), dtype=int)) if not self._quiet: print("Active cells:", self._active_partition.cell_count()) # Function spaces -- Q1 for vel, Q0 for pressure u_space = fem.make_polynomial_space(geo, degree=1, dtype=wp.vec2) p_space = fem.make_polynomial_space(geo, degree=0) self._active_space_partition = fem.make_space_partition( space=u_space, geometry_partition=self._active_partition ) self._active_p_space_partition = fem.make_space_partition( space=p_space, geometry_partition=self._active_partition ) self._u_field = u_space.make_field() self._p_field = p_space.make_field() # Particle-based quadrature rule over active cells domain = fem.Cells(geometry=self._active_partition) self._pic_quadrature = fem.PicQuadrature(domain, particles, particle_areas) self._particle_velocities = particle_velocities self.renderer = Plot(stage) def update(self): u_space = self._u_field.space p_space = self._p_field.space # Weakly-enforced boundary condition on particles u_test = fem.make_test(space=u_space, space_partition=self._active_space_partition) u_trial = fem.make_trial(space=u_space, space_partition=self._active_space_partition) u_rhs = fem.integrate( vel_from_particles_form, quadrature=self._pic_quadrature, fields={"v": u_test}, values={"particle_vel": self._particle_velocities}, output_dtype=wp.vec2d, ) u_bd_matrix = fem.integrate(mass_form, quadrature=self._pic_quadrature, fields={"u": u_trial, "v": u_test}) # Viscosity u_visc_matrix = fem.integrate( viscosity_form, fields={"u": u_trial, "v": u_test}, values={"nu": self.viscosity}, ) # Pressure-velocity coupling p_test = fem.make_test(space=p_space, space_partition=self._active_p_space_partition) p_trial = fem.make_trial(space=p_space, space_partition=self._active_p_space_partition) div_matrix = fem.integrate(div_form, fields={"u": u_trial, "q": p_test}) inv_p_mass_matrix = fem.integrate(scalar_mass_form, fields={"p": p_trial, "q": p_test}) wp.launch( kernel=inverse_array_kernel, dim=inv_p_mass_matrix.values.shape, device=inv_p_mass_matrix.values.device, inputs=[inv_p_mass_matrix.values], ) # Assemble linear system u_matrix = u_visc_matrix bsr_axpy(u_bd_matrix, u_matrix, alpha=self.bd_strength) div_matrix_t = bsr_transposed(div_matrix) gradient_matrix = bsr_mm(div_matrix_t, inv_p_mass_matrix) bsr_mm(gradient_matrix, div_matrix, u_matrix, alpha=1.0 / self.compliance, beta=1.0) array_axpy(u_rhs, u_rhs, alpha=0.0, beta=self.bd_strength) # Solve for displacement u_res = wp.zeros_like(u_rhs) bsr_cg(u_matrix, x=u_res, b=u_rhs, quiet=self._quiet) # Compute pressure from displacement div_u = bsr_mv(A=div_matrix, x=u_res) p_res = bsr_mv(A=inv_p_mass_matrix, x=div_u, alpha=-1) # Copy to fields u_nodes = wp.indexedarray(self._u_field.dof_values, indices=self._active_space_partition.space_node_indices()) p_nodes = wp.indexedarray(self._p_field.dof_values, indices=self._active_p_space_partition.space_node_indices()) array_cast(in_array=u_res, out_array=u_nodes) array_cast(in_array=p_res, out_array=p_nodes) def render(self): self.renderer.add_surface("pressure", self._p_field) self.renderer.add_surface_vector("velocity", self._u_field) def _gen_particles(self, circle_radius, c1_center, c2_center): """Generate some particles along two circles defining velocity boundary conditions""" # Generate particles defining the transfer displacement particles_per_circle = int(2.0 * math.pi * circle_radius * self.res) angles = np.linspace(0, 2.0 * math.pi, particles_per_circle, endpoint=False) n_particles = 2 * particles_per_circle particles = np.empty((n_particles, 2), dtype=float) particles[:particles_per_circle, 0] = c1_center[0] + circle_radius * np.cos(angles) particles[:particles_per_circle, 1] = c1_center[1] + circle_radius * np.sin(angles) particles[particles_per_circle:, 0] = c2_center[0] + circle_radius * np.cos(angles) particles[particles_per_circle:, 1] = c2_center[1] + circle_radius * np.sin(angles) particle_areas = np.ones(n_particles) * self.cell_size**2 particle_velocities = np.zeros_like(particles) particle_velocities[:particles_per_circle, 0] = self.vel particle_velocities[particles_per_circle:, 0] = -self.vel particles = wp.array(particles, dtype=wp.vec2) particle_areas = wp.array(particle_areas, dtype=float) particle_velocities = wp.array(particle_velocities, dtype=wp.vec2) return particles, particle_areas, particle_velocities if __name__ == "__main__": wp.init() wp.set_module_options({"enable_backward": False}) example = Example() example.update() example.render() example.renderer.plot(streamlines=["velocity"])