Spaces:
Sleeping
Sleeping
| """ | |
| room_graph.py | |
| ------------- | |
| Builds a room connectivity graph from vectorized floor plan polygons. | |
| The graph encodes: | |
| - Nodes: rooms (with metadata — type, area, centroid) | |
| - Edges: connections between rooms via doors or shared walls | |
| This structured representation is the bridge between 2D geometry | |
| and the 3D reconstruction in Phase 4. | |
| Usage: | |
| from src.geometry.room_graph import RoomGraphBuilder | |
| builder = RoomGraphBuilder() | |
| graph = builder.build(vectorization_result, scale_estimate) | |
| print(graph.summary) | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| import math | |
| import numpy as np | |
| import cv2 | |
| # ── Data structures ─────────────────────────────────────────────────────────── | |
@dataclass
class RoomNode:
    """A single room in the floor plan graph.

    Declared as a dataclass so it can be constructed with keyword
    arguments (one per field below).
    """

    node_id: int
    class_name: str
    class_id: int
    centroid: tuple[float, float]  # (x, y) in pixels
    area_px: float
    area_m2: float  # real-world area
    bbox: tuple[int, int, int, int]  # (x, y, w, h) pixels
    polygon: list[tuple[int, int]]  # pixel boundary points

    def label(self) -> str:
        """Return a display label: the class name followed by the area in m²
        formatted to one decimal place, e.g. ``"kitchen (12.3m²)"``.
        """
        return f"{self.class_name} ({self.area_m2:.1f}m²)"
@dataclass
class RoomEdge:
    """A connection between two rooms.

    Declared as a dataclass so it can be constructed with keyword
    arguments; ``via_element`` and ``distance_px`` are optional.
    """

    node_a: int  # node_id of room A
    node_b: int  # node_id of room B
    edge_type: str  # 'door', 'opening', 'shared_wall'
    via_element: Optional[int] = None  # index of door/window polygon if any
    distance_px: float = 0.0
@dataclass
class FloorPlanGraph:
    """Complete room connectivity graph for one floor plan.

    ``nodes`` and ``edges`` default to fresh empty lists per instance.
    """

    nodes: list[RoomNode] = field(default_factory=list)
    edges: list[RoomEdge] = field(default_factory=list)
    scale_method: str = "unknown"

    @property
    def summary(self) -> dict:
        """Return headline statistics for the graph.

        Exposed as a property: it is read as ``graph.summary`` and embedded
        as a value in :meth:`to_dict`.
        """
        return {
            "rooms": len(self.nodes),
            "connections": len(self.edges),
            "room_types": [n.class_name for n in self.nodes],
            "total_area_m2": round(sum(n.area_m2 for n in self.nodes), 2),
            "scale_method": self.scale_method,
        }

    def get_node(self, node_id: int) -> Optional[RoomNode]:
        """Return the first node whose ``node_id`` matches, or ``None``."""
        for node in self.nodes:
            if node.node_id == node_id:
                return node
        return None

    def get_neighbours(self, node_id: int) -> list[RoomNode]:
        """Return the nodes that share an edge with ``node_id``.

        Each neighbour appears once, ordered by ascending node id. Edge
        endpoints that do not resolve to a node in the graph are skipped.
        """
        neighbour_ids = set()
        for edge in self.edges:
            if edge.node_a == node_id:
                neighbour_ids.add(edge.node_b)
            elif edge.node_b == node_id:
                neighbour_ids.add(edge.node_a)

        neighbours = []
        # Sorted so the result order does not depend on set iteration order.
        for nid in sorted(neighbour_ids):
            node = self.get_node(nid)  # single lookup per neighbour
            if node is not None:
                neighbours.append(node)
        return neighbours

    def to_dict(self) -> dict:
        """Serialisable representation for saving/loading.

        Returns a dict with ``"nodes"``, ``"edges"`` and ``"summary"`` keys.
        Edge entries carry only the endpoints, type and pixel distance.
        """
        return {
            "nodes": [
                {
                    "id": n.node_id,
                    "class_name": n.class_name,
                    "class_id": n.class_id,
                    "centroid": list(n.centroid),
                    "area_px": n.area_px,
                    "area_m2": n.area_m2,
                    "bbox": list(n.bbox),
                    "polygon": n.polygon,
                }
                for n in self.nodes
            ],
            "edges": [
                {
                    "node_a": e.node_a,
                    "node_b": e.node_b,
                    "edge_type": e.edge_type,
                    "distance": e.distance_px,
                }
                for e in self.edges
            ],
            "summary": self.summary,
        }
| # ── Graph builder ───────────────────────────────────────────────────────────── | |
class RoomGraphBuilder:
    """
    Builds a room connectivity graph from Phase 3 vectorization output.

    Connection strategy:
        1. Door proximity — for each door, the two rooms whose centroids are
           nearest to the door centroid (both within ``door_proximity``
           pixels) are connected with a 'door' edge.
        2. Centroid proximity — any remaining pair of rooms whose centroids
           are within ``proximity_threshold`` pixels is connected with a
           'shared_wall' edge.

    Args:
        proximity_threshold: Max distance between room centroids to infer
                             adjacency (pixels). Used as given; it is not
                             rescaled to the image size.
        door_proximity:      Max distance from a door centroid to a room
                             centroid for the door to count as connecting
                             that room (pixels).
    """

    def __init__(
        self,
        proximity_threshold: int = 200,
        door_proximity: int = 150,
    ):
        self.proximity_threshold = proximity_threshold
        self.door_proximity = door_proximity

    def build(
        self,
        vectorization_result,
        scale_estimate=None,
    ) -> FloorPlanGraph:
        """
        Build the room graph from a VectorizationResult.

        Args:
            vectorization_result: Object exposing ``rooms`` and ``doors``
                                  iterables (output of WallVectorizer.extract()).
            scale_estimate:       Optional object exposing ``method`` and
                                  ``area_px_to_m2()`` (output of
                                  ScaleEstimator.estimate()). When omitted,
                                  every room gets ``area_m2 == 0.0`` and the
                                  graph's ``scale_method`` is ``"none"``.

        Returns:
            FloorPlanGraph with nodes and edges. If there are no rooms the
            graph is returned empty and doors are not inspected.
        """
        graph = FloorPlanGraph(
            scale_method=scale_estimate.method if scale_estimate is not None else "none"
        )

        # ── Step 1: Create room nodes ──────────────────────────────────────
        self._add_room_nodes(graph, vectorization_result.rooms, scale_estimate)
        if not graph.nodes:
            return graph

        # ── Step 2: Connect rooms via doors ────────────────────────────────
        self._connect_via_doors(graph, vectorization_result.doors)

        # ── Step 3: Connect adjacent rooms by centroid proximity ───────────
        self._connect_by_proximity(graph)

        return graph

    def _add_room_nodes(self, graph: FloorPlanGraph, rooms, scale_estimate) -> None:
        """Append one RoomNode per room; node ids follow enumeration order."""
        for node_id, room in enumerate(rooms):
            area_m2 = 0.0
            if scale_estimate is not None:
                area_m2 = scale_estimate.area_px_to_m2(room.area)
            cx, cy = room.centroid
            graph.nodes.append(RoomNode(
                node_id=node_id,
                class_name=room.class_name,
                class_id=room.class_id,
                centroid=(cx, cy),
                area_px=room.area,
                area_m2=round(area_m2, 2),
                bbox=room.bbox,
                polygon=room.points,
            ))

    def _connect_via_doors(self, graph: FloorPlanGraph, doors) -> None:
        """Add a 'door' edge between the two rooms nearest to each door."""
        for door_idx, door in enumerate(doors):
            door_cx, door_cy = door.centroid
            nearby_rooms = []
            for node in graph.nodes:
                dist = _euclidean(door_cx, door_cy, *node.centroid)
                if dist <= self.door_proximity:
                    nearby_rooms.append((dist, node.node_id))

            # A door needs a room on each side to form a connection.
            if len(nearby_rooms) < 2:
                continue

            # Sort by distance (ties broken by node id) and take the closest two.
            nearby_rooms.sort()
            (_, id_a), (far_dist, id_b) = nearby_rooms[0], nearby_rooms[1]
            if id_a != id_b and not _edge_exists(graph, id_a, id_b):
                graph.edges.append(RoomEdge(
                    node_a=id_a,
                    node_b=id_b,
                    edge_type="door",
                    via_element=door_idx,
                    # Distance from the door to the farther of the two rooms.
                    distance_px=far_dist,
                ))

    def _connect_by_proximity(self, graph: FloorPlanGraph) -> None:
        """Add a 'shared_wall' edge for each unconnected pair of nearby rooms."""
        nodes = graph.nodes
        for i, node_a in enumerate(nodes):
            for node_b in nodes[i + 1:]:
                dist = _euclidean(*node_a.centroid, *node_b.centroid)
                if dist > self.proximity_threshold:
                    continue
                # Pairs already linked by a door keep their 'door' edge.
                if _edge_exists(graph, node_a.node_id, node_b.node_id):
                    continue
                graph.edges.append(RoomEdge(
                    node_a=node_a.node_id,
                    node_b=node_b.node_id,
                    edge_type="shared_wall",
                    distance_px=dist,
                ))

    def draw(
        self,
        image: np.ndarray,
        graph: FloorPlanGraph,
    ) -> np.ndarray:
        """
        Visualise the room graph overlaid on the floor plan image.

        Nodes drawn as circles with room labels.
        Edges drawn as lines between centroids, coloured by type.

        Args:
            image: Floor plan image. A 2-D array is converted with
                   ``COLOR_GRAY2BGR``; anything else is copied as-is.
            graph: Graph to render.

        Returns:
            A new array with the overlay; ``image`` is not modified.
        """
        if image.ndim == 2:
            canvas = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        else:
            canvas = image.copy()

        edge_colors = {
            "door": (50, 200, 200),
            "shared_wall": (150, 150, 150),
            "opening": (200, 180, 50),
        }

        # Draw edges first (behind nodes)
        for edge in graph.edges:
            node_a = graph.get_node(edge.node_a)
            node_b = graph.get_node(edge.node_b)
            if node_a is None or node_b is None:
                continue  # dangling edge: nothing to draw
            color = edge_colors.get(edge.edge_type, (200, 200, 200))
            pt_a = (int(node_a.centroid[0]), int(node_a.centroid[1]))
            pt_b = (int(node_b.centroid[0]), int(node_b.centroid[1]))
            cv2.line(canvas, pt_a, pt_b, color, 2)
            # Label edge type at midpoint
            mid = ((pt_a[0] + pt_b[0]) // 2, (pt_a[1] + pt_b[1]) // 2)
            cv2.putText(canvas, edge.edge_type, mid,
                        cv2.FONT_HERSHEY_SIMPLEX, 0.3, color, 1)

        # Draw nodes
        for node in graph.nodes:
            cx, cy = int(node.centroid[0]), int(node.centroid[1])
            cv2.circle(canvas, (cx, cy), 12, (50, 50, 200), -1)
            cv2.circle(canvas, (cx, cy), 12, (255, 255, 255), 2)
            cv2.putText(
                canvas,
                f"{node.class_name}",
                (cx - 30, cy - 16),
                cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1,
            )
            # Hershey fonts cannot render non-ASCII glyphs (a superscript two
            # would be drawn as '?'), so the unit is written in plain ASCII.
            cv2.putText(
                canvas,
                f"{node.area_m2:.1f}m2",
                (cx - 20, cy + 26),
                cv2.FONT_HERSHEY_SIMPLEX, 0.35, (200, 220, 200), 1,
            )

        return canvas
| # ── Helpers ─────────────────────────────────────────────────────────────────── | |
def _euclidean(x1, y1, x2, y2) -> float:
    """Return the straight-line distance between points (x1, y1) and (x2, y2).

    Uses ``math.hypot`` rather than squaring by hand, so very large
    coordinate differences do not overflow in the intermediate squares.
    """
    return math.hypot(x2 - x1, y2 - y1)
def _edge_exists(graph: FloorPlanGraph, id_a: int, id_b: int) -> bool:
    """Return True if ``graph.edges`` already links the two node ids.

    Edges are treated as undirected: an edge stored as (id_b, id_a) counts
    as a match for (id_a, id_b).
    """
    wanted = ((id_a, id_b), (id_b, id_a))
    return any((edge.node_a, edge.node_b) in wanted for edge in graph.edges)