# Source: hyper-reality-visualizer/backend/services/segmentation_service.py
# Uploaded by eduardo4547 ("Upload 278 files", revision 499671b, verified).
from pathlib import Path
from typing import Any, cast
import cv2
import numpy as np
try:
import torch
_TORCH_AVAILABLE = True
except ImportError:
torch = None # type: ignore[assignment]
_TORCH_AVAILABLE = False
from fastapi import HTTPException
from PIL import Image
from core.config import (
DEPTH_MODEL_ID,
OUTPUT_DIR,
SEMANTIC_MODEL_ID,
log_timing_end,
log_timing_start,
logger,
)
from models.schemas import (
ExteriorBrickRequest,
ExteriorDepthRequest,
ExteriorGrabCutRequest,
ExteriorHybridRequest,
GuidedSegmentRequest,
SegmentVideoRequest,
)
from services.image_service import load_image_rgb_for_edit, save_label_map_for_owner
from services.sam2_service import (
SAM2_UNLOAD_AFTER_USE,
depth_load_lock,
get_sam2_image_predictor,
release_resources,
sam2_predict_lock,
semantic_load_lock,
)
from services.scene_service import (
build_component_label_map,
build_mask_overlay,
generate_label_map,
merge_sam2_wall_fragments,
normalize_exterior_target,
rank_exterior_candidates,
)
try:
from transformers import ( # type: ignore[import-untyped]
AutoImageProcessor,
DPTForDepthEstimation,
DPTImageProcessor,
SegformerForSemanticSegmentation,
)
_TRANSFORMERS_AVAILABLE = True
except ImportError:
_TRANSFORMERS_AVAILABLE = False
import services.sam2_service as _sam2_svc
def parse_mask_index(mask_filename: str) -> int:
    """Extract the trailing integer index from a mask filename.

    The index is whatever follows the last underscore of the filename stem
    (the whole stem when it contains no underscore), e.g.
    ``"photo_mask_3.png"`` gives ``3``.

    Raises:
        HTTPException: 400 when that trailing text is not an integer.
    """
    stem = Path(mask_filename).stem
    # rpartition hands back the full stem as the tail when no "_" is present.
    _, _, index_text = stem.rpartition("_")
    try:
        index = int(index_text)
    except (ValueError, IndexError):
        raise HTTPException(status_code=400, detail="Invalid mask filename format") from None
    return index
def parse_rgb_color(color: str) -> tuple[int, int, int]:
    """Parse a ``#RRGGBB`` (or ``RRGGBB``) hex string into an ``(r, g, b)`` tuple.

    Surrounding whitespace and leading ``#`` characters are ignored.

    Raises:
        HTTPException: 400 when the remaining text is not exactly six
            hexadecimal digits.
    """
    text = str(color).strip().lstrip("#")
    if len(text) != 6:
        raise HTTPException(status_code=400, detail="Color must be a hex color like #RRGGBB")
    # int(..., 16) alone is too lenient: it accepts signs and embedded
    # whitespace ("+f", "-3", " a"), which would let malformed input through
    # and could even yield negative channel values.
    if any(ch not in "0123456789abcdefABCDEF" for ch in text):
        raise HTTPException(status_code=400, detail="Invalid hex color format")
    return (int(text[0:2], 16), int(text[2:4], 16), int(text[4:6], 16))
def generate_guided_label_map(
    image_rgb: np.ndarray,
    point_coords: list[list[float]],
    point_labels: list[int],
    box_xyxy: list[float],
    multimask_output: bool = False,
) -> tuple[np.ndarray, list[float]]:
    """Run a prompted SAM2 prediction and pack the masks into one label map.

    Point prompts are only used when both ``point_coords`` and
    ``point_labels`` are non-empty; the box prompt only when ``box_xyxy`` has
    exactly four values.

    Returns:
        ``(label_map, ranked_scores)``. ``label_map`` is a ``uint8`` array with
        the image's height and width in which the mask ranked ``k`` by score
        (1 = highest) is painted with value ``k``; masks are painted in rank
        order, so later ranks overwrite earlier ones where they overlap.
        ``ranked_scores`` holds the scores in the same descending order.
    """
    predictor = get_sam2_image_predictor()
    started = log_timing_start("SAM2_PREDICT")

    use_points = bool(point_coords and point_labels)
    use_box = bool(box_xyxy and len(box_xyxy) == 4)

    with sam2_predict_lock:
        predictor.set_image(image_rgb)
        input_points = None
        input_labels = None
        input_box = None
        if use_points:
            input_points = np.array(point_coords, dtype=np.float32)
            input_labels = np.array(point_labels, dtype=np.int32)
        if use_box:
            input_box = np.array(box_xyxy, dtype=np.float32)
        masks, scores, _ = predictor.predict(
            point_coords=input_points,
            point_labels=input_labels,
            box=input_box,
            multimask_output=multimask_output,
        )

    # Highest score first.
    order = np.argsort(scores)[::-1]
    height, width = image_rgb.shape[0], image_rgb.shape[1]
    label_map = np.zeros((height, width), dtype=np.uint8)
    for rank, mask_pos in enumerate(order, start=1):
        region = np.asarray(masks[mask_pos], dtype=bool)
        label_map[region] = rank
    ranked_scores = [float(scores[mask_pos]) for mask_pos in order]

    # Timing/log output is best-effort and must never fail the request.
    try:
        log_timing_end("SAM2_PREDICT", started)
        logger.info(f"[SAM2_PREDICT] masks={len(masks)} top_scores={ranked_scores[:3]}")
    except Exception:
        pass

    if SAM2_UNLOAD_AFTER_USE:
        try:
            release_resources(full_unload=True)
        except Exception:
            logger.exception("Error unloading SAM after generate_guided_label_map")

    return label_map, ranked_scores
def analyze_material_texture_complexity(binary_mask: np.ndarray, image_rgb: np.ndarray) -> float:
    """Score how textured the masked region is, from 0.0 (flat) to 1.0 (busy).

    Pixels outside the mask are filled with mid-gray, Canny edges are detected
    on the grayscale result, and the fraction of masked pixels that are edge
    pixels is divided by 0.35 and clipped to ``[0, 1]`` (an edge density of
    0.35 or more saturates the score).

    Args:
        binary_mask: Array whose positive entries select the region.
        image_rgb: RGB image indexed by the same mask.

    Returns:
        The clipped score, or ``0.0`` when the mask selects nothing
        (including a zero-size mask).
    """
    inside = np.asarray(binary_mask) > 0
    # count_nonzero is safe on zero-size arrays, unlike a bare .max().
    mask_pixels = int(np.count_nonzero(inside))
    if mask_pixels == 0:
        return 0.0

    masked_region = image_rgb.copy()
    masked_region[~inside] = [128, 128, 128]
    gray = cv2.cvtColor(masked_region, cv2.COLOR_RGB2GRAY)
    edges = np.asarray(cv2.Canny(gray, 50, 150), dtype=np.uint8)

    # Only edges that fall inside the mask count towards the density.
    edge_pixels = int(np.count_nonzero((edges > 0) & inside))
    edge_density = edge_pixels / float(mask_pixels)
    return float(np.clip(edge_density / 0.35, 0.0, 1.0))
def analyze_material_color(binary_mask: np.ndarray, image_rgb: np.ndarray) -> tuple[float, dict[str, float]]:
    """Measure how brick-like the colors inside the mask are.

    The masked pixels are converted to HSV. A pixel counts as "brick" when its
    hue is ``<= 15`` or ``>= 165`` and both saturation and value exceed 40; it
    counts as "smooth" when its hue lies in ``15..60`` or ``70..140`` and its
    saturation is below 60.

    Args:
        binary_mask: Array whose positive entries select the region.
        image_rgb: RGB image indexed by the same mask.

    Returns:
        ``(brick_score, stats)`` where ``brick_score`` is the fraction of
        masked pixels classified as brick and ``stats`` carries
        ``brick_score``, ``smooth_score`` and the mean H/S/V values. An empty
        (or zero-size) mask gives ``(0.0, {})``.
    """
    inside = np.asarray(binary_mask) > 0
    # .any() is safe on zero-size arrays, unlike a bare .max().
    if not inside.any():
        return 0.0, {}

    # Lay the selected pixels out as an N x 1 image so cvtColor can take them
    # directly; no detour through a PIL image is needed.
    pixel_column = np.ascontiguousarray(image_rgb[inside].reshape(-1, 1, 3).astype(np.uint8))
    masked_hsv = cv2.cvtColor(pixel_column, cv2.COLOR_RGB2HSV)
    h = masked_hsv[:, :, 0]
    s = masked_hsv[:, :, 1]
    v = masked_hsv[:, :, 2]
    total_pixels = h.size

    brick_hue_mask = (h <= 15) | (h >= 165)
    brick_pixels = np.count_nonzero(brick_hue_mask & (s > 40) & (v > 40))
    brick_score = brick_pixels / float(max(1, total_pixels))

    smooth_hue_mask = ((h >= 15) & (h <= 60)) | ((h >= 70) & (h <= 140))
    smooth_pixels = np.count_nonzero(smooth_hue_mask & (s < 60))
    smooth_score = smooth_pixels / float(max(1, total_pixels))

    stats: dict[str, float] = {
        "brick_score": float(brick_score),
        "smooth_score": float(smooth_score),
        "mean_h": float(np.mean(h)),
        "mean_s": float(np.mean(s)),
        "mean_v": float(np.mean(v)),
    }
    return brick_score, stats
def classify_segment_material(binary_mask: np.ndarray, image_rgb: np.ndarray) -> tuple[str, dict[str, Any]]:
    """Label one masked region as ``"brick"``, ``"smooth"`` or ``"mixed"``.

    The texture score (weight 0.6) and the brick color score (weight 0.4) are
    blended; a blend of at least 0.55 is brick, at most 0.35 is smooth, and
    anything in between is mixed.

    Returns:
        ``(material, analysis)`` where ``analysis`` holds the rounded scores,
        the rounded color statistics and the chosen ``material_type``.
    """
    texture = analyze_material_texture_complexity(binary_mask, image_rgb)
    color_brick, color_stats = analyze_material_color(binary_mask, image_rgb)
    blended = (0.6 * texture) + (0.4 * color_brick)

    if blended >= 0.55:
        material = "brick"
    elif blended <= 0.35:
        material = "smooth"
    else:
        material = "mixed"

    rounded_stats = {name: round(value, 2) for name, value in color_stats.items()}
    analysis: dict[str, Any] = {
        "texture_score": round(texture, 4),
        "color_brick_score": round(color_brick, 4),
        "combined_score": round(blended, 4),
        "color_stats": rounded_stats,
        "material_type": material,
    }
    return material, analysis
def separate_materials_by_label(label_map: np.ndarray, image_rgb: np.ndarray) -> dict[str, Any]:
    """Classify every positive label of ``label_map`` by material.

    Returns:
        A dict with the label indices grouped under ``brick_indices``,
        ``smooth_indices`` and ``mixed_indices`` (in ascending label order),
        plus ``analysis_by_label`` mapping each label to its analysis dict.
    """
    grouped: dict[str, list[int]] = {"brick": [], "smooth": [], "mixed": []}
    analysis_by_label: dict[int, dict[str, Any]] = {}

    positive_labels = np.unique(label_map[label_map > 0]).tolist()
    for raw_label in positive_labels:
        label = int(raw_label)
        region = (label_map == raw_label).astype(np.uint8)
        material, analysis = classify_segment_material(region, image_rgb)
        analysis_by_label[label] = analysis
        # Anything that is neither brick nor smooth is filed as mixed.
        bucket = material if material in ("brick", "smooth") else "mixed"
        grouped[bucket].append(label)

    return {
        "brick_indices": grouped["brick"],
        "smooth_indices": grouped["smooth"],
        "mixed_indices": grouped["mixed"],
        "analysis_by_label": analysis_by_label,
    }
def smooth_texture_for_segmentation(image_bgr: np.ndarray, strength: int) -> np.ndarray:
    """Flatten fine texture with repeated bilateral filtering.

    ``strength`` is clamped to ``1..3`` and controls both the number of filter
    passes and the sigma values (``55 * strength``). The input is not
    modified; a filtered copy is returned.
    """
    passes = max(1, min(strength, 3))
    neighborhood = 15
    sigma = float(55 * passes)  # same sigma for the color and the space term

    result = image_bgr.copy()
    for _ in range(passes):
        result = cv2.bilateralFilter(result, neighborhood, sigma, sigma)
    return result
def get_semantic_segmenter() -> tuple[Any, Any]:
    """Return the shared ``(processor, model)`` pair for semantic segmentation.

    The pair is cached on the ``services.sam2_service`` module and loaded on
    first use under ``semantic_load_lock``, with a second check inside the
    lock so that concurrent callers load it only once. The model is moved to
    CUDA when available, otherwise it stays on the CPU, and is put in eval
    mode.

    Raises:
        HTTPException: 500 when the model cannot be loaded; the error text is
            also stored in ``semantic_load_error`` on the service module.
    """
    svc = _sam2_svc
    if svc.semantic_processor is not None and svc.semantic_model is not None:
        return svc.semantic_processor, svc.semantic_model
    with semantic_load_lock:
        if svc.semantic_processor is not None and svc.semantic_model is not None:
            return svc.semantic_processor, svc.semantic_model
        started = log_timing_start("SEMANTIC_LOAD")
        try:
            # Fail with a readable message instead of a NameError/AttributeError
            # when the optional dependencies could not be imported.
            if not (_TORCH_AVAILABLE and _TRANSFORMERS_AVAILABLE):
                raise RuntimeError("torch and transformers must be installed")
            processor = cast(Any, AutoImageProcessor.from_pretrained(SEMANTIC_MODEL_ID))
            model = cast(Any, SegformerForSemanticSegmentation.from_pretrained(SEMANTIC_MODEL_ID))
            device = "cuda" if torch.cuda.is_available() else "cpu"
            try:
                model = model.to(device)
            except Exception:
                # Keep the load best-effort, but say so and make sure the model
                # ends up on one consistent device.
                logger.exception(f"[SEMANTIC] could not move model to {device}; falling back to cpu")
                device = "cpu"
                model = model.to(device)
            model.eval()
            # Publish only once fully initialised: the fast path above reads
            # these attributes without taking the lock.
            svc.semantic_processor = processor
            svc.semantic_model = model
            svc.semantic_load_error = None
            logger.info(f"[SEMANTIC] loaded on {device}")
        except Exception as exc:
            svc.semantic_processor = None
            svc.semantic_model = None
            svc.semantic_load_error = str(exc)
            raise HTTPException(status_code=500, detail=f"Failed to load semantic model: {exc}") from exc
        finally:
            log_timing_end("SEMANTIC_LOAD", started)
    return cast(tuple[Any, Any], (svc.semantic_processor, svc.semantic_model))
def semantic_exterior_mask(
    image_rgb: np.ndarray,
    semantic_keywords: tuple[str, ...],
) -> tuple[np.ndarray, list[str], float]:
    """Build a binary mask of the semantic classes whose names match the keywords.

    The image is run through the semantic segmenter, the logits are upsampled
    to the image size and every pixel takes its arg-max class. A class is
    selected when any keyword is a substring of its lower-cased name; blank
    keywords are dropped and an empty list falls back to
    building/wall/house/roof/facade. If nothing matches in the image, the (up
    to) two most frequent classes that are not sky/road/grass/tree/plant/
    water/person/car are used instead.

    Returns:
        ``(mask, labels, area_ratio)``: a ``uint8`` 0/1 mask, the names of the
        classes used, and the fraction of pixels set in the mask.
    """
    processor, model = get_semantic_segmenter()
    pil_image = Image.fromarray(image_rgb)
    with torch.no_grad():
        inputs = processor(images=pil_image, return_tensors="pt")
        # Follow the model's real placement rather than re-guessing it from
        # cuda availability, so inputs and weights always share a device.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")
        try:
            inputs = {k: (v.to(device) if isinstance(v, torch.Tensor) else v) for k, v in inputs.items()}
        except Exception:
            # Best-effort: keep going with the processor output as produced.
            logger.exception(f"[SEMANTIC] could not move inputs to {device}")
        outputs = model(**inputs)
        logits = outputs.logits
        upsampled_logits = torch.nn.functional.interpolate(
            logits, size=image_rgb.shape[:2], mode="bilinear", align_corners=False,
        )
        pred = upsampled_logits.argmax(dim=1)[0].cpu().numpy().astype(np.int32)

    id2label = getattr(getattr(model, "config", None), "id2label", {}) or {}
    keywords = [k.strip().lower() for k in semantic_keywords if k.strip()]
    if not keywords:
        keywords = ["building", "wall", "house", "roof", "facade"]

    matched_ids: list[int] = []
    matched_labels: list[str] = []
    for class_id_raw, class_name_raw in id2label.items():
        try:
            class_id = int(class_id_raw)
        except Exception:
            continue  # skip entries whose id is not an integer
        class_name = str(class_name_raw).lower()
        if any(token in class_name for token in keywords):
            matched_ids.append(class_id)
            matched_labels.append(str(class_name_raw))

    semantic_mask = np.zeros(pred.shape, dtype=np.uint8)
    if matched_ids:
        semantic_mask = np.isin(pred, np.asarray(matched_ids, dtype=np.int32)).astype(np.uint8)

    if not np.any(semantic_mask):
        # No keyword class is present: fall back to the dominant classes that
        # are not obviously scenery.
        excluded = ("sky", "road", "grass", "tree", "plant", "water", "person", "car")
        class_ids, counts = np.unique(pred, return_counts=True)
        by_frequency = sorted(zip(class_ids.tolist(), counts.tolist()), key=lambda it: it[1], reverse=True)
        fallback_ids: list[int] = []
        for class_id, _count in by_frequency:
            name = str(id2label.get(class_id, class_id)).lower()
            if any(bad in name for bad in excluded):
                continue
            fallback_ids.append(int(class_id))
            if len(fallback_ids) >= 2:
                break
        if fallback_ids:
            semantic_mask = np.isin(pred, np.asarray(fallback_ids, dtype=np.int32)).astype(np.uint8)
            matched_labels = [str(id2label.get(i, i)) for i in fallback_ids]

    area_ratio = float(np.count_nonzero(semantic_mask)) / float(pred.shape[0] * pred.shape[1])
    return semantic_mask.astype(np.uint8), matched_labels, area_ratio
def get_depth_estimator() -> tuple[Any, Any]:
    """Return the shared ``(processor, model)`` pair for depth estimation.

    The pair is cached on the ``services.sam2_service`` module and loaded on
    first use under ``depth_load_lock``, with a second check inside the lock
    so that concurrent callers load it only once. The model is moved to CUDA
    when available, otherwise it stays on the CPU, and is put in eval mode.

    Raises:
        HTTPException: 500 when the model cannot be loaded; the error text is
            also stored in ``depth_load_error`` on the service module.
    """
    svc = _sam2_svc
    if svc.depth_processor is not None and svc.depth_model is not None:
        return svc.depth_processor, svc.depth_model
    with depth_load_lock:
        if svc.depth_processor is not None and svc.depth_model is not None:
            return svc.depth_processor, svc.depth_model
        started = log_timing_start("DEPTH_LOAD")
        try:
            # Fail with a readable message instead of a NameError/AttributeError
            # when the optional dependencies could not be imported.
            if not (_TORCH_AVAILABLE and _TRANSFORMERS_AVAILABLE):
                raise RuntimeError("torch and transformers must be installed")
            processor = cast(Any, DPTImageProcessor.from_pretrained(DEPTH_MODEL_ID))
            model = cast(Any, DPTForDepthEstimation.from_pretrained(DEPTH_MODEL_ID))
            device = "cuda" if torch.cuda.is_available() else "cpu"
            try:
                model = model.to(device)
            except Exception:
                # Keep the load best-effort, but say so and make sure the model
                # ends up on one consistent device.
                logger.exception(f"[DEPTH] could not move model to {device}; falling back to cpu")
                device = "cpu"
                model = model.to(device)
            model.eval()
            # Publish only once fully initialised: the fast path above reads
            # these attributes without taking the lock.
            svc.depth_processor = processor
            svc.depth_model = model
            svc.depth_load_error = None
            logger.info(f"[DEPTH] loaded on {device}")
        except Exception as exc:
            svc.depth_processor = None
            svc.depth_model = None
            svc.depth_load_error = str(exc)
            raise HTTPException(status_code=500, detail=f"Failed to load depth model: {exc}") from exc
        finally:
            log_timing_end("DEPTH_LOAD", started)
    return cast(tuple[Any, Any], (svc.depth_processor, svc.depth_model))
def estimate_depth_map(image_rgb: np.ndarray) -> np.ndarray:
    """Predict a depth map for ``image_rgb`` and rescale it to ``uint8``.

    The model's prediction is resized to the image's height and width with
    bicubic interpolation and min-max normalised to ``0..255``.

    Returns:
        An ``(h, w)`` ``uint8`` array; all zeros when the prediction is
        (numerically) constant.
    """
    processor, model = get_depth_estimator()
    h, w = image_rgb.shape[:2]
    pil_image = Image.fromarray(image_rgb)
    with torch.no_grad():
        inputs = processor(images=pil_image, return_tensors="pt")
        # Follow the model's real placement rather than re-guessing it from
        # cuda availability, so inputs and weights always share a device.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")
        try:
            inputs = {k: (v.to(device) if isinstance(v, torch.Tensor) else v) for k, v in inputs.items()}
        except Exception:
            # Best-effort: keep going with the processor output as produced.
            logger.exception(f"[DEPTH] could not move inputs to {device}")
        outputs = model(**inputs)
        predicted_depth = outputs.predicted_depth
        resized = torch.nn.functional.interpolate(
            predicted_depth.unsqueeze(1), size=(h, w), mode="bicubic", align_corners=False,
        )
        # Index the batch and channel axes explicitly: a bare squeeze() would
        # also drop a real image axis when h or w equals 1.
        depth = resized[0, 0].cpu().numpy()
    d_min, d_max = float(depth.min()), float(depth.max())
    if d_max - d_min < 1e-8:
        return np.zeros((h, w), dtype=np.uint8)
    return ((depth - d_min) / (d_max - d_min) * 255.0).astype(np.uint8)
def extract_depth_wall_mask(depth_map: np.ndarray, target: str = "wall") -> np.ndarray:
    """Derive a candidate wall (or roof) mask from a depth map.

    A "sky" level is taken as the 35th percentile of the top quarter of the
    map; pixels at or below 1.1x that level are excluded. The remaining
    pixels are kept when their depth falls inside a percentile band sampled
    from a reference region:

    * ``target == "roof"``: band 5..88 from the non-sky values of the top 55%
      of rows, and the result is restricted to those rows. If that region has
      no non-sky values, the plain top-55% region mask is returned as is.
    * otherwise: band 8..90 from the non-sky values of the central window
      (rows 20%..85%, columns 10%..90%), or from the whole window when it has
      no non-sky values.

    The candidate is cleaned with a 5x5 opening and two 9x9 closings.

    Returns:
        A ``uint8`` 0/1 mask with the depth map's height and width.
    """
    rows, cols = depth_map.shape[:2]
    depth = depth_map.astype(np.float32)

    top_band = depth[: max(1, rows // 4), :]
    sky_level = float(np.percentile(top_band, 35))
    not_sky = ~(depth <= sky_level * 1.1)

    if target == "roof":
        roof_rows = int(rows * 0.55)
        roi = np.zeros((rows, cols), dtype=np.uint8)
        roi[:roof_rows, :] = 1
        upper = depth[:roof_rows, :]
        samples = upper[upper > sky_level]
        if samples.size == 0:
            return roi
        low = float(np.percentile(samples, 5))
        high = float(np.percentile(samples, 88))
        in_band = ((depth >= low) & (depth <= high)).astype(np.uint8)
        candidate = (in_band & roi & not_sky).astype(np.uint8)
    else:
        row_lo, row_hi = int(rows * 0.20), int(rows * 0.85)
        col_lo, col_hi = int(cols * 0.10), int(cols * 0.90)
        window = depth[row_lo:row_hi, col_lo:col_hi]
        samples = window[window > sky_level]
        if samples.size == 0:
            samples = window.flatten()
        low = float(np.percentile(samples, 8))
        high = float(np.percentile(samples, 90))
        in_band = ((depth >= low) & (depth <= high)).astype(np.uint8)
        candidate = (in_band & not_sky).astype(np.uint8)

    open_kernel = np.ones((5, 5), np.uint8)
    close_kernel = np.ones((9, 9), np.uint8)
    candidate = cv2.morphologyEx(candidate, cv2.MORPH_OPEN, open_kernel, iterations=1)
    candidate = cv2.morphologyEx(candidate, cv2.MORPH_CLOSE, close_kernel, iterations=2)
    return candidate
def segment_video_sync(payload: SegmentVideoRequest) -> dict[str, Any]:
    """Render a copy of an uploaded video with the segmented region highlighted.

    Every ``sample_every_n_frames``-th frame (up to ``max_frames_to_segment``,
    capped at 3000) is segmented and overlaid; all other frames are copied
    unchanged. ``mask_mode`` selects the region: ``"largest"`` takes the
    largest-area mask from the SAM2 mask generator, ``"exterior"`` the merged
    wall fragments (or the top-ranked exterior candidate when the merge is
    empty). The result is written as ``<stem>_sam2_overlay.mp4`` in
    ``VIDEO_OUTPUT_DIR``.

    Returns:
        A summary dict with the output filename/URL and frame counters.

    Raises:
        HTTPException: 400 for an invalid filename, ``mask_mode``, unreadable
            video or invalid dimensions; 404 when the video does not exist;
            500 when the output video cannot be created.
    """
    from core.config import VIDEO_OUTPUT_DIR, VIDEO_UPLOAD_DIR

    step = "SEGMENT_VIDEO"
    started = log_timing_start(step)
    cap = None
    writer = None
    try:
        # Keep only the final path component so the name cannot escape the upload dir.
        safe_name = Path(payload.filename).name
        if not safe_name:
            raise HTTPException(status_code=400, detail="Invalid filename")
        video_path = VIDEO_UPLOAD_DIR / safe_name
        if not video_path.exists() or not video_path.is_file():
            raise HTTPException(status_code=404, detail=f"Video not found: {safe_name}")

        sample_every = max(1, int(payload.sample_every_n_frames))
        max_frames = max(1, min(int(payload.max_frames_to_segment), 3000))
        mask_mode = str(payload.mask_mode).strip().lower()
        if mask_mode not in {"exterior", "largest"}:
            raise HTTPException(status_code=400, detail="mask_mode must be 'exterior' or 'largest'")

        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            raise HTTPException(status_code=400, detail="Video could not be opened")
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            fps = 24.0  # fall back when the container reports no usable frame rate
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)
        if width <= 0 or height <= 0:
            raise HTTPException(status_code=400, detail="Video has invalid dimensions")

        stem = Path(safe_name).stem
        out_filename = f"{stem}_sam2_overlay.mp4"
        out_path = VIDEO_OUTPUT_DIR / out_filename
        fourcc_fn = getattr(cv2, "VideoWriter_fourcc")
        writer = cv2.VideoWriter(str(out_path), int(fourcc_fn(*"mp4v")), float(fps), (width, height))
        if not writer.isOpened():
            raise HTTPException(status_code=500, detail="Failed to create output video")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
        frame_idx = 0
        segmented_frames = 0
        highlighted_frames = 0
        while True:
            ok, frame_bgr = cap.read()
            if not ok:
                break
            should_segment = (frame_idx % sample_every == 0) and (segmented_frames < max_frames)
            if should_segment:
                frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
                binary_mask = np.zeros((height, width), dtype=bool)
                if mask_mode == "largest":
                    from services.sam2_service import get_sam2_mask_generator

                    with sam2_predict_lock:
                        frame_mask_start = log_timing_start("SAM2_FRAME_GENERATE")
                        masks = get_sam2_mask_generator().generate(frame_rgb)
                        log_timing_end("SAM2_FRAME_GENERATE", frame_mask_start)
                    if masks:
                        best = max(masks, key=lambda m: int(m.get("area", 0)))
                        seg = best.get("segmentation")
                        if seg is not None:
                            binary_mask = np.asarray(seg, dtype=bool)
                else:
                    label_map, _ = generate_label_map(frame_rgb)
                    candidates = rank_exterior_candidates(label_map, 8)
                    if candidates:
                        merged = merge_sam2_wall_fragments(label_map, 8)
                        if np.any(merged):
                            binary_mask = merged.astype(bool)
                        else:
                            best_idx = int(candidates[0]["mask_index"])
                            binary_mask = label_map == best_idx
                if bool(np.any(binary_mask)):
                    frame_bgr = build_mask_overlay(frame_bgr, binary_mask, payload.overlay_alpha)
                    highlighted_frames += 1
                segmented_frames += 1
            writer.write(frame_bgr)
            frame_idx += 1

        return {
            "message": "Video segmentation completed",
            "input_filename": safe_name,
            "output_filename": out_filename,
            "output_url": f"/seg/output-video/{out_filename}",
            "total_frames": total_frames,
            "segmented_frames": segmented_frames,
            "highlighted_frames": highlighted_frames,
            "sample_every_n_frames": sample_every,
            "mask_mode": mask_mode,
        }
    finally:
        # Release the capture and writer on every exit path, including a
        # failure in the middle of the frame loop, so no handle is leaked.
        for handle in (cap, writer):
            if handle is None:
                continue
            try:
                handle.release()
            except Exception:
                logger.exception("Error releasing video handle after SEGMENT_VIDEO")
        log_timing_end(step, started)
        try:
            release_resources()
        except Exception:
            logger.exception("Error releasing resources after SEGMENT_VIDEO")
def segment_exterior_grabcut_sync(payload: ExteriorGrabCutRequest) -> dict[str, Any]:
    """Segment the building exterior with rectangle-initialised GrabCut.

    The rectangle comes from ``payload.rect_xywh`` (or a default inset of the
    image) and is clamped to the image. The GrabCut foreground is cleaned with
    a 5x5 opening and closing and, when ``use_sam2_hint`` is set, unioned with
    the merged SAM2 wall fragments. The foreground is split into components,
    stored as a label map, and a preview overlay is written to ``OUTPUT_DIR``.

    Returns:
        A summary dict (mask indices, preview file/URL, rectangle, ...).

    Raises:
        HTTPException: 400 when no foreground is found; 500 when the preview
            cannot be saved.
    """
    step = "EXTERIOR_GRABCUT"
    started = log_timing_start(step)
    try:
        safe_name, image_rgb = load_image_rgb_for_edit(payload.filename)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        h, w = image_bgr.shape[:2]

        if payload.rect_xywh is None:
            x, y, rw, rh = int(0.06 * w), int(0.10 * h), int(0.88 * w), int(0.84 * h)
        else:
            x, y, rw, rh = (int(v) for v in payload.rect_xywh)
        # Clamp so the rectangle stays inside the image and is at least 2x2.
        x = max(0, min(x, w - 2))
        y = max(0, min(y, h - 2))
        rw = max(2, min(rw, w - x))
        rh = max(2, min(rh, h - y))
        iter_count = max(1, min(int(payload.iterations), 12))

        gc_labels = np.zeros((h, w), np.uint8)
        bg_model = np.zeros((1, 65), np.float64)
        fg_model = np.zeros((1, 65), np.float64)
        cv2.grabCut(image_bgr, gc_labels, (x, y, rw, rh), bg_model, fg_model, iter_count, cv2.GC_INIT_WITH_RECT)

        # Definite and probable foreground both count as foreground.
        fg_mask = np.isin(gc_labels, (cv2.GC_FGD, cv2.GC_PR_FGD)).astype(np.uint8)
        kernel = np.ones((5, 5), np.uint8)
        for morph_op in (cv2.MORPH_OPEN, cv2.MORPH_CLOSE):
            fg_mask = cv2.morphologyEx(fg_mask, morph_op, kernel, iterations=1)

        if payload.use_sam2_hint:
            sam_label_map, _ = generate_label_map(image_rgb)
            sam_hint = merge_sam2_wall_fragments(sam_label_map, 8)
            if np.any(sam_hint):
                fg_mask = ((fg_mask > 0) | (sam_hint > 0)).astype(np.uint8)

        if not np.any(fg_mask):
            raise HTTPException(status_code=400, detail="GrabCut did not find a foreground region")

        label_map, component_count, recommended_idx = build_component_label_map(fg_mask, min_area_ratio=0.012)
        if component_count == 0:
            # No component passed the area filter: expose the whole foreground as mask 1.
            label_map = (fg_mask > 0).astype(np.uint8)
            component_count = 1
            recommended_idx = 1

        stem = Path(safe_name).stem
        saved_owner = save_label_map_for_owner(f"{stem}_exterior_grabcut.jpg", label_map)

        preview = build_mask_overlay(image_bgr, fg_mask.astype(bool), 0.42, color_bgr=(15, 170, 245))
        preview_filename = f"{stem}_exterior_grabcut_preview.jpg"
        saved = cv2.imwrite(str(OUTPUT_DIR / preview_filename), preview)
        if not saved:
            raise HTTPException(status_code=500, detail="Failed to save GrabCut preview image")

        area_ratio = float(np.count_nonzero(fg_mask)) / float(h * w)
        return {
            "message": "Exterior segmentation with GrabCut completed",
            "filename": safe_name,
            "original_filename_for_apply": saved_owner,
            "mask_count": component_count,
            "available_mask_indices": list(range(1, component_count + 1)),
            "recommended_mask_index": recommended_idx,
            "foreground_area_ratio": round(area_ratio, 6),
            "preview_filename": preview_filename,
            "preview_url": f"/seg/ai/{preview_filename}",
            "rect_xywh": [x, y, rw, rh],
            "iterations": iter_count,
            "used_sam2_hint": bool(payload.use_sam2_hint),
        }
    finally:
        log_timing_end(step, started)
        try:
            release_resources()
        except Exception:
            logger.exception("Error releasing resources after EXTERIOR_GRABCUT")
def segment_exterior_hybrid_sync(payload: ExteriorHybridRequest) -> dict[str, Any]:
    """Segment the building exterior with GrabCut seeded by semantic/SAM2 hints.

    Optional hints (semantic classes matching ``semantic_keywords`` and merged
    SAM2 wall fragments) are unioned into a hint mask. Hinted pixels start as
    probable foreground, their 7x7-eroded core as definite foreground, and
    everything outside the clamped rectangle as definite background. The
    GrabCut foreground is cleaned with a 5x5 opening and closing, split into
    components, stored as a label map, and previewed in ``OUTPUT_DIR``.

    Returns:
        A summary dict (mask indices, preview file/URL, semantic labels, ...).

    Raises:
        HTTPException: 400 when no foreground is found; 500 when the preview
            cannot be saved.
    """
    step = "EXTERIOR_HYBRID"
    started = log_timing_start(step)
    try:
        safe_name, image_rgb = load_image_rgb_for_edit(payload.filename)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        h, w = image_bgr.shape[:2]
        if payload.rect_xywh is not None:
            x, y, rw, rh = [int(v) for v in payload.rect_xywh]
        else:
            x, y, rw, rh = int(0.06 * w), int(0.10 * h), int(0.88 * w), int(0.84 * h)
        # Clamp so the rectangle stays inside the image and is at least 2x2.
        x = max(0, min(x, w - 2))
        y = max(0, min(y, h - 2))
        rw = max(2, min(rw, w - x))
        rh = max(2, min(rh, h - y))
        rect = (x, y, rw, rh)
        iter_count = max(1, min(int(payload.iterations), 12))

        hint_mask = np.zeros((h, w), dtype=np.uint8)
        semantic_labels: list[str] = []
        semantic_area_ratio = 0.0
        if payload.use_semantic_hint:
            sem_mask, semantic_labels, semantic_area_ratio = semantic_exterior_mask(image_rgb, payload.semantic_keywords)
            hint_mask = np.where(sem_mask > 0, 1, hint_mask).astype(np.uint8)
        if payload.use_sam2_hint:
            sam_label_map, _ = generate_label_map(image_rgb)
            sam_hint = merge_sam2_wall_fragments(sam_label_map, 10)
            if np.any(sam_hint):
                hint_mask = np.where(sam_hint > 0, 1, hint_mask).astype(np.uint8)

        gc_mask = np.full((h, w), cv2.GC_PR_BGD, dtype=np.uint8)
        outside_rect = np.ones((h, w), dtype=bool)
        outside_rect[y : y + rh, x : x + rw] = False
        gc_mask[outside_rect] = cv2.GC_BGD
        if np.any(hint_mask):
            gc_mask[hint_mask > 0] = cv2.GC_PR_FGD
            sure_fg = cv2.erode((hint_mask * 255).astype(np.uint8), np.ones((7, 7), np.uint8), iterations=1)
            gc_mask[sure_fg > 0] = cv2.GC_FGD

        # Mask initialisation needs both foreground and background seeds. With
        # no hints (both flags off, or empty hints) there is no foreground
        # seed, so fall back to plain rectangle initialisation instead of
        # letting GrabCut fail.
        seeded_fg = (gc_mask == cv2.GC_FGD) | (gc_mask == cv2.GC_PR_FGD)
        if seeded_fg.any() and not seeded_fg.all():
            init_mode = cv2.GC_INIT_WITH_MASK
        else:
            init_mode = cv2.GC_INIT_WITH_RECT

        bg_model = np.zeros((1, 65), np.float64)
        fg_model = np.zeros((1, 65), np.float64)
        cv2.grabCut(image_bgr, gc_mask, rect, bg_model, fg_model, iter_count, init_mode)
        fg_mask = np.where((gc_mask == cv2.GC_FGD) | (gc_mask == cv2.GC_PR_FGD), 1, 0).astype(np.uint8)
        kernel = np.ones((5, 5), np.uint8)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel, iterations=1)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel, iterations=1)
        if not np.any(fg_mask):
            raise HTTPException(status_code=400, detail="Hybrid exterior segmentation did not find a foreground region")

        label_map, component_count, recommended_idx = build_component_label_map(fg_mask, min_area_ratio=0.01)
        if component_count == 0:
            # No component passed the area filter: expose the whole foreground as mask 1.
            label_map = np.where(fg_mask > 0, 1, 0).astype(np.uint8)
            component_count = 1
            recommended_idx = 1
        label_owner = f"{Path(safe_name).stem}_exterior_hybrid.jpg"
        saved_owner = save_label_map_for_owner(label_owner, label_map)

        preview = build_mask_overlay(image_bgr, fg_mask.astype(bool), 0.44, color_bgr=(0, 180, 255))
        preview_filename = f"{Path(safe_name).stem}_exterior_hybrid_preview.jpg"
        if not cv2.imwrite(str(OUTPUT_DIR / preview_filename), preview):
            raise HTTPException(status_code=500, detail="Failed to save hybrid preview image")

        area_ratio = float(np.count_nonzero(fg_mask)) / float(h * w)
        return {
            "message": "Hybrid exterior segmentation completed",
            "filename": safe_name,
            "original_filename_for_apply": saved_owner,
            "mask_count": component_count,
            "available_mask_indices": list(range(1, component_count + 1)),
            "recommended_mask_index": recommended_idx,
            "foreground_area_ratio": round(area_ratio, 6),
            "preview_filename": preview_filename,
            "preview_url": f"/seg/ai/{preview_filename}",
            "rect_xywh": [x, y, rw, rh],
            "iterations": iter_count,
            "used_sam2_hint": bool(payload.use_sam2_hint),
            "used_semantic_hint": bool(payload.use_semantic_hint),
            "semantic_labels": semantic_labels,
            "semantic_area_ratio": round(float(semantic_area_ratio), 6),
        }
    finally:
        log_timing_end(step, started)
        try:
            release_resources()
        except Exception:
            logger.exception("Error releasing resources after EXTERIOR_HYBRID")
def segment_exterior_brick_sync(payload: ExteriorBrickRequest) -> dict[str, Any]:
    """Segment a brick/masonry exterior and classify each component's material.

    Hints are computed on a bilateral-smoothed copy of the image (optional
    semantic mask, plus merged SAM2 wall fragments) and used to seed GrabCut,
    which itself runs on the unsmoothed image. Hinted pixels start as
    probable foreground, their 9x9-eroded core as definite foreground, and
    everything outside the clamped rectangle as definite background. The
    foreground is cleaned with a 9x9 closing and a 5x5 opening, split into
    components, stored as a label map, previewed in ``OUTPUT_DIR``, and each
    component is classified as brick/smooth/mixed.

    Returns:
        A summary dict including ``material_classification``.

    Raises:
        HTTPException: 400 when no foreground is found; 500 when the preview
            cannot be saved.
    """
    step = "EXTERIOR_BRICK"
    started = log_timing_start(step)
    try:
        safe_name, image_rgb = load_image_rgb_for_edit(payload.filename)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        h, w = image_bgr.shape[:2]
        strength = max(1, min(int(payload.smooth_strength), 3))
        smoothed_bgr = smooth_texture_for_segmentation(image_bgr, strength)
        smoothed_rgb = cv2.cvtColor(smoothed_bgr, cv2.COLOR_BGR2RGB)
        if payload.rect_xywh is not None:
            x, y, rw, rh = [int(v) for v in payload.rect_xywh]
        else:
            x, y, rw, rh = int(0.05 * w), int(0.08 * h), int(0.90 * w), int(0.86 * h)
        # Clamp so the rectangle stays inside the image and is at least 2x2.
        x = max(0, min(x, w - 2))
        y = max(0, min(y, h - 2))
        rw = max(2, min(rw, w - x))
        rh = max(2, min(rh, h - y))
        rect = (x, y, rw, rh)
        iter_count = max(1, min(int(payload.iterations), 12))

        hint_mask = np.zeros((h, w), dtype=np.uint8)
        semantic_labels: list[str] = []
        semantic_area_ratio = 0.0
        if payload.use_semantic_hint:
            sem_mask, semantic_labels, semantic_area_ratio = semantic_exterior_mask(smoothed_rgb, payload.semantic_keywords)
            hint_mask = np.where(sem_mask > 0, 1, hint_mask).astype(np.uint8)
        sam_label_map, _ = generate_label_map(smoothed_rgb)
        merged_sam_mask = merge_sam2_wall_fragments(sam_label_map, int(payload.sam2_merge_top_k))
        if np.any(merged_sam_mask):
            hint_mask = np.where(merged_sam_mask > 0, 1, hint_mask).astype(np.uint8)

        gc_mask = np.full((h, w), cv2.GC_PR_BGD, dtype=np.uint8)
        outside_rect = np.ones((h, w), dtype=bool)
        outside_rect[y : y + rh, x : x + rw] = False
        gc_mask[outside_rect] = cv2.GC_BGD
        if np.any(hint_mask):
            gc_mask[hint_mask > 0] = cv2.GC_PR_FGD
            sure_fg = cv2.erode((hint_mask * 255).astype(np.uint8), np.ones((9, 9), np.uint8), iterations=1)
            gc_mask[sure_fg > 0] = cv2.GC_FGD

        # Mask initialisation needs both foreground and background seeds. When
        # the hints came back empty there is no foreground seed, so fall back
        # to plain rectangle initialisation instead of letting GrabCut fail.
        seeded_fg = (gc_mask == cv2.GC_FGD) | (gc_mask == cv2.GC_PR_FGD)
        if seeded_fg.any() and not seeded_fg.all():
            init_mode = cv2.GC_INIT_WITH_MASK
        else:
            init_mode = cv2.GC_INIT_WITH_RECT

        bg_model = np.zeros((1, 65), np.float64)
        fg_model = np.zeros((1, 65), np.float64)
        cv2.grabCut(image_bgr, gc_mask, rect, bg_model, fg_model, iter_count, init_mode)
        fg_mask = np.where((gc_mask == cv2.GC_FGD) | (gc_mask == cv2.GC_PR_FGD), 1, 0).astype(np.uint8)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, np.ones((9, 9), np.uint8), iterations=1)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, np.ones((5, 5), np.uint8), iterations=1)
        if not np.any(fg_mask):
            raise HTTPException(status_code=400, detail="Brick segmentation did not find a foreground region.")

        label_map_out, component_count, recommended_idx = build_component_label_map(fg_mask, min_area_ratio=0.01)
        if component_count == 0:
            # No component passed the area filter: expose the whole foreground as mask 1.
            label_map_out = np.where(fg_mask > 0, 1, 0).astype(np.uint8)
            component_count = 1
            recommended_idx = 1
        label_owner = f"{Path(safe_name).stem}_exterior_brick.jpg"
        saved_owner = save_label_map_for_owner(label_owner, label_map_out)

        preview = build_mask_overlay(image_bgr, fg_mask.astype(bool), 0.44, color_bgr=(20, 140, 255))
        preview_filename = f"{Path(safe_name).stem}_exterior_brick_preview.jpg"
        if not cv2.imwrite(str(OUTPUT_DIR / preview_filename), preview):
            raise HTTPException(status_code=500, detail="Failed to save brick preview image")

        area_ratio = float(np.count_nonzero(fg_mask)) / float(h * w)
        # Material analysis uses the original (unsmoothed) colors and texture.
        material_analysis = separate_materials_by_label(label_map_out, image_rgb)
        return {
            "message": "Brick/masonry exterior segmentation completed",
            "filename": safe_name,
            "original_filename_for_apply": saved_owner,
            "mask_count": component_count,
            "available_mask_indices": list(range(1, component_count + 1)),
            "recommended_mask_index": recommended_idx,
            "foreground_area_ratio": round(area_ratio, 6),
            "preview_filename": preview_filename,
            "preview_url": f"/seg/ai/{preview_filename}",
            "rect_xywh": [x, y, rw, rh],
            "iterations": iter_count,
            "smooth_strength": strength,
            "sam2_merge_top_k": int(payload.sam2_merge_top_k),
            "semantic_labels": semantic_labels,
            "semantic_area_ratio": round(float(semantic_area_ratio), 6),
            "material_classification": {
                "brick_indices": material_analysis["brick_indices"],
                "smooth_indices": material_analysis["smooth_indices"],
                "mixed_indices": material_analysis["mixed_indices"],
                "analysis_by_label": material_analysis["analysis_by_label"],
            },
        }
    finally:
        log_timing_end(step, started)
        try:
            release_resources()
        except Exception:
            logger.exception("Error releasing resources after EXTERIOR_BRICK")
def segment_exterior_depth_sync(payload: ExteriorDepthRequest) -> dict[str, Any]:
    """Segment the building exterior with GrabCut seeded by semantic, SAM2 and depth hints.

    Semantic and SAM2 hints are computed on a bilateral-smoothed copy of the
    image; the depth hint is computed on the original image. The semantic and
    depth hints are best-effort: a failure is logged and that hint is skipped.
    Hinted pixels start as probable foreground, their 9x9-eroded core as
    definite foreground, and everything outside the clamped rectangle as
    definite background. When a depth map is available, still-undecided
    pixels in the top quarter of the image whose depth is at or below the sky
    level are forced to background. The GrabCut foreground is cleaned with a
    9x9 closing and a 5x5 opening, split into components, stored as a label
    map, and previewed in ``OUTPUT_DIR``.

    Returns:
        A summary dict (mask indices, preview file/URL, hint flags, ...).

    Raises:
        HTTPException: 400 when no foreground is found; 500 when the preview
            cannot be saved.
    """
    step = "EXTERIOR_DEPTH"
    started = log_timing_start(step)
    try:
        safe_name, image_rgb = load_image_rgb_for_edit(payload.filename)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        h, w = image_bgr.shape[:2]
        target_name = normalize_exterior_target(payload.exterior_target)
        strength = max(1, min(int(payload.smooth_strength), 3))
        smoothed_bgr = smooth_texture_for_segmentation(image_bgr, strength)
        smoothed_rgb = cv2.cvtColor(smoothed_bgr, cv2.COLOR_BGR2RGB)
        if payload.rect_xywh is not None:
            x, y, rw, rh = [int(v) for v in payload.rect_xywh]
        else:
            x, y, rw, rh = int(0.04 * w), int(0.06 * h), int(0.92 * w), int(0.88 * h)
        # Clamp so the rectangle stays inside the image and is at least 2x2.
        x = max(0, min(x, w - 2))
        y = max(0, min(y, h - 2))
        rw = max(2, min(rw, w - x))
        rh = max(2, min(rh, h - y))
        rect = (x, y, rw, rh)
        iter_count = max(1, min(int(payload.iterations), 12))

        hint_mask = np.zeros((h, w), dtype=np.uint8)
        semantic_labels: list[str] = []
        semantic_area_ratio = 0.0
        depth_map_arr: np.ndarray | None = None
        if payload.use_semantic_hint:
            try:
                sem_mask, semantic_labels, semantic_area_ratio = semantic_exterior_mask(smoothed_rgb, payload.semantic_keywords)
                hint_mask = np.where(sem_mask > 0, 1, hint_mask).astype(np.uint8)
            except Exception:
                # Best-effort hint: continue without it, but leave a trace.
                logger.exception("Semantic hint failed in EXTERIOR_DEPTH; continuing without it")
        sam_label_map, _ = generate_label_map(smoothed_rgb)
        merged_sam_mask = merge_sam2_wall_fragments(sam_label_map, int(payload.sam2_merge_top_k))
        if np.any(merged_sam_mask):
            hint_mask = np.where(merged_sam_mask > 0, 1, hint_mask).astype(np.uint8)
        if payload.use_depth_hint:
            try:
                depth_map_arr = estimate_depth_map(image_rgb)
                depth_mask = extract_depth_wall_mask(depth_map_arr, target=target_name)
                hint_mask = np.where(depth_mask > 0, 1, hint_mask).astype(np.uint8)
            except Exception:
                # Best-effort hint: continue without it, but leave a trace.
                logger.exception("Depth hint failed in EXTERIOR_DEPTH; continuing without it")
                depth_map_arr = None

        gc_mask = np.full((h, w), cv2.GC_PR_BGD, dtype=np.uint8)
        outside_rect = np.ones((h, w), dtype=bool)
        outside_rect[y : y + rh, x : x + rw] = False
        gc_mask[outside_rect] = cv2.GC_BGD
        if np.any(hint_mask):
            gc_mask[hint_mask > 0] = cv2.GC_PR_FGD
            sure_fg = cv2.erode((hint_mask * 255).astype(np.uint8), np.ones((9, 9), np.uint8), iterations=1)
            gc_mask[sure_fg > 0] = cv2.GC_FGD
        if depth_map_arr is not None and payload.use_depth_hint:
            depth_f = depth_map_arr.astype(np.float32)
            upper_h = max(1, h // 4)
            sky_pct = float(np.percentile(depth_f[:upper_h, :], 35))
            row_idx = np.arange(h, dtype=np.int32)[:, np.newaxis]
            # Only pixels that are still undecided (probable background) in the
            # top quarter are hardened into definite background.
            definite_sky = np.asarray(
                (row_idx < upper_h) & (depth_f <= sky_pct * 1.1) & (gc_mask == cv2.GC_PR_BGD),
                dtype=bool,
            )
            gc_mask[definite_sky] = cv2.GC_BGD

        # Mask initialisation needs both foreground and background seeds. When
        # every hint came back empty there is no foreground seed, so fall back
        # to plain rectangle initialisation instead of letting GrabCut fail.
        seeded_fg = (gc_mask == cv2.GC_FGD) | (gc_mask == cv2.GC_PR_FGD)
        if seeded_fg.any() and not seeded_fg.all():
            init_mode = cv2.GC_INIT_WITH_MASK
        else:
            init_mode = cv2.GC_INIT_WITH_RECT

        bg_model = np.zeros((1, 65), np.float64)
        fg_model = np.zeros((1, 65), np.float64)
        cv2.grabCut(image_bgr, gc_mask, rect, bg_model, fg_model, iter_count, init_mode)
        fg_mask = np.where((gc_mask == cv2.GC_FGD) | (gc_mask == cv2.GC_PR_FGD), 1, 0).astype(np.uint8)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, np.ones((9, 9), np.uint8), iterations=1)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, np.ones((5, 5), np.uint8), iterations=1)
        if not np.any(fg_mask):
            raise HTTPException(status_code=400, detail="Depth-guided segmentation found no foreground region")

        label_map_out, component_count, recommended_idx = build_component_label_map(fg_mask, min_area_ratio=0.01)
        if component_count == 0:
            # No component passed the area filter: expose the whole foreground as mask 1.
            label_map_out = np.where(fg_mask > 0, 1, 0).astype(np.uint8)
            component_count = 1
            recommended_idx = 1
        label_owner = f"{Path(safe_name).stem}_exterior_depth.jpg"
        saved_owner = save_label_map_for_owner(label_owner, label_map_out)

        preview = build_mask_overlay(image_bgr, fg_mask.astype(bool), 0.44, color_bgr=(30, 120, 255))
        preview_filename = f"{Path(safe_name).stem}_exterior_depth_preview.jpg"
        if not cv2.imwrite(str(OUTPUT_DIR / preview_filename), preview):
            raise HTTPException(status_code=500, detail="Failed to save depth preview")

        area_ratio = float(np.count_nonzero(fg_mask)) / float(h * w)
        return {
            "message": "Depth-guided exterior segmentation completed",
            "filename": safe_name,
            "original_filename_for_apply": saved_owner,
            "mask_count": component_count,
            "available_mask_indices": list(range(1, component_count + 1)),
            "recommended_mask_index": recommended_idx,
            "foreground_area_ratio": round(area_ratio, 6),
            "preview_filename": preview_filename,
            "preview_url": f"/seg/ai/{preview_filename}",
            "rect_xywh": [x, y, rw, rh],
            "iterations": iter_count,
            "exterior_target": target_name,
            "smooth_strength": strength,
            "used_semantic_hint": bool(payload.use_semantic_hint),
            "used_depth_hint": bool(payload.use_depth_hint),
            "semantic_labels": semantic_labels,
            "semantic_area_ratio": round(float(semantic_area_ratio), 6),
        }
    finally:
        log_timing_end(step, started)
        try:
            release_resources()
        except Exception:
            logger.exception("Error releasing resources after EXTERIOR_DEPTH")