| from pathlib import Path |
| from typing import Any, cast |
|
|
| import cv2 |
| import numpy as np |
| try: |
| import torch |
| _TORCH_AVAILABLE = True |
| except ImportError: |
| torch = None |
| _TORCH_AVAILABLE = False |
| from fastapi import HTTPException |
| from PIL import Image |
|
|
| from core.config import ( |
| DEPTH_MODEL_ID, |
| OUTPUT_DIR, |
| SEMANTIC_MODEL_ID, |
| log_timing_end, |
| log_timing_start, |
| logger, |
| ) |
| from models.schemas import ( |
| ExteriorBrickRequest, |
| ExteriorDepthRequest, |
| ExteriorGrabCutRequest, |
| ExteriorHybridRequest, |
| GuidedSegmentRequest, |
| SegmentVideoRequest, |
| ) |
| from services.image_service import load_image_rgb_for_edit, save_label_map_for_owner |
| from services.sam2_service import ( |
| SAM2_UNLOAD_AFTER_USE, |
| depth_load_lock, |
| get_sam2_image_predictor, |
| release_resources, |
| sam2_predict_lock, |
| semantic_load_lock, |
| ) |
| from services.scene_service import ( |
| build_component_label_map, |
| build_mask_overlay, |
| generate_label_map, |
| merge_sam2_wall_fragments, |
| normalize_exterior_target, |
| rank_exterior_candidates, |
| ) |
|
|
| try: |
| from transformers import ( |
| AutoImageProcessor, |
| DPTForDepthEstimation, |
| DPTImageProcessor, |
| SegformerForSemanticSegmentation, |
| ) |
| _TRANSFORMERS_AVAILABLE = True |
| except ImportError: |
| _TRANSFORMERS_AVAILABLE = False |
|
|
| import services.sam2_service as _sam2_svc |
|
|
|
|
def parse_mask_index(mask_filename: str) -> int:
    """Return the integer that follows the last underscore of a mask filename.

    Only the stem is inspected (directories and the final suffix are ignored),
    so ``"out/photo_mask_3.png"`` yields ``3``. A stem without any underscore
    is parsed as a whole.

    Raises:
        HTTPException: 400 when the trailing piece is not an integer.
    """
    try:
        # rpartition leaves the full stem in the last slot when no "_" exists,
        # which matches taking the last element of a split.
        _, _, tail = Path(mask_filename).stem.rpartition("_")
        index = int(tail)
    except (ValueError, IndexError):
        raise HTTPException(status_code=400, detail="Invalid mask filename format") from None
    else:
        return index
|
|
|
|
def parse_rgb_color(color: str) -> tuple[int, int, int]:
    """Parse a ``#RRGGBB`` (or bare ``RRGGBB``) hex string into ``(r, g, b)``.

    Surrounding whitespace and leading ``#`` characters are ignored.

    Args:
        color: Hex colour string; it is passed through ``str()`` first.

    Returns:
        The three channel values, each in ``0..255``.

    Raises:
        HTTPException: 400 when the value is not exactly six hexadecimal digits.
    """
    hex_part = str(color).strip().lstrip("#")
    if len(hex_part) != 6:
        raise HTTPException(status_code=400, detail="Color must be a hex color like #RRGGBB")

    # int(text, 16) alone is too permissive: it also accepts signs, inner
    # whitespace, underscores and non-ASCII digits, so "-1-1-1" would parse to
    # negative channel values. Check the characters explicitly first.
    if not all(ch in "0123456789abcdefABCDEF" for ch in hex_part):
        raise HTTPException(status_code=400, detail="Invalid hex color format")

    r, g, b = (int(hex_part[start:start + 2], 16) for start in (0, 2, 4))
    return (r, g, b)
|
|
|
|
def generate_guided_label_map(
    image_rgb: np.ndarray,
    point_coords: list[list[float]],
    point_labels: list[int],
    box_xyxy: list[float],
    multimask_output: bool = False,
) -> tuple[np.ndarray, list[float]]:
    """Run the SAM2 image predictor with point/box prompts and build a label map.

    Args:
        image_rgb: Image handed to ``predictor.set_image``; its first two
            dimensions define the label-map size.
        point_coords: Prompt points. Used only when both ``point_coords`` and
            ``point_labels`` are non-empty.
        point_labels: One label per prompt point (presumably SAM's
            1 = foreground / 0 = background convention -- TODO confirm).
        box_xyxy: Optional box prompt; used only when it has exactly four values.
        multimask_output: Forwarded unchanged to ``predictor.predict``.

    Returns:
        ``(label_map, ranked_scores)``: a ``uint8`` map in which 0 is
        background and label ``k`` marks the mask with the k-th highest score,
        plus the scores in the same descending order.
    """
    predictor = get_sam2_image_predictor()
    started = log_timing_start("SAM2_PREDICT")
    # NOTE(review): the exact extent of the locked region could not be recovered
    # from the de-indented source; it is assumed to cover the predictor calls.
    with sam2_predict_lock:
        predictor.set_image(image_rgb)

        # A prompt is passed as None unless it is fully specified.
        input_points = np.array(point_coords, dtype=np.float32) if point_coords and point_labels else None
        input_labels = np.array(point_labels, dtype=np.int32) if point_coords and point_labels else None
        input_box = np.array(box_xyxy, dtype=np.float32) if box_xyxy and len(box_xyxy) == 4 else None

        masks, scores, _ = predictor.predict(
            point_coords=input_points,
            point_labels=input_labels,
            box=input_box,
            multimask_output=multimask_output,
        )

    # Highest score first; labels start at 1 so that 0 stays "no mask".
    sorted_indices = np.argsort(scores)[::-1]
    label_map = np.zeros((image_rgb.shape[0], image_rgb.shape[1]), dtype=np.uint8)
    # NOTE(review): masks are painted best-first, so where masks overlap the
    # lower-scored mask overwrites the higher-scored label -- confirm intended.
    for idx, orig_idx in enumerate(sorted_indices, start=1):
        mask = masks[orig_idx]
        label_map[np.asarray(mask, dtype=bool)] = idx

    ranked_scores = [float(scores[i]) for i in sorted_indices]
    # Logging is best-effort: a failure here must not fail the prediction.
    try:
        log_timing_end("SAM2_PREDICT", started)
        logger.info(f"[SAM2_PREDICT] masks={len(masks)} top_scores={ranked_scores[:3]}")
    except Exception:
        pass

    # Optional unload after use; errors are logged and not propagated.
    if SAM2_UNLOAD_AFTER_USE:
        try:
            release_resources(full_unload=True)
        except Exception:
            logger.exception("Error unloading SAM after generate_guided_label_map")
    return label_map, ranked_scores
|
|
|
|
def analyze_material_texture_complexity(binary_mask: np.ndarray, image_rgb: np.ndarray) -> float:
    """Score how edge-dense the masked region of an image is, in ``[0.0, 1.0]``.

    Pixels outside the mask are replaced with mid-grey, Canny edges are
    computed on the greyscale result, and the fraction of masked pixels that
    are edge pixels is divided by 0.35 and clipped to ``[0, 1]``.

    Args:
        binary_mask: Region of interest; any value ``> 0`` counts as inside.
        image_rgb: RGB image with the same height/width as ``binary_mask``.

    Returns:
        The normalised edge density, or ``0.0`` when the mask selects nothing
        (including a zero-size mask).
    """
    inside = np.asarray(binary_mask) > 0
    # count_nonzero is well defined for zero-size arrays, unlike .max(),
    # so an empty mask returns 0.0 instead of raising.
    mask_pixels = int(np.count_nonzero(inside))
    if mask_pixels == 0:
        return 0.0

    masked_region = image_rgb.copy()
    masked_region[~inside] = [128, 128, 128]
    gray = cv2.cvtColor(masked_region, cv2.COLOR_RGB2GRAY)
    edges = np.asarray(cv2.Canny(gray, 50, 150), dtype=np.uint8)

    # Only edges that fall inside the mask count towards the density.
    edge_pixels = int(np.count_nonzero((edges > 0) & inside))
    edge_density = edge_pixels / float(mask_pixels)
    return float(np.clip(edge_density / 0.35, 0.0, 1.0))
|
|
|
|
def analyze_material_color(binary_mask: np.ndarray, image_rgb: np.ndarray) -> tuple[float, dict[str, float]]:
    """Estimate how brick-like the colours inside a mask are.

    The masked pixels are converted to HSV with ``cv2.cvtColor`` and two
    fractions are computed over them:

    * ``brick_score``: hue ``<= 15`` or ``>= 165`` with saturation and value
      both ``> 40``.
    * ``smooth_score``: hue in ``15..60`` or ``70..140`` with saturation ``< 60``.

    Args:
        binary_mask: Region of interest; any value ``> 0`` counts as inside.
        image_rgb: RGB image with the same height/width as ``binary_mask``.

    Returns:
        ``(brick_score, stats)`` where ``stats`` holds both scores and the mean
        H, S and V of the region. ``(0.0, {})`` when the mask selects nothing
        (including a zero-size mask).
    """
    inside = np.asarray(binary_mask) > 0
    # .any() is safe on zero-size arrays, whereas .max() raises.
    if not inside.any():
        return 0.0, {}

    # Boolean indexing yields an (N, 3) array; cvtColor needs an image-shaped
    # array, so present the pixels as an N x 1 image. No PIL round-trip needed.
    pixels = image_rgb[inside].reshape(-1, 1, 3).astype(np.uint8)
    masked_hsv = cv2.cvtColor(pixels, cv2.COLOR_RGB2HSV)
    h = masked_hsv[:, :, 0]
    s = masked_hsv[:, :, 1]
    v = masked_hsv[:, :, 2]
    total_pixels = h.size

    brick_hue_mask = (h <= 15) | (h >= 165)
    brick_pixels = np.count_nonzero(brick_hue_mask & (s > 40) & (v > 40))
    brick_score = brick_pixels / float(max(1, total_pixels))

    smooth_hue_mask = ((h >= 15) & (h <= 60)) | ((h >= 70) & (h <= 140))
    smooth_pixels = np.count_nonzero(smooth_hue_mask & (s < 60))
    smooth_score = smooth_pixels / float(max(1, total_pixels))

    stats: dict[str, float] = {
        "brick_score": float(brick_score),
        "smooth_score": float(smooth_score),
        "mean_h": float(np.mean(h)),
        "mean_s": float(np.mean(s)),
        "mean_v": float(np.mean(v)),
    }
    return brick_score, stats
|
|
|
|
def classify_segment_material(binary_mask: np.ndarray, image_rgb: np.ndarray) -> tuple[str, dict[str, Any]]:
    """Label a masked region as ``"brick"``, ``"smooth"`` or ``"mixed"``.

    The decision uses a weighted blend of the texture score (60%) and the
    brick colour score (40%): ``>= 0.55`` is brick, ``<= 0.35`` is smooth and
    anything in between is mixed.

    Returns:
        ``(material, analysis)`` where ``analysis`` carries the rounded
        component scores, the rounded colour statistics and the chosen
        ``material_type``.
    """
    texture = analyze_material_texture_complexity(binary_mask, image_rgb)
    color_score, raw_color_stats = analyze_material_color(binary_mask, image_rgb)
    blended = (0.6 * texture) + (0.4 * color_score)

    material = "brick" if blended >= 0.55 else ("smooth" if blended <= 0.35 else "mixed")

    rounded_stats = {}
    for stat_name, stat_value in raw_color_stats.items():
        rounded_stats[stat_name] = round(stat_value, 2)

    analysis: dict[str, Any] = {
        "texture_score": round(texture, 4),
        "color_brick_score": round(color_score, 4),
        "combined_score": round(blended, 4),
        "color_stats": rounded_stats,
        "material_type": material,
    }
    return material, analysis
|
|
|
|
def separate_materials_by_label(label_map: np.ndarray, image_rgb: np.ndarray) -> dict[str, Any]:
    """Classify every positive label of ``label_map`` by material.

    Each label ``> 0`` is turned into its own binary mask and passed to
    ``classify_segment_material``.

    Returns:
        A dict with ``brick_indices``, ``smooth_indices`` and ``mixed_indices``
        (label ids in ascending order) plus ``analysis_by_label`` mapping each
        label id to its analysis dict.
    """
    buckets: dict[str, list[int]] = {"brick": [], "smooth": [], "mixed": []}
    per_label: dict[int, dict[str, Any]] = {}

    for raw_label in np.unique(label_map[label_map > 0]).tolist():
        label_id = int(raw_label)
        region = (label_map == raw_label).astype(np.uint8)
        material, details = classify_segment_material(region, image_rgb)
        per_label[label_id] = details
        # Anything that is neither brick nor smooth is filed under "mixed".
        buckets.get(material, buckets["mixed"]).append(label_id)

    return {
        "brick_indices": buckets["brick"],
        "smooth_indices": buckets["smooth"],
        "mixed_indices": buckets["mixed"],
        "analysis_by_label": per_label,
    }
|
|
|
|
def smooth_texture_for_segmentation(image_bgr: np.ndarray, strength: int) -> np.ndarray:
    """Flatten fine texture with repeated bilateral filtering.

    ``strength`` is clamped to ``1..3``; it sets both the number of filter
    passes and the colour/space sigmas (``55 * strength``). The input image is
    not modified.

    Returns:
        The filtered copy of ``image_bgr``.
    """
    strength = max(1, min(strength, 3))
    sigma = float(55 * strength)  # same value is used for colour and space
    neighbourhood = 15

    result = image_bgr.copy()
    passes_left = strength
    while passes_left > 0:
        result = cv2.bilateralFilter(result, neighbourhood, sigma, sigma)
        passes_left -= 1
    return result
|
|
|
|
def get_semantic_segmenter() -> tuple[Any, Any]:
    """Return the shared ``(processor, model)`` pair for semantic segmentation.

    The pair is cached as ``semantic_processor`` / ``semantic_model`` on the
    ``services.sam2_service`` module and loaded from ``SEMANTIC_MODEL_ID`` on
    first use. Loading is serialised by ``semantic_load_lock``.

    Returns:
        ``(processor, model)`` with the model switched to eval mode.

    Raises:
        HTTPException: 500 when the model cannot be loaded, including when
            torch or transformers is not installed. The error text is also
            stored in ``semantic_load_error`` on the service module.
    """
    svc = _sam2_svc
    # Fast path: already loaded, no lock needed.
    if svc.semantic_processor is not None and svc.semantic_model is not None:
        return svc.semantic_processor, svc.semantic_model

    with semantic_load_lock:
        # Re-check under the lock: another caller may have loaded it meanwhile.
        if svc.semantic_processor is not None and svc.semantic_model is not None:
            return svc.semantic_processor, svc.semantic_model

        started = log_timing_start("SEMANTIC_LOAD")
        try:
            if not (_TORCH_AVAILABLE and _TRANSFORMERS_AVAILABLE):
                raise RuntimeError("torch and transformers must be installed")

            # Build everything in locals and publish only once the model is
            # fully prepared, so the lock-free fast path above can never hand
            # out a model that has not been moved to its device / set to eval.
            processor = cast(Any, AutoImageProcessor.from_pretrained(SEMANTIC_MODEL_ID))
            model = cast(Any, SegformerForSemanticSegmentation.from_pretrained(SEMANTIC_MODEL_ID))
            device = "cuda" if torch.cuda.is_available() else "cpu"
            try:
                model = model.to(device)
            except Exception:
                # Best effort: keep going without the move, but say so and
                # report the device the model is actually expected to be on.
                logger.exception(f"[SEMANTIC] could not move model to {device}; continuing on cpu")
                device = "cpu"
            model.eval()

            svc.semantic_processor = processor
            svc.semantic_model = model
            svc.semantic_load_error = None
            logger.info(f"[SEMANTIC] loaded on {device}")
            return processor, model
        except Exception as exc:
            svc.semantic_processor = None
            svc.semantic_model = None
            svc.semantic_load_error = str(exc)
            raise HTTPException(status_code=500, detail=f"Failed to load semantic model: {exc}") from exc
        finally:
            log_timing_end("SEMANTIC_LOAD", started)
|
|
|
|
def semantic_exterior_mask(
    image_rgb: np.ndarray,
    semantic_keywords: tuple[str, ...],
) -> tuple[np.ndarray, list[str], float]:
    """Build a binary mask of pixels whose semantic class matches given keywords.

    The semantic model predicts one class per pixel (logits are bilinearly
    upsampled to the image size). A class is selected when any keyword is a
    substring of its lower-cased name. If nothing is selected, the two most
    frequent predicted classes that are not obvious non-building classes
    (sky, road, grass, ...) are used instead.

    Args:
        image_rgb: RGB image array.
        semantic_keywords: Keywords to match; blank entries are ignored and an
            empty/``None`` value falls back to a default building vocabulary.

    Returns:
        ``(mask, labels, area_ratio)``: a ``uint8`` 0/1 mask of the image size,
        the class names that produced it, and the fraction of pixels set.

    Raises:
        HTTPException: propagated from ``get_semantic_segmenter`` when the
            model cannot be loaded.
    """
    processor, model = get_semantic_segmenter()
    pil_image = Image.fromarray(image_rgb)

    with torch.no_grad():
        inputs = processor(images=pil_image, return_tensors="pt")
        try:
            # Follow the model's real device instead of re-deriving it from
            # CUDA availability: the two differ whenever the model was left on
            # (or moved back to) the CPU, and a mismatch fails the forward pass.
            model_device = next(model.parameters()).device
            inputs = {k: (v.to(model_device) if isinstance(v, torch.Tensor) else v) for k, v in inputs.items()}
        except Exception:
            logger.exception("[SEMANTIC] could not move inputs to the model device; using them as-is")
        outputs = model(**inputs)
        logits = outputs.logits
        upsampled_logits = torch.nn.functional.interpolate(
            logits, size=image_rgb.shape[:2], mode="bilinear", align_corners=False,
        )
        pred = upsampled_logits.argmax(dim=1)[0].cpu().numpy().astype(np.int32)

    id2label = getattr(getattr(model, "config", None), "id2label", {}) or {}
    keywords = [k.strip().lower() for k in (semantic_keywords or ()) if k.strip()]
    if not keywords:
        keywords = ["building", "wall", "house", "roof", "facade"]

    matched_ids: list[int] = []
    matched_labels: list[str] = []
    for class_id_raw, class_name_raw in id2label.items():
        try:
            class_id = int(class_id_raw)
        except (TypeError, ValueError):
            continue  # skip ids that are not integer-like
        class_name = str(class_name_raw).lower()
        if any(token in class_name for token in keywords):
            matched_ids.append(class_id)
            matched_labels.append(str(class_name_raw))

    semantic_mask = np.zeros(pred.shape, dtype=np.uint8)
    if matched_ids:
        semantic_mask = np.isin(pred, np.asarray(matched_ids, dtype=np.int32)).astype(np.uint8)

    if not np.any(semantic_mask):
        # Fallback: take the dominant classes, skipping ones that are clearly
        # not part of a building.
        excluded = ("sky", "road", "grass", "tree", "plant", "water", "person", "car")
        class_ids, counts = np.unique(pred, return_counts=True)
        by_frequency = sorted(zip(class_ids.tolist(), counts.tolist()), key=lambda it: it[1], reverse=True)
        fallback_ids: list[int] = []
        for class_id, _count in by_frequency:
            name = str(id2label.get(class_id, class_id)).lower()
            if any(bad in name for bad in excluded):
                continue
            fallback_ids.append(int(class_id))
            if len(fallback_ids) >= 2:
                break
        if fallback_ids:
            semantic_mask = np.isin(pred, np.asarray(fallback_ids, dtype=np.int32)).astype(np.uint8)
            matched_labels = [str(id2label.get(i, i)) for i in fallback_ids]

    # max(1, ...) guards the ratio against a zero-size prediction.
    area_ratio = float(np.count_nonzero(semantic_mask)) / float(max(1, pred.size))
    return semantic_mask.astype(np.uint8), matched_labels, area_ratio
|
|
|
|
def get_depth_estimator() -> tuple[Any, Any]:
    """Return the shared ``(processor, model)`` pair for depth estimation.

    The pair is cached as ``depth_processor`` / ``depth_model`` on the
    ``services.sam2_service`` module and loaded from ``DEPTH_MODEL_ID`` on
    first use. Loading is serialised by ``depth_load_lock``.

    Returns:
        ``(processor, model)`` with the model switched to eval mode.

    Raises:
        HTTPException: 500 when the model cannot be loaded, including when
            torch or transformers is not installed. The error text is also
            stored in ``depth_load_error`` on the service module.
    """
    svc = _sam2_svc
    # Fast path: already loaded, no lock needed.
    if svc.depth_processor is not None and svc.depth_model is not None:
        return svc.depth_processor, svc.depth_model

    with depth_load_lock:
        # Re-check under the lock: another caller may have loaded it meanwhile.
        if svc.depth_processor is not None and svc.depth_model is not None:
            return svc.depth_processor, svc.depth_model

        started = log_timing_start("DEPTH_LOAD")
        try:
            if not (_TORCH_AVAILABLE and _TRANSFORMERS_AVAILABLE):
                raise RuntimeError("torch and transformers must be installed")

            # Build in locals and publish only once the model is fully
            # prepared, so the lock-free fast path above can never hand out a
            # model that has not been moved to its device / set to eval.
            processor = cast(Any, DPTImageProcessor.from_pretrained(DEPTH_MODEL_ID))
            model = cast(Any, DPTForDepthEstimation.from_pretrained(DEPTH_MODEL_ID))
            device = "cuda" if torch.cuda.is_available() else "cpu"
            try:
                model = model.to(device)
            except Exception:
                # Best effort: keep going without the move, but say so and
                # report the device the model is actually expected to be on.
                logger.exception(f"[DEPTH] could not move model to {device}; continuing on cpu")
                device = "cpu"
            model.eval()

            svc.depth_processor = processor
            svc.depth_model = model
            svc.depth_load_error = None
            logger.info(f"[DEPTH] loaded on {device}")
            return processor, model
        except Exception as exc:
            svc.depth_processor = None
            svc.depth_model = None
            svc.depth_load_error = str(exc)
            raise HTTPException(status_code=500, detail=f"Failed to load depth model: {exc}") from exc
        finally:
            log_timing_end("DEPTH_LOAD", started)
|
|
|
|
def estimate_depth_map(image_rgb: np.ndarray) -> np.ndarray:
    """Predict a depth map for an image, normalised to ``uint8`` ``0..255``.

    The model's ``predicted_depth`` is resized to the image size with bicubic
    interpolation and min-max scaled.

    Args:
        image_rgb: RGB image array; its first two dimensions set the output size.

    Returns:
        An ``(h, w)`` ``uint8`` array. All zeros when the predicted depth is
        (numerically) constant.

    Raises:
        HTTPException: propagated from ``get_depth_estimator`` when the model
            cannot be loaded.
    """
    processor, model = get_depth_estimator()
    h, w = image_rgb.shape[:2]
    pil_image = Image.fromarray(image_rgb)

    with torch.no_grad():
        inputs = processor(images=pil_image, return_tensors="pt")
        try:
            # Follow the model's real device instead of re-deriving it from
            # CUDA availability: the two differ whenever the model was left on
            # (or moved back to) the CPU, and a mismatch fails the forward pass.
            model_device = next(model.parameters()).device
            inputs = {k: (v.to(model_device) if isinstance(v, torch.Tensor) else v) for k, v in inputs.items()}
        except Exception:
            logger.exception("[DEPTH] could not move inputs to the model device; using them as-is")
        outputs = model(**inputs)
        predicted_depth = outputs.predicted_depth
        resized = torch.nn.functional.interpolate(
            predicted_depth.unsqueeze(1), size=(h, w), mode="bicubic", align_corners=False,
        )
        # Index the single batch/channel entry explicitly: a bare .squeeze()
        # would also drop the height or width axis when it has length 1.
        depth = resized[0, 0].cpu().numpy()

    d_min, d_max = float(depth.min()), float(depth.max())
    if d_max - d_min < 1e-8:
        return np.zeros((h, w), dtype=np.uint8)
    return ((depth - d_min) / (d_max - d_min) * 255.0).astype(np.uint8)
|
|
|
|
def extract_depth_wall_mask(depth_map: np.ndarray, target: str = "wall") -> np.ndarray:
    """Derive a candidate wall or roof mask from a depth map.

    A "sky level" is taken as the 35th percentile of the top quarter of the
    map; pixels at or below 1.1x that level are treated as sky and excluded.
    The remaining pixels are kept when their depth lies inside a percentile
    band sampled from a reference region:

    * ``target == "roof"``: band 5..88 of the non-sky values in the top 55% of
      rows, and the result is limited to those rows. If that region has no
      non-sky values, the whole top-55% region is returned as-is.
    * any other target: band 8..90 of the non-sky values in the central
      region (rows 20%..85%, columns 10%..90%), falling back to all values of
      that region when none are above the sky level.

    The candidate is cleaned with a 5x5 opening and two 9x9 closings.

    Returns:
        A ``uint8`` 0/1 mask with the depth map's height and width.
    """
    rows, cols = depth_map.shape[:2]
    depth = depth_map.astype(np.float32)

    top_band = depth[: max(1, rows // 4), :]
    sky_level = float(np.percentile(top_band, 35))
    not_sky = ~(depth <= sky_level * 1.1)

    if target == "roof":
        roof_rows = int(rows * 0.55)
        roi = np.zeros((rows, cols), dtype=np.uint8)
        roi[:roof_rows, :] = 1

        region = depth[:roof_rows, :]
        samples = region[region > sky_level]
        if samples.size == 0:
            return roi
        lower = float(np.percentile(samples, 5))
        upper = float(np.percentile(samples, 88))
        in_band = ((depth >= lower) & (depth <= upper)).astype(np.uint8)
        candidate = (in_band & roi & not_sky).astype(np.uint8)
    else:
        row_lo, row_hi = int(rows * 0.20), int(rows * 0.85)
        col_lo, col_hi = int(cols * 0.10), int(cols * 0.90)
        region = depth[row_lo:row_hi, col_lo:col_hi]
        samples = region[region > sky_level]
        if samples.size == 0:
            samples = region.flatten()
        lower = float(np.percentile(samples, 8))
        upper = float(np.percentile(samples, 90))
        in_band = ((depth >= lower) & (depth <= upper)).astype(np.uint8)
        candidate = (in_band & not_sky).astype(np.uint8)

    # Remove speckles first, then bridge small gaps.
    open_kernel = np.ones((5, 5), np.uint8)
    close_kernel = np.ones((9, 9), np.uint8)
    candidate = cv2.morphologyEx(candidate, cv2.MORPH_OPEN, open_kernel, iterations=1)
    candidate = cv2.morphologyEx(candidate, cv2.MORPH_CLOSE, close_kernel, iterations=2)
    return candidate
|
|
|
|
def _segment_video_frame(frame_rgb: np.ndarray, mask_mode: str) -> np.ndarray:
    """Return the boolean highlight mask for one RGB frame (all False if none)."""
    binary_mask = np.zeros(frame_rgb.shape[:2], dtype=bool)

    if mask_mode == "largest":
        from services.sam2_service import get_sam2_mask_generator

        with sam2_predict_lock:
            frame_mask_start = log_timing_start("SAM2_FRAME_GENERATE")
            masks = get_sam2_mask_generator().generate(frame_rgb)
            log_timing_end("SAM2_FRAME_GENERATE", frame_mask_start)
        if masks:
            # Keep the single mask with the largest reported area.
            best = max(masks, key=lambda m: int(m.get("area", 0)))
            seg = best.get("segmentation")
            if seg is not None:
                binary_mask = np.asarray(seg, dtype=bool)
        return binary_mask

    # "exterior" mode: prefer the merged wall fragments, otherwise the
    # top-ranked exterior candidate.
    label_map, _ = generate_label_map(frame_rgb)
    candidates = rank_exterior_candidates(label_map, 8)
    if candidates:
        merged = merge_sam2_wall_fragments(label_map, 8)
        if np.any(merged):
            binary_mask = merged.astype(bool)
        else:
            best_idx = int(candidates[0]["mask_index"])
            binary_mask = label_map == best_idx
    return binary_mask


def segment_video_sync(payload: SegmentVideoRequest) -> dict[str, Any]:
    """Write a copy of an uploaded video with segmentation overlays.

    Every ``sample_every_n_frames``-th frame is segmented (up to
    ``max_frames_to_segment``, capped at 3000) and, when a mask is found,
    blended with an overlay. All frames, segmented or not, are written to
    ``<stem>_sam2_overlay.mp4`` in ``VIDEO_OUTPUT_DIR``.

    Args:
        payload: Request carrying ``filename``, ``sample_every_n_frames``,
            ``max_frames_to_segment``, ``mask_mode`` (``"exterior"`` or
            ``"largest"``) and ``overlay_alpha``.

    Returns:
        A summary dict with the output filename/URL and frame counters.

    Raises:
        HTTPException: 400 for an invalid filename, mask mode, unreadable
            video or invalid dimensions; 404 when the video does not exist;
            500 when the output video cannot be created.
    """
    from core.config import VIDEO_OUTPUT_DIR, VIDEO_UPLOAD_DIR

    step = "SEGMENT_VIDEO"
    started = log_timing_start(step)
    cap = None
    writer = None
    try:
        # Path(...).name drops any directory part of the client-supplied name.
        safe_name = Path(payload.filename).name
        if not safe_name:
            raise HTTPException(status_code=400, detail="Invalid filename")

        video_path = VIDEO_UPLOAD_DIR / safe_name
        if not video_path.exists() or not video_path.is_file():
            raise HTTPException(status_code=404, detail=f"Video not found: {safe_name}")

        sample_every = max(1, int(payload.sample_every_n_frames))
        max_frames = max(1, min(int(payload.max_frames_to_segment), 3000))
        mask_mode = str(payload.mask_mode).strip().lower()
        if mask_mode not in {"exterior", "largest"}:
            raise HTTPException(status_code=400, detail="mask_mode must be 'exterior' or 'largest'")

        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            raise HTTPException(status_code=400, detail="Video could not be opened")

        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            fps = 24.0  # fall back when the container reports no frame rate

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0)
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0)
        if width <= 0 or height <= 0:
            raise HTTPException(status_code=400, detail="Video has invalid dimensions")

        stem = Path(safe_name).stem
        out_filename = f"{stem}_sam2_overlay.mp4"
        out_path = VIDEO_OUTPUT_DIR / out_filename

        fourcc_fn = getattr(cv2, "VideoWriter_fourcc")
        writer = cv2.VideoWriter(str(out_path), int(fourcc_fn(*"mp4v")), float(fps), (width, height))
        if not writer.isOpened():
            raise HTTPException(status_code=500, detail="Failed to create output video")

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
        frame_idx = 0
        segmented_frames = 0
        highlighted_frames = 0

        while True:
            ok, frame_bgr = cap.read()
            if not ok:
                break

            should_segment = (frame_idx % sample_every == 0) and (segmented_frames < max_frames)
            if should_segment:
                frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
                binary_mask = _segment_video_frame(frame_rgb, mask_mode)
                if bool(np.any(binary_mask)):
                    frame_bgr = build_mask_overlay(frame_bgr, binary_mask, payload.overlay_alpha)
                    highlighted_frames += 1
                segmented_frames += 1

            writer.write(frame_bgr)
            frame_idx += 1

        return {
            "message": "Video segmentation completed",
            "input_filename": safe_name,
            "output_filename": out_filename,
            "output_url": f"/seg/output-video/{out_filename}",
            "total_frames": total_frames,
            "segmented_frames": segmented_frames,
            "highlighted_frames": highlighted_frames,
            "sample_every_n_frames": sample_every,
            "mask_mode": mask_mode,
        }
    finally:
        # Release the capture and writer on every exit path. Previously an
        # exception raised while segmenting a frame left both handles open
        # (and the output file unfinalised).
        for handle in (cap, writer):
            if handle is None:
                continue
            try:
                handle.release()
            except Exception:
                logger.exception("Error releasing video handle after SEGMENT_VIDEO")
        log_timing_end(step, started)
        try:
            release_resources()
        except Exception:
            logger.exception("Error releasing resources after SEGMENT_VIDEO")
|
|
|
|
def segment_exterior_grabcut_sync(payload: ExteriorGrabCutRequest) -> dict[str, Any]:
    """Segment the building exterior with rectangle-initialised GrabCut.

    Steps: load the image, clamp the request rectangle (or use a default
    inset), run ``cv2.grabCut`` with ``GC_INIT_WITH_RECT``, clean the result
    with a 5x5 open/close, optionally union it with the merged SAM2 wall
    fragments, split it into labelled components, then save the label map and
    a preview overlay.

    Args:
        payload: Request carrying ``filename``, optional ``rect_xywh``,
            ``iterations`` and ``use_sam2_hint``.

    Returns:
        A response dict with the label-map owner name, component indices,
        foreground area ratio, preview filename/URL and the effective
        rectangle and iteration count.

    Raises:
        HTTPException: 400 when no foreground is found; 500 when the preview
            image cannot be written.
    """
    step = "EXTERIOR_GRABCUT"
    started = log_timing_start(step)
    try:
        safe_name, image_rgb = load_image_rgb_for_edit(payload.filename)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        h, w = image_bgr.shape[:2]

        # Use the caller's rectangle, or a default inset from the image borders.
        if payload.rect_xywh is not None:
            x, y, rw, rh = [int(v) for v in payload.rect_xywh]
        else:
            x, y, rw, rh = int(0.06 * w), int(0.10 * h), int(0.88 * w), int(0.84 * h)

        # Clamp so the rectangle starts inside the image and is at least 2 px
        # wide/high without running past the right/bottom edge.
        x = max(0, min(x, w - 2))
        y = max(0, min(y, h - 2))
        rw = max(2, min(rw, w - x))
        rh = max(2, min(rh, h - y))
        rect = (x, y, rw, rh)
        iter_count = max(1, min(int(payload.iterations), 12))

        # grabCut writes its per-pixel labels into `mask`.
        mask = np.zeros((h, w), np.uint8)
        bg_model = np.zeros((1, 65), np.float64)
        fg_model = np.zeros((1, 65), np.float64)
        cv2.grabCut(image_bgr, mask, rect, bg_model, fg_model, iter_count, cv2.GC_INIT_WITH_RECT)
        # Definite and probable foreground both count as foreground.
        fg_mask = np.where((mask == cv2.GC_FGD) | (mask == cv2.GC_PR_FGD), 1, 0).astype(np.uint8)

        kernel = np.ones((5, 5), np.uint8)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel, iterations=1)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel, iterations=1)

        # Optional: union the result with the merged SAM2 wall fragments.
        if payload.use_sam2_hint:
            sam_label_map, _ = generate_label_map(image_rgb)
            sam_hint = merge_sam2_wall_fragments(sam_label_map, 8)
            if np.any(sam_hint):
                fg_mask = np.where((fg_mask > 0) | (sam_hint > 0), 1, 0).astype(np.uint8)

        if not np.any(fg_mask):
            raise HTTPException(status_code=400, detail="GrabCut did not find a foreground region")

        label_map, component_count, recommended_idx = build_component_label_map(fg_mask, min_area_ratio=0.012)
        # If no component was reported, expose the whole foreground as label 1.
        if component_count == 0:
            label_map = np.where(fg_mask > 0, 1, 0).astype(np.uint8)
            component_count = 1
            recommended_idx = 1

        label_owner = f"{Path(safe_name).stem}_exterior_grabcut.jpg"
        saved_owner = save_label_map_for_owner(label_owner, label_map)

        preview = build_mask_overlay(image_bgr, fg_mask.astype(bool), 0.42, color_bgr=(15, 170, 245))
        preview_filename = f"{Path(safe_name).stem}_exterior_grabcut_preview.jpg"
        # cv2.imwrite signals failure through its return value.
        if not cv2.imwrite(str(OUTPUT_DIR / preview_filename), preview):
            raise HTTPException(status_code=500, detail="Failed to save GrabCut preview image")

        area_ratio = float(np.count_nonzero(fg_mask)) / float(h * w)
        return {
            "message": "Exterior segmentation with GrabCut completed",
            "filename": safe_name,
            "original_filename_for_apply": saved_owner,
            "mask_count": component_count,
            "available_mask_indices": list(range(1, component_count + 1)),
            "recommended_mask_index": recommended_idx,
            "foreground_area_ratio": round(area_ratio, 6),
            "preview_filename": preview_filename,
            "preview_url": f"/seg/ai/{preview_filename}",
            "rect_xywh": [x, y, rw, rh],
            "iterations": iter_count,
            "used_sam2_hint": bool(payload.use_sam2_hint),
        }
    finally:
        # Runs on success and failure; cleanup errors are logged, not raised.
        log_timing_end(step, started)
        try:
            release_resources()
        except Exception:
            logger.exception("Error releasing resources after EXTERIOR_GRABCUT")
|
|
|
|
def segment_exterior_hybrid_sync(payload: ExteriorHybridRequest) -> dict[str, Any]:
    """Segment the building exterior with GrabCut seeded by semantic/SAM2 hints.

    The optional semantic mask and merged SAM2 wall fragments are combined
    into a hint mask. Hinted pixels seed GrabCut as probable foreground, their
    eroded core as definite foreground, and everything outside the request
    rectangle as definite background. When no hint is available GrabCut is
    initialised from the rectangle alone.

    Args:
        payload: Request carrying ``filename``, optional ``rect_xywh``,
            ``iterations``, ``use_semantic_hint``, ``semantic_keywords`` and
            ``use_sam2_hint``.

    Returns:
        A response dict with the label-map owner name, component indices,
        foreground area ratio, preview filename/URL, the effective rectangle
        and iteration count, and the semantic labels/area that were used.

    Raises:
        HTTPException: 400 when no foreground is found; 500 when the preview
            image cannot be written (or a hint model fails to load).
    """
    step = "EXTERIOR_HYBRID"
    started = log_timing_start(step)
    try:
        safe_name, image_rgb = load_image_rgb_for_edit(payload.filename)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        h, w = image_bgr.shape[:2]

        # Use the caller's rectangle, or a default inset from the image borders.
        if payload.rect_xywh is not None:
            x, y, rw, rh = [int(v) for v in payload.rect_xywh]
        else:
            x, y, rw, rh = int(0.06 * w), int(0.10 * h), int(0.88 * w), int(0.84 * h)

        # Clamp so the rectangle starts inside the image and is at least 2 px
        # wide/high without running past the right/bottom edge.
        x = max(0, min(x, w - 2))
        y = max(0, min(y, h - 2))
        rw = max(2, min(rw, w - x))
        rh = max(2, min(rh, h - y))
        rect = (x, y, rw, rh)
        iter_count = max(1, min(int(payload.iterations), 12))

        hint_mask = np.zeros((h, w), dtype=np.uint8)
        semantic_labels: list[str] = []
        semantic_area_ratio = 0.0

        if payload.use_semantic_hint:
            sem_mask, semantic_labels, semantic_area_ratio = semantic_exterior_mask(image_rgb, payload.semantic_keywords)
            hint_mask = np.where(sem_mask > 0, 1, hint_mask).astype(np.uint8)

        if payload.use_sam2_hint:
            sam_label_map, _ = generate_label_map(image_rgb)
            sam_hint = merge_sam2_wall_fragments(sam_label_map, 10)
            if np.any(sam_hint):
                hint_mask = np.where(sam_hint > 0, 1, hint_mask).astype(np.uint8)

        bg_model = np.zeros((1, 65), np.float64)
        fg_model = np.zeros((1, 65), np.float64)
        if np.any(hint_mask):
            # Mask init: probable background inside the rectangle, definite
            # background outside it, hints as (probable/definite) foreground.
            gc_mask = np.full((h, w), cv2.GC_PR_BGD, dtype=np.uint8)
            outside_rect = np.ones((h, w), dtype=bool)
            outside_rect[y : y + rh, x : x + rw] = False
            gc_mask[outside_rect] = cv2.GC_BGD
            gc_mask[hint_mask > 0] = cv2.GC_PR_FGD
            sure_fg = cv2.erode((hint_mask * 255).astype(np.uint8), np.ones((7, 7), np.uint8), iterations=1)
            gc_mask[sure_fg > 0] = cv2.GC_FGD
            init_mode = cv2.GC_INIT_WITH_MASK
        else:
            # No hint at all (both hints disabled, or neither found anything):
            # a mask without any foreground seed makes cv2.grabCut fail, so
            # fall back to plain rectangle initialisation.
            gc_mask = np.zeros((h, w), np.uint8)
            init_mode = cv2.GC_INIT_WITH_RECT
        cv2.grabCut(image_bgr, gc_mask, rect, bg_model, fg_model, iter_count, init_mode)

        # Definite and probable foreground both count as foreground.
        fg_mask = np.where((gc_mask == cv2.GC_FGD) | (gc_mask == cv2.GC_PR_FGD), 1, 0).astype(np.uint8)
        kernel = np.ones((5, 5), np.uint8)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel, iterations=1)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel, iterations=1)

        if not np.any(fg_mask):
            raise HTTPException(status_code=400, detail="Hybrid exterior segmentation did not find a foreground region")

        label_map, component_count, recommended_idx = build_component_label_map(fg_mask, min_area_ratio=0.01)
        # If no component was reported, expose the whole foreground as label 1.
        if component_count == 0:
            label_map = np.where(fg_mask > 0, 1, 0).astype(np.uint8)
            component_count = 1
            recommended_idx = 1

        label_owner = f"{Path(safe_name).stem}_exterior_hybrid.jpg"
        saved_owner = save_label_map_for_owner(label_owner, label_map)

        preview = build_mask_overlay(image_bgr, fg_mask.astype(bool), 0.44, color_bgr=(0, 180, 255))
        preview_filename = f"{Path(safe_name).stem}_exterior_hybrid_preview.jpg"
        # cv2.imwrite signals failure through its return value.
        if not cv2.imwrite(str(OUTPUT_DIR / preview_filename), preview):
            raise HTTPException(status_code=500, detail="Failed to save hybrid preview image")

        area_ratio = float(np.count_nonzero(fg_mask)) / float(h * w)
        return {
            "message": "Hybrid exterior segmentation completed",
            "filename": safe_name,
            "original_filename_for_apply": saved_owner,
            "mask_count": component_count,
            "available_mask_indices": list(range(1, component_count + 1)),
            "recommended_mask_index": recommended_idx,
            "foreground_area_ratio": round(area_ratio, 6),
            "preview_filename": preview_filename,
            "preview_url": f"/seg/ai/{preview_filename}",
            "rect_xywh": [x, y, rw, rh],
            "iterations": iter_count,
            "used_sam2_hint": bool(payload.use_sam2_hint),
            "used_semantic_hint": bool(payload.use_semantic_hint),
            "semantic_labels": semantic_labels,
            "semantic_area_ratio": round(float(semantic_area_ratio), 6),
        }
    finally:
        # Runs on success and failure; cleanup errors are logged, not raised.
        log_timing_end(step, started)
        try:
            release_resources()
        except Exception:
            logger.exception("Error releasing resources after EXTERIOR_HYBRID")
|
|
|
|
def segment_exterior_brick_sync(payload: ExteriorBrickRequest) -> dict[str, Any]:
    """Segment a brick/masonry exterior and classify each component's material.

    Hints are computed on a bilateral-smoothed copy of the image (optional
    semantic mask plus merged SAM2 wall fragments), while GrabCut itself runs
    on the original image. Hinted pixels seed GrabCut as probable foreground,
    their eroded core as definite foreground, and everything outside the
    request rectangle as definite background. When no hint is available
    GrabCut is initialised from the rectangle alone. Each resulting component
    is then classified as brick, smooth or mixed.

    Args:
        payload: Request carrying ``filename``, ``smooth_strength``, optional
            ``rect_xywh``, ``iterations``, ``use_semantic_hint``,
            ``semantic_keywords`` and ``sam2_merge_top_k``.

    Returns:
        A response dict with the label-map owner name, component indices,
        foreground area ratio, preview filename/URL, the effective parameters,
        the semantic labels/area used and the per-label material
        classification.

    Raises:
        HTTPException: 400 when no foreground is found; 500 when the preview
            image cannot be written (or a hint model fails to load).
    """
    step = "EXTERIOR_BRICK"
    started = log_timing_start(step)
    try:
        safe_name, image_rgb = load_image_rgb_for_edit(payload.filename)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        h, w = image_bgr.shape[:2]

        # Hints are computed on a texture-smoothed copy of the image.
        strength = max(1, min(int(payload.smooth_strength), 3))
        smoothed_bgr = smooth_texture_for_segmentation(image_bgr, strength)
        smoothed_rgb = cv2.cvtColor(smoothed_bgr, cv2.COLOR_BGR2RGB)

        # Use the caller's rectangle, or a default inset from the image borders.
        if payload.rect_xywh is not None:
            x, y, rw, rh = [int(v) for v in payload.rect_xywh]
        else:
            x, y, rw, rh = int(0.05 * w), int(0.08 * h), int(0.90 * w), int(0.86 * h)

        # Clamp so the rectangle starts inside the image and is at least 2 px
        # wide/high without running past the right/bottom edge.
        x = max(0, min(x, w - 2))
        y = max(0, min(y, h - 2))
        rw = max(2, min(rw, w - x))
        rh = max(2, min(rh, h - y))
        rect = (x, y, rw, rh)
        iter_count = max(1, min(int(payload.iterations), 12))

        hint_mask = np.zeros((h, w), dtype=np.uint8)
        semantic_labels: list[str] = []
        semantic_area_ratio = 0.0

        if payload.use_semantic_hint:
            sem_mask, semantic_labels, semantic_area_ratio = semantic_exterior_mask(smoothed_rgb, payload.semantic_keywords)
            hint_mask = np.where(sem_mask > 0, 1, hint_mask).astype(np.uint8)

        sam_label_map, _ = generate_label_map(smoothed_rgb)
        merged_sam_mask = merge_sam2_wall_fragments(sam_label_map, int(payload.sam2_merge_top_k))
        if np.any(merged_sam_mask):
            hint_mask = np.where(merged_sam_mask > 0, 1, hint_mask).astype(np.uint8)

        bg_model = np.zeros((1, 65), np.float64)
        fg_model = np.zeros((1, 65), np.float64)
        if np.any(hint_mask):
            # Mask init: probable background inside the rectangle, definite
            # background outside it, hints as (probable/definite) foreground.
            gc_mask = np.full((h, w), cv2.GC_PR_BGD, dtype=np.uint8)
            outside_rect = np.ones((h, w), dtype=bool)
            outside_rect[y : y + rh, x : x + rw] = False
            gc_mask[outside_rect] = cv2.GC_BGD
            gc_mask[hint_mask > 0] = cv2.GC_PR_FGD
            sure_fg = cv2.erode((hint_mask * 255).astype(np.uint8), np.ones((9, 9), np.uint8), iterations=1)
            gc_mask[sure_fg > 0] = cv2.GC_FGD
            init_mode = cv2.GC_INIT_WITH_MASK
        else:
            # No hint found: a mask without any foreground seed makes
            # cv2.grabCut fail, so fall back to rectangle initialisation.
            gc_mask = np.zeros((h, w), np.uint8)
            init_mode = cv2.GC_INIT_WITH_RECT
        # GrabCut runs on the original (unsmoothed) image.
        cv2.grabCut(image_bgr, gc_mask, rect, bg_model, fg_model, iter_count, init_mode)

        # Definite and probable foreground both count as foreground.
        fg_mask = np.where((gc_mask == cv2.GC_FGD) | (gc_mask == cv2.GC_PR_FGD), 1, 0).astype(np.uint8)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, np.ones((9, 9), np.uint8), iterations=1)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, np.ones((5, 5), np.uint8), iterations=1)

        if not np.any(fg_mask):
            raise HTTPException(status_code=400, detail="Brick segmentation did not find a foreground region.")

        label_map_out, component_count, recommended_idx = build_component_label_map(fg_mask, min_area_ratio=0.01)
        # If no component was reported, expose the whole foreground as label 1.
        if component_count == 0:
            label_map_out = np.where(fg_mask > 0, 1, 0).astype(np.uint8)
            component_count = 1
            recommended_idx = 1

        label_owner = f"{Path(safe_name).stem}_exterior_brick.jpg"
        saved_owner = save_label_map_for_owner(label_owner, label_map_out)

        preview = build_mask_overlay(image_bgr, fg_mask.astype(bool), 0.44, color_bgr=(20, 140, 255))
        preview_filename = f"{Path(safe_name).stem}_exterior_brick_preview.jpg"
        # cv2.imwrite signals failure through its return value.
        if not cv2.imwrite(str(OUTPUT_DIR / preview_filename), preview):
            raise HTTPException(status_code=500, detail="Failed to save brick preview image")

        area_ratio = float(np.count_nonzero(fg_mask)) / float(h * w)
        # Material analysis uses the original image, not the smoothed copy.
        material_analysis = separate_materials_by_label(label_map_out, image_rgb)

        return {
            "message": "Brick/masonry exterior segmentation completed",
            "filename": safe_name,
            "original_filename_for_apply": saved_owner,
            "mask_count": component_count,
            "available_mask_indices": list(range(1, component_count + 1)),
            "recommended_mask_index": recommended_idx,
            "foreground_area_ratio": round(area_ratio, 6),
            "preview_filename": preview_filename,
            "preview_url": f"/seg/ai/{preview_filename}",
            "rect_xywh": [x, y, rw, rh],
            "iterations": iter_count,
            "smooth_strength": strength,
            "sam2_merge_top_k": int(payload.sam2_merge_top_k),
            "semantic_labels": semantic_labels,
            "semantic_area_ratio": round(float(semantic_area_ratio), 6),
            "material_classification": {
                "brick_indices": material_analysis["brick_indices"],
                "smooth_indices": material_analysis["smooth_indices"],
                "mixed_indices": material_analysis["mixed_indices"],
                "analysis_by_label": material_analysis["analysis_by_label"],
            },
        }
    finally:
        # Runs on success and failure; cleanup errors are logged, not raised.
        log_timing_end(step, started)
        try:
            release_resources()
        except Exception:
            logger.exception("Error releasing resources after EXTERIOR_BRICK")
|
|
|
|
def segment_exterior_depth_sync(payload: ExteriorDepthRequest) -> dict[str, Any]:
    """Run GrabCut on an image, seeded by semantic, label-map and depth hints.

    Pipeline:
      1. Load the image and build a texture-smoothed copy used by the hint
         stages.
      2. Resolve and clamp the GrabCut rectangle (payload value or a default
         inset of the frame).
      3. Accumulate a binary hint mask from up to three sources: the semantic
         mask (optional), the merged label-map fragments, and the depth-derived
         wall mask (optional). The semantic and depth stages are best-effort:
         a failure is logged and the stage is skipped.
      4. Convert the hints into GrabCut seeds, run GrabCut, clean the result
         with morphological close/open, and split it into components.
      5. Persist the label map and a colour overlay preview.

    Args:
        payload: Request carrying ``filename``, ``exterior_target``,
            ``smooth_strength``, ``rect_xywh``, ``iterations``,
            ``use_semantic_hint``, ``semantic_keywords``, ``sam2_merge_top_k``
            and ``use_depth_hint``.

    Returns:
        A JSON-serialisable dict describing the saved label map, the preview
        image and the effective parameters that were used.

    Raises:
        HTTPException: 400 when no foreground survives post-processing, 500
            when the preview image cannot be written. Helpers called here may
            raise their own errors.
    """
    step = "EXTERIOR_DEPTH"
    started = log_timing_start(step)
    try:
        safe_name, image_rgb = load_image_rgb_for_edit(payload.filename)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        h, w = image_bgr.shape[:2]
        target_name = normalize_exterior_target(payload.exterior_target)

        # Smoothing strength is clamped to the 1..3 range.
        strength = max(1, min(int(payload.smooth_strength), 3))
        smoothed_bgr = smooth_texture_for_segmentation(image_bgr, strength)
        smoothed_rgb = cv2.cvtColor(smoothed_bgr, cv2.COLOR_BGR2RGB)

        # Caller-supplied rectangle, or a default inset of the full frame.
        if payload.rect_xywh is not None:
            x, y, rw, rh = [int(v) for v in payload.rect_xywh]
        else:
            x, y, rw, rh = int(0.04 * w), int(0.06 * h), int(0.92 * w), int(0.88 * h)

        # Clamp the rectangle so it stays inside the image and is >= 2px wide/high.
        x = max(0, min(x, w - 2))
        y = max(0, min(y, h - 2))
        rw = max(2, min(rw, w - x))
        rh = max(2, min(rh, h - y))
        rect = (x, y, rw, rh)
        iter_count = max(1, min(int(payload.iterations), 12))

        hint_mask = np.zeros((h, w), dtype=np.uint8)
        semantic_labels: list[str] = []
        semantic_area_ratio = 0.0
        depth_map_arr: np.ndarray | None = None

        # Hint 1 (optional, best-effort): semantic segmentation mask.
        if payload.use_semantic_hint:
            try:
                sem_mask, semantic_labels, semantic_area_ratio = semantic_exterior_mask(
                    smoothed_rgb, payload.semantic_keywords
                )
                hint_mask = np.where(sem_mask > 0, 1, hint_mask).astype(np.uint8)
            except Exception:
                # Keep going without this hint, but leave a trace for operators.
                logger.exception("Semantic hint failed during %s; continuing without it", step)

        # Hint 2: merged fragments of the generated label map.
        sam_label_map, _ = generate_label_map(smoothed_rgb)
        merged_sam_mask = merge_sam2_wall_fragments(sam_label_map, int(payload.sam2_merge_top_k))
        if np.any(merged_sam_mask):
            hint_mask = np.where(merged_sam_mask > 0, 1, hint_mask).astype(np.uint8)

        # Hint 3 (optional, best-effort): depth-derived wall mask.
        if payload.use_depth_hint:
            try:
                depth_map_arr = estimate_depth_map(image_rgb)
                depth_mask = extract_depth_wall_mask(depth_map_arr, target=target_name)
                hint_mask = np.where(depth_mask > 0, 1, hint_mask).astype(np.uint8)
            except Exception:
                logger.exception("Depth hint failed during %s; continuing without it", step)
                # Drop the depth map so the sky refinement below is skipped too.
                depth_map_arr = None

        # Seed mask: probable background inside the rectangle, definite
        # background outside it, hints as probable foreground, and the eroded
        # core of the hints as definite foreground.
        gc_mask = np.full((h, w), cv2.GC_PR_BGD, dtype=np.uint8)
        outside_rect = np.ones((h, w), dtype=bool)
        outside_rect[y : y + rh, x : x + rw] = False
        gc_mask[outside_rect] = cv2.GC_BGD
        has_foreground_seed = bool(np.any(hint_mask))
        if has_foreground_seed:
            gc_mask[hint_mask > 0] = cv2.GC_PR_FGD
            sure_fg = cv2.erode((hint_mask * 255).astype(np.uint8), np.ones((9, 9), np.uint8), iterations=1)
            gc_mask[sure_fg > 0] = cv2.GC_FGD

        # In the top quarter of the image, promote still-undecided pixels whose
        # depth value is at or below ~the 35th percentile of that band (+10%)
        # to definite background. NOTE(review): this presumes low depth values
        # mean "far away" (sky) for the configured depth model - confirm.
        if depth_map_arr is not None and payload.use_depth_hint:
            depth_f = depth_map_arr.astype(np.float32)
            upper_h = max(1, h // 4)
            sky_pct = float(np.percentile(depth_f[:upper_h, :], 35))
            row_idx = np.arange(h, dtype=np.int32)[:, np.newaxis]
            definite_sky = np.asarray(
                (row_idx < upper_h) & (depth_f <= sky_pct * 1.1) & (gc_mask == cv2.GC_PR_BGD),
                dtype=bool,
            )
            gc_mask[definite_sky] = cv2.GC_BGD

        bg_model = np.zeros((1, 65), np.float64)
        fg_model = np.zeros((1, 65), np.float64)
        # Mask initialisation needs at least one foreground seed; without any
        # hint, fall back to rectangle initialisation (which lets OpenCV
        # rebuild the mask from ``rect``) instead of failing inside grabCut.
        init_mode = cv2.GC_INIT_WITH_MASK if has_foreground_seed else cv2.GC_INIT_WITH_RECT
        cv2.grabCut(image_bgr, gc_mask, rect, bg_model, fg_model, iter_count, init_mode)

        # Binary foreground, then close small holes and remove small specks.
        fg_mask = np.where((gc_mask == cv2.GC_FGD) | (gc_mask == cv2.GC_PR_FGD), 1, 0).astype(np.uint8)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, np.ones((9, 9), np.uint8), iterations=1)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, np.ones((5, 5), np.uint8), iterations=1)

        if not np.any(fg_mask):
            raise HTTPException(status_code=400, detail="Depth-guided segmentation found no foreground region")

        label_map_out, component_count, recommended_idx = build_component_label_map(fg_mask, min_area_ratio=0.01)
        if component_count == 0:
            # No component was returned: expose the whole foreground as one mask.
            label_map_out = np.where(fg_mask > 0, 1, 0).astype(np.uint8)
            component_count = 1
            recommended_idx = 1

        label_owner = f"{Path(safe_name).stem}_exterior_depth.jpg"
        saved_owner = save_label_map_for_owner(label_owner, label_map_out)

        preview = build_mask_overlay(image_bgr, fg_mask.astype(bool), 0.44, color_bgr=(30, 120, 255))
        preview_filename = f"{Path(safe_name).stem}_exterior_depth_preview.jpg"
        if not cv2.imwrite(str(OUTPUT_DIR / preview_filename), preview):
            raise HTTPException(status_code=500, detail="Failed to save depth preview")

        area_ratio = float(np.count_nonzero(fg_mask)) / float(h * w)
        return {
            "message": "Depth-guided exterior segmentation completed",
            "filename": safe_name,
            "original_filename_for_apply": saved_owner,
            "mask_count": component_count,
            "available_mask_indices": list(range(1, component_count + 1)),
            "recommended_mask_index": recommended_idx,
            "foreground_area_ratio": round(area_ratio, 6),
            "preview_filename": preview_filename,
            "preview_url": f"/seg/ai/{preview_filename}",
            "rect_xywh": [x, y, rw, rh],
            "iterations": iter_count,
            "exterior_target": target_name,
            "smooth_strength": strength,
            "used_semantic_hint": bool(payload.use_semantic_hint),
            "used_depth_hint": bool(payload.use_depth_hint),
            "semantic_labels": semantic_labels,
            "semantic_area_ratio": round(float(semantic_area_ratio), 6),
        }
    finally:
        log_timing_end(step, started)
        try:
            release_resources()
        except Exception:
            logger.exception("Error releasing resources after EXTERIOR_DEPTH")
|
|