"""Segmentation helpers and synchronous handlers for image/video masking.

The module combines SAM2 predictions, optional semantic (SegFormer) and depth
(DPT) models, and OpenCV GrabCut to produce label maps and preview overlays.
Heavy models are cached on ``services.sam2_service`` and loaded lazily.
"""

from pathlib import Path
from typing import Any, cast

import cv2
import numpy as np

try:
    import torch

    _TORCH_AVAILABLE = True
except ImportError:
    torch = None  # type: ignore[assignment]
    _TORCH_AVAILABLE = False

from fastapi import HTTPException
from PIL import Image

from core.config import (
    DEPTH_MODEL_ID,
    OUTPUT_DIR,
    SEMANTIC_MODEL_ID,
    log_timing_end,
    log_timing_start,
    logger,
)
from models.schemas import (
    ExteriorBrickRequest,
    ExteriorDepthRequest,
    ExteriorGrabCutRequest,
    ExteriorHybridRequest,
    GuidedSegmentRequest,
    SegmentVideoRequest,
)
from services.image_service import load_image_rgb_for_edit, save_label_map_for_owner
from services.sam2_service import (
    SAM2_UNLOAD_AFTER_USE,
    depth_load_lock,
    get_sam2_image_predictor,
    release_resources,
    sam2_predict_lock,
    semantic_load_lock,
)
from services.scene_service import (
    build_component_label_map,
    build_mask_overlay,
    generate_label_map,
    merge_sam2_wall_fragments,
    normalize_exterior_target,
    rank_exterior_candidates,
)

try:
    from transformers import (  # type: ignore[import-untyped]
        AutoImageProcessor,
        DPTForDepthEstimation,
        DPTImageProcessor,
        SegformerForSemanticSegmentation,
    )

    _TRANSFORMERS_AVAILABLE = True
except ImportError:
    _TRANSFORMERS_AVAILABLE = False

import services.sam2_service as _sam2_svc

_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")

# Class-name fragments skipped by the semantic fallback in semantic_exterior_mask.
_NON_EXTERIOR_TOKENS = ("sky", "road", "grass", "tree", "plant", "water", "person", "car")

# (morphology op, square kernel size) sequences applied to GrabCut foregrounds.
_OPEN_THEN_CLOSE = ((cv2.MORPH_OPEN, 5), (cv2.MORPH_CLOSE, 5))
_CLOSE_THEN_OPEN = ((cv2.MORPH_CLOSE, 9), (cv2.MORPH_OPEN, 5))


def parse_mask_index(mask_filename: str) -> int:
    """Return the integer after the last ``_`` in the filename stem.

    Raises:
        HTTPException: 400 when that trailing token is not an integer.
    """
    try:
        parts = Path(mask_filename).stem.split("_")
        return int(parts[-1])
    except (ValueError, IndexError):
        raise HTTPException(status_code=400, detail="Invalid mask filename format") from None


def parse_rgb_color(color: str) -> tuple[int, int, int]:
    """Parse ``#RRGGBB`` (leading ``#`` optional) into an ``(r, g, b)`` tuple.

    Raises:
        HTTPException: 400 when the value is not exactly six hex digits.
    """
    color = str(color).strip().lstrip("#")
    if len(color) != 6:
        raise HTTPException(status_code=400, detail="Color must be a hex color like #RRGGBB")
    # int(x, 16) tolerates signs/whitespace (e.g. "+f"), so validate digits explicitly.
    if any(ch not in _HEX_DIGITS for ch in color):
        raise HTTPException(status_code=400, detail="Invalid hex color format")
    return (int(color[0:2], 16), int(color[2:4], 16), int(color[4:6], 16))


def generate_guided_label_map(
    image_rgb: np.ndarray,
    point_coords: list[list[float]],
    point_labels: list[int],
    box_xyxy: list[float],
    multimask_output: bool = False,
) -> tuple[np.ndarray, list[float]]:
    """Run the SAM2 image predictor with point/box prompts.

    Points are used only when both coordinates and labels are non-empty; the
    box is used only when it has exactly four values.

    Returns:
        ``(label_map, ranked_scores)``. Masks are painted in descending score
        order with labels ``1..k`` (later, lower-scored masks overwrite
        overlapping pixels); ``ranked_scores`` follows the same order.
    """
    predictor = get_sam2_image_predictor()
    started = log_timing_start("SAM2_PREDICT")
    with sam2_predict_lock:
        predictor.set_image(image_rgb)
        has_points = bool(point_coords and point_labels)
        input_points = np.array(point_coords, dtype=np.float32) if has_points else None
        input_labels = np.array(point_labels, dtype=np.int32) if has_points else None
        input_box = np.array(box_xyxy, dtype=np.float32) if box_xyxy and len(box_xyxy) == 4 else None
        masks, scores, _ = predictor.predict(
            point_coords=input_points,
            point_labels=input_labels,
            box=input_box,
            multimask_output=multimask_output,
        )

    sorted_indices = np.argsort(scores)[::-1]
    label_map = np.zeros((image_rgb.shape[0], image_rgb.shape[1]), dtype=np.uint8)
    for idx, orig_idx in enumerate(sorted_indices, start=1):
        label_map[np.asarray(masks[orig_idx], dtype=bool)] = idx
    ranked_scores = [float(scores[i]) for i in sorted_indices]

    # Logging is best-effort: a logging failure must not fail the prediction.
    try:
        log_timing_end("SAM2_PREDICT", started)
        logger.info("[SAM2_PREDICT] masks=%s top_scores=%s", len(masks), ranked_scores[:3])
    except Exception:
        pass

    if SAM2_UNLOAD_AFTER_USE:
        try:
            release_resources(full_unload=True)
        except Exception:
            logger.exception("Error unloading SAM after generate_guided_label_map")
    return label_map, ranked_scores


def analyze_material_texture_complexity(binary_mask: np.ndarray, image_rgb: np.ndarray) -> float:
    """Return a 0..1 texture score from Canny edge density inside the mask.

    Pixels outside the mask are replaced by mid-gray before edge detection;
    the edge density is divided by 0.35 and clipped to ``[0, 1]``.
    An empty mask yields ``0.0``.
    """
    mask_u8 = (binary_mask > 0).astype(np.uint8)
    mask_pixels = int(np.count_nonzero(mask_u8))
    if mask_pixels == 0:
        return 0.0
    masked_region = image_rgb.copy()
    masked_region[mask_u8 == 0] = [128, 128, 128]
    gray = cv2.cvtColor(masked_region, cv2.COLOR_RGB2GRAY)
    edges = np.asarray(cv2.Canny(gray, 50, 150), dtype=np.uint8)
    edge_pixels = int(np.count_nonzero((edges > 0) & (mask_u8 > 0)))
    edge_density = edge_pixels / float(mask_pixels)
    return float(np.clip(edge_density / 0.35, 0.0, 1.0))


def analyze_material_color(binary_mask: np.ndarray, image_rgb: np.ndarray) -> tuple[float, dict[str, float]]:
    """Score how much of the masked region falls in a red-ish HSV range.

    Returns:
        ``(brick_score, stats)`` where ``brick_score`` is the fraction of masked
        pixels with hue <= 15 or >= 165 and saturation/value > 40, and ``stats``
        holds ``brick_score``, ``smooth_score`` and mean H/S/V. An empty mask
        yields ``(0.0, {})``.
    """
    mask_u8 = (binary_mask > 0).astype(np.uint8)
    if int(np.count_nonzero(mask_u8)) == 0:
        return 0.0, {}
    masked_rgb = image_rgb[mask_u8 > 0]
    if masked_rgb.shape[0] == 0:
        return 0.0, {}

    # cvtColor needs an image-shaped array, so stack the pixels as an Nx1 image.
    pixel_column = np.ascontiguousarray(masked_rgb.reshape(-1, 1, 3).astype(np.uint8))
    masked_hsv = cv2.cvtColor(pixel_column, cv2.COLOR_RGB2HSV)
    h = masked_hsv[:, :, 0]
    s = masked_hsv[:, :, 1]
    v = masked_hsv[:, :, 2]
    total_pixels = float(max(1, h.size))

    brick_selection = ((h <= 15) | (h >= 165)) & (s > 40) & (v > 40)
    brick_score = np.count_nonzero(brick_selection) / total_pixels

    smooth_hue = ((h >= 15) & (h <= 60)) | ((h >= 70) & (h <= 140))
    smooth_score = np.count_nonzero(smooth_hue & (s < 60)) / total_pixels

    stats: dict[str, float] = {
        "brick_score": float(brick_score),
        "smooth_score": float(smooth_score),
        "mean_h": float(np.mean(h)),
        "mean_s": float(np.mean(s)),
        "mean_v": float(np.mean(v)),
    }
    return brick_score, stats


def classify_segment_material(binary_mask: np.ndarray, image_rgb: np.ndarray) -> tuple[str, dict[str, Any]]:
    """Classify a masked region as ``"brick"``, ``"smooth"`` or ``"mixed"``.

    The combined score is ``0.6 * texture + 0.4 * color``; >= 0.55 is brick,
    <= 0.35 is smooth, anything between is mixed.

    Returns:
        ``(material, analysis)`` where ``analysis`` contains the rounded scores,
        the color statistics and the chosen ``material_type``.
    """
    texture_score = analyze_material_texture_complexity(binary_mask, image_rgb)
    brick_color_score, color_stats = analyze_material_color(binary_mask, image_rgb)
    combined_brick_score = (0.6 * texture_score) + (0.4 * brick_color_score)
    analysis: dict[str, Any] = {
        "texture_score": round(texture_score, 4),
        "color_brick_score": round(brick_color_score, 4),
        "combined_score": round(combined_brick_score, 4),
        "color_stats": {k: round(v, 2) for k, v in color_stats.items()},
    }
    if combined_brick_score >= 0.55:
        material = "brick"
    elif combined_brick_score <= 0.35:
        material = "smooth"
    else:
        material = "mixed"
    analysis["material_type"] = material
    return material, analysis


def separate_materials_by_label(label_map: np.ndarray, image_rgb: np.ndarray) -> dict[str, Any]:
    """Classify every positive label in ``label_map`` by material.

    Returns:
        A dict with ``brick_indices``, ``smooth_indices``, ``mixed_indices`` and
        ``analysis_by_label`` (label -> per-segment analysis).
    """
    buckets: dict[str, list[int]] = {"brick": [], "smooth": [], "mixed": []}
    analysis_by_label: dict[int, dict[str, Any]] = {}
    for label_idx in np.unique(label_map[label_map > 0]).tolist():
        binary_mask = (label_map == label_idx).astype(np.uint8)
        material, analysis = classify_segment_material(binary_mask, image_rgb)
        analysis_by_label[int(label_idx)] = analysis
        buckets[material].append(int(label_idx))
    return {
        "brick_indices": buckets["brick"],
        "smooth_indices": buckets["smooth"],
        "mixed_indices": buckets["mixed"],
        "analysis_by_label": analysis_by_label,
    }


def smooth_texture_for_segmentation(image_bgr: np.ndarray, strength: int) -> np.ndarray:
    """Apply ``strength`` passes (clamped to 1..3) of bilateral filtering.

    Both sigmas scale with the strength (``55 * strength``); the input is not
    modified.
    """
    strength = max(1, min(strength, 3))
    smoothed = image_bgr.copy()
    sigma = float(55 * strength)
    for _ in range(strength):
        smoothed = cv2.bilateralFilter(smoothed, 15, sigma, sigma)
    return smoothed


def _require_ml_stack() -> None:
    """Raise RuntimeError when torch or transformers failed to import."""
    if not (_TORCH_AVAILABLE and _TRANSFORMERS_AVAILABLE):
        raise RuntimeError("torch and transformers must be installed")


def _model_device(model: Any) -> Any:
    """Return the device of the model's first parameter, or CPU if unknown."""
    try:
        return next(model.parameters()).device
    except (AttributeError, StopIteration, TypeError):
        return torch.device("cpu")


def _prepare_model(model: Any, tag: str) -> Any:
    """Move ``model`` to CUDA when available, switch to eval mode, log placement."""
    if torch.cuda.is_available():
        try:
            model = model.to("cuda")
        except Exception:
            # Best-effort: inference later follows wherever the weights ended up.
            logger.exception("[%s] could not move model to cuda", tag)
    model.eval()
    logger.info("[%s] loaded on %s", tag, _model_device(model))
    return model


def _inputs_on_model_device(inputs: Any, model: Any) -> dict[str, Any]:
    """Copy processor outputs onto the device that actually holds ``model``."""
    device = _model_device(model)
    return {k: (v.to(device) if isinstance(v, torch.Tensor) else v) for k, v in inputs.items()}


def get_semantic_segmenter() -> tuple[Any, Any]:
    """Return the cached ``(processor, model)`` pair, loading it on first use.

    The pair is stored on ``services.sam2_service`` only after the model is
    fully prepared, so readers that skip the lock never see a half-initialised
    model.

    Raises:
        HTTPException: 500 when loading fails; the error text is also stored in
            ``semantic_load_error`` on the service module.
    """
    svc = _sam2_svc
    processor, model = svc.semantic_processor, svc.semantic_model
    if processor is not None and model is not None:
        return processor, model
    with semantic_load_lock:
        processor, model = svc.semantic_processor, svc.semantic_model
        if processor is not None and model is not None:
            return processor, model
        started = log_timing_start("SEMANTIC_LOAD")
        try:
            _require_ml_stack()
            processor = cast(Any, AutoImageProcessor.from_pretrained(SEMANTIC_MODEL_ID))
            model = cast(Any, SegformerForSemanticSegmentation.from_pretrained(SEMANTIC_MODEL_ID))
            model = _prepare_model(model, "SEMANTIC")
            svc.semantic_model = model
            svc.semantic_processor = processor
            svc.semantic_load_error = None
        except Exception as exc:
            svc.semantic_processor = None
            svc.semantic_model = None
            svc.semantic_load_error = str(exc)
            raise HTTPException(status_code=500, detail=f"Failed to load semantic model: {exc}") from exc
        finally:
            log_timing_end("SEMANTIC_LOAD", started)
    return cast(tuple[Any, Any], (processor, model))


def semantic_exterior_mask(
    image_rgb: np.ndarray,
    semantic_keywords: tuple[str, ...],
) -> tuple[np.ndarray, list[str], float]:
    """Build a binary mask of semantic classes whose name contains a keyword.

    Empty keywords default to building/wall/house/roof/facade. When nothing
    matches in the prediction, the two most frequent predicted classes that do
    not look like sky/road/vegetation/etc. are used instead.

    Returns:
        ``(mask_uint8, matched_label_names, area_ratio)``.
    """
    processor, model = get_semantic_segmenter()
    pil_image = Image.fromarray(image_rgb)
    with torch.no_grad():
        inputs = _inputs_on_model_device(processor(images=pil_image, return_tensors="pt"), model)
        logits = model(**inputs).logits
        upsampled_logits = torch.nn.functional.interpolate(
            logits,
            size=image_rgb.shape[:2],
            mode="bilinear",
            align_corners=False,
        )
        pred = upsampled_logits.argmax(dim=1)[0].cpu().numpy().astype(np.int32)

    # Normalise keys to int so keyword matching and the fallback agree on lookups.
    raw_id2label = getattr(getattr(model, "config", None), "id2label", {}) or {}
    id2label: dict[int, Any] = {}
    for class_id_raw, class_name_raw in raw_id2label.items():
        try:
            id2label[int(class_id_raw)] = class_name_raw
        except (TypeError, ValueError):
            continue

    keywords = [k.strip().lower() for k in (semantic_keywords or ()) if k.strip()]
    if not keywords:
        keywords = ["building", "wall", "house", "roof", "facade"]

    matched_ids: list[int] = []
    matched_labels: list[str] = []
    for class_id, class_name_raw in id2label.items():
        class_name = str(class_name_raw).lower()
        if any(token in class_name for token in keywords):
            matched_ids.append(class_id)
            matched_labels.append(str(class_name_raw))

    semantic_mask = np.zeros(pred.shape, dtype=np.uint8)
    if matched_ids:
        semantic_mask = np.isin(pred, np.asarray(matched_ids, dtype=np.int32)).astype(np.uint8)

    if not np.any(semantic_mask):
        class_ids, counts = np.unique(pred, return_counts=True)
        sorted_pairs = sorted(zip(class_ids.tolist(), counts.tolist()), key=lambda it: it[1], reverse=True)
        fallback_ids: list[int] = []
        for class_id, _count in sorted_pairs:
            name = str(id2label.get(class_id, class_id)).lower()
            if any(bad in name for bad in _NON_EXTERIOR_TOKENS):
                continue
            fallback_ids.append(int(class_id))
            if len(fallback_ids) >= 2:
                break
        if fallback_ids:
            semantic_mask = np.isin(pred, np.asarray(fallback_ids, dtype=np.int32)).astype(np.uint8)
            matched_labels = [str(id2label.get(i, i)) for i in fallback_ids]

    area_ratio = float(np.count_nonzero(semantic_mask)) / float(pred.shape[0] * pred.shape[1])
    return semantic_mask.astype(np.uint8), matched_labels, area_ratio


def get_depth_estimator() -> tuple[Any, Any]:
    """Return the cached depth ``(processor, model)`` pair, loading it on first use.

    Mirrors :func:`get_semantic_segmenter`: the pair is published only once the
    model is fully prepared.

    Raises:
        HTTPException: 500 when loading fails; the error text is also stored in
            ``depth_load_error`` on the service module.
    """
    svc = _sam2_svc
    processor, model = svc.depth_processor, svc.depth_model
    if processor is not None and model is not None:
        return processor, model
    with depth_load_lock:
        processor, model = svc.depth_processor, svc.depth_model
        if processor is not None and model is not None:
            return processor, model
        started = log_timing_start("DEPTH_LOAD")
        try:
            _require_ml_stack()
            processor = cast(Any, DPTImageProcessor.from_pretrained(DEPTH_MODEL_ID))
            model = cast(Any, DPTForDepthEstimation.from_pretrained(DEPTH_MODEL_ID))
            model = _prepare_model(model, "DEPTH")
            svc.depth_model = model
            svc.depth_processor = processor
            svc.depth_load_error = None
        except Exception as exc:
            svc.depth_processor = None
            svc.depth_model = None
            svc.depth_load_error = str(exc)
            raise HTTPException(status_code=500, detail=f"Failed to load depth model: {exc}") from exc
        finally:
            log_timing_end("DEPTH_LOAD", started)
    return cast(tuple[Any, Any], (processor, model))


def estimate_depth_map(image_rgb: np.ndarray) -> np.ndarray:
    """Predict depth for ``image_rgb`` and min-max normalise it to uint8.

    Returns:
        An ``(h, w)`` uint8 array; all zeros when the prediction is constant.
    """
    processor, model = get_depth_estimator()
    h, w = image_rgb.shape[:2]
    pil_image = Image.fromarray(image_rgb)
    with torch.no_grad():
        inputs = _inputs_on_model_device(processor(images=pil_image, return_tensors="pt"), model)
        predicted_depth = model(**inputs).predicted_depth
        resized = torch.nn.functional.interpolate(
            predicted_depth.unsqueeze(1),
            size=(h, w),
            mode="bicubic",
            align_corners=False,
        )
        # Index batch/channel explicitly; squeeze() would also drop a 1-pixel h or w.
        depth = resized[0, 0].cpu().numpy()
    d_min, d_max = float(depth.min()), float(depth.max())
    if d_max - d_min < 1e-8:
        return np.zeros((h, w), dtype=np.uint8)
    return ((depth - d_min) / (d_max - d_min) * 255.0).astype(np.uint8)


def extract_depth_wall_mask(depth_map: np.ndarray, target: str = "wall") -> np.ndarray:
    """Derive a candidate wall/roof mask from a normalised depth map.

    The 35th percentile of the top quarter is used as a sky threshold. For
    ``target == "roof"`` the search is restricted to the top 55% of rows;
    otherwise the depth band is estimated from a central crop. The result is
    cleaned with a morphological open then close.

    Returns:
        A uint8 mask with the same height/width as ``depth_map``.
    """
    h, w = depth_map.shape[:2]
    depth_f = depth_map.astype(np.float32)
    upper_h = max(1, h // 4)
    sky_pct = float(np.percentile(depth_f[:upper_h, :], 35))
    not_sky = depth_f > sky_pct * 1.1

    if target == "roof":
        roof_rows = int(h * 0.55)
        roi = np.zeros((h, w), dtype=np.uint8)
        roi[:roof_rows, :] = 1
        upper_half = depth_f[:roof_rows, :]
        non_sky_vals = upper_half[upper_half > sky_pct]
        if non_sky_vals.size == 0:
            return roi
        low = float(np.percentile(non_sky_vals, 5))
        high = float(np.percentile(non_sky_vals, 88))
        in_band = (depth_f >= low) & (depth_f <= high)
        candidate = (in_band & (roi > 0) & not_sky).astype(np.uint8)
    else:
        cy_lo, cy_hi = int(h * 0.20), int(h * 0.85)
        cx_lo, cx_hi = int(w * 0.10), int(w * 0.90)
        center_region = depth_f[cy_lo:cy_hi, cx_lo:cx_hi]
        non_sky = center_region[center_region > sky_pct]
        if non_sky.size == 0:
            non_sky = center_region.flatten()
        if non_sky.size == 0:
            # Tiny images can produce an empty crop; percentile would raise on it.
            non_sky = depth_f.flatten()
        low = float(np.percentile(non_sky, 8))
        high = float(np.percentile(non_sky, 90))
        in_band = (depth_f >= low) & (depth_f <= high)
        candidate = (in_band & not_sky).astype(np.uint8)

    candidate = cv2.morphologyEx(candidate, cv2.MORPH_OPEN, np.ones((5, 5), np.uint8), iterations=1)
    candidate = cv2.morphologyEx(candidate, cv2.MORPH_CLOSE, np.ones((9, 9), np.uint8), iterations=2)
    return candidate


def _finish_step(step: str, started: Any) -> None:
    """Log the end of ``step`` and release model resources without raising."""
    log_timing_end(step, started)
    try:
        release_resources()
    except Exception:
        logger.exception("Error releasing resources after %s", step)


def _largest_sam2_mask(frame_rgb: np.ndarray) -> Any:
    """Return the largest-area automatic SAM2 mask as bool, or None."""
    # Imported lazily, as in the original code path that needs it.
    from services.sam2_service import get_sam2_mask_generator

    with sam2_predict_lock:
        frame_mask_start = log_timing_start("SAM2_FRAME_GENERATE")
        masks = get_sam2_mask_generator().generate(frame_rgb)
        log_timing_end("SAM2_FRAME_GENERATE", frame_mask_start)
    if not masks:
        return None
    best = max(masks, key=lambda m: int(m.get("area", 0)))
    seg = best.get("segmentation")
    return None if seg is None else np.asarray(seg, dtype=bool)


def _exterior_frame_mask(frame_rgb: np.ndarray) -> Any:
    """Return the merged exterior mask for a frame, or None without candidates."""
    label_map, _ = generate_label_map(frame_rgb)
    candidates = rank_exterior_candidates(label_map, 8)
    if not candidates:
        return None
    merged = merge_sam2_wall_fragments(label_map, 8)
    if np.any(merged):
        return merged.astype(bool)
    return label_map == int(candidates[0]["mask_index"])


def segment_video_sync(payload: SegmentVideoRequest) -> dict[str, Any]:
    """Overlay a segmentation mask on sampled frames of an uploaded video.

    Every ``sample_every_n_frames``-th frame (up to ``max_frames_to_segment``,
    capped at 3000) is segmented; all frames are written to
    ``<stem>_sam2_overlay.mp4``. The capture and writer are released even when
    a frame fails.

    Raises:
        HTTPException: 400 for an invalid filename, mask mode or unreadable
            video; 404 when the video is missing; 500 when the writer cannot
            be created.
    """
    from core.config import VIDEO_OUTPUT_DIR, VIDEO_UPLOAD_DIR

    step = "SEGMENT_VIDEO"
    started = log_timing_start(step)
    try:
        safe_name = Path(payload.filename).name
        if not safe_name:
            raise HTTPException(status_code=400, detail="Invalid filename")
        video_path = VIDEO_UPLOAD_DIR / safe_name
        if not video_path.exists() or not video_path.is_file():
            raise HTTPException(status_code=404, detail=f"Video not found: {safe_name}")

        sample_every = max(1, int(payload.sample_every_n_frames))
        max_frames = max(1, min(int(payload.max_frames_to_segment), 3000))
        mask_mode = str(payload.mask_mode).strip().lower()
        if mask_mode not in {"exterior", "largest"}:
            raise HTTPException(status_code=400, detail="mask_mode must be 'exterior' or 'largest'")

        out_filename = f"{Path(safe_name).stem}_sam2_overlay.mp4"
        out_path = VIDEO_OUTPUT_DIR / out_filename
        pick_mask = _largest_sam2_mask if mask_mode == "largest" else _exterior_frame_mask

        cap = cv2.VideoCapture(str(video_path))
        writer = None
        try:
            if not cap.isOpened():
                raise HTTPException(status_code=400, detail="Video could not be opened")
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps or fps <= 0:
                fps = 24.0
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)
            if width <= 0 or height <= 0:
                raise HTTPException(status_code=400, detail="Video has invalid dimensions")

            fourcc_fn = getattr(cv2, "VideoWriter_fourcc")
            writer = cv2.VideoWriter(str(out_path), int(fourcc_fn(*"mp4v")), float(fps), (width, height))
            if not writer.isOpened():
                raise HTTPException(status_code=500, detail="Failed to create output video")

            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
            frame_idx = 0
            segmented_frames = 0
            highlighted_frames = 0
            while True:
                ok, frame_bgr = cap.read()
                if not ok:
                    break
                if frame_idx % sample_every == 0 and segmented_frames < max_frames:
                    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
                    binary_mask = pick_mask(frame_rgb)
                    if binary_mask is not None and bool(np.any(binary_mask)):
                        frame_bgr = build_mask_overlay(frame_bgr, binary_mask, payload.overlay_alpha)
                        highlighted_frames += 1
                    segmented_frames += 1
                writer.write(frame_bgr)
                frame_idx += 1
        finally:
            # Release native handles on every exit path, including mid-loop errors.
            cap.release()
            if writer is not None:
                writer.release()

        return {
            "message": "Video segmentation completed",
            "input_filename": safe_name,
            "output_filename": out_filename,
            "output_url": f"/seg/output-video/{out_filename}",
            "total_frames": total_frames,
            "segmented_frames": segmented_frames,
            "highlighted_frames": highlighted_frames,
            "sample_every_n_frames": sample_every,
            "mask_mode": mask_mode,
        }
    finally:
        _finish_step(step, started)


def _resolve_rect(
    rect_xywh: Any,
    w: int,
    h: int,
    default_fractions: tuple[float, float, float, float],
) -> tuple[int, int, int, int]:
    """Return an ``(x, y, w, h)`` rect clamped to the image.

    Uses ``rect_xywh`` when given, otherwise ``default_fractions`` of the image
    size. The rect is kept at least 2 pixels wide/high.
    """
    if rect_xywh is not None:
        x, y, rw, rh = [int(v) for v in rect_xywh]
    else:
        fx, fy, fw, fh = default_fractions
        x, y, rw, rh = int(fx * w), int(fy * h), int(fw * w), int(fh * h)
    x = max(0, min(x, w - 2))
    y = max(0, min(y, h - 2))
    rw = max(2, min(rw, w - x))
    rh = max(2, min(rh, h - y))
    return x, y, rw, rh


def _clamp_iterations(value: Any) -> int:
    """Clamp the requested GrabCut iteration count to 1..12."""
    return max(1, min(int(value), 12))


def _seed_grabcut_mask(
    shape_hw: tuple[int, int],
    rect: tuple[int, int, int, int],
    hint_mask: np.ndarray,
    erode_size: int,
) -> np.ndarray:
    """Build the initial GrabCut mask from a rect and a binary hint mask.

    Outside the rect is definite background. Hint pixels become probable
    foreground and their eroded core definite foreground. Without any hint the
    rect interior becomes probable foreground, because OpenCV's mask-initialised
    GrabCut asserts that both background and foreground samples exist.
    """
    h, w = shape_hw
    x, y, rw, rh = rect
    gc_mask = np.full((h, w), cv2.GC_PR_BGD, dtype=np.uint8)
    outside_rect = np.ones((h, w), dtype=bool)
    outside_rect[y : y + rh, x : x + rw] = False
    gc_mask[outside_rect] = cv2.GC_BGD
    if np.any(hint_mask):
        gc_mask[hint_mask > 0] = cv2.GC_PR_FGD
        kernel = np.ones((erode_size, erode_size), np.uint8)
        sure_fg = cv2.erode((hint_mask * 255).astype(np.uint8), kernel, iterations=1)
        gc_mask[sure_fg > 0] = cv2.GC_FGD
    else:
        gc_mask[y : y + rh, x : x + rw] = cv2.GC_PR_FGD
    return gc_mask


def _foreground_from_grabcut(gc_mask: np.ndarray, morph_steps: Any) -> np.ndarray:
    """Extract (probable) foreground as 0/1 uint8 and apply morphology steps."""
    fg_mask = np.where((gc_mask == cv2.GC_FGD) | (gc_mask == cv2.GC_PR_FGD), 1, 0).astype(np.uint8)
    for op, ksize in morph_steps:
        fg_mask = cv2.morphologyEx(fg_mask, op, np.ones((ksize, ksize), np.uint8), iterations=1)
    return fg_mask


def _grabcut_with_mask(
    image_bgr: np.ndarray,
    gc_mask: np.ndarray,
    rect: tuple[int, int, int, int],
    iter_count: int,
    morph_steps: Any,
) -> np.ndarray:
    """Run mask-initialised GrabCut (updates ``gc_mask``) and return the cleaned foreground."""
    bg_model = np.zeros((1, 65), np.float64)
    fg_model = np.zeros((1, 65), np.float64)
    cv2.grabCut(image_bgr, gc_mask, rect, bg_model, fg_model, iter_count, cv2.GC_INIT_WITH_MASK)
    return _foreground_from_grabcut(gc_mask, morph_steps)


def _publish_exterior_result(
    safe_name: str,
    tag: str,
    image_bgr: np.ndarray,
    fg_mask: np.ndarray,
    min_area_ratio: float,
    alpha: float,
    color_bgr: tuple[int, int, int],
    save_error: str,
) -> tuple[dict[str, Any], np.ndarray]:
    """Save the label map and preview image for an exterior segmentation.

    Returns:
        ``(fields, label_map)`` where ``fields`` holds the response entries
        shared by all exterior handlers, in response order.

    Raises:
        HTTPException: 500 with ``save_error`` when the preview cannot be written.
    """
    h, w = image_bgr.shape[:2]
    label_map, component_count, recommended_idx = build_component_label_map(fg_mask, min_area_ratio=min_area_ratio)
    if component_count == 0:
        # No component passed the area filter: expose the whole foreground as label 1.
        label_map = np.where(fg_mask > 0, 1, 0).astype(np.uint8)
        component_count = 1
        recommended_idx = 1

    stem = Path(safe_name).stem
    saved_owner = save_label_map_for_owner(f"{stem}_exterior_{tag}.jpg", label_map)
    preview = build_mask_overlay(image_bgr, fg_mask.astype(bool), alpha, color_bgr=color_bgr)
    preview_filename = f"{stem}_exterior_{tag}_preview.jpg"
    if not cv2.imwrite(str(OUTPUT_DIR / preview_filename), preview):
        raise HTTPException(status_code=500, detail=save_error)

    area_ratio = float(np.count_nonzero(fg_mask)) / float(h * w)
    fields: dict[str, Any] = {
        "filename": safe_name,
        "original_filename_for_apply": saved_owner,
        "mask_count": component_count,
        "available_mask_indices": list(range(1, component_count + 1)),
        "recommended_mask_index": recommended_idx,
        "foreground_area_ratio": round(area_ratio, 6),
        "preview_filename": preview_filename,
        "preview_url": f"/seg/ai/{preview_filename}",
    }
    return fields, label_map


def segment_exterior_grabcut_sync(payload: ExteriorGrabCutRequest) -> dict[str, Any]:
    """Segment the exterior with rect-initialised GrabCut, optionally OR-ing a SAM2 hint.

    Raises:
        HTTPException: 400 when no foreground is found; 500 when the preview
            cannot be saved.
    """
    step = "EXTERIOR_GRABCUT"
    started = log_timing_start(step)
    try:
        safe_name, image_rgb = load_image_rgb_for_edit(payload.filename)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        h, w = image_bgr.shape[:2]
        rect = _resolve_rect(payload.rect_xywh, w, h, (0.06, 0.10, 0.88, 0.84))
        iter_count = _clamp_iterations(payload.iterations)

        mask = np.zeros((h, w), np.uint8)
        bg_model = np.zeros((1, 65), np.float64)
        fg_model = np.zeros((1, 65), np.float64)
        cv2.grabCut(image_bgr, mask, rect, bg_model, fg_model, iter_count, cv2.GC_INIT_WITH_RECT)
        fg_mask = _foreground_from_grabcut(mask, _OPEN_THEN_CLOSE)

        if payload.use_sam2_hint:
            sam_label_map, _ = generate_label_map(image_rgb)
            sam_hint = merge_sam2_wall_fragments(sam_label_map, 8)
            if np.any(sam_hint):
                fg_mask = np.where((fg_mask > 0) | (sam_hint > 0), 1, 0).astype(np.uint8)

        if not np.any(fg_mask):
            raise HTTPException(status_code=400, detail="GrabCut did not find a foreground region")

        fields, _label_map = _publish_exterior_result(
            safe_name,
            "grabcut",
            image_bgr,
            fg_mask,
            0.012,
            0.42,
            (15, 170, 245),
            "Failed to save GrabCut preview image",
        )
        return {
            "message": "Exterior segmentation with GrabCut completed",
            **fields,
            "rect_xywh": list(rect),
            "iterations": iter_count,
            "used_sam2_hint": bool(payload.use_sam2_hint),
        }
    finally:
        _finish_step(step, started)


def segment_exterior_hybrid_sync(payload: ExteriorHybridRequest) -> dict[str, Any]:
    """Segment the exterior with GrabCut seeded by semantic and/or SAM2 hints.

    Raises:
        HTTPException: 400 when no foreground is found; 500 when the preview
            cannot be saved (or a hint model fails to load).
    """
    step = "EXTERIOR_HYBRID"
    started = log_timing_start(step)
    try:
        safe_name, image_rgb = load_image_rgb_for_edit(payload.filename)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        h, w = image_bgr.shape[:2]
        rect = _resolve_rect(payload.rect_xywh, w, h, (0.06, 0.10, 0.88, 0.84))
        iter_count = _clamp_iterations(payload.iterations)

        hint_mask = np.zeros((h, w), dtype=np.uint8)
        semantic_labels: list[str] = []
        semantic_area_ratio = 0.0
        if payload.use_semantic_hint:
            sem_mask, semantic_labels, semantic_area_ratio = semantic_exterior_mask(
                image_rgb, payload.semantic_keywords
            )
            hint_mask = np.where(sem_mask > 0, 1, hint_mask).astype(np.uint8)
        if payload.use_sam2_hint:
            sam_label_map, _ = generate_label_map(image_rgb)
            sam_hint = merge_sam2_wall_fragments(sam_label_map, 10)
            if np.any(sam_hint):
                hint_mask = np.where(sam_hint > 0, 1, hint_mask).astype(np.uint8)

        gc_mask = _seed_grabcut_mask((h, w), rect, hint_mask, 7)
        fg_mask = _grabcut_with_mask(image_bgr, gc_mask, rect, iter_count, _OPEN_THEN_CLOSE)
        if not np.any(fg_mask):
            raise HTTPException(
                status_code=400,
                detail="Hybrid exterior segmentation did not find a foreground region",
            )

        fields, _label_map = _publish_exterior_result(
            safe_name,
            "hybrid",
            image_bgr,
            fg_mask,
            0.01,
            0.44,
            (0, 180, 255),
            "Failed to save hybrid preview image",
        )
        return {
            "message": "Hybrid exterior segmentation completed",
            **fields,
            "rect_xywh": list(rect),
            "iterations": iter_count,
            "used_sam2_hint": bool(payload.use_sam2_hint),
            "used_semantic_hint": bool(payload.use_semantic_hint),
            "semantic_labels": semantic_labels,
            "semantic_area_ratio": round(float(semantic_area_ratio), 6),
        }
    finally:
        _finish_step(step, started)


def segment_exterior_brick_sync(payload: ExteriorBrickRequest) -> dict[str, Any]:
    """Segment a textured (brick/masonry) exterior and classify each component.

    Hints are computed on a bilateral-smoothed copy of the image; GrabCut and
    the material analysis run on the original pixels.

    Raises:
        HTTPException: 400 when no foreground is found; 500 when the preview
            cannot be saved (or a hint model fails to load).
    """
    step = "EXTERIOR_BRICK"
    started = log_timing_start(step)
    try:
        safe_name, image_rgb = load_image_rgb_for_edit(payload.filename)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        h, w = image_bgr.shape[:2]
        strength = max(1, min(int(payload.smooth_strength), 3))
        smoothed_bgr = smooth_texture_for_segmentation(image_bgr, strength)
        smoothed_rgb = cv2.cvtColor(smoothed_bgr, cv2.COLOR_BGR2RGB)
        rect = _resolve_rect(payload.rect_xywh, w, h, (0.05, 0.08, 0.90, 0.86))
        iter_count = _clamp_iterations(payload.iterations)

        hint_mask = np.zeros((h, w), dtype=np.uint8)
        semantic_labels: list[str] = []
        semantic_area_ratio = 0.0
        if payload.use_semantic_hint:
            sem_mask, semantic_labels, semantic_area_ratio = semantic_exterior_mask(
                smoothed_rgb, payload.semantic_keywords
            )
            hint_mask = np.where(sem_mask > 0, 1, hint_mask).astype(np.uint8)
        sam_label_map, _ = generate_label_map(smoothed_rgb)
        merged_sam_mask = merge_sam2_wall_fragments(sam_label_map, int(payload.sam2_merge_top_k))
        if np.any(merged_sam_mask):
            hint_mask = np.where(merged_sam_mask > 0, 1, hint_mask).astype(np.uint8)

        gc_mask = _seed_grabcut_mask((h, w), rect, hint_mask, 9)
        fg_mask = _grabcut_with_mask(image_bgr, gc_mask, rect, iter_count, _CLOSE_THEN_OPEN)
        if not np.any(fg_mask):
            raise HTTPException(status_code=400, detail="Brick segmentation did not find a foreground region.")

        fields, label_map_out = _publish_exterior_result(
            safe_name,
            "brick",
            image_bgr,
            fg_mask,
            0.01,
            0.44,
            (20, 140, 255),
            "Failed to save brick preview image",
        )
        material_analysis = separate_materials_by_label(label_map_out, image_rgb)
        return {
            "message": "Brick/masonry exterior segmentation completed",
            **fields,
            "rect_xywh": list(rect),
            "iterations": iter_count,
            "smooth_strength": strength,
            "sam2_merge_top_k": int(payload.sam2_merge_top_k),
            "semantic_labels": semantic_labels,
            "semantic_area_ratio": round(float(semantic_area_ratio), 6),
            "material_classification": {
                "brick_indices": material_analysis["brick_indices"],
                "smooth_indices": material_analysis["smooth_indices"],
                "mixed_indices": material_analysis["mixed_indices"],
                "analysis_by_label": material_analysis["analysis_by_label"],
            },
        }
    finally:
        _finish_step(step, started)


def segment_exterior_depth_sync(payload: ExteriorDepthRequest) -> dict[str, Any]:
    """Segment the exterior with GrabCut seeded by semantic, SAM2 and depth hints.

    Semantic and depth hints are best-effort: a failure is logged and the hint
    is skipped. Note that ``used_semantic_hint`` / ``used_depth_hint`` in the
    response echo the request flags, not whether the hint succeeded.

    Raises:
        HTTPException: 400 when no foreground is found; 500 when the preview
            cannot be saved.
    """
    step = "EXTERIOR_DEPTH"
    started = log_timing_start(step)
    try:
        safe_name, image_rgb = load_image_rgb_for_edit(payload.filename)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        h, w = image_bgr.shape[:2]
        target_name = normalize_exterior_target(payload.exterior_target)
        strength = max(1, min(int(payload.smooth_strength), 3))
        smoothed_bgr = smooth_texture_for_segmentation(image_bgr, strength)
        smoothed_rgb = cv2.cvtColor(smoothed_bgr, cv2.COLOR_BGR2RGB)
        rect = _resolve_rect(payload.rect_xywh, w, h, (0.04, 0.06, 0.92, 0.88))
        iter_count = _clamp_iterations(payload.iterations)

        hint_mask = np.zeros((h, w), dtype=np.uint8)
        semantic_labels: list[str] = []
        semantic_area_ratio = 0.0
        depth_map_arr: Any = None

        if payload.use_semantic_hint:
            try:
                sem_mask, semantic_labels, semantic_area_ratio = semantic_exterior_mask(
                    smoothed_rgb, payload.semantic_keywords
                )
                hint_mask = np.where(sem_mask > 0, 1, hint_mask).astype(np.uint8)
            except Exception:
                logger.warning("[%s] semantic hint failed; continuing without it", step, exc_info=True)

        sam_label_map, _ = generate_label_map(smoothed_rgb)
        merged_sam_mask = merge_sam2_wall_fragments(sam_label_map, int(payload.sam2_merge_top_k))
        if np.any(merged_sam_mask):
            hint_mask = np.where(merged_sam_mask > 0, 1, hint_mask).astype(np.uint8)

        if payload.use_depth_hint:
            try:
                depth_map_arr = estimate_depth_map(image_rgb)
                depth_mask = extract_depth_wall_mask(depth_map_arr, target=target_name)
                hint_mask = np.where(depth_mask > 0, 1, hint_mask).astype(np.uint8)
            except Exception:
                logger.warning("[%s] depth hint failed; continuing without it", step, exc_info=True)
                depth_map_arr = None

        gc_mask = _seed_grabcut_mask((h, w), rect, hint_mask, 9)

        if depth_map_arr is not None and payload.use_depth_hint:
            # Promote far-away pixels in the top quarter from probable to definite background.
            depth_f = depth_map_arr.astype(np.float32)
            upper_h = max(1, h // 4)
            sky_pct = float(np.percentile(depth_f[:upper_h, :], 35))
            row_idx = np.arange(h, dtype=np.int32)[:, np.newaxis]
            definite_sky = np.asarray(
                (row_idx < upper_h) & (depth_f <= sky_pct * 1.1) & (gc_mask == cv2.GC_PR_BGD),
                dtype=bool,
            )
            gc_mask[definite_sky] = cv2.GC_BGD

        fg_mask = _grabcut_with_mask(image_bgr, gc_mask, rect, iter_count, _CLOSE_THEN_OPEN)
        if not np.any(fg_mask):
            raise HTTPException(status_code=400, detail="Depth-guided segmentation found no foreground region")

        fields, _label_map = _publish_exterior_result(
            safe_name,
            "depth",
            image_bgr,
            fg_mask,
            0.01,
            0.44,
            (30, 120, 255),
            "Failed to save depth preview",
        )
        return {
            "message": "Depth-guided exterior segmentation completed",
            **fields,
            "rect_xywh": list(rect),
            "iterations": iter_count,
            "exterior_target": target_name,
            "smooth_strength": strength,
            "used_semantic_hint": bool(payload.use_semantic_hint),
            "used_depth_hint": bool(payload.use_depth_hint),
            "semantic_labels": semantic_labels,
            "semantic_area_ratio": round(float(semantic_area_ratio), 6),
        }
    finally:
        _finish_step(step, started)