File size: 4,191 Bytes
21d81dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import io
import base64
from typing import Optional

import cv2
import numpy as np
from PIL import Image
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware

from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.projects.point_rend import add_pointrend_config

# -------------------------------
# FastAPI Setup
# -------------------------------
# Application object that the route decorators further down register against.
app = FastAPI(title="Rooftop Segmentation API")

# CORS is configured with wildcards for origins, methods and headers, and with
# credentials enabled.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# combination the FastAPI CORS documentation warns about — confirm whether
# credentialed cross-origin requests are really needed, or list the allowed
# origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# -------------------------------
# Detectron2 Config + Predictor
# -------------------------------
# Build the config in a fixed order: defaults, PointRend extension, the
# PointRend YAML, then the local overrides below.
cfg = get_cfg()
add_pointrend_config(cfg)
# NOTE(review): this is a relative path, so it is resolved against the process
# working directory — presumably a detectron2 checkout is expected there; confirm.
pointrend_cfg_path = "detectron2/projects/PointRend/configs/InstanceSegmentation/pointrend_rcnn_R_50_FPN_3x_coco.yaml"
cfg.merge_from_file(pointrend_cfg_path)

# Local overrides applied on top of the merged YAML.
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 2
# The point head's class count is copied from the ROI heads so the two cannot drift apart.
cfg.MODEL.POINT_HEAD.NUM_CLASSES = cfg.MODEL.ROI_HEADS.NUM_CLASSES
cfg.MODEL.WEIGHTS = "/app/model_final.pth"  # adapt path for Hugging Face
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5
cfg.MODEL.DEVICE = "cpu"

# Constructed once at import time; the /polygon endpoint reuses this instance
# for every request.
predictor = DefaultPredictor(cfg)

# -------------------------------
# Utility Functions
# -------------------------------
def morphological_open(mask, kernel_size=20, iterations=1):
    """Run a morphological opening over ``mask`` and return the result.

    The structuring element is a ``kernel_size`` x ``kernel_size`` rectangle,
    and ``iterations`` is forwarded to ``cv2.morphologyEx`` unchanged.
    """
    side_lengths = (kernel_size, kernel_size)
    structuring_element = cv2.getStructuringElement(cv2.MORPH_RECT, side_lengths)
    return cv2.morphologyEx(
        mask,
        cv2.MORPH_OPEN,
        structuring_element,
        iterations=iterations,
    )

def postprocess_simplified(mask: np.ndarray, epsilon_ratio: float = 0.004) -> Optional[np.ndarray]:
    """Reduce a mask to its largest outline, simplified and filled back in.

    The mask is first cleaned with ``morphological_open`` and scaled by 255.
    The external contour with the largest area is approximated by a polygon
    whose tolerance is ``epsilon_ratio`` times the contour's perimeter, and
    that polygon is drawn filled (value 255) onto a blank uint8 canvas.

    Returns ``None`` when ``mask`` is ``None``; otherwise the canvas, which is
    all zeros if no contour was found.
    """
    if mask is None:
        return None

    scaled = (morphological_open(mask) * 255).astype(np.uint8)
    canvas = np.zeros_like(scaled)

    outlines, _hierarchy = cv2.findContours(scaled, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not outlines:
        return canvas

    biggest = max(outlines, key=cv2.contourArea)
    tolerance = epsilon_ratio * cv2.arcLength(biggest, True)
    simplified = cv2.approxPolyDP(biggest, tolerance, True)
    cv2.fillPoly(canvas, [simplified], 255)
    return canvas

def mask_to_polygon(mask: np.ndarray) -> Optional[np.ndarray]:
    """Return the largest external contour of ``mask`` as rows of two values.

    The contour with the greatest area is reshaped to ``(-1, 2)``. ``None`` is
    returned when ``cv2.findContours`` finds nothing in the mask.
    """
    outlines, _hierarchy = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if outlines:
        biggest = max(outlines, key=cv2.contourArea)
        return biggest.reshape(-1, 2)
    return None

def im_to_b64_png(im: np.ndarray) -> str:
    """Encode an image array as PNG and return the bytes as base64 text.

    Args:
        im: Image array to hand to ``cv2.imencode`` (the caller in this file
            passes a BGR image).

    Returns:
        The PNG bytes, base64-encoded and decoded to ``str``.

    Raises:
        ValueError: If ``cv2.imencode`` reports that encoding did not succeed.
    """
    ok, buffer = cv2.imencode(".png", im)
    if not ok:
        # The success flag used to be discarded, so a failed encode would have
        # been returned to the client as if it were a valid image.
        raise ValueError("PNG encoding failed")
    return base64.b64encode(buffer).decode("ascii")

# -------------------------------
# API Endpoints
# -------------------------------
@app.get("/")
def root():
    """Return a fixed message showing that the service is reachable."""
    status = {"message": "Rooftop Segmentation API is running!"}
    return status

@app.post("/polygon")
async def polygon_endpoint(file: UploadFile = File(...), epsilon: float = 0.004):
    """Segment an uploaded image and return the best detection as a polygon.

    Args:
        file: Uploaded image; it is decoded with PIL and converted to RGB.
        epsilon: Polygon simplification tolerance, expressed as a fraction of
            the contour perimeter. Must be finite and non-negative.

    Returns:
        A dict with keys ``chosen``, ``polygon`` and ``image``. All three are
        ``None`` when the predictor returns no instances. Otherwise ``chosen``
        is ``"rooftop"``, ``polygon`` is a list of two-element points (or
        ``None`` if no contour survives post-processing), and ``image`` is a
        base64 PNG of the input with the polygon outlined when there is one.
        A 400 ``JSONResponse`` carrying ``error`` and ``detail`` is returned
        when ``epsilon`` is invalid or the upload cannot be decoded.
    """
    # A negative or non-finite tolerance is rejected by cv2.approxPolyDP and
    # used to surface as an unhandled 500. Check it before doing any work so
    # the client gets a 400 in the same shape as the invalid-image error.
    if not np.isfinite(epsilon) or epsilon < 0:
        return JSONResponse(
            status_code=400,
            content={
                "error": "Invalid epsilon",
                "detail": "epsilon must be a finite, non-negative number",
            },
        )

    contents = await file.read()
    try:
        im_pil = Image.open(io.BytesIO(contents)).convert("RGB")
    except Exception as e:
        # Request boundary: any decoding failure is reported as a bad upload
        # rather than crashing the request.
        return JSONResponse(status_code=400, content={"error": "Invalid image", "detail": str(e)})

    im = np.array(im_pil)[:, :, ::-1].copy()  # RGB -> BGR
    outputs = predictor(im)
    instances = outputs["instances"].to("cpu")

    if len(instances) == 0:
        return {"chosen": None, "polygon": None, "image": None}

    # Keep only the highest-scoring detection.
    idx = int(instances.scores.argmax().item())
    raw_mask = instances.pred_masks[idx].numpy().astype(np.uint8)

    # --- CLEAN MASK + POSTPROCESS ---
    simp_mask = postprocess_simplified(raw_mask, epsilon_ratio=epsilon)
    poly = mask_to_polygon(simp_mask)

    # --- Visualization ---
    # The overlay is always returned; the outline is drawn only if a polygon
    # survived post-processing.
    overlay = im.copy()
    poly_list = None
    if poly is not None:
        cv2.polylines(overlay, [poly.astype(np.int32)], True, (0, 0, 255), 2)
        poly_list = poly.tolist()

    img_b64 = im_to_b64_png(overlay)

    return {"chosen": "rooftop", "polygon": poly_list, "image": img_b64}