""" V5.1 Post-Processing Pipeline — "The Hybrid Eye" ================================================= يشتغل بعد V5 مباشرة بدون أي تدريب جديد. Pipeline: 1. V5 (Hunter) → masks أولية 2. MobileSAM → يقطع الكتل المتلاصقة لـ sub-masks 3. SigLIP → Zero-Shot: building vs non-building 4. Geometric Rules → area + shape filter + area_m2 التثبيت: pip install git+https://github.com/ChaoningZhang/MobileSAM.git pip install transformers torch """ import math import numpy as np import cv2 import torch from PIL import Image # ============================================================ # === تحميل الموديلات (مرة واحدة) === # ============================================================ _mobile_sam = None _sam_predictor = None _siglip_model = None _siglip_processor = None DEVICE = "cuda" if torch.cuda.is_available() else "cpu" def load_mobile_sam(): """تحميل MobileSAM (5.78MB فقط — خفيف جداً).""" global _mobile_sam, _sam_predictor if _sam_predictor is not None: return _sam_predictor try: from mobile_sam import sam_model_registry, SamPredictor from huggingface_hub import hf_hub_download print("📥 تحميل MobileSAM...") ckpt = hf_hub_download( repo_id="dhkim2810/MobileSAM", filename="mobile_sam.pt", ) _mobile_sam = sam_model_registry["vit_t"](checkpoint=ckpt) _mobile_sam.to(DEVICE).eval() _sam_predictor = SamPredictor(_mobile_sam) print("✅ MobileSAM جاهز!") return _sam_predictor except Exception as e: print(f"⚠️ MobileSAM مش متاح: {e}") return None def load_clip(): """تحميل CLIP للـ Zero-Shot material classification (بديل SigLIP - مستقر 100%).""" global _siglip_model, _siglip_processor if _siglip_model is not None: return _siglip_model, _siglip_processor try: from transformers import CLIPProcessor, CLIPModel print("📥 تحميل CLIP...") model_id = "openai/clip-vit-base-patch32" _siglip_processor = CLIPProcessor.from_pretrained(model_id) _siglip_model = CLIPModel.from_pretrained( model_id, torch_dtype=torch.float32, ).to(DEVICE).eval() print("✅ CLIP جاهز!") return _siglip_model, _siglip_processor except Exception as e: print(f"⚠️ CLIP مش متاح: {e}") return None, None # ============================================================ # === STEP 1: MobileSAM — Surgical Cutting === # ============================================================ def split_mask_with_sam(image_rgb: np.ndarray, mask: np.ndarray, predictor) -> list: """ بياخد mask واحد (ممكن يكون فيه 4 بيوت) ويقطعه لـ sub-masks. Args: image_rgb: صورة كاملة (H, W, 3) mask: binary mask (H, W) من V5 predictor: SamPredictor instance Returns: list of binary masks — كل mask مبنى لوحده """ if predictor is None: return [mask] # fallback: رجّع الـ mask الأصلي # لو الـ mask صغير (مبنى واحد) → مش محتاج قطع area = mask.sum() if area < 2000: # ~45×45 pixels → مبنى واحد على الأرجح return [mask] try: # جهّز الصورة للـ SAM predictor.set_image(image_rgb) # استخدم الـ bounding box بتاع الـ mask كـ Prompt ys, xs = np.where(mask) x1, x2 = xs.min(), xs.max() y1, y2 = ys.min(), ys.max() box = np.array([x1, y1, x2, y2]) # اطلب من SAM يقطع masks_out, scores, _ = predictor.predict( box=box, multimask_output=True, # <-- اطلب أكتر من اقتراح ) # فلتر الـ sub-masks اللي منطقية (جوه الـ mask الأصلي) valid_masks = [] for sub_mask in masks_out: # الـ sub-mask لازم يتداخل مع الـ mask الأصلي overlap = (sub_mask & mask.astype(bool)).sum() if overlap > 200: # على الأقل 200 pixel مشتركة valid_masks.append(sub_mask.astype(np.uint8)) return valid_masks if valid_masks else [mask] except Exception as e: print(f"⚠️ SAM splitter error: {e}") return [mask] # ============================================================ # === STEP 2: SigLIP — Zero-Shot Material Check === # ============================================================ # نصوص المقارنة — بدون تدريب BUILDING_TEXTS = [ "a satellite view of a building rooftop", "concrete roof of a building seen from above", "residential building viewed from satellite", "rooftop of a house or apartment building", ] NON_BUILDING_TEXTS = [ "farmland and agricultural fields from above", "green vegetation and trees from satellite", "water surface river or lake from above", "empty desert or bare soil from satellite", "road or highway seen from above", "swimming pool seen from satellite", ] ALL_TEXTS = BUILDING_TEXTS + NON_BUILDING_TEXTS NUM_BUILDING = len(BUILDING_TEXTS) @torch.no_grad() def is_building_clip( image_rgb: np.ndarray, mask: np.ndarray, model, processor, threshold: float = 0.5, ) -> bool: """ CLIP Zero-Shot: يتحقق إن الـ mask ده مبنى فعلاً. Returns True لو مبنى، False لو لا. """ if model is None: return True try: ys, xs = np.where(mask) if len(ys) == 0: return False x1 = max(0, xs.min() - 5); x2 = min(image_rgb.shape[1], xs.max() + 5) y1 = max(0, ys.min() - 5); y2 = min(image_rgb.shape[0], ys.max() + 5) crop = image_rgb[y1:y2, x1:x2] if crop.size == 0: return False pil_crop = Image.fromarray(crop) building_texts = [ "a satellite view of a building rooftop", "rooftop of a house seen from above", ] non_building_texts = [ "farmland or vegetation from satellite", "road or parking lot from above", "water or swimming pool from satellite", ] all_texts = building_texts + non_building_texts inputs = processor( text=all_texts, images=[pil_crop], return_tensors="pt", padding=True, ) inputs = {k: v.to(DEVICE) for k, v in inputs.items()} outputs = model(**inputs) probs = outputs.logits_per_image[0].softmax(dim=0).cpu().float().numpy() building_score = probs[:len(building_texts)].sum() return float(building_score) > threshold except Exception as e: print(f"⚠️ CLIP check error: {e}") return True # ============================================================ # === STEP 3: Geometric Rules === # ============================================================ def _mask_area_m2(mask, pixel_size_m=0.597): """ تحويل عدد pixels لـ متر مربع. pixel_size_m = حجم البيكسل عند Zoom 18 (~0.6 متر) """ return mask.sum() * (pixel_size_m ** 2) def _aspect_ratio(mask): """نسبة الطول للعرض — لو > 10 فالشكل غريب جداً.""" ys, xs = np.where(mask) if len(ys) == 0: return 1.0 h = ys.max() - ys.min() + 1 w = xs.max() - xs.min() + 1 return max(h, w) / max(min(h, w), 1) def apply_geometric_rules(masks: list, min_area_m2=20.0, max_area_m2=15000.0, max_aspect=10.0): """ يفلتر الـ masks بقواعد هندسية: - مساحة < 20 م² → احذف (noise) - مساحة > 15,000 م² → حذّر (probably wrong) - aspect ratio > 10 → احذف (شكل غريب مش مبنى) Returns: list of (mask, area_m2) tuples """ result = [] for mask in masks: area = _mask_area_m2(mask) if area < min_area_m2: continue if _aspect_ratio(mask) > max_aspect: continue result.append((mask, round(area, 1))) return result # ============================================================ # === MAIN: run_v51_pipeline === # ============================================================ def run_v51_pipeline( image_rgb: np.ndarray, v5_masks: list, v5_scores: list, use_sam: bool = True, use_siglip: bool = True, siglip_threshold: float = 0.4, ) -> list: """ الـ Pipeline الكامل لـ V5.1. Args: image_rgb: الصورة كـ numpy array (H, W, 3) v5_masks: list of binary masks من V5 v5_scores: list of confidence scores من V5 use_sam: تفعيل MobileSAM splitting use_siglip: تفعيل SigLIP material check Returns: list of dicts: [{"mask": np.array, "score": float, "area_m2": float}] """ # تحميل الموديلات sam_predictor = load_mobile_sam() if use_sam else None clip_model, clip_proc = load_clip() if use_siglip else (None, None) all_masks = [] all_scores = [] # ── STEP 1: MobileSAM Splitting ───────────────────────── for mask, score in zip(v5_masks, v5_scores): sub_masks = split_mask_with_sam(image_rgb, mask, sam_predictor) all_masks.extend(sub_masks) all_scores.extend([score] * len(sub_masks)) print(f" SAM: {len(v5_masks)} → {len(all_masks)} masks") # ── STEP 2: SigLIP Material Check ─────────────────────── if use_siglip and clip_model is not None: filtered_masks = [] filtered_scores = [] removed = 0 for mask, score in zip(all_masks, all_scores): if is_building_clip(image_rgb, mask, clip_model, clip_proc): filtered_masks.append(mask) filtered_scores.append(score) else: removed += 1 print(f" CLIP: حذف {removed} غير مباني") all_masks, all_scores = filtered_masks, filtered_scores # ── STEP 3: Geometric Rules ────────────────────────────── geo_filtered = apply_geometric_rules(all_masks) print(f" Geometric: {len(all_masks)} → {len(geo_filtered)} masks") # ── Build result ───────────────────────────────────────── results = [] for i, (mask, area_m2) in enumerate(geo_filtered): score = all_scores[i] if i < len(all_scores) else 0.5 results.append({ "mask": mask, "score": score, "area_m2": area_m2, }) return results