Spaces:
Configuration error
Configuration error
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| import logging | |
| import copy | |
| import os | |
| import cv2 | |
| import matplotlib | |
| import numpy as np | |
| import requests | |
| import trimesh | |
| from scipy.spatial import cKDTree | |
| from scipy.spatial.transform import Rotation | |
| logger = logging.getLogger(__name__) | |
| def _srgb_to_linear(colors: np.ndarray) -> np.ndarray: | |
| colors = np.clip(colors, 0.0, 1.0) | |
| threshold = 0.04045 | |
| below = colors <= threshold | |
| linear = np.empty_like(colors, dtype=np.float64) | |
| linear[below] = colors[below] / 12.92 | |
| linear[~below] = ((colors[~below] + 0.055) / 1.055) ** 2.4 | |
| return linear | |
| def _linear_to_srgb(colors: np.ndarray) -> np.ndarray: | |
| colors = np.clip(colors, 0.0, 1.0) | |
| threshold = 0.0031308 | |
| srgb = np.empty_like(colors, dtype=np.float64) | |
| below = colors <= threshold | |
| srgb[below] = colors[below] * 12.92 | |
| srgb[~below] = 1.055 * np.power(colors[~below], 1 / 2.4) - 0.055 | |
| return np.clip(np.round(srgb * 255.0), 0, 255).astype(np.uint8) | |
| def voxel_reduce( | |
| points_f32: np.ndarray, | |
| colors_u8: np.ndarray, | |
| conf_f32: np.ndarray | None = None, | |
| voxel_size: float = 0.02, | |
| origin: np.ndarray | None = None, | |
| ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| points = np.asarray(points_f32, dtype=np.float32) | |
| colors = np.asarray(colors_u8, dtype=np.uint8) | |
| if points.size == 0: | |
| return ( | |
| points.reshape(-1, 3).astype(np.float32), | |
| colors.reshape(-1, 3).astype(np.uint8), | |
| np.zeros((points.shape[0],), dtype=np.float32), | |
| ) | |
| if voxel_size is None or voxel_size <= 0: | |
| weights = ( | |
| np.asarray(conf_f32, dtype=np.float32).reshape(-1) | |
| if conf_f32 is not None | |
| else np.ones(points.shape[0], dtype=np.float32) | |
| ) | |
| return points.astype(np.float32), colors.astype(np.uint8), weights | |
| weights = ( | |
| np.asarray(conf_f32, dtype=np.float32).reshape(-1) | |
| if conf_f32 is not None | |
| else np.ones(points.shape[0], dtype=np.float32) | |
| ) | |
| if weights.shape[0] != points.shape[0]: | |
| raise ValueError("conf_f32 must match the shape of points.") | |
| base = ( | |
| np.asarray(origin, dtype=np.float32) | |
| if origin is not None | |
| else points.min(axis=0).astype(np.float32) | |
| ) | |
| voxel_indices = np.floor((points - base) / voxel_size).astype(np.int64) | |
| voxel_keys, inverse_indices, counts = np.unique( | |
| voxel_indices, axis=0, return_inverse=True, return_counts=True | |
| ) | |
| reduced_count = voxel_keys.shape[0] | |
| accum_weights = np.bincount(inverse_indices, weights=weights, minlength=reduced_count) | |
| accum_weights = np.where(accum_weights <= 0, 1e-6, accum_weights) | |
| reduced_points = np.zeros((reduced_count, 3), dtype=np.float64) | |
| for dim in range(3): | |
| reduced_points[:, dim] = np.bincount( | |
| inverse_indices, | |
| weights=weights * points[:, dim], | |
| minlength=reduced_count, | |
| ) | |
| reduced_points /= accum_weights[:, None] | |
| colors_linear = _srgb_to_linear(colors.astype(np.float32) / 255.0) | |
| reduced_colors_linear = np.zeros((reduced_count, 3), dtype=np.float64) | |
| for dim in range(3): | |
| reduced_colors_linear[:, dim] = np.bincount( | |
| inverse_indices, | |
| weights=weights * colors_linear[:, dim], | |
| minlength=reduced_count, | |
| ) | |
| reduced_colors_linear /= accum_weights[:, None] | |
| reduced_colors = _linear_to_srgb(reduced_colors_linear) | |
| support = ( | |
| accum_weights.astype(np.float32) | |
| if conf_f32 is not None | |
| else counts.astype(np.float32) | |
| ) | |
| return reduced_points.astype(np.float32), reduced_colors.astype(np.uint8), support | |
| def _filter_by_support( | |
| points: np.ndarray, | |
| colors: np.ndarray, | |
| support: np.ndarray, | |
| min_support: float | None, | |
| ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| if ( | |
| support is None | |
| or support.size == 0 | |
| or min_support is None | |
| or min_support <= 0 | |
| ): | |
| return points, colors, support | |
| mask = support >= float(min_support) | |
| if not np.any(mask): | |
| return points, colors, support | |
| return points[mask], colors[mask], support[mask] | |
def _log_point_count(stage: str, before: int, after: int) -> None:
    """Log the point count before and after a processing stage at INFO level.

    Args:
        stage: Label identifying the processing stage.
        before: Number of points entering the stage.
        after: Number of points leaving the stage.
    """
    if not logger.isEnabledFor(logging.INFO):
        return
    logger.info("Point cloud %s: %d -> %d", stage, before, after)
def o3d_outlier_filter(
    points_f32: np.ndarray,
    colors_u8: np.ndarray,
    *,
    voxel_size: float = 0.02,
    radius_mult: float = 3.0,
    nb_points: int = 16,
    nb_neighbors: int = 48,
    std_ratio: float = 1.5,
) -> tuple[np.ndarray, np.ndarray]:
    """Remove outliers with Open3D's radius and statistical filters.

    Open3D is imported lazily. If it is not installed a warning is logged and
    the cloud is returned unfiltered.

    Args:
        points_f32: (N, 3) point coordinates.
        colors_u8: (N, 3) 8-bit colours aligned with the points.
        voxel_size: Base length used to derive the search radius; a missing or
            non-positive value falls back to 0.02.
        radius_mult: Search radius as a multiple of ``voxel_size`` (the radius
            is never smaller than 1e-4).
        nb_points: Minimum neighbours inside the radius; values <= 0 skip the
            radius filter.
        nb_neighbors: Neighbour count for the statistical filter; values <= 0
            skip it.
        std_ratio: Standard-deviation ratio for the statistical filter.

    Returns:
        ``(points, colors)`` as float32 (M, 3) and uint8 (M, 3).
    """
    cloud_xyz = np.asarray(points_f32, dtype=np.float32)
    cloud_rgb = np.asarray(colors_u8, dtype=np.uint8)
    if cloud_xyz.size == 0:
        return cloud_xyz.reshape(-1, 3), cloud_rgb.reshape(-1, 3)

    try:
        import open3d as o3d  # type: ignore
    except ImportError:
        logger.warning("Open3D not available; skipping outlier filtering.")
        return cloud_xyz.astype(np.float32), cloud_rgb.astype(np.uint8)

    def _empty_result() -> tuple[np.ndarray, np.ndarray]:
        return np.empty((0, 3), dtype=np.float32), np.empty((0, 3), dtype=np.uint8)

    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(cloud_xyz.astype(np.float64))
    pcd.colors = o3d.utility.Vector3dVector(cloud_rgb.astype(np.float32) / 255.0)

    if voxel_size and voxel_size > 0:
        base_length = float(voxel_size)
    else:
        base_length = 0.02
    search_radius = max(float(radius_mult) * base_length, 1e-4)

    if nb_points > 0:
        pcd, _ = pcd.remove_radius_outlier(nb_points=int(nb_points), radius=search_radius)
        if len(pcd.points) == 0:
            return _empty_result()

    if nb_neighbors > 0:
        pcd, _ = pcd.remove_statistical_outlier(
            nb_neighbors=int(nb_neighbors),
            std_ratio=float(std_ratio),
        )
        if len(pcd.points) == 0:
            return _empty_result()

    kept_xyz = np.asarray(pcd.points, dtype=np.float32)
    kept_rgb_unit = np.asarray(pcd.colors, dtype=np.float32)
    kept_rgb = np.clip(np.round(kept_rgb_unit * 255.0), 0, 255).astype(np.uint8)
    return kept_xyz, kept_rgb
def density_filter_points(
    points_f32: np.ndarray,
    colors_u8: np.ndarray,
    *,
    radius: float,
    min_neighbors: int,
) -> tuple[np.ndarray, np.ndarray]:
    """Drop points that have too few neighbours within ``radius``.

    The neighbour count of a point includes the point itself, so
    ``min_neighbors=1`` keeps everything.

    Args:
        points_f32: (N, 3) point coordinates.
        colors_u8: (N, 3) 8-bit colours aligned with the points.
        radius: Search radius; clamped to at least 1e-4.
        min_neighbors: Minimum number of points (self included) required
            inside the radius; clamped to at least 1.

    Returns:
        ``(points, colors)`` restricted to the sufficiently dense points.
    """
    points = np.asarray(points_f32, dtype=np.float32)
    colors = np.asarray(colors_u8, dtype=np.uint8)
    if points.size == 0:
        return points.reshape(-1, 3), colors.reshape(-1, 3)

    radius = max(float(radius), 1e-4)
    min_neighbors = max(int(min_neighbors), 1)

    tree = cKDTree(points)
    # Only the counts are needed; return_length avoids materialising one
    # Python list of neighbour indices per point.
    neighbor_counts = np.asarray(tree.query_ball_point(points, radius, return_length=True))
    mask = neighbor_counts >= min_neighbors
    return points[mask], colors[mask]
| def reinflate_voxels( | |
| points: np.ndarray, | |
| colors: np.ndarray, | |
| support: np.ndarray | None, | |
| *, | |
| voxel_size: float, | |
| support_scale: float = 0.5, | |
| min_samples: int = 1, | |
| max_samples: int | None = 12, | |
| jitter_mode: str = "cube", | |
| jitter_sigma: float = 0.35, | |
| seed: int | None = None, | |
| ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| """ | |
| Expand each voxel centroid into a jittered micro-cluster to recover splat coverage. | |
| Returns expanded points, colors, and per-sample support values whose sum matches the original. | |
| """ | |
| if ( | |
| points.size == 0 | |
| or support is None | |
| or voxel_size is None | |
| or voxel_size <= 0 | |
| ): | |
| return points, colors, np.asarray(support) if support is not None else np.array([], dtype=np.float32) | |
| support = np.asarray(support, dtype=np.float32) | |
| if support.shape[0] != points.shape[0]: | |
| raise ValueError("Support array must align with points for reinflation.") | |
| voxel_size = float(voxel_size) | |
| positive_mask = support > 0 | |
| if not np.any(positive_mask): | |
| return points, colors, support | |
| counts = np.zeros_like(support, dtype=np.int32) | |
| scaled = np.round(support_scale * support[positive_mask]).astype(np.int32) | |
| if min_samples is not None: | |
| scaled = np.maximum(scaled, int(max(0, min_samples))) | |
| if max_samples is not None: | |
| scaled = np.minimum(scaled, int(max_samples)) | |
| scaled = np.maximum(scaled, 0) | |
| counts[positive_mask] = scaled | |
| total_samples = int(counts.sum()) | |
| if total_samples == 0: | |
| fallback = max(1, int(min_samples or 1)) | |
| if max_samples is not None: | |
| fallback = min(fallback, int(max_samples)) | |
| counts[positive_mask] = fallback | |
| total_samples = int(counts.sum()) | |
| rng = np.random.default_rng(seed) | |
| repeated_indices = np.repeat(np.arange(points.shape[0]), counts) | |
| expanded_points = points[repeated_indices].astype(np.float32, copy=True) | |
| expanded_colors = colors[repeated_indices].astype(np.uint8, copy=True) | |
| offsets = np.zeros_like(expanded_points, dtype=np.float32) | |
| jitter_sigma = float(max(jitter_sigma, 0.0)) | |
| if jitter_mode not in {"cube", "gaussian"}: | |
| raise ValueError("jitter_mode must be 'cube' or 'gaussian'") | |
| if jitter_sigma > 0 and total_samples > 0: | |
| if jitter_mode == "cube": | |
| span = 0.5 * voxel_size * jitter_sigma | |
| offsets = rng.uniform(-span, span, size=expanded_points.shape).astype(np.float32) | |
| else: | |
| sigma = voxel_size * jitter_sigma | |
| offsets = rng.normal(0.0, sigma, size=expanded_points.shape).astype(np.float32) | |
| max_span = 0.5 * voxel_size | |
| np.clip(offsets, -max_span, max_span, out=offsets) | |
| cumulative = np.cumsum(counts) | |
| starts = cumulative - counts | |
| offsets[starts] = 0.0 | |
| expanded_points += offsets | |
| normalized_support = np.zeros_like(support, dtype=np.float32) | |
| nonzero_counts = counts > 0 | |
| normalized_support[nonzero_counts] = support[nonzero_counts] / counts[nonzero_counts] | |
| expanded_support = np.repeat(normalized_support, counts) | |
| return expanded_points, expanded_colors, expanded_support | |
def _resolve_extra_camera_colors(extra_camera_color, count: int) -> list:
    """Return one RGB colour per extra camera, defaulting to red."""
    default_color = (255, 0, 0)
    if count == 0:
        return []
    if isinstance(extra_camera_color, (list, tuple)):
        # Three scalar channels form a single colour shared by all cameras.
        # This is checked first so one camera with an RGB tuple is honoured.
        is_single_rgb = len(extra_camera_color) == 3 and all(
            np.isscalar(channel) for channel in extra_camera_color
        )
        if is_single_rgb:
            return [tuple(extra_camera_color)] * count
        if len(extra_camera_color) == count:
            return list(extra_camera_color)
    return [default_color] * count


def _drop_points_above(points, colors, support, strict_limit, fallback_limit):
    """Keep points with Z < strict_limit, falling back to Z <= fallback_limit if that keeps none."""
    keep_mask = points[:, 2] < strict_limit
    if not np.any(keep_mask):
        keep_mask = points[:, 2] <= fallback_limit
    if np.any(keep_mask) and np.count_nonzero(keep_mask) < points.shape[0]:
        return points[keep_mask], colors[keep_mask], support[keep_mask]
    return points, colors, support


def _voxelize_stage(points, colors, support, *, stage_suffix, voxel_size, min_support, reinflate, reinflate_kwargs):
    """Run voxel reduction, support filtering and optional reinflation, logging point counts."""
    before_count = points.shape[0]
    points, colors, support = voxel_reduce(points, colors, support, voxel_size=voxel_size)
    points, colors, support = _filter_by_support(points, colors, support, min_support)
    after_reduce = points.shape[0]
    _log_point_count(f"voxel_reduce_{stage_suffix}", before_count, after_reduce)
    if reinflate and after_reduce:
        points, colors, support = reinflate_voxels(
            points,
            colors,
            support,
            voxel_size=voxel_size,
            **reinflate_kwargs,
        )
        _log_point_count(f"voxel_reinflate_{stage_suffix}", after_reduce, points.shape[0])
    return points, colors, support


def predictions_to_glb(
    predictions,
    conf_thres=50.0,
    filter_by_frames="all",
    mask_black_bg=False,
    mask_white_bg=False,
    show_cam=True,
    mask_sky=False,
    target_dir=None,
    prediction_mode="Predicted Pointmap",
    extra_cameras=None,
    extra_camera_color=(255, 0, 0),
    voxel_size: float | None = 0.01,
    voxel_after_conf: bool = True,
    min_voxel_support: float | None = 3,
    o3d_denoise: bool = True,
    o3d_params: dict | None = None,
    density_filter: bool = False,
    density_params: dict | None = None,
    reinflate_enabled: bool = True,
    reinflate_support_scale: float = 1.5,
    reinflate_min_samples: int = 3,
    reinflate_max_samples: int | None = 8,
    reinflate_jitter_mode: str = "cube",
    reinflate_jitter_sigma: float = 0.35,
    reinflate_seed: int | None = None,
    ceiling_percentile: float | None = None,
    ceiling_margin: float = 0.05,
    ceiling_z_max: float | None = None,
) -> trimesh.Scene:
    """
    Converts predictions to a 3D scene represented as a GLB file.

    Args:
        predictions (dict): Dictionary containing model predictions with keys:
            - world_points: 3D point coordinates (S, H, W, 3)
            - world_points_conf: Confidence scores (S, H, W)
            - images: Input images (S, H, W, 3) or (S, 3, H, W)
            - extrinsic: Camera extrinsic matrices (S, 3, 4)
            When the pointmap is unavailable or another prediction mode is
            selected, ``world_points_from_depth`` / ``depth_conf`` are used.
        conf_thres (float): Percentage of low-confidence points to filter out (default: 50.0).
            ``None`` is treated as 10.0.
        filter_by_frames (str): Frame filter specification (default: "all"). Any other value is
            parsed as ``"<index>:..."``; an unparsable value keeps all frames.
        mask_black_bg (bool): Mask out black background pixels (default: False)
        mask_white_bg (bool): Mask out white background pixels (default: False)
        show_cam (bool): Include camera visualization (default: True)
        mask_sky (bool): Apply sky segmentation mask (default: False). Requires ``target_dir``.
        target_dir (str): Directory holding ``images/``; sky masks are cached under ``sky_masks/``.
        prediction_mode (str): Prediction mode selector (default: "Predicted Pointmap")
        extra_cameras (Optional[List[np.ndarray]]): Additional camera extrinsics (3x4 or 4x4)
            to visualize even when show_cam=False. Useful for highlighting localized poses.
        extra_camera_color (tuple or list[tuple]): Either one RGB colour applied to every extra
            camera, or one RGB colour per extra camera. Anything else falls back to red.
        voxel_size (Optional[float]): Size of voxel grid cells (>0 enables reduction).
        voxel_after_conf (bool): Apply voxel reduction after confidence/background filtering.
        min_voxel_support (Optional[float]): Minimum aggregated support (confidence/count) per voxel.
        o3d_denoise (bool): Enable Open3D outlier filtering.
        o3d_params (Optional[dict]): Overrides for Open3D filtering parameters.
        density_filter (bool): Apply KD-tree based density filtering.
        density_params (Optional[dict]): Overrides for density filter parameters.
        reinflate_enabled (bool): Re-expand voxels into jittered micro-clusters.
        reinflate_support_scale (float): Multiplier converting support into sample count.
        reinflate_min_samples (int): Minimum samples emitted per voxel with positive support.
        reinflate_max_samples (Optional[int]): Maximum samples emitted per voxel.
        reinflate_jitter_mode (str): "cube" (uniform jitter) or "gaussian".
        reinflate_jitter_sigma (float): Jitter strength as a fraction of voxel size.
        reinflate_seed (Optional[int]): RNG seed for deterministic reinflation.
        ceiling_percentile (Optional[float]): Remove points above this Z percentile (0-100).
        ceiling_margin (float): Margin subtracted from percentile cutoff (meters).
        ceiling_z_max (Optional[float]): Remove points with Z >= this absolute height (meters).

    Returns:
        trimesh.Scene: Processed 3D scene containing point cloud and cameras

    Raises:
        ValueError: If input predictions structure is invalid, or an extra camera
            extrinsic is neither (3, 4) nor (4, 4).
    """
    if not isinstance(predictions, dict):
        raise ValueError("predictions must be a dictionary")

    if conf_thres is None:
        conf_thres = 10.0

    print("Building GLB scene")
    selected_frame_idx = None
    if filter_by_frames not in ("all", "All"):
        try:
            # Extract the index part before the colon
            selected_frame_idx = int(filter_by_frames.split(":")[0])
        except (ValueError, IndexError):
            logger.warning("Could not parse frame filter %r; using all frames.", filter_by_frames)

    if "Pointmap" in prediction_mode:
        print("Using Pointmap Branch")
        if "world_points" in predictions:
            pred_world_points = predictions["world_points"]  # No batch dimension to remove
            pred_world_points_conf = predictions.get("world_points_conf", np.ones_like(pred_world_points[..., 0]))
        else:
            print("Warning: world_points not found in predictions, falling back to depth-based points")
            pred_world_points = predictions["world_points_from_depth"]
            pred_world_points_conf = predictions.get("depth_conf", np.ones_like(pred_world_points[..., 0]))
    else:
        print("Using Depthmap and Camera Branch")
        pred_world_points = predictions["world_points_from_depth"]
        pred_world_points_conf = predictions.get("depth_conf", np.ones_like(pred_world_points[..., 0]))

    # Get images from predictions
    images = predictions["images"]
    # Use extrinsic matrices instead of pred_extrinsic_list
    camera_matrices = predictions["extrinsic"]

    if mask_sky:
        if target_dir is not None:
            import onnxruntime

            skyseg_session = None
            target_dir_images = os.path.join(target_dir, "images")
            image_list = sorted(os.listdir(target_dir_images))
            sky_mask_list = []

            # Get the shape of pred_world_points_conf to match
            S, H, W = (
                pred_world_points_conf.shape
                if hasattr(pred_world_points_conf, "shape")
                else (len(images), images.shape[1], images.shape[2])
            )

            # Download skyseg.onnx if it doesn't exist
            if not os.path.exists("skyseg.onnx"):
                print("Downloading skyseg.onnx...")
                download_file_from_url(
                    "https://huggingface.co/JianyuanWang/skyseg/resolve/main/skyseg.onnx", "skyseg.onnx"
                )

            for image_name in image_list:
                image_filepath = os.path.join(target_dir_images, image_name)
                mask_filepath = os.path.join(target_dir, "sky_masks", image_name)

                # Reuse a cached mask when one exists and is readable
                sky_mask = None
                if os.path.exists(mask_filepath):
                    sky_mask = cv2.imread(mask_filepath, cv2.IMREAD_GRAYSCALE)

                # cv2.imread returns None for an unreadable file, so a corrupt
                # cache entry is regenerated instead of crashing below.
                if sky_mask is None:
                    if skyseg_session is None:
                        skyseg_session = onnxruntime.InferenceSession("skyseg.onnx")
                    sky_mask = segment_sky(image_filepath, skyseg_session, mask_filepath)

                # Resize mask to match H×W if needed
                if sky_mask.shape[0] != H or sky_mask.shape[1] != W:
                    sky_mask = cv2.resize(sky_mask, (W, H))

                sky_mask_list.append(sky_mask)

            # Convert list to numpy array with shape S×H×W
            sky_mask_array = np.array(sky_mask_list)

            # Zero the confidence wherever the mask is (near) zero
            sky_mask_binary = (sky_mask_array > 0.1).astype(np.float32)
            pred_world_points_conf = pred_world_points_conf * sky_mask_binary
        else:
            logger.warning("mask_sky requested without target_dir; skipping sky masking.")

    if selected_frame_idx is not None:
        pred_world_points = pred_world_points[selected_frame_idx][None]
        pred_world_points_conf = pred_world_points_conf[selected_frame_idx][None]
        images = images[selected_frame_idx][None]
        camera_matrices = camera_matrices[selected_frame_idx][None]

    vertices_3d = pred_world_points.reshape(-1, 3)
    # Handle different image formats - check if images need transposing
    if images.ndim == 4 and images.shape[1] == 3:  # NCHW format
        colors_rgb = np.transpose(images, (0, 2, 3, 1))
    else:  # Assume already in NHWC format
        colors_rgb = images
    # Clip before the cast so values marginally outside [0, 1] cannot wrap.
    colors_rgb = np.clip(colors_rgb.reshape(-1, 3) * 255, 0, 255).astype(np.uint8)

    conf = pred_world_points_conf.reshape(-1).astype(np.float32)

    effective_voxel_size = float(voxel_size) if voxel_size is not None else None
    if effective_voxel_size is not None and effective_voxel_size <= 0:
        effective_voxel_size = None

    reinflate_kwargs = {
        "support_scale": reinflate_support_scale,
        "min_samples": reinflate_min_samples,
        "max_samples": reinflate_max_samples,
        "jitter_mode": reinflate_jitter_mode,
        "jitter_sigma": reinflate_jitter_sigma,
        "seed": reinflate_seed,
    }

    if effective_voxel_size is not None and not voxel_after_conf:
        # From here on `conf` holds per-sample voxel support, so the
        # percentile threshold below is taken over that support.
        vertices_3d, colors_rgb, conf = _voxelize_stage(
            vertices_3d,
            colors_rgb,
            conf,
            stage_suffix="pre_conf",
            voxel_size=effective_voxel_size,
            min_support=min_voxel_support,
            reinflate=reinflate_enabled,
            reinflate_kwargs=reinflate_kwargs,
        )

    # Convert percentage threshold to actual confidence value
    if conf_thres == 0.0 or conf.size == 0:
        conf_threshold = 0.0
    else:
        conf_threshold = np.percentile(conf, conf_thres)

    conf_mask = (conf >= conf_threshold) & (conf > 1e-5)

    if mask_black_bg:
        black_bg_mask = colors_rgb.sum(axis=1) >= 16
        conf_mask = conf_mask & black_bg_mask

    if mask_white_bg:
        # Filter out white background pixels (RGB values close to white)
        # Consider pixels white if all RGB values are above 240
        white_bg_mask = ~((colors_rgb[:, 0] > 240) & (colors_rgb[:, 1] > 240) & (colors_rgb[:, 2] > 240))
        conf_mask = conf_mask & white_bg_mask

    vertices_3d = vertices_3d[conf_mask]
    colors_rgb = colors_rgb[conf_mask]
    conf_used = conf[conf_mask]

    if ceiling_percentile is not None and vertices_3d.size:
        try:
            percentile_value = float(ceiling_percentile)
        except (TypeError, ValueError):
            percentile_value = None
        if percentile_value is not None and 0.0 < percentile_value < 100.0:
            cutoff = float(np.percentile(vertices_3d[:, 2], percentile_value))
            margin = float(max(0.0, ceiling_margin))
            vertices_3d, colors_rgb, conf_used = _drop_points_above(
                vertices_3d, colors_rgb, conf_used, cutoff - margin, cutoff
            )

    if ceiling_z_max is not None and vertices_3d.size:
        try:
            z_limit = float(ceiling_z_max)
        except (TypeError, ValueError):
            z_limit = None
        if z_limit is not None:
            vertices_3d, colors_rgb, conf_used = _drop_points_above(
                vertices_3d, colors_rgb, conf_used, z_limit, z_limit
            )

    if effective_voxel_size is not None and voxel_after_conf and vertices_3d.size:
        vertices_3d, colors_rgb, conf_used = _voxelize_stage(
            vertices_3d,
            colors_rgb,
            conf_used,
            stage_suffix="post_conf",
            voxel_size=effective_voxel_size,
            min_support=min_voxel_support,
            reinflate=reinflate_enabled,
            reinflate_kwargs=reinflate_kwargs,
        )

    if o3d_denoise and vertices_3d.size:
        before_count = vertices_3d.shape[0]
        params = {
            "voxel_size": effective_voxel_size or 0.02,
            "radius_mult": 3.0,
            "nb_points": 16,
            "nb_neighbors": 48,
            "std_ratio": 1.5,
        }
        if o3d_params:
            params.update(o3d_params)
        vertices_3d, colors_rgb = o3d_outlier_filter(vertices_3d, colors_rgb, **params)
        _log_point_count("o3d_denoise", before_count, vertices_3d.shape[0])

    if density_filter and vertices_3d.size:
        before_count = vertices_3d.shape[0]
        params = {
            "radius": (effective_voxel_size or 0.02) * 2.5,
            "min_neighbors": 6,
        }
        if density_params:
            params.update(density_params)
        vertices_3d, colors_rgb = density_filter_points(vertices_3d, colors_rgb, **params)
        _log_point_count("density_filter", before_count, vertices_3d.shape[0])

    if vertices_3d is None or np.asarray(vertices_3d).size == 0:
        # Nothing survived filtering: emit a single placeholder point
        vertices_3d = np.array([[1, 0, 0]])
        colors_rgb = np.array([[255, 255, 255]])
        scene_scale = 1
    else:
        # Calculate the 5th and 95th percentiles along each axis
        lower_percentile = np.percentile(vertices_3d, 5, axis=0)
        upper_percentile = np.percentile(vertices_3d, 95, axis=0)

        # Calculate the diagonal length of the percentile bounding box
        scene_scale = np.linalg.norm(upper_percentile - lower_percentile)

    colormap = matplotlib.colormaps.get_cmap("gist_rainbow")

    # Initialize a 3D scene
    scene_3d = trimesh.Scene()

    # Add point cloud data to the scene
    point_cloud_data = trimesh.PointCloud(vertices=vertices_3d, colors=colors_rgb)
    scene_3d.add_geometry(point_cloud_data)

    # Prepare 4x4 matrices for camera extrinsics
    num_cameras = len(camera_matrices)
    extrinsics_matrices = np.zeros((num_cameras, 4, 4))
    extrinsics_matrices[:, :3, :4] = camera_matrices
    extrinsics_matrices[:, 3, 3] = 1

    extra_cameras = [] if extra_cameras is None else list(extra_cameras)
    extra_colors = _resolve_extra_camera_colors(extra_camera_color, len(extra_cameras))

    if show_cam:
        # Add camera models to the scene
        for i in range(num_cameras):
            world_to_camera = extrinsics_matrices[i]
            camera_to_world = np.linalg.inv(world_to_camera)
            rgba_color = colormap(i / num_cameras)
            current_color = tuple(int(255 * x) for x in rgba_color[:3])

            integrate_camera_into_scene(scene_3d, camera_to_world, current_color, scene_scale)

    # Extra cameras are drawn regardless of show_cam
    for extra, extra_color in zip(extra_cameras, extra_colors):
        extra = np.asarray(extra)
        if extra.shape == (3, 4):
            world_to_camera = np.eye(4)
            world_to_camera[:3, :4] = extra
        elif extra.shape == (4, 4):
            world_to_camera = extra
        else:
            raise ValueError("Extra camera extrinsic must have shape (3,4) or (4,4)")
        camera_to_world = np.linalg.inv(world_to_camera)
        integrate_camera_into_scene(scene_3d, camera_to_world, extra_color, scene_scale)

    # Align scene to the observation of the first camera
    scene_3d = apply_scene_alignment(scene_3d, extrinsics_matrices)

    print("GLB Scene built")
    return scene_3d
def integrate_camera_into_scene(scene: trimesh.Scene, transform: np.ndarray, face_colors: tuple, scene_scale: float):
    """
    Adds a camera marker mesh to the scene at the given pose.

    The marker is built from a 4-section cone whose vertices are stacked three
    times (original, scaled by 0.95, rotated by 2 degrees about z) and then
    placed with ``transform @ opengl_conversion @ local_pose``.

    Args:
        scene (trimesh.Scene): The 3D scene that receives the camera mesh.
        transform (np.ndarray): 4x4 matrix positioning the camera in the scene.
        face_colors (tuple): RGB colour written to every face of the mesh.
        scene_scale (float): Scene size; the cone radius is 5% and its height 10% of it.
    """

    def _z_rotation(degrees: float) -> np.ndarray:
        pose = np.eye(4)
        pose[:3, :3] = Rotation.from_euler("z", degrees, degrees=True).as_matrix()
        return pose

    cone_radius = scene_scale * 0.05
    cone_height = scene_scale * 0.1

    # Local pose of the cone: 45 degree turn about z, shifted down by its height
    local_pose = _z_rotation(45)
    local_pose[2, 3] = -cone_height

    placement = transform @ get_opengl_conversion_matrix() @ local_pose

    cone = trimesh.creation.cone(cone_radius, cone_height, sections=4)

    # Three copies of the cone vertices: original, slightly shrunk, slightly rotated
    nudge = _z_rotation(2)
    stacked_vertices = np.concatenate(
        [
            cone.vertices,
            0.95 * cone.vertices,
            transform_points(nudge, cone.vertices),
        ]
    )
    placed_vertices = transform_points(placement, stacked_vertices)
    faces = compute_camera_faces(cone)

    marker = trimesh.Trimesh(vertices=placed_vertices, faces=faces)
    marker.visual.face_colors[:, :3] = face_colors
    scene.add_geometry(marker)
def apply_scene_alignment(scene_3d: trimesh.Scene, extrinsics_matrices: np.ndarray) -> trimesh.Scene:
    """
    Aligns the 3D scene based on the extrinsics of the first camera.

    The applied transform is ``inv(extrinsics_matrices[0]) @ opengl_conversion @ rot_y(180 deg)``.

    Args:
        scene_3d (trimesh.Scene): The 3D scene to be aligned (transformed in place).
        extrinsics_matrices (np.ndarray): Camera extrinsic matrices; only the first is used.

    Returns:
        trimesh.Scene: The same scene object after the transform.
    """
    axis_flip = get_opengl_conversion_matrix()

    # 180 degree rotation about the y-axis
    half_turn = np.eye(4)
    half_turn[:3, :3] = Rotation.from_euler("y", 180, degrees=True).as_matrix()

    first_camera_inverse = np.linalg.inv(extrinsics_matrices[0])
    scene_3d.apply_transform(first_camera_inverse @ axis_flip @ half_turn)
    return scene_3d
def get_opengl_conversion_matrix() -> np.ndarray:
    """
    Builds the 4x4 matrix used for the OpenGL convention conversion.

    Returns:
        numpy.ndarray: A new 4x4 identity matrix with the y and z diagonal
        entries set to -1 (i.e. the y and z axes flipped).
    """
    return np.diag([1.0, -1.0, -1.0, 1.0])
def transform_points(transformation: np.ndarray, points: np.ndarray, dim: int = None) -> np.ndarray:
    """
    Applies a homogeneous transformation to a set of points.

    The points are multiplied by the matrix block without its last column and
    the last column is then added as an offset, so no explicit homogeneous
    coordinate is needed.

    Args:
        transformation (np.ndarray): Transformation matrix (e.g. 4x4 for 3D points).
        points (np.ndarray): Points to be transformed, coordinates on the last axis.
        dim (int, optional): Number of leading output coordinates to keep.
            Defaults to the input point dimension.

    Returns:
        np.ndarray: Transformed points with the leading shape of ``points``
        and ``dim`` coordinates on the last axis.
    """
    points = np.asarray(points)
    leading_shape = points.shape[:-1]
    if not dim:
        dim = points.shape[-1]

    # Work with the transposed matrix so points can be right-multiplied
    transposed = transformation.swapaxes(-1, -2)
    linear_part = transposed[..., :-1, :]
    offset_part = transposed[..., -1:, :]
    mapped = points @ linear_part + offset_part

    return mapped[..., :dim].reshape(*leading_shape, dim)
def compute_camera_faces(cone_shape: trimesh.Trimesh) -> np.ndarray:
    """
    Computes the faces for the camera mesh.

    Every cone face that does not touch vertex 0 yields triangles linking the
    original vertices to the two offset vertex copies (indices shifted by one
    and by two times the cone's vertex count). Each triangle is emitted a
    second time with reversed winding.

    Args:
        cone_shape (trimesh.Trimesh): The shape of the camera cone.

    Returns:
        np.ndarray: Array of faces for the camera mesh.
    """
    vertex_count = len(cone_shape.vertices)

    forward_faces = []
    for face in cone_shape.faces:
        if 0 in face:
            continue
        a, b, c = face
        for shift in (vertex_count, 2 * vertex_count):
            a_shifted, b_shifted, c_shifted = face + shift
            forward_faces.append((a, b, b_shifted))
            forward_faces.append((a, a_shifted, c))
            forward_faces.append((c_shifted, b, c))

    # Same triangles with reversed winding
    backward_faces = [(k, j, i) for i, j, k in forward_faces]
    return np.array(forward_faces + backward_faces)
def segment_sky(image_path, onnx_session, mask_filename=None):
    """
    Segments sky from an image using an ONNX model.

    Thanks for the great model provided by https://github.com/xiongzhu666/Sky-Segmentation-and-Post-processing

    Args:
        image_path: Path to input image
        onnx_session: ONNX runtime session with loaded model
        mask_filename: Optional path to save the output mask. When omitted the
            mask is only returned.

    Returns:
        np.ndarray: Mask at the input image's resolution holding 255 where the
        normalised model score is below 32 and 0 elsewhere.

    Raises:
        ValueError: If the image cannot be read.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None rather than raising.
        raise ValueError(f"Could not read image for sky segmentation: {image_path}")

    result_map = run_skyseg(onnx_session, [320, 320], image)
    # resize the result_map to the original image size
    result_map_original = cv2.resize(result_map, (image.shape[1], image.shape[0]))

    # Pixels scoring below 32 are written as 255, everything else stays 0.
    # NOTE(review): the caller treats non-zero pixels as "keep" (non-sky),
    # which implies the model scores sky high. Confirm against the model.
    output_mask = np.zeros_like(result_map_original)
    output_mask[result_map_original < 32] = 255

    if mask_filename is not None:
        mask_dir = os.path.dirname(mask_filename)
        if mask_dir:  # a bare filename has no directory to create
            os.makedirs(mask_dir, exist_ok=True)
        cv2.imwrite(mask_filename, output_mask)
    return output_mask
def run_skyseg(onnx_session, input_size, image):
    """
    Runs sky segmentation inference using ONNX model.

    Args:
        onnx_session: ONNX runtime session
        input_size: Target size for model input (width, height)
        image: Input image in BGR format

    Returns:
        np.ndarray: uint8 segmentation map at the model's output resolution,
        min-max normalised to [0, 255]. A constant model output yields all zeros.
    """
    width, height = int(input_size[0]), int(input_size[1])

    # Pre process: Resize, BGR->RGB, PyTorch standardization, HWC->CHW, float32 cast.
    # cv2.resize allocates its own output, so the input needs no defensive copy.
    resized = cv2.resize(image, dsize=(width, height))
    rgb = np.array(cv2.cvtColor(resized, cv2.COLOR_BGR2RGB), dtype=np.float32)
    mean = [0.485, 0.456, 0.406]
    std = [0.229, 0.224, 0.225]
    normalized = (rgb / 255 - mean) / std

    # The resized image is (height, width, 3). Add the batch axis instead of
    # reshaping to (width, height), which would scramble non-square inputs.
    batch = np.ascontiguousarray(normalized.transpose(2, 0, 1)[np.newaxis], dtype=np.float32)

    # Inference
    input_name = onnx_session.get_inputs()[0].name
    output_name = onnx_session.get_outputs()[0].name
    onnx_result = onnx_session.run([output_name], {input_name: batch})

    # Post process: min-max normalise to [0, 255]
    scores = np.array(onnx_result).squeeze()
    min_value = np.min(scores)
    value_range = np.max(scores) - min_value
    if value_range > 0:
        scores = (scores - min_value) / value_range
    else:
        # Constant output: avoid 0/0 producing NaNs
        scores = np.zeros_like(scores)
    scores *= 255
    return scores.astype("uint8")
def download_file_from_url(url, filename):
    """Downloads a file from a Hugging Face model repo, handling redirects.

    The download is best-effort: network and HTTP errors are reported with a
    printed message and the function returns without creating ``filename``.
    Data is streamed to ``<filename>.part`` and moved into place only after
    the transfer completes, so an interrupted download never leaves a
    truncated file at ``filename``.

    Args:
        url: URL of the file to download.
        filename: Destination path.
    """
    partial_path = f"{filename}.part"
    try:
        # Let requests follow any redirect chain (301/302/307/308) and accept
        # a direct 200 response. The timeout is (connect, read) seconds.
        with requests.get(url, stream=True, allow_redirects=True, timeout=(10, 60)) as response:
            response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
            with open(partial_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        os.replace(partial_path, filename)
        print(f"Downloaded {filename} successfully.")
    except requests.exceptions.RequestException as e:
        print(f"Error downloading file: {e}")
    finally:
        # Remove the partial file after any failure; after a successful
        # os.replace it no longer exists.
        if os.path.exists(partial_path):
            os.remove(partial_path)