"""Lift per-frame 2D instance masks onto a 3D scan and store them as predictions.

For every object listed in a scene's ``infos.npy`` the script back-projects the
masked depth pixels of each frame into world space, marks the scan points that
lie close to them, and writes the resulting per-point masks into a copy of the
baseline prediction ``.npz``.
"""
import os
from pathlib import Path

import cv2
import numpy as np
import trimesh as tm
from sklearn.neighbors import KDTree
from tqdm import tqdm
from tqdm.contrib.concurrent import thread_map


def _clean_mask(mask, kernel_size=3, erode=True, dilate=False, num_components=1):
    """Return a uint8 {0, 1} mask after morphology and largest-component filtering.

    Args:
        mask: 2D boolean array.
        kernel_size: Radius of the elliptical structuring element; the kernel
            is ``(2 * kernel_size + 1)`` pixels wide.
        erode: Apply one erosion pass.
        dilate: Apply one dilation pass (after the erosion, if any).
        num_components: Keep only this many largest connected components.
            A falsy value disables the component filtering.
    """
    img = mask.astype(np.uint8) * 255

    # Elliptical kernel (alternatives: MORPH_RECT, MORPH_CROSS) for a smoother
    # erosion of the mask border.
    side = 2 * kernel_size + 1
    kernel = cv2.getStructuringElement(
        cv2.MORPH_ELLIPSE, (side, side), (kernel_size, kernel_size)
    )
    if erode:
        img = cv2.erode(img, kernel, iterations=1)
    if dilate:
        img = cv2.dilate(img, kernel, iterations=1)

    if num_components:
        num_labels, labels = cv2.connectedComponents(img)
        if num_labels > 1:
            # Label 0 is the background, so component areas start at index 1.
            areas = np.bincount(labels.ravel(), minlength=num_labels)[1:]
            # Stable sort: components of equal area keep ascending label order.
            largest = np.argsort(-areas, kind="stable")[:num_components] + 1
            return np.isin(labels, largest).astype(np.uint8)

    return (img > 0).astype(np.uint8)


def _backproject(depth, mask, intrinsics, extrinsics):
    """Back-project the masked depth pixels to world coordinates.

    Args:
        depth: 2D depth map.
        mask: 2D boolean array of the same shape selecting the pixels to use.
        intrinsics: 3x3 camera matrix.
        extrinsics: 4x4 matrix applied as a camera-to-world transform.

    Returns:
        ``(N, 3)`` array of world points; rows containing NaN or infinity are
        dropped.
    """
    rows, cols = np.nonzero(mask)
    if len(rows) == 0:
        return np.empty((0, 3))

    # Homogeneous pixel coordinates (x, y, 1), one column per pixel.
    pixels = np.vstack([cols, rows, np.ones(len(cols))])
    # Step 1: pixels -> normalized camera rays.
    rays = np.linalg.inv(intrinsics) @ pixels
    # Step 2: scale the rays by depth to obtain 3D points in the camera frame.
    camera_points = rays * depth[rows, cols][np.newaxis, :]
    # Step 3: append the homogeneous coordinate.
    camera_points_h = np.vstack([camera_points, np.ones(len(cols))])
    # Step 4: camera frame -> world frame (extrinsics used as camera-to-world).
    world_points_h = extrinsics @ camera_points_h
    # Step 5: de-homogenize.
    points = (world_points_h[:3, :] / world_points_h[3, :]).T
    return points[np.isfinite(points).all(axis=1)]


def process_frame(frame, vertices, intrinsics, source_path, base_path, key,
                  tree=None, *, kernel_size=3, erode=True, dilate=False,
                  num_components=1, max_distance=0.05, debug_mask_path=None):
    """Mark the scan points observed inside one object's mask in one frame.

    Args:
        frame: Mapping with ``'frame_id'`` and ``'mask_path'`` entries.
        vertices: ``(N, 3)`` array of scan points.
        intrinsics: 3x3 camera matrix matching the depth image resolution.
        source_path: Directory holding ``<frame_id>.png`` depth images and
            ``<frame_id>.txt`` pose files.
        base_path: Directory that ``frame['mask_path']`` is relative to.
        key: Value identifying the object inside the stored mask array.
        tree: Optional pre-built ``KDTree`` over ``vertices``; built on demand
            when omitted. Pass one in to avoid rebuilding it for every frame.
        kernel_size, erode, dilate, num_components: Mask clean-up settings,
            see ``_clean_mask``.
        max_distance: Maximum point-to-vertex distance for a match, in the
            units of ``vertices`` (0.05 is 5 cm if the scan is in metres).
        debug_mask_path: If given, the final 2D mask is written there as an
            8-bit image.

    Returns:
        Boolean array of length ``len(vertices)``.

    Raises:
        FileNotFoundError: If the depth image cannot be read.
    """
    point_mask = np.zeros(len(vertices), dtype=bool)
    source_path = Path(source_path)
    base_path = Path(base_path)
    frame_id = str(frame['frame_id']).zfill(5)

    extrinsics = np.loadtxt(source_path / f'{frame_id}.txt')
    if not np.isfinite(extrinsics).all():
        # A non-finite pose cannot yield usable world points.
        return point_mask

    # NOTE(review): allow_pickle=True executes pickled payloads on load; only
    # use this with mask files from a trusted source.
    mask = np.load(base_path / frame['mask_path'], allow_pickle=True) == key

    depth_file = source_path / f'{frame_id}.png'
    # cv2.imread returns None instead of raising, and is given a plain string
    # because not every OpenCV build accepts pathlib.Path.
    raw_depth = cv2.imread(str(depth_file), cv2.IMREAD_UNCHANGED)
    if raw_depth is None:
        raise FileNotFoundError(f"Could not read depth image: {depth_file}")
    depth = raw_depth / 1000.  # presumably millimetres -> metres; TODO confirm

    cleaned = _clean_mask(mask, kernel_size, erode, dilate, num_components)
    # cv2.resize expects (width, height).
    resized = cv2.resize(cleaned, (depth.shape[1], depth.shape[0]))
    valid = (resized > 0) & (depth > 0)

    if debug_mask_path is not None:
        cv2.imwrite(str(debug_mask_path), valid.astype(np.uint8) * 255)

    points = _backproject(depth, valid, intrinsics, extrinsics)
    if len(points) == 0:
        return point_mask

    if tree is None:
        tree = KDTree(vertices)
    dist, ind = tree.query(points, k=1)
    ind = np.unique(ind[dist < max_distance])
    print(f"unique ind: {len(ind)}")

    point_mask[ind] = True
    return point_mask


def process_object(data):
    """Accumulate the point masks of one object over its frames.

    Args:
        data: Sequence ``(key, item, vertices, intrinsics, source_path,
            base_path, num_frames)``, optionally followed by a pre-built
            ``KDTree`` over ``vertices``. ``item['frames']`` lists the frames;
            only the first ``num_frames`` are used.

    Returns:
        Boolean array of length ``len(vertices)``: the union over all frames.
    """
    key, item, vertices, intrinsics, source_path, base_path, num_frames, *extra = data
    if extra:
        tree = extra[0]
    else:
        # Build the tree once per object rather than once per frame.
        tree = KDTree(vertices) if len(vertices) else None

    total_points_mask = np.zeros(len(vertices), dtype=bool)
    for frame in item['frames'][:num_frames]:
        total_points_mask |= process_frame(
            frame, vertices, intrinsics, source_path, base_path, key, tree=tree
        )
    return total_points_mask


def load_scan(pcd_path):
    """Load a binary scan and return its first three columns as ``(N, 3)``.

    The file is read as a flat float32 array with six values per point; the
    first three are returned (presumably XYZ, with the rest being per-point
    attributes — verify against the data producer).
    """
    return np.fromfile(pcd_path, dtype=np.float32).reshape(-1, 6)[:, :3]


def process_scene(data):
    """Recompute the per-point masks of one scene and save them.

    Args:
        data: Tuple ``(scene_id, exp_name)``. Results are written to
            ``data/prediction/scannet/<exp_name>/<scene_id>.npz`` and a
            coloured point cloud to ``pred_masks/<scene_id>_mask.ply``.
    """
    scene_id, exp_name = data
    pred_path = Path(f"data/prediction/scannet/baseline_scannet200/{scene_id}.npz")
    out_path = Path(f"data/prediction/scannet/{exp_name}/{scene_id}.npz")
    base_path = Path(f"/home/jovyan/users/lemeshko/scripts/gsam_result/yolo/{scene_id}")
    source_path = Path(f"/home/jovyan/users/kolodiazhnyi/data/scannet/posed_images/{scene_id}")
    scan_path = Path(f"/home/jovyan/users/bulat/workspace/3drec/Indoor/OKNO/data/scannet200/points/{scene_id}.bin")
    info_path = base_path / "infos.npy"

    vertices = load_scan(scan_path)
    # NOTE(review): allow_pickle=True executes pickled payloads on load; only
    # use this with files from a trusted source.
    info_data = np.load(info_path, allow_pickle=True).item()
    with np.load(pred_path, allow_pickle=True) as base_data:
        new_data = {name: base_data[name] for name in base_data.files}

    # Scan diagnostics.
    print(f"Mesh vertices shape: {vertices.shape}")
    print("Mesh vertices range:")
    print(f"  X: [{vertices[:, 0].min():.3f}, {vertices[:, 0].max():.3f}]")
    print(f"  Y: [{vertices[:, 1].min():.3f}, {vertices[:, 1].max():.3f}]")
    print(f"  Z: [{vertices[:, 2].min():.3f}, {vertices[:, 2].max():.3f}]")

    intrinsics = np.loadtxt(source_path / 'intrinsic.txt')[:3, :3]
    # Rescale the camera matrix from 1296x968 to 640x480 (presumably colour
    # resolution -> depth resolution; TODO confirm).
    intrinsics[0, :] *= 640 / 1296
    intrinsics[1, :] *= 480 / 968

    num_frames = 500
    # One nearest-neighbour index for the whole scene, shared by all workers.
    tree = KDTree(vertices) if len(vertices) else None
    object_data = [
        [key, item, vertices, intrinsics, source_path, base_path, num_frames]
        + ([tree] if tree is not None else [])
        for key, item in info_data.items()
    ]

    total_points_masks = thread_map(process_object, object_data, chunksize=100)

    for i, points_mask in enumerate(total_points_masks):
        new_data['pred_masks'][:, i] = points_mask

    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Export every mask as a randomly coloured subset of the scan for inspection.
    os.makedirs("pred_masks", exist_ok=True)
    # Cast so the columns always act as boolean selectors, never as indices.
    selectors = new_data['pred_masks'].astype(bool)
    vs = []
    cs = []
    for i in range(selectors.shape[1]):
        v = vertices[selectors[:, i]]
        c = np.tile(np.random.rand(3), (len(v), 1))
        vs.append(v)
        cs.append(c)
    if vs:
        tm.PointCloud(
            np.concatenate(vs, axis=0), colors=np.concatenate(cs, axis=0)
        ).export(f"pred_masks/{scene_id}_mask.ply")

    print("uniques", np.unique(new_data['pred_masks'].sum(1)), [[k, v.shape] for k, v in new_data.items()])
    np.savez(out_path, **new_data)


def main():
    """Process every scene listed in the split file."""
    exp_name = "erode_mask"
    # atleast_1d: loadtxt returns a 0-d array when the file holds one entry.
    scenes = np.atleast_1d(np.loadtxt(
        "/home/jovyan/users/bulat/workspace/3drec/Indoor/MaskClustering/splits/scannet200_subset.txt",
        dtype=str,
    ))
    for scene in scenes:
        process_scene((scene, exp_name))


if __name__ == "__main__":
    main()