building-detection-2 / post_processor.py
yusef
Replace SigLIP with CLIP (stable zero-shot classifier)
29420ff
"""
V5.1 Post-Processing Pipeline โ€” "The Hybrid Eye"
=================================================
ูŠุดุชุบู„ ุจุนุฏ V5 ู…ุจุงุดุฑุฉ ุจุฏูˆู† ุฃูŠ ุชุฏุฑูŠุจ ุฌุฏูŠุฏ.
Pipeline:
1. V5 (Hunter) โ†’ masks ุฃูˆู„ูŠุฉ
2. MobileSAM โ†’ ูŠู‚ุทุน ุงู„ูƒุชู„ ุงู„ู…ุชู„ุงุตู‚ุฉ ู„ู€ sub-masks
3. SigLIP โ†’ Zero-Shot: building vs non-building
4. Geometric Rules โ†’ area + shape filter + area_m2
ุงู„ุชุซุจูŠุช:
pip install git+https://github.com/ChaoningZhang/MobileSAM.git
pip install transformers torch
"""
import math
import numpy as np
import cv2
import torch
from PIL import Image
# ============================================================
# === ุชุญู…ูŠู„ ุงู„ู…ูˆุฏูŠู„ุงุช (ู…ุฑุฉ ูˆุงุญุฏุฉ) ===
# ============================================================
_mobile_sam = None
_sam_predictor = None
_siglip_model = None
_siglip_processor = None
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
def load_mobile_sam():
"""ุชุญู…ูŠู„ MobileSAM (5.78MB ูู‚ุท โ€” ุฎููŠู ุฌุฏุงู‹)."""
global _mobile_sam, _sam_predictor
if _sam_predictor is not None:
return _sam_predictor
try:
from mobile_sam import sam_model_registry, SamPredictor
from huggingface_hub import hf_hub_download
print("๐Ÿ“ฅ ุชุญู…ูŠู„ MobileSAM...")
ckpt = hf_hub_download(
repo_id="dhkim2810/MobileSAM",
filename="mobile_sam.pt",
)
_mobile_sam = sam_model_registry["vit_t"](checkpoint=ckpt)
_mobile_sam.to(DEVICE).eval()
_sam_predictor = SamPredictor(_mobile_sam)
print("โœ… MobileSAM ุฌุงู‡ุฒ!")
return _sam_predictor
except Exception as e:
print(f"โš ๏ธ MobileSAM ู…ุด ู…ุชุงุญ: {e}")
return None
def load_clip():
"""ุชุญู…ูŠู„ CLIP ู„ู„ู€ Zero-Shot material classification (ุจุฏูŠู„ SigLIP - ู…ุณุชู‚ุฑ 100%)."""
global _siglip_model, _siglip_processor
if _siglip_model is not None:
return _siglip_model, _siglip_processor
try:
from transformers import CLIPProcessor, CLIPModel
print("๐Ÿ“ฅ ุชุญู…ูŠู„ CLIP...")
model_id = "openai/clip-vit-base-patch32"
_siglip_processor = CLIPProcessor.from_pretrained(model_id)
_siglip_model = CLIPModel.from_pretrained(
model_id,
torch_dtype=torch.float32,
).to(DEVICE).eval()
print("โœ… CLIP ุฌุงู‡ุฒ!")
return _siglip_model, _siglip_processor
except Exception as e:
print(f"โš ๏ธ CLIP ู…ุด ู…ุชุงุญ: {e}")
return None, None
# ============================================================
# === STEP 1: MobileSAM โ€” Surgical Cutting ===
# ============================================================
def split_mask_with_sam(image_rgb: np.ndarray, mask: np.ndarray, predictor) -> list:
"""
ุจูŠุงุฎุฏ mask ูˆุงุญุฏ (ู…ู…ูƒู† ูŠูƒูˆู† ููŠู‡ 4 ุจูŠูˆุช) ูˆูŠู‚ุทุนู‡ ู„ู€ sub-masks.
Args:
image_rgb: ุตูˆุฑุฉ ูƒุงู…ู„ุฉ (H, W, 3)
mask: binary mask (H, W) ู…ู† V5
predictor: SamPredictor instance
Returns:
list of binary masks โ€” ูƒู„ mask ู…ุจู†ู‰ ู„ูˆุญุฏู‡
"""
if predictor is None:
return [mask] # fallback: ุฑุฌู‘ุน ุงู„ู€ mask ุงู„ุฃุตู„ูŠ
# ู„ูˆ ุงู„ู€ mask ุตุบูŠุฑ (ู…ุจู†ู‰ ูˆุงุญุฏ) โ†’ ู…ุด ู…ุญุชุงุฌ ู‚ุทุน
area = mask.sum()
if area < 2000: # ~45ร—45 pixels โ†’ ู…ุจู†ู‰ ูˆุงุญุฏ ุนู„ู‰ ุงู„ุฃุฑุฌุญ
return [mask]
try:
# ุฌู‡ู‘ุฒ ุงู„ุตูˆุฑุฉ ู„ู„ู€ SAM
predictor.set_image(image_rgb)
# ุงุณุชุฎุฏู… ุงู„ู€ bounding box ุจุชุงุน ุงู„ู€ mask ูƒู€ Prompt
ys, xs = np.where(mask)
x1, x2 = xs.min(), xs.max()
y1, y2 = ys.min(), ys.max()
box = np.array([x1, y1, x2, y2])
# ุงุทู„ุจ ู…ู† SAM ูŠู‚ุทุน
masks_out, scores, _ = predictor.predict(
box=box,
multimask_output=True, # <-- ุงุทู„ุจ ุฃูƒุชุฑ ู…ู† ุงู‚ุชุฑุงุญ
)
# ูู„ุชุฑ ุงู„ู€ sub-masks ุงู„ู„ูŠ ู…ู†ุทู‚ูŠุฉ (ุฌูˆู‡ ุงู„ู€ mask ุงู„ุฃุตู„ูŠ)
valid_masks = []
for sub_mask in masks_out:
# ุงู„ู€ sub-mask ู„ุงุฒู… ูŠุชุฏุงุฎู„ ู…ุน ุงู„ู€ mask ุงู„ุฃุตู„ูŠ
overlap = (sub_mask & mask.astype(bool)).sum()
if overlap > 200: # ุนู„ู‰ ุงู„ุฃู‚ู„ 200 pixel ู…ุดุชุฑูƒุฉ
valid_masks.append(sub_mask.astype(np.uint8))
return valid_masks if valid_masks else [mask]
except Exception as e:
print(f"โš ๏ธ SAM splitter error: {e}")
return [mask]
# ============================================================
# === STEP 2: SigLIP โ€” Zero-Shot Material Check ===
# ============================================================
# ู†ุตูˆุต ุงู„ู…ู‚ุงุฑู†ุฉ โ€” ุจุฏูˆู† ุชุฏุฑูŠุจ
BUILDING_TEXTS = [
"a satellite view of a building rooftop",
"concrete roof of a building seen from above",
"residential building viewed from satellite",
"rooftop of a house or apartment building",
]
NON_BUILDING_TEXTS = [
"farmland and agricultural fields from above",
"green vegetation and trees from satellite",
"water surface river or lake from above",
"empty desert or bare soil from satellite",
"road or highway seen from above",
"swimming pool seen from satellite",
]
ALL_TEXTS = BUILDING_TEXTS + NON_BUILDING_TEXTS
NUM_BUILDING = len(BUILDING_TEXTS)
@torch.no_grad()
def is_building_clip(
image_rgb: np.ndarray,
mask: np.ndarray,
model,
processor,
threshold: float = 0.5,
) -> bool:
"""
CLIP Zero-Shot: ูŠุชุญู‚ู‚ ุฅู† ุงู„ู€ mask ุฏู‡ ู…ุจู†ู‰ ูุนู„ุงู‹.
Returns True ู„ูˆ ู…ุจู†ู‰ุŒ False ู„ูˆ ู„ุง.
"""
if model is None:
return True
try:
ys, xs = np.where(mask)
if len(ys) == 0:
return False
x1 = max(0, xs.min() - 5); x2 = min(image_rgb.shape[1], xs.max() + 5)
y1 = max(0, ys.min() - 5); y2 = min(image_rgb.shape[0], ys.max() + 5)
crop = image_rgb[y1:y2, x1:x2]
if crop.size == 0:
return False
pil_crop = Image.fromarray(crop)
building_texts = [
"a satellite view of a building rooftop",
"rooftop of a house seen from above",
]
non_building_texts = [
"farmland or vegetation from satellite",
"road or parking lot from above",
"water or swimming pool from satellite",
]
all_texts = building_texts + non_building_texts
inputs = processor(
text=all_texts,
images=[pil_crop],
return_tensors="pt",
padding=True,
)
inputs = {k: v.to(DEVICE) for k, v in inputs.items()}
outputs = model(**inputs)
probs = outputs.logits_per_image[0].softmax(dim=0).cpu().float().numpy()
building_score = probs[:len(building_texts)].sum()
return float(building_score) > threshold
except Exception as e:
print(f"โš ๏ธ CLIP check error: {e}")
return True
# ============================================================
# === STEP 3: Geometric Rules ===
# ============================================================
def _mask_area_m2(mask, pixel_size_m=0.597):
"""
ุชุญูˆูŠู„ ุนุฏุฏ pixels ู„ู€ ู…ุชุฑ ู…ุฑุจุน.
pixel_size_m = ุญุฌู… ุงู„ุจูŠูƒุณู„ ุนู†ุฏ Zoom 18 (~0.6 ู…ุชุฑ)
"""
return mask.sum() * (pixel_size_m ** 2)
def _aspect_ratio(mask):
"""ู†ุณุจุฉ ุงู„ุทูˆู„ ู„ู„ุนุฑุถ โ€” ู„ูˆ > 10 ูุงู„ุดูƒู„ ุบุฑูŠุจ ุฌุฏุงู‹."""
ys, xs = np.where(mask)
if len(ys) == 0:
return 1.0
h = ys.max() - ys.min() + 1
w = xs.max() - xs.min() + 1
return max(h, w) / max(min(h, w), 1)
def apply_geometric_rules(masks: list, min_area_m2=20.0, max_area_m2=15000.0, max_aspect=10.0):
"""
ูŠูู„ุชุฑ ุงู„ู€ masks ุจู‚ูˆุงุนุฏ ู‡ู†ุฏุณูŠุฉ:
- ู…ุณุงุญุฉ < 20 ู…ยฒ โ†’ ุงุญุฐู (noise)
- ู…ุณุงุญุฉ > 15,000 ู…ยฒ โ†’ ุญุฐู‘ุฑ (probably wrong)
- aspect ratio > 10 โ†’ ุงุญุฐู (ุดูƒู„ ุบุฑูŠุจ ู…ุด ู…ุจู†ู‰)
Returns: list of (mask, area_m2) tuples
"""
result = []
for mask in masks:
area = _mask_area_m2(mask)
if area < min_area_m2:
continue
if _aspect_ratio(mask) > max_aspect:
continue
result.append((mask, round(area, 1)))
return result
# ============================================================
# === MAIN: run_v51_pipeline ===
# ============================================================
def run_v51_pipeline(
image_rgb: np.ndarray,
v5_masks: list,
v5_scores: list,
use_sam: bool = True,
use_siglip: bool = True,
siglip_threshold: float = 0.4,
) -> list:
"""
ุงู„ู€ Pipeline ุงู„ูƒุงู…ู„ ู„ู€ V5.1.
Args:
image_rgb: ุงู„ุตูˆุฑุฉ ูƒู€ numpy array (H, W, 3)
v5_masks: list of binary masks ู…ู† V5
v5_scores: list of confidence scores ู…ู† V5
use_sam: ุชูุนูŠู„ MobileSAM splitting
use_siglip: ุชูุนูŠู„ SigLIP material check
Returns:
list of dicts: [{"mask": np.array, "score": float, "area_m2": float}]
"""
# ุชุญู…ูŠู„ ุงู„ู…ูˆุฏูŠู„ุงุช
sam_predictor = load_mobile_sam() if use_sam else None
clip_model, clip_proc = load_clip() if use_siglip else (None, None)
all_masks = []
all_scores = []
# โ”€โ”€ STEP 1: MobileSAM Splitting โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
for mask, score in zip(v5_masks, v5_scores):
sub_masks = split_mask_with_sam(image_rgb, mask, sam_predictor)
all_masks.extend(sub_masks)
all_scores.extend([score] * len(sub_masks))
print(f" SAM: {len(v5_masks)} โ†’ {len(all_masks)} masks")
# โ”€โ”€ STEP 2: SigLIP Material Check โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
if use_siglip and clip_model is not None:
filtered_masks = []
filtered_scores = []
removed = 0
for mask, score in zip(all_masks, all_scores):
if is_building_clip(image_rgb, mask, clip_model, clip_proc):
filtered_masks.append(mask)
filtered_scores.append(score)
else:
removed += 1
print(f" CLIP: ุญุฐู {removed} ุบูŠุฑ ู…ุจุงู†ูŠ")
all_masks, all_scores = filtered_masks, filtered_scores
# โ”€โ”€ STEP 3: Geometric Rules โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
geo_filtered = apply_geometric_rules(all_masks)
print(f" Geometric: {len(all_masks)} โ†’ {len(geo_filtered)} masks")
# โ”€โ”€ Build result โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
results = []
for i, (mask, area_m2) in enumerate(geo_filtered):
score = all_scores[i] if i < len(all_scores) else 0.5
results.append({
"mask": mask,
"score": score,
"area_m2": area_m2,
})
return results