Spaces:
Runtime error
Runtime error
| import argparse | |
| import glob | |
| import json | |
| import os | |
| import sys | |
| from multiprocessing import Pool | |
| from pathlib import Path | |
| import cv2 | |
| import numpy as np | |
| from shapely.geometry import Polygon | |
| from tqdm import tqdm | |
| sys.path.append(str(Path(__file__).resolve().parent.parent)) | |
| from common_utils import resort_corners | |
| from create_coco_cc5k import create_coco_bounding_box | |
| from util.plot_utils import plot_semantic_rich_floorplan_opencv | |
| def plot_floor(output_coco_polygons, categories_dict, img_w, img_h, save_path, door_window_index=[10, 9]): | |
| gt_sem_rich = [] | |
| for j, (poly, poly_type) in enumerate(output_coco_polygons): | |
| corners = np.array(poly).reshape(-1, 2).astype(np.int32) | |
| # corners_flip_y = corners.copy() | |
| # corners_flip_y[:,1] = 255 - corners_flip_y[:,1] | |
| # corners = corners_flip_y | |
| gt_sem_rich.append([corners, poly_type]) | |
| # plot_semantic_rich_floorplan_nicely(gt_sem_rich, save_path, prec=None, rec=None, | |
| # plot_text=True, is_bw=False, | |
| # door_window_index=door_window_index, | |
| # img_w=img_w, | |
| # img_h=img_h, | |
| # semantics_label_mapping=get_dataset_class_labels(categories_dict), | |
| # ) | |
| plot_semantic_rich_floorplan_opencv( | |
| gt_sem_rich, | |
| save_path, | |
| img_w=img_w, | |
| img_h=img_h, | |
| door_window_index=door_window_index, | |
| semantics_label_mapping=get_dataset_class_labels(categories_dict), | |
| is_bw=False, | |
| ) | |
| def prepare_dict(categories_dict): | |
| save_dict = {"images": [], "annotations": [], "categories": categories_dict} | |
| return save_dict | |
| def extract_polygons_from_mask(binary_mask, output_mask_path): | |
| """ | |
| Extract polygons from a binary mask where regions with value 1 are polygons | |
| and background regions have value 0. | |
| Args: | |
| binary_mask (numpy.ndarray): Binary mask with shape (H, W), where 1 represents | |
| the polygon regions and 0 represents the background. | |
| Returns: | |
| list: A list of polygons, where each polygon is represented as a list of (x, y) coordinates. | |
| """ | |
| # Ensure the mask is binary (0 and 1) | |
| binary_mask = (binary_mask > 0).astype(np.uint8) | |
| # Find contours in the binary mask | |
| contours, _ = cv2.findContours(binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| # Extract polygons from contours | |
| polygons = [] | |
| for contour in contours: | |
| # Approximate the contour to reduce the number of points | |
| epsilon = 0.001 * cv2.arcLength(contour, True) # Adjust epsilon for more/less detail | |
| approx_polygon = cv2.approxPolyDP(contour, epsilon, True) | |
| polygons.append(approx_polygon.squeeze().tolist()) # Convert to list of (x, y) points | |
| # Convert binary_mask to a 3-channel image to draw colored polylines | |
| binary_mask_colored = cv2.cvtColor(binary_mask * 255, cv2.COLOR_GRAY2BGR) | |
| # Plot polygons on the binary mask with green color | |
| for polygon in polygons: | |
| points = np.array(polygon, dtype=np.int32) | |
| cv2.polylines(binary_mask_colored, [points], isClosed=True, color=(0, 0, 255), thickness=10) | |
| cv2.imwrite(output_mask_path, binary_mask_colored) | |
| return polygons | |
| def read_polygons_from_json(json_file): | |
| with open(json_file, "r") as f: | |
| data = json.load(f) | |
| category_dict = data["categories"] | |
| polygons = [data["annotations"][i]["segmentation"][0] for i in range(len(data["annotations"]))] | |
| poly_types = [data["annotations"][i]["category_id"] for i in range(len(data["annotations"]))] | |
| source_misc = [data["annotations"][i] for i in range(len(data["annotations"]))] | |
| source_polygons = [(polygons[i], poly_types[i]) for i in range(len(polygons))] | |
| return source_polygons, source_misc, category_dict | |
| def get_dataset_class_labels(category_dict): | |
| return {category_dict[i]["id"]: category_dict[i]["name"] for i in range(len(category_dict))} | |
| def check_all_window_door_inside(polygons, door_window_index): | |
| flag = all([poly_type in door_window_index for _, poly_type in polygons]) | |
| return flag | |
| def extract_region_and_annotation( | |
| source_image, | |
| source_annot_path, | |
| region_polygons, | |
| start_image_id, | |
| output_image_dir="output", | |
| output_annot_dir="annotations", | |
| output_aux_dir="output_aux", | |
| vis_aux=True, | |
| ): | |
| """ | |
| Extract regions of the floorplan from the source image based on polygons | |
| and generate annotations. | |
| Args: | |
| source_image (numpy.ndarray): The source image (H, W, 3). | |
| polygons (list): List of polygons, where each polygon is a list of (x, y) coordinates. | |
| output_dir (str): Directory to save the extracted regions and annotations. | |
| Returns: | |
| list: A list of annotations for each extracted region. | |
| """ | |
| door_window_index = [10, 9] | |
| source_polygons, source_misc, categories_dict = read_polygons_from_json(source_annot_path) | |
| source_img_id = os.path.basename(source_annot_path).split(".")[0].zfill(5) | |
| if vis_aux: | |
| gt_sem_rich_path = os.path.join(output_aux_dir, "{}_org_floor.png".format(source_img_id)) | |
| plot_floor( | |
| source_polygons, | |
| categories_dict, | |
| source_image.shape[1], | |
| source_image.shape[0], | |
| gt_sem_rich_path, | |
| door_window_index=door_window_index, | |
| ) | |
| margin = 10 | |
| img_id = start_image_id | |
| # each region polygon corresponds to an image | |
| for i, polygon in enumerate(region_polygons): | |
| instance_id = 0 | |
| output_coco_polygons = [] | |
| # Create a mask for the current polygon | |
| mask = np.zeros(source_image.shape[:2], dtype=np.uint8) | |
| points = np.array(polygon, dtype=np.int32) | |
| cv2.fillPoly(mask, [points], 255) | |
| # Crop the ROI to the bounding box of the polygon | |
| x, y, w, h = cv2.boundingRect(points) | |
| # Expand the bounding box by the margin | |
| x_expanded = max(x - margin, 0) | |
| y_expanded = max(y - margin, 0) | |
| w_expanded = min(x + w + margin, source_image.shape[1]) - x_expanded | |
| h_expanded = min(y + h + margin, source_image.shape[0]) - y_expanded | |
| x, y, w, h = x_expanded, y_expanded, w_expanded, h_expanded | |
| cropped_roi = source_image[y : y + h, x : x + w] | |
| save_dict = prepare_dict(categories_dict) | |
| # Create an annotation for the extracted region | |
| img_dict = {} | |
| img_dict["file_name"] = f"{str(img_id).zfill(5)}_{source_img_id}.png" | |
| img_dict["id"] = img_id | |
| img_dict["width"] = w | |
| img_dict["height"] = h | |
| # Save the cropped ROI | |
| roi_filename = f"{output_image_dir}/{str(img_id).zfill(5)}_{source_img_id}.png" | |
| cv2.imwrite(roi_filename, cropped_roi) | |
| bounding_box = np.array([x, y, x + w, y + h]) | |
| # Convert source polygons to NumPy arrays for vectorized operations | |
| source_polygons_np = [np.array(src_poly[0]).reshape(-1, 2) for src_poly in source_polygons] | |
| assert len(source_polygons_np) == len(source_polygons) | |
| coco_annotation_dict_list = [] | |
| # Iterate through the polygons and filter those inside the bounding box | |
| for j, tmp in enumerate(source_polygons_np): | |
| # Compute the bounding box of the current polygon | |
| poly_bbox = np.hstack([np.min(tmp, axis=0), np.max(tmp, axis=0)]) | |
| # Check if the polygon is outside the bounding box | |
| if np.any(poly_bbox[:2] < bounding_box[:2]) or np.any(poly_bbox[2:] > bounding_box[2:]): | |
| continue | |
| # Scale the polygon coordinates relative to the top-left corner of the bounding box | |
| scaled_polygon = tmp - bounding_box[:2] | |
| coco_seg_poly = [] | |
| poly_sorted = resort_corners(scaled_polygon) | |
| # image = draw_polygon_on_image(image, poly_shapely, "test_poly.jpg") | |
| for p in poly_sorted: | |
| coco_seg_poly += list(p) | |
| if len(scaled_polygon) == 2: | |
| area = source_misc[j]["area"] | |
| coco_bb = source_misc[j]["bbox"] | |
| # shift the bounding box | |
| coco_bb[0] -= bounding_box[0] | |
| coco_bb[1] -= bounding_box[1] | |
| else: | |
| poly_shapely = Polygon(scaled_polygon) | |
| area = poly_shapely.area | |
| rectangle_shapely = poly_shapely.envelope | |
| # Slightly wider bounding box | |
| bb_x, bb_y = rectangle_shapely.exterior.xy | |
| coco_bb = create_coco_bounding_box(bb_x, bb_y, w, h, bound_pad=2) | |
| coco_annotation_dict = { | |
| "segmentation": [coco_seg_poly], | |
| "area": area, | |
| "iscrowd": 0, | |
| "image_id": img_id, | |
| "bbox": coco_bb, | |
| "category_id": source_polygons[j][1], | |
| "id": instance_id, | |
| } | |
| coco_annotation_dict_list.append(coco_annotation_dict) | |
| output_coco_polygons.append([coco_seg_poly, source_polygons[j][1]]) | |
| # Remove after obtaining the polygon | |
| # source_polygons.pop(j) | |
| # source_misc.pop(j) | |
| instance_id += 1 | |
| # skip if just windows and doors are inside | |
| if check_all_window_door_inside(output_coco_polygons, door_window_index): | |
| instance_id -= len(coco_annotation_dict_list) | |
| continue | |
| save_dict["images"].append(img_dict) | |
| save_dict["annotations"] += coco_annotation_dict_list | |
| if vis_aux: | |
| gt_sem_rich_path = os.path.join( | |
| output_aux_dir, "{}_{}_floor.png".format(str(img_id).zfill(5), source_img_id) | |
| ) | |
| plot_floor( | |
| output_coco_polygons, categories_dict, w, h, gt_sem_rich_path, door_window_index=door_window_index | |
| ) | |
| # Save annotations to a JSON file | |
| json_path = f"{output_annot_dir}/{str(img_id).zfill(5)}_{source_img_id}.json" | |
| with open(json_path, "w") as f: | |
| json.dump(save_dict, f) | |
| img_id += 1 | |
| start_image_id = img_id | |
| return start_image_id | |
| def config(): | |
| a = argparse.ArgumentParser(description="Generate coco format data for Structured3D") | |
| a.add_argument( | |
| "--data_root", default="Structured3D_panorama", type=str, help="path to raw Structured3D_panorama folder" | |
| ) | |
| a.add_argument("--output", default="coco_cubicasa5k", type=str, help="path to output folder") | |
| args = a.parse_args() | |
| return args | |
| # Example usage | |
| if __name__ == "__main__": | |
| args = config() | |
| ### prepare | |
| outFolder = args.output | |
| if not os.path.exists(outFolder): | |
| os.mkdir(outFolder) | |
| annotation_outFolder = os.path.join(outFolder, "annotations_json") | |
| if not os.path.exists(annotation_outFolder): | |
| os.mkdir(annotation_outFolder) | |
| annos_train_folder = os.path.join(annotation_outFolder, "train") | |
| annos_val_folder = os.path.join(annotation_outFolder, "val") | |
| annos_test_folder = os.path.join(annotation_outFolder, "test") | |
| os.makedirs(annos_train_folder, exist_ok=True) | |
| os.makedirs(annos_val_folder, exist_ok=True) | |
| os.makedirs(annos_test_folder, exist_ok=True) | |
| train_img_folder = os.path.join(outFolder, "train") | |
| val_img_folder = os.path.join(outFolder, "val") | |
| test_img_folder = os.path.join(outFolder, "test") | |
| for img_folder in [train_img_folder, val_img_folder, test_img_folder]: | |
| if not os.path.exists(img_folder): | |
| os.mkdir(img_folder) | |
| ### begin processing | |
| start_image_id = 3500 | |
| save_folders = [train_img_folder, val_img_folder, test_img_folder] | |
| annos_folders = [annos_train_folder, annos_val_folder, annos_test_folder] | |
| splits = ["train", "val", "test"] | |
| def wrapper(index): | |
| image_path, annot_path, mask_path = packed_input_files[index] | |
| cur_image_id = int(os.path.basename(image_path).split(".")[0]) | |
| binary_mask = cv2.imread(mask_path)[:, :, -1] | |
| source_image = cv2.imread(image_path, cv2.IMREAD_COLOR) | |
| # Extract polygons | |
| region_polygons = extract_polygons_from_mask( | |
| binary_mask, output_mask_path=f"{save_aux_path}/{str(cur_image_id).zfill(5)}_polylines.png" | |
| ) | |
| return extract_region_and_annotation( | |
| source_image, | |
| annot_path, | |
| region_polygons, | |
| start_image_id + index * 10, | |
| save_path, | |
| save_anno_path, | |
| save_aux_path, | |
| vis_aux=True, | |
| ) | |
| def worker_init(input_files_object): | |
| # Store dataset as global to avoid pickling issues | |
| global packed_input_files | |
| packed_input_files = input_files_object | |
| for i, split in enumerate(splits): | |
| image_files = sorted(glob.glob(f"{args.data_root}/{split}/*.png")) | |
| image_id_list = [os.path.basename(image_path).split(".")[0] for image_path in image_files] | |
| anno_files = [f"{args.data_root}/annotations_json/{split}/{id_}.json" for id_ in image_id_list] | |
| mask_files = [f"{args.data_root}/{split}_aux/{id_}_mask.png" for id_ in image_id_list] | |
| save_path = save_folders[i] | |
| save_anno_path = annos_folders[i] | |
| save_aux_path = save_path.rstrip("/") + "_aux" | |
| os.makedirs(save_aux_path, exist_ok=True) | |
| # for j, (image_path, anno_path, mask_path) in enumerate(zip(image_files, anno_files, mask_files)): | |
| # cur_image_id = int(os.path.basename(image_path).split('.')[0]) | |
| # binary_mask = cv2.imread(mask_path)[:,:,-1] | |
| # source_image = cv2.imread(image_path, cv2.IMREAD_COLOR) | |
| # # Extract polygons | |
| # polygons = extract_polygons_from_mask(binary_mask, output_mask_path=f'{save_aux_path}/{str(cur_image_id).zfill(5)}_polylines.png') | |
| # # # skip if only one polygon (floorplan) | |
| # # if len(polygons) == 1: | |
| # # print(f"Skipping {image_path} with only one polygon") | |
| # # with open(anno_path, 'r') as f: | |
| # # data = json.load(f) | |
| # # # update image id | |
| # # data['images'][0]['id'] = start_image_id | |
| # # data['images'][0]["file_name"] = f'{str(start_image_id).zfill(5)}_{str(cur_image_id).zfill(5)}.png' | |
| # # for anno in data['annotations']: | |
| # # anno['image_id'] = start_image_id | |
| # # with open(f"{save_anno_path}/{str(start_image_id).zfill(5)}_{str(cur_image_id).zfill(5)}.json", 'w') as f: | |
| # # json.dump(data, f, indent=2) | |
| # # shutil.copy(image_path, f"{save_path}/{str(start_image_id).zfill(5)}_{str(cur_image_id).zfill(5)}.png") | |
| # # gt_sem_rich_path = os.path.join(save_aux_path, '{}_{}_floor.png'.format(str(start_image_id).zfill(5), str(cur_image_id).zfill(5))) | |
| # # output_coco_polygons = [(x['segmentation'][0], x['category_id']) for x in data['annotations']] | |
| # # plot_floor(output_coco_polygons, data['categories'], data['images'][0]['width'], data['images'][0]['height'], gt_sem_rich_path, door_window_index=[10, 9]) | |
| # # start_image_id += 1 | |
| # # continue | |
| # # # Print the extracted polygons | |
| # # print("Extracted polygons:") | |
| # # for i, polygon in enumerate(polygons): | |
| # # print(f"Polygon {i + 1}: {polygon}") | |
| # start_image_id = extract_region_and_annotation(source_image, | |
| # anno_path, | |
| # polygons, | |
| # start_image_id, | |
| # output_image_dir=save_path, | |
| # output_annot_dir=save_anno_path, | |
| # output_aux_dir=save_aux_path, | |
| # vis_aux=True) | |
| packed_input_files = list(zip(image_files, anno_files, mask_files)) | |
| # for j in range(5): | |
| # wrapper(j) | |
| # exit(0) | |
| num_processes = 16 | |
| with Pool(num_processes, initializer=worker_init, initargs=(packed_input_files,)) as p: | |
| indices = [j for j in range(len(packed_input_files))] | |
| list(tqdm(p.imap(wrapper, indices), total=len(indices))) | |
| start_image_id += len(packed_input_files) * 10 | |