# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the license found in the # LICENSE file in the root directory of this source tree. import gzip import json import os.path as osp import os import logging import cv2 import random import numpy as np from data.dataset_util import * from data.base_dataset import BaseDataset SEEN_CATEGORIES = [ "apple", "backpack", "banana", "baseballbat", "baseballglove", "bench", "bicycle", "bottle", "bowl", "broccoli", "cake", "car", "carrot", "cellphone", "chair", "cup", "donut", "hairdryer", "handbag", "hydrant", "keyboard", "laptop", "microwave", "motorcycle", "mouse", "orange", "parkingmeter", "pizza", "plant", "stopsign", "teddybear", "toaster", "toilet", "toybus", "toyplane", "toytrain", "toytruck", "tv", "umbrella", "vase", "wineglass", ] class Co3dDataset(BaseDataset): def __init__( self, common_conf, split: str = "train", CO3D_DIR: str = None, CO3D_ANNOTATION_DIR: str = None, min_num_images: int = 24, len_train: int = 100000, len_test: int = 10000, ): """ Initialize the Co3dDataset. Args: common_conf: Configuration object with common settings. split (str): Dataset split, either 'train' or 'test'. CO3D_DIR (str): Directory path to CO3D data. CO3D_ANNOTATION_DIR (str): Directory path to CO3D annotations. min_num_images (int): Minimum number of images per sequence. len_train (int): Length of the training dataset. len_test (int): Length of the test dataset. Raises: ValueError: If CO3D_DIR or CO3D_ANNOTATION_DIR is not specified. """ super().__init__(common_conf=common_conf) self.debug = common_conf.debug self.training = common_conf.training self.get_nearby = common_conf.get_nearby self.load_depth = common_conf.load_depth self.inside_random = common_conf.inside_random self.allow_duplicate_img = common_conf.allow_duplicate_img if CO3D_DIR is None or CO3D_ANNOTATION_DIR is None: raise ValueError("Both CO3D_DIR and CO3D_ANNOTATION_DIR must be specified.") category = sorted(SEEN_CATEGORIES) if self.debug: category = ["apple"] if split == "train": split_name_list = ["train"] self.len_train = len_train elif split == "test": split_name_list = ["test"] self.len_train = len_test else: raise ValueError(f"Invalid split: {split}") self.invalid_sequence = [] # set any invalid sequence names here self.category_map = {} self.data_store = {} self.seqlen = None self.min_num_images = min_num_images logging.info(f"CO3D_DIR is {CO3D_DIR}") self.CO3D_DIR = CO3D_DIR self.CO3D_ANNOTATION_DIR = CO3D_ANNOTATION_DIR total_frame_num = 0 for c in category: for split_name in split_name_list: annotation_file = osp.join( self.CO3D_ANNOTATION_DIR, f"{c}_{split_name}.jgz" ) try: with gzip.open(annotation_file, "r") as fin: annotation = json.loads(fin.read()) except FileNotFoundError: logging.error(f"Annotation file not found: {annotation_file}") continue for seq_name, seq_data in annotation.items(): if len(seq_data) < min_num_images: continue if seq_name in self.invalid_sequence: continue total_frame_num += len(seq_data) self.data_store[seq_name] = seq_data self.sequence_list = list(self.data_store.keys()) self.sequence_list_len = len(self.sequence_list) self.total_frame_num = total_frame_num status = "Training" if self.training else "Testing" logging.info(f"{status}: Co3D Data size: {self.sequence_list_len}") logging.info(f"{status}: Co3D Data dataset length: {len(self)}") def get_data( self, seq_index: int = None, img_per_seq: int = None, seq_name: str = None, ids: list = None, aspect_ratio: float = 1.0, ) -> dict: """ Retrieve data for a specific sequence. Args: seq_index (int): Index of the sequence to retrieve. img_per_seq (int): Number of images per sequence. seq_name (str): Name of the sequence. ids (list): Specific IDs to retrieve. aspect_ratio (float): Aspect ratio for image processing. Returns: dict: A batch of data including images, depths, and other metadata. """ if self.inside_random: seq_index = random.randint(0, self.sequence_list_len - 1) if seq_name is None: seq_name = self.sequence_list[seq_index] metadata = self.data_store[seq_name] if ids is None: ids = np.random.choice( len(metadata), img_per_seq, replace=self.allow_duplicate_img ) annos = [metadata[i] for i in ids] target_image_shape = self.get_target_shape(aspect_ratio) images = [] depths = [] cam_points = [] world_points = [] point_masks = [] extrinsics = [] intrinsics = [] image_paths = [] original_sizes = [] for anno in annos: filepath = anno["filepath"] image_path = osp.join(self.CO3D_DIR, filepath) image = read_image_cv2(image_path) if self.load_depth: depth_path = image_path.replace("/images", "/depths") + ".geometric.png" depth_map = read_depth(depth_path, 1.0) mvs_mask_path = image_path.replace( "/images", "/depth_masks" ).replace(".jpg", ".png") mvs_mask = cv2.imread(mvs_mask_path, cv2.IMREAD_GRAYSCALE) > 128 depth_map[~mvs_mask] = 0 depth_map = threshold_depth_map( depth_map, min_percentile=-1, max_percentile=98 ) else: depth_map = None original_size = np.array(image.shape[:2]) extri_opencv = np.array(anno["extri"]) intri_opencv = np.array(anno["intri"]) ( image, depth_map, extri_opencv, intri_opencv, world_coords_points, cam_coords_points, point_mask, _, ) = self.process_one_image( image, depth_map, extri_opencv, intri_opencv, original_size, target_image_shape, filepath=filepath, ) images.append(image) depths.append(depth_map) extrinsics.append(extri_opencv) intrinsics.append(intri_opencv) cam_points.append(cam_coords_points) world_points.append(world_coords_points) point_masks.append(point_mask) image_paths.append(image_path) original_sizes.append(original_size) set_name = "co3d" batch = { "seq_name": set_name + "_" + seq_name, "ids": ids, "frame_num": len(extrinsics), "images": images, "depths": depths, "extrinsics": extrinsics, "intrinsics": intrinsics, "cam_points": cam_points, "world_points": world_points, "point_masks": point_masks, "original_sizes": original_sizes, } return batch