Spaces:
Sleeping
Sleeping
| """ | |
| Inference Engine β Tile downloading + Building detection + Deduplication. | |
| Adapted from MaskRCNN_V5_MapFlow.py for server deployment. | |
| """ | |
| import math | |
| import time | |
| import numpy as np | |
| import cv2 | |
| import requests | |
| from PIL import Image | |
| from io import BytesIO | |
| from model_manager import get_predictor, set_threshold | |
| from post_processor import run_v51_pipeline | |
# ==========================================
# === Constants ===
# ==========================================
ZOOM = 18                # Web-Mercator zoom level used for all tile math
TILE_SIZE = 256          # Pixel size of one map tile
TILES_PER_IMG = 2        # 2x2 tiles are stitched into one model input image
IMG_SIZE = 512           # TILE_SIZE * TILES_PER_IMG
MAX_TILES = 60  # Safety limit
MIN_BUILDING_AREA = 200  # Min contour area in pixels (filters tiny false positives)
TILES_AT_ZOOM = 2 ** ZOOM  # Tiles along each axis at ZOOM (hoisted; was recomputed per call)


# ==========================================
# === Coordinate Utils ===
# ==========================================
def lon_to_tile_x(lon):
    """Convert longitude (degrees) to a fractional Web-Mercator tile x at ZOOM."""
    return (lon + 180) / 360 * TILES_AT_ZOOM


def lat_to_tile_y(lat):
    """Convert latitude (degrees) to a fractional Web-Mercator tile y at ZOOM.

    Tile y grows southward: higher latitudes map to smaller tile y.
    """
    lat_r = math.radians(lat)
    return (1 - math.log(math.tan(lat_r) + 1 / math.cos(lat_r)) / math.pi) / 2 * TILES_AT_ZOOM


def tile_x_to_lon(tx):
    """Inverse of lon_to_tile_x: fractional tile x -> longitude in degrees."""
    return tx / TILES_AT_ZOOM * 360 - 180


def tile_y_to_lat(ty):
    """Inverse of lat_to_tile_y: fractional tile y -> latitude in degrees."""
    n = math.pi - 2 * math.pi * ty / TILES_AT_ZOOM
    return math.degrees(math.atan(math.sinh(n)))


def pixel_to_geo(px, py, grid_x, grid_y):
    """Map pixel (px, py) inside the 512x512 image at grid cell (grid_x, grid_y)
    to geographic (lon, lat) in degrees."""
    tx = grid_x * TILES_PER_IMG + px / TILE_SIZE
    ty = grid_y * TILES_PER_IMG + py / TILE_SIZE
    return tile_x_to_lon(tx), tile_y_to_lat(ty)
| # ========================================== | |
| # === Tile Downloading === | |
| # ========================================== | |
session = requests.Session()
session.headers.update({"User-Agent": "Mozilla/5.0"})


def download_tile_512(grid_x, grid_y):
    """Download 2x2 tiles to create a 512x512 satellite image.

    Best-effort: any tile that fails to download or decode is left black
    (zeros) so one bad tile does not abort the whole grid cell.

    Args:
        grid_x, grid_y: grid-cell indices (each cell spans TILES_PER_IMG tiles).
    Returns:
        (IMG_SIZE, IMG_SIZE, 3) uint8 RGB array.
    """
    img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)
    base_tx = grid_x * TILES_PER_IMG
    base_ty = grid_y * TILES_PER_IMG
    for dy in range(TILES_PER_IMG):
        for dx in range(TILES_PER_IMG):
            tx, ty = base_tx + dx, base_ty + dy
            # Rotate across the mt0..mt3 mirror subdomains to spread load
            s = (tx + ty) % 4
            url = f"https://mt{s}.google.com/vt/lyrs=s&x={tx}&y={ty}&z={ZOOM}"
            try:
                r = session.get(url, timeout=15)
                # Fail fast on HTTP errors so an error page is never fed to PIL
                r.raise_for_status()
                tile = np.array(Image.open(BytesIO(r.content)).convert("RGB"))
                img[dy * TILE_SIZE:(dy + 1) * TILE_SIZE,
                    dx * TILE_SIZE:(dx + 1) * TILE_SIZE] = tile
            except Exception:
                pass  # deliberate best-effort: leave this quadrant black
    return img
| # ========================================== | |
| # === Polygon β Tiles === | |
| # ========================================== | |
def get_tiles_for_polygon(polygon_coords):
    """
    Convert polygon coordinates to grid tile indices.
    Input: list of [lat, lon] pairs.
    Returns: list of (grid_x, grid_y) tuples and bounds.
    """
    lats = [pt[0] for pt in polygon_coords]
    lons = [pt[1] for pt in polygon_coords]
    lat_lo, lat_hi = min(lats), max(lats)
    lon_lo, lon_hi = min(lons), max(lons)
    # Tile y grows southward, so the max latitude maps to the smallest tile y.
    gx_lo = int(lon_to_tile_x(lon_lo)) // TILES_PER_IMG
    gx_hi = int(lon_to_tile_x(lon_hi)) // TILES_PER_IMG
    gy_lo = int(lat_to_tile_y(lat_hi)) // TILES_PER_IMG
    gy_hi = int(lat_to_tile_y(lat_lo)) // TILES_PER_IMG
    # Row-major sweep over the covering grid (same order as the original loops)
    tiles = [
        (gx, gy)
        for gy in range(gy_lo, gy_hi + 1)
        for gx in range(gx_lo, gx_hi + 1)
    ]
    return tiles, (lat_lo, lat_hi, lon_lo, lon_hi)
| # ========================================== | |
| # === Polygon Regularization === | |
| # ========================================== | |
def regularize_polygon(contour, rect):
    """
    Regularize polygon edges by snapping to the building's dominant direction.
    1. Get dominant angle from minAreaRect
    2. Rotate polygon so dominant direction = horizontal
    3. Snap nearly-horizontal edges -> exact horizontal
       Snap nearly-vertical edges -> exact vertical
    4. Rotate back

    Args:
        contour: OpenCV contour, shape (N, 1, 2) or (N, 2), pixel coordinates.
        rect: cv2.minAreaRect result; only rect[2] (angle in degrees) is used.
    Returns:
        int ndarray of shape (N, 2) with regularized pixel vertices, or the
        original contour unchanged when it has fewer than 4 points.
    """
    points = contour.reshape(-1, 2).astype(float)
    n = len(points)
    if n < 4:
        return contour  # too few vertices to regularize meaningfully
    angle = rect[2]
    angle_rad = math.radians(angle)
    cos_a, sin_a = math.cos(angle_rad), math.sin(angle_rad)
    center = np.mean(points, axis=0)
    # Rotate to align dominant direction with horizontal axis
    # (rotation by -angle about the centroid)
    rotated = np.zeros_like(points)
    for i, p in enumerate(points):
        dx, dy = p[0] - center[0], p[1] - center[1]
        rotated[i] = [dx * cos_a + dy * sin_a, -dx * sin_a + dy * cos_a]
    # Snap edges within 15 degrees of horizontal/vertical.
    # NOTE: snapping is sequential and in-place — snapping edge (i, i+1)
    # moves vertex i+1, which the next iteration then reads; the final
    # wrap-around edge (n-1, 0) may move vertex 0 after it was already used.
    # The result therefore depends on this exact iteration order.
    SNAP_ANGLE = 15
    for i in range(n):
        j = (i + 1) % n
        dx = rotated[j][0] - rotated[i][0]
        dy = rotated[j][1] - rotated[i][1]
        if abs(dx) < 1e-6 and abs(dy) < 1e-6:
            continue  # degenerate (duplicate) vertex: nothing to snap
        # Edge direction folded into [0, 90] degrees
        edge_angle = abs(math.degrees(math.atan2(abs(dy), abs(dx))))
        if edge_angle < SNAP_ANGLE:  # Nearly horizontal
            rotated[j][1] = rotated[i][1]
        elif edge_angle > (90 - SNAP_ANGLE):  # Nearly vertical
            rotated[j][0] = rotated[i][0]
    # Rotate back (inverse rotation, then round to integer pixels)
    result = np.zeros_like(points)
    for i, p in enumerate(rotated):
        rx = p[0] * cos_a - p[1] * sin_a + center[0]
        ry = p[0] * sin_a + p[1] * cos_a + center[1]
        result[i] = [round(rx), round(ry)]
    return result.astype(int)
| # ========================================== | |
| # === Mask β GeoJSON (with regularization) === | |
| # ========================================== | |
def mask_to_geo_polygon(mask, grid_x, grid_y, score):
    """Convert a binary mask to a GeoJSON Feature with angle regularization.

    Returns None when no contour is found, the largest contour is smaller
    than MIN_BUILDING_AREA, or simplification leaves fewer than 3 vertices.
    """
    found, _ = cv2.findContours(
        mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not found:
        return None
    largest = max(found, key=cv2.contourArea)
    if cv2.contourArea(largest) < MIN_BUILDING_AREA:
        return None
    # Simplify the contour (epsilon proportional to perimeter)
    simplified = cv2.approxPolyDP(largest, 0.008 * cv2.arcLength(largest, True), True)
    if len(simplified) < 3:
        return None
    # Regularize angles (snap edges toward 90 degrees) when enough vertices
    if len(simplified) >= 4:
        pixel_points = regularize_polygon(simplified, cv2.minAreaRect(largest))
    else:
        pixel_points = simplified.reshape(-1, 2)
    # Convert pixel coordinates to geographic coordinates
    ring = [
        list(pixel_to_geo(int(pt[0]), int(pt[1]), grid_x, grid_y))
        for pt in pixel_points
    ]
    ring.append(ring[0])  # Close polygon
    return {
        "type": "Feature",
        "properties": {"confidence": round(float(score), 3)},
        "geometry": {"type": "Polygon", "coordinates": [ring]},
    }
def polygon_area(coords):
    """Calculate the area of a polygon using the Shoelace formula.

    Args:
        coords: list of [x, y] vertices (a duplicated closing vertex is harmless).
    Returns:
        Absolute area in coordinate units squared; 0 for fewer than 3 vertices.
    """
    n = len(coords)
    if n < 3:
        return 0
    area = 0
    for i in range(n):
        j = (i + 1) % n
        area += coords[i][0] * coords[j][1]
        area -= coords[j][0] * coords[i][1]
    return abs(area) / 2


def bboxes_overlap(coords1, coords2):
    """Check if the axis-aligned bounding boxes of two point lists overlap."""
    xs1 = [c[0] for c in coords1]
    ys1 = [c[1] for c in coords1]
    xs2 = [c[0] for c in coords2]
    ys2 = [c[1] for c in coords2]
    return not (max(xs1) < min(xs2) or max(xs2) < min(xs1) or
                max(ys1) < min(ys2) or max(ys2) < min(ys1))


def deduplicate_buildings(features, distance_threshold=0.0003):
    """
    Remove duplicate buildings detected in overlapping tiles.

    A lower-confidence feature is dropped when, relative to a kept feature,
    its centroid lies within distance_threshold, its area is within 2x, and
    their bounding boxes overlap.

    Args:
        features: GeoJSON Feature dicts with Polygon geometry and a
            "confidence" property.
        distance_threshold: max centroid distance in degrees
            (~30 meters at the equator).
    Returns:
        Filtered feature list; higher-confidence duplicates are preferred.
    """
    if not features:
        return features
    # Pre-compute centroids and areas once
    centroids = []
    areas = []
    for f in features:
        coords = f["geometry"]["coordinates"][0]
        cx = np.mean([c[0] for c in coords])
        cy = np.mean([c[1] for c in coords])
        centroids.append((cx, cy))
        areas.append(polygon_area(coords))
    # Process in descending confidence so higher-confidence features win
    indices = sorted(
        range(len(features)),
        key=lambda i: features[i]["properties"]["confidence"],
        reverse=True,
    )
    keep = []
    kept = set()  # membership mirror of `keep` (was rebuilt via set(keep) per inner iteration)
    removed = set()
    for i in indices:
        if i in removed:
            continue
        keep.append(i)
        kept.add(i)  # also covers the old `j == i` check below
        cx1, cy1 = centroids[i]
        area1 = areas[i]
        coords1 = features[i]["geometry"]["coordinates"][0]
        for j in indices:
            if j in removed or j in kept:
                continue
            cx2, cy2 = centroids[j]
            area2 = areas[j]
            # Quick centroid distance check
            dist = math.hypot(cx1 - cx2, cy1 - cy2)
            if dist > distance_threshold:
                continue
            # Area similarity check (within 2x of each other;
            # the old comment said "3x" but the code always used 2.0)
            if area1 > 0 and area2 > 0:
                ratio = max(area1, area2) / min(area1, area2)
                if ratio > 2.0:
                    continue  # Very different sizes -> probably different buildings
            # Bounding box overlap check
            coords2 = features[j]["geometry"]["coordinates"][0]
            if bboxes_overlap(coords1, coords2):
                removed.add(j)
    return [features[i] for i in keep]
| # ========================================== | |
| # === Point-in-Polygon Test === | |
| # ========================================== | |
def point_in_polygon(px, py, polygon):
    """
    Ray casting test: return True when point (px, py) lies inside polygon.
    polygon: list of [x, y] pairs. An empty polygon contains nothing.
    """
    if not polygon:
        return False
    inside = False
    prev_x, prev_y = polygon[-1]
    for cur_x, cur_y in polygon:
        # Does the edge (prev -> cur) straddle the horizontal ray at py?
        if (cur_y > py) != (prev_y > py):
            # x coordinate where the edge crosses the ray; division is safe
            # because straddling guarantees prev_y != cur_y
            x_cross = (prev_x - cur_x) * (py - cur_y) / (prev_y - cur_y) + cur_x
            if px < x_cross:
                inside = not inside
        prev_x, prev_y = cur_x, cur_y
    return inside
| # ========================================== | |
| # === Main Processing Function === | |
| # ========================================== | |
def detect_buildings(coordinates, threshold=0.5, use_v51=False):
    """
    Process a polygon area and detect buildings.

    Args:
        coordinates: list of [lng, lat] pairs (GeoJSON format)
        threshold: detection confidence threshold (masks scoring below are dropped)
        use_v51: when True, confident masks are refined through the V5.1
            post-processing pipeline (run_v51_pipeline with SAM + SigLIP)
            before polygonization
    Returns:
        dict with GeoJSON FeatureCollection + stats, or a dict with an
        "error" key when the polygon is invalid or the area is too large
    """
    # Convert from GeoJSON [lng, lat] to [lat, lng]; malformed entries are skipped
    coords = []
    for point in coordinates:
        if isinstance(point, list) and len(point) == 2:
            coords.append([point[1], point[0]])
    if len(coords) < 3:
        return {"error": "Need at least 3 points to form a polygon"}
    # Build user polygon in [lng, lat] format for clipping
    user_polygon = [[c[1], c[0]] for c in coords]  # [lng, lat]
    predictor = get_predictor()
    # Tile grid covering the polygon's bounding box
    tiles, bounds = get_tiles_for_polygon(coords)
    n_tiles = len(tiles)
    if n_tiles > MAX_TILES:
        return {
            "error": f"Area too large! {n_tiles} tiles needed, max is {MAX_TILES}. Draw a smaller polygon.",
            "tiles_needed": n_tiles,
            "max_tiles": MAX_TILES,
        }
    # Process tiles
    all_features = []
    start_time = time.time()
    for idx, (gx, gy) in enumerate(tiles):
        img = download_tile_512(gx, gy)
        # Skip dark/empty tiles (failed downloads come back black)
        if np.mean(img) < 10:
            continue
        img_bgr = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)  # predictor expects BGR
        outputs = predictor(img_bgr)
        instances = outputs["instances"].to("cpu")
        if len(instances) == 0:
            continue
        raw_masks = instances.pred_masks.numpy()
        raw_scores = instances.scores.numpy()
        # -- V5.1 Pipeline (optional) ----------------------------------
        if use_v51:
            # Pre-filter by confidence first (faster)
            conf_masks = [m for m, s in zip(raw_masks, raw_scores) if float(s) >= threshold]
            conf_scores = [float(s) for s in raw_scores if float(s) >= threshold]
            if conf_masks:
                print(f" [V5.1] Tile {idx+1}/{len(tiles)}: {len(conf_masks)} masks -> pipeline...")
                v51_results = run_v51_pipeline(
                    image_rgb=img,
                    v5_masks=conf_masks,
                    v5_scores=conf_scores,
                    use_sam=True,
                    use_siglip=True,
                )
                for res in v51_results:
                    feature = mask_to_geo_polygon(res["mask"], gx, gy, res["score"])
                    if feature:
                        feature["properties"]["area_m2"] = res["area_m2"]
                        all_features.append(feature)
        # -- V5 Original Pipeline --------------------------------------
        else:
            for mask, score in zip(raw_masks, raw_scores):
                if float(score) < threshold:
                    continue
                feature = mask_to_geo_polygon(mask, gx, gy, score)
                if feature:
                    all_features.append(feature)
    # Clip to user polygon: only keep buildings whose centroid is inside
    clipped_features = []
    for f in all_features:
        poly_coords = f["geometry"]["coordinates"][0]
        cx = np.mean([c[0] for c in poly_coords])  # lng
        cy = np.mean([c[1] for c in poly_coords])  # lat
        if point_in_polygon(cx, cy, user_polygon):
            clipped_features.append(f)
    all_features = clipped_features
    # Deduplicate detections coming from overlapping/adjacent tiles
    before_dedup = len(all_features)
    all_features = deduplicate_buildings(all_features)
    after_dedup = len(all_features)
    elapsed = time.time() - start_time
    # Build response
    geojson = {
        "type": "FeatureCollection",
        "features": all_features,
    }
    stats = {
        "buildings_detected": after_dedup,
        "duplicates_removed": before_dedup - after_dedup,
        # NOTE(review): counts tiles attempted, including dark tiles skipped above
        "tiles_processed": n_tiles,
        "processing_time_seconds": round(elapsed, 1),
        "threshold": threshold,
        "bounds": {
            "min_lat": bounds[0],
            "max_lat": bounds[1],
            "min_lon": bounds[2],
            "max_lon": bounds[3],
        },
    }
    return {"geojson": geojson, "stats": stats}