Spaces:
Running
Running
| import time | |
| from typing import Optional, Sequence | |
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| from huggingface_hub import PyTorchModelHubMixin | |
| from PIL import Image | |
| from depth_anything_3.cfg import create_object, load_config | |
| from depth_anything_3.registry import MODEL_REGISTRY | |
| from depth_anything_3.specs import Prediction | |
| from depth_anything_3.utils.export import export | |
| from depth_anything_3.utils.geometry import affine_inverse | |
| from depth_anything_3.utils.io.input_processor import InputProcessor | |
| from depth_anything_3.utils.io.output_processor import OutputProcessor | |
| from depth_anything_3.utils.logger import logger | |
| from depth_anything_3.utils.pose_align import align_poses_umeyama | |
| torch.backends.cudnn.benchmark = False | |
| class DepthAnything3(nn.Module, PyTorchModelHubMixin): | |
| """ | |
| CPU-only Depth Anything 3 API class. | |
| This class provides depth estimation with all tensors and models on CPU. | |
| """ | |
| _commit_hash: str | None = None # Set by mixin when loading from Hub | |
| def __init__(self, model_name: str = "da3-large", **kwargs): | |
| super().__init__() | |
| self.model_name = model_name | |
| # Build network and force CPU | |
| self.config = load_config(MODEL_REGISTRY[self.model_name]) | |
| self.model = create_object(self.config) | |
| self.model.eval() | |
| self.device = torch.device("cpu") | |
| self.model.to(self.device) | |
| # Initialize processors | |
| self.input_processor = InputProcessor() | |
| self.output_processor = OutputProcessor() | |
| def forward( | |
| self, | |
| image: torch.Tensor, | |
| extrinsics: torch.Tensor | None = None, | |
| intrinsics: torch.Tensor | None = None, | |
| export_feat_layers: list[int] | None = None, | |
| infer_gs: bool = False, | |
| ) -> dict[str, torch.Tensor]: | |
| """Forward pass on CPU.""" | |
| image = image.to(self.device) | |
| return self.model(image, extrinsics, intrinsics, export_feat_layers, infer_gs) | |
| def inference( | |
| self, | |
| image: list[np.ndarray | Image.Image | str], | |
| extrinsics: np.ndarray | None = None, | |
| intrinsics: np.ndarray | None = None, | |
| align_to_input_ext_scale: bool = True, | |
| infer_gs: bool = False, | |
| render_exts: np.ndarray | None = None, | |
| render_ixts: np.ndarray | None = None, | |
| render_hw: tuple[int, int] | None = None, | |
| process_res: int = 504, | |
| process_res_method: str = "upper_bound_resize", | |
| export_dir: str | None = None, | |
| export_format: str = "mini_npz", | |
| export_feat_layers: Sequence[int] | None = None, | |
| conf_thresh_percentile: float = 40.0, | |
| num_max_points: int = 1_000_000, | |
| show_cameras: bool = True, | |
| feat_vis_fps: int = 15, | |
| export_kwargs: Optional[dict] = {}, | |
| ) -> Prediction: | |
| """Run inference on input images (CPU).""" | |
| if "gs" in export_format: | |
| assert infer_gs, "must set `infer_gs=True` to perform gs-related export." | |
| imgs_cpu, extrinsics, intrinsics = self._preprocess_inputs( | |
| image, extrinsics, intrinsics, process_res, process_res_method | |
| ) | |
| imgs, ex_t, in_t = self._prepare_model_inputs(imgs_cpu, extrinsics, intrinsics) | |
| ex_t_norm = self._normalize_extrinsics(ex_t.clone() if ex_t is not None else None) | |
| export_feat_layers = list(export_feat_layers) if export_feat_layers is not None else [] | |
| raw_output = self._run_model_forward(imgs, ex_t_norm, in_t, export_feat_layers, infer_gs) | |
| prediction = self._convert_to_prediction(raw_output) | |
| prediction = self._align_to_input_extrinsics_intrinsics(extrinsics, intrinsics, prediction, align_to_input_ext_scale) | |
| prediction = self._add_processed_images(prediction, imgs_cpu) | |
| if export_dir is not None: | |
| if "gs" in export_format and infer_gs: | |
| if "gs_video" not in export_format: | |
| export_format = f"{export_format}-gs_video" | |
| if "gs_video" in export_format and "gs_video" not in export_kwargs: | |
| export_kwargs["gs_video"] = {} | |
| export_kwargs["gs_video"].update({ | |
| "extrinsics": render_exts, | |
| "intrinsics": render_ixts, | |
| "out_image_hw": render_hw, | |
| }) | |
| if "glb" in export_format: | |
| if "glb" not in export_kwargs: | |
| export_kwargs["glb"] = {} | |
| export_kwargs["glb"].update({ | |
| "conf_thresh_percentile": conf_thresh_percentile, | |
| "num_max_points": num_max_points, | |
| "show_cameras": show_cameras, | |
| }) | |
| if "feat_vis" in export_format: | |
| if "feat_vis" not in export_kwargs: | |
| export_kwargs["feat_vis"] = {} | |
| export_kwargs["feat_vis"].update({"fps": feat_vis_fps}) | |
| self._export_results(prediction, export_format, export_dir, **export_kwargs) | |
| return prediction | |
| def _preprocess_inputs(self, image, extrinsics=None, intrinsics=None, process_res=504, process_res_method="upper_bound_resize"): | |
| """Preprocess input images on CPU.""" | |
| start_time = time.time() | |
| imgs_cpu, extrinsics, intrinsics = self.input_processor( | |
| image, | |
| extrinsics.copy() if extrinsics is not None else None, | |
| intrinsics.copy() if intrinsics is not None else None, | |
| process_res, | |
| process_res_method, | |
| ) | |
| logger.info("Processed Images Done taking", time.time() - start_time, "seconds. Shape:", imgs_cpu.shape) | |
| return imgs_cpu, extrinsics, intrinsics | |
| def _prepare_model_inputs(self, imgs_cpu, extrinsics, intrinsics): | |
| """Prepare tensors for model input (CPU-only).""" | |
| imgs = imgs_cpu[None].float().to(self.device) | |
| ex_t = extrinsics[None].float().to(self.device) if extrinsics is not None else None | |
| in_t = intrinsics[None].float().to(self.device) if intrinsics is not None else None | |
| return imgs, ex_t, in_t | |
| def _normalize_extrinsics(self, ex_t): | |
| if ex_t is None: | |
| return None | |
| transform = affine_inverse(ex_t[:, :1]) | |
| ex_t_norm = ex_t @ transform | |
| c2ws = affine_inverse(ex_t_norm) | |
| translations = c2ws[..., :3, 3] | |
| dists = translations.norm(dim=-1) | |
| median_dist = torch.clamp(torch.median(dists), min=1e-1) | |
| ex_t_norm[..., :3, 3] = ex_t_norm[..., :3, 3] / median_dist | |
| return ex_t_norm | |
| def _align_to_input_extrinsics_intrinsics(self, extrinsics, intrinsics, prediction, align_to_input_ext_scale=True, ransac_view_thresh=10): | |
| if extrinsics is None: | |
| return prediction | |
| prediction.intrinsics = intrinsics.numpy() | |
| _, _, scale, aligned_extrinsics = align_poses_umeyama( | |
| prediction.extrinsics, | |
| extrinsics.numpy(), | |
| ransac=len(extrinsics) >= ransac_view_thresh, | |
| return_aligned=True, | |
| random_state=42, | |
| ) | |
| if align_to_input_ext_scale: | |
| prediction.extrinsics = extrinsics[..., :3, :].numpy() | |
| prediction.depth /= scale | |
| else: | |
| prediction.extrinsics = aligned_extrinsics | |
| return prediction | |
| def _run_model_forward(self, imgs, ex_t, in_t, export_feat_layers=None, infer_gs=False): | |
| start_time = time.time() | |
| output = self.forward(imgs, ex_t, in_t, export_feat_layers, infer_gs) | |
| logger.info(f"Model Forward Pass Done (CPU). Time: {time.time() - start_time} seconds") | |
| return output | |
| def _convert_to_prediction(self, raw_output): | |
| start_time = time.time() | |
| output = self.output_processor(raw_output) | |
| logger.info(f"Conversion to Prediction Done. Time: {time.time() - start_time} seconds") | |
| return output | |
| def _add_processed_images(self, prediction, imgs_cpu): | |
| processed_imgs = imgs_cpu.permute(0, 2, 3, 1).cpu().numpy() | |
| mean = np.array([0.485, 0.456, 0.406]) | |
| std = np.array([0.229, 0.224, 0.225]) | |
| processed_imgs = np.clip(processed_imgs * std + mean, 0, 1) | |
| processed_imgs = (processed_imgs * 255).astype(np.uint8) | |
| prediction.processed_images = processed_imgs | |
| return prediction | |
| def _export_results(self, prediction, export_format, export_dir, **kwargs): | |
| start_time = time.time() | |
| export(prediction, export_format, export_dir, **kwargs) | |
| logger.info(f"Export Results Done. Time: {time.time() - start_time} seconds") | |
| def _get_model_device(self): | |
| """Always return CPU device.""" | |
| return torch.device("cpu") | |