Spaces:
Build error
Build error
| # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """ | |
| Adapted from: | |
| https://github.com/bytedance/IRASim/blob/main/dataset/dataset_util.py | |
| """ | |
| import base64 | |
| import math | |
| import os | |
| from io import BytesIO | |
| import numpy as np | |
| import torch | |
| import torch.distributed as dist | |
| import torchvision.transforms.functional as F | |
| from PIL import Image | |
def is_dist_avail_and_initialized():
    """Return True only when torch.distributed is both available and initialized."""
    # Short-circuiting keeps is_initialized() from being called when the
    # distributed package is unavailable.
    return dist.is_available() and dist.is_initialized()
def get_rank():
    """Return the rank of the current process in the default process group.

    Falls back to 0 when torch.distributed is unavailable or not initialized.
    """
    return dist.get_rank() if is_dist_avail_and_initialized() else 0
def get_1d_sincos_pos_embed_from_grid(embed_dim, pos):
    """Build 1D sine/cosine positional embeddings.

    Args:
        embed_dim: Output dimension D for each position; must be even.
        pos: Positions to encode. Any array-like (list, tuple, ndarray) of any
            shape is accepted; it is flattened to size (M,).

    Returns:
        numpy array of shape (M, D): the first D/2 columns hold the sines and
        the last D/2 columns hold the cosines.

    Raises:
        AssertionError: If ``embed_dim`` is odd.
    """
    assert embed_dim % 2 == 0
    omega = np.arange(embed_dim // 2, dtype=np.float32)
    omega /= embed_dim / 2.0
    omega = 1.0 / 10000**omega  # (D/2,)

    # np.asarray lets plain Python sequences through; ndarrays pass unchanged.
    pos = np.asarray(pos).reshape(-1)  # (M,)
    out = np.einsum("m,d->md", pos, omega)  # (M, D/2), outer product

    emb_sin = np.sin(out)  # (M, D/2)
    emb_cos = np.cos(out)  # (M, D/2)
    return np.concatenate([emb_sin, emb_cos], axis=1)  # (M, D)
def get_2d_sincos_pos_embed_from_grid(embed_dim, grid):
    """Build 2D sine/cosine positional embeddings from a coordinate grid.

    Half of the embedding dimensions encode ``grid[0]`` and the other half
    encode ``grid[1]``; the two (H*W, D/2) halves are concatenated to (H*W, D).

    Raises:
        AssertionError: If ``embed_dim`` is odd.
    """
    assert embed_dim % 2 == 0
    half_dim = embed_dim // 2
    per_axis = [get_1d_sincos_pos_embed_from_grid(half_dim, grid[axis]) for axis in (0, 1)]
    return np.concatenate(per_axis, axis=1)  # (H*W, D)
def get_2d_sincos_pos_embed(embed_dim, grid_size, cls_token=False):
    """Build 2D sine/cosine positional embeddings for a square grid.

    Args:
        embed_dim: Embedding dimension per grid cell.
        grid_size: int, used as both the grid height and the grid width.
        cls_token: When True, a row of zeros is prepended to the result.

    Returns:
        pos_embed: [grid_size*grid_size, embed_dim], or
        [1+grid_size*grid_size, embed_dim] when ``cls_token`` is True.
    """
    axis_h = np.arange(grid_size, dtype=np.float32)
    axis_w = np.arange(grid_size, dtype=np.float32)
    # The width axis is passed to meshgrid first.
    mesh = np.stack(np.meshgrid(axis_w, axis_h), axis=0)
    mesh = mesh.reshape([2, 1, grid_size, grid_size])

    pos_embed = get_2d_sincos_pos_embed_from_grid(embed_dim, mesh)
    if not cls_token:
        return pos_embed
    zero_row = np.zeros([1, embed_dim])
    return np.concatenate([zero_row, pos_embed], axis=0)
def b64_2_img(data: str):
    """Decode a base64-encoded image and return it as an RGB PIL image."""
    raw_bytes = base64.b64decode(data)
    buffer = BytesIO(raw_bytes)
    return Image.open(buffer).convert("RGB")
def get_continuous_action(d_acts, c_act_max, c_act_min, n_bins):
    """Map discretized actions back to continuous values.

    Computes ``d_acts / (n_bins - 1) * (c_act_max - c_act_min) + c_act_min``
    after moving both bounds to the device of ``d_acts``.

    Args:
        d_acts: Tensor of discrete action values.
        c_act_max: Tensor with the upper bound of the continuous range.
        c_act_min: Tensor with the lower bound of the continuous range.
        n_bins: Number of discretization bins.

    Returns:
        Tensor of continuous actions.
    """
    device = d_acts.device
    upper = c_act_max.to(device)
    lower = c_act_min.to(device)
    fraction = d_acts / (n_bins - 1)
    return fraction * (upper - lower) + lower
def alpha2rotm(a):
    """Alpha euler angle to rotation matrix (rotation about the x axis)."""
    cos_a, sin_a = np.cos(a), np.sin(a)
    return np.array(
        [
            [1, 0, 0],
            [0, cos_a, -sin_a],
            [0, sin_a, cos_a],
        ]
    )
def beta2rotm(b):
    """Beta euler angle to rotation matrix (rotation about the y axis)."""
    cos_b, sin_b = np.cos(b), np.sin(b)
    return np.array(
        [
            [cos_b, 0, sin_b],
            [0, 1, 0],
            [-sin_b, 0, cos_b],
        ]
    )
def gamma2rotm(c):
    """Gamma euler angle to rotation matrix (rotation about the z axis)."""
    cos_c, sin_c = np.cos(c), np.sin(c)
    return np.array(
        [
            [cos_c, -sin_c, 0],
            [sin_c, cos_c, 0],
            [0, 0, 1],
        ]
    )
def euler2rotm(euler_angles):
    """Euler angle (ZYX) to rotation matrix.

    Args:
        euler_angles: Indexable holding (alpha, beta, gamma) at positions 0, 1, 2.

    Returns:
        The product ``gamma2rotm(gamma) @ beta2rotm(beta) @ alpha2rotm(alpha)``.
    """
    rot_x = alpha2rotm(euler_angles[0])
    rot_y = beta2rotm(euler_angles[1])
    rot_z = gamma2rotm(euler_angles[2])
    return rot_z @ rot_y @ rot_x
def isRotm(R):
    """Check whether ``R`` is a valid rotation matrix (forked from Andy Zeng).

    The test is orthogonality only: the norm of ``I - R^T R`` must be below 1e-6.
    """
    gram = np.dot(np.transpose(R), R)
    identity = np.identity(3, dtype=R.dtype)
    deviation = np.linalg.norm(identity - gram)
    return deviation < 1e-6
def rotm2euler(R):
    """Convert a rotation matrix to Euler angles ``[x, y, z]``.

    Forked from: https://learnopencv.com/rotation-matrix-to-euler-angles/
    Convention: R = Rz * Ry * Rx.

    Args:
        R: 3x3 numpy rotation matrix; validated with ``isRotm``.

    Returns:
        numpy array ``[x, y, z]`` with each angle wrapped into (-pi, pi].

    Raises:
        AssertionError: If ``R`` fails the ``isRotm`` check.
    """
    assert isRotm(R)

    def _wrap(angle):
        # Shift by whole turns until the angle lies in (-pi, pi].
        while angle > np.pi:
            angle -= 2 * np.pi
        while angle <= -np.pi:
            angle += 2 * np.pi
        return angle

    planar_norm = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
    # The y angle uses the same formula in both branches.
    ang_y = math.atan2(-R[2, 0], planar_norm)
    if planar_norm < 1e-6:
        # Singular case: z is pinned to 0 and x is read from the middle row.
        ang_x = math.atan2(-R[1, 2], R[1, 1])
        ang_z = 0
    else:
        ang_x = math.atan2(R[2, 1], R[2, 2])
        ang_z = math.atan2(R[1, 0], R[0, 0])

    return np.array([_wrap(ang_x), _wrap(ang_y), _wrap(ang_z)])
def get_converted_fp32_paths(deepspeed_ckpt_path):
    """Return the sibling path ``<ckpt_name>.fp32.pt`` for a checkpoint path.

    Trailing slashes are stripped first, so a checkpoint directory given as
    ``a/b/ckpt/`` maps to ``a/b/ckpt.fp32.pt``. Despite the plural name, a
    single path string is returned.
    """
    parent_dir, ckpt_name = os.path.split(deepspeed_ckpt_path.rstrip("/"))
    return os.path.join(parent_dir, f"{ckpt_name}.fp32.pt")
def quat2rotm(quat):
    """Quaternion to rotation matrix.

    Every term is divided by the squared norm of the quaternion, so the input
    does not have to be unit length.

    Args:
        quat (4, numpy array): quaternion x, y, z, w

    Returns:
        rotm (3x3 numpy array): rotation matrix
    """
    x, y, z, w = quat[0], quat[1], quat[2], quat[3]
    s = w * w + x * x + y * y + z * z

    row_0 = [1 - 2 * (y * y + z * z) / s, 2 * (x * y - z * w) / s, 2 * (x * z + y * w) / s]
    row_1 = [2 * (x * y + z * w) / s, 1 - 2 * (x * x + z * z) / s, 2 * (y * z - x * w) / s]
    row_2 = [2 * (x * z - y * w) / s, 2 * (y * z + x * w) / s, 1 - 2 * (x * x + y * y) / s]
    return np.array([row_0, row_1, row_2])
class Resize_Preprocess:
    """Callable transform that resizes every frame of a video to a fixed size."""

    def __init__(self, size):
        """
        Store the target size used for every frame.

        Args:
            size (tuple): The target height and width as a tuple (height, width).
        """
        self.size = size

    def __call__(self, video_frames):
        """
        Resize each frame and stack the results along a new leading dimension.

        Args:
            video_frames (torch.Tensor): A tensor representing a batch of video frames;
                it is iterated along its first dimension.

        Returns:
            torch.Tensor: The transformed video frames.
        """
        resized = [F.resize(frame, self.size, antialias=True) for frame in video_frames]
        return torch.stack(resized)
class Preprocess:
    """Callable transform that rescales a clip, preserving its aspect ratio.

    Args:
        size: Indexable ``(target_height, target_width)``.
    """

    def __init__(self, size):
        self.size = size

    def __call__(self, clip):
        """Rescale ``clip`` with bilinear interpolation towards ``self.size``."""
        return Preprocess.resize_scale(clip, self.size[0], self.size[1], interpolation_mode="bilinear")

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(size={self.size})"

    # Declared static: the function takes no ``self``, so without the decorator
    # it only worked when called through the class and failed on instances.
    @staticmethod
    def resize_scale(clip, target_height, target_width, interpolation_mode):
        """Scale ``clip`` uniformly so that it covers the target size.

        One scale factor is applied to both spatial dimensions. When the clip
        is relatively taller than the target (H/W > target_height/target_width)
        the width is matched to ``target_width``; otherwise the height is
        matched to ``target_height``. The other dimension may end up larger
        than the target; no cropping is done here.

        Args:
            clip: Tensor whose last two dimensions are (H, W).
            target_height: Desired height.
            target_width: Desired width.
            interpolation_mode: ``mode`` forwarded to
                ``torch.nn.functional.interpolate``.

        Returns:
            The interpolated tensor.
        """
        target_ratio = target_height / target_width
        H = clip.size(-2)
        W = clip.size(-1)
        clip_ratio = H / W
        if clip_ratio > target_ratio:
            scale_ = target_width / W
        else:
            scale_ = target_height / H
        return torch.nn.functional.interpolate(clip, scale_factor=scale_, mode=interpolation_mode, align_corners=False)
class ToTensorVideo:
    """
    Convert a clip tensor from uint8 to float and divide its values by 255.0.
    The work is delegated to ``to_tensor``; the dimension order is left as is.
    """

    def __call__(self, clip):
        """
        Args:
            clip (torch.tensor, dtype=torch.uint8): Size is (T, C, H, W)
        Return:
            clip (torch.tensor, dtype=torch.float): Size is (T, C, H, W)
        """
        return to_tensor(clip)

    def __repr__(self) -> str:
        return type(self).__name__
def to_tensor(clip):
    """
    Convert a uint8 clip tensor to float and divide its values by 255.0.
    The dimension order is not changed.

    Args:
        clip (torch.tensor, dtype=torch.uint8): Size is (T, C, H, W)
    Return:
        clip (torch.tensor, dtype=torch.float): Size is (T, C, H, W)
    Raises:
        TypeError: If ``clip`` is not a tensor or its dtype is not uint8.
        ValueError: If ``clip`` is not 4D.
    """
    _is_tensor_video_clip(clip)
    if clip.dtype != torch.uint8:
        raise TypeError(f"clip tensor should have data type uint8. Got {str(clip.dtype)}")
    as_float = clip.float()
    return as_float / 255.0
| def _is_tensor_video_clip(clip): | |
| if not torch.is_tensor(clip): | |
| raise TypeError("clip should be Tensor. Got %s" % type(clip)) | |
| if not clip.ndimension() == 4: | |
| raise ValueError("clip should be 4D. Got %dD" % clip.dim()) | |
| return True | |