File size: 4,588 Bytes
4eeefd1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import os
import sys
import glob
import numpy as np
import cv2
from typing import List, Optional

"""
Lightweight runner for CropFormer/Mask2Former inference without spawning a process.
Keeps a global singleton VisualizationDemo so the model is initialized only once.
"""

# Insert CropFormer project into sys.path only once
_CROPF_DIR = None
def make_cropformer_dir(MK_PATH: str) -> str:
    global _CROPF_DIR
    _CROPF_DIR = os.path.join(MK_PATH, "third_party/detectron2/projects/CropFormer")
    if _CROPF_DIR not in sys.path:
        sys.path.insert(0, _CROPF_DIR)
        sys.path.insert(0, os.path.join(_CROPF_DIR, "demo_cropformer"))

# Globals (singleton)
_DEMO = None
_CFG_KEY = None  # (config_file_abs, tuple(opts))

def _build_key(config_file: str, opts: Optional[List[str]]) -> tuple:
    return (os.path.abspath(config_file), tuple(opts) if opts else ())

def preload_cropformer_model(config_file: str, opts: Optional[List[str]] = None) -> bool:
    """
    Public helper to initialize the model once at script startup.
    Returns True if initialized or already available.
    """
    _ensure_demo(config_file, opts)
    return True

def _ensure_demo(config_file: str, opts: Optional[List[str]]):
    """
    Build or reuse a global VisualizationDemo for given config/options.
    """
    global _DEMO, _CFG_KEY
    key = _build_key(config_file, opts)
    if _DEMO is not None and _CFG_KEY == key:
        return _DEMO

    # Lazy imports to avoid import cost at module import time
    from detectron2.config import get_cfg
    from detectron2.projects.deeplab import add_deeplab_config
    from mask2former import add_maskformer2_config
    from predictor import VisualizationDemo

    cfg = get_cfg()
    add_deeplab_config(cfg)
    add_maskformer2_config(cfg)
    cfg.merge_from_file(config_file)
    if opts:
        cfg.merge_from_list(opts)
    cfg.freeze()
    _DEMO = VisualizationDemo(cfg)
    _CFG_KEY = key
    return _DEMO

def run_cropformer_mask_predict(
    config_file: str,
    root: str,
    image_path_pattern: str,
    dataset: str,
    seq_name_list: str,
    confidence_threshold: float = 0.5,
    opts: Optional[List[str]] = None,
) -> None:
    """
    Run CropFormer/Mask2Former demo (mask_predict) logic directly from Python.
    Writes mask PNGs into {root}/{seq}/output/mask (or special matterport3d path).
    """
    from detectron2.data.detection_utils import read_image
    import torch

    demo = _ensure_demo(config_file, opts)

    # Support multiple sequences joined by '+'
    seq_names = seq_name_list.split("+")
    for seq_name in seq_names:
        seq_dir = os.path.join(root, seq_name)
        image_list = sorted(glob.glob(os.path.join(seq_dir, image_path_pattern)))
        if dataset == "matterport3d":
            output_dir = os.path.join(seq_dir, seq_name, "output/mask")
        else:
            output_dir = os.path.join(seq_dir, "output/mask")
        os.makedirs(output_dir, exist_ok=True)

        for path in image_list:
            # Read BGR image as in original demo
            img = read_image(path, format="BGR")
            predictions = demo.run_on_image(img)

            pred_masks = predictions["instances"].pred_masks
            pred_scores = predictions["instances"].scores

            # Select by threshold
            selected_indexes = (pred_scores >= confidence_threshold)
            selected_scores = pred_scores[selected_indexes]
            selected_masks = pred_masks[selected_indexes]

            if selected_masks.numel() == 0:
                # Still write an empty mask to keep pipeline consistent
                h, w = img.shape[:2]
                cv2.imwrite(
                    os.path.join(output_dir, os.path.basename(path).split(".")[0] + ".png"),
                    np.zeros((h, w), dtype=np.uint8),
                )
                continue

            _, m_H, m_W = selected_masks.shape
            mask_image = np.zeros((m_H, m_W), dtype=np.uint8)

            # Rank by score (ascending as in original script)
            mask_id = 1
            selected_scores, ranks = torch.sort(selected_scores)
            for index in ranks:
                num_pixels = torch.sum(selected_masks[index])
                if num_pixels < 400:
                    # ignore small masks
                    continue
                mask_image[(selected_masks[index] == 1).cpu().numpy()] = mask_id
                mask_id += 1

            cv2.imwrite(
                os.path.join(output_dir, os.path.basename(path).split(".")[0] + ".png"),
                mask_image,
            )