import numpy as np import networkx as nx import matplotlib.pyplot as plt import random import time class NetworkGenerator: def __init__(self, size="S", variant="F", topology="highly_connected", node_drop_fraction=0.2, bottleneck_cluster_count=None, bottleneck_edges_per_link=1): """ node_drop_fraction: Fraction of all (grid+1)^2 possible positions that are deactivated (not allowed as nodes). Example: 0.2 -> remove 1/5 of all grid positions. bottleneck_cluster_count: If None, chosen automatically based on size. Larger => more small dense clusters. bottleneck_edges_per_link: Number of edges connecting consecutive clusters (these are the bottlenecks). Keep this small (1 or 2) to preserve bottleneck behavior. """ self.size = size.upper() self.variant = variant.upper() self.topology = topology.lower() if self.topology not in ["highly_connected", "bottlenecks", "linear"]: raise ValueError("topology must be: 'highly_connected', 'bottlenecks', or 'linear'") self.size_config = { "S": {"grid": 4, "node_factor": 0.4, "diag_weights": [1, 4]}, "M": {"grid": 8, "node_factor": 0.4, "diag_weights": [1, 4]}, "L": {"grid": 16, "node_factor": 0.4, "diag_weights": [1, 8]}, } if self.size not in self.size_config: raise ValueError("Invalid size. Choose 'S', 'M', or 'L'.") if self.variant not in ["F", "R"]: raise ValueError("Invalid variant. Choose 'F' (fixed) or 'R' (random).") self.grid_size = self.size_config[self.size]["grid"] self.node_factor = self.size_config[self.size]["node_factor"] self.weight_dist = self.size_config[self.size]["diag_weights"] self.node_drop_fraction = float(node_drop_fraction) if not (0.0 <= self.node_drop_fraction < 1.0): raise ValueError("node_drop_fraction must be in [0.0, 1.0).") if bottleneck_cluster_count is None: self.bottleneck_cluster_count = {"S": 3, "M": 5, "L": 8}[self.size] else: self.bottleneck_cluster_count = int(bottleneck_cluster_count) if self.bottleneck_cluster_count < 2: raise ValueError("bottleneck_cluster_count must be >= 2.") self.bottleneck_edges_per_link = int(bottleneck_edges_per_link) if self.bottleneck_edges_per_link < 1: raise ValueError("bottleneck_edges_per_link must be >= 1.") self.graph = None self.nodes_list = None self.active_positions = None # allowed grid positions # -------------------- # Public API # -------------------- def generate(self): """Generate a connected network representing rooms in a building.""" max_attempts = 8 for _ in range(max_attempts): self._build_node_mask() self._initialize_graph() self._add_nodes() nodes = list(self.graph.nodes()) if len(nodes) < 2: continue # Topology-specific edge construction if self.topology == "bottlenecks": # Replace the usual step-1 connectivity with a cluster+bottleneck design. self._build_bottleneck_clusters(nodes) else: # --- STEP 1: CONNECTIVITY (NEARBY ROOMS ONLY) --- self._connect_all_nodes_by_nearby_growth(nodes) # --- STEP 2: ADD TOPOLOGY-SPECIFIC EXTRA EDGES --- self._add_edges() # --- STEP 3: REMOVE INTERSECTIONS & RECONNECT --- self._remove_intersections() # --- STEP 4: FINAL CONNECTIVITY CHECK --- if nx.is_connected(self.graph): return self.graph raise RuntimeError("Failed to generate a connected network after several attempts") def plot(self): plt.figure(figsize=(8, 8)) pos = {node: (node[1], -node[0]) for node in self.graph.nodes()} nx.draw(self.graph, pos, with_labels=True, node_size=300, font_size=8) plt.title(f"Generated Network ({self.size}, {self.variant}, {self.topology})") plt.grid(True) plt.show() # -------------------- # Modification 1: deactivate 1/5 of all possible nodes # -------------------- def _build_node_mask(self): """Deactivate node_drop_fraction of all (grid+1)^2 positions.""" all_positions = [ (x, y) for x in range(self.grid_size + 1) for y in range(self.grid_size + 1) ] drop = int(self.node_drop_fraction * len(all_positions)) deactivated = set(random.sample(all_positions, drop)) if drop > 0 else set() self.active_positions = set(all_positions) - deactivated # -------------------- # Node initialization and placement # -------------------- def _initialize_graph(self): self.graph = nx.Graph() # Prefer to seed from the middle region, but only from active positions. margin = max(1, self.grid_size // 4) low, high = margin, self.grid_size - margin middle_active = [(x, y) for (x, y) in self.active_positions if low <= x <= high and low <= y <= high] if middle_active: seed = random.choice(middle_active) else: seed = random.choice(list(self.active_positions)) coords = np.array([seed[0], seed[1]]) flags = np.zeros(4, dtype=int) self.nodes_list = [[coords, flags]] self.graph.add_node(tuple(coords)) def _compute_nodes(self): total_possible = (self.grid_size + 1) ** 2 # Important: total_possible is still the full grid size; # the mask reduces available positions and _add_nodes enforces that. if self.variant == "F": return int(self.node_factor * total_possible) return int(random.uniform(0.4, 0.7) * total_possible) def _add_nodes(self): """Place nodes mostly in the middle region (cluster logic), respecting active_positions.""" total_nodes = self._compute_nodes() margin = max(1, self.grid_size // 4) low, high = margin, self.grid_size - margin attempts = 0 while len(self.graph.nodes()) < total_nodes and attempts < 8000: attempts += 1 x = random.randint(low, high) y = random.randint(low, high) if (x, y) not in self.active_positions: continue if (x, y) not in self.graph: self.graph.add_node((x, y)) # -------------------- # Connectivity for non-bottleneck modes # -------------------- def _connect_all_nodes_by_nearby_growth(self, nodes): """Original connectivity step (nearby growth), unchanged except refactoring.""" connected = set() remaining = set(nodes) current = random.choice(nodes) connected.add(current) remaining.remove(current) while remaining: candidates = [ n for n in remaining if any(abs(n[0] - c[0]) <= 2 and abs(n[1] - c[1]) <= 2 for c in connected) ] candidate = random.choice(candidates) if candidates else random.choice(list(remaining)) neighbors = [ c for c in connected if abs(c[0] - candidate[0]) <= 2 and abs(c[1] - candidate[1]) <= 2 ] n = random.choice(neighbors) if neighbors else random.choice(list(connected)) # Keep your existing intersection checks (but connectivity is forced anyway) if n[0] == candidate[0] or n[1] == candidate[1]: _ = self._straight_edge_intersects(n, candidate) elif abs(n[0] - candidate[0]) == abs(n[1] - candidate[1]): _ = self._diagonal_intersects(n, candidate) self.graph.add_edge(n, candidate) connected.add(candidate) remaining.remove(candidate) # -------------------- # Modification 2: Bottleneck = multiple small dense clusters connected by bottleneck edges # -------------------- def _build_bottleneck_clusters(self, nodes): """ Build a number of small, internally dense "grids" (clusters), then connect clusters with a small number of inter-cluster edges which become the bottlenecks. """ # Clear any edges that might exist (seed node has no edges, but be explicit) self.graph.remove_edges_from(list(self.graph.edges())) clusters, centers = self._spatial_cluster_nodes(nodes, k=self.bottleneck_cluster_count) # Make each cluster internally dense. # We do "dense-without-intersections-when-possible" using your dense edge routine on subsets. for cluster in clusters: if len(cluster) < 2: continue # Ensure cluster is connected first using nearby growth inside the cluster self._connect_cluster_by_nearby_growth(cluster) # Then densify within the cluster max_edges = max(1, int(3.0 * len(cluster))) # dense-ish without becoming fully complete self._add_cluster_dense(list(cluster), max_edges=max_edges) # Connect clusters in a chain (or near-chain) by centers. order = sorted(range(len(clusters)), key=lambda i: (centers[i][0], centers[i][1])) for a_idx, b_idx in zip(order[:-1], order[1:]): self._add_bottleneck_links(clusters[a_idx], clusters[b_idx], self.bottleneck_edges_per_link) # If something ended up disconnected (e.g., tiny clusters), reconnect lightly if not nx.is_connected(self.graph): self._attempt_reconnect_components(prefer_max_distance=2) def _spatial_cluster_nodes(self, nodes, k): """ Simple spatial clustering: - pick k random centers - assign each node to closest center by Chebyshev distance - return clusters + centers """ def cheb(a, b): return max(abs(a[0] - b[0]), abs(a[1] - b[1])) nodes = list(nodes) if k >= len(nodes): # each node its own cluster (degenerate) return [[n] for n in nodes], nodes[:] centers = random.sample(nodes, k) clusters = [[] for _ in range(k)] for n in nodes: best_i = min(range(k), key=lambda i: cheb(n, centers[i])) clusters[best_i].append(n) # Recompute centers as medoid-ish: pick node closest to mean new_centers = [] for c in clusters: if not c: new_centers.append(random.choice(nodes)) continue mx = sum(p[0] for p in c) / len(c) my = sum(p[1] for p in c) / len(c) new_centers.append(min(c, key=lambda p: (p[0] - mx) ** 2 + (p[1] - my) ** 2)) # Remove empty clusters by merging them into nearest non-empty cluster non_empty = [(c, ctr) for c, ctr in zip(clusters, new_centers) if len(c) > 0] clusters = [c for c, _ in non_empty] centers = [ctr for _, ctr in non_empty] return clusters, centers def _connect_cluster_by_nearby_growth(self, cluster_nodes): """Connectivity step restricted to a cluster.""" cluster_nodes = list(cluster_nodes) connected = set() remaining = set(cluster_nodes) current = random.choice(cluster_nodes) connected.add(current) remaining.remove(current) while remaining: candidates = [ n for n in remaining if any(abs(n[0] - c[0]) <= 2 and abs(n[1] - c[1]) <= 2 for c in connected) ] candidate = random.choice(candidates) if candidates else random.choice(list(remaining)) neighbors = [ c for c in connected if abs(c[0] - candidate[0]) <= 2 and abs(c[1] - candidate[1]) <= 2 ] n = random.choice(neighbors) if neighbors else random.choice(list(connected)) self.graph.add_edge(n, candidate) connected.add(candidate) remaining.remove(candidate) def _add_bottleneck_links(self, cluster_a, cluster_b, m): """ Add m inter-cluster edges as bottlenecks. Keep m small. Prefer edges that do not create intersections, but will force-connect if needed. """ cluster_a = list(cluster_a) cluster_b = list(cluster_b) def cheb(a, b): return max(abs(a[0] - b[0]), abs(a[1] - b[1])) # Candidate pairs sorted by distance pairs = [] for u in cluster_a: for v in cluster_b: pairs.append((cheb(u, v), u, v)) pairs.sort(key=lambda t: t[0]) added = 0 used = set() for _, u, v in pairs: if added >= m: break if (u, v) in used or (v, u) in used: continue if self.graph.has_edge(u, v): continue # Prefer non-intersecting links if not self._would_create_intersection(u, v): self.graph.add_edge(u, v) used.add((u, v)) added += 1 # If we couldn't add enough without intersections, force the closest remaining if added < m: for _, u, v in pairs: if added >= m: break if self.graph.has_edge(u, v): continue self.graph.add_edge(u, v) added += 1 # -------------------- # Topology-specific extra edges (non-bottleneck modes) # -------------------- def _compute_edge_count(self): total_nodes = len(self.graph.nodes()) if self.variant == "F": return int(1.5 * total_nodes) return int(random.uniform(1.5, 2.5) * total_nodes) def _add_edges(self): nodes = list(self.graph.nodes()) total_edges = self._compute_edge_count() if self.topology == "highly_connected": self._add_cluster_dense(nodes, total_edges) elif self.topology == "linear": self._make_linear(nodes) # Note: bottlenecks are built in _build_bottleneck_clusters(), not here. # -------------------- # Dense / sparse edge routines (existing) # -------------------- def _add_cluster_dense(self, nodes, max_edges): edges_added = 0 nodes = list(nodes) random.shuffle(nodes) for i in range(len(nodes)): for j in range(i + 1, len(nodes)): if edges_added >= max_edges: return n1, n2 = nodes[i], nodes[j] # Straight edge if (n1[0] == n2[0] or n1[1] == n2[1]): if not self._straight_edge_intersects(n1, n2): self.graph.add_edge(n1, n2) edges_added += 1 continue # Diagonal edge if abs(n1[0] - n2[0]) == abs(n1[1] - n2[1]): if not self._diagonal_intersects(n1, n2): self.graph.add_edge(n1, n2) edges_added += 1 def _make_linear(self, nodes): nodes_sorted = sorted(nodes, key=lambda x: (x[0], x[1])) if not nodes_sorted: return prev = nodes_sorted[0] for nxt in nodes_sorted[1:]: x1, y1 = prev x2, y2 = nxt if x1 == x2 or y1 == y2: self.graph.add_edge(prev, nxt) prev = nxt else: if x1 != x2: step = (x1 + (1 if x2 > x1 else -1), y1) if step in nodes: self.graph.add_edge(prev, step) self.graph.add_edge(step, nxt) prev = nxt continue if y1 != y2: step = (x1, y1 + (1 if y2 > y1 else -1)) if step in nodes: self.graph.add_edge(prev, step) self.graph.add_edge(step, nxt) prev = nxt continue for node in nodes_sorted: if random.random() < 0.15: x, y = node candidates = [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)] random.shuffle(candidates) for c in candidates: if c in nodes and not self.graph.has_edge(node, c): if self.graph.degree(node) < 3 and self.graph.degree(c) < 3: self.graph.add_edge(node, c) break # -------------------- # Intersection checks (existing + used by reconnect) # -------------------- def _straight_edge_intersects(self, n1, n2): x1, y1 = n1 x2, y2 = n2 if not (x1 == x2 or y1 == y2): return True if (x1, y1) > (x2, y2): n1, n2 = n2, n1 x1, y1 = n1 x2, y2 = n2 for a, b in self.graph.edges(): if {a, b} == {n1, n2}: continue ax, ay = a bx, by = b if y1 == y2: # horizontal if ay == by == y1: if max(ax, bx) >= min(x1, x2) and min(ax, bx) <= max(x1, x2): return True if x1 == x2: # vertical if ax == bx == x1: if max(ay, by) >= min(y1, y2) and min(ay, by) <= max(y1, y2): return True return False def _diagonal_intersects(self, n1, n2): x1, y1 = n1 x2, y2 = n2 for a, b in self.graph.edges(): ax, ay = a bx, by = b if abs(ax - bx) == abs(ay - by): if not (max(x1, x2) < min(ax, bx) or min(x1, x2) > max(ax, bx)): if not (max(y1, y2) < min(ay, by) or min(y1, y2) > max(ay, by)): return True return False def _orientation(self, p, q, r): (px, py), (qx, qy), (rx, ry) = p, q, r val = (qy - py) * (rx - qx) - (qx - px) * (ry - qy) if val == 0: return 0 return 1 if val > 0 else 2 def _on_segment(self, p, q, r): (px, py), (qx, qy), (rx, ry) = p, q, r return (min(px, rx) <= qx <= max(px, rx) and min(py, ry) <= qy <= max(py, ry)) def _segments_intersect(self, a, b, c, d): if a in (c, d) or b in (c, d): return False o1 = self._orientation(a, b, c) o2 = self._orientation(a, b, d) o3 = self._orientation(c, d, a) o4 = self._orientation(c, d, b) if o1 != o2 and o3 != o4: return True if o1 == 0 and self._on_segment(a, c, b): return True if o2 == 0 and self._on_segment(a, d, b): return True if o3 == 0 and self._on_segment(c, a, d): return True if o4 == 0 and self._on_segment(c, b, d): return True return False def _would_create_intersection(self, u, v): for x, y in self.graph.edges(): if u in (x, y) or v in (x, y): continue if self._segments_intersect(u, v, x, y): return True return False def _remove_intersections(self): max_passes = 10 pass_no = 0 total_removed = 0 while pass_no < max_passes: pass_no += 1 edges = list(self.graph.edges()) intersections = [] for i in range(len(edges)): a, b = edges[i] for j in range(i + 1, len(edges)): c, d = edges[j] if self._segments_intersect(a, b, c, d): intersections.append((a, b, c, d)) if not intersections: break removed_this_pass = 0 for a, b, c, d in intersections: if not self.graph.has_edge(a, b) or not self.graph.has_edge(c, d): continue len1 = (a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2 len2 = (c[0] - d[0]) ** 2 + (c[1] - d[1]) ** 2 if len1 >= len2: try: self.graph.remove_edge(a, b) removed_this_pass += 1 except Exception: pass else: try: self.graph.remove_edge(c, d) removed_this_pass += 1 except Exception: pass total_removed += removed_this_pass self._attempt_reconnect_components(prefer_max_distance=2) if not nx.is_connected(self.graph): self._attempt_reconnect_components(prefer_max_distance=self.grid_size) final_edges = list(self.graph.edges()) for i in range(len(final_edges)): a, b = final_edges[i] for j in range(i + 1, len(final_edges)): c, d = final_edges[j] if self._segments_intersect(a, b, c, d): len1 = (a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2 len2 = (c[0] - d[0]) ** 2 + (c[1] - d[1]) ** 2 if len1 >= len2 and self.graph.has_edge(a, b): self.graph.remove_edge(a, b) total_removed += 1 elif self.graph.has_edge(c, d): self.graph.remove_edge(c, d) total_removed += 1 print(f"[cleanup] Removed {total_removed} intersecting edges after {pass_no} passes.") def _attempt_reconnect_components(self, prefer_max_distance=2): comps = list(nx.connected_components(self.graph)) if len(comps) <= 1: return def cheb(a, b): return max(abs(a[0] - b[0]), abs(a[1] - b[1])) comp_nodes = [list(c) for c in comps] max_relax = self.grid_size relax = prefer_max_distance while relax <= max_relax and len(comp_nodes) > 1: made_connection = False i = 0 while i < len(comp_nodes) - 1: j = i + 1 connected_this_round = False while j < len(comp_nodes): best_pair = None best_dist = None for u in comp_nodes[i]: for v in comp_nodes[j]: if u == v: continue d = cheb(u, v) if d <= relax and (best_dist is None or d < best_dist): best_pair = (u, v) best_dist = d if best_pair is not None: u, v = best_pair if not self.graph.has_edge(u, v): if not self._would_create_intersection(u, v): self.graph.add_edge(u, v) else: # force if no clean option self.graph.add_edge(u, v) made_connection = True connected_this_round = True comp_nodes[i].extend(comp_nodes[j]) comp_nodes.pop(j) break else: j += 1 if not connected_this_round: i += 1 if not made_connection: relax += 1 else: comps = list(nx.connected_components(self.graph)) comp_nodes = [list(c) for c in comps]