# building-detection / inference.py
# Author: yusef — initial commit, V5.1 API (df64c50)
"""
Inference Engine β€” Tile downloading + Building detection + Deduplication.
Adapted from MaskRCNN_V5_MapFlow.py for server deployment.
"""
import math
import time
import numpy as np
import cv2
import requests
from PIL import Image
from io import BytesIO
from model_manager import get_predictor, set_threshold
from post_processor import run_v51_pipeline
# ==========================================
# === Constants ===
# ==========================================
ZOOM = 18                # Web-mercator zoom level used for every tile request
TILE_SIZE = 256          # Pixels per individual map tile
TILES_PER_IMG = 2        # A 2x2 block of tiles is stitched per model input
IMG_SIZE = 512           # = TILE_SIZE * TILES_PER_IMG
MAX_TILES = 60           # Safety limit
MIN_BUILDING_AREA = 200  # Min contour area in pixels (filters tiny false positives)


# ==========================================
# === Coordinate Utils ===
# ==========================================
def lon_to_tile_x(lon):
    """Longitude (degrees) -> fractional web-mercator tile x at ZOOM."""
    scale = 2 ** ZOOM
    return (lon + 180) / 360 * scale


def lat_to_tile_y(lat):
    """Latitude (degrees) -> fractional web-mercator tile y at ZOOM."""
    phi = math.radians(lat)
    merc = math.log(math.tan(phi) + 1 / math.cos(phi)) / math.pi
    return (1 - merc) / 2 * (2 ** ZOOM)


def tile_x_to_lon(tx):
    """Fractional tile x at ZOOM -> longitude (degrees)."""
    return tx / (2 ** ZOOM) * 360 - 180


def tile_y_to_lat(ty):
    """Fractional tile y at ZOOM -> latitude (degrees)."""
    inner = math.pi - 2 * math.pi * ty / (2 ** ZOOM)
    return math.degrees(math.atan(math.sinh(inner)))


def pixel_to_geo(px, py, grid_x, grid_y):
    """Pixel (px, py) inside the 512x512 image at (grid_x, grid_y) -> (lon, lat)."""
    frac_tx = grid_x * TILES_PER_IMG + px / TILE_SIZE
    frac_ty = grid_y * TILES_PER_IMG + py / TILE_SIZE
    return tile_x_to_lon(frac_tx), tile_y_to_lat(frac_ty)
# ==========================================
# === Tile Downloading ===
# ==========================================
# Shared HTTP session for all tile downloads (reuses connections across requests).
session = requests.Session()
# Browser-like User-Agent; some tile servers reject requests without one.
session.headers.update({"User-Agent": "Mozilla/5.0"})
def download_tile_512(grid_x, grid_y):
    """Download a 2x2 block of satellite tiles as one 512x512 RGB image.

    Best-effort: any tile that fails to download or decode is left black
    rather than aborting the whole image.

    Args:
        grid_x, grid_y: indices of the 2x2 tile group (tile coords // TILES_PER_IMG).

    Returns:
        (IMG_SIZE, IMG_SIZE, 3) uint8 RGB array; failed tiles remain zeros.
    """
    img = np.zeros((IMG_SIZE, IMG_SIZE, 3), dtype=np.uint8)
    base_tx = grid_x * TILES_PER_IMG
    base_ty = grid_y * TILES_PER_IMG
    for dy in range(TILES_PER_IMG):
        for dx in range(TILES_PER_IMG):
            tx, ty = base_tx + dx, base_ty + dy
            # Rotate across the mt0..mt3 mirrors to spread request load.
            s = (tx + ty) % 4
            url = f"https://mt{s}.google.com/vt/lyrs=s&x={tx}&y={ty}&z={ZOOM}"
            try:
                r = session.get(url, timeout=15)
                # Fail fast on HTTP errors instead of handing an error page to PIL.
                r.raise_for_status()
                tile = np.array(Image.open(BytesIO(r.content)).convert("RGB"))
                img[dy * TILE_SIZE:(dy + 1) * TILE_SIZE,
                    dx * TILE_SIZE:(dx + 1) * TILE_SIZE] = tile
            except Exception:
                # Deliberate best-effort: leave this quadrant black on any failure.
                pass
    return img
# ==========================================
# === Polygon β†’ Tiles ===
# ==========================================
def get_tiles_for_polygon(polygon_coords):
    """
    Map a polygon to the 2x2 tile groups covering its bounding box.

    Args:
        polygon_coords: list of [lat, lon] pairs.

    Returns:
        (tiles, bounds) where tiles is a list of (grid_x, grid_y) tuples and
        bounds is (min_lat, max_lat, min_lon, max_lon).
    """
    lats = [p[0] for p in polygon_coords]
    lons = [p[1] for p in polygon_coords]
    lat_lo, lat_hi = min(lats), max(lats)
    lon_lo, lon_hi = min(lons), max(lons)
    # Tile y grows southward, so the maximum latitude maps to the minimum tile y.
    gx_lo = int(lon_to_tile_x(lon_lo)) // TILES_PER_IMG
    gx_hi = int(lon_to_tile_x(lon_hi)) // TILES_PER_IMG
    gy_lo = int(lat_to_tile_y(lat_hi)) // TILES_PER_IMG
    gy_hi = int(lat_to_tile_y(lat_lo)) // TILES_PER_IMG
    grid = [(gx, gy)
            for gy in range(gy_lo, gy_hi + 1)
            for gx in range(gx_lo, gx_hi + 1)]
    return grid, (lat_lo, lat_hi, lon_lo, lon_hi)
# ==========================================
# === Polygon Regularization ===
# ==========================================
def regularize_polygon(contour, rect):
    """
    Snap a building outline's edges to its dominant orientation.

    Rotates the polygon so the rect angle (from cv2.minAreaRect) becomes
    horizontal, forces any edge within 15 degrees of horizontal/vertical to
    be exactly axis-aligned, then rotates back and rounds to integer pixels.
    Polygons with fewer than 4 vertices are returned unchanged.
    """
    pts = contour.reshape(-1, 2).astype(float)
    count = len(pts)
    if count < 4:
        return contour
    theta = math.radians(rect[2])  # dominant angle of the min-area rectangle
    cos_t, sin_t = math.cos(theta), math.sin(theta)
    centroid = np.mean(pts, axis=0)

    # Rotate into the building's frame (dominant direction -> horizontal).
    aligned = np.zeros_like(pts)
    for k in range(count):
        ox = pts[k][0] - centroid[0]
        oy = pts[k][1] - centroid[1]
        aligned[k] = [ox * cos_t + oy * sin_t, -ox * sin_t + oy * cos_t]

    # Force near-axis edges onto the axis (tolerance: 15 degrees).
    tol = 15
    for k in range(count):
        nxt = (k + 1) % count
        ex = aligned[nxt][0] - aligned[k][0]
        ey = aligned[nxt][1] - aligned[k][1]
        if abs(ex) < 1e-6 and abs(ey) < 1e-6:
            continue  # degenerate (duplicate) vertex — no edge direction
        slope = abs(math.degrees(math.atan2(abs(ey), abs(ex))))
        if slope < tol:             # nearly horizontal
            aligned[nxt][1] = aligned[k][1]
        elif slope > (90 - tol):    # nearly vertical
            aligned[nxt][0] = aligned[k][0]

    # Rotate back into image coordinates and round to whole pixels.
    out = np.zeros_like(pts)
    for k in range(count):
        gx = aligned[k][0] * cos_t - aligned[k][1] * sin_t + centroid[0]
        gy = aligned[k][0] * sin_t + aligned[k][1] * cos_t + centroid[1]
        out[k] = [round(gx), round(gy)]
    return out.astype(int)
# ==========================================
# === Mask β†’ GeoJSON (with regularization) ===
# ==========================================
def mask_to_geo_polygon(mask, grid_x, grid_y, score):
    """Turn a binary instance mask into a GeoJSON polygon Feature.

    Keeps only the largest external contour, simplifies it, snaps edges
    toward 90 degrees, and converts pixel coordinates to lon/lat.

    Returns:
        GeoJSON Feature dict, or None when the mask has no usable contour
        (empty, below MIN_BUILDING_AREA, or degenerate after simplification).
    """
    found, _ = cv2.findContours(
        mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not found:
        return None
    largest = max(found, key=cv2.contourArea)
    if cv2.contourArea(largest) < MIN_BUILDING_AREA:
        return None
    # Simplify the outline before regularizing.
    simplified = cv2.approxPolyDP(
        largest, 0.008 * cv2.arcLength(largest, True), True)
    if len(simplified) < 3:
        return None
    # Snap edges toward the building's dominant direction (needs >= 4 vertices).
    if len(simplified) >= 4:
        pts = regularize_polygon(simplified, cv2.minAreaRect(largest))
    else:
        pts = simplified.reshape(-1, 2)
    # Pixel ring -> geographic ring, explicitly closed per GeoJSON.
    ring = [list(pixel_to_geo(int(p[0]), int(p[1]), grid_x, grid_y))
            for p in pts]
    ring.append(ring[0])
    return {
        "type": "Feature",
        "properties": {"confidence": round(float(score), 3)},
        "geometry": {"type": "Polygon", "coordinates": [ring]},
    }
def polygon_area(coords):
    """Absolute area of a simple polygon via the shoelace formula.

    Args:
        coords: list of [x, y] pairs (closing vertex optional).

    Returns:
        Non-negative area; 0 for fewer than 3 vertices.
    """
    count = len(coords)
    if count < 3:
        return 0
    total = 0
    for cur in range(count):
        nxt = (cur + 1) % count
        total += coords[cur][0] * coords[nxt][1]
        total -= coords[nxt][0] * coords[cur][1]
    return abs(total) / 2
def bboxes_overlap(coords1, coords2):
    """True when the axis-aligned bounding boxes of two polygons intersect.

    Touching boxes (shared edge or corner) count as overlapping.
    """
    a_x = [p[0] for p in coords1]
    a_y = [p[1] for p in coords1]
    b_x = [p[0] for p in coords2]
    b_y = [p[1] for p in coords2]
    # Separated iff one box lies strictly left/right/above/below the other.
    separated = (max(a_x) < min(b_x) or max(b_x) < min(a_x) or
                 max(a_y) < min(b_y) or max(b_y) < min(a_y))
    return not separated
def deduplicate_buildings(features, distance_threshold=0.0003):
    """
    Remove duplicate buildings detected on overlapping tiles.

    Features are visited in descending confidence order; a lower-confidence
    feature is dropped when, relative to an already-kept feature, it is
    (a) within `distance_threshold` degrees by centroid distance,
    (b) within 2x in area, and (c) its bounding box overlaps.

    Args:
        features: list of GeoJSON Feature dicts with a "confidence" property.
        distance_threshold: centroid distance cutoff in degrees
            (0.0003 deg is roughly 30 meters at the equator).

    Returns:
        Filtered feature list, ordered by descending confidence.
    """
    if not features:
        return features
    # Pre-compute rings, centroids and areas once.
    rings = [f["geometry"]["coordinates"][0] for f in features]
    centroids = [(np.mean([c[0] for c in r]), np.mean([c[1] for c in r]))
                 for r in rings]
    areas = [polygon_area(r) for r in rings]
    # Visit highest-confidence features first so they win ties.
    order = sorted(
        range(len(features)),
        key=lambda i: features[i]["properties"]["confidence"],
        reverse=True,
    )
    keep = []
    kept = set()     # mirrors `keep` for O(1) membership tests
    removed = set()
    for i in order:
        if i in removed:
            continue
        keep.append(i)
        kept.add(i)
        cx1, cy1 = centroids[i]
        area1 = areas[i]
        for j in order:
            if j in removed or j == i or j in kept:
                continue
            cx2, cy2 = centroids[j]
            # Quick centroid-distance rejection.
            dist = math.sqrt((cx1 - cx2) ** 2 + (cy1 - cy2) ** 2)
            if dist > distance_threshold:
                continue
            # Area similarity check (within 2x of each other).
            area2 = areas[j]
            if area1 > 0 and area2 > 0:
                if max(area1, area2) / min(area1, area2) > 2.0:
                    continue  # Very different sizes — probably different buildings
            # Bounding box overlap confirms the duplicate.
            if bboxes_overlap(rings[i], rings[j]):
                removed.add(j)
    return [features[i] for i in keep]
# ==========================================
# === Point-in-Polygon Test ===
# ==========================================
def point_in_polygon(px, py, polygon):
    """
    Ray-casting test: True if point (px, py) lies inside `polygon`.

    Args:
        polygon: list of [x, y] pairs (no closing vertex required).
    """
    inside = False
    prev = len(polygon) - 1
    for cur in range(len(polygon)):
        x1, y1 = polygon[cur]
        x2, y2 = polygon[prev]
        # Does the edge straddle the horizontal ray through py?
        straddles = (y1 > py) != (y2 > py)
        if straddles and px < (x2 - x1) * (py - y1) / (y2 - y1) + x1:
            inside = not inside
        prev = cur
    return inside
# ==========================================
# === Main Processing Function ===
# ==========================================
def detect_buildings(coordinates, threshold=0.5, use_v51=False):
    """
    Process a polygon area and detect buildings.
    Args:
        coordinates: list of [lng, lat] pairs (GeoJSON format)
        threshold: detection confidence threshold
        use_v51: when True, refine confident masks through run_v51_pipeline
            (SAM + SigLIP enabled) before converting them to GeoJSON
    Returns:
        dict with GeoJSON FeatureCollection + stats, or a dict containing
        an "error" key when the polygon is invalid or needs more than
        MAX_TILES tiles
    """
    # Convert from GeoJSON [lng, lat] to [lat, lng]; silently drop malformed points
    coords = []
    for point in coordinates:
        if isinstance(point, list) and len(point) == 2:
            coords.append([point[1], point[0]])
    if len(coords) < 3:
        return {"error": "Need at least 3 points to form a polygon"}
    # Build user polygon in [lng, lat] format for clipping
    user_polygon = [[c[1], c[0]] for c in coords]  # [lng, lat]
    predictor = get_predictor()
    # Get tiles covering the polygon's bounding box
    tiles, bounds = get_tiles_for_polygon(coords)
    n_tiles = len(tiles)
    if n_tiles > MAX_TILES:
        return {
            "error": f"Area too large! {n_tiles} tiles needed, max is {MAX_TILES}. Draw a smaller polygon.",
            "tiles_needed": n_tiles,
            "max_tiles": MAX_TILES,
        }
    # Process tiles
    all_features = []
    start_time = time.time()
    for idx, (gx, gy) in enumerate(tiles):
        img = download_tile_512(gx, gy)
        # Skip dark/empty tiles (mean brightness < 10 — e.g. failed downloads left black)
        if np.mean(img) < 10:
            continue
        # Predictor reads "instances" with pred_masks/scores, so it takes BGR input
        # (Detectron2-style convention — see download/model setup)
        img_bgr = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        outputs = predictor(img_bgr)
        instances = outputs["instances"].to("cpu")
        if len(instances) == 0:
            continue
        raw_masks = instances.pred_masks.numpy()
        raw_scores = instances.scores.numpy()
        # ── V5.1 Pipeline (optional) ──────────────────────────
        if use_v51:
            # Pre-filter by confidence first (faster)
            conf_masks = [m for m, s in zip(raw_masks, raw_scores) if float(s) >= threshold]
            conf_scores = [float(s) for s in raw_scores if float(s) >= threshold]
            if conf_masks:
                print(f"  [V5.1] Tile {idx+1}/{len(tiles)}: {len(conf_masks)} masks → pipeline...")
                v51_results = run_v51_pipeline(
                    image_rgb=img,
                    v5_masks=conf_masks,
                    v5_scores=conf_scores,
                    use_sam=True,
                    use_siglip=True,
                )
                # Each pipeline result carries a refined mask, score and area_m2
                for res in v51_results:
                    feature = mask_to_geo_polygon(res["mask"], gx, gy, res["score"])
                    if feature:
                        feature["properties"]["area_m2"] = res["area_m2"]
                        all_features.append(feature)
        # ── V5 Original Pipeline ──────────────────────────────
        else:
            for mask, score in zip(raw_masks, raw_scores):
                if float(score) < threshold:
                    continue
                feature = mask_to_geo_polygon(mask, gx, gy, score)
                if feature:
                    all_features.append(feature)
    # Clip to user polygon — only keep buildings whose centroid is inside
    clipped_features = []
    for f in all_features:
        poly_coords = f["geometry"]["coordinates"][0]
        cx = np.mean([c[0] for c in poly_coords])  # lng
        cy = np.mean([c[1] for c in poly_coords])  # lat
        if point_in_polygon(cx, cy, user_polygon):
            clipped_features.append(f)
    all_features = clipped_features
    # Deduplicate detections repeated across overlapping tiles
    before_dedup = len(all_features)
    all_features = deduplicate_buildings(all_features)
    after_dedup = len(all_features)
    elapsed = time.time() - start_time
    # Build response
    geojson = {
        "type": "FeatureCollection",
        "features": all_features,
    }
    # NOTE(review): tiles_processed counts all tiles in the bounding box,
    # including dark/empty ones that were skipped above.
    stats = {
        "buildings_detected": after_dedup,
        "duplicates_removed": before_dedup - after_dedup,
        "tiles_processed": n_tiles,
        "processing_time_seconds": round(elapsed, 1),
        "threshold": threshold,
        "bounds": {
            "min_lat": bounds[0],
            "max_lat": bounds[1],
            "min_lon": bounds[2],
            "max_lon": bounds[3],
        },
    }
    return {"geojson": geojson, "stats": stats}