Spaces:
Sleeping
Sleeping
| # Copyright (c) 2023 NVIDIA CORPORATION. All rights reserved. | |
| # NVIDIA CORPORATION and its licensors retain all intellectual property | |
| # and proprietary rights in and to this software, related documentation | |
| # and any modifications thereto. Any use, reproduction, disclosure or | |
| # distribution of this software and related documentation without an express | |
| # license agreement from NVIDIA CORPORATION is strictly prohibited. | |
| """Helpers to author OmniGraph attributes.""" | |
| import functools | |
| import inspect | |
| import math | |
| import operator | |
| from typing import ( | |
| Any, | |
| Optional, | |
| Union, | |
| Sequence, | |
| ) | |
| import numpy as np | |
| import omni.graph.core as og | |
| import warp as wp | |
| from omni.warp.nodes._impl.common import type_convert_og_to_warp | |
| ATTR_BUNDLE_TYPE = og.Type( | |
| og.BaseDataType.RELATIONSHIP, | |
| 1, | |
| 0, | |
| og.AttributeRole.BUNDLE, | |
| ) | |
| # Names | |
| # ------------------------------------------------------------------------------ | |
| _ATTR_PORT_TYPES = ( | |
| og.AttributePortType.ATTRIBUTE_PORT_TYPE_INPUT, | |
| og.AttributePortType.ATTRIBUTE_PORT_TYPE_OUTPUT, | |
| og.AttributePortType.ATTRIBUTE_PORT_TYPE_STATE, | |
| ) | |
| _ATTR_NAME_FMTS = {x: "{}:{{}}".format(og.get_port_type_namespace(x)) for x in _ATTR_PORT_TYPES} | |
| def attr_join_name( | |
| port_type: og.AttributePortType, | |
| base_name: str, | |
| ) -> str: | |
| """Build an attribute name by prefixing it with its port type.""" | |
| return _ATTR_NAME_FMTS[port_type].format(base_name) | |
| def attr_get_base_name( | |
| attr: og.Attribute, | |
| ) -> str: | |
| """Retrieves an attribute base name.""" | |
| name = attr.get_name() | |
| if ( | |
| attr.get_type_name() == "bundle" | |
| and (attr.get_port_type() == og.AttributePortType.ATTRIBUTE_PORT_TYPE_OUTPUT) | |
| and name.startswith("outputs_") | |
| ): | |
| # Output bundles are a bit special because they are in fact implemented | |
| # as USD primitives, and USD doesn't support the colon symbol `:` in | |
| # primitive names, thus output bundles are prefixed with `outputs_` in | |
| # OmniGraph instead of `outputs:` like everything else. | |
| return name[8:] | |
| return name.split(":")[-1] | |
| def attr_get_name( | |
| attr: og.Attribute, | |
| ) -> str: | |
| """Retrieves an attribute name.""" | |
| name = attr.get_name() | |
| if ( | |
| attr.get_type_name() == "bundle" | |
| and (attr.get_port_type() == og.AttributePortType.ATTRIBUTE_PORT_TYPE_OUTPUT) | |
| and name.startswith("outputs_") | |
| ): | |
| # Output bundles are a bit special because they are in fact implemented | |
| # as USD primitives, and USD doesn't support the colon symbol `:` in | |
| # primitive names, thus output bundles are prefixed with `outputs_` in | |
| # OmniGraph instead of `outputs:` like everything else. | |
| return attr_join_name( | |
| og.AttributePortType.ATTRIBUTE_PORT_TYPE_OUTPUT, | |
| name[8:], | |
| ) | |
| return name | |
| # Values | |
| # ------------------------------------------------------------------------------ | |
| def attr_get( | |
| attr: og.AttributeData, | |
| ) -> Any: | |
| """Retrieves the value from an attribute living on the CPU.""" | |
| return attr.get(on_gpu=False) | |
| def attr_set( | |
| attr: og.AttributeData, | |
| value: Any, | |
| ) -> None: | |
| """Sets the given value onto an array attribute living on the CPU.""" | |
| attr.set(value, on_gpu=False) | |
| def attr_get_array_on_gpu( | |
| attr: og.AttributeData, | |
| dtype: type, | |
| read_only: bool = True, | |
| ) -> wp.array: | |
| """Retrieves the value of an array attribute living on the GPU.""" | |
| attr.gpu_ptr_kind = og.PtrToPtrKind.CPU | |
| (ptr, _) = attr.get_array( | |
| on_gpu=True, | |
| get_for_write=not read_only, | |
| reserved_element_count=0 if read_only else attr.size(), | |
| ) | |
| return wp.from_ptr(ptr, attr.size(), dtype=dtype) | |
| def attr_cast_array_to_warp( | |
| value: Union[np.array, og.DataWrapper], | |
| dtype: type, | |
| shape: Sequence[int], | |
| device: wp.context.Device, | |
| ) -> wp.array: | |
| """Casts an attribute array value to its corresponding warp type.""" | |
| if device.is_cpu: | |
| return wp.array( | |
| value, | |
| dtype=dtype, | |
| shape=shape, | |
| owner=False, | |
| device=device, | |
| ) | |
| elif device.is_cuda: | |
| size = functools.reduce(operator.mul, shape) | |
| return wp.types.from_ptr( | |
| value.memory, | |
| size, | |
| dtype=dtype, | |
| shape=shape, | |
| device=device, | |
| ) | |
| assert False, "Unexpected device '{}'.".format(device.alias) | |
| # Tracking | |
| # ------------------------------------------------------------------------------ | |
| class AttrTracking: | |
| """Attributes state for tracking changes.""" | |
| def __init__(self, names: Sequence[str]) -> None: | |
| self._names = names | |
| self._state = [None] * len(names) | |
| def have_attrs_changed(self, db: og.Database) -> bool: | |
| """Compare the current attribute values with the internal state.""" | |
| for i, name in enumerate(self._names): | |
| cached_value = self._state[i] | |
| current_value = getattr(db.inputs, name) | |
| if isinstance(current_value, np.ndarray): | |
| if not np.array_equal(current_value, cached_value): | |
| return True | |
| elif current_value != cached_value: | |
| return True | |
| return False | |
| def update_state(self, db: og.Database) -> None: | |
| """Updates the internal state with the current attribute values.""" | |
| for i, name in enumerate(self._names): | |
| current_value = getattr(db.inputs, name) | |
| if isinstance(current_value, np.ndarray): | |
| self._state[i] = current_value.copy() | |
| else: | |
| self._state[i] = current_value | |
| # High-level Helper | |
| # ------------------------------------------------------------------------------ | |
| def from_omni_graph( | |
| value: Union[np.ndarray, og.DataWrapper, og.AttributeData, og.DynamicAttributeAccess], | |
| dtype: Optional[type] = None, | |
| shape: Optional[Sequence[int]] = None, | |
| device: Optional[wp.context.Device] = None, | |
| ) -> wp.array: | |
| """Casts an OmniGraph array data to its corresponding Warp type.""" | |
| def from_data_wrapper( | |
| data: og.DataWrapper, | |
| dtype: Optional[type], | |
| shape: Optional[Sequence[int]], | |
| device: Optional[wp.context.Device], | |
| ) -> wp.array: | |
| if data.gpu_ptr_kind != og.PtrToPtrKind.CPU: | |
| raise RuntimeError("All pointers must live on the CPU, make sure to set 'cudaPointers' to 'cpu'.") | |
| elif not data.is_array: | |
| raise RuntimeError("The attribute data isn't an array.") | |
| if dtype is None: | |
| base_type = type_convert_og_to_warp( | |
| og.Type( | |
| data.dtype.base_type, | |
| tuple_count=data.dtype.tuple_count, | |
| array_depth=0, | |
| role=og.AttributeRole.MATRIX if data.dtype.is_matrix_type() else og.AttributeRole.NONE, | |
| ), | |
| ) | |
| dim_count = len(data.shape) | |
| if dim_count == 1: | |
| dtype = base_type | |
| elif dim_count == 2: | |
| dtype = wp.types.vector(length=data.shape[1], dtype=base_type) | |
| elif dim_count == 3: | |
| dtype = wp.types.matrix(shape=(data.shape[1], data.shape[2]), dtype=base_type) | |
| else: | |
| raise RuntimeError("Arrays with more than 3 dimensions are not supported.") | |
| arr_size = data.shape[0] * data.dtype.size | |
| element_size = wp.types.type_size_in_bytes(dtype) | |
| if shape is None: | |
| # Infer a shape compatible with the dtype. | |
| for i in range(len(data.shape)): | |
| if functools.reduce(operator.mul, data.shape[: i + 1]) * element_size == arr_size: | |
| shape = data.shape[: i + 1] | |
| break | |
| if shape is None: | |
| if arr_size % element_size != 0: | |
| raise RuntimeError( | |
| "Cannot infer a size matching the Warp data type '{}' with " | |
| "an array size of '{}' bytes.".format(dtype.__name__, arr_size) | |
| ) | |
| size = arr_size // element_size | |
| else: | |
| size = functools.reduce(operator.mul, shape) | |
| src_device = wp.get_device(str(data.device)) | |
| dst_device = device | |
| return wp.from_ptr( | |
| data.memory, | |
| size, | |
| dtype=dtype, | |
| shape=shape, | |
| device=src_device, | |
| ).to(dst_device) | |
| def from_attr_data( | |
| data: og.AttributeData, | |
| dtype: Optional[type], | |
| shape: Optional[Sequence[int]], | |
| device: Optional[wp.context.Device], | |
| ) -> wp.array: | |
| if data.gpu_valid(): | |
| on_gpu = True | |
| elif data.cpu_valid(): | |
| on_gpu = False | |
| else: | |
| raise RuntimeError("The attribute data isn't valid.") | |
| if on_gpu: | |
| data_type = data.get_type() | |
| base_type = type_convert_og_to_warp( | |
| og.Type( | |
| data_type.base_type, | |
| tuple_count=data_type.tuple_count, | |
| array_depth=0, | |
| role=data_type.role, | |
| ), | |
| ) | |
| if dtype is None: | |
| dtype = base_type | |
| arr_size = data.size() * wp.types.type_size_in_bytes(base_type) | |
| element_size = wp.types.type_size_in_bytes(dtype) | |
| if shape is None: | |
| # Infer a shape compatible with the dtype. | |
| if data_type.is_matrix_type(): | |
| dim = math.isqrt(data_type.tuple_count) | |
| arr_shape = (data.size(), dim, dim) | |
| else: | |
| arr_shape = (data.size(), data_type.tuple_count) | |
| for i in range(len(arr_shape)): | |
| if functools.reduce(operator.mul, arr_shape[: i + 1]) * element_size == arr_size: | |
| shape = arr_shape[: i + 1] | |
| break | |
| if shape is None: | |
| if arr_size % element_size != 0: | |
| raise RuntimeError( | |
| "Cannot infer a size matching the Warp data type '{}' with " | |
| "an array size of '{}' bytes.".format(dtype.__name__, arr_size) | |
| ) | |
| size = arr_size // element_size | |
| else: | |
| size = functools.reduce(operator.mul, shape) | |
| data.gpu_ptr_kind = og.PtrToPtrKind.CPU | |
| (ptr, _) = data.get_array( | |
| on_gpu=True, | |
| get_for_write=not data.is_read_only(), | |
| reserved_element_count=0 if data.is_read_only() else data.size(), | |
| ) | |
| src_device = wp.get_device("cuda") | |
| dst_device = device | |
| return wp.from_ptr( | |
| ptr, | |
| size, | |
| dtype=dtype, | |
| shape=shape, | |
| device=src_device, | |
| ).to(dst_device) | |
| else: | |
| arr = data.get_array( | |
| on_gpu=False, | |
| get_for_write=not data.is_read_only(), | |
| reserved_element_count=0 if data.is_read_only() else data.size(), | |
| ) | |
| return wp.from_numpy(arr, dtype=dtype, shape=shape, device=device) | |
| if isinstance(value, np.ndarray): | |
| return wp.from_numpy(value, dtype=dtype, shape=shape, device=device) | |
| elif isinstance(value, og.DataWrapper): | |
| return from_data_wrapper(value, dtype, shape, device) | |
| elif isinstance(value, og.AttributeData): | |
| return from_attr_data(value, dtype, shape, device) | |
| elif og.DynamicAttributeAccess in inspect.getmro(type(getattr(value, "_parent", None))): | |
| if device is None: | |
| device = wp.get_device() | |
| if device.is_cpu: | |
| return wp.from_numpy(value.cpu, dtype=dtype, shape=shape, device=device) | |
| elif device.is_cuda: | |
| return from_data_wrapper(value.gpu, dtype, shape, device) | |
| else: | |
| assert False, "Unexpected device '{}'.".format(device.alias) | |
| return None | |