| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """Convert poly2d to mask/bitmask.""" |
|
|
| import os |
| from functools import partial |
| from multiprocessing import Pool |
| from typing import Callable, Dict, List |
|
|
| import matplotlib |
| import matplotlib.pyplot as plt |
| import numpy as np |
| from PIL import Image |
| from scalabel.common.parallel import NPROC |
| from scalabel.common.typing import NDArrayU8 |
| from scalabel.label.io import group_and_sort, load |
| from scalabel.label.transforms import poly_to_patch |
| from scalabel.label.typing import Config, Frame, ImageSize, Label, Poly2D |
| from scalabel.label.utils import ( |
| check_crowd, |
| check_ignored, |
| get_leaf_categories, ) |
| from tqdm import tqdm |
|
|
| from bdd100k.common.logger import logger |
| from bdd100k.common.typing import BDD100KConfig |
| from bdd100k.common.utils import get_bdd100k_instance_id, load_bdd100k_config |
| from bdd100k.label.label import drivables, labels, lane_categories |
| from bdd100k.label.to_coco import parse_args |
| from bdd100k.label.to_scalabel import bdd100k_to_scalabel |
|
|
# Pixel value for "no valid class"; also the default mask background colour
# and the trainId that seg_to_masks filters out.
IGNORE_LABEL: int = 255
# Category ids up to and including this value keep their category id as the
# annotation id in panseg_to_bitmasks; larger ids get per-instance counters.
STUFF_NUM: int = 30
# Integer codes for the "laneDirection" / "laneStyle" label attributes.
LANE_DIRECTION_MAP: Dict[str, int] = dict(parallel=0, vertical=1)
LANE_STYLE_MAP: Dict[str, int] = dict(solid=0, dashed=1)
|
|
|
|
def frame_to_mask(
    out_path: str,
    shape: ImageSize,
    colors: List[NDArrayU8],
    poly2ds: List[List[Poly2D]],
    with_instances: bool = True,
    back_color: int = 0,
    closed: bool = True,
) -> None:
    """Rasterize one frame of poly2d labels and save the result as an image.

    Each label is drawn with Matplotlib using a colour that encodes its
    1-based index (high byte in R, low byte in G). The rendered canvas is
    decoded back into an index map, and every pixel belonging to label ``i``
    receives ``colors[i]``.

    Args:
        out_path: Destination file passed to ``PIL.Image.save``.
        shape: Object providing ``height`` and ``width`` of the output.
        colors: One colour vector per label (4 values when
            ``with_instances`` is true, 1 value otherwise).
        poly2ds: One list of polygons per label, aligned with ``colors``.
        with_instances: Produce a 4-channel bitmask instead of a 1-channel
            mask.
        back_color: Value used to fill every channel before drawing.
        closed: Forwarded to ``poly_to_patch``.

    Raises:
        AssertionError: If ``colors`` and ``poly2ds`` differ in length or
            ``back_color`` is negative.
    """
    assert len(colors) == len(poly2ds)
    height, width = shape.height, shape.width

    assert back_color >= 0
    channels = 4 if with_instances else 1
    img: NDArrayU8 = (
        np.ones([height, width, channels], dtype=np.uint8) * back_color
    )

    if not colors:
        # Nothing to draw: skip the (expensive) rendering entirely. The
        # 255 -> 0 remap is applied here as well so that the saved file is
        # identical to what the full path below would have produced.
        img[img == 255] = 0
        Image.fromarray(img.squeeze()).save(out_path)
        return

    matplotlib.use("Agg")
    fig = plt.figure(facecolor="0")
    try:
        fig.set_size_inches((width / fig.get_dpi()), height / fig.get_dpi())
        ax = fig.add_axes([0, 0, 1, 1])
        ax.axis("off")
        ax.set_xlim(0, width)
        ax.set_ylim(0, height)
        ax.set_facecolor((0, 0, 0, 0))
        ax.invert_yaxis()

        for index, poly2d in enumerate(poly2ds, start=1):
            # Index 0, i.e. colour (0, 0, 0), is reserved for the background.
            # The low byte must be taken with ``& 255`` so that it matches
            # the ``(R << 8) + G`` decoding below; ``% 255`` would map index
            # 255 onto the background and corrupt every larger index.
            encoded = ((index >> 8) / 255.0, (index & 255) / 255.0, 0.0)
            for poly in poly2d:
                ax.add_patch(
                    poly_to_patch(
                        poly.vertices,
                        poly.types,
                        color=encoded,
                        closed=closed,
                    )
                )

        fig.canvas.draw()
        # NOTE(review): ``tostring_rgb`` is deprecated in recent Matplotlib
        # releases -- confirm against the pinned version.
        rendered: NDArrayU8 = np.frombuffer(
            fig.canvas.tostring_rgb(), np.uint8
        )
    finally:
        # Close this specific figure even if rendering failed, so worker
        # processes do not accumulate open figures.
        plt.close(fig)

    rgb = rendered.reshape((height, width, -1)).astype(np.int32)
    index_map = (rgb[..., 0] << 8) + rgb[..., 1]

    for index, color in enumerate(colors, start=1):
        img[index_map == index] = color

    # NOTE(review): this zeroes *every* channel value equal to 255, including
    # in the 4-channel bitmask case where 255 may be a legitimate channel
    # value -- confirm this remap is intended for all modes.
    img[img == 255] = 0

    Image.fromarray(img.squeeze()).save(out_path)
|
|
|
|
def set_instance_color(
    label: Label, category_id: int, ann_id: int
) -> NDArrayU8:
    """Build the 4-value bitmask colour of one instance.

    The returned uint8 vector holds, in order:
      0. the low byte of ``category_id``;
      1. attribute flags: truncated (bit 3), occluded (bit 2), crowd (bit 1)
         and ignored (bit 0);
      2. ``ann_id >> 8``;
      3. the low byte of ``ann_id``.

    All flags are 0 when the label carries no attributes. The crowd and
    ignored flags are whatever ``check_crowd`` / ``check_ignored`` report.
    """
    truncated = occluded = crowd = ignored = 0
    attrs = label.attributes
    if attrs is not None:
        truncated = int(attrs.get("truncated", False))
        occluded = int(attrs.get("occluded", False))
        crowd = int(check_crowd(label))
        ignored = int(check_ignored(label))

    flags = (truncated << 3) + (occluded << 2) + (crowd << 1) + ignored
    channels = [category_id & 255, flags, ann_id >> 8, ann_id & 255]
    return np.array(channels, dtype=np.uint8)
|
|
|
|
def set_lane_color(label: Label, category_id: int) -> NDArrayU8:
    """Return the single-channel mask value for a lane label.

    The value depends on the lane style (looked up through
    ``LANE_STYLE_MAP``; "solid" when the attribute or all attributes are
    missing) and on the category:

      * 1 -- solid lane whose ``category_id`` is 2 or 3
        (presumably the double-line lane categories -- TODO confirm);
      * 2 -- any other solid lane;
      * 3 -- dashed lane.

    The three cases are mutually exclusive. They used to be written as an
    ``if`` followed by an independent ``if/else``, which always overwrote the
    first result and made value 1 unreachable.

    Args:
        label: Lane label; only ``label.attributes`` is read.
        category_id: Numeric lane category of the label.

    Returns:
        A uint8 array with exactly one element.

    Raises:
        KeyError: If the "laneStyle" attribute is not a key of
            ``LANE_STYLE_MAP``.
    """
    attributes = label.attributes
    if attributes is None:
        lane_style = 0
    else:
        lane_style = LANE_STYLE_MAP[str(attributes.get("laneStyle", "solid"))]

    if lane_style != 0:
        value = 3
    elif category_id in (2, 3):
        value = 1
    else:
        value = 2

    color: NDArrayU8 = np.array([value], dtype=np.uint8)
    return color
|
|
|
|
def frames_to_masks(
    nproc: int,
    out_paths: List[str],
    shapes: List[ImageSize],
    colors_list: List[List[NDArrayU8]],
    poly2ds_list: List[List[List[Poly2D]]],
    with_instances: bool = True,
    back_color: int = 0,
    closed: bool = True,
) -> None:
    """Convert many frames to masks using a pool of ``nproc`` processes.

    The four lists are consumed in lockstep: element ``k`` of each forms the
    positional arguments of one ``frame_to_mask`` call, while
    ``with_instances``, ``back_color`` and ``closed`` are shared by all calls.
    """
    # Bind the options shared by every frame once.
    convert_one = partial(
        frame_to_mask,
        with_instances=with_instances,
        back_color=back_color,
        closed=closed,
    )
    with Pool(nproc) as pool:
        jobs = tqdm(
            zip(out_paths, shapes, colors_list, poly2ds_list),
            total=len(out_paths),
        )
        pool.starmap(convert_one, jobs)
|
|
|
|
def seg_to_masks(
    frames: List[Frame],
    out_base: str,
    config: Config,
    nproc: int = NPROC,
    mode: str = "sem_seg",
    back_color: int = IGNORE_LABEL,
    closed: bool = True,
) -> None:
    """Convert segmentation poly2d labels into 1-channel PNG masks.

    Args:
        frames: Frames whose labels are rasterized.
        out_base: Output directory (created if missing); every mask is
            written there under the frame's base name with ".jpg" replaced
            by ".png".
        config: Dataset config; ``config.imageSize`` is the preferred image
            size.
        nproc: Number of worker processes.
        mode: One of "sem_seg", "drivable" or "lane_mark"; selects the
            category table.
        back_color: Background value of the masks.
        closed: Forwarded to the rasterizer.

    Raises:
        KeyError: If ``mode`` is not one of the supported modes.
        ValueError: If no image size is available from the config or the
            frame.
    """
    os.makedirs(out_base, exist_ok=True)
    img_shape = config.imageSize

    out_paths: List[str] = []
    shapes: List[ImageSize] = []
    colors_list: List[List[NDArrayU8]] = []
    poly2ds_list: List[List[List[Poly2D]]] = []

    categories_by_mode = {
        "sem_seg": labels,
        "drivable": drivables,
        "lane_mark": lane_categories,
    }
    categories = categories_by_mode[mode]
    # Categories whose trainId is the ignore label are left out, so labels
    # of those categories are skipped below.
    cat_name2id = {}
    for cat in categories:
        if cat.trainId != IGNORE_LABEL:
            cat_name2id[cat.name] = cat.trainId
    uses_plain_id = mode in ["sem_seg", "drivable"]

    logger.info("Preparing annotations for Semseg to Bitmasks")

    for frame in tqdm(frames):
        png_name = os.path.basename(frame.name.replace(".jpg", ".png"))
        out_paths.append(os.path.join(out_base, png_name))

        # Once a size has been found it is reused for all following frames.
        if img_shape is None:
            if frame.size is None:
                raise ValueError("Image shape not defined!")
            img_shape = frame.size
        shapes.append(img_shape)

        frame_colors: List[NDArrayU8] = []
        frame_polys: List[List[Poly2D]] = []
        for label in frame.labels or []:
            if label.category not in cat_name2id or label.poly2d is None:
                continue
            category_id = cat_name2id[label.category]
            if uses_plain_id:
                color: NDArrayU8 = np.array([category_id], dtype=np.uint8)
            else:
                color = set_lane_color(label, category_id)
            frame_colors.append(color)
            frame_polys.append(label.poly2d)

        colors_list.append(frame_colors)
        poly2ds_list.append(frame_polys)

    logger.info("Start Conversion for Seg to Masks")
    frames_to_masks(
        nproc,
        out_paths,
        shapes,
        colors_list,
        poly2ds_list,
        with_instances=False,
        back_color=back_color,
        closed=closed,
    )
|
|
|
|
# Common signature of every converter: (frames, out_base, config, nproc).
ToMasksFunc = Callable[[List[Frame], str, Config, int], None]

# Semantic segmentation: ignore-label background, closed polygons.
semseg_to_masks: ToMasksFunc = partial(
    seg_to_masks,
    mode="sem_seg",
    back_color=IGNORE_LABEL,
    closed=True,
)

# Drivable area: the background is the last index of ``drivables``.
drivable_to_masks: ToMasksFunc = partial(
    seg_to_masks,
    mode="drivable",
    back_color=len(drivables) - 1,
    closed=True,
)

# Lane marking: ignore-label background, polylines are left open.
lanemark_to_masks: ToMasksFunc = partial(
    seg_to_masks,
    mode="lane_mark",
    back_color=IGNORE_LABEL,
    closed=False,
)
|
|
|
|
def insseg_to_bitmasks(
    frames: List[Frame], out_base: str, config: Config, nproc: int = NPROC
) -> None:
    """Convert instance segmentation poly2d labels into 4-channel bitmasks.

    Category ids are the 1-based positions of the leaf categories of
    ``config.categories``. Within each frame the kept labels (those with a
    poly2d and a known category) are numbered from 1 in processing order.
    When the first label of a frame has a score, the labels are processed in
    ascending score order.

    Raises:
        ValueError: If no image size is available from the config or the
            frame.
    """
    os.makedirs(out_base, exist_ok=True)
    img_shape = config.imageSize

    out_paths: List[str] = []
    shapes: List[ImageSize] = []
    colors_list: List[List[NDArrayU8]] = []
    poly2ds_list: List[List[List[Poly2D]]] = []

    leaf_categories = get_leaf_categories(config.categories)
    cat_name2id = {
        cat.name: idx for idx, cat in enumerate(leaf_categories, start=1)
    }

    logger.info("Preparing annotations for InsSeg to Bitmasks")

    for frame in tqdm(frames):
        png_name = os.path.basename(frame.name.replace(".jpg", ".png"))
        out_paths.append(os.path.join(out_base, png_name))

        # Once a size has been found it is reused for all following frames.
        if img_shape is None:
            if frame.size is None:
                raise ValueError("Image shape not defined!")
            img_shape = frame.size
        shapes.append(img_shape)

        frame_colors: List[NDArrayU8] = []
        frame_polys: List[List[Poly2D]] = []
        colors_list.append(frame_colors)
        poly2ds_list.append(frame_polys)

        frame_labels = frame.labels
        if not frame_labels:
            continue

        # Scored labels are drawn in ascending score order, so higher scores
        # are drawn later.
        if frame_labels[0].score is not None:
            frame_labels = sorted(
                frame_labels, key=lambda lab: float(lab.score)
            )

        kept = [
            lab
            for lab in frame_labels
            if lab.poly2d is not None and lab.category in cat_name2id
        ]
        for ann_id, lab in enumerate(kept, start=1):
            category_id = cat_name2id[lab.category]
            frame_colors.append(set_instance_color(lab, category_id, ann_id))
            frame_polys.append(lab.poly2d)

    logger.info("Start conversion for InsSeg to Bitmasks")
    frames_to_masks(nproc, out_paths, shapes, colors_list, poly2ds_list)
|
|
|
|
def panseg_to_bitmasks(
    frames: List[Frame], out_base: str, config: Config, nproc: int = NPROC
) -> None:
    """Convert panoptic segmentation poly2d labels into 4-channel bitmasks.

    Category ids come from the ``id`` field of the module-level ``labels``
    table. Labels of category id 0 are skipped. Labels whose category id is
    at most ``STUFF_NUM`` use the category id as annotation id; all other
    labels get consecutive annotation ids starting at ``STUFF_NUM + 1``,
    restarting for every frame.

    Args:
        frames: Frames whose labels are rasterized.
        out_base: Output directory (created if missing).
        config: Dataset config; ``config.imageSize`` is the preferred image
            size.
        nproc: Number of worker processes.

    Raises:
        ValueError: If no image size is available from the config or the
            frame.
    """
    os.makedirs(out_base, exist_ok=True)
    img_shape = config.imageSize

    out_paths: List[str] = []
    shapes: List[ImageSize] = []
    colors_list: List[List[NDArrayU8]] = []
    poly2ds_list: List[List[List[Poly2D]]] = []
    cat_name2id = {cat.name: cat.id for cat in labels}

    # The message used to say "InsSeg", copied from the sibling converter.
    logger.info("Preparing annotations for PanSeg to Bitmasks")

    for image_anns in tqdm(frames):
        cur_ann_id = STUFF_NUM

        # Masks are always stored as PNG under the frame's base name.
        image_name = image_anns.name.replace(".jpg", ".png")
        image_name = os.path.split(image_name)[-1]
        out_path = os.path.join(out_base, image_name)
        out_paths.append(out_path)

        # Once a size has been found it is reused for all following frames.
        if img_shape is None:
            if image_anns.size is not None:
                img_shape = image_anns.size
            else:
                raise ValueError("Image shape not defined!")
        shapes.append(img_shape)

        colors: List[NDArrayU8] = []
        poly2ds: List[List[Poly2D]] = []
        colors_list.append(colors)
        poly2ds_list.append(poly2ds)

        labels_ = image_anns.labels
        if labels_ is None or len(labels_) == 0:
            continue

        # Scored labels are drawn in ascending score order, so higher scores
        # are drawn later.
        if labels_[0].score is not None:
            labels_ = sorted(labels_, key=lambda label: float(label.score))

        for label in labels_:
            if label.poly2d is None:
                continue
            if label.category not in cat_name2id:
                continue

            category_id = cat_name2id[label.category]
            if category_id == 0:
                continue
            if category_id <= STUFF_NUM:
                ann_id = category_id
            else:
                cur_ann_id += 1
                ann_id = cur_ann_id

            color = set_instance_color(label, category_id, ann_id)
            colors.append(color)
            poly2ds.append(label.poly2d)

    logger.info("Start conversion for PanSeg to Bitmasks")
    frames_to_masks(nproc, out_paths, shapes, colors_list, poly2ds_list)
|
|
|
|
def segtrack_to_bitmasks(
    frames: List[Frame], out_base: str, config: Config, nproc: int = NPROC
) -> None:
    """Convert segmentation tracking poly2d labels into 4-channel bitmasks.

    Frames are grouped with ``group_and_sort`` and each group is written to
    its own sub-directory of ``out_base``, named after the ``videoName`` of
    the group's first frame. Instance ids are obtained from
    ``get_bdd100k_instance_id`` using an id map and counter that are reset
    for every group.

    Args:
        frames: Frames of one or more videos.
        out_base: Root output directory.
        config: Dataset config; ``config.imageSize`` is the preferred image
            size and ``config.categories`` defines the category ids.
        nproc: Number of worker processes.

    Raises:
        ValueError: If no image size is available from the config or the
            frame.
    """
    frames_list = group_and_sort(frames)
    img_shape = config.imageSize

    out_paths: List[str] = []
    shapes: List[ImageSize] = []
    colors_list: List[List[NDArrayU8]] = []
    poly2ds_list: List[List[List[Poly2D]]] = []

    categories = get_leaf_categories(config.categories)
    cat_name2id = {cat.name: i + 1 for i, cat in enumerate(categories)}

    logger.info("Preparing annotations for SegTrack to Bitmasks")

    for video_anns in tqdm(frames_list):
        global_instance_id: int = 1
        instance_id_maps: Dict[str, int] = {}

        video_name = video_anns[0].videoName
        out_dir = os.path.join(out_base, video_name)
        # exist_ok avoids the check-then-create race and matches how the
        # other converters create their output directory.
        os.makedirs(out_dir, exist_ok=True)

        for image_anns in video_anns:
            # Masks are always stored as PNG under the frame's base name.
            image_name = image_anns.name.replace(".jpg", ".png")
            image_name = os.path.split(image_name)[-1]
            out_path = os.path.join(out_dir, image_name)
            out_paths.append(out_path)

            # Once a size has been found it is reused for all later frames.
            if img_shape is None:
                if image_anns.size is not None:
                    img_shape = image_anns.size
                else:
                    raise ValueError("Image shape not defined!")
            shapes.append(img_shape)

            colors: List[NDArrayU8] = []
            poly2ds: List[List[Poly2D]] = []
            colors_list.append(colors)
            poly2ds_list.append(poly2ds)

            labels_ = image_anns.labels
            if labels_ is None or len(labels_) == 0:
                continue

            # Scored labels are drawn in ascending score order, so higher
            # scores are drawn later.
            if labels_[0].score is not None:
                labels_ = sorted(
                    labels_, key=lambda label: float(label.score)
                )

            for label in labels_:
                if label.poly2d is None:
                    continue
                if label.category not in cat_name2id:
                    continue

                instance_id, global_instance_id = get_bdd100k_instance_id(
                    instance_id_maps, global_instance_id, label.id
                )
                category_id = cat_name2id[label.category]
                color = set_instance_color(label, category_id, instance_id)
                colors.append(color)
                poly2ds.append(label.poly2d)

    logger.info("Start Conversion for SegTrack to Bitmasks")
    frames_to_masks(nproc, out_paths, shapes, colors_list, poly2ds_list)
|
|
|
|
def main() -> None:
    """Parse the command line, load the labels and run the converter."""
    args = parse_args()
    # NOTE(review): the mode is overwritten unconditionally here, so whatever
    # mode was given on the command line is ignored and only the lane-mark
    # converter can run -- confirm this is intended.
    args.mode = "lane_mark"

    os.environ["QT_QPA_PLATFORM"] = "offscreen"

    convert_funcs: Dict[str, ToMasksFunc] = {
        "sem_seg": semseg_to_masks,
        "drivable": drivable_to_masks,
        "lane_mark": lanemark_to_masks,
        "pan_seg": panseg_to_bitmasks,
        "ins_seg": insseg_to_bitmasks,
        "seg_track": segtrack_to_bitmasks,
    }

    dataset = load(args.input, args.nproc)

    # Config precedence: explicit --config, then the config embedded in the
    # dataset, then the config loaded for the mode.
    if args.config is not None:
        bdd100k_config = load_bdd100k_config(args.config)
    elif dataset.config is not None:
        bdd100k_config = BDD100KConfig(config=dataset.config)
    else:
        bdd100k_config = load_bdd100k_config(args.mode)

    # Only the instance-level modes go through bdd100k_to_scalabel first.
    frames = dataset.frames
    if args.mode in ["ins_seg", "seg_track"]:
        frames = bdd100k_to_scalabel(dataset.frames, bdd100k_config)

    convert = convert_funcs[args.mode]
    convert(frames, args.output, bdd100k_config.scalabel, args.nproc)

    logger.info("Finished!")
|
|
|
|
# Run the conversion only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|