Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| from dataclasses import dataclass, replace | |
| from pathlib import Path | |
| from typing import Dict, Optional | |
| import time | |
| import cv2 | |
| import numpy as np | |
| import torch | |
| from PIL import Image | |
| from .config import DEFAULT_MODEL_ID, IMAGE_EXTS | |
| from .depth_pipeline import DepthEngine, crop_nonblack, pick_flat_patch, smooth_depth | |
| from .segmentation import SegmenterRequest, SegmenterService, get_global_segmenter | |
| from .visualization import build_result_layers | |
| class AnalysisRequest: | |
| footprint_m: float | |
| std_thresh: float | |
| grad_thresh: float | |
| use_water_mask: bool | |
| use_road_mask: bool | |
| use_roof_mask: bool | |
| use_tree_mask: bool | |
| water_prompt: str | |
| road_prompt: str | |
| roof_prompt: str | |
| tree_prompt: str | |
| altitude_m: float | |
| fov_deg: float | |
| clearance_factor: float | |
| process_res_cap: int | |
| depth_smoothing_base: float | |
| segmentation_max_side: int | |
| segmentation_model_id: str | |
| segmentation_score_thresh: float | |
| segmentation_mask_thresh: float | |
| coverage_strictness: float | |
| model_id: str | |
| openness_weight: float | |
| texture_threshold: float | |
| source_path: Optional[str] = None | |
| class AnalysisSummary: | |
| model_id: str | |
| process_resolution: int | |
| runtime_ms: float | |
| footprint_m: float | |
| footprint_depth_px: int | |
| footprint_image_px: int | |
| landing_center_depth: tuple[int, int] | |
| landing_center_image: tuple[int, int] | |
| safe_area_pct: float | |
| hazard_pct: float | |
| water_mask_pct: Optional[float] | |
| road_mask_pct: Optional[float] | |
| roof_mask_pct: Optional[float] | |
| tree_mask_pct: Optional[float] | |
| water_mask_enabled: bool | |
| road_mask_enabled: bool | |
| roof_mask_enabled: bool | |
| tree_mask_enabled: bool | |
| used_valid_center: bool | |
| warnings: list[str] | |
| std_thresh_applied: float | |
| grad_thresh_applied: float | |
| class AnalysisResult: | |
| images: Dict[str, Image.Image] | |
| summary: AnalysisSummary | |
| class SafetyAnalyzer: | |
| def __init__(self, depth_engine: DepthEngine | None = None, segmenter: SegmenterService | None = None): | |
| self.depth_engine = depth_engine or DepthEngine() | |
| self.segmenter = segmenter or get_global_segmenter() | |
| # Preload default depth model to avoid first-call latency spikes. | |
| try: | |
| self.depth_engine.get_model(DEFAULT_MODEL_ID) | |
| except Exception as exc: | |
| print(f"[WARN] Could not preload depth model {DEFAULT_MODEL_ID}: {exc}") | |
| def analyze_image(self, image: Image.Image, request: AnalysisRequest) -> AnalysisResult: | |
| t0 = time.perf_counter() | |
| rgb_np = np.array(image) | |
| t_rgb = time.perf_counter() | |
| depth_raw, depth, process_res, depth_times = self.depth_engine.predict_depth( | |
| rgb_np, request.model_id, request.process_res_cap, "least_squares" | |
| ) | |
| t_depth = time.perf_counter() | |
| res_scale = max(0.5, min(2.5, process_res / 1024)) | |
| sigma = max(0.0, request.depth_smoothing_base) * res_scale | |
| depth = smooth_depth(depth, sigma) | |
| # Keep all downstream processing at the depth resolution to avoid expensive full-res passes. | |
| proc_size = (depth.shape[1], depth.shape[0]) # (W, H) | |
| rgb_proc = cv2.resize(rgb_np, proc_size, interpolation=cv2.INTER_AREA) if rgb_np.shape[:2][::-1] != proc_size else rgb_np | |
| fov = max(10.0, min(170.0, float(request.fov_deg))) | |
| altitude = max(1.0, float(request.altitude_m)) | |
| fx = (depth.shape[1] / 2.0) / np.tan(np.radians(fov) / 2.0) | |
| patch_px = request.footprint_m * fx / altitude | |
| patch_px = max(3, min(int(round(patch_px)), min(depth.shape) - 1)) | |
| if patch_px % 2 == 0: | |
| patch_px += 1 | |
| half_span = patch_px // 2 | |
| depth_norm = (depth - depth.min()) / (np.ptp(depth) + 1e-6) | |
| vis_patch = max( | |
| 5, | |
| min( | |
| patch_px, | |
| max(7, min(depth.shape) // 8), | |
| min(depth.shape) - 1, | |
| ), | |
| ) | |
| if vis_patch % 2 == 0: | |
| vis_patch += 1 | |
| import torch.nn.functional as F | |
| import torch | |
| def box_mean_np(arr: np.ndarray, k: int): | |
| pad = k // 2 | |
| t = torch.from_numpy(arr).unsqueeze(0).unsqueeze(0) | |
| t = F.pad(t, (pad, pad, pad, pad), mode="reflect") | |
| mean = F.avg_pool2d(t, kernel_size=k, stride=1, padding=0, count_include_pad=False) | |
| return mean.squeeze(0).squeeze(0).numpy() | |
| std_map_vis = np.sqrt( | |
| np.maximum(box_mean_np(depth_norm * depth_norm, vis_patch) - box_mean_np(depth_norm, vis_patch) ** 2, 0.0) | |
| ) | |
| t_depth_post = time.perf_counter() | |
| gray = cv2.cvtColor(rgb_proc, cv2.COLOR_RGB2GRAY).astype(np.float32) / 255.0 | |
| gx = cv2.Sobel(gray, cv2.CV_32F, 1, 0, ksize=3) | |
| gy = cv2.Sobel(gray, cv2.CV_32F, 0, 1, ksize=3) | |
| texture = np.sqrt(gx * gx + gy * gy) | |
| sigma_tex = max(1.0, patch_px / 40.0) | |
| texture = cv2.GaussianBlur(texture, (0, 0), sigmaX=sigma_tex, sigmaY=sigma_tex) | |
| if texture.max() > texture.min(): | |
| texture_norm = (texture - texture.min()) / (np.ptp(texture) + 1e-6) | |
| else: | |
| texture_norm = np.zeros_like(texture) | |
| dy_depth, dx_depth = np.gradient(depth_norm) | |
| grad_mag = np.sqrt(dx_depth * dx_depth + dy_depth * dy_depth) | |
| grad_ref = np.percentile(grad_mag, 95) + 1e-6 | |
| grad_norm = np.clip(grad_mag / grad_ref, 0.0, 1.0) | |
| t_texture = time.perf_counter() | |
| water_mask_resized = None | |
| road_mask_resized = None | |
| roof_mask_resized = None | |
| tree_mask_resized = None | |
| water_mask_block = None | |
| road_mask_block = None | |
| roof_mask_block = None | |
| tree_mask_block = None | |
| def expand_mask_for_footprint(mask: np.ndarray | None) -> np.ndarray | None: | |
| if mask is None: | |
| return None | |
| if patch_px <= 1: | |
| return mask.copy() | |
| try: | |
| kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (patch_px, patch_px)) | |
| except Exception: | |
| return mask.copy() | |
| expanded = cv2.dilate(mask.astype(np.uint8), kernel, iterations=1) | |
| return expanded.astype(bool) | |
| if request.use_water_mask or request.use_road_mask or request.use_tree_mask: | |
| masks = self.segmenter.get_masks( | |
| SegmenterRequest( | |
| image=Image.fromarray(rgb_proc), | |
| source_path=request.source_path, | |
| want_water=request.use_water_mask, | |
| want_road=request.use_road_mask, | |
| want_roof=request.use_roof_mask, | |
| want_tree=request.use_tree_mask, | |
| max_side=int(max(128, min(request.segmentation_max_side, process_res))), | |
| water_prompt=request.water_prompt, | |
| road_prompt=request.road_prompt, | |
| roof_prompt=request.roof_prompt, | |
| tree_prompt=request.tree_prompt, | |
| score_threshold=float(request.segmentation_score_thresh), | |
| mask_threshold=float(request.segmentation_mask_thresh), | |
| ), | |
| model_id=request.segmentation_model_id, | |
| ) | |
| if request.use_water_mask and masks.get("water") is not None: | |
| water_mask_resized = Image.fromarray(masks["water"].astype(np.uint8) * 255).resize( | |
| (depth.shape[1], depth.shape[0]), resample=Image.NEAREST | |
| ) | |
| water_mask_resized = np.array(water_mask_resized) > 0 | |
| water_mask_block = expand_mask_for_footprint(water_mask_resized) | |
| if request.use_road_mask and masks.get("road") is not None: | |
| road_mask_resized = Image.fromarray(masks["road"].astype(np.uint8) * 255).resize( | |
| (depth.shape[1], depth.shape[0]), resample=Image.NEAREST | |
| ) | |
| road_mask_resized = np.array(road_mask_resized) > 0 | |
| road_mask_block = expand_mask_for_footprint(road_mask_resized) | |
| if request.use_roof_mask and masks.get("roof") is not None: | |
| roof_mask_resized = Image.fromarray(masks["roof"].astype(np.uint8) * 255).resize( | |
| (depth.shape[1], depth.shape[0]), resample=Image.NEAREST | |
| ) | |
| roof_mask_resized = np.array(roof_mask_resized) > 0 | |
| roof_mask_block = expand_mask_for_footprint(roof_mask_resized) | |
| if request.use_tree_mask and masks.get("tree") is not None: | |
| tree_mask_resized = Image.fromarray(masks["tree"].astype(np.uint8) * 255).resize( | |
| (depth.shape[1], depth.shape[0]), resample=Image.NEAREST | |
| ) | |
| tree_mask_resized = np.array(tree_mask_resized) > 0 | |
| tree_mask_block = expand_mask_for_footprint(tree_mask_resized) | |
| t_masks = time.perf_counter() | |
| # Autoscale sensitivity with resolution: stricter when resolution is low | |
| std_thresh_eff = max(1e-6, float(request.std_thresh)) * (res_scale ** -0.5) | |
| grad_thresh_eff = max(1e-6, float(request.grad_thresh)) * (res_scale ** -0.3) | |
| box, std_map, grad_norm, grad_mask, landing_mask = pick_flat_patch( | |
| depth, | |
| patch=patch_px, | |
| std_thresh=std_thresh_eff, | |
| grad_thresh=grad_thresh_eff, | |
| water_mask=water_mask_block if water_mask_block is not None else water_mask_resized, | |
| ) | |
| t_pick = time.perf_counter() | |
| seg_block_mask = None | |
| for mask in (water_mask_block, road_mask_block, tree_mask_block, roof_mask_block): | |
| if mask is None: | |
| continue | |
| if seg_block_mask is None: | |
| seg_block_mask = mask.copy() | |
| else: | |
| seg_block_mask |= mask | |
| landing_mask_pre_interior = landing_mask.copy() | |
| if seg_block_mask is not None: | |
| landing_mask = landing_mask & (~seg_block_mask) | |
| if half_span > 0: | |
| if (landing_mask.shape[0] > 2 * half_span) and (landing_mask.shape[1] > 2 * half_span): | |
| interior_mask = np.zeros_like(landing_mask, dtype=bool) | |
| interior_mask[ | |
| half_span : landing_mask.shape[0] - half_span, | |
| half_span : landing_mask.shape[1] - half_span, | |
| ] = True | |
| else: | |
| interior_mask = np.zeros_like(landing_mask, dtype=bool) | |
| else: | |
| interior_mask = np.ones_like(landing_mask, dtype=bool) | |
| landing_mask = landing_mask & interior_mask | |
| texture_mask = texture_norm <= max(0.0, min(1.0, request.texture_threshold)) | |
| safe_mask = (std_map < std_thresh_eff) & (grad_norm < grad_thresh_eff) & landing_mask & texture_mask | |
| try: | |
| clearance_px = max(1, int(round(request.clearance_factor * patch_px))) | |
| if clearance_px % 2 == 0: | |
| clearance_px += 1 | |
| kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (clearance_px, clearance_px)) | |
| hazard = ~safe_mask | |
| if seg_block_mask is not None: | |
| hazard = hazard & (~seg_block_mask) | |
| buffered = cv2.dilate(hazard.astype(np.uint8), kernel, iterations=1).astype(bool) | |
| safe_mask = safe_mask & (~buffered) | |
| if seg_block_mask is not None: | |
| safe_mask = safe_mask & (~seg_block_mask) | |
| except Exception: | |
| pass | |
| try: | |
| coverage = cv2.boxFilter( | |
| safe_mask.astype(np.float32), | |
| ddepth=-1, | |
| ksize=(patch_px, patch_px), | |
| normalize=True, | |
| anchor=(patch_px // 2, patch_px // 2), | |
| ) | |
| safe_mask = coverage >= max(0.0, min(1.0, request.coverage_strictness)) | |
| except Exception: | |
| pass | |
| area_thresh = max(1, int(patch_px * patch_px)) | |
| num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(safe_mask.astype(np.uint8), connectivity=8) | |
| if num_labels > 1: | |
| keep = np.zeros_like(labels, dtype=bool) | |
| for i in range(1, num_labels): | |
| if stats[i, cv2.CC_STAT_AREA] >= area_thresh: | |
| keep |= labels == i | |
| safe_mask = keep | |
| risk_std = np.clip((std_map - std_thresh_eff) / (std_thresh_eff + 1e-6), 0.0, 1.0) | |
| risk_grad = np.clip((grad_norm - grad_thresh_eff) / (grad_thresh_eff + 1e-6), 0.0, 1.0) | |
| risk_map = np.maximum(risk_std, risk_grad) * (~safe_mask) | |
| safe_fit = safe_mask.astype(np.float32) | |
| safe_mask_uint = safe_mask.astype(np.uint8) | |
| try: | |
| distance = cv2.distanceTransform(safe_mask_uint, cv2.DIST_L2, 3) | |
| except Exception: | |
| distance = np.zeros_like(safe_fit) | |
| try: | |
| coverage = cv2.boxFilter( | |
| safe_fit.astype(np.float32), | |
| ddepth=-1, | |
| ksize=(patch_px, patch_px), | |
| normalize=True, | |
| anchor=(patch_px // 2, patch_px // 2), | |
| ) | |
| valid_centers = coverage >= 1.0 | |
| except Exception: | |
| valid_centers = safe_fit > 0.5 | |
| used_valid_center = bool(valid_centers.any()) | |
| if used_valid_center: | |
| cc_mask = valid_centers.astype(np.uint8) | |
| num_c, labels_c, stats_c, _ = cv2.connectedComponentsWithStats(cc_mask, connectivity=8) | |
| target_mask = valid_centers | |
| if num_c > 1: | |
| areas = stats_c[1:, cv2.CC_STAT_AREA] | |
| largest_idx = 1 + int(np.argmax(areas)) | |
| target_mask = labels_c == largest_idx | |
| cand = np.where(target_mask) | |
| dist_cand = distance[cand] | |
| std_cand = std_map[cand] | |
| if dist_cand.size: | |
| dist_norm = dist_cand / (dist_cand.max() + 1e-6) | |
| std_norm = (std_cand - std_cand.min()) / (np.ptp(std_cand) + 1e-6) | |
| weight = max(0.0, min(1.0, request.openness_weight)) | |
| score = dist_norm - weight * std_norm | |
| idx = int(np.argmax(score)) | |
| else: | |
| idx = int(np.argmin(std_cand)) | |
| cy, cx = cand[0][idx], cand[1][idx] | |
| else: | |
| # Fall back to safest pixel inside any safe region (even if full coverage fails) | |
| if safe_mask.any(): | |
| cand = np.where(safe_mask) | |
| dist_cand = distance[cand] | |
| std_cand = std_map[cand] | |
| if dist_cand.size: | |
| dist_norm = dist_cand / (dist_cand.max() + 1e-6) | |
| std_norm = (std_cand - std_cand.min()) / (np.ptp(std_cand) + 1e-6) | |
| weight = max(0.0, min(1.0, request.openness_weight)) | |
| score = dist_norm - weight * std_norm | |
| idx = int(np.argmax(score)) | |
| else: | |
| idx = int(np.argmin(std_cand)) | |
| cy, cx = cand[0][idx], cand[1][idx] | |
| else: | |
| fallback_mask = landing_mask.copy() | |
| if not fallback_mask.any(): | |
| fallback_mask = np.ones_like(landing_mask, dtype=bool) | |
| if seg_block_mask is not None: | |
| fallback_mask &= (~seg_block_mask) | |
| fallback_mask &= interior_mask | |
| if fallback_mask.any(): | |
| cand = np.where(fallback_mask) | |
| std_cand = std_map[cand] | |
| idx = int(np.argmin(std_cand)) | |
| cy, cx = cand[0][idx], cand[1][idx] | |
| else: | |
| y0, x0, y1, x1 = box[1], box[0], box[3], box[2] | |
| cy, cx = (y0 + y1) // 2, (x0 + x1) // 2 | |
| if half_span > 0 and depth.shape[0] > 2 * half_span: | |
| cy = min(max(int(cy), half_span), depth.shape[0] - half_span - 1) | |
| else: | |
| cy = min(max(int(cy), 0), depth.shape[0] - 1) | |
| if half_span > 0 and depth.shape[1] > 2 * half_span: | |
| cx = min(max(int(cx), half_span), depth.shape[1] - half_span - 1) | |
| else: | |
| cx = min(max(int(cx), 0), depth.shape[1] - 1) | |
| scale_x = image.width / depth.shape[1] | |
| scale_y = image.height / depth.shape[0] | |
| footprint_img_px = max(3, int(round(patch_px * scale_x))) | |
| cx_img = int(round(cx * scale_x)) | |
| cy_img = int(round(cy * scale_y)) | |
| center_img = (cx_img, cy_img) | |
| center_depth = (cx, cy) | |
| # Display mask without interior cropping so overlays are not clipped at borders. | |
| safe_display_mask = ( | |
| (std_map < std_thresh_eff) | |
| & (grad_norm < grad_thresh_eff) | |
| & landing_mask_pre_interior | |
| & texture_mask | |
| ) | |
| if seg_block_mask is not None: | |
| safe_display_mask = safe_display_mask & (~seg_block_mask) | |
| try: | |
| clearance_px = max(1, int(round(request.clearance_factor * patch_px))) | |
| if clearance_px % 2 == 0: | |
| clearance_px += 1 | |
| kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (clearance_px, clearance_px)) | |
| hazard_disp = ~safe_display_mask | |
| if seg_block_mask is not None: | |
| hazard_disp = hazard_disp & (~seg_block_mask) | |
| buffered_disp = cv2.dilate(hazard_disp.astype(np.uint8), kernel, iterations=1).astype(bool) | |
| safe_display_mask = safe_display_mask & (~buffered_disp) | |
| if seg_block_mask is not None: | |
| safe_display_mask = safe_display_mask & (~seg_block_mask) | |
| except Exception: | |
| pass | |
| try: | |
| coverage_disp = cv2.boxFilter( | |
| safe_display_mask.astype(np.float32), | |
| ddepth=-1, | |
| ksize=(patch_px, patch_px), | |
| normalize=True, | |
| anchor=(patch_px // 2, patch_px // 2), | |
| ) | |
| safe_display_mask = coverage_disp >= max(0.0, min(1.0, request.coverage_strictness)) | |
| except Exception: | |
| pass | |
| try: | |
| footprint_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (patch_px, patch_px)) | |
| safe_display_mask = cv2.dilate(safe_display_mask.astype(np.uint8), footprint_kernel, iterations=1).astype( | |
| bool | |
| ) | |
| except Exception: | |
| pass | |
| mask_union = None | |
| overlay_union = None | |
| for mask in (water_mask_resized, road_mask_resized, tree_mask_resized, roof_mask_resized): | |
| if mask is None: | |
| continue | |
| if mask_union is None: | |
| mask_union = mask.copy() | |
| else: | |
| mask_union |= mask | |
| for mask in (water_mask_resized, road_mask_resized, tree_mask_resized): | |
| if mask is None: | |
| continue | |
| if overlay_union is None: | |
| overlay_union = mask.copy() | |
| else: | |
| overlay_union |= mask | |
| seg_mask_union = mask_union.copy() if mask_union is not None else None | |
| if mask_union is not None: | |
| safe_display_mask = safe_display_mask & (~mask_union) | |
| hazard_mask = ~safe_display_mask | |
| if roof_mask_resized is not None: | |
| hazard_mask = hazard_mask & (~roof_mask_resized) | |
| layers = build_result_layers( | |
| image=image, | |
| depth_raw=depth_raw, | |
| std_map_vis=std_map_vis, | |
| grad_norm=grad_norm, | |
| grad_thresh=request.grad_thresh, | |
| safe_mask=safe_display_mask, | |
| risk_map=risk_map, | |
| footprint_img_px=footprint_img_px, | |
| center_img=center_img, | |
| water_mask=water_mask_resized, | |
| road_mask=road_mask_resized, | |
| roof_mask=roof_mask_resized, | |
| tree_mask=tree_mask_resized, | |
| hazard_mask=hazard_mask, | |
| ) | |
| try: | |
| if torch.cuda.is_available(): | |
| torch.cuda.synchronize() | |
| except Exception: | |
| pass | |
| runtime_ms = (time.perf_counter() - t0) * 1000.0 | |
| safe_area_pct = float(safe_display_mask.mean()) * 100.0 | |
| hazard_pct = 100.0 - safe_area_pct | |
| def mask_pct(mask: np.ndarray | None) -> Optional[float]: | |
| if mask is None: | |
| return None | |
| return float(mask.mean()) * 100.0 | |
| warnings: list[str] = [] | |
| if not safe_mask.any(): | |
| warnings.append("No regions satisfied safety thresholds; showing flattest candidate.") | |
| if not request.use_water_mask: | |
| warnings.append("Water mask disabled.") | |
| elif water_mask_resized is None: | |
| warnings.append("No water detected; continuing without a water mask.") | |
| if not request.use_road_mask: | |
| warnings.append("Road mask disabled.") | |
| elif road_mask_resized is None: | |
| warnings.append("Road segmentation unavailable; continuing without mask.") | |
| if not request.use_tree_mask: | |
| warnings.append("Tree mask disabled.") | |
| elif tree_mask_resized is None: | |
| warnings.append("Tree segmentation unavailable; continuing without mask.") | |
| if not request.use_roof_mask: | |
| warnings.append("Roof mask disabled.") | |
| elif roof_mask_resized is None: | |
| warnings.append("Roof segmentation unavailable; continuing without mask.") | |
| t_final = time.perf_counter() | |
| print( | |
| "[TIMING] rgb->np {:.0f}ms | depth_model {:.0f}ms | plane {:.0f}ms | depth_misc {:.0f}ms | texture {:.0f}ms | masks {:.0f}ms | pick {:.0f}ms | compose {:.0f}ms | total {:.0f}ms".format( | |
| (t_rgb - t0) * 1000, | |
| depth_times.get("model_ms", 0.0), | |
| depth_times.get("plane_ms", 0.0), | |
| depth_times.get("prep_ms", 0.0), | |
| (t_texture - t_depth_post) * 1000, | |
| (t_masks - t_texture) * 1000, | |
| (t_pick - t_masks) * 1000, | |
| (t_final - t_pick) * 1000, | |
| (t_final - t0) * 1000, | |
| ) | |
| ) | |
| summary = AnalysisSummary( | |
| model_id=request.model_id, | |
| process_resolution=process_res, | |
| runtime_ms=runtime_ms, | |
| footprint_m=request.footprint_m, | |
| footprint_depth_px=patch_px, | |
| footprint_image_px=footprint_img_px, | |
| landing_center_depth=center_depth, | |
| landing_center_image=center_img, | |
| safe_area_pct=safe_area_pct, | |
| hazard_pct=hazard_pct, | |
| water_mask_pct=mask_pct(water_mask_resized) if request.use_water_mask else None, | |
| road_mask_pct=mask_pct(road_mask_resized) if request.use_road_mask else None, | |
| roof_mask_pct=mask_pct(roof_mask_resized) if request.use_roof_mask else None, | |
| tree_mask_pct=mask_pct(tree_mask_resized) if request.use_tree_mask else None, | |
| water_mask_enabled=request.use_water_mask, | |
| road_mask_enabled=request.use_road_mask, | |
| roof_mask_enabled=request.use_roof_mask, | |
| tree_mask_enabled=request.use_tree_mask, | |
| used_valid_center=used_valid_center, | |
| warnings=warnings, | |
| std_thresh_applied=std_thresh_eff, | |
| grad_thresh_applied=grad_thresh_eff, | |
| ) | |
| return AnalysisResult(images=layers, summary=summary) | |
| def process_path(self, path: Path, request: AnalysisRequest) -> AnalysisResult: | |
| if not path.exists(): | |
| raise ValueError(f"Input path not found: {path}") | |
| if path.suffix.lower() not in IMAGE_EXTS: | |
| raise ValueError(f"Unsupported image type for path: {path}") | |
| image = crop_nonblack(Image.open(path).convert("RGB")) | |
| request_with_source = replace(request, source_path=str(path)) | |
| return self.analyze_image(image, request_with_source) | |
| def build_request(**kwargs) -> AnalysisRequest: | |
| return AnalysisRequest(**kwargs) | |
| __all__ = ["SafetyAnalyzer", "AnalysisRequest", "AnalysisResult", "AnalysisSummary", "build_request"] | |