| | from ultralytics import SAM |
| | import cv2 |
| | from shapely.geometry import shape |
| | from rapidfuzz import process, fuzz |
| | from huggingface_hub import hf_hub_download |
| | from config import OUTPUT_DIR |
| | from pathlib import Path |
| | from PIL import Image |
| | import spaces |
| | import numpy as np |
| | import os |
| | import json |
| | from PIL import Image |
| |
|
| |
|
def box_inside_global(box, global_box):
    """Tell whether ``box`` lies entirely within ``global_box``.

    Both arguments unpack as ``(x1, y1, x2, y2)``. The comparisons are
    inclusive, so a box sharing an edge with ``global_box`` counts as inside.
    """
    left, top, right, bottom = box
    g_left, g_top, g_right, g_bottom = global_box
    fits_horizontally = left >= g_left and right <= g_right
    fits_vertically = top >= g_top and bottom <= g_bottom
    return fits_horizontally and fits_vertically
| |
|
def nms_iou(box1, box2):
    """Intersection-over-Union of two boxes indexed as ``[x1, y1, x2, y2]``.

    Returns ``0`` when the union has no positive area, which also avoids a
    division by zero for degenerate boxes.
    """
    # Width/height of the overlap rectangle, clamped at zero for disjoint boxes.
    overlap_w = max(0, min(box1[2], box2[2]) - max(box1[0], box2[0]))
    overlap_h = max(0, min(box1[3], box2[3]) - max(box1[1], box2[1]))
    inter_area = overlap_w * overlap_h

    union_area = (
        (box1[2] - box1[0]) * (box1[3] - box1[1])
        + (box2[2] - box2[0]) * (box2[3] - box2[1])
        - inter_area
    )

    if union_area > 0:
        return inter_area / union_area
    return 0
| |
|
def non_max_suppression(boxes, scores, iou_threshold=0.5):
    """Greedy non-maximum suppression.

    Visits boxes from highest to lowest score; each visited box is kept and
    every remaining box whose ``nms_iou`` with it is not below
    ``iou_threshold`` is discarded.

    Returns:
        list: indices into ``boxes`` of the kept boxes, in the order kept.
    """
    remaining = list(np.argsort(scores)[::-1])
    kept = []

    while remaining:
        best, rest = remaining[0], remaining[1:]
        kept.append(best)
        # Only candidates that overlap the winner by less than the threshold survive.
        remaining = [
            idx for idx in rest
            if nms_iou(boxes[best], boxes[idx]) < iou_threshold
        ]

    return kept
| |
|
| |
|
| |
|
def tile_image_with_overlap(image_path, tile_size=1024, overlap=256):
    """Split an image into square, overlapping tiles.

    The image is loaded with ``cv2.imread``, so tiles keep OpenCV's BGR
    channel order (callers convert to RGB themselves if they need it).
    Tiles that would run past the right/bottom border are shifted back so
    they end exactly on the border; duplicate origins produced by that shift
    are emitted only once. If the image is smaller than ``tile_size`` along
    an axis, the tile is simply the available extent on that axis.

    Args:
        image_path: path of the image to read.
        tile_size: side length of each tile, in pixels.
        overlap: pixels shared between neighbouring tiles; must be smaller
            than ``tile_size``.

    Returns:
        tuple: ``(tile_list, image.shape)`` where ``tile_list`` holds
        ``(tile, (x_start, y_start))`` pairs.

    Raises:
        ValueError: if ``overlap >= tile_size`` (the stride would be <= 0).
        OSError: if ``cv2.imread`` could not load ``image_path``.
    """
    if overlap >= tile_size:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than tile_size ({tile_size})"
        )

    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise OSError(f"Could not read image: {image_path!r}")

    height, width, _ = image.shape
    step = tile_size - overlap
    tile_list = []
    seen = set()

    for y in range(0, height, step):
        # Pull the last row of tiles back inside the image, never below 0.
        y_start = max(0, min(y, height - tile_size))
        y_end = y_start + tile_size
        for x in range(0, width, step):
            x_start = max(0, min(x, width - tile_size))
            x_end = x_start + tile_size

            coords = (x_start, y_start)
            if coords in seen:
                continue
            seen.add(coords)

            tile = image[y_start:y_end, x_start:x_end, :]
            tile_list.append((tile, coords))

    return tile_list, image.shape
| |
|
| |
|
| |
|
def compute_iou(box1, box2):
    """Return the Intersection over Union of two ``[x1, y1, x2, y2]`` boxes.

    Yields ``0`` when the union area is not positive.
    """
    ax1, ay1, ax2, ay2 = box1[0], box1[1], box1[2], box1[3]
    bx1, by1, bx2, by2 = box2[0], box2[1], box2[2], box2[3]

    # Overlap rectangle; each side is clamped so disjoint boxes give area 0.
    inter_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0, min(ay2, by2) - max(ay1, by1))
    inter_area = inter_w * inter_h

    area_a = (ax2 - ax1) * (ay2 - ay1)
    area_b = (bx2 - bx1) * (by2 - by1)
    union_area = area_a + area_b - inter_area

    if not union_area > 0:
        return 0
    return inter_area / union_area
| |
|
| |
|
def merge_boxes(boxes, iou_threshold=0.8):
    """Single-pass merge of boxes that overlap a seed box.

    Each not-yet-consumed box becomes a seed; every later box whose
    ``compute_iou`` with the seed exceeds ``iou_threshold`` joins the seed's
    group. Group members are compared against the seed only, not against each
    other. Each group is replaced by its bounding rectangle.

    Returns:
        list: ``[x1, y1, x2, y2]`` lists, one per group.
    """
    merged = []
    consumed = set()

    for i, seed in enumerate(boxes):
        if i in consumed:
            continue
        consumed.add(i)
        cluster = [seed]

        for j in range(i + 1, len(boxes)):
            if j not in consumed and compute_iou(seed, boxes[j]) > iou_threshold:
                cluster.append(boxes[j])
                consumed.add(j)

        # Bounding rectangle of everything in the cluster.
        merged.append([
            min(b[0] for b in cluster),
            min(b[1] for b in cluster),
            max(b[2] for b in cluster),
            max(b[3] for b in cluster),
        ])

    return merged
| |
|
| |
|
def box_area(box):
    """Area of an ``[x1, y1, x2, y2]`` box; negative extents are clamped to 0."""
    width = max(0, box[2] - box[0])
    height = max(0, box[3] - box[1])
    return width * height
| |
|
def is_contained(box1, box2, containment_threshold=0.9):
    """Check whether the smaller of two boxes is mostly covered by the other.

    The intersection area is divided by the smaller box's area (via
    ``box_area``) and compared with ``containment_threshold``. Returns
    ``False`` when the smaller box has zero area.
    """
    smaller_area = min(box_area(box1), box_area(box2))
    if smaller_area == 0:
        return False

    inter_w = max(0, min(box1[2], box2[2]) - max(box1[0], box2[0]))
    inter_h = max(0, min(box1[3], box2[3]) - max(box1[1], box2[1]))
    covered_fraction = (inter_w * inter_h) / smaller_area
    return covered_fraction >= containment_threshold
| |
|
def merge_boxes_iterative(boxes, iou_threshold=0.25, containment_threshold=0.75):
    """Repeatedly merge boxes until a pass no longer reduces their number.

    In each pass, every unconsumed box seeds a group; a later box joins the
    group when its ``compute_iou`` with the seed exceeds ``iou_threshold`` or
    ``is_contained`` reports containment at ``containment_threshold``. Every
    group is replaced by its bounding rectangle. The input sequence itself is
    not modified (a copy is taken first).

    Returns:
        list: ``[x1, y1, x2, y2]`` lists after the final pass.
    """
    current = boxes.copy()

    while True:
        merged = []
        taken = [False] * len(current)

        for i, seed in enumerate(current):
            if taken[i]:
                continue
            taken[i] = True
            cluster = [seed]

            for j in range(i + 1, len(current)):
                if taken[j]:
                    continue
                other = current[j]
                overlaps = compute_iou(seed, other) > iou_threshold
                nested = is_contained(seed, other, containment_threshold)
                if overlaps or nested:
                    cluster.append(other)
                    taken[j] = True

            merged.append([
                min(b[0] for b in cluster),
                min(b[1] for b in cluster),
                max(b[2] for b in cluster),
                max(b[3] for b in cluster),
            ])

        # NOTE(review): the result of each pass is adopted unconditionally;
        # another pass runs only if this one actually reduced the box count.
        shrank = len(merged) < len(current)
        current = merged
        if not shrank:
            return current
| |
|
| |
|
def get_corner_points(box):
    """Return the four corners of ``(x1, y1, x2, y2)`` as ``[x, y]`` pairs.

    Order: top-left, top-right, bottom-left, bottom-right (taking ``y1`` as
    the top edge).
    """
    left, top, right, bottom = box
    return [[x, y] for y in (top, bottom) for x in (left, right)]
| |
|
| |
|
def sample_negative_points_outside_boxes(mask, num_points):
    """Randomly pick up to ``num_points`` ``[x, y]`` positions where ``mask`` is falsy.

    Sampling is rejection-based with ``np.random.randint`` and gives up after
    ``num_points * 20`` draws, so fewer points (possibly none) may be
    returned when most of the mask is set.

    Returns:
        np.ndarray: the accepted points converted with ``np.array``.
    """
    picked = []
    attempts_left = num_points * 20

    while attempts_left > 0 and len(picked) < num_points:
        attempts_left -= 1
        # Column is drawn first, then row, to keep the random stream order.
        col = np.random.randint(0, mask.shape[1])
        row = np.random.randint(0, mask.shape[0])
        if mask[row, col]:
            continue
        picked.append([col, row])

    return np.array(picked)
| |
|
def get_inset_corner_points(box, margin=5):
    """Return the four corners of ``box`` moved inward by ``margin``.

    Each inset coordinate is clamped so it never crosses the opposite edge of
    the original box. Corners come back as ``[x, y]`` pairs in the order
    top-left, top-right, bottom-left, bottom-right (taking ``y1`` as top).
    """
    x1, y1, x2, y2 = box

    # Clamp so a margin larger than the box cannot push a side past the other.
    inner_left = min(x1 + margin, x2)
    inner_top = min(y1 + margin, y2)
    inner_right = max(x2 - margin, x1)
    inner_bottom = max(y2 - margin, y1)

    top_row = [[inner_left, inner_top], [inner_right, inner_top]]
    bottom_row = [[inner_left, inner_bottom], [inner_right, inner_bottom]]
    return top_row + bottom_row
| |
|
| |
|
def processYOLOBoxes(iou):
    """Load detections from ``boxes.json`` in ``OUTPUT_DIR`` and apply NMS.

    Each JSON item is read for its ``"bbox"`` and ``"score"`` entries.

    Args:
        iou: IoU threshold forwarded to ``non_max_suppression``.

    Returns:
        list: the ``"bbox"`` values of the detections that survive NMS, in
        the order returned by ``non_max_suppression``.
    """
    with open(os.path.join(OUTPUT_DIR, "boxes.json"), "r") as f:
        detections = json.load(f)

    bboxes = np.array([det["bbox"] for det in detections])
    confidences = np.array([det["score"] for det in detections])

    surviving = non_max_suppression(bboxes, confidences, iou)
    return [detections[i]["bbox"] for i in surviving]
| |
|
def prepare_tiles(image_path, boxes_full, tile_size=1024, overlap=50, iou=0.5, c_th=0.75, edge_margin=10):
    """
    Tile the image, save each tile as a PNG, and write per-tile prompt metadata.

    For every tile produced by ``tile_image_with_overlap``:
      * the tile is converted BGR -> RGB and saved to ``tmp/tiles/tile_{idx}.png``;
      * full-image boxes that intersect the tile are merged with
        ``merge_boxes_iterative`` and translated/clipped to tile coordinates;
      * boxes closer than ``edge_margin`` pixels to any tile border are dropped;
      * each remaining box gets one point labelled 1 (its centre) and four
        points labelled 0 (its corners inset by 2 px).

    One record per tile (empty lists when the tile has no usable box) is
    dumped to ``tmp/tiles_meta.json``.

    Returns:
        tuple: ``(H, W)``, the full image height and width.
    """
    tiles, (H, W, _) = tile_image_with_overlap(image_path, tile_size, overlap)
    os.makedirs("tmp/tiles", exist_ok=True)
    records = []

    for idx, (tile_bgr, (x_offset, y_offset)) in enumerate(tiles):
        tile_rgb = cv2.cvtColor(tile_bgr, cv2.COLOR_BGR2RGB)
        Image.fromarray(tile_rgb).save(f"tmp/tiles/tile_{idx}.png")
        tile_h, tile_w, _ = tile_rgb.shape

        # Register the tile up front with empty prompts; the lists are filled
        # in below only if some box survives all the filters.
        record = {
            "idx": idx,
            "x_off": x_offset,
            "y_off": y_offset,
            "local_boxes": [],
            "point_coords": [],
            "point_labels": []
        }
        records.append(record)

        # Boxes (full-image coordinates) that intersect this tile at all.
        tile_right = x_offset + tile_w
        tile_bottom = y_offset + tile_h
        overlapping = [
            [x1, y1, x2, y2]
            for x1, y1, x2, y2 in boxes_full
            if (x2 > x_offset) and (x1 < tile_right) and (y2 > y_offset) and (y1 < tile_bottom)
        ]
        if not overlapping:
            continue

        merged_boxes = merge_boxes_iterative(
            overlapping, iou_threshold=iou, containment_threshold=c_th
        )

        # Shift into tile coordinates, clip to the tile, and keep only boxes
        # that stay more than edge_margin pixels away from every border.
        usable_boxes = []
        for x1, y1, x2, y2 in merged_boxes:
            lx1 = max(0, x1 - x_offset)
            ly1 = max(0, y1 - y_offset)
            lx2 = min(tile_w, x2 - x_offset)
            ly2 = min(tile_h, y2 - y_offset)
            away_from_edges = (
                lx1 > edge_margin
                and ly1 > edge_margin
                and (tile_w - lx2) > edge_margin
                and (tile_h - ly2) > edge_margin
            )
            if away_from_edges:
                usable_boxes.append([lx1, ly1, lx2, ly2])

        if not usable_boxes:
            continue

        prompt_coords = []
        prompt_labels = []
        for local_box in usable_boxes:
            bx1, by1, bx2, by2 = local_box
            centre = ((bx1 + bx2) / 2, (by1 + by2) / 2)
            corners = get_inset_corner_points(local_box, margin=2)
            if not isinstance(corners, list):
                corners = corners.tolist()
            # Centre is labelled 1; the inset corners are labelled 0.
            prompt_coords.append([centre] + corners)
            prompt_labels.append([1] + [0] * len(corners))

        record["local_boxes"] = usable_boxes
        record["point_coords"] = prompt_coords
        record["point_labels"] = prompt_labels

    os.makedirs("tmp", exist_ok=True)
    with open("tmp/tiles_meta.json", "w") as f:
        json.dump(records, f)

    return H, W
| |
|
| |
|
| |
|
| |
|
def merge_tile_masks(H, W):
    """
    Merge predicted tile masks into a full-size instance-label image.

    Reads ``tmp/tiles_meta.json`` and, for every tile that has a
    ``tmp/masks/tile_{idx}.npy`` file, paints each mask into an ``(H, W)``
    ``uint16`` canvas at the tile's offset using a fresh instance id. Pixels
    already claimed by an earlier instance are left untouched. The result is
    saved as ``mask.tif`` in ``OUTPUT_DIR`` and also returned.

    Args:
        H (int): full image height
        W (int): full image width

    Returns:
        full_mask (np.ndarray): merged mask array
    """
    full_mask = np.zeros((H, W), dtype=np.uint16)
    # NOTE(review): uint16 can only hold ids up to 65535; confirm the number
    # of instances stays below that.
    instance_id = 1

    with open("tmp/tiles_meta.json", "r") as f:
        tiles_meta = json.load(f)

    for tile in tiles_meta:
        x_off = tile["x_off"]
        y_off = tile["y_off"]

        mask_path = Path(f"tmp/masks/tile_{tile['idx']}.npy")
        if not mask_path.exists():
            continue

        tile_masks = np.load(mask_path)

        # A single 2-D mask is treated as a stack of one.
        if tile_masks.ndim == 2:
            tile_masks = tile_masks[None, :, :]

        for mask in tile_masks:
            mask = mask.astype(bool)

            # No padding to a fixed 1024x1024 here: padded cells would be
            # False and never write anything, and np.pad rejects the negative
            # width that arises when only one side exceeds 1024.
            h_end = min(y_off + mask.shape[0], H)
            w_end = min(x_off + mask.shape[1], W)

            # `region` is a view, so writing to it updates full_mask in place.
            region = full_mask[y_off:h_end, x_off:w_end]
            mask = mask[:h_end - y_off, :w_end - x_off]

            region[mask & (region == 0)] = instance_id
            instance_id += 1

    Image.fromarray(full_mask).save(os.path.join(OUTPUT_DIR, "mask.tif"))
    return full_mask
| |
|
| |
|
| |
|
| | |
def chunkify(lst, n):
    """Lazily yield consecutive slices of ``lst`` of length ``n``.

    The final slice is shorter when ``len(lst)`` is not a multiple of ``n``.
    """
    yield from (lst[start:start + n] for start in range(0, len(lst), n))
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def img_shape(image_path):
    """Return the ``.shape`` of the image at ``image_path`` as loaded by ``cv2.imread``.

    Raises:
        OSError: if ``cv2.imread`` could not load the file.
    """
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise OSError(f"Could not read image: {image_path!r}")
    return img.shape
| |
|
| |
|
| |
|
def best_street_match(point, query_name, edges_gdf, max_distance=100):
    """Fuzzy-match ``query_name`` against the names of edges near ``point``.

    Edges are selected by intersecting ``edges_gdf`` with
    ``point.buffer(max_distance)``; their ``'name'`` column is then searched
    with ``process.extractOne`` using ``fuzz.ratio`` as scorer.

    Returns:
        ``(None, 0)`` when no edge intersects the buffer; otherwise whatever
        ``process.extractOne`` returns.
        NOTE(review): per the rapidfuzz docs that is a
        ``(choice, score, index)`` tuple or ``None`` — confirm callers handle
        both shapes.
    """
    search_area = point.buffer(max_distance)
    hits = edges_gdf[edges_gdf.intersects(search_area)]

    if hits.empty:
        return None, 0

    return process.extractOne(query_name, hits['name'].tolist(), scorer=fuzz.ratio)