# raster2seq/data_preprocess/cubicasa5k/create_coco_cc5k.py
# anas
# Initial deployment of Raster2Seq floor plan vectorization API
# fadb92b
import argparse
import json
import os
import sys
from multiprocessing import Pool
from pathlib import Path
import cv2
import matplotlib.pyplot as plt
import numpy as np
from loaders import FloorplanSVG
from matplotlib.patches import Patch
from PIL import Image
from shapely.geometry import Polygon
from skimage import measure
from tqdm import tqdm
sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
sys.path.append(str(Path(__file__).resolve().parent.parent))
from common_utils import resort_corners
from stru3d.stru3d_utils import type2id
#### ORIGINAL ROOM NAMES & ICON_NAMES ####
# Room label id -> display name. The values (in id order) are used as legend
# names when the room channel of the label mask (mask[0]) is visualised.
ROOM_NAMES = {
    0: "Background",
    1: "Outdoor",
    2: "Wall",
    3: "Kitchen",
    4: "Living Room",
    5: "Bed Room",
    6: "Bath",
    7: "Entry",
    8: "Railing",
    9: "Storage",
    10: "Garage",
    11: "Undefined",
}
# Icon label id -> display name, used for the icon channel (mask[1]).
# When icons are merged with rooms their ids are shifted by +11, so
# Window (1) and Door (2) become 12 and 13 in the combined id space.
ICON_NAMES = {
    0: "No Icon",
    1: "Window",
    2: "Door",
    3: "Closet",
    # NOTE(review): spelling "Applience" is a runtime string (legend label) and is kept as-is.
    4: "Electrical Applience",
    5: "Toilet",
    6: "Sink",
    7: "Sauna Bench",
    8: "Fire Place",
    9: "Bathtub",
    10: "Chimney",
}
# Combined CubiCasa5k id (rooms 0-11, Window 12, Door 13) -> category id in the
# Structured3D-style label space (see CLASS_MAPPING). A value of None means the
# class is skipped when annotations are generated.
CC5K_2_S3D_MAPPING = {
    0: None,  # "Background" (dropped)
    1: type2id["balcony"],  # "Outdoor" -> balcony (4)
    2: None,  # "Wall" has no direct match (dropped)
    3: type2id["kitchen"],  # Kitchen -> kitchen (1)
    4: type2id["living room"],  # Living Room -> living room (0)
    5: type2id["bedroom"],  # Bed Room -> bedroom (2)
    6: type2id["bathroom"],  # Bath -> bathroom (3)
    7: 18,  # 'Entry' has no direct match; uses the extra id 18 ("entry" in CLASS_MAPPING)
    8: 19,  # "Railing" has no direct match; uses the extra id 19 ("railing" in CLASS_MAPPING)
    9: type2id["store room"],  # Storage -> store room (9)
    10: type2id["garage"],  # Garage -> garage (14)
    11: type2id["undefined"],  # Undefined -> undefined (15)
    12: type2id["window"],  # Window -> window (17)
    13: type2id["door"],  # Door -> door (16)
}
# Combined CubiCasa5k id -> compact 12-class id space that keeps walls and
# folds "Railing" into "Wall". Target ids match CC5K_CLASS_MAPPING.
# Referenced only as the "old" option in process_floorplan.
CC5K_MAPPING = {
    0: None,  # Background (dropped)
    1: 0,  # Outdoor
    2: 1,  # Wall
    3: 2,  # Kitchen
    4: 3,  # Living Room
    5: 4,  # Bed Room
    6: 5,  # Bath
    7: 6,  # Entry
    8: 1,  # Railing -> Wall
    9: 7,  # Storage
    10: 8,  # Garage
    11: 9,  # Undefined
    12: 10,  # Window
    13: 11,  # Door
}
# Combined CubiCasa5k id -> compact 11-class id space without walls/railings.
# Target ids match CC5K_CLASS_MAPPING_2; selected by process_floorplan when
# use_org_cc5k_classs is True.
CC5K_MAPPING_2 = {
    0: None,  # Background (dropped)
    1: 0,  # Outdoor
    2: None,  # Wall (dropped)
    3: 1,  # Kitchen
    4: 2,  # Living Room
    5: 3,  # Bed Room
    6: 4,  # Bath
    7: 5,  # Entry
    8: None,  # Railing (dropped)
    9: 6,  # Storage
    10: 7,  # Garage
    11: 8,  # Undefined
    12: 9,  # Window
    13: 10,  # Door
}
# Category name -> id for the 12-class space produced by CC5K_MAPPING
# (walls and railings share id 1).
CC5K_CLASS_MAPPING = {
    "Outdoor": 0,
    "Wall, Railing": 1,
    "Kitchen": 2,
    "Living Room": 3,
    "Bed Room": 4,
    "Bath": 5,
    "Entry": 6,
    "Storage": 7,
    "Garage": 8,
    "Undefined": 9,
    "Window": 10,
    "Door": 11,
}
# Category name -> id for the 11-class space produced by CC5K_MAPPING_2.
# Passed to prepare_dict to build the COCO "categories" list when
# use_org_cc5k_classs is True.
CC5K_CLASS_MAPPING_2 = {
    "Outdoor": 0,
    "Kitchen": 1,
    "Living Room": 2,
    "Bed Room": 3,
    "Bath": 4,
    "Entry": 5,
    "Storage": 6,
    "Garage": 7,
    "Undefined": 8,
    "Window": 9,
    "Door": 10,
}
# Category name -> id for the Structured3D-style space targeted by
# CC5K_2_S3D_MAPPING, extended with "entry" (18) and "railing" (19).
# NOTE(review): ids 0-17 presumably mirror stru3d_utils.type2id -- confirm.
CLASS_MAPPING = {
    "living room": 0,
    "kitchen": 1,
    "bedroom": 2,
    "bathroom": 3,
    "balcony": 4,
    "corridor": 5,
    "dining room": 6,
    "study": 7,
    "studio": 8,
    "store room": 9,
    "garden": 10,
    "laundry room": 11,
    "office": 12,
    "basement": 13,
    "garage": 14,
    "undefined": 15,
    "door": 16,
    "window": 17,
    "entry": 18,
    "railing": 19,
}
def fill_holes_in_mask(binary_mask):
    """
    Dilate a binary mask and fill everything enclosed by its outer contours.

    Args:
        binary_mask (numpy.ndarray): Mask whose strictly positive entries are foreground.

    Returns:
        numpy.ndarray: uint8 mask after a 15x15 rectangular dilation, with the
            interior of every external contour set to 1.
    """
    # Anything strictly positive counts as foreground
    foreground = (binary_mask > 0).astype(np.uint8)
    # Grow the foreground with a 15x15 rectangular structuring element
    structuring_element = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15))
    dilated = cv2.dilate(foreground, structuring_element, iterations=1)
    # Only the outermost contours are requested; filling them also covers inner holes
    outer_contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    result = dilated.copy()
    cv2.fillPoly(result, outer_contours, 1)
    return result
def close_contour(contour):
    """Return `contour` unchanged if its last point equals its first, else with the first point appended."""
    already_closed = np.array_equal(contour[0], contour[-1])
    if already_closed:
        return contour
    return np.vstack((contour, contour[0]))
def binary_mask_to_polygon(binary_mask, tolerance=0):
    """Converts a binary mask to COCO polygon representation
    Ref: https://github.com/waspinator/pycococreator/blob/master/pycococreatortools/pycococreatortools.py
    Args:
        binary_mask: a 2D binary numpy array where '1's represent the object
        tolerance: Maximum distance from original points of polygon to approximated
            polygonal chain. If tolerance is 0, the original coordinate array is returned.
    Returns:
        A list of polygons, each a flat list of coordinates obtained by swapping
        the two columns of the contour returned by skimage and flattening it.
    """
    polygons = []
    # pad mask to close contours of shapes which start and end at an edge
    padded_binary_mask = np.pad(binary_mask, pad_width=1, mode="constant", constant_values=0)
    for contour in measure.find_contours(padded_binary_mask, 0.5):
        # Undo the 1-pixel padding offset. This is done per contour because the
        # contours have different lengths: subtracting from the whole list would
        # require building a ragged array, which NumPy rejects.
        contour = contour - 1
        contour = close_contour(contour)
        contour = measure.approximate_polygon(contour, tolerance)
        if len(contour) < 3:
            continue
        # swap the two coordinate columns before flattening
        contour = np.flip(contour, axis=1)
        segmentation = contour.ravel().tolist()
        # after padding and subtracting 1 we may get -0.5 points in our segmentation
        segmentation = [0 if i < 0 else i for i in segmentation]
        polygons.append(segmentation)
    return polygons
def extract_icon_cv2(mask, start_cls_id=11, skip_classes=None):
    """
    Extract one simplified polygon per connected region of every label in `mask`.

    Args:
        mask: 2D label mask.
        start_cls_id: offset added to each label value to obtain the output class id.
        skip_classes: label values to ignore; None means nothing is skipped.

    Returns:
        (polygons, new_mask): `polygons` is a list of [coords, class_id] where
        coords is the exterior ring of the simplified polygon; `new_mask` has the
        shape of `mask` and holds the offset class id of every processed label
        (0 elsewhere).
    """
    skip_classes = () if skip_classes is None else skip_classes
    room_polygons = []
    new_mask = np.zeros(mask.shape, dtype=int)
    for room_id in np.unique(mask):
        if room_id in skip_classes:
            continue
        true_room_id = int(room_id) + start_cls_id
        # Create binary mask for this label
        room_mask = (mask == room_id).astype(np.uint8)
        # Accumulate into the output mask instead of overwriting it, so that
        # every processed label is kept rather than only the last one.
        new_mask[room_mask > 0] = true_room_id
        # Find the outer contour of every connected region using OpenCV
        contours, _ = cv2.findContours(room_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        for cnt in contours:
            polygon = [tuple(point[0]) for point in cnt]
            if len(polygon) < 3:
                # too few points to form a polygon
                continue
            simplified_poly = Polygon(polygon).simplify(tolerance=0.5, preserve_topology=True)
            room_polygons.append([list(simplified_poly.exterior.coords), true_room_id])
    return room_polygons, new_mask
def visualize_room_polygons(mask, room_polygons, class_names, save_path="cubicasa_debug.png", bg_polygons=None):
    """
    Render a label mask with polygon outlines and a class legend, and save it to disk.

    Args:
        mask: 2D label mask; displayed with a colormap spanning the values 0..20.
        room_polygons: iterable of (polygon, class_id) pairs. Each polygon is a
            sequence of (x, y) points, drawn in black and labelled with its class id
            at the mean of its points.
        class_names: sequence indexed by mask value, used for the legend labels.
        save_path: path of the image file to write.
        bg_polygons: optional iterable of (polygon, class_id) pairs drawn in cyan
            without labels.
    """
    # Size the figure so that 100 mask pixels correspond to one inch
    dpi = 100
    figsize = (mask.shape[1] / dpi, mask.shape[0] / dpi)
    unique_classes = np.unique(mask)
    # pyplot.get_cmap is used because matplotlib.cm.get_cmap no longer exists in recent Matplotlib
    cmap = plt.get_cmap("gist_ncar", 256)
    fig = plt.figure(figsize=figsize)
    ax = fig.add_axes([0, 0, 1, 1])
    ax.imshow(mask, cmap=cmap, interpolation="nearest", alpha=0.6, vmin=0, vmax=20)

    # Outline and label each polygon
    for polygon, room_cls in room_polygons:
        polygon_array = np.array(polygon).copy()
        ax.plot(polygon_array[:, 0], polygon_array[:, 1], "k-", linewidth=2)
        centroid_x = np.mean(polygon_array[:, 0])
        centroid_y = np.mean(polygon_array[:, 1])
        ax.text(
            centroid_x,
            centroid_y,
            str(room_cls),
            fontsize=12,
            ha="center",
            va="center",
            bbox=dict(facecolor="white", alpha=0.7),
        )

    if bg_polygons is not None:
        # Background outlines are drawn in cyan and carry no label
        for polygon, _ in bg_polygons:
            polygon_array = np.array(polygon).copy()
            ax.plot(polygon_array[:, 0], polygon_array[:, 1], "c-", linewidth=2)

    # One legend patch per class present in the mask. Sampling the colormap at
    # cls / 20 reproduces the vmin=0 / vmax=20 normalisation used by imshow.
    norm = np.linspace(0, 1, 21)
    legend_elements = []
    for cls in sorted(unique_classes):
        color = cmap(norm[int(cls)])
        cls_name = f"{int(cls)}_{class_names[int(cls)]}"
        legend_elements.append(Patch(facecolor=color, edgecolor="black", label=f"{cls_name}", alpha=0.6))
    ax.legend(
        handles=legend_elements,
        loc="best",
        title="Classes",
        fontsize=20,
        markerscale=4,
        title_fontsize=28,
    )
    ax.axis("equal")
    ax.axis("off")
    fig.savefig(save_path, bbox_inches="tight", pad_inches=0)
    # Close this specific figure so repeated calls do not accumulate open figures
    plt.close(fig)
def config(argv=None):
    """
    Parse the command-line options of the CubiCasa5k -> COCO conversion script.

    Args:
        argv: optional list of argument strings; when None, argparse reads
            the process command line.

    Returns:
        argparse.Namespace with `data_root`, `output` and `disable_wd2line`.
    """
    parser = argparse.ArgumentParser(description="Generate coco format data for CubiCasa5k")
    # NOTE(review): the default value is kept for backward compatibility even
    # though it names a Structured3D folder; callers normally pass --data_root.
    parser.add_argument(
        "--data_root", default="Structured3D_panorama", type=str, help="path to the raw CubiCasa5k dataset folder"
    )
    parser.add_argument("--output", default="coco_cubicasa5k", type=str, help="path to output folder")
    parser.add_argument(
        "--disable_wd2line",
        action="store_true",
        help="keep door/window polygons instead of collapsing them into a single line",
    )
    return parser.parse_args(argv)
def save_image(image_path, output_path, mask=None):
    """
    Re-save an image as RGB, optionally painting pixels where `mask` is 0 white.

    The "icc_profile" entry is removed from the image info before saving.
    ref: https://github.com/ultralytics/ultralytics/issues/339
    """
    rgb = Image.open(image_path).convert("RGB")
    rgb.info.pop("icc_profile", None)
    if mask is None:
        rgb.save(output_path)
        return
    pixels = np.array(rgb)
    # Give a 2D mask a trailing axis so it broadcasts over the colour channels
    if len(mask.shape) == 2 and len(pixels.shape) == 3:
        mask = mask[:, :, np.newaxis]
    whitened = np.where(mask == 0, 255, pixels)
    Image.fromarray(whitened.astype(np.uint8)).save(output_path)
def remove_polygons_by_type(polygons, skip_types=None):
    """
    Return the (polygon, type) pairs whose type is not listed in `skip_types`.

    Args:
        polygons: iterable of (polygon, poly_type) pairs.
        skip_types: container of types to drop; None means nothing is dropped.

    Returns:
        A new list of [polygon, poly_type] lists in the original order.
    """
    skip_types = () if skip_types is None else skip_types
    return [[polygon, poly_type] for polygon, poly_type in polygons if poly_type not in skip_types]
def merge_rooms_and_icons(room_polygons, icon_polygons):
    """
    Concatenate room and icon polygons into one list.

    Icon types are shifted by 11 so they follow the room type ids; room
    entries are passed through unchanged.
    """
    shifted_icons = [[outline, icon_cls + 11] for outline, icon_cls in icon_polygons]
    return room_polygons + shifted_icons
def create_coco_bounding_box(bb_x, bb_y, image_width, image_height, bound_pad=2):
    """
    Build a padded COCO-style [x, y, width, height] box from corner coordinates.

    The extent of `bb_x` / `bb_y` is grown by `bound_pad` on every side and
    clamped to [0, image_width - 1] x [0, image_height - 1].
    """
    xs = np.asarray(bb_x)
    ys = np.asarray(bb_y)
    # Padded top-left corner, never below 0
    left = np.maximum(xs.min() - bound_pad, 0)
    top = np.maximum(ys.min() - bound_pad, 0)
    # Padded bottom-right corner, never beyond the last pixel
    right = np.minimum(xs.max() + bound_pad, image_width - 1)
    bottom = np.minimum(ys.max() + bound_pad, image_height - 1)
    return [left, top, right - left, bottom - top]
def _door_window_to_line(polygon, poly_shapely):
    """Collapse a door/window outline into the 2-point centre line along its longer side.

    Args:
        polygon: array of outline vertices, one (x, y) row per vertex.
        poly_shapely: shapely Polygon built from the same vertices.

    Returns:
        Array with two rows: the end points of the line.
    """
    if len(polygon) == 5 and (polygon[0] == polygon[-1]).all():
        # closed quadrilateral: drop last point since it is same as first
        polygon = polygon[:-1]
    if polygon.shape[0] != 4:
        # Anything that is not a plain quadrilateral (more OR fewer vertices) is
        # replaced by the corners of its minimum rotated rectangle.
        bounding_rect = np.array(poly_shapely.minimum_rotated_rectangle.exterior.coords)
        polygon = bounding_rect[:4]
    midp_1 = (polygon[0] + polygon[1]) / 2
    midp_2 = (polygon[1] + polygon[2]) / 2
    midp_3 = (polygon[2] + polygon[3]) / 2
    midp_4 = (polygon[3] + polygon[0]) / 2
    dist_1_3 = np.square(midp_1 - midp_3).sum()
    dist_2_4 = np.square(midp_2 - midp_4).sum()
    # The pair of opposite edge midpoints that lies further apart is kept
    if dist_1_3 > dist_2_4:
        return np.vstack([midp_1, midp_3])
    return np.vstack([midp_2, midp_4])


def process_floorplan(
    image_set,
    scene_id,
    start_scene_id,
    args,
    save_dir,
    annos_folder,
    use_org_cc5k_classs=False,
    vis_fp=False,
    wd2line=False,
):
    """
    Convert one floorplan sample into a masked image and a per-image COCO JSON file.

    Args:
        image_set: dataset sample providing "label" (object with `.numpy()`; channel 0
            is used as the room mask and channel 1 as the icon mask), "room_polygon",
            "room_type", "icon_polygon", "icon_type" and "folder".
        scene_id: index of the sample inside its split.
        start_scene_id: offset added to `scene_id` to obtain the image id.
        args: parsed options; only `args.data_root` is read.
        save_dir: folder receiving the masked image `<id>.png`. Auxiliary files go to
            the sibling folder `<save_dir>_aux`.
        annos_folder: folder receiving `<id>.json`.
        use_org_cc5k_classs: use the CubiCasa5k class set (CC5K_MAPPING_2) instead of
            the Structured3D-style one (CC5K_2_S3D_MAPPING).
        vis_fp: also write debug visualisations into the auxiliary folder.
        wd2line: collapse door/window polygons into a 2-point line.
    """
    if use_org_cc5k_classs:
        class_mapping_dict = CC5K_MAPPING_2  # old: CC5K_MAPPING
        class_to_index_dict = CC5K_CLASS_MAPPING_2
        door_window_index = [10, 9]
    else:
        class_mapping_dict = CC5K_2_S3D_MAPPING
        class_to_index_dict = CLASS_MAPPING
        door_window_index = [16, 17]

    mask = image_set["label"].numpy()
    room_polygons = [[poly, poly_type] for poly, poly_type in zip(image_set["room_polygon"], image_set["room_type"])]
    icon_polygons = [[poly, poly_type] for poly, poly_type in zip(image_set["icon_polygon"], image_set["icon_type"])]
    image_height, image_width = mask.shape[1:]

    save_dict = prepare_dict(class_to_index_dict)  # old: CC5K_CLASS_MAPPING

    img_id = int(scene_id) + start_scene_id
    img_name = str(img_id).zfill(5)
    img_dict = {
        "file_name": img_name + ".png",
        "id": img_id,
        "width": image_width,
        "height": image_height,
    }

    # The foreground mask below is written to the auxiliary folder for every
    # scene, so the folder must exist even when vis_fp is False.
    aux_dir = save_dir.rstrip("/") + "_aux"
    os.makedirs(aux_dir, exist_ok=True)
    source_image_path = f"{args.data_root}/{image_set['folder']}/F1_scaled.png"

    if vis_fp:
        visualize_room_polygons(
            mask[0],
            room_polygons,
            list(ROOM_NAMES.values()),
            save_path=f"{aux_dir}/{img_name}_room.png",
        )
        visualize_room_polygons(
            mask[1],
            icon_polygons,
            list(ICON_NAMES.values()),
            bg_polygons=room_polygons,
            save_path=f"{aux_dir}/{img_name}_icon.png",
        )

    #### FILTER NON-USE TYPES
    # Drop background rooms
    room_skip_types = [0]
    filtered_room_polygons = remove_polygons_by_type(room_polygons, skip_types=room_skip_types)
    # Exclude every icon except window (1) and door (2)
    icon_skip_types = [0, *range(3, 11)]
    filtered_icon_polygons = remove_polygons_by_type(icon_polygons, skip_types=icon_skip_types)

    #### COMBINED
    combined_polygons = merge_rooms_and_icons(filtered_room_polygons, filtered_icon_polygons)
    filtered_mask1 = mask[0].copy()
    filtered_mask1[np.isin(mask[0], room_skip_types)] = 0
    filtered_mask2 = mask[1].copy()
    filtered_mask2[np.isin(mask[1], icon_skip_types)] = 0
    # Shift icon labels behind the room labels (same +11 offset as merge_rooms_and_icons)
    filtered_mask2[filtered_mask2 != 0] += 11
    # Icons take precedence over rooms where both are present
    filtered_mask = np.where(filtered_mask2 != 0, filtered_mask2, filtered_mask1)

    # Mask in the output label space, shifted by +1 so that 0 stays free;
    # used only for the final debug visualisation.
    new_filtered_mask = filtered_mask.copy()
    for src_type, dest_type in class_mapping_dict.items():
        if dest_type is None:
            continue
        new_filtered_mask[filtered_mask == src_type] = dest_type + 1

    # Foreground = any room or icon label; everything outside the filled,
    # dilated foreground is painted white in the saved image.
    binary_mask = np.where((mask[0] + mask[1]) != 0, 1, 0).astype(np.uint8)
    filled_mask = fill_holes_in_mask(binary_mask)
    cv2.imwrite(f"{aux_dir}/{img_name}_mask.png", filled_mask.astype(np.uint8) * 255)
    save_image(source_image_path, f"{save_dir}/{img_name}.png", mask=filled_mask)
    if vis_fp:
        save_image(source_image_path, f"{aux_dir}/{img_name}_org.png", mask=None)

    coco_annotation_dict_list = []
    output_polygon_list = []
    combined_polygon_list = []
    instance_id = 0
    for polygon, poly_type in combined_polygons:
        poly_shapely = Polygon(polygon)
        area = poly_shapely.area
        org_poly_type = poly_type
        poly_type = class_mapping_dict[poly_type]
        if poly_type is None:
            continue
        is_door_or_window = poly_type in door_window_index
        # Rooms below 100 area units and doors/windows below 1 are discarded
        if not is_door_or_window and area < 100:
            continue
        if is_door_or_window and area < 1:
            continue

        # The bounding box comes from the original outline, before any line conversion
        rectangle_shapely = poly_shapely.envelope
        polygon = np.array(polygon)
        ### here we convert door/window annotation into a single line
        if is_door_or_window and wd2line:
            polygon = _door_window_to_line(polygon, poly_shapely)

        coco_seg_poly = []
        for p in resort_corners(polygon):
            coco_seg_poly.extend(p)

        # Slightly wider bounding box
        bb_x, bb_y = rectangle_shapely.exterior.xy
        coco_bb = create_coco_bounding_box(bb_x, bb_y, image_width, image_height, bound_pad=2)
        coco_annotation_dict_list.append(
            {
                "segmentation": [coco_seg_poly],
                "area": area,
                "iscrowd": 0,
                "image_id": img_id,
                "bbox": coco_bb,
                "category_id": poly_type,
                "id": instance_id,
            }
        )
        instance_id += 1
        combined_polygon_list.append([np.array(coco_seg_poly).reshape(-1, 2), org_poly_type])
        output_polygon_list.append([np.array(coco_seg_poly).reshape(-1, 2), poly_type + 1])

    save_dict["images"].append(img_dict)
    save_dict["annotations"] += coco_annotation_dict_list
    with open(f"{annos_folder}/{img_name}.json", "w") as f:
        json.dump(save_dict, f)

    if vis_fp:
        visualize_room_polygons(
            filtered_mask,
            combined_polygon_list,
            list(ROOM_NAMES.values()) + ["window", "door"],
            save_path=f"{aux_dir}/{img_name}_combined.png",
        )
        visualize_room_polygons(
            new_filtered_mask,
            output_polygon_list,
            ["null"] + list(class_to_index_dict.keys()),
            save_path=f"{aux_dir}/{img_name}_final.png",
        )
def prepare_dict(categories_dict):
    """
    Create an empty COCO skeleton whose categories come from `categories_dict`.

    Args:
        categories_dict: mapping of category name -> category id.

    Returns:
        dict with empty "images" and "annotations" lists and one "categories"
        entry (supercategory "room") per item of `categories_dict`.
    """
    categories = [
        {"supercategory": "room", "id": class_id, "name": class_name}
        for class_name, class_id in categories_dict.items()
    ]
    return {"images": [], "annotations": [], "categories": categories}
def worker_init(dataset_obj, job=None):
    """Pool initializer: store the split's dataset and job settings as worker globals.

    Everything a worker needs is handed over explicitly through `initargs`
    instead of being read from variables that exist only in the parent's
    `__main__` block.
    """
    global dataset, _job
    dataset = dataset_obj
    _job = job


def wrapper(scene_id):
    """Convert the scene with index `scene_id` of the split loaded in this worker."""
    process_floorplan(
        dataset[scene_id],
        scene_id,
        _job["start_scene_id"],
        _job["args"],
        _job["save_dir"],
        _job["annos_folder"],
        use_org_cc5k_classs=True,
        # debug visualisations are written only for the first 100 scenes of a split
        vis_fp=scene_id < 100,
        wd2line=not _job["args"].disable_wd2line,
    )


if __name__ == "__main__":
    args = config()

    ### prepare output layout: <output>/<split> for images,
    ### <output>/annotations_json/<split> for per-image JSON files
    out_folder = args.output
    annotation_out_folder = os.path.join(out_folder, "annotations_json")
    split_names = ["train", "val", "test"]
    save_folders = [os.path.join(out_folder, name) for name in split_names]
    annos_folders = [os.path.join(annotation_out_folder, name) for name in split_names]
    # makedirs also creates missing parents, so a nested --output path works
    for folder in [*annos_folders, *save_folders]:
        os.makedirs(folder, exist_ok=True)

    ### begin processing
    start_scene_id = 3500  # following index of s3d data
    num_processes = 16
    for split_name, save_dir, annos_folder in zip(split_names, save_folders, annos_folders):
        split_file = f"{split_name}.txt"
        dataset = FloorplanSVG(args.data_root, split_file, format="txt", original_size=False)
        print(f"############# {split_file}")
        job = {
            "start_scene_id": start_scene_id,
            "args": args,
            "save_dir": save_dir,
            "annos_folder": annos_folder,
        }
        with Pool(num_processes, initializer=worker_init, initargs=(dataset, job)) as p:
            # drain the iterator so every scene is processed while tqdm reports progress
            list(tqdm(p.imap(wrapper, range(len(dataset))), total=len(dataset)))
        # image ids of the next split continue after this one
        start_scene_id += len(dataset)