# Spaces:
# Sleeping
# Sleeping
import random
import time

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
class NetworkGenerator:
    """Random generator of connected grid networks ("rooms in a building").

    Nodes are integer ``(a, b)`` tuples on a ``(grid + 1) x (grid + 1)``
    lattice; ``plot`` draws a node at ``(b, -a)``.  Edges are added according
    to the selected topology and crossing edges are removed afterwards.

    Typical use::

        graph = NetworkGenerator(size="M", topology="bottlenecks").generate()
    """

    def __init__(self, size="S", variant="F", topology="highly_connected",
                 node_drop_fraction=0.2,
                 bottleneck_cluster_count=None,
                 bottleneck_edges_per_link=1):
        """
        size:
            'S', 'M' or 'L' (case-insensitive); selects the grid size.
        variant:
            'F' (fixed node/edge factors) or 'R' (random factors).
        topology:
            'highly_connected', 'bottlenecks' or 'linear' (case-insensitive).
        node_drop_fraction:
            Fraction of all (grid+1)^2 possible positions that are deactivated (not allowed as nodes).
            Example: 0.2 -> remove 1/5 of all grid positions.
        bottleneck_cluster_count:
            If None, chosen automatically based on size.
            Larger => more small dense clusters.
        bottleneck_edges_per_link:
            Number of edges connecting consecutive clusters (these are the bottlenecks).
            Keep this small (1 or 2) to preserve bottleneck behavior.

        Raises ValueError for any argument outside the ranges above.
        """
        self.size = size.upper()
        self.variant = variant.upper()
        self.topology = topology.lower()
        if self.topology not in ["highly_connected", "bottlenecks", "linear"]:
            raise ValueError("topology must be: 'highly_connected', 'bottlenecks', or 'linear'")

        self.size_config = {
            "S": {"grid": 4, "node_factor": 0.4, "diag_weights": [1, 4]},
            "M": {"grid": 8, "node_factor": 0.4, "diag_weights": [1, 4]},
            "L": {"grid": 16, "node_factor": 0.4, "diag_weights": [1, 8]},
        }
        if self.size not in self.size_config:
            raise ValueError("Invalid size. Choose 'S', 'M', or 'L'.")
        if self.variant not in ["F", "R"]:
            raise ValueError("Invalid variant. Choose 'F' (fixed) or 'R' (random).")

        config = self.size_config[self.size]
        self.grid_size = config["grid"]
        self.node_factor = config["node_factor"]
        self.weight_dist = config["diag_weights"]

        self.node_drop_fraction = float(node_drop_fraction)
        if not (0.0 <= self.node_drop_fraction < 1.0):
            raise ValueError("node_drop_fraction must be in [0.0, 1.0).")

        if bottleneck_cluster_count is None:
            self.bottleneck_cluster_count = {"S": 3, "M": 5, "L": 8}[self.size]
        else:
            self.bottleneck_cluster_count = int(bottleneck_cluster_count)
        if self.bottleneck_cluster_count < 2:
            raise ValueError("bottleneck_cluster_count must be >= 2.")

        self.bottleneck_edges_per_link = int(bottleneck_edges_per_link)
        if self.bottleneck_edges_per_link < 1:
            raise ValueError("bottleneck_edges_per_link must be >= 1.")

        self.graph = None
        self.nodes_list = None
        self.active_positions = None  # allowed grid positions

    # --------------------
    # Public API
    # --------------------
    def generate(self):
        """Generate a connected network representing rooms in a building.

        Makes up to 8 independent attempts and returns the first graph that
        is still connected after intersection cleanup.

        Raises RuntimeError if no attempt yields a connected graph.
        """
        max_attempts = 8
        for _ in range(max_attempts):
            self._build_node_mask()
            self._initialize_graph()
            self._add_nodes()
            nodes = list(self.graph.nodes())
            if len(nodes) < 2:
                continue

            # Topology-specific edge construction
            if self.topology == "bottlenecks":
                # Replace the usual step-1 connectivity with a cluster+bottleneck design.
                self._build_bottleneck_clusters(nodes)
            else:
                # --- STEP 1: CONNECTIVITY (NEARBY ROOMS ONLY) ---
                self._connect_all_nodes_by_nearby_growth(nodes)
                # --- STEP 2: ADD TOPOLOGY-SPECIFIC EXTRA EDGES ---
                self._add_edges()

            # --- STEP 3: REMOVE INTERSECTIONS & RECONNECT ---
            self._remove_intersections()

            # --- STEP 4: FINAL CONNECTIVITY CHECK ---
            if nx.is_connected(self.graph):
                return self.graph
        raise RuntimeError("Failed to generate a connected network after several attempts")

    def plot(self):
        """Draw the current graph with matplotlib; node (a, b) is drawn at (b, -a)."""
        plt.figure(figsize=(8, 8))
        pos = {node: (node[1], -node[0]) for node in self.graph.nodes()}
        nx.draw(self.graph, pos, with_labels=True, node_size=300, font_size=8)
        plt.title(f"Generated Network ({self.size}, {self.variant}, {self.topology})")
        plt.grid(True)
        plt.show()

    # --------------------
    # Small shared helpers
    # --------------------
    @staticmethod
    def _cheb(a, b):
        """Return the Chebyshev distance between two grid points."""
        return max(abs(a[0] - b[0]), abs(a[1] - b[1]))

    def _middle_bounds(self):
        """Return the inclusive (low, high) coordinate range of the middle region."""
        margin = max(1, self.grid_size // 4)
        return margin, self.grid_size - margin

    # --------------------
    # Modification 1: deactivate 1/5 of all possible nodes
    # --------------------
    def _build_node_mask(self):
        """Deactivate node_drop_fraction of all (grid+1)^2 positions."""
        all_positions = [
            (x, y)
            for x in range(self.grid_size + 1)
            for y in range(self.grid_size + 1)
        ]
        drop = int(self.node_drop_fraction * len(all_positions))
        deactivated = set(random.sample(all_positions, drop)) if drop > 0 else set()
        self.active_positions = set(all_positions) - deactivated

    # --------------------
    # Node initialization and placement
    # --------------------
    def _initialize_graph(self):
        """Create an empty graph holding one seed node on an active position."""
        self.graph = nx.Graph()

        # Prefer to seed from the middle region, but only from active positions.
        low, high = self._middle_bounds()
        middle_active = [
            (x, y) for (x, y) in self.active_positions
            if low <= x <= high and low <= y <= high
        ]
        if middle_active:
            seed = random.choice(middle_active)
        else:
            seed = random.choice(list(self.active_positions))

        coords = np.array([seed[0], seed[1]])
        flags = np.zeros(4, dtype=int)
        self.nodes_list = [[coords, flags]]
        # Add the plain-int tuple rather than tuple(coords): the latter is made
        # of numpy scalars, which makes this one node label differ from every
        # other node (visible as "np.int64(..)" in plot labels).
        self.graph.add_node(seed)

    def _compute_nodes(self):
        """Return the target node count for the current variant."""
        total_possible = (self.grid_size + 1) ** 2
        # Important: total_possible is still the full grid size;
        # the mask reduces available positions and _add_nodes enforces that.
        if self.variant == "F":
            return int(self.node_factor * total_possible)
        return int(random.uniform(0.4, 0.7) * total_possible)

    def _add_nodes(self):
        """Place nodes in the middle region (cluster logic), respecting active_positions.

        Adds a uniformly random subset of the free, active middle positions
        until the target from ``_compute_nodes`` is reached or the region is
        exhausted.  Sampling directly (instead of retrying random coordinates)
        matters because the target is larger than the middle region for every
        size, so a retry loop would always run to its attempt limit.
        """
        total_nodes = self._compute_nodes()
        missing = total_nodes - self.graph.number_of_nodes()
        if missing <= 0:
            return

        low, high = self._middle_bounds()
        # Sorted so the outcome depends only on the random state, not on set order.
        free = sorted(
            (x, y) for (x, y) in self.active_positions
            if low <= x <= high and low <= y <= high and (x, y) not in self.graph
        )
        self.graph.add_nodes_from(random.sample(free, min(missing, len(free))))

    # --------------------
    # Connectivity for non-bottleneck modes
    # --------------------
    def _connect_all_nodes_by_nearby_growth(self, nodes):
        """Connect ``nodes`` into one tree by growing from a random start node.

        Each step attaches one not-yet-connected node to an already connected
        node, preferring pairs within Chebyshev distance 2 and falling back to
        a random pair otherwise.  Connectivity is always forced; crossings
        are dealt with later by ``_remove_intersections``.
        """
        nodes = list(nodes)
        if not nodes:
            return

        start = random.choice(nodes)
        connected = {start}
        remaining = set(nodes)
        remaining.remove(start)

        while remaining:
            nearby = [
                n for n in remaining
                if any(self._cheb(n, c) <= 2 for c in connected)
            ]
            candidate = random.choice(nearby) if nearby else random.choice(list(remaining))

            anchors = [c for c in connected if self._cheb(c, candidate) <= 2]
            anchor = random.choice(anchors) if anchors else random.choice(list(connected))

            self.graph.add_edge(anchor, candidate)
            connected.add(candidate)
            remaining.remove(candidate)

    # --------------------
    # Modification 2: Bottleneck = multiple small dense clusters connected by bottleneck edges
    # --------------------
    def _build_bottleneck_clusters(self, nodes):
        """
        Build a number of small, internally dense "grids" (clusters),
        then connect clusters with a small number of inter-cluster edges
        which become the bottlenecks.
        """
        # Clear any edges that might exist (seed node has no edges, but be explicit)
        self.graph.remove_edges_from(list(self.graph.edges()))

        clusters, centers = self._spatial_cluster_nodes(nodes, k=self.bottleneck_cluster_count)

        # Make each cluster internally dense.
        for cluster in clusters:
            if len(cluster) < 2:
                continue
            # Ensure cluster is connected first using nearby growth inside the cluster
            self._connect_cluster_by_nearby_growth(cluster)
            # Then densify within the cluster
            max_edges = max(1, int(3.0 * len(cluster)))  # dense-ish without becoming fully complete
            self._add_cluster_dense(list(cluster), max_edges=max_edges)

        # Connect clusters in a chain, ordered by their centers.
        order = sorted(range(len(clusters)), key=lambda i: (centers[i][0], centers[i][1]))
        for a_idx, b_idx in zip(order[:-1], order[1:]):
            self._add_bottleneck_links(clusters[a_idx], clusters[b_idx], self.bottleneck_edges_per_link)

        # If something ended up disconnected (e.g., tiny clusters), reconnect lightly
        if not nx.is_connected(self.graph):
            self._attempt_reconnect_components(prefer_max_distance=2)

    def _spatial_cluster_nodes(self, nodes, k):
        """
        Simple spatial clustering:
        - pick k random centers
        - assign each node to closest center by Chebyshev distance
        - return (clusters, centers), where each center is the cluster member
          closest to the cluster mean

        If k >= len(nodes), every node becomes its own cluster and center.
        Empty clusters are dropped, so fewer than k clusters may be returned.
        """
        nodes = list(nodes)
        if k >= len(nodes):
            # each node its own cluster (degenerate)
            return [[n] for n in nodes], nodes[:]

        seeds = random.sample(nodes, k)
        buckets = [[] for _ in range(k)]
        for n in nodes:
            nearest = min(range(k), key=lambda i: self._cheb(n, seeds[i]))
            buckets[nearest].append(n)

        clusters = []
        centers = []
        for members in buckets:
            if not members:
                continue
            mx = sum(p[0] for p in members) / len(members)
            my = sum(p[1] for p in members) / len(members)
            # Medoid-ish center: the member closest to the mean.
            centers.append(min(members, key=lambda p: (p[0] - mx) ** 2 + (p[1] - my) ** 2))
            clusters.append(members)
        return clusters, centers

    def _connect_cluster_by_nearby_growth(self, cluster_nodes):
        """Connectivity step restricted to a cluster."""
        self._connect_all_nodes_by_nearby_growth(cluster_nodes)

    def _add_bottleneck_links(self, cluster_a, cluster_b, m):
        """
        Add m inter-cluster edges as bottlenecks. Keep m small.
        Prefer edges that do not create intersections, but will force-connect if needed.
        """
        # Candidate pairs sorted by distance (stable, so ties keep input order)
        pairs = sorted(
            ((self._cheb(u, v), u, v) for u in cluster_a for v in cluster_b),
            key=lambda t: t[0],
        )

        added = 0
        # First pass: closest pairs that do not cross an existing edge.
        for _, u, v in pairs:
            if added >= m:
                break
            if self.graph.has_edge(u, v):
                continue
            if not self._would_create_intersection(u, v):
                self.graph.add_edge(u, v)
                added += 1

        # If we couldn't add enough without intersections, force the closest remaining
        for _, u, v in pairs:
            if added >= m:
                break
            if self.graph.has_edge(u, v):
                continue
            self.graph.add_edge(u, v)
            added += 1

    # --------------------
    # Topology-specific extra edges (non-bottleneck modes)
    # --------------------
    def _compute_edge_count(self):
        """Return the extra-edge budget for the current variant."""
        total_nodes = len(self.graph.nodes())
        if self.variant == "F":
            return int(1.5 * total_nodes)
        return int(random.uniform(1.5, 2.5) * total_nodes)

    def _add_edges(self):
        """Add the extra edges for the 'highly_connected' and 'linear' topologies."""
        nodes = list(self.graph.nodes())
        total_edges = self._compute_edge_count()
        if self.topology == "highly_connected":
            self._add_cluster_dense(nodes, total_edges)
        elif self.topology == "linear":
            self._make_linear(nodes)
        # Note: bottlenecks are built in _build_bottleneck_clusters(), not here.

    # --------------------
    # Dense / sparse edge routines (existing)
    # --------------------
    def _add_cluster_dense(self, nodes, max_edges):
        """Add up to ``max_edges`` straight or 45-degree diagonal edges among ``nodes``.

        Pairs are visited in random order; a pair is linked only when the
        matching intersection check reports no conflict.
        """
        nodes = list(nodes)
        random.shuffle(nodes)
        edges_added = 0
        for i, n1 in enumerate(nodes):
            for n2 in nodes[i + 1:]:
                if edges_added >= max_edges:
                    return
                if n1[0] == n2[0] or n1[1] == n2[1]:
                    blocked = self._straight_edge_intersects(n1, n2)
                elif abs(n1[0] - n2[0]) == abs(n1[1] - n2[1]):
                    blocked = self._diagonal_intersects(n1, n2)
                else:
                    continue  # neither axis-aligned nor a 45-degree diagonal
                if not blocked:
                    self.graph.add_edge(n1, n2)
                    edges_added += 1

    def _make_linear(self, nodes):
        """Chain the nodes in sorted order, then sprinkle a few short side links.

        Consecutive nodes sharing a coordinate are linked directly.  Otherwise
        a one-step neighbour of the previous node (first along the first
        coordinate, then along the second) is used as an intermediate node if
        it exists; if neither exists the pair is left unlinked.
        """
        nodes_sorted = sorted(nodes, key=lambda x: (x[0], x[1]))
        if not nodes_sorted:
            return
        node_set = set(nodes)  # O(1) membership instead of scanning the list

        prev = nodes_sorted[0]
        for nxt in nodes_sorted[1:]:
            x1, y1 = prev
            x2, y2 = nxt
            if x1 == x2 or y1 == y2:
                self.graph.add_edge(prev, nxt)
                prev = nxt
                continue
            # Both coordinates differ here: try a unit step towards nxt on each axis.
            steps = (
                (x1 + (1 if x2 > x1 else -1), y1),
                (x1, y1 + (1 if y2 > y1 else -1)),
            )
            for step in steps:
                if step in node_set:
                    self.graph.add_edge(prev, step)
                    self.graph.add_edge(step, nxt)
                    prev = nxt
                    break

        # With probability 0.15 per node, add one link to an adjacent node,
        # keeping both endpoints below degree 3.
        for node in nodes_sorted:
            if random.random() < 0.15:
                x, y = node
                candidates = [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)]
                random.shuffle(candidates)
                for c in candidates:
                    if c in node_set and not self.graph.has_edge(node, c):
                        if self.graph.degree(node) < 3 and self.graph.degree(c) < 3:
                            self.graph.add_edge(node, c)
                            break

    # --------------------
    # Intersection checks (existing + used by reconnect)
    # --------------------
    def _straight_edge_intersects(self, n1, n2):
        """Return True if axis-aligned segment n1-n2 overlaps a collinear existing edge.

        Touching at a single shared point counts as overlap.  Also returns
        True when n1-n2 is not axis-aligned at all.
        """
        x1, y1 = n1
        x2, y2 = n2
        if not (x1 == x2 or y1 == y2):
            return True

        for a, b in self.graph.edges():
            if {a, b} == {n1, n2}:
                continue
            ax, ay = a
            bx, by = b
            if y1 == y2 and ay == by == y1:  # both on the same line y = y1
                if max(ax, bx) >= min(x1, x2) and min(ax, bx) <= max(x1, x2):
                    return True
            if x1 == x2 and ax == bx == x1:  # both on the same line x = x1
                if max(ay, by) >= min(y1, y2) and min(ay, by) <= max(y1, y2):
                    return True
        return False

    def _diagonal_intersects(self, n1, n2):
        """Return True if the bounding box of n1-n2 overlaps that of any existing diagonal edge.

        This is a conservative bounding-box test, not an exact crossing test.
        """
        x1, y1 = n1
        x2, y2 = n2
        for (ax, ay), (bx, by) in self.graph.edges():
            if abs(ax - bx) != abs(ay - by):
                continue
            x_overlap = not (max(x1, x2) < min(ax, bx) or min(x1, x2) > max(ax, bx))
            y_overlap = not (max(y1, y2) < min(ay, by) or min(y1, y2) > max(ay, by))
            if x_overlap and y_overlap:
                return True
        return False

    def _orientation(self, p, q, r):
        """Return 0 if p, q, r are collinear, else 1 or 2 depending on the turn direction."""
        (px, py), (qx, qy), (rx, ry) = p, q, r
        val = (qy - py) * (rx - qx) - (qx - px) * (ry - qy)
        if val == 0:
            return 0
        return 1 if val > 0 else 2

    def _on_segment(self, p, q, r):
        """Return True if q lies inside the bounding box of segment p-r."""
        (px, py), (qx, qy), (rx, ry) = p, q, r
        return (min(px, rx) <= qx <= max(px, rx) and
                min(py, ry) <= qy <= max(py, ry))

    def _segments_intersect(self, a, b, c, d):
        """Return True if segments a-b and c-d intersect.

        Segments that share an endpoint are never reported as intersecting.
        """
        if a in (c, d) or b in (c, d):
            return False

        o1 = self._orientation(a, b, c)
        o2 = self._orientation(a, b, d)
        o3 = self._orientation(c, d, a)
        o4 = self._orientation(c, d, b)

        # General case: the endpoints of each segment lie on opposite sides of the other.
        if o1 != o2 and o3 != o4:
            return True
        # Collinear special cases: an endpoint lies on the other segment.
        if o1 == 0 and self._on_segment(a, c, b):
            return True
        if o2 == 0 and self._on_segment(a, d, b):
            return True
        if o3 == 0 and self._on_segment(c, a, d):
            return True
        if o4 == 0 and self._on_segment(c, b, d):
            return True
        return False

    def _would_create_intersection(self, u, v):
        """Return True if a new edge u-v would intersect an edge not incident to u or v."""
        for x, y in self.graph.edges():
            if u in (x, y) or v in (x, y):
                continue
            if self._segments_intersect(u, v, x, y):
                return True
        return False

    def _find_crossings(self):
        """Return (a, b, c, d) for every pair of current edges a-b, c-d that intersect."""
        edges = list(self.graph.edges())
        return [
            (a, b, c, d)
            for i, (a, b) in enumerate(edges)
            for (c, d) in edges[i + 1:]
            if self._segments_intersect(a, b, c, d)
        ]

    def _drop_longer_edges(self, crossings):
        """Resolve each still-existing crossing by removing its longer edge; return the count."""
        removed = 0
        for a, b, c, d in crossings:
            # An earlier removal may already have resolved this crossing.
            if not (self.graph.has_edge(a, b) and self.graph.has_edge(c, d)):
                continue
            len1 = (a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2
            len2 = (c[0] - d[0]) ** 2 + (c[1] - d[1]) ** 2
            if len1 >= len2:
                self.graph.remove_edge(a, b)
            else:
                self.graph.remove_edge(c, d)
            removed += 1
        return removed

    def _remove_intersections(self):
        """Remove crossing edges, reconnecting components between passes.

        Runs up to 10 remove-then-reconnect passes, makes a last reconnect
        attempt with the distance limit relaxed to the grid size, and ends
        with one removal pass without reconnection, so the graph may be left
        disconnected (``generate`` checks for that).  Prints a one-line summary.
        """
        max_passes = 10
        pass_no = 0
        total_removed = 0
        while pass_no < max_passes:
            pass_no += 1
            crossings = self._find_crossings()
            if not crossings:
                break
            total_removed += self._drop_longer_edges(crossings)
            self._attempt_reconnect_components(prefer_max_distance=2)

        if not nx.is_connected(self.graph):
            self._attempt_reconnect_components(prefer_max_distance=self.grid_size)

        # Final cleanup of crossings introduced by forced reconnect edges.  Only
        # crossings whose two edges both still exist are resolved, so no edge is
        # removed on behalf of a crossing that is already gone.
        total_removed += self._drop_longer_edges(self._find_crossings())
        print(f"[cleanup] Removed {total_removed} intersecting edges after {pass_no} passes.")

    def _closest_pair(self, nodes_a, nodes_b, limit):
        """Return the first closest (u, v) pair with Chebyshev distance <= limit, or None."""
        best = min(
            (
                (self._cheb(u, v), u, v)
                for u in nodes_a
                for v in nodes_b
                if self._cheb(u, v) <= limit
            ),
            key=lambda t: t[0],
            default=None,
        )
        return None if best is None else (best[1], best[2])

    def _attempt_reconnect_components(self, prefer_max_distance=2):
        """Join connected components by linking their closest node pairs.

        Starts with pairs at Chebyshev distance <= ``prefer_max_distance`` and
        relaxes the limit by 1 whenever a full sweep makes no connection, up
        to the grid size.  The closest pair is linked even if the new edge
        crosses an existing one; crossings are left to ``_remove_intersections``.
        """
        comp_nodes = [list(c) for c in nx.connected_components(self.graph)]
        if len(comp_nodes) <= 1:
            return

        max_relax = self.grid_size
        relax = prefer_max_distance
        while relax <= max_relax and len(comp_nodes) > 1:
            made_connection = False
            i = 0
            while i < len(comp_nodes) - 1:
                merged = False
                for j in range(i + 1, len(comp_nodes)):
                    pair = self._closest_pair(comp_nodes[i], comp_nodes[j], relax)
                    if pair is None:
                        continue
                    self.graph.add_edge(*pair)
                    absorbed = comp_nodes.pop(j)  # j > i, so index i stays valid
                    comp_nodes[i].extend(absorbed)
                    made_connection = True
                    merged = True
                    break
                # After a merge, rescan from the same (now larger) component.
                if not merged:
                    i += 1

            if not made_connection:
                relax += 1
            else:
                comp_nodes = [list(c) for c in nx.connected_components(self.graph)]