""" Detection refinement module for improving door and window detection. This module provides geometry-based refinement of YOLO segmentation results, using wall structure, spatial relationships, and computer vision techniques. Example usage: >>> from src.detection.refinement import refine_detections, RefinementConfig >>> >>> # Use default configuration >>> refined_result = refine_detections(yolo_output, image, geometry_output) >>> >>> # Use custom configuration >>> config = RefinementConfig( ... canny_low_threshold=40, ... canny_high_threshold=160, ... door_width=100 ... ) >>> refined_result = refine_detections( ... yolo_output, image, geometry_output, config=config ... ) """ from dataclasses import dataclass from typing import Optional, List, Tuple import logging import numpy as np import cv2 from src.segmentation.predictor import SegmentationResult from src.geometry.wall_vectorizer import WallPolygon, VectorizationResult # Configure logging logger = logging.getLogger(__name__) @dataclass class RefinementConfig: """Configuration for detection refinement algorithms.""" # Edge detection parameters (Canny) canny_low_threshold: int = 50 canny_high_threshold: int = 150 # Line detection parameters (HoughLinesP) hough_min_line_length: int = 30 hough_max_line_gap: int = 10 hough_threshold: int = 50 # Window detection parameters parallel_line_proximity: int = 20 # max distance between parallel lines (px) parallel_angle_tolerance: float = 15.0 # degrees window_min_length: int = 30 # minimum window length (px) # Door detection parameters wall_gap_threshold: int = 20 # minimum gap width to consider as door (px) room_adjacency_threshold: int = 50 # max distance for rooms to be adjacent (px) door_width: int = 90 # standard door width in pixels (~0.9m) # Spatial analysis parameters wall_thickness_tolerance: int = 10 # tolerance for point-on-wall checks (px) # Performance parameters enable_door_detection: bool = True enable_window_detection: bool = True def __post_init__(self): """Validate and clamp configuration parameters to reasonable ranges.""" # Validate Canny thresholds self.canny_low_threshold = max(1, min(255, self.canny_low_threshold)) self.canny_high_threshold = max(1, min(255, self.canny_high_threshold)) if self.canny_high_threshold <= self.canny_low_threshold: logger.warning( f"Invalid Canny thresholds: low={self.canny_low_threshold}, " f"high={self.canny_high_threshold}. Using defaults." ) self.canny_low_threshold = 50 self.canny_high_threshold = 150 # Validate positive integer parameters self.hough_min_line_length = max(1, self.hough_min_line_length) self.hough_max_line_gap = max(0, self.hough_max_line_gap) self.hough_threshold = max(1, self.hough_threshold) self.parallel_line_proximity = max(1, self.parallel_line_proximity) self.window_min_length = max(1, self.window_min_length) self.wall_gap_threshold = max(1, self.wall_gap_threshold) self.room_adjacency_threshold = max(1, self.room_adjacency_threshold) self.door_width = max(1, self.door_width) self.wall_thickness_tolerance = max(1, self.wall_thickness_tolerance) # Validate angle tolerance self.parallel_angle_tolerance = max(0.0, min(90.0, self.parallel_angle_tolerance)) # Type aliases for clarity LineSegment = Tuple[Tuple[int, int], Tuple[int, int]] # ((x1, y1), (x2, y2)) class DetectionRefiner: """Main orchestrator for detection refinement.""" def __init__(self, config: Optional[RefinementConfig] = None): """ Initialize the detection refiner. Parameters ---------- config : RefinementConfig, optional Configuration for refinement algorithms. If None, uses defaults. """ self.config = config or RefinementConfig() self.door_detector = DoorDetector(self.config) self.window_detector = WindowDetector(self.config) def refine( self, yolo_output: SegmentationResult, image: np.ndarray, geometry_output: VectorizationResult ) -> VectorizationResult: """ Main refinement function that replaces YOLO doors/windows with geometry-based detections. Parameters ---------- yolo_output : SegmentationResult Original YOLO segmentation result image : np.ndarray Preprocessed floor plan image (grayscale) geometry_output : VectorizationResult Vectorized geometry from Phase 3 Returns ------- VectorizationResult Refined VectorizationResult with improved doors/windows """ logger.info("Starting detection refinement") refined_doors = [] refined_windows = [] # Try door detection if self.config.enable_door_detection: try: refined_doors = self.door_detector.detect( geometry_output.walls, geometry_output.rooms ) logger.info(f"Door detection complete: {len(refined_doors)} doors detected") except Exception as e: logger.error(f"Door detection failed: {e}", exc_info=True) logger.warning("Falling back to original YOLO door detections") refined_doors = geometry_output.doors else: refined_doors = geometry_output.doors # Try window detection independently if self.config.enable_window_detection: try: refined_windows = self.window_detector.detect( image, geometry_output.walls ) logger.info(f"Window detection complete: {len(refined_windows)} windows detected") except Exception as e: logger.error(f"Window detection failed: {e}", exc_info=True) logger.warning("Falling back to original YOLO window detections") refined_windows = geometry_output.windows else: refined_windows = geometry_output.windows # Merge results refined_result = VectorizationResult( walls=geometry_output.walls, rooms=geometry_output.rooms, doors=refined_doors, windows=refined_windows, other=geometry_output.other, image_shape=geometry_output.image_shape ) logger.info( f"Refinement complete: {len(refined_doors)} doors, " f"{len(refined_windows)} windows" ) return refined_result def refine_detections( yolo_output: SegmentationResult, image: np.ndarray, geometry_output: VectorizationResult, config: Optional[RefinementConfig] = None ) -> VectorizationResult: """ Public API function for detection refinement. This is the main entry point called by GeometryPipeline. Parameters ---------- yolo_output : SegmentationResult Original YOLO segmentation result image : np.ndarray Preprocessed floor plan image (grayscale) geometry_output : VectorizationResult Vectorized geometry from Phase 3 config : RefinementConfig, optional Optional custom configuration. If None, uses defaults. Returns ------- VectorizationResult Refined VectorizationResult with improved doors/windows Examples -------- >>> refined = refine_detections(yolo_output, image, geometry_output) >>> print(f"Detected {len(refined.doors)} doors") """ try: refiner = DetectionRefiner(config) return refiner.refine(yolo_output, image, geometry_output) except Exception as e: logger.error(f"Refinement failed completely: {e}", exc_info=True) logger.warning("Returning original geometry output unchanged") return geometry_output class SpatialAnalyzer: """Analyzes spatial relationships between rooms and walls.""" def __init__(self, config: RefinementConfig): """ Initialize spatial analyzer. Parameters ---------- config : RefinementConfig Configuration for spatial analysis parameters """ self.config = config def find_adjacent_rooms( self, room_polygons: List[WallPolygon] ) -> List[Tuple[WallPolygon, WallPolygon]]: """ Find pairs of rooms that share a wall boundary. Uses polygon proximity analysis: rooms are adjacent if their boundaries come within room_adjacency_threshold pixels. Parameters ---------- room_polygons : List[WallPolygon] List of room polygons Returns ------- List[Tuple[WallPolygon, WallPolygon]] List of (room_a, room_b) tuples representing adjacent room pairs """ if not room_polygons or len(room_polygons) < 2: return [] adjacent_pairs = [] n = len(room_polygons) for i in range(n): for j in range(i + 1, n): room_a = room_polygons[i] room_b = room_polygons[j] # Compute distance between room centroids centroid_a = room_a.centroid centroid_b = room_b.centroid distance = np.sqrt( (centroid_a[0] - centroid_b[0])**2 + (centroid_a[1] - centroid_b[1])**2 ) # Check if rooms are close enough to be adjacent if distance <= self.config.room_adjacency_threshold * 3: # Use 3x threshold for centroid distance # Verify actual boundary proximity using minimum distance between polygon points points_a = np.array(room_a.points, dtype=np.float32) points_b = np.array(room_b.points, dtype=np.float32) # Compute minimum distance between any two points min_dist = float('inf') for pt_a in points_a: for pt_b in points_b: dist = np.sqrt((pt_a[0] - pt_b[0])**2 + (pt_a[1] - pt_b[1])**2) min_dist = min(min_dist, dist) if min_dist <= self.config.room_adjacency_threshold: adjacent_pairs.append((room_a, room_b)) logger.debug(f"Found {len(adjacent_pairs)} adjacent room pairs") return adjacent_pairs def find_shared_wall_segment( self, room_a: WallPolygon, room_b: WallPolygon, wall_polygons: List[WallPolygon] ) -> Optional[np.ndarray]: """ Extract the wall segment between two adjacent rooms. Algorithm: 1. Find the line connecting room centroids 2. Find wall polygons that intersect this line 3. Extract the portion of wall between the two rooms Parameters ---------- room_a : WallPolygon First room polygon room_b : WallPolygon Second room polygon wall_polygons : List[WallPolygon] List of wall polygons Returns ------- np.ndarray or None Nx2 numpy array of wall segment points, or None if not found """ if not wall_polygons: return None centroid_a = np.array(room_a.centroid, dtype=np.float32) centroid_b = np.array(room_b.centroid, dtype=np.float32) # Find walls that lie between the two rooms # A wall is between rooms if it's close to the line connecting centroids candidate_walls = [] for wall in wall_polygons: if not wall.is_wall: continue wall_centroid = np.array(wall.centroid, dtype=np.float32) # Check if wall centroid is roughly between room centroids # using dot product to check if it's in the same direction vec_ab = centroid_b - centroid_a vec_aw = wall_centroid - centroid_a # Project wall centroid onto line AB if np.dot(vec_ab, vec_ab) > 0: t = np.dot(vec_aw, vec_ab) / np.dot(vec_ab, vec_ab) # Wall should be between rooms (0 < t < 1) if 0.2 < t < 0.8: # Allow some tolerance # Check perpendicular distance to line projection = centroid_a + t * vec_ab perp_dist = np.linalg.norm(wall_centroid - projection) if perp_dist < self.config.room_adjacency_threshold: candidate_walls.append(wall) if not candidate_walls: logger.debug("No shared wall segment found between rooms") return None # Return the wall closest to the midpoint between rooms midpoint = (centroid_a + centroid_b) / 2 closest_wall = min( candidate_walls, key=lambda w: np.linalg.norm(np.array(w.centroid) - midpoint) ) return np.array(closest_wall.points, dtype=np.float32) def is_point_on_wall( self, point: Tuple[float, float], wall_polygon: WallPolygon ) -> bool: """ Check if a point lies within wall polygon boundaries. Uses cv2.pointPolygonTest with tolerance from config. Parameters ---------- point : Tuple[float, float] Point coordinates (x, y) wall_polygon : WallPolygon Wall polygon to test against Returns ------- bool True if point is on or near the wall, False otherwise """ wall_pts = np.array(wall_polygon.points, dtype=np.int32) # Use cv2.pointPolygonTest to compute signed distance # Positive = inside, 0 = on edge, negative = outside dist = cv2.pointPolygonTest(wall_pts, point, measureDist=True) # Point is on wall if distance is within tolerance return abs(dist) <= self.config.wall_thickness_tolerance def get_wall_direction( self, wall_segment: np.ndarray ) -> float: """ Compute orientation angle of a wall segment. Uses PCA (principal component analysis) on wall points to find dominant direction. Parameters ---------- wall_segment : np.ndarray Nx2 array of wall segment points Returns ------- float Angle in degrees (0-180 range) """ if len(wall_segment) < 2: logger.warning("Wall segment has fewer than 2 points, returning 0 degrees") return 0.0 try: # Use PCA to find principal direction from sklearn.decomposition import PCA pca = PCA(n_components=1) pca.fit(wall_segment) # Get principal component (dominant direction) component = pca.components_[0] angle = np.arctan2(component[1], component[0]) # Convert to degrees, normalize to [0, 180) angle_deg = np.degrees(angle) % 180 return float(angle_deg) except Exception as e: logger.warning(f"Failed to compute wall direction using PCA: {e}") # Fallback: use simple line fitting try: # Fit line using first and last points p1 = wall_segment[0] p2 = wall_segment[-1] dx = p2[0] - p1[0] dy = p2[1] - p1[1] angle = np.arctan2(dy, dx) angle_deg = np.degrees(angle) % 180 return float(angle_deg) except Exception as e2: logger.error(f"Failed to compute wall direction: {e2}") return 0.0 class GapDetector: """Detects gaps and discontinuities in wall segments.""" def __init__(self, config: RefinementConfig): """ Initialize gap detector. Parameters ---------- config : RefinementConfig Configuration for gap detection parameters """ self.config = config def detect_wall_gaps( self, wall_segment: np.ndarray ) -> List[Tuple[np.ndarray, np.ndarray]]: """ Identify breaks or discontinuities in a wall segment. Algorithm: 1. Sort wall points along the wall direction 2. Compute distances between consecutive points 3. Identify gaps where distance > wall_gap_threshold Parameters ---------- wall_segment : np.ndarray Nx2 array of wall segment points Returns ------- List[Tuple[np.ndarray, np.ndarray]] List of (start_point, end_point) tuples indicating gap locations """ if len(wall_segment) < 2: logger.debug("Wall segment too short for gap detection") return [] try: # Compute wall direction to sort points along it centroid = np.mean(wall_segment, axis=0) # Use PCA to find principal direction from sklearn.decomposition import PCA pca = PCA(n_components=1) pca.fit(wall_segment) direction = pca.components_[0] # Project points onto principal direction projections = np.dot(wall_segment - centroid, direction) # Sort points by projection sorted_indices = np.argsort(projections) sorted_points = wall_segment[sorted_indices] except Exception as e: logger.warning(f"Failed to sort wall points using PCA: {e}, using simple sort") # Fallback: sort by x-coordinate sorted_indices = np.argsort(wall_segment[:, 0]) sorted_points = wall_segment[sorted_indices] # Compute distances between consecutive points gaps = [] for i in range(len(sorted_points) - 1): p1 = sorted_points[i] p2 = sorted_points[i + 1] distance = np.linalg.norm(p2 - p1) # If distance exceeds threshold, we found a gap if distance > self.config.wall_gap_threshold: gaps.append((p1, p2)) logger.debug(f"Found gap of width {distance:.1f}px at {p1} -> {p2}") if not gaps: logger.debug("No wall gaps detected") else: logger.debug(f"Detected {len(gaps)} wall gaps") return gaps # Placeholder classes - will be implemented in subsequent tasks class DoorDetector: """Detects doors based on wall gaps and room adjacency.""" def __init__(self, config: RefinementConfig): self.config = config self.spatial_analyzer = SpatialAnalyzer(config) self.gap_detector = GapDetector(config) def detect( self, wall_polygons: List[WallPolygon], room_polygons: List[WallPolygon] ) -> List[WallPolygon]: """ Detect door locations from wall geometry and room adjacency. Parameters ---------- wall_polygons : List[WallPolygon] List of vectorized wall polygons room_polygons : List[WallPolygon] List of vectorized room polygons Returns ------- List[WallPolygon] List of door polygons with class_id=3, class_name="Door" """ # Input validation if not wall_polygons: logger.warning("No wall polygons provided, skipping door detection") return [] if not room_polygons: logger.warning("No room polygons provided, skipping door detection") return [] logger.info(f"Detecting doors from {len(wall_polygons)} walls and {len(room_polygons)} rooms") doors = [] # Find all pairs of adjacent rooms adjacent_pairs = self.spatial_analyzer.find_adjacent_rooms(room_polygons) if not adjacent_pairs: logger.info("No adjacent rooms found, no doors to place") return [] logger.info(f"Found {len(adjacent_pairs)} adjacent room pairs") # For each adjacent room pair, place a door for room_a, room_b in adjacent_pairs: try: # Find the shared wall segment shared_wall = self.spatial_analyzer.find_shared_wall_segment( room_a, room_b, wall_polygons ) if shared_wall is None: logger.debug(f"No shared wall found between rooms, skipping") continue # Detect gaps in the wall gaps = self.gap_detector.detect_wall_gaps(shared_wall) # Determine door position if gaps: # Place door at first gap gap_start, gap_end = gaps[0] door_position = (gap_start + gap_end) / 2 logger.debug(f"Placing door at gap location: {door_position}") else: # Fallback: place door at midpoint of shared wall door_position = np.mean(shared_wall, axis=0) logger.debug(f"No gaps found, placing door at wall midpoint: {door_position}") # Get wall direction wall_direction = self.spatial_analyzer.get_wall_direction(shared_wall) # Create door polygon door = create_door_polygon( position=tuple(door_position), wall_direction=wall_direction, door_width=self.config.door_width ) doors.append(door) except Exception as e: logger.warning(f"Failed to place door between rooms: {e}") continue logger.info(f"Successfully detected {len(doors)} doors") return doors def create_door_polygon( position: Tuple[float, float], wall_direction: float, door_width: int ) -> WallPolygon: """ Create a door polygon at the specified position. The door is oriented perpendicular to the wall direction. Parameters ---------- position : Tuple[float, float] (x, y) center point for door wall_direction : float Wall orientation in degrees (0-180) door_width : int Door width in pixels Returns ------- WallPolygon WallPolygon with class_id=3, class_name="Door" """ # Door is perpendicular to wall door_angle = (wall_direction + 90) % 180 door_angle_rad = np.radians(door_angle) # Create door as a rectangle perpendicular to wall half_width = door_width / 2 half_depth = door_width / 4 # Door depth is half the width # Direction vectors dir_along = np.array([np.cos(door_angle_rad), np.sin(door_angle_rad)]) dir_perp = np.array([-np.sin(door_angle_rad), np.cos(door_angle_rad)]) # Four corners of the door rectangle center = np.array(position) p1 = center - half_width * dir_along - half_depth * dir_perp p2 = center + half_width * dir_along - half_depth * dir_perp p3 = center + half_width * dir_along + half_depth * dir_perp p4 = center - half_width * dir_along + half_depth * dir_perp # Convert to integer coordinates points = [ (int(p1[0]), int(p1[1])), (int(p2[0]), int(p2[1])), (int(p3[0]), int(p3[1])), (int(p4[0]), int(p4[1])) ] # Compute area and bbox points_array = np.array(points) area = float(cv2.contourArea(points_array)) x_coords = points_array[:, 0] y_coords = points_array[:, 1] bbox = ( int(x_coords.min()), int(y_coords.min()), int(x_coords.max() - x_coords.min()), int(y_coords.max() - y_coords.min()) ) return WallPolygon( class_id=3, class_name="Door", points=points, area=area, bbox=bbox, confidence=1.0 ) class EdgeAnalyzer: """Performs edge detection on floor plan images.""" def __init__(self, config: RefinementConfig): """ Initialize edge analyzer. Parameters ---------- config : RefinementConfig Configuration for edge detection parameters """ self.config = config def detect_edges( self, image: np.ndarray ) -> np.ndarray: """ Apply Canny edge detection to identify edges. Uses config.canny_low_threshold and config.canny_high_threshold. Parameters ---------- image : np.ndarray Input image (grayscale) Returns ------- np.ndarray Binary edge map (same size as input) """ if image is None or image.size == 0: logger.error("Invalid image provided to edge detection") return np.zeros((100, 100), dtype=np.uint8) try: # Ensure image is grayscale if len(image.shape) == 3: image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # Apply Canny edge detection edges = cv2.Canny( image, self.config.canny_low_threshold, self.config.canny_high_threshold ) logger.debug(f"Edge detection complete: {np.count_nonzero(edges)} edge pixels") return edges except cv2.error as e: logger.error(f"OpenCV edge detection failed: {e}") return np.zeros_like(image, dtype=np.uint8) except Exception as e: logger.error(f"Edge detection failed: {e}") return np.zeros_like(image, dtype=np.uint8) class LineDetector: """Detects and analyzes line segments in images.""" def __init__(self, config: RefinementConfig): """ Initialize line detector. Parameters ---------- config : RefinementConfig Configuration for line detection parameters """ self.config = config def detect_lines( self, edge_map: np.ndarray ) -> List[LineSegment]: """ Extract line segments using HoughLinesP. Uses config.hough_* parameters. Parameters ---------- edge_map : np.ndarray Binary edge map from edge detection Returns ------- List[LineSegment] List of ((x1, y1), (x2, y2)) line segments """ if edge_map is None or edge_map.size == 0: logger.warning("Invalid edge map provided to line detection") return [] try: # Apply HoughLinesP lines = cv2.HoughLinesP( edge_map, rho=1, theta=np.pi / 180, threshold=self.config.hough_threshold, minLineLength=self.config.hough_min_line_length, maxLineGap=self.config.hough_max_line_gap ) if lines is None: logger.debug("No lines detected by HoughLinesP") return [] # Convert to list of line segments line_segments = [] for line in lines: x1, y1, x2, y2 = line[0] line_segments.append(((int(x1), int(y1)), (int(x2), int(y2)))) logger.debug(f"Detected {len(line_segments)} line segments") return line_segments except cv2.error as e: logger.error(f"OpenCV line detection failed: {e}") return [] except Exception as e: logger.error(f"Line detection failed: {e}") return [] def compute_line_angle( self, line: LineSegment ) -> float: """ Compute orientation angle of a line segment. Parameters ---------- line : LineSegment Line segment ((x1, y1), (x2, y2)) Returns ------- float Angle in degrees (0-180 range) """ (x1, y1), (x2, y2) = line dx = x2 - x1 dy = y2 - y1 angle = np.arctan2(dy, dx) angle_deg = np.degrees(angle) % 180 return float(angle_deg) def compute_line_distance( self, line_a: LineSegment, line_b: LineSegment ) -> float: """ Compute perpendicular distance between two parallel lines. Parameters ---------- line_a : LineSegment First line segment line_b : LineSegment Second line segment Returns ------- float Distance in pixels """ # Use midpoints of lines (x1_a, y1_a), (x2_a, y2_a) = line_a (x1_b, y1_b), (x2_b, y2_b) = line_b mid_a = np.array([(x1_a + x2_a) / 2, (y1_a + y2_a) / 2]) mid_b = np.array([(x1_b + x2_b) / 2, (y1_b + y2_b) / 2]) # Compute distance between midpoints distance = np.linalg.norm(mid_b - mid_a) return float(distance) def find_parallel_pairs( self, lines: List[LineSegment] ) -> List[Tuple[int, int]]: """ Identify pairs of parallel lines with close proximity. Algorithm: 1. Compute orientation angle for each line 2. For each line pair: - Check if angles are within parallel_angle_tolerance - Check if distance between lines < parallel_line_proximity 3. Return indices of parallel pairs Parameters ---------- lines : List[LineSegment] List of line segments Returns ------- List[Tuple[int, int]] List of (line_idx_a, line_idx_b) tuples """ if not lines or len(lines) < 2: return [] pairs = [] n = len(lines) # Compute angles for all lines angles = [self.compute_line_angle(line) for line in lines] for i in range(n): for j in range(i + 1, n): angle_i = angles[i] angle_j = angles[j] # Check angle similarity (handle wraparound at 180 degrees) angle_diff = abs(angle_i - angle_j) angle_diff = min(angle_diff, 180 - angle_diff) if angle_diff > self.config.parallel_angle_tolerance: continue # Check distance distance = self.compute_line_distance(lines[i], lines[j]) if distance > self.config.parallel_line_proximity: continue # Check minimum line length (x1_i, y1_i), (x2_i, y2_i) = lines[i] (x1_j, y1_j), (x2_j, y2_j) = lines[j] len_i = np.sqrt((x2_i - x1_i)**2 + (y2_i - y1_i)**2) len_j = np.sqrt((x2_j - x1_j)**2 + (y2_j - y1_j)**2) if len_i < self.config.window_min_length or len_j < self.config.window_min_length: continue pairs.append((i, j)) logger.debug( f"Found parallel pair: lines {i} and {j}, " f"angle_diff={angle_diff:.1f}°, distance={distance:.1f}px" ) logger.debug(f"Found {len(pairs)} parallel line pairs") return pairs class WindowDetector: """Detects windows using edge detection and line analysis.""" def __init__(self, config: RefinementConfig): self.config = config self.edge_analyzer = EdgeAnalyzer(config) self.line_detector = LineDetector(config) def detect( self, image: np.ndarray, wall_polygons: List[WallPolygon] ) -> List[WallPolygon]: """ Detect window locations using edge detection and line analysis. Parameters ---------- image : np.ndarray Preprocessed floor plan image (grayscale) wall_polygons : List[WallPolygon] List of vectorized wall polygons Returns ------- List[WallPolygon] List of window polygons with class_id=2, class_name="Window" """ # Input validation if image is None or image.size == 0: logger.warning("Invalid image provided, skipping window detection") return [] if not wall_polygons: logger.warning("No wall polygons provided, skipping window detection") return [] logger.info(f"Detecting windows from image and {len(wall_polygons)} walls") # Step 1: Edge detection edges = self.edge_analyzer.detect_edges(image) # Step 2: Line detection lines = self.line_detector.detect_lines(edges) if not lines: logger.info("No lines detected, no windows to place") return [] # Step 3: Find parallel line pairs parallel_pairs = self.line_detector.find_parallel_pairs(lines) if not parallel_pairs: logger.info("No parallel line pairs found, no windows to place") return [] logger.info(f"Found {len(parallel_pairs)} parallel line pairs") windows = [] # Step 4: Create window polygons from parallel pairs for idx_a, idx_b in parallel_pairs: try: line_a = lines[idx_a] line_b = lines[idx_b] # Compute window center (x1_a, y1_a), (x2_a, y2_a) = line_a (x1_b, y1_b), (x2_b, y2_b) = line_b mid_a = np.array([(x1_a + x2_a) / 2, (y1_a + y2_a) / 2]) mid_b = np.array([(x1_b + x2_b) / 2, (y1_b + y2_b) / 2]) window_center = (mid_a + mid_b) / 2 # Find which wall this window belongs to wall = self._find_containing_wall(tuple(window_center), wall_polygons) if wall is None: logger.debug(f"Window at {window_center} not on any wall, skipping") continue # Verify alignment with wall direction wall_points = np.array(wall.points, dtype=np.float32) spatial_analyzer = SpatialAnalyzer(self.config) wall_direction = spatial_analyzer.get_wall_direction(wall_points) line_direction = self.line_detector.compute_line_angle(line_a) # Check if line is parallel to wall (within tolerance) angle_diff = abs(wall_direction - line_direction) angle_diff = min(angle_diff, 180 - angle_diff) if angle_diff > self.config.parallel_angle_tolerance: logger.debug( f"Window lines not aligned with wall " f"(wall={wall_direction:.1f}°, line={line_direction:.1f}°), skipping" ) continue # Create window polygon window = create_window_polygon(line_a, line_b) windows.append(window) except Exception as e: logger.warning(f"Failed to create window from parallel lines: {e}") continue logger.info(f"Successfully detected {len(windows)} windows") return windows def _find_containing_wall( self, point: Tuple[float, float], wall_polygons: List[WallPolygon] ) -> Optional[WallPolygon]: """ Find which wall contains the given point. Parameters ---------- point : Tuple[float, float] Point coordinates (x, y) wall_polygons : List[WallPolygon] List of wall polygons Returns ------- WallPolygon or None Wall containing the point, or None if not found """ spatial_analyzer = SpatialAnalyzer(self.config) for wall in wall_polygons: if not wall.is_wall: continue if spatial_analyzer.is_point_on_wall(point, wall): return wall return None def create_window_polygon( line_a: LineSegment, line_b: LineSegment ) -> WallPolygon: """ Create a window polygon from a pair of parallel lines. Parameters ---------- line_a : LineSegment First line segment ((x1, y1), (x2, y2)) line_b : LineSegment Second line segment ((x1, y1), (x2, y2)) Returns ------- WallPolygon WallPolygon with class_id=2, class_name="Window" """ (x1_a, y1_a), (x2_a, y2_a) = line_a (x1_b, y1_b), (x2_b, y2_b) = line_b # Create rectangle from the two parallel lines # Use the endpoints of both lines as corners points = [ (int(x1_a), int(y1_a)), (int(x2_a), int(y2_a)), (int(x2_b), int(y2_b)), (int(x1_b), int(y1_b)) ] # Compute area and bbox points_array = np.array(points) area = float(cv2.contourArea(points_array)) # Handle degenerate case where contour area is 0 if area == 0: area = 1.0 x_coords = points_array[:, 0] y_coords = points_array[:, 1] bbox = ( int(x_coords.min()), int(y_coords.min()), int(x_coords.max() - x_coords.min()), int(y_coords.max() - y_coords.min()) ) return WallPolygon( class_id=2, class_name="Window", points=points, area=area, bbox=bbox, confidence=1.0 )