# Hugging Face page residue (not part of the module): uploader "zhouyik"; commit message "Upload folder using huggingface_hub"; revision 032e687 (verified).
import csv
import os
import warnings
from collections import defaultdict
import imagesize
import numpy as np
import skimage.io as io
from tqdm import tqdm
from detectron2.data.detection_utils import read_image
from ape.data.mapper_utils import mask_to_polygons
def csvread(file):
    """Read a CSV file into a list of rows.

    Args:
        file: Path of the CSV file to read. A falsy value (``None`` or an
            empty string) means "no file".

    Returns:
        A list with one list of string fields per CSV row (any header row is
        returned like every other row), or ``None`` when ``file`` is falsy.
    """
    if not file:
        return None
    # newline="" leaves line-ending handling to the csv module, so quoted
    # fields that contain embedded newlines come back unchanged.
    with open(file, "r", encoding="utf-8", newline="") as f:
        return list(csv.reader(f))
def csvwrite(data, file):
    """Write rows to a CSV file, replacing any existing file.

    Args:
        data: Iterable of rows; each row is an iterable of field values.
        file: Path of the CSV file to create.
    """
    # newline="" stops the text layer from translating the "\r\n" terminator
    # emitted by the csv writer (which would become "\r\r\n" on Windows).
    with open(file, "w", encoding="utf-8", newline="") as f:
        csv.writer(f).writerows(data)
def _url_to_license(licenses, mode="http"):
    """Index license records by their URL.

    Args:
        licenses: Iterable of license dicts, each with a ``"url"`` entry.
        mode: ``"https"`` to key the result by the ``https:`` form of every
            URL; any other value keeps the URLs exactly as stored.

    Returns:
        Dict mapping URL -> license dict. When several licenses share a URL
        the last one wins.
    """
    licenses_by_url = {}
    for entry in licenses:
        url = entry["url"]
        # Only swap the scheme when there is an "http:" prefix to replace, so
        # URLs that already use https are kept intact instead of mangled.
        if mode == "https" and url.startswith("http:"):
            url = "https:" + url[len("http:"):]
        licenses_by_url[url] = entry
    return licenses_by_url
def _list_to_dict(list_data):
    """Turn header-plus-rows table data into a list of per-row dicts.

    Args:
        list_data: Sequence of rows where the first row holds the column
            names. The sequence is left unmodified.

    Returns:
        One dict per data row mapping column name -> field value. An empty
        ``list_data`` yields an empty list.

    Raises:
        IndexError: If a data row has fewer fields than the header.
    """
    if not list_data:
        return []
    # Read the header by index instead of pop(0): callers keep using the same
    # list afterwards and must still find the header row in it.
    columns = list_data[0]
    return [
        {name: row[position] for position, name in enumerate(columns)}
        for row in list_data[1:]
    ]
def convert_category_annotations(orginal_category_info):
    """Build category records from (freebase id, name) rows.

    Args:
        orginal_category_info: Sequence of rows whose first element is the
            Freebase id and whose second element is the category name.

    Returns:
        List of dicts with the keys ``id`` (1-based row position), ``name``
        and ``freebase_id``, in input order.
    """
    return [
        {"id": position, "name": row[1], "freebase_id": row[0]}
        for position, row in enumerate(orginal_category_info, start=1)
    ]
def convert_image_annotations(
    original_image_metadata,
    original_image_annotations,
    original_image_sizes,
    image_dir,
    categories,
    licenses,
    apply_exif,
    origin_info=False,
):
    """Build one image record per metadata row, with image-level labels.

    Args:
        original_image_metadata: CSV rows, header first. The ``ImageID``
            column is always read; ``OriginalURL`` and ``License`` are read
            when ``origin_info`` is true.
        original_image_annotations: CSV rows, header first, with the columns
            ``ImageID``, ``LabelName`` and ``Confidence``.
        original_image_sizes: Optional rows, header first, read as
            (image id, width, height). Images missing from it (or all images
            when it is falsy) are measured with ``imagesize``.
        image_dir: Directory containing ``<ImageID>.jpg`` files.
        categories: Category dicts with ``id`` and ``freebase_id``.
        licenses: License dicts with ``url`` and ``id``.
        apply_exif: When true, every image is loaded with ``read_image`` and
            the stored width/height are replaced by the loaded array's shape
            if they differ.
        origin_info: When true, ``original_url`` and ``license`` are added to
            each record.

    Returns:
        List of dicts with ``id``, ``file_name``, ``neg_category_ids``,
        ``pos_category_ids``, ``width``, ``height`` and, with ``origin_info``,
        ``original_url`` and ``license``.

    Raises:
        KeyError: If a label is not among ``categories`` or a license URL is
            not among ``licenses`` (in either its http or https form).
    """
    original_image_metadata_dict = _list_to_dict(original_image_metadata)
    original_image_annotations_dict = _list_to_dict(original_image_annotations)
    cats_by_freebase_id = {cat["freebase_id"]: cat for cat in categories}

    if original_image_sizes:
        # The first row is a header and is skipped.
        image_size_dict = {x[0]: [int(x[1]), int(x[2])] for x in original_image_sizes[1:]}
    else:
        image_size_dict = {}

    # The metadata may spell a license URL with either scheme, so keep both indexes.
    licenses_by_url_http = _url_to_license(licenses, mode="http")
    licenses_by_url_https = _url_to_license(licenses, mode="https")

    # Group image-level labels per image: Confidence 1 -> positive,
    # Confidence 0 -> negative, anything else is ignored.
    pos_img_lvl_anns = defaultdict(list)
    neg_img_lvl_anns = defaultdict(list)
    for ann in original_image_annotations_dict:
        cat_of_ann = cats_by_freebase_id[ann["LabelName"]]["id"]
        confidence = int(ann["Confidence"])
        if confidence == 1:
            pos_img_lvl_anns[ann["ImageID"]].append(cat_of_ann)
        elif confidence == 0:
            neg_img_lvl_anns[ann["ImageID"]].append(cat_of_ann)

    images = []
    for metadata in tqdm(original_image_metadata_dict, mininterval=0.5):
        key = metadata["ImageID"]
        img = {}
        img["id"] = key
        img["file_name"] = key + ".jpg"
        img["neg_category_ids"] = neg_img_lvl_anns.get(key, [])
        img["pos_category_ids"] = pos_img_lvl_anns.get(key, [])

        if origin_info:
            img["original_url"] = metadata["OriginalURL"]
            license_url = metadata["License"]
            # Try the https spelling first and fall back to http. Only a
            # missing key triggers the fallback; other errors propagate.
            try:
                img["license"] = licenses_by_url_https[license_url]["id"]
            except KeyError:
                img["license"] = licenses_by_url_http[license_url]["id"]

        filename = os.path.join(image_dir, img["file_name"])

        # Prefer the pre-computed size table; otherwise read the size from the file header.
        image_size = image_size_dict.get(key)
        if image_size is not None:
            img["width"], img["height"] = image_size
        else:
            img["width"], img["height"] = imagesize.get(filename)

        if apply_exif:
            # read_image is presumably EXIF-orientation aware (TODO confirm);
            # trust the decoded array's shape over the recorded size.
            image = read_image(filename, format="BGR")
            if image.shape[1] != img["width"] or image.shape[0] != img["height"]:
                print("before exif correction: ", img)
                img["width"], img["height"] = image.shape[1], image.shape[0]
                print("after exif correction: ", img)

        images.append(img)
    return images
def convert_instance_annotations(original_annotations, images, categories, start_index=0):
    """Convert bounding-box CSV rows into per-instance annotation dicts.

    Args:
        original_annotations: CSV rows, header first. Columns read:
            ``ImageID``, ``LabelName``, ``XMin``, ``XMax``, ``YMin``, ``YMax``
            and whichever of ``IsOccluded``, ``IsTruncated``, ``IsGroupOf``,
            ``IsDepiction``, ``IsInside`` the header contains.
        images: Image dicts with ``id``, ``width`` and ``height``.
        categories: Category dicts with ``id`` and ``freebase_id``.
        start_index: Id given to the first annotation; later annotations
            count up from it. Used to keep ids distinct across dataset splits.

    Returns:
        List of dicts with ``id``, ``image_id``, ``freebase_id``,
        ``category_id``, ``iscrowd``, ``bbox`` ([x, y, width, height] in
        pixels, rounded to 2 decimals), ``area`` and one integer entry per
        attribute column found in the header (keyed by the column name).
        Empty input yields an empty list.

    Raises:
        KeyError: If a row references an unknown image id or label.
    """
    if not original_annotations:
        return []

    # Capture the header before handing the rows to _list_to_dict, so the
    # attribute detection below never depends on what that helper does to the list.
    header = list(original_annotations[0])
    original_annotations_dict = _list_to_dict(original_annotations)

    imgs = {img["id"]: img for img in images}
    cats_by_freebase_id = {cat["freebase_id"]: cat for cat in categories}

    # Only convert the attribute columns this CSV actually provides.
    annotated_attributes = [
        attr
        for attr in ["IsOccluded", "IsTruncated", "IsGroupOf", "IsDepiction", "IsInside"]
        if attr in header
    ]

    annotations = []
    for i, row in enumerate(tqdm(original_annotations_dict, mininterval=0.5)):
        image_id = row["ImageID"]
        ann = {}
        ann["id"] = i + start_index
        ann["image_id"] = image_id
        ann["freebase_id"] = row["LabelName"]
        ann["category_id"] = cats_by_freebase_id[ann["freebase_id"]]["id"]
        ann["iscrowd"] = False

        # Box corners are scaled by the image size to obtain pixel coordinates.
        width = imgs[image_id]["width"]
        height = imgs[image_id]["height"]
        xmin = float(row["XMin"]) * width
        ymin = float(row["YMin"]) * height
        xmax = float(row["XMax"]) * width
        ymax = float(row["YMax"]) * height
        dx = xmax - xmin
        dy = ymax - ymin
        ann["bbox"] = [round(a, 2) for a in [xmin, ymin, dx, dy]]
        ann["area"] = round(dx * dy, 2)

        for attribute in annotated_attributes:
            ann[attribute] = int(row[attribute])
        annotations.append(ann)
    return annotations
def _id_to_rgb(array):
    """Encode integer ids as RGB triples with R as the least significant byte.

    The id is split as ``id = R + 256 * G + 256**2 * B``.

    Args:
        array: Integer array (or scalar) of ids.

    Returns:
        ``uint8`` array with one extra trailing axis of length 3 holding
        (R, G, B).
    """
    blue, low_two_bytes = divmod(array, 256**2)
    green, red = divmod(low_two_bytes, 256)
    channels = [red, green, blue]
    return np.stack(channels, axis=-1).astype("uint8")
def _get_mask_file(segment, mask_dir):
    """Return the path of the mask PNG belonging to a segmentation row.

    The file name is ``<ImageID>_<LabelName without slashes>_<BoxID>.png``.

    Args:
        segment: Mapping with ``ImageID``, ``LabelName`` and ``BoxID``.
        mask_dir: Directory that holds the mask files.

    Returns:
        ``mask_dir`` joined with the mask file name.
    """
    image_id = segment["ImageID"]
    label_token = segment["LabelName"].replace("/", "")
    box_id = segment["BoxID"]
    file_name = f"{image_id}_{label_token}_{box_id}.png"
    return os.path.join(mask_dir, file_name)
def _combine_small_on_top(masks):
    """Overlay masks into one array, painting the largest masks first.

    Masks are processed in descending order of their non-zero pixel count,
    so where masks overlap the smaller one ends up on top.

    Args:
        masks: Non-empty sequence of equally shaped arrays; zero means
            background.

    Returns:
        ``uint32`` array with the shape of the first mask.
    """
    canvas = np.zeros(masks[0].shape, dtype=np.uint32)
    pixel_counts = [np.count_nonzero(layer) for layer in masks]
    largest_first = np.argsort(pixel_counts)[::-1]
    for position in largest_first:
        layer = masks[position]
        foreground = layer != 0
        canvas[foreground] = layer[foreground]
    return canvas
def _greedy_combine(masks):
    """Overlay masks in the given order; later masks overwrite earlier ones.

    Args:
        masks: Non-empty sequence of equally shaped arrays; zero means
            background.

    Returns:
        ``uint32`` array with the shape of the first mask.

    Raises:
        IndexError: If ``masks`` is empty (there is no shape to allocate).
    """
    combined = np.zeros(shape=masks[0].shape, dtype="uint32")
    for mask in masks:
        foreground = mask != 0
        combined[foreground] = mask[foreground]
    return combined
def convert_segmentation_annotations(
    original_segmentations,
    images,
    categories,
    original_mask_dir,
    segmentation_out_dir,
    start_index=0,
):
    """Merge the per-object mask PNGs of each image into one id-encoded PNG.

    For every image that has segmentation rows, the non-empty object masks are
    loaded, painted into a single array (smaller masks on top of larger ones),
    encoded as RGB with ``_id_to_rgb`` and saved as
    ``<segmentation_out_dir>/<image id>.png``. Images whose combined mask does
    not contain exactly one distinct value per segmentation row are reported
    on stdout and left out of the result.

    Args:
        original_segmentations: CSV rows, header first. Columns read:
            ``ImageID``, ``LabelName``, ``BoxID``, ``BoxXMin``, ``BoxXMax``,
            ``BoxYMin`` and ``BoxYMax``.
        images: Image dicts with ``id``, ``file_name``, ``width``, ``height``.
        categories: Category dicts with ``id`` and ``freebase_id``.
        original_mask_dir: Directory holding the per-object mask PNGs.
        segmentation_out_dir: Output directory; created when missing.
        start_index: Value of the first ``segments_info`` id.

    Returns:
        List of dicts with ``file_name``, ``image_id`` and ``segments_info``
        (a list of dicts with ``bbox``, ``area``, ``category_id``, ``id``),
        one per image that was written.

    Raises:
        KeyError: If a row's label is not among ``categories``.
    """
    original_segmentations_dict = _list_to_dict(original_segmentations)
    os.makedirs(segmentation_out_dir, exist_ok=True)

    cats_by_freebase_id = {cat["freebase_id"]: cat for cat in categories}

    # SegmentID (1-based row position) is the value painted into mask pixels.
    img_segment_map = defaultdict(list)
    for position, segment in enumerate(original_segmentations_dict, start=1):
        segment["SegmentID"] = position
        img_segment_map[segment["ImageID"]].append(segment)

    # Dict membership keeps this filter linear in the number of images.
    filtered_images = [img for img in images if img["id"] in img_segment_map]

    annotations = []
    segment_index = start_index
    for img in tqdm(filtered_images, mininterval=0.5):
        segments = img_segment_map[img["id"]]
        ann = {}
        ann["file_name"] = img["file_name"]
        ann["image_id"] = img["id"]
        ann["segments_info"] = []
        masks = []
        for segment in segments:
            mask_file = _get_mask_file(segment, original_mask_dir)
            mask = io.imread(mask_file)
            # Exclude empty masks.
            if np.max(mask) == 0:
                continue
            # Reduce to {0, 1}, then widen before multiplying: a uint8 mask
            # cannot hold SegmentIDs above 255.
            mask = (mask // 255).astype("uint32") * segment["SegmentID"]
            masks.append(mask)

            # Box corners are scaled by the image size to obtain pixel coordinates.
            xmin = float(segment["BoxXMin"]) * img["width"]
            ymin = float(segment["BoxYMin"]) * img["height"]
            xmax = float(segment["BoxXMax"]) * img["width"]
            ymax = float(segment["BoxYMax"]) * img["height"]
            dx = xmax - xmin
            dy = ymax - ymin

            segment_info = {}
            segment_info["bbox"] = [round(a, 2) for a in [xmin, ymin, dx, dy]]
            segment_info["area"] = round(dx * dy, 2)
            # Store the numeric category id, as the other converters do.
            segment_info["category_id"] = cats_by_freebase_id[segment["LabelName"]]["id"]
            # NOTE(review): this id counts from start_index while the pixels
            # carry SegmentID; confirm which one consumers expect.
            segment_info["id"] = segment_index
            segment_index += 1
            ann["segments_info"].append(segment_info)

        if not masks:
            # Nothing to combine (every mask of this image was empty).
            print("No non-empty masks in image {}".format(ann["image_id"]))
            continue

        # Many masks overlap; resolve by painting smaller masks on top.
        combined_binary_mask = _combine_small_on_top(masks)

        # Every segment must survive with at least one pixel.
        # NOTE(review): num_segments also counts rows whose mask was empty, so
        # such images are always rejected here - confirm this is intended.
        values_in_output = np.unique(combined_binary_mask[combined_binary_mask != 0])
        ids_in_mask = len(values_in_output)
        num_segments = len(segments)
        if ids_in_mask != num_segments:
            print("Overlapping masks in image {}".format(ann["image_id"]))
            ids_in_segments = [segment["SegmentID"] for segment in segments]
            not_in_segments = [x for x in values_in_output if x not in ids_in_segments]
            not_in_values = [x for x in ids_in_segments if x not in values_in_output]
            print("Not in segments: {}".format(not_in_segments))
            print("Not in pixel values: {}".format(not_in_values))
            # Leave the image out of the output.
            continue

        combined_rgb_mask = _id_to_rgb(combined_binary_mask)
        out_file = os.path.join(segmentation_out_dir, "{}.png".format(ann["image_id"]))
        # Silence warnings raised while saving (presumably skimage's
        # low-contrast warning - TODO confirm).
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            io.imsave(out_file, combined_rgb_mask)
        annotations.append(ann)
    return annotations
def convert_segmentation_annotations_polygon(
    original_segmentations,
    images,
    categories,
    original_mask_dir,
    segmentation_out_dir,
    start_index=0,
):
    """Convert per-object mask PNGs into polygon instance annotations.

    Each segmentation row with a non-empty mask becomes one annotation whose
    ``segmentation`` holds the mask's polygons, rescaled from mask resolution
    to image resolution. Only polygons whose fourth hierarchy entry is
    negative are kept (presumably contours without a parent - TODO confirm
    against ``mask_to_polygons``). The number of rows skipped because of an
    empty mask is printed at the end.

    Args:
        original_segmentations: CSV rows, header first. Columns read:
            ``ImageID``, ``LabelName``, ``BoxID``, ``BoxXMin``, ``BoxXMax``,
            ``BoxYMin`` and ``BoxYMax``.
        images: Image dicts with ``id``, ``width`` and ``height``.
        categories: Category dicts with ``id`` and ``freebase_id``.
        original_mask_dir: Directory holding the per-object mask PNGs.
        segmentation_out_dir: Not used by this function.
        start_index: Id given to the first emitted annotation.

    Returns:
        List of dicts with ``id``, ``image_id``, ``freebase_id``,
        ``category_id``, ``iscrowd``, ``bbox``, ``area`` and ``segmentation``
        (a list of flat ``[x0, y0, x1, y1, ...]`` coordinate lists).
    """
    rows = _list_to_dict(original_segmentations)
    category_lookup = {category["freebase_id"]: category for category in categories}
    image_lookup = {image["id"]: image for image in images}

    annotations = []
    next_id = 0 + start_index
    empty_mask_count = 0
    for row in tqdm(rows):
        record = {}
        record["id"] = next_id
        record["image_id"] = row["ImageID"]
        record["freebase_id"] = row["LabelName"]
        record["category_id"] = category_lookup[row["LabelName"]]["id"]
        record["iscrowd"] = False

        image = image_lookup[row["ImageID"]]
        # Box corners are scaled by the image size to obtain pixel coordinates.
        left = float(row["BoxXMin"]) * image["width"]
        top = float(row["BoxYMin"]) * image["height"]
        right = float(row["BoxXMax"]) * image["width"]
        bottom = float(row["BoxYMax"]) * image["height"]
        box_width = right - left
        box_height = bottom - top
        record["bbox"] = [round(value, 2) for value in [left, top, box_width, box_height]]
        record["area"] = round(box_width * box_height, 2)

        raster = io.imread(_get_mask_file(row, original_mask_dir))
        if np.max(raster) == 0:
            # Empty mask: count it and emit nothing (the id is reused).
            empty_mask_count += 1
            continue

        raster_shape = raster.shape
        contours, hierarchy, _ = mask_to_polygons(raster)
        kept = [
            contours[k]
            for k in range(len(contours))
            if hierarchy.reshape(-1, 4)[k][3] < 0
        ]

        # The mask resolution may differ from the image's; rescale (x, y) pairs.
        scale = np.array(
            [
                1.0 * image["width"] / raster_shape[1],
                1.0 * image["height"] / raster_shape[0],
            ]
        )
        offset = np.array([0, 0])
        points = [contour.reshape(-1, 2) for contour in kept]
        points = [pairs * scale + offset for pairs in points]
        record["segmentation"] = [pairs.reshape(-1).tolist() for pairs in points]

        next_id += 1
        annotations.append(record)

    print(empty_mask_count)
    return annotations
def filter_images(images, annotations):
    """Keep only the images referenced by at least one annotation.

    Args:
        images: Image dicts with an ``id`` entry.
        annotations: Annotation dicts with an ``image_id`` entry.

    Returns:
        The images whose ``id`` occurs as some annotation's ``image_id``, in
        their original order.
    """
    # A set gives constant-time membership tests, so filtering stays linear
    # even for very large image lists.
    annotated_ids = {ann["image_id"] for ann in annotations}
    return [img for img in images if img["id"] in annotated_ids]