Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Multi-map digitizer for Scotland Yard. | |
| Aligns Bus, Subway, and Map images to Taxi.png, projects the 153 master junctions, | |
| snaps them to local circle centroids, builds transport graphs for each type, and writes outputs. | |
| """ | |
| import argparse | |
| import csv | |
| import json | |
| import math | |
| from pathlib import Path | |
| import cv2 | |
| import networkx as nx | |
| import numpy as np | |
| from skimage.morphology import skeletonize | |
| class DigitizerArgs: | |
| # Circle snapping settings | |
| node_h_min = 10 | |
| node_h_max = 42 | |
| node_s_min = 25 | |
| node_v_min = 110 | |
| # Road segmentation / connection parameters | |
| # Taxi Yellow mask | |
| taxi_h_min = 10 | |
| taxi_h_max = 42 | |
| taxi_s_min = 35 | |
| taxi_v_min = 130 | |
| # Bus Green mask | |
| bus_h_min = 35 | |
| bus_h_max = 85 | |
| bus_s_min = 40 | |
| bus_v_min = 40 | |
| # Subway Red mask | |
| sub_h_min = 0 | |
| sub_h_max = 12 | |
| sub_h_min_wrap = 160 | |
| sub_h_max_wrap = 180 | |
| sub_s_min = 50 | |
| sub_v_min = 50 | |
| # Connection parameters | |
| node_cut_pad = 5 | |
| touch_radius_pad = 9 | |
| touch_dilate = 7 | |
| min_segment_pixels = 20 | |
| def load_master_junctions(csv_path): | |
| """Load reference coordinates from junctions.csv.""" | |
| junctions = [] | |
| with open(csv_path, "r", newline="", encoding="utf-8") as f: | |
| r = csv.reader(f) | |
| header = next(r) | |
| for row in r: | |
| if not row: | |
| continue | |
| jid, x, y, radius = int(row[0]), int(row[1]), int(row[2]), int(row[3]) | |
| junctions.append((jid, x, y, radius)) | |
| # Sort by ID to ensure consistency | |
| junctions.sort(key=lambda item: item[0]) | |
| return junctions | |
| def compute_homography(ref_img, tgt_img): | |
| """Compute Homography matrix mapping ref_img coordinates to tgt_img using ORB.""" | |
| ref_gray = cv2.cvtColor(ref_img, cv2.COLOR_BGR2GRAY) | |
| tgt_gray = cv2.cvtColor(tgt_img, cv2.COLOR_BGR2GRAY) | |
| orb = cv2.ORB_create(nfeatures=5000) | |
| kp_ref, des_ref = orb.detectAndCompute(ref_gray, None) | |
| kp_tgt, des_tgt = orb.detectAndCompute(tgt_gray, None) | |
| if des_ref is None or des_tgt is None: | |
| print("Warning: ORB descriptors are empty. Using identity matrix.") | |
| return np.eye(3, dtype=np.float32) | |
| bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True) | |
| matches = bf.match(des_ref, des_tgt) | |
| matches = sorted(matches, key=lambda x: x.distance) | |
| if len(matches) < 10: | |
| print("Warning: Too few matches found. Using identity matrix.") | |
| return np.eye(3, dtype=np.float32) | |
| points_ref = np.zeros((len(matches), 2), dtype=np.float32) | |
| points_tgt = np.zeros((len(matches), 2), dtype=np.float32) | |
| for i, m in enumerate(matches): | |
| points_ref[i, :] = kp_ref[m.queryIdx].pt | |
| points_tgt[i, :] = kp_tgt[m.trainIdx].pt | |
| h, mask = cv2.findHomography(points_ref, points_tgt, cv2.RANSAC, 5.0) | |
| if h is None: | |
| print("Warning: Homography estimation failed. Using identity matrix.") | |
| return np.eye(3, dtype=np.float32) | |
| return h | |
| def snap_coordinate(img_bgr, x, y, search_r=20, args=DigitizerArgs): | |
| """Find the local yellowish/cream circle centroid and snap coordinates to it.""" | |
| hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV) | |
| h, w = img_bgr.shape[:2] | |
| x_start = max(0, int(x - search_r)) | |
| x_end = min(w, int(x + search_r + 1)) | |
| y_start = max(0, int(y - search_r)) | |
| y_end = min(h, int(y + search_r + 1)) | |
| crop_hsv = hsv[y_start:y_end, x_start:x_end] | |
| # Mask for yellow/cream circle color | |
| mask = (crop_hsv[:, :, 0] >= args.node_h_min) & (crop_hsv[:, :, 0] <= args.node_h_max) & \ | |
| (crop_hsv[:, :, 1] >= args.node_s_min) & (crop_hsv[:, :, 2] >= args.node_v_min) | |
| if np.sum(mask) == 0: | |
| return int(round(x)), int(round(y)) | |
| nlabels, labels, stats, centroids = cv2.connectedComponentsWithStats(mask.astype(np.uint8)) | |
| if nlabels <= 1: | |
| return int(round(x)), int(round(y)) | |
| # Find centroid closest to the center of the crop | |
| crop_center_y = (y_end - y_start) / 2.0 | |
| crop_center_x = (x_end - x_start) / 2.0 | |
| best_dist = float('inf') | |
| best_cx, best_cy = x, y | |
| for i in range(1, nlabels): | |
| cx, cy = centroids[i] | |
| dist = math.hypot(cx - crop_center_x, cy - crop_center_y) | |
| if dist < best_dist: | |
| best_dist = dist | |
| best_cx = x_start + cx | |
| best_cy = y_start + cy | |
| return int(round(best_cx)), int(round(best_cy)) | |
| def segment_roads(img_bgr, mode, args=DigitizerArgs): | |
| """Segment roads using color thresholding based on the map mode.""" | |
| hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV) | |
| if mode == "taxi": | |
| lower = np.array([args.taxi_h_min, args.taxi_s_min, args.taxi_v_min], dtype=np.uint8) | |
| upper = np.array([args.taxi_h_max, 255, 255], dtype=np.uint8) | |
| mask = cv2.inRange(hsv, lower, upper) | |
| elif mode == "bus": | |
| lower = np.array([args.bus_h_min, args.bus_s_min, args.bus_v_min], dtype=np.uint8) | |
| upper = np.array([args.bus_h_max, 255, 255], dtype=np.uint8) | |
| mask = cv2.inRange(hsv, lower, upper) | |
| elif mode == "subway": | |
| lower1 = np.array([args.sub_h_min, args.sub_s_min, args.sub_v_min], dtype=np.uint8) | |
| upper1 = np.array([args.sub_h_max, 255, 255], dtype=np.uint8) | |
| mask1 = cv2.inRange(hsv, lower1, upper1) | |
| lower2 = np.array([args.sub_h_min_wrap, args.sub_s_min, args.sub_v_min], dtype=np.uint8) | |
| upper2 = np.array([args.sub_h_max_wrap, 255, 255], dtype=np.uint8) | |
| mask2 = cv2.inRange(hsv, lower2, upper2) | |
| mask = cv2.bitwise_or(mask1, mask2) | |
| else: | |
| # Default to empty mask for other modes | |
| mask = np.zeros(img_bgr.shape[:2], dtype=np.uint8) | |
| # Morphological clean up | |
| k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) | |
| mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, k, iterations=2) | |
| mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, k, iterations=1) | |
| return mask | |
| def build_graph(img_bgr, junctions, mode, args=DigitizerArgs): | |
| """Build graph edges by skeletonizing road mask and checking connectivity.""" | |
| mask = segment_roads(img_bgr, mode, args) | |
| # Remove junction interiors so roads are disconnected components | |
| cut = mask.copy() | |
| for jid, x, y, r in junctions: | |
| cv2.circle(cut, (x, y), int(r + args.node_cut_pad), 0, -1) | |
| # Skeletonization | |
| skel = skeletonize(cut > 0).astype(np.uint8) * 255 | |
| skel = cv2.dilate(skel, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3)), iterations=1) | |
| nlabels, labels, stats, _ = cv2.connectedComponentsWithStats(skel, connectivity=8) | |
| G = nx.Graph() | |
| for jid, x, y, r in junctions: | |
| G.add_node(jid, x=int(x), y=int(y), r=int(r)) | |
| for comp_id in range(1, nlabels): | |
| area = stats[comp_id, cv2.CC_STAT_AREA] | |
| if area < args.min_segment_pixels: | |
| continue | |
| comp = (labels == comp_id).astype(np.uint8) * 255 | |
| comp = cv2.dilate(comp, cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (args.touch_dilate, args.touch_dilate)), iterations=1) | |
| touched = [] | |
| for jid, x, y, r in junctions: | |
| ring = np.zeros(comp.shape, dtype=np.uint8) | |
| cv2.circle(ring, (x, y), int(r + args.touch_radius_pad), 255, -1) | |
| if cv2.countNonZero(cv2.bitwise_and(comp, ring)) > 0: | |
| touched.append(jid) | |
| if len(touched) == 2: | |
| G.add_edge(touched[0], touched[1]) | |
| elif len(touched) > 2: | |
| pts = {i: np.array([junctions[i - 1][1], junctions[i - 1][2]]) for i in touched} | |
| # Add edges between nearest pairs in the component | |
| for i in touched: | |
| ds = sorted((np.linalg.norm(pts[i] - pts[j]), j) for j in touched if j != i) | |
| for _, j in ds[:2]: | |
| G.add_edge(i, j) | |
| return G | |
| def write_outputs(img_bgr, junctions, G, out_dir): | |
| """Write labelled map image, junctions.csv, graph.json, and graph.graphml.""" | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| # Label circles on the image | |
| labelled = img_bgr.copy() | |
| for jid, x, y, r in junctions: | |
| label = str(jid) | |
| font = cv2.FONT_HERSHEY_SIMPLEX | |
| # Use a slightly larger, highly readable scale for labels | |
| scale = 0.50 if len(label) < 3 else 0.40 | |
| thickness = 1 | |
| (tw, th), _ = cv2.getTextSize(label, font, scale, thickness) | |
| # Compute dynamic radius: | |
| # 1. Base radius has +3 padding to cover the circle on the map fully. | |
| # 2. Text radius is the distance from center to corners to keep the text inside the circle. | |
| text_radius = int(math.ceil(math.hypot(tw / 2.0, th / 2.0))) | |
| draw_r = max(r + 3, text_radius + 3) | |
| # Draw backing circle | |
| cv2.circle(labelled, (x, y), draw_r, (245, 225, 160), -1) | |
| # Draw text label centered | |
| cv2.putText(labelled, label, (x - tw // 2, y + th // 2), font, scale, (20, 20, 20), thickness, cv2.LINE_AA) | |
| cv2.imwrite(str(out_dir / "junctions_labelled.png"), labelled) | |
| # CSV file | |
| with open(out_dir / "junctions.csv", "w", newline="", encoding="utf-8") as f: | |
| w = csv.writer(f) | |
| w.writerow(["id", "x", "y", "radius", "neighbors"]) | |
| for jid, x, y, r in junctions: | |
| neighbors_str = " ".join(map(str, sorted(G.neighbors(jid)))) | |
| w.writerow([jid, x, y, r, neighbors_str]) | |
| # JSON file | |
| graph_json = { | |
| "nodes": [{"id": jid, **G.nodes[jid]} for jid in G.nodes], | |
| "edges": [{"source": int(a), "target": int(b)} for a, b in sorted(G.edges)], | |
| "adjacency": {str(jid): sorted(map(int, G.neighbors(jid))) for jid in G.nodes}, | |
| } | |
| with open(out_dir / "graph.json", "w", encoding="utf-8") as f: | |
| json.dump(graph_json, f, indent=2) | |
| # GraphML file | |
| nx.write_graphml(G, out_dir / "graph.graphml") | |
| def main(): | |
| p = argparse.ArgumentParser() | |
| p.add_argument("--maps-dir", default=".", help="Directory containing Taxi.png, Bus.png, Subway.png, Map.png") | |
| args = p.parse_args() | |
| maps_dir = Path(args.maps_dir) | |
| # Find junctions.csv reference | |
| ref_csv_paths = [ | |
| maps_dir / "taxi_cv_out" / "taxi_cv_out" / "junctions.csv", | |
| maps_dir / "taxi_cv_out" / "junctions.csv", | |
| maps_dir / "junctions.csv" | |
| ] | |
| csv_path = None | |
| for p_path in ref_csv_paths: | |
| if p_path.exists(): | |
| csv_path = p_path | |
| break | |
| if not csv_path: | |
| raise SystemExit("Error: Could not find reference junctions.csv in taxi_cv_out folders or maps-dir.") | |
| print(f"Loading master junctions from: {csv_path}") | |
| master_juncs = load_master_junctions(csv_path) | |
| print(f"Loaded {len(master_juncs)} reference junctions.") | |
| # Load reference Taxi image | |
| taxi_img_path = maps_dir / "Taxi.png" | |
| if not taxi_img_path.exists(): | |
| raise SystemExit(f"Error: {taxi_img_path} not found.") | |
| ref_img = cv2.imread(str(taxi_img_path)) | |
| # Definitions of target maps | |
| targets = [ | |
| {"name": "Taxi.png", "mode": "taxi", "out": "taxi_cv_out"}, | |
| {"name": "Bus.png", "mode": "bus", "out": "bus_cv_out"}, | |
| {"name": "Subway.png", "mode": "subway", "out": "subway_cv_out"}, | |
| {"name": "Map.png", "mode": "map", "out": "map_cv_out"}, | |
| ] | |
| # We will build and store graphs to merge them for the main Map | |
| graphs = {} | |
| snapped_junctions_dict = {} | |
| for tgt in targets: | |
| name = tgt["name"] | |
| mode = tgt["mode"] | |
| out_folder = maps_dir / tgt["out"] | |
| img_path = maps_dir / name | |
| if not img_path.exists(): | |
| print(f"Skipping {name} (file not found).") | |
| continue | |
| print(f"\nProcessing {name} (mode={mode})...") | |
| img = cv2.imread(str(img_path)) | |
| # 1. Coordinate alignment & snapping | |
| if name == "Taxi.png": | |
| # For reference map, we can use coordinates directly | |
| # Or snap to ensure they are perfectly centered on local yellow nodes | |
| snapped = [] | |
| for jid, tx, ty, tr in master_juncs: | |
| sx, sy = snap_coordinate(img, tx, ty) | |
| snapped.append((jid, sx, sy, tr)) | |
| else: | |
| # Align using homography relative to Taxi.png | |
| h = compute_homography(ref_img, img) | |
| snapped = [] | |
| for jid, tx, ty, tr in master_juncs: | |
| pt = np.array([tx, ty], dtype=np.float32).reshape(1, 1, 2) | |
| mpt = cv2.perspectiveTransform(pt, h).reshape(2) | |
| sx, sy = snap_coordinate(img, mpt[0], mpt[1]) | |
| snapped.append((jid, sx, sy, tr)) | |
| snapped_junctions_dict[mode] = snapped | |
| print(f" Mapped and snapped {len(snapped)} junctions.") | |
| # 2. Build graph (for Taxi, Bus, Subway) | |
| if mode != "map": | |
| G = build_graph(img, snapped, mode) | |
| graphs[mode] = G | |
| print(f" Graph constructed: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges.") | |
| write_outputs(img, snapped, G, out_folder) | |
| print(f" Outputs saved to: {out_folder}") | |
| else: | |
| # Map.png combined graph is the union of Taxi, Bus, Subway graphs | |
| G_map = nx.Graph() | |
| # Add nodes using the Map.png snapped coordinates | |
| for jid, x, y, r in snapped: | |
| G_map.add_node(jid, x=int(x), y=int(y), r=int(r)) | |
| # Add edges from all transport graphs | |
| edge_count = 0 | |
| for g_mode, G_transport in graphs.items(): | |
| for u, v in G_transport.edges(): | |
| if not G_map.has_edge(u, v): | |
| G_map.add_edge(u, v) | |
| edge_count += 1 | |
| graphs["map"] = G_map | |
| print(f" Map graph merged: {G_map.number_of_nodes()} nodes, {G_map.number_of_edges()} edges (union of taxi/bus/subway).") | |
| write_outputs(img, snapped, G_map, out_folder) | |
| print(f" Outputs saved to: {out_folder}") | |
| print("\nDigitization complete!") | |
| if __name__ == "__main__": | |
| main() | |