| | import csv |
| | import os |
| | import warnings |
| | from collections import defaultdict |
| |
|
| | import imagesize |
| | import numpy as np |
| | import skimage.io as io |
| | from tqdm import tqdm |
| |
|
| | from detectron2.data.detection_utils import read_image |
| | from ape.data.mapper_utils import mask_to_polygons |
| |
|
| |
|
def csvread(file):
    """Load a CSV file into a list of rows (each row a list of strings).

    Returns None when *file* is falsy (empty string or None).
    """
    if not file:
        return None
    with open(file, "r", encoding="utf-8") as handle:
        return list(csv.reader(handle))
| |
|
| |
|
def csvwrite(data, file):
    """Write rows to a CSV file, overwriting any existing file.

    Args:
        data: iterable of rows, each row an iterable of field values.
        file: destination path.
    """
    # newline="" is required by the csv module: without it the writer's own
    # \r\n row terminators get translated again, yielding blank lines on
    # Windows.
    with open(file, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerows(data)
| |
|
| |
|
| | def _url_to_license(licenses, mode="http"): |
| | |
| | |
| |
|
| | |
| | licenses_by_url = {} |
| |
|
| | for license in licenses: |
| | |
| | if mode == "https": |
| | url = "https:" + license["url"][5:] |
| | else: |
| | url = license["url"] |
| | |
| | licenses_by_url[url] = license |
| |
|
| | return licenses_by_url |
| |
|
| |
|
| | def _list_to_dict(list_data): |
| |
|
| | dict_data = [] |
| | columns = list_data.pop(0) |
| | for i in range(len(list_data)): |
| | dict_data.append({columns[j]: list_data[i][j] for j in range(len(columns))}) |
| |
|
| | return dict_data |
| |
|
| |
|
def convert_category_annotations(orginal_category_info):
    """Build COCO-style category dicts from (freebase_id, name) rows.

    Ids are assigned sequentially starting at 1.
    """
    return [
        {"id": idx, "name": row[1], "freebase_id": row[0]}
        for idx, row in enumerate(orginal_category_info, start=1)
    ]
| |
|
| |
|
def convert_image_annotations(
    original_image_metadata,
    original_image_annotations,
    original_image_sizes,
    image_dir,
    categories,
    licenses,
    apply_exif,
    origin_info=False,
):
    """Convert Open Images image-level CSV data into COCO-style image dicts.

    Args:
        original_image_metadata: CSV rows (header first) with at least an
            ImageID column, plus OriginalURL and License when origin_info.
        original_image_annotations: CSV rows (header first) with ImageID,
            LabelName and Confidence columns (1 = positive, 0 = negative).
        original_image_sizes: optional CSV rows (header first) mapping
            ImageID -> width, height; missing entries are read from disk.
        image_dir: directory containing "<ImageID>.jpg" files.
        categories: dicts as produced by convert_category_annotations.
        licenses: license dicts with "url" and "id" keys.
        apply_exif: when true, decode every image and correct width/height
            for EXIF orientation (slow — reads every file).
        origin_info: when true, also record original_url and license id.

    Returns:
        list of COCO-style image dicts.
    """
    original_image_metadata_dict = _list_to_dict(original_image_metadata)
    original_image_annotations_dict = _list_to_dict(original_image_annotations)

    cats_by_freebase_id = {cat["freebase_id"]: cat for cat in categories}

    # Sizes known from the CSV (skip its header row); anything missing is
    # measured from the image file itself further down.
    if original_image_sizes:
        image_size_dict = {x[0]: [int(x[1]), int(x[2])] for x in original_image_sizes[1:]}
    else:
        image_size_dict = {}

    # Metadata may reference a license by either its http or https URL.
    licenses_by_url_http = _url_to_license(licenses, mode="http")
    licenses_by_url_https = _url_to_license(licenses, mode="https")

    # Group image-level labels into positive / negative category-id lists.
    pos_img_lvl_anns = defaultdict(list)
    neg_img_lvl_anns = defaultdict(list)
    for ann in original_image_annotations_dict:
        cat_of_ann = cats_by_freebase_id[ann["LabelName"]]["id"]
        if int(ann["Confidence"]) == 1:
            pos_img_lvl_anns[ann["ImageID"]].append(cat_of_ann)
        elif int(ann["Confidence"]) == 0:
            neg_img_lvl_anns[ann["ImageID"]].append(cat_of_ann)

    images = []

    num_images = len(original_image_metadata_dict)
    for i in tqdm(range(num_images), mininterval=0.5):
        key = original_image_metadata_dict[i]["ImageID"]

        img = {}
        img["id"] = key
        img["file_name"] = key + ".jpg"
        img["neg_category_ids"] = neg_img_lvl_anns.get(key, [])
        img["pos_category_ids"] = pos_img_lvl_anns.get(key, [])
        if origin_info:
            img["original_url"] = original_image_metadata_dict[i]["OriginalURL"]
            license_url = original_image_metadata_dict[i]["License"]
            # Try the https form first, fall back to http.  Narrowed from a
            # bare `except:`, which would also have masked unrelated errors.
            try:
                img["license"] = licenses_by_url_https[license_url]["id"]
            except KeyError:
                img["license"] = licenses_by_url_http[license_url]["id"]

        # Prefer the size from the CSV; otherwise read just the file header.
        image_size = image_size_dict.get(key, None)
        if image_size is not None:
            img["width"], img["height"] = image_size
        else:
            filename = os.path.join(image_dir, img["file_name"])
            img["width"], img["height"] = imagesize.get(filename)
        if apply_exif:
            filename = os.path.join(image_dir, img["file_name"])
            image = read_image(filename, format="BGR")
            if image.shape[1] != img["width"] or image.shape[0] != img["height"]:
                print("before exif correction: ", img)
                img["width"], img["height"] = image.shape[1], image.shape[0]
                print("after exif correction: ", img)

        images.append(img)

    return images
| |
|
| |
|
def convert_instance_annotations(original_annotations, images, categories, start_index=0):
    """Convert Open Images box CSV rows into COCO-style instance annotations.

    Args:
        original_annotations: CSV rows (header first) with ImageID, LabelName,
            normalized XMin/XMax/YMin/YMax and optional attribute columns.
        images: image dicts (with "id", "width", "height") as produced by
            convert_image_annotations.
        categories: dicts as produced by convert_category_annotations.
        start_index: id assigned to the first annotation.

    Returns:
        list of COCO-style annotation dicts.
    """
    original_annotations_dict = _list_to_dict(original_annotations)

    imgs = {img["id"]: img for img in images}
    cats_by_freebase_id = {cat["freebase_id"]: cat for cat in categories}

    annotations = []

    # Keep only the attribute columns actually present in this CSV.
    # The original code filtered against original_annotations[0], but by this
    # point _list_to_dict has popped the header so that is the first *data*
    # row and the filter never matched; it was then papered over with a
    # hard-coded list.  Checking the parsed dict keys implements the intent.
    attribute_names = ["IsOccluded", "IsTruncated", "IsGroupOf", "IsDepiction", "IsInside"]
    first_row = original_annotations_dict[0] if original_annotations_dict else {}
    annotated_attributes = [attr for attr in attribute_names if attr in first_row]

    num_instances = len(original_annotations_dict)
    for i in tqdm(range(num_instances), mininterval=0.5):
        key = i + start_index
        csv_line = i
        ann = {}
        ann["id"] = key
        image_id = original_annotations_dict[csv_line]["ImageID"]
        ann["image_id"] = image_id
        ann["freebase_id"] = original_annotations_dict[csv_line]["LabelName"]
        ann["category_id"] = cats_by_freebase_id[ann["freebase_id"]]["id"]
        ann["iscrowd"] = False

        # Box coordinates in the CSV are normalized; convert to pixel xywh
        # and round to 2 decimals, COCO-style.
        xmin = float(original_annotations_dict[csv_line]["XMin"]) * imgs[image_id]["width"]
        ymin = float(original_annotations_dict[csv_line]["YMin"]) * imgs[image_id]["height"]
        xmax = float(original_annotations_dict[csv_line]["XMax"]) * imgs[image_id]["width"]
        ymax = float(original_annotations_dict[csv_line]["YMax"]) * imgs[image_id]["height"]
        dx = xmax - xmin
        dy = ymax - ymin
        ann["bbox"] = [round(a, 2) for a in [xmin, ymin, dx, dy]]
        ann["area"] = round(dx * dy, 2)

        for attribute in annotated_attributes:
            ann[attribute] = int(original_annotations_dict[csv_line][attribute])

        annotations.append(ann)

    return annotations
| |
|
| |
|
| | def _id_to_rgb(array): |
| | B = array // 256**2 |
| | rest = array % 256**2 |
| | G = rest // 256 |
| | R = rest % 256 |
| | return np.stack([R, G, B], axis=-1).astype("uint8") |
| |
|
| |
|
| | def _get_mask_file(segment, mask_dir): |
| | name = "{}_{}_{}.png".format( |
| | segment["ImageID"], segment["LabelName"].replace("/", ""), segment["BoxID"] |
| | ) |
| | return os.path.join(mask_dir, name) |
| |
|
| |
|
| | def _combine_small_on_top(masks): |
| | combined = np.zeros(shape=masks[0].shape, dtype="uint32") |
| | sizes = [np.sum(m != 0) for m in masks] |
| | for idx in np.argsort(sizes)[::-1]: |
| | mask = masks[idx] |
| | combined[mask != 0] = mask[mask != 0] |
| | return combined |
| |
|
| |
|
| | def _greedy_combine(masks): |
| | combined = np.zeros(shape=masks[0].shape, dtype="uint32") |
| | for maks in maksk: |
| | combined[mask != 0] = mask[mask != 0] |
| | return combined |
| |
|
| |
|
def convert_segmentation_annotations(
    original_segmentations,
    images,
    categories,
    original_mask_dir,
    segmentation_out_dir,
    start_index=0,
):
    """Convert Open Images segmentation CSVs into panoptic-style annotations.

    For every image with segments, reads each per-segment mask PNG from
    *original_mask_dir*, combines them into one id-valued map (smaller
    segments painted on top), and writes it as an RGB PNG named
    "<ImageID>.png" into *segmentation_out_dir*.  Images whose combined
    mask loses segment ids to overlaps are reported and skipped.

    Args:
        original_segmentations: CSV rows (header first) with ImageID,
            LabelName, BoxID and normalized BoxXMin/BoxXMax/BoxYMin/BoxYMax.
        images: image dicts (with "id", "file_name", "width", "height").
        categories: dicts as produced by convert_category_annotations.
        original_mask_dir: directory holding per-segment mask PNGs.
        segmentation_out_dir: output directory, created if missing.
        start_index: id assigned to the first segment.

    Returns:
        list of panoptic annotation dicts with "segments_info".
    """
    original_segmentations_dict = _list_to_dict(original_segmentations)

    if not os.path.isdir(segmentation_out_dir):
        os.mkdir(segmentation_out_dir)

    # Restrict to images that actually have segments (set for O(1) lookup).
    image_ids = set(np.unique([ann["ImageID"] for ann in original_segmentations_dict]))
    filtered_images = [img for img in images if img["id"] in image_ids]

    cats_by_freebase_id = {cat["freebase_id"]: cat for cat in categories}

    # Give every segment a unique 1-based id; 0 is reserved for background.
    for i in range(len(original_segmentations_dict)):
        original_segmentations_dict[i]["SegmentID"] = i + 1

    img_segment_map = defaultdict(list)
    for segment in original_segmentations_dict:
        img_segment_map[segment["ImageID"]].append(segment)

    annotations = []
    segment_index = 0 + start_index
    for img in tqdm(filtered_images, mininterval=0.5):
        ann = dict()
        ann["file_name"] = img["file_name"]
        ann["image_id"] = img["id"]
        ann["segments_info"] = []
        masks = []
        for segment in img_segment_map[img["id"]]:
            mask_file = _get_mask_file(segment, original_mask_dir)
            mask = io.imread(mask_file)
            # Empty mask files contribute nothing; skip them entirely.
            if np.max(mask) == 0:
                continue
            # Binary (0/255) mask -> id-valued mask for this segment.
            mask = mask // 255
            mask = mask * segment["SegmentID"]
            masks.append(mask)

            segment_info = {}
            # Normalized box coordinates -> pixel xywh, rounded COCO-style.
            xmin = float(segment["BoxXMin"]) * img["width"]
            ymin = float(segment["BoxYMin"]) * img["height"]
            xmax = float(segment["BoxXMax"]) * img["width"]
            ymax = float(segment["BoxYMax"]) * img["height"]
            dx = xmax - xmin
            dy = ymax - ymin
            segment_info["bbox"] = [round(a, 2) for a in [xmin, ymin, dx, dy]]
            segment_info["area"] = round(dx * dy, 2)
            # Fixed: the original stored `(cat_dict,)` — a one-tuple of the
            # whole category dict — instead of the integer id used by every
            # other converter in this file.
            segment_info["category_id"] = cats_by_freebase_id[segment["LabelName"]]["id"]
            segment_info["id"] = segment_index
            segment_index += 1
            ann["segments_info"].append(segment_info)

        # Guard: if every mask of this image was empty, there is nothing to
        # combine or save (the original would have crashed indexing masks[0]).
        if not masks:
            continue

        combined_binary_mask = _combine_small_on_top(masks)

        # Sanity check: every segment id must survive the overlay; overlaps
        # can fully cover a segment, in which case the image is skipped.
        ids_in_mask = len(np.unique(combined_binary_mask[combined_binary_mask != 0]))
        num_segments = len(img_segment_map[img["id"]])
        if ids_in_mask != num_segments:
            print("Overlapping masks in image {}".format(ann["image_id"]))
            values_in_output = np.unique(combined_binary_mask[combined_binary_mask != 0])
            ids_in_segments = [segment["SegmentID"] for segment in img_segment_map[img["id"]]]
            not_in_segments = [x for x in values_in_output if x not in ids_in_segments]
            not_in_values = [x for x in ids_in_segments if x not in values_in_output]
            print("Not in segments: {}".format(not_in_segments))
            print("Not in pixel values: {}".format(not_in_values))
            continue

        combined_rgb_mask = _id_to_rgb(combined_binary_mask)
        out_file = os.path.join(segmentation_out_dir, "{}.png".format(ann["image_id"]))
        with warnings.catch_warnings():
            # skimage warns about low-contrast images; the output is an id
            # map, not a picture, so the warning is noise.
            warnings.simplefilter("ignore")
            io.imsave(out_file, combined_rgb_mask)

        annotations.append(ann)

    return annotations
| |
|
| |
|
def convert_segmentation_annotations_polygon(
    original_segmentations,
    images,
    categories,
    original_mask_dir,
    segmentation_out_dir,
    start_index=0,
):
    """Convert Open Images segmentation CSVs into COCO polygon annotations.

    Reads each per-segment binary mask PNG from *original_mask_dir*,
    vectorizes it with ape's mask_to_polygons, rescales the polygon
    coordinates from mask resolution to image resolution, and returns one
    COCO-style annotation dict per non-empty segment.

    NOTE(review): *segmentation_out_dir* is accepted but never used here —
    polygons are returned in memory only; confirm whether callers rely on it.
    """

    original_segmentations_dict = _list_to_dict(original_segmentations)

    cats_by_freebase_id = {cat["freebase_id"]: cat for cat in categories}
    images_by_id = {img["id"]: img for img in images}

    annotations = []
    segment_index = 0 + start_index
    num_no_mask = 0  # segments whose mask file contained no foreground pixels
    for segment in tqdm(original_segmentations_dict):
        ann = {}
        ann["id"] = segment_index
        ann["image_id"] = segment["ImageID"]
        ann["freebase_id"] = segment["LabelName"]
        ann["category_id"] = cats_by_freebase_id[segment["LabelName"]]["id"]
        ann["iscrowd"] = False

        # Box coordinates in the CSV are normalized; convert to pixel xywh
        # and round to 2 decimals, COCO-style.
        img = images_by_id[segment["ImageID"]]
        xmin = float(segment["BoxXMin"]) * img["width"]
        ymin = float(segment["BoxYMin"]) * img["height"]
        xmax = float(segment["BoxXMax"]) * img["width"]
        ymax = float(segment["BoxYMax"]) * img["height"]
        dx = xmax - xmin
        dy = ymax - ymin
        ann["bbox"] = [round(a, 2) for a in [xmin, ymin, dx, dy]]
        ann["area"] = round(dx * dy, 2)

        mask_file = _get_mask_file(segment, original_mask_dir)

        mask = io.imread(mask_file)
        # Fully empty masks yield no polygons; note segment_index is NOT
        # incremented here, so skipped segments consume no ids.
        if np.max(mask) == 0:
            num_no_mask += 1
            continue

        mask_shape = mask.shape
        mask, hierarchy, _ = mask_to_polygons(mask)
        # Keep only contours with hierarchy[i][3] < 0 — presumably an
        # OpenCV-style hierarchy where index 3 is the parent contour, so
        # this keeps outer contours and drops holes; TODO confirm against
        # mask_to_polygons.
        mask = [mask[i] for i in range(len(mask)) if hierarchy.reshape(-1, 4)[i][3] < 0]

        # Rescale polygon points from mask resolution to image resolution
        # (mask files may be stored at a different size than the image).
        scale = [1.0 * img["width"] / mask_shape[1], 1.0 * img["height"] / mask_shape[0]]
        scale = np.array(scale)
        offset = [0, 0]
        offset = np.array(offset)
        mask = [m.reshape(-1, 2) for m in mask]
        mask = [m * scale + offset for m in mask]
        mask = [m.reshape(-1) for m in mask]
        mask = [m.tolist() for m in mask]

        ann["segmentation"] = mask

        segment_index += 1
        annotations.append(ann)

    # Report how many segments were dropped for having empty masks.
    print(num_no_mask)
    return annotations
| |
|
| |
|
def filter_images(images, annotations):
    """Return the images (order preserved) referenced by at least one annotation.

    Args:
        images: image dicts with an "id" key.
        annotations: annotation dicts with an "image_id" key.

    Returns:
        list of image dicts whose id appears in *annotations*.
    """
    # A set gives O(1) membership tests; the original built a list via
    # np.unique, making the filter O(len(images) * len(unique_ids)).
    referenced_ids = {ann["image_id"] for ann in annotations}
    return [img for img in images if img["id"] in referenced_ids]
| |
|