File size: 3,646 Bytes
1c4206e
5c36daa
1c4206e
 
 
012b29b
 
1c4206e
 
 
 
 
012b29b
1c4206e
5e832fe
5c36daa
a5f8d15
5c36daa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1c4206e
45eb65b
012b29b
1c4206e
45eb65b
 
 
 
1c4206e
012b29b
 
 
1c4206e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
012b29b
 
 
 
 
1c4206e
012b29b
 
 
5c36daa
1c4206e
012b29b
1c4206e
 
 
 
5c36daa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import logging
from typing import Optional, Sequence

import numpy as np
import torch
from PIL import Image
from transformers import AutoImageProcessor, AutoModelForDepthEstimation

from .base import DepthEstimator, DepthResult


class DepthAnythingV2Estimator(DepthEstimator):
    """Depth-Anything depth estimator backed by Hugging Face Transformers.

    Produces a per-pixel relative depth map for BGR (OpenCV) frames, either
    one at a time (`predict`) or in batches (`predict_batch`).

    NOTE(review): the class name and log messages say "V2", but the
    checkpoint id ("LiheYoung/depth-anything-large-hf") is the original
    Depth-Anything release — confirm which model generation is intended.
    """

    name = "depth"
    supports_batch = True
    max_batch_size = 16

    def __init__(self, device: Optional[str] = None) -> None:
        """Load the depth model and its image processor.

        Args:
            device: torch device string (e.g. "cuda", "cuda:1", "cpu").
                Defaults to CUDA when available, otherwise CPU.
        """
        logging.info("Loading Depth-Anything model from Hugging Face (transformers)...")

        if device:
            self.device = torch.device(device)
        else:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        model_id = "LiheYoung/depth-anything-large-hf"
        self.image_processor = AutoImageProcessor.from_pretrained(model_id)
        self.model = AutoModelForDepthEstimation.from_pretrained(model_id).to(self.device).eval()

        # Report the device the model actually lives on, not merely whether
        # CUDA exists on this machine (the caller may have forced "cpu").
        if self.device.type == "cuda":
            logging.info("Depth-Anything V2 model loaded on GPU")
        else:
            logging.warning("Depth-Anything V2 model loaded on CPU")

    def _resize_depth(self, raw_depth, height, width):
        """Resample a raw model depth tensor to (height, width) float32 numpy.

        Accepts (H, W), (1, H, W), or batched (B, H, W) tensors and
        bilinearly interpolates them to the requested spatial size.

        Args:
            raw_depth: depth tensor straight from the model output.
            height: target height in pixels.
            width: target width in pixels.

        Returns:
            numpy float32 array of shape (height, width) — or
            (B, height, width) if a multi-frame batch was passed in.
        """
        import torch.nn.functional as F

        if raw_depth.dim() == 2:
            # (H, W) -> (1, 1, H, W)
            raw_depth = raw_depth.unsqueeze(0).unsqueeze(0)
        elif raw_depth.dim() == 3:
            # (1, H, W) -> (1, 1, H, W);  (B, H, W) -> (1, B, H, W)
            raw_depth = raw_depth.unsqueeze(1) if raw_depth.shape[0] == 1 else raw_depth.unsqueeze(0)

        if raw_depth.shape[-2:] != (height, width):
            raw_depth = F.interpolate(
                raw_depth,
                size=(height, width),
                mode="bilinear",
                align_corners=False,
            )

        # Squeeze only the leading batch/channel dims: a bare .squeeze()
        # would also collapse a spatial dim of size 1 (e.g. a 1xW frame
        # would come back 1-D instead of (1, W)).
        raw_depth = raw_depth.squeeze(1).squeeze(0)
        return raw_depth.cpu().numpy().astype(np.float32, copy=False)

    def predict(self, frame: np.ndarray) -> DepthResult:
        """
        Run depth estimation on a single frame.

        Args:
            frame: HxWx3 BGR uint8 numpy array (OpenCV format)

        Returns:
            DepthResult with depth_map (HxW float32) and focal_length.
            On inference failure, a zero depth map is returned instead of
            raising, so the caller's pipeline keeps running.
        """
        try:
            rgb_frame = frame[:, :, ::-1]  # BGR -> RGB
            pil_image = Image.fromarray(rgb_frame)
            height, width = pil_image.height, pil_image.width

            inputs = self.image_processor(images=pil_image, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)

            raw_depth = outputs.predicted_depth
            depth_map = self._resize_depth(raw_depth, height, width)
        except Exception as exc:
            logging.error("Depth-Anything inference failed: %s", exc)
            h, w = frame.shape[:2]
            return DepthResult(depth_map=np.zeros((h, w), dtype=np.float32), focal_length=1.0)

        return DepthResult(depth_map=depth_map, focal_length=1.0)

    def predict_batch(self, frames: Sequence[np.ndarray]) -> Sequence[DepthResult]:
        """
        Run depth estimation on a batch of frames.

        Args:
            frames: sequence of HxWx3 BGR uint8 numpy arrays (OpenCV format)

        Returns:
            One DepthResult per input frame, in order. On inference
            failure, zero depth maps are returned for every frame,
            matching the degradation behavior of `predict`.
        """
        pil_images = [Image.fromarray(f[:, :, ::-1]) for f in frames]  # BGR -> RGB
        sizes = [(img.height, img.width) for img in pil_images]

        try:
            inputs = self.image_processor(images=pil_images, return_tensors="pt").to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)

            # outputs.predicted_depth is (B, H', W') at processor resolution;
            # each slice is resized back to its frame's original size.
            depths = outputs.predicted_depth
            return [
                DepthResult(depth_map=self._resize_depth(depths[i], h, w), focal_length=1.0)
                for i, (h, w) in enumerate(sizes)
            ]
        except Exception as exc:
            # Degrade gracefully like `predict` instead of raising mid-pipeline.
            logging.error("Depth-Anything batch inference failed: %s", exc)
            return [
                DepthResult(depth_map=np.zeros((h, w), dtype=np.float32), focal_length=1.0)
                for (h, w) in sizes
            ]