| | |
| |
|
| | import io |
| | from PIL import Image as PImage |
| | import numpy as np |
| | from collections import defaultdict |
| | import cv2 |
| | from typing import Tuple, List |
| | from scipy.spatial.distance import cdist |
| |
|
| | from hoho.read_write_colmap import read_cameras_binary, read_images_binary, read_points3D_binary |
| | from hoho.color_mappings import gestalt_color_mapping, ade20k_color_mapping |
| |
|
| |
|
| | def empty_solution(): |
| | '''Return a minimal valid solution, i.e. 2 vertices and 1 edge.''' |
| | return np.zeros((2,3)), [(0, 1)] |
| | |
| |
|
| | def convert_entry_to_human_readable(entry): |
| | out = {} |
| | already_good = ['__key__', 'wf_vertices', 'wf_edges', 'edge_semantics', 'mesh_vertices', 'mesh_faces', 'face_semantics', 'K', 'R', 't'] |
| | for k, v in entry.items(): |
| | if k in already_good: |
| | out[k] = v |
| | continue |
| | if k == 'points3d': |
| | out[k] = read_points3D_binary(fid=io.BytesIO(v)) |
| | if k == 'cameras': |
| | out[k] = read_cameras_binary(fid=io.BytesIO(v)) |
| | if k == 'images': |
| | out[k] = read_images_binary(fid=io.BytesIO(v)) |
| | if k in ['ade20k', 'gestalt']: |
| | out[k] = [PImage.open(io.BytesIO(x)).convert('RGB') for x in v] |
| | if k == 'depthcm': |
| | out[k] = [PImage.open(io.BytesIO(x)) for x in entry['depthcm']] |
| | return out |
| |
|
| |
|
| | def get_vertices_and_edges_from_segmentation(gest_seg_np, edge_th = 50.0): |
| | '''Get the vertices and edges from the gestalt segmentation mask of the house''' |
| | vertices = [] |
| | connections = [] |
| | |
| | apex_color = np.array(gestalt_color_mapping['apex']) |
| | apex_mask = cv2.inRange(gest_seg_np, apex_color-0.5, apex_color+0.5) |
| | if apex_mask.sum() > 0: |
| | output = cv2.connectedComponentsWithStats(apex_mask, 8, cv2.CV_32S) |
| | (numLabels, labels, stats, centroids) = output |
| | stats, centroids = stats[1:], centroids[1:] |
| | |
| | for i in range(numLabels-1): |
| | vert = {"xy": centroids[i], "type": "apex"} |
| | vertices.append(vert) |
| | |
| | eave_end_color = np.array(gestalt_color_mapping['eave_end_point']) |
| | eave_end_mask = cv2.inRange(gest_seg_np, eave_end_color-0.5, eave_end_color+0.5) |
| | if eave_end_mask.sum() > 0: |
| | output = cv2.connectedComponentsWithStats(eave_end_mask, 8, cv2.CV_32S) |
| | (numLabels, labels, stats, centroids) = output |
| | stats, centroids = stats[1:], centroids[1:] |
| | |
| | for i in range(numLabels-1): |
| | vert = {"xy": centroids[i], "type": "eave_end_point"} |
| | vertices.append(vert) |
| | |
| | apex_pts = [] |
| | apex_pts_idxs = [] |
| | for j, v in enumerate(vertices): |
| | apex_pts.append(v['xy']) |
| | apex_pts_idxs.append(j) |
| | apex_pts = np.array(apex_pts) |
| | |
| | |
| | for edge_class in ['eave', 'ridge', 'rake', 'valley']: |
| | edge_color = np.array(gestalt_color_mapping[edge_class]) |
| | mask = cv2.morphologyEx(cv2.inRange(gest_seg_np, |
| | edge_color-0.5, |
| | edge_color+0.5), |
| | cv2.MORPH_DILATE, np.ones((11, 11))) |
| | line_img = np.copy(gest_seg_np) * 0 |
| | if mask.sum() > 0: |
| | output = cv2.connectedComponentsWithStats(mask, 8, cv2.CV_32S) |
| | (numLabels, labels, stats, centroids) = output |
| | stats, centroids = stats[1:], centroids[1:] |
| | edges = [] |
| | for i in range(1, numLabels): |
| | y,x = np.where(labels == i) |
| | xleft_idx = np.argmin(x) |
| | x_left = x[xleft_idx] |
| | y_left = y[xleft_idx] |
| | xright_idx = np.argmax(x) |
| | x_right = x[xright_idx] |
| | y_right = y[xright_idx] |
| | edges.append((x_left, y_left, x_right, y_right)) |
| | cv2.line(line_img, (x_left, y_left), (x_right, y_right), (255, 255, 255), 2) |
| | edges = np.array(edges) |
| | if (len(apex_pts) < 2) or len(edges) <1: |
| | continue |
| | pts_to_edges_dist = np.minimum(cdist(apex_pts, edges[:,:2]), cdist(apex_pts, edges[:,2:])) |
| | connectivity_mask = pts_to_edges_dist <= edge_th |
| | edge_connects = connectivity_mask.sum(axis=0) |
| | for edge_idx, edgesum in enumerate(edge_connects): |
| | if edgesum>=2: |
| | connected_verts = np.where(connectivity_mask[:,edge_idx])[0] |
| | for a_i, a in enumerate(connected_verts): |
| | for b in connected_verts[a_i+1:]: |
| | connections.append((a, b)) |
| | return vertices, connections |
| |
|
| | def get_uv_depth(vertices, depth): |
| | '''Get the depth of the vertices from the depth image''' |
| | uv = [] |
| | for v in vertices: |
| | uv.append(v['xy']) |
| | uv = np.array(uv) |
| | uv_int = uv.astype(np.int32) |
| | H, W = depth.shape[:2] |
| | uv_int[:, 0] = np.clip( uv_int[:, 0], 0, W-1) |
| | uv_int[:, 1] = np.clip( uv_int[:, 1], 0, H-1) |
| | vertex_depth = depth[(uv_int[:, 1] , uv_int[:, 0])] |
| | return uv, vertex_depth |
| |
|
| |
|
| | def merge_vertices_3d(vert_edge_per_image, th=0.1): |
| | '''Merge vertices that are close to each other in 3D space and are of same types''' |
| | all_3d_vertices = [] |
| | connections_3d = [] |
| | all_indexes = [] |
| | cur_start = 0 |
| | types = [] |
| | for cimg_idx, (vertices, connections, vertices_3d) in vert_edge_per_image.items(): |
| | types += [int(v['type']=='apex') for v in vertices] |
| | all_3d_vertices.append(vertices_3d) |
| | connections_3d+=[(x+cur_start,y+cur_start) for (x,y) in connections] |
| | cur_start+=len(vertices_3d) |
| | all_3d_vertices = np.concatenate(all_3d_vertices, axis=0) |
| | |
| | distmat = cdist(all_3d_vertices, all_3d_vertices) |
| | types = np.array(types).reshape(-1,1) |
| | same_types = cdist(types, types) |
| | mask_to_merge = (distmat <= th) & (same_types==0) |
| | new_vertices = [] |
| | new_connections = [] |
| | to_merge = sorted(list(set([tuple(a.nonzero()[0].tolist()) for a in mask_to_merge]))) |
| | to_merge_final = defaultdict(list) |
| | for i in range(len(all_3d_vertices)): |
| | for j in to_merge: |
| | if i in j: |
| | to_merge_final[i]+=j |
| | for k, v in to_merge_final.items(): |
| | to_merge_final[k] = list(set(v)) |
| | already_there = set() |
| | merged = [] |
| | for k, v in to_merge_final.items(): |
| | if k in already_there: |
| | continue |
| | merged.append(v) |
| | for vv in v: |
| | already_there.add(vv) |
| | old_idx_to_new = {} |
| | count=0 |
| | for idxs in merged: |
| | new_vertices.append(all_3d_vertices[idxs].mean(axis=0)) |
| | for idx in idxs: |
| | old_idx_to_new[idx] = count |
| | count +=1 |
| | |
| | new_vertices=np.array(new_vertices) |
| | |
| | for conn in connections_3d: |
| | new_con = sorted((old_idx_to_new[conn[0]], old_idx_to_new[conn[1]])) |
| | if new_con[0] == new_con[1]: |
| | continue |
| | if new_con not in new_connections: |
| | new_connections.append(new_con) |
| | |
| | return new_vertices, new_connections |
| |
|
| | def prune_not_connected(all_3d_vertices, connections_3d): |
| | '''Prune vertices that are not connected to any other vertex''' |
| | connected = defaultdict(list) |
| | for c in connections_3d: |
| | connected[c[0]].append(c) |
| | connected[c[1]].append(c) |
| | new_indexes = {} |
| | new_verts = [] |
| | connected_out = [] |
| | for k,v in connected.items(): |
| | vert = all_3d_vertices[k] |
| | if tuple(vert) not in new_verts: |
| | new_verts.append(tuple(vert)) |
| | new_indexes[k]=len(new_verts) -1 |
| | for k,v in connected.items(): |
| | for vv in v: |
| | connected_out.append((new_indexes[vv[0]],new_indexes[vv[1]])) |
| | connected_out=list(set(connected_out)) |
| | |
| | return np.array(new_verts), connected_out |
| |
|
| |
|
| | def predict(entry, visualize=False) -> Tuple[np.ndarray, List[int]]: |
| | good_entry = convert_entry_to_human_readable(entry) |
| | vert_edge_per_image = {} |
| | for i, (gest, depth, K, R, t) in enumerate(zip(good_entry['gestalt'], |
| | good_entry['depthcm'], |
| | good_entry['K'], |
| | good_entry['R'], |
| | good_entry['t'] |
| | )): |
| | gest_seg = gest.resize(depth.size) |
| | gest_seg_np = np.array(gest_seg).astype(np.uint8) |
| | |
| | depth_np = np.array(depth) / 2.5 |
| | vertices, connections = get_vertices_and_edges_from_segmentation(gest_seg_np, edge_th = 20.) |
| | if (len(vertices) < 2) or (len(connections) < 1): |
| | print (f'Not enough vertices or connections in image {i}') |
| | vert_edge_per_image[i] = np.empty((0, 2)), [], np.empty((0, 3)) |
| | continue |
| | uv, depth_vert = get_uv_depth(vertices, depth_np) |
| | |
| | xy_local = np.ones((len(uv), 3)) |
| | xy_local[:, 0] = (uv[:, 0] - K[0,2]) / K[0,0] |
| | xy_local[:, 1] = (uv[:, 1] - K[1,2]) / K[1,1] |
| | |
| | vertices_3d_local = depth_vert[...,None] * (xy_local/np.linalg.norm(xy_local, axis=1)[...,None]) |
| | world_to_cam = np.eye(4) |
| | world_to_cam[:3, :3] = R |
| | world_to_cam[:3, 3] = t.reshape(-1) |
| | cam_to_world = np.linalg.inv(world_to_cam) |
| | vertices_3d = cv2.transform(cv2.convertPointsToHomogeneous(vertices_3d_local), cam_to_world) |
| | vertices_3d = cv2.convertPointsFromHomogeneous(vertices_3d).reshape(-1, 3) |
| | vert_edge_per_image[i] = vertices, connections, vertices_3d |
| | all_3d_vertices, connections_3d = merge_vertices_3d(vert_edge_per_image, 3.0) |
| | all_3d_vertices_clean, connections_3d_clean = prune_not_connected(all_3d_vertices, connections_3d) |
| | if (len(all_3d_vertices_clean) < 2) or len(connections_3d_clean) < 1: |
| | print (f'Not enough vertices or connections in the 3D vertices') |
| | return (good_entry['__key__'], *empty_solution()) |
| | if visualize: |
| | from hoho.viz3d import plot_estimate_and_gt |
| | plot_estimate_and_gt( all_3d_vertices_clean, |
| | connections_3d_clean, |
| | good_entry['wf_vertices'], |
| | good_entry['wf_edges']) |
| | return good_entry['__key__'], all_3d_vertices_clean, connections_3d_clean |
| |
|