""" Inference Engine — Tile downloading + Building detection + Deduplication. Adapted from MaskRCNN_V5_MapFlow.py for server deployment. """ import math import time import numpy as np import cv2 import requests from PIL import Image from io import BytesIO from model_manager import get_predictor, set_threshold from post_processor import run_v51_pipeline # ========================================== # === Constants === # ========================================== ZOOM = 18 TILE_SIZE = 256 TILES_PER_IMG = 2 IMG_SIZE = 512 MAX_TILES = 60 # Safety limit MIN_BUILDING_AREA = 200 # Min contour area in pixels (filters tiny false positives) # ========================================== # === Coordinate Utils === # ========================================== def lon_to_tile_x(lon): return (lon + 180) / 360 * (2 ** ZOOM) def lat_to_tile_y(lat): lat_r = math.radians(lat) return (1 - math.log(math.tan(lat_r) + 1 / math.cos(lat_r)) / math.pi) / 2 * (2 ** ZOOM) def tile_x_to_lon(tx): return tx / (2 ** ZOOM) * 360 - 180 def tile_y_to_lat(ty): n = math.pi - 2 * math.pi * ty / (2 ** ZOOM) return math.degrees(math.atan(math.sinh(n))) def pixel_to_geo(px, py, grid_x, grid_y): tx = grid_x * TILES_PER_IMG + px / TILE_SIZE ty = grid_y * TILES_PER_IMG + py / TILE_SIZE return tile_x_to_lon(tx), tile_y_to_lat(ty) # ========================================== # === Tile Downloading === # ========================================== session = requests.Session() session.headers.update({"User-Agent": "Mozilla/5.0"}) def download_tile_512(grid_x, grid_y): """Download 2×2 tiles to create a 512×512 satellite image.""" img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8) base_tx = grid_x * TILES_PER_IMG base_ty = grid_y * TILES_PER_IMG for dy in range(TILES_PER_IMG): for dx in range(TILES_PER_IMG): tx, ty = base_tx + dx, base_ty + dy s = (tx + ty) % 4 url = f"https://mt{s}.google.com/vt/lyrs=s&x={tx}&y={ty}&z={ZOOM}" try: r = session.get(url, timeout=15) tile = np.array(Image.open(BytesIO(r.content)).convert("RGB")) img[dy * TILE_SIZE:(dy + 1) * TILE_SIZE, dx * TILE_SIZE:(dx + 1) * TILE_SIZE] = tile except Exception: pass return img # ========================================== # === Polygon → Tiles === # ========================================== def get_tiles_for_polygon(polygon_coords): """ Convert polygon coordinates to grid tile indices. Input: list of [lat, lon] pairs. Returns: list of (grid_x, grid_y) tuples and bounds. """ lats = [c[0] for c in polygon_coords] lons = [c[1] for c in polygon_coords] min_lat, max_lat = min(lats), max(lats) min_lon, max_lon = min(lons), max(lons) min_tx = lon_to_tile_x(min_lon) max_tx = lon_to_tile_x(max_lon) min_ty = lat_to_tile_y(max_lat) max_ty = lat_to_tile_y(min_lat) min_gx = int(min_tx) // TILES_PER_IMG max_gx = int(max_tx) // TILES_PER_IMG min_gy = int(min_ty) // TILES_PER_IMG max_gy = int(max_ty) // TILES_PER_IMG tiles = [] for gy in range(min_gy, max_gy + 1): for gx in range(min_gx, max_gx + 1): tiles.append((gx, gy)) return tiles, (min_lat, max_lat, min_lon, max_lon) # ========================================== # === Polygon Regularization === # ========================================== def regularize_polygon(contour, rect): """ Regularize polygon edges by snapping to the building's dominant direction. 1. Get dominant angle from minAreaRect 2. Rotate polygon so dominant direction = horizontal 3. Snap nearly-horizontal edges → exact horizontal Snap nearly-vertical edges → exact vertical 4. Rotate back """ points = contour.reshape(-1, 2).astype(float) n = len(points) if n < 4: return contour angle = rect[2] angle_rad = math.radians(angle) cos_a, sin_a = math.cos(angle_rad), math.sin(angle_rad) center = np.mean(points, axis=0) # Rotate to align dominant direction with horizontal axis rotated = np.zeros_like(points) for i, p in enumerate(points): dx, dy = p[0] - center[0], p[1] - center[1] rotated[i] = [dx * cos_a + dy * sin_a, -dx * sin_a + dy * cos_a] # Snap edges within 15° of horizontal/vertical SNAP_ANGLE = 15 for i in range(n): j = (i + 1) % n dx = rotated[j][0] - rotated[i][0] dy = rotated[j][1] - rotated[i][1] if abs(dx) < 1e-6 and abs(dy) < 1e-6: continue edge_angle = abs(math.degrees(math.atan2(abs(dy), abs(dx)))) if edge_angle < SNAP_ANGLE: # Nearly horizontal rotated[j][1] = rotated[i][1] elif edge_angle > (90 - SNAP_ANGLE): # Nearly vertical rotated[j][0] = rotated[i][0] # Rotate back result = np.zeros_like(points) for i, p in enumerate(rotated): rx = p[0] * cos_a - p[1] * sin_a + center[0] ry = p[0] * sin_a + p[1] * cos_a + center[1] result[i] = [round(rx), round(ry)] return result.astype(int) # ========================================== # === Mask → GeoJSON (with regularization) === # ========================================== def mask_to_geo_polygon(mask, grid_x, grid_y, score): """Convert a binary mask to a GeoJSON Feature with angle regularization.""" contours, _ = cv2.findContours( mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: return None contour = max(contours, key=cv2.contourArea) if cv2.contourArea(contour) < MIN_BUILDING_AREA: return None # Simplify the contour epsilon = 0.008 * cv2.arcLength(contour, True) approx = cv2.approxPolyDP(contour, epsilon, True) if len(approx) < 3: return None # Regularize angles (snap edges toward 90°) rect = cv2.minAreaRect(contour) if len(approx) >= 4: pixel_points = regularize_polygon(approx, rect) else: pixel_points = approx.reshape(-1, 2) # Convert pixel coordinates to geographic coordinates geo_coords = [] for pt in pixel_points: px, py = int(pt[0]), int(pt[1]) lon, lat = pixel_to_geo(px, py, grid_x, grid_y) geo_coords.append([lon, lat]) geo_coords.append(geo_coords[0]) # Close polygon return { "type": "Feature", "properties": {"confidence": round(float(score), 3)}, "geometry": {"type": "Polygon", "coordinates": [geo_coords]}, } def polygon_area(coords): """Calculate area of a polygon using Shoelace formula.""" n = len(coords) if n < 3: return 0 area = 0 for i in range(n): j = (i + 1) % n area += coords[i][0] * coords[j][1] area -= coords[j][0] * coords[i][1] return abs(area) / 2 def bboxes_overlap(coords1, coords2): """Check if bounding boxes of two polygons overlap.""" xs1 = [c[0] for c in coords1] ys1 = [c[1] for c in coords1] xs2 = [c[0] for c in coords2] ys2 = [c[1] for c in coords2] return not (max(xs1) < min(xs2) or max(xs2) < min(xs1) or max(ys1) < min(ys2) or max(ys2) < min(ys1)) def deduplicate_buildings(features, distance_threshold=0.0003): """ Remove duplicate buildings from overlapping tiles. Uses centroid distance + area similarity + bbox overlap. distance_threshold ≈ 30 meters at the equator. """ if not features: return features # Pre-compute centroids and areas centroids = [] areas = [] for f in features: coords = f["geometry"]["coordinates"][0] cx = np.mean([c[0] for c in coords]) cy = np.mean([c[1] for c in coords]) centroids.append((cx, cy)) areas.append(polygon_area(coords)) # Sort by confidence (keep higher confidence ones) indices = sorted( range(len(features)), key=lambda i: features[i]["properties"]["confidence"], reverse=True, ) keep = [] removed = set() for i in indices: if i in removed: continue keep.append(i) cx1, cy1 = centroids[i] area1 = areas[i] coords1 = features[i]["geometry"]["coordinates"][0] for j in indices: if j in removed or j == i or j in set(keep): continue cx2, cy2 = centroids[j] area2 = areas[j] # Quick centroid distance check dist = math.sqrt((cx1 - cx2) ** 2 + (cy1 - cy2) ** 2) if dist > distance_threshold: continue # Area similarity check (within 3x of each other) if area1 > 0 and area2 > 0: ratio = max(area1, area2) / min(area1, area2) if ratio > 2.0: continue # Very different sizes — probably different buildings # Bounding box overlap check coords2 = features[j]["geometry"]["coordinates"][0] if bboxes_overlap(coords1, coords2): removed.add(j) return [features[i] for i in keep] # ========================================== # === Point-in-Polygon Test === # ========================================== def point_in_polygon(px, py, polygon): """ Ray casting algorithm to check if point (px, py) is inside polygon. polygon: list of [x, y] pairs. """ n = len(polygon) inside = False j = n - 1 for i in range(n): xi, yi = polygon[i] xj, yj = polygon[j] if ((yi > py) != (yj > py)) and (px < (xj - xi) * (py - yi) / (yj - yi) + xi): inside = not inside j = i return inside # ========================================== # === Main Processing Function === # ========================================== def detect_buildings(coordinates, threshold=0.5, use_v51=False): """ Process a polygon area and detect buildings. Args: coordinates: list of [lng, lat] pairs (GeoJSON format) threshold: detection confidence threshold Returns: dict with GeoJSON FeatureCollection + stats """ # Convert from GeoJSON [lng, lat] to [lat, lng] coords = [] for point in coordinates: if isinstance(point, list) and len(point) == 2: coords.append([point[1], point[0]]) if len(coords) < 3: return {"error": "Need at least 3 points to form a polygon"} # Build user polygon in [lng, lat] format for clipping user_polygon = [[c[1], c[0]] for c in coords] # [lng, lat] predictor = get_predictor() # Get tiles tiles, bounds = get_tiles_for_polygon(coords) n_tiles = len(tiles) if n_tiles > MAX_TILES: return { "error": f"Area too large! {n_tiles} tiles needed, max is {MAX_TILES}. Draw a smaller polygon.", "tiles_needed": n_tiles, "max_tiles": MAX_TILES, } # Process tiles all_features = [] start_time = time.time() for idx, (gx, gy) in enumerate(tiles): img = download_tile_512(gx, gy) # Skip dark/empty tiles if np.mean(img) < 10: continue img_bgr = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) outputs = predictor(img_bgr) instances = outputs["instances"].to("cpu") if len(instances) == 0: continue raw_masks = instances.pred_masks.numpy() raw_scores = instances.scores.numpy() # ── V5.1 Pipeline (optional) ────────────────────────── if use_v51: # Pre-filter by confidence first (faster) conf_masks = [m for m, s in zip(raw_masks, raw_scores) if float(s) >= threshold] conf_scores = [float(s) for s in raw_scores if float(s) >= threshold] if conf_masks: print(f" [V5.1] Tile {idx+1}/{len(tiles)}: {len(conf_masks)} masks → pipeline...") v51_results = run_v51_pipeline( image_rgb=img, v5_masks=conf_masks, v5_scores=conf_scores, use_sam=True, use_siglip=True, ) for res in v51_results: feature = mask_to_geo_polygon(res["mask"], gx, gy, res["score"]) if feature: feature["properties"]["area_m2"] = res["area_m2"] all_features.append(feature) # ── V5 Original Pipeline ────────────────────────────── else: for mask, score in zip(raw_masks, raw_scores): if float(score) < threshold: continue feature = mask_to_geo_polygon(mask, gx, gy, score) if feature: all_features.append(feature) # Clip to user polygon — only keep buildings whose centroid is inside clipped_features = [] for f in all_features: poly_coords = f["geometry"]["coordinates"][0] cx = np.mean([c[0] for c in poly_coords]) # lng cy = np.mean([c[1] for c in poly_coords]) # lat if point_in_polygon(cx, cy, user_polygon): clipped_features.append(f) all_features = clipped_features # Deduplicate before_dedup = len(all_features) all_features = deduplicate_buildings(all_features) after_dedup = len(all_features) elapsed = time.time() - start_time # Build response geojson = { "type": "FeatureCollection", "features": all_features, } stats = { "buildings_detected": after_dedup, "duplicates_removed": before_dedup - after_dedup, "tiles_processed": n_tiles, "processing_time_seconds": round(elapsed, 1), "threshold": threshold, "bounds": { "min_lat": bounds[0], "max_lat": bounds[1], "min_lon": bounds[2], "max_lon": bounds[3], }, } return {"geojson": geojson, "stats": stats}