Spaces:
Sleeping
Sleeping
"""
V5.1 Post-Processing Pipeline — "The Hybrid Eye"
=================================================
Runs directly after V5, with no additional training.

Pipeline:
1. V5 (Hunter)     -> initial masks
2. MobileSAM       -> splits merged masks into per-building sub-masks
3. SigLIP          -> zero-shot building vs non-building check
                      (the current implementation uses CLIP instead)
4. Geometric rules -> area + shape filter, attaches area_m2

Installation:
pip install git+https://github.com/ChaoningZhang/MobileSAM.git
pip install transformers torch
"""
| import math | |
| import numpy as np | |
| import cv2 | |
| import torch | |
| from PIL import Image | |
# ============================================================
# === Model loading (once per process) ===
# ============================================================
# Lazily populated module-level singletons: the loaders below fill these
# on first use so the heavy models are only loaded once per process.
_mobile_sam = None        # MobileSAM model instance (vit_t)
_sam_predictor = None     # SamPredictor wrapping _mobile_sam
_siglip_model = None      # CLIP model (names kept from an earlier SigLIP version)
_siglip_processor = None  # CLIP processor paired with _siglip_model
# Run on GPU when available, otherwise fall back to CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
def load_mobile_sam():
    """Lazily load MobileSAM (~5.78 MB checkpoint) and cache a predictor.

    The checkpoint is downloaded from the HuggingFace Hub on the first
    call; later calls return the cached module-level SamPredictor.
    Returns None when MobileSAM or its dependencies are unavailable.
    """
    global _mobile_sam, _sam_predictor
    # Already initialised on a previous call -> reuse the cached predictor.
    if _sam_predictor is not None:
        return _sam_predictor
    try:
        from mobile_sam import sam_model_registry, SamPredictor
        from huggingface_hub import hf_hub_download

        print("๐ฅ ุชุญู ูู MobileSAM...")
        checkpoint_path = hf_hub_download(
            repo_id="dhkim2810/MobileSAM",
            filename="mobile_sam.pt",
        )
        _mobile_sam = sam_model_registry["vit_t"](checkpoint=checkpoint_path)
        _mobile_sam.to(DEVICE).eval()
        _sam_predictor = SamPredictor(_mobile_sam)
        print("โ MobileSAM ุฌุงูุฒ!")
    except Exception as e:
        # Best-effort: the pipeline degrades gracefully without SAM.
        print(f"โ ๏ธ MobileSAM ู ุด ู ุชุงุญ: {e}")
        return None
    return _sam_predictor
def load_clip():
    """Lazily load the OpenAI CLIP model/processor pair and cache them.

    Used for zero-shot material classification (replaces SigLIP; the
    cached globals keep their historical ``_siglip_*`` names).
    Returns (model, processor), or (None, None) on failure.
    """
    global _siglip_model, _siglip_processor
    # Reuse the cached pair when a previous call already loaded it.
    if _siglip_model is not None:
        return _siglip_model, _siglip_processor
    try:
        from transformers import CLIPProcessor, CLIPModel

        print("๐ฅ ุชุญู ูู CLIP...")
        model_id = "openai/clip-vit-base-patch32"
        _siglip_processor = CLIPProcessor.from_pretrained(model_id)
        loaded = CLIPModel.from_pretrained(
            model_id,
            torch_dtype=torch.float32,
        )
        _siglip_model = loaded.to(DEVICE).eval()
        print("โ CLIP ุฌุงูุฒ!")
        return _siglip_model, _siglip_processor
    except Exception as e:
        # Best-effort: the pipeline degrades gracefully without CLIP.
        print(f"โ ๏ธ CLIP ู ุด ู ุชุงุญ: {e}")
        return None, None
| # ============================================================ | |
| # === STEP 1: MobileSAM โ Surgical Cutting === | |
| # ============================================================ | |
def split_mask_with_sam(
    image_rgb: np.ndarray,
    mask: np.ndarray,
    predictor,
    min_split_area: int = 2000,
    min_overlap_px: int = 200,
) -> list:
    """Split one (possibly merged) V5 mask into per-building sub-masks.

    Prompts MobileSAM with the mask's bounding box and keeps every
    proposal that sufficiently overlaps the original mask.

    Args:
        image_rgb: full image (H, W, 3).
        mask: binary mask (H, W) from V5.
        predictor: SamPredictor instance, or None to skip splitting.
        min_split_area: masks with fewer pixels than this (~45x45 px by
            default) are assumed to be a single building and returned
            unchanged. New parameter; default matches the previous
            hard-coded 2000.
        min_overlap_px: minimum pixel overlap with the original mask for
            a SAM proposal to be kept (previously hard-coded 200).

    Returns:
        list of binary uint8 masks — ideally one per building. Falls back
        to ``[mask]`` when SAM is unavailable, the mask is small, no
        proposal overlaps enough, or prediction raises.
    """
    if predictor is None:
        return [mask]  # fallback: no SAM available, keep original mask
    # Small masks are most likely a single building already — skip SAM.
    if mask.sum() < min_split_area:
        return [mask]
    try:
        # Prepare the image for SAM, then prompt with the mask's bbox.
        predictor.set_image(image_rgb)
        ys, xs = np.where(mask)
        box = np.array([xs.min(), ys.min(), xs.max(), ys.max()])
        masks_out, _scores, _ = predictor.predict(
            box=box,
            multimask_output=True,  # ask SAM for multiple proposals
        )
        # Keep only proposals that actually overlap the original V5 mask.
        valid_masks = [
            sub_mask.astype(np.uint8)
            for sub_mask in masks_out
            if (sub_mask & mask.astype(bool)).sum() > min_overlap_px
        ]
        return valid_masks if valid_masks else [mask]
    except Exception as e:
        print(f"โ ๏ธ SAM splitter error: {e}")
        return [mask]
| # ============================================================ | |
| # === STEP 2: SigLIP โ Zero-Shot Material Check === | |
| # ============================================================ | |
# Zero-shot comparison prompts — no training required.
# NOTE(review): is_building_clip defines its own shorter local prompt
# lists and does not read these module-level ones — confirm which set is
# canonical before editing either.
BUILDING_TEXTS = [
    "a satellite view of a building rooftop",
    "concrete roof of a building seen from above",
    "residential building viewed from satellite",
    "rooftop of a house or apartment building",
]
# Common non-building land cover seen in satellite tiles.
NON_BUILDING_TEXTS = [
    "farmland and agricultural fields from above",
    "green vegetation and trees from satellite",
    "water surface river or lake from above",
    "empty desert or bare soil from satellite",
    "road or highway seen from above",
    "swimming pool seen from satellite",
]
# Combined prompt list; the first NUM_BUILDING entries are the building ones.
ALL_TEXTS = BUILDING_TEXTS + NON_BUILDING_TEXTS
NUM_BUILDING = len(BUILDING_TEXTS)
def is_building_clip(
    image_rgb: np.ndarray,
    mask: np.ndarray,
    model,
    processor,
    threshold: float = 0.5,
) -> bool:
    """CLIP zero-shot check: does this mask look like a building rooftop?

    Crops the mask's padded bounding box out of the image, scores it
    against building vs. non-building text prompts, and returns True when
    the summed probability of the building prompts exceeds ``threshold``.

    Fails open: returns True when no model is loaded or when inference
    raises, so a broken classifier never silently drops masks. Returns
    False for an empty mask or an empty crop.
    """
    if model is None:
        return True  # no classifier available -> keep the mask
    try:
        ys, xs = np.where(mask)
        if len(ys) == 0:
            return False  # empty mask: nothing to classify
        # Pad the bounding box by 5 px (clamped to image bounds) for context.
        x1 = max(0, xs.min() - 5); x2 = min(image_rgb.shape[1], xs.max() + 5)
        y1 = max(0, ys.min() - 5); y2 = min(image_rgb.shape[0], ys.max() + 5)
        crop = image_rgb[y1:y2, x1:x2]
        if crop.size == 0:
            return False
        pil_crop = Image.fromarray(crop)
        # NOTE(review): these local prompts differ from the module-level
        # BUILDING_TEXTS / NON_BUILDING_TEXTS, which this function does
        # not use — confirm which set is canonical.
        building_texts = [
            "a satellite view of a building rooftop",
            "rooftop of a house seen from above",
        ]
        non_building_texts = [
            "farmland or vegetation from satellite",
            "road or parking lot from above",
            "water or swimming pool from satellite",
        ]
        all_texts = building_texts + non_building_texts
        inputs = processor(
            text=all_texts,
            images=[pil_crop],
            return_tensors="pt",
            padding=True,
        )
        inputs = {k: v.to(DEVICE) for k, v in inputs.items()}
        # BUG FIX: run inference under no_grad — the original built an
        # autograd graph for every crop, wasting memory for no benefit.
        with torch.no_grad():
            outputs = model(**inputs)
        # Softmax over all prompts; building score = mass on building prompts.
        probs = outputs.logits_per_image[0].softmax(dim=0).cpu().float().numpy()
        building_score = probs[:len(building_texts)].sum()
        return float(building_score) > threshold
    except Exception as e:
        print(f"โ ๏ธ CLIP check error: {e}")
        return True  # fail open on inference errors
| # ============================================================ | |
| # === STEP 3: Geometric Rules === | |
| # ============================================================ | |
| def _mask_area_m2(mask, pixel_size_m=0.597): | |
| """ | |
| ุชุญููู ุนุฏุฏ pixels ูู ู ุชุฑ ู ุฑุจุน. | |
| pixel_size_m = ุญุฌู ุงูุจููุณู ุนูุฏ Zoom 18 (~0.6 ู ุชุฑ) | |
| """ | |
| return mask.sum() * (pixel_size_m ** 2) | |
| def _aspect_ratio(mask): | |
| """ูุณุจุฉ ุงูุทูู ููุนุฑุถ โ ูู > 10 ูุงูุดูู ุบุฑูุจ ุฌุฏุงู.""" | |
| ys, xs = np.where(mask) | |
| if len(ys) == 0: | |
| return 1.0 | |
| h = ys.max() - ys.min() + 1 | |
| w = xs.max() - xs.min() + 1 | |
| return max(h, w) / max(min(h, w), 1) | |
| def apply_geometric_rules(masks: list, min_area_m2=20.0, max_area_m2=15000.0, max_aspect=10.0): | |
| """ | |
| ูููุชุฑ ุงูู masks ุจููุงุนุฏ ููุฏุณูุฉ: | |
| - ู ุณุงุญุฉ < 20 ู ยฒ โ ุงุญุฐู (noise) | |
| - ู ุณุงุญุฉ > 15,000 ู ยฒ โ ุญุฐูุฑ (probably wrong) | |
| - aspect ratio > 10 โ ุงุญุฐู (ุดูู ุบุฑูุจ ู ุด ู ุจูู) | |
| Returns: list of (mask, area_m2) tuples | |
| """ | |
| result = [] | |
| for mask in masks: | |
| area = _mask_area_m2(mask) | |
| if area < min_area_m2: | |
| continue | |
| if _aspect_ratio(mask) > max_aspect: | |
| continue | |
| result.append((mask, round(area, 1))) | |
| return result | |
| # ============================================================ | |
| # === MAIN: run_v51_pipeline === | |
| # ============================================================ | |
def run_v51_pipeline(
    image_rgb: np.ndarray,
    v5_masks: list,
    v5_scores: list,
    use_sam: bool = True,
    use_siglip: bool = True,
    siglip_threshold: float = 0.4,
) -> list:
    """Full V5.1 post-processing pipeline.

    1. MobileSAM splits each V5 mask into per-building sub-masks.
    2. CLIP zero-shot check drops non-building masks.
    3. Geometric rules drop noise / oddly shaped masks and attach area_m2.

    Args:
        image_rgb: image as a numpy array (H, W, 3).
        v5_masks: list of binary masks from V5.
        v5_scores: list of confidence scores from V5 (same length).
        use_sam: enable MobileSAM splitting.
        use_siglip: enable the CLIP material check.
        siglip_threshold: building-probability threshold forwarded to the
            CLIP check.

    Returns:
        list of dicts: [{"mask": np.array, "score": float, "area_m2": float}]
    """
    # Lazy-load the models (cached after the first call).
    sam_predictor = load_mobile_sam() if use_sam else None
    clip_model, clip_proc = load_clip() if use_siglip else (None, None)

    all_masks = []
    all_scores = []

    # -- STEP 1: MobileSAM splitting -------------------------------------
    # Each sub-mask inherits the confidence score of its parent mask.
    for mask, score in zip(v5_masks, v5_scores):
        sub_masks = split_mask_with_sam(image_rgb, mask, sam_predictor)
        all_masks.extend(sub_masks)
        all_scores.extend([score] * len(sub_masks))
    print(f" SAM: {len(v5_masks)} โ {len(all_masks)} masks")

    # -- STEP 2: CLIP material check -------------------------------------
    if use_siglip and clip_model is not None:
        filtered_masks = []
        filtered_scores = []
        removed = 0
        for mask, score in zip(all_masks, all_scores):
            # BUG FIX: the original ignored siglip_threshold and always
            # used is_building_clip's default; forward it explicitly.
            if is_building_clip(image_rgb, mask, clip_model, clip_proc,
                                threshold=siglip_threshold):
                filtered_masks.append(mask)
                filtered_scores.append(score)
            else:
                removed += 1
        print(f" CLIP: ุญุฐู {removed} ุบูุฑ ู ุจุงูู")
        all_masks, all_scores = filtered_masks, filtered_scores

    # -- STEP 3: Geometric rules + build result --------------------------
    # BUG FIX: the original filtered all masks at once and then looked up
    # scores by position in the *filtered* list, mis-pairing scores with
    # masks whenever the geometric filter dropped anything. Filtering one
    # mask at a time keeps each (mask, score) pair aligned.
    results = []
    for mask, score in zip(all_masks, all_scores):
        kept = apply_geometric_rules([mask])
        if not kept:
            continue
        geo_mask, area_m2 = kept[0]
        results.append({
            "mask": geo_mask,
            "score": score,
            "area_m2": area_m2,
        })
    print(f" Geometric: {len(all_masks)} โ {len(results)} masks")
    return results