GraphGeneratorKIT / graphGen5.py
TahaRasouli's picture
Upload 4 files
1c6109f verified
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
import random
import time
class NetworkGenerator:
def __init__(self, size="S", variant="F", topology="highly_connected",
node_drop_fraction=0.1,
bottleneck_cluster_count=None,
bottleneck_edges_per_link=1):
"""
node_drop_fraction:
Fraction of all (grid+1)^2 possible positions that are deactivated (not allowed as nodes).
Example: 0.2 -> remove 1/5 of all grid positions.
bottleneck_cluster_count:
If None, chosen automatically based on size.
Larger => more small dense clusters.
bottleneck_edges_per_link:
Number of edges connecting consecutive clusters (these are the bottlenecks).
Keep this small (1 or 2) to preserve bottleneck behavior.
"""
self.size = size.upper()
self.variant = variant.upper()
self.topology = topology.lower()
if self.topology not in ["highly_connected", "bottlenecks", "linear"]:
raise ValueError("topology must be: 'highly_connected', 'bottlenecks', or 'linear'")
self.size_config = {
"S": {"grid": 4, "node_factor": 0.4, "diag_weights": [1, 4]},
"M": {"grid": 8, "node_factor": 0.4, "diag_weights": [1, 4]},
"L": {"grid": 16, "node_factor": 0.4, "diag_weights": [1, 8]},
}
if self.size not in self.size_config:
raise ValueError("Invalid size. Choose 'S', 'M', or 'L'.")
if self.variant not in ["F", "R"]:
raise ValueError("Invalid variant. Choose 'F' (fixed) or 'R' (random).")
self.grid_size = self.size_config[self.size]["grid"]
self.node_factor = self.size_config[self.size]["node_factor"]
self.weight_dist = self.size_config[self.size]["diag_weights"]
self.node_drop_fraction = float(node_drop_fraction)
if not (0.0 <= self.node_drop_fraction < 1.0):
raise ValueError("node_drop_fraction must be in [0.0, 1.0).")
if bottleneck_cluster_count is None:
self.bottleneck_cluster_count = {"S": 3, "M": 5, "L": 8}[self.size]
else:
self.bottleneck_cluster_count = int(bottleneck_cluster_count)
if self.bottleneck_cluster_count < 2:
raise ValueError("bottleneck_cluster_count must be >= 2.")
self.bottleneck_edges_per_link = int(bottleneck_edges_per_link)
if self.bottleneck_edges_per_link < 1:
raise ValueError("bottleneck_edges_per_link must be >= 1.")
self.graph = None
self.nodes_list = None
self.active_positions = None # allowed grid positions
# --------------------
# Public API
# --------------------
def generate(self):
"""Generate a connected network representing rooms in a building."""
max_attempts = 8
for _ in range(max_attempts):
self._build_node_mask()
self._initialize_graph()
self._add_nodes()
nodes = list(self.graph.nodes())
if len(nodes) < 2:
continue
# Topology-specific edge construction
if self.topology == "bottlenecks":
# Replace the usual step-1 connectivity with a cluster+bottleneck design.
self._build_bottleneck_clusters(nodes)
else:
# --- STEP 1: CONNECTIVITY (NEARBY ROOMS ONLY) ---
self._connect_all_nodes_by_nearby_growth(nodes)
# --- STEP 2: ADD TOPOLOGY-SPECIFIC EXTRA EDGES ---
self._add_edges()
# --- STEP 3: REMOVE INTERSECTIONS & RECONNECT ---
self._remove_intersections()
self._enforce_edge_budget()
# --- STEP 4: FINAL CONNECTIVITY CHECK ---
if nx.is_connected(self.graph):
return self.graph
raise RuntimeError("Failed to generate a connected network after several attempts")
def plot(self):
plt.figure(figsize=(8, 8))
pos = {node: (node[1], -node[0]) for node in self.graph.nodes()}
nx.draw(self.graph, pos, with_labels=True, node_size=300, font_size=8)
plt.title(f"Generated Network ({self.size}, {self.variant}, {self.topology})")
plt.grid(True)
plt.show()
# --------------------
# Modification 1: deactivate 1/5 of all possible nodes
# --------------------
def _build_node_mask(self):
"""Deactivate node_drop_fraction of all (grid+1)^2 positions."""
all_positions = [
(x, y)
for x in range(self.grid_size + 1)
for y in range(self.grid_size + 1)
]
drop_frac = self._effective_node_drop_fraction()
drop = int(drop_frac * len(all_positions))
deactivated = set(random.sample(all_positions, drop)) if drop > 0 else set()
self.active_positions = set(all_positions) - deactivated
# --------------------
# Node initialization and placement
# --------------------
def _initialize_graph(self):
self.graph = nx.Graph()
# Prefer to seed from the middle region, but only from active positions.
margin = max(1, self.grid_size // 4)
low, high = margin, self.grid_size - margin
middle_active = [(x, y) for (x, y) in self.active_positions if low <= x <= high and low <= y <= high]
if middle_active:
seed = random.choice(middle_active)
else:
seed = random.choice(list(self.active_positions))
coords = np.array([seed[0], seed[1]])
flags = np.zeros(4, dtype=int)
self.nodes_list = [[coords, flags]]
self.graph.add_node(tuple(coords))
def _compute_nodes(self):
total_possible = (self.grid_size + 1) ** 2
if self.variant == "F":
base = self.node_factor
else:
base = random.uniform(0.4, 0.7)
# Topology-specific scaling
if self.topology == "highly_connected":
scale = 1.2
elif self.topology == "bottlenecks":
scale = 0.85
elif self.topology == "linear":
scale = 0.75
else:
scale = 1.0
return int(base * scale * total_possible)
def _add_nodes(self):
"""Place nodes mostly in the middle region (cluster logic), respecting active_positions."""
total_nodes = self._compute_nodes()
margin = max(1, self.grid_size // 4)
low, high = margin, self.grid_size - margin
attempts = 0
while len(self.graph.nodes()) < total_nodes and attempts < 8000:
attempts += 1
x = random.randint(low, high)
y = random.randint(low, high)
if (x, y) not in self.active_positions:
continue
if (x, y) not in self.graph:
self.graph.add_node((x, y))
def _compute_edge_budget(self):
"""
Hard cap on number of edges to make topology differences explicit.
"""
n = len(self.graph.nodes())
if self.topology == "highly_connected":
# Dense graph
if self.variant == "F":
return int(2.8 * n)
return int(random.uniform(2.5, 3.2) * n)
if self.topology == "bottlenecks":
# Sparse, cluster-based
if self.variant == "F":
return int(1.1 * n)
return int(random.uniform(0.9, 1.3) * n)
if self.topology == "linear":
# Near-tree
return max(n - 1, int(0.9 * n))
def _enforce_edge_budget(self):
"""
Remove excess edges while preserving connectivity and avoiding intersections.
Prefer removing long edges first.
"""
budget = self._compute_edge_budget()
edges = list(self.graph.edges())
if len(edges) <= budget:
return
# Sort edges by length (longest first)
def edge_len(e):
(x1, y1), (x2, y2) = e
return (x1 - x2) ** 2 + (y1 - y2) ** 2
edges_sorted = sorted(edges, key=edge_len, reverse=True)
for u, v in edges_sorted:
if len(self.graph.edges()) <= budget:
break
# Do not disconnect the graph
self.graph.remove_edge(u, v)
if not nx.is_connected(self.graph):
self.graph.add_edge(u, v)
# --------------------
# Connectivity for non-bottleneck modes
# --------------------
def _connect_all_nodes_by_nearby_growth(self, nodes):
"""Original connectivity step (nearby growth), unchanged except refactoring."""
connected = set()
remaining = set(nodes)
current = random.choice(nodes)
connected.add(current)
remaining.remove(current)
while remaining:
candidates = [
n for n in remaining
if any(abs(n[0] - c[0]) <= 2 and abs(n[1] - c[1]) <= 2 for c in connected)
]
candidate = random.choice(candidates) if candidates else random.choice(list(remaining))
neighbors = [
c for c in connected
if abs(c[0] - candidate[0]) <= 2 and abs(c[1] - candidate[1]) <= 2
]
n = random.choice(neighbors) if neighbors else random.choice(list(connected))
# Keep your existing intersection checks (but connectivity is forced anyway)
if n[0] == candidate[0] or n[1] == candidate[1]:
_ = self._straight_edge_intersects(n, candidate)
elif abs(n[0] - candidate[0]) == abs(n[1] - candidate[1]):
_ = self._diagonal_intersects(n, candidate)
self.graph.add_edge(n, candidate)
connected.add(candidate)
remaining.remove(candidate)
# --------------------
# Modification 2: Bottleneck = multiple small dense clusters connected by bottleneck edges
# --------------------
def _build_bottleneck_clusters(self, nodes):
"""
Build a number of small, internally dense "grids" (clusters),
then connect clusters with a small number of inter-cluster edges
which become the bottlenecks.
"""
# Clear any edges that might exist (seed node has no edges, but be explicit)
self.graph.remove_edges_from(list(self.graph.edges()))
clusters, centers = self._spatial_cluster_nodes(nodes, k=self.bottleneck_cluster_count)
# Make each cluster internally dense.
# We do "dense-without-intersections-when-possible" using your dense edge routine on subsets.
for cluster in clusters:
if len(cluster) < 2:
continue
# Ensure cluster is connected first using nearby growth inside the cluster
self._connect_cluster_by_nearby_growth(cluster)
# Then densify within the cluster
max_edges = max(1, int(3.0 * len(cluster))) # dense-ish without becoming fully complete
self._add_cluster_dense(list(cluster), max_edges=max_edges)
# Connect clusters in a chain (or near-chain) by centers.
order = sorted(range(len(clusters)), key=lambda i: (centers[i][0], centers[i][1]))
for a_idx, b_idx in zip(order[:-1], order[1:]):
self._add_bottleneck_links(clusters[a_idx], clusters[b_idx], self.bottleneck_edges_per_link)
# If something ended up disconnected (e.g., tiny clusters), reconnect lightly
if not nx.is_connected(self.graph):
self._attempt_reconnect_components(prefer_max_distance=2)
def _spatial_cluster_nodes(self, nodes, k):
"""
Simple spatial clustering:
- pick k random centers
- assign each node to closest center by Chebyshev distance
- return clusters + centers
"""
def cheb(a, b):
return max(abs(a[0] - b[0]), abs(a[1] - b[1]))
nodes = list(nodes)
if k >= len(nodes):
# each node its own cluster (degenerate)
return [[n] for n in nodes], nodes[:]
centers = random.sample(nodes, k)
clusters = [[] for _ in range(k)]
for n in nodes:
best_i = min(range(k), key=lambda i: cheb(n, centers[i]))
clusters[best_i].append(n)
# Recompute centers as medoid-ish: pick node closest to mean
new_centers = []
for c in clusters:
if not c:
new_centers.append(random.choice(nodes))
continue
mx = sum(p[0] for p in c) / len(c)
my = sum(p[1] for p in c) / len(c)
new_centers.append(min(c, key=lambda p: (p[0] - mx) ** 2 + (p[1] - my) ** 2))
# Remove empty clusters by merging them into nearest non-empty cluster
non_empty = [(c, ctr) for c, ctr in zip(clusters, new_centers) if len(c) > 0]
clusters = [c for c, _ in non_empty]
centers = [ctr for _, ctr in non_empty]
return clusters, centers
def _connect_cluster_by_nearby_growth(self, cluster_nodes):
"""Connectivity step restricted to a cluster."""
cluster_nodes = list(cluster_nodes)
connected = set()
remaining = set(cluster_nodes)
current = random.choice(cluster_nodes)
connected.add(current)
remaining.remove(current)
while remaining:
candidates = [
n for n in remaining
if any(abs(n[0] - c[0]) <= 2 and abs(n[1] - c[1]) <= 2 for c in connected)
]
candidate = random.choice(candidates) if candidates else random.choice(list(remaining))
neighbors = [
c for c in connected
if abs(c[0] - candidate[0]) <= 2 and abs(c[1] - candidate[1]) <= 2
]
n = random.choice(neighbors) if neighbors else random.choice(list(connected))
self.graph.add_edge(n, candidate)
connected.add(candidate)
remaining.remove(candidate)
def _add_bottleneck_links(self, cluster_a, cluster_b, m):
"""
Add m inter-cluster edges as bottlenecks. Keep m small.
Prefer edges that do not create intersections, but will force-connect if needed.
"""
cluster_a = list(cluster_a)
cluster_b = list(cluster_b)
def cheb(a, b):
return max(abs(a[0] - b[0]), abs(a[1] - b[1]))
# Candidate pairs sorted by distance
pairs = []
for u in cluster_a:
for v in cluster_b:
pairs.append((cheb(u, v), u, v))
pairs.sort(key=lambda t: t[0])
added = 0
used = set()
for _, u, v in pairs:
if added >= m:
break
if (u, v) in used or (v, u) in used:
continue
if self.graph.has_edge(u, v):
continue
# Prefer non-intersecting links
if not self._would_create_intersection(u, v):
self.graph.add_edge(u, v)
used.add((u, v))
added += 1
# If we couldn't add enough without intersections, force the closest remaining
if added < m:
for _, u, v in pairs:
if added >= m:
break
if self.graph.has_edge(u, v):
continue
self.graph.add_edge(u, v)
added += 1
# --------------------
# Topology-specific extra edges (non-bottleneck modes)
# --------------------
def _effective_node_drop_fraction(self):
"""
Topology-aware node dropout.
"""
base = self.node_drop_fraction
if self.topology == "highly_connected":
return max(0.0, base * 0.5) # fewer dropped nodes
if self.topology == "bottlenecks":
return min(0.9, base * 1.5) # more dropped nodes
if self.topology == "linear":
return min(0.95, base * 2.0) # very sparse
return base
def _compute_edge_count(self):
"""Compute the number of edges for the graph based on the topology mode."""
total_nodes = len(self.graph.nodes())
if self.topology == "highly_connected":
# Increase edge count for fully connected mode (e.g., by multiplying by a factor)
return int(4.0 * total_nodes) # Example: higher factor for full connection
elif self.topology == "bottlenecks":
# Use the default edge count calculation for bottleneck mode
return int(2.0 * total_nodes)
else:
# Default edge count for other topologies
return int(random.uniform(1.5, 2.5) * total_nodes)
def _add_edges(self):
nodes = list(self.graph.nodes())
total_edges = self._compute_edge_count()
if self.topology == "highly_connected":
self._add_cluster_dense(nodes, total_edges)
elif self.topology == "linear":
self._make_linear(nodes)
# Note: bottlenecks are built in _build_bottleneck_clusters(), not here.
# --------------------
# Dense / sparse edge routines (existing)
# --------------------
def _add_cluster_dense(self, nodes, max_edges):
edges_added = 0
nodes = list(nodes)
random.shuffle(nodes)
for i in range(len(nodes)):
for j in range(i + 1, len(nodes)):
if edges_added >= max_edges:
return
n1, n2 = nodes[i], nodes[j]
# Straight edge
if (n1[0] == n2[0] or n1[1] == n2[1]):
if not self._straight_edge_intersects(n1, n2):
self.graph.add_edge(n1, n2)
edges_added += 1
continue
# Diagonal edge
if abs(n1[0] - n2[0]) == abs(n1[1] - n2[1]):
if not self._diagonal_intersects(n1, n2):
self.graph.add_edge(n1, n2)
edges_added += 1
def _make_linear(self, nodes):
nodes_sorted = sorted(nodes, key=lambda x: (x[0], x[1]))
if not nodes_sorted:
return
prev = nodes_sorted[0]
for nxt in nodes_sorted[1:]:
x1, y1 = prev
x2, y2 = nxt
if x1 == x2 or y1 == y2:
self.graph.add_edge(prev, nxt)
prev = nxt
else:
if x1 != x2:
step = (x1 + (1 if x2 > x1 else -1), y1)
if step in nodes:
self.graph.add_edge(prev, step)
self.graph.add_edge(step, nxt)
prev = nxt
continue
if y1 != y2:
step = (x1, y1 + (1 if y2 > y1 else -1))
if step in nodes:
self.graph.add_edge(prev, step)
self.graph.add_edge(step, nxt)
prev = nxt
continue
for node in nodes_sorted:
if random.random() < 0.15:
x, y = node
candidates = [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)]
random.shuffle(candidates)
for c in candidates:
if c in nodes and not self.graph.has_edge(node, c):
if self.graph.degree(node) < 3 and self.graph.degree(c) < 3:
self.graph.add_edge(node, c)
break
# --------------------
# Intersection checks (existing + used by reconnect)
# --------------------
def _straight_edge_intersects(self, n1, n2):
x1, y1 = n1
x2, y2 = n2
if not (x1 == x2 or y1 == y2):
return True
if (x1, y1) > (x2, y2):
n1, n2 = n2, n1
x1, y1 = n1
x2, y2 = n2
for a, b in self.graph.edges():
if {a, b} == {n1, n2}:
continue
ax, ay = a
bx, by = b
if y1 == y2: # horizontal
if ay == by == y1:
if max(ax, bx) >= min(x1, x2) and min(ax, bx) <= max(x1, x2):
return True
if x1 == x2: # vertical
if ax == bx == x1:
if max(ay, by) >= min(y1, y2) and min(ay, by) <= max(y1, y2):
return True
return False
def _diagonal_intersects(self, n1, n2):
x1, y1 = n1
x2, y2 = n2
for a, b in self.graph.edges():
ax, ay = a
bx, by = b
if abs(ax - bx) == abs(ay - by):
if not (max(x1, x2) < min(ax, bx) or min(x1, x2) > max(ax, bx)):
if not (max(y1, y2) < min(ay, by) or min(y1, y2) > max(ay, by)):
return True
return False
def _orientation(self, p, q, r):
(px, py), (qx, qy), (rx, ry) = p, q, r
val = (qy - py) * (rx - qx) - (qx - px) * (ry - qy)
if val == 0:
return 0
return 1 if val > 0 else 2
def _on_segment(self, p, q, r):
(px, py), (qx, qy), (rx, ry) = p, q, r
return (min(px, rx) <= qx <= max(px, rx) and
min(py, ry) <= qy <= max(py, ry))
def _segments_intersect(self, a, b, c, d):
if a in (c, d) or b in (c, d):
return False
o1 = self._orientation(a, b, c)
o2 = self._orientation(a, b, d)
o3 = self._orientation(c, d, a)
o4 = self._orientation(c, d, b)
if o1 != o2 and o3 != o4:
return True
if o1 == 0 and self._on_segment(a, c, b):
return True
if o2 == 0 and self._on_segment(a, d, b):
return True
if o3 == 0 and self._on_segment(c, a, d):
return True
if o4 == 0 and self._on_segment(c, b, d):
return True
return False
def _would_create_intersection(self, u, v):
for x, y in self.graph.edges():
if u in (x, y) or v in (x, y):
continue
if self._segments_intersect(u, v, x, y):
return True
return False
def _remove_intersections(self):
max_passes = 10
pass_no = 0
total_removed = 0
while pass_no < max_passes:
pass_no += 1
edges = list(self.graph.edges())
intersections = []
for i in range(len(edges)):
a, b = edges[i]
for j in range(i + 1, len(edges)):
c, d = edges[j]
if self._segments_intersect(a, b, c, d):
intersections.append((a, b, c, d))
if not intersections:
break
removed_this_pass = 0
for a, b, c, d in intersections:
if not self.graph.has_edge(a, b) or not self.graph.has_edge(c, d):
continue
len1 = (a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2
len2 = (c[0] - d[0]) ** 2 + (c[1] - d[1]) ** 2
if len1 >= len2:
try:
self.graph.remove_edge(a, b)
removed_this_pass += 1
except Exception:
pass
else:
try:
self.graph.remove_edge(c, d)
removed_this_pass += 1
except Exception:
pass
total_removed += removed_this_pass
self._attempt_reconnect_components(prefer_max_distance=2)
if not nx.is_connected(self.graph):
self._attempt_reconnect_components(prefer_max_distance=self.grid_size)
final_edges = list(self.graph.edges())
for i in range(len(final_edges)):
a, b = final_edges[i]
for j in range(i + 1, len(final_edges)):
c, d = final_edges[j]
if self._segments_intersect(a, b, c, d):
len1 = (a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2
len2 = (c[0] - d[0]) ** 2 + (c[1] - d[1]) ** 2
if len1 >= len2 and self.graph.has_edge(a, b):
self.graph.remove_edge(a, b)
total_removed += 1
elif self.graph.has_edge(c, d):
self.graph.remove_edge(c, d)
total_removed += 1
print(f"[cleanup] Removed {total_removed} intersecting edges after {pass_no} passes.")
def _attempt_reconnect_components(self, prefer_max_distance=2):
comps = list(nx.connected_components(self.graph))
if len(comps) <= 1:
return
def cheb(a, b):
return max(abs(a[0] - b[0]), abs(a[1] - b[1]))
comp_nodes = [list(c) for c in comps]
max_relax = self.grid_size
relax = prefer_max_distance
while relax <= max_relax and len(comp_nodes) > 1:
made_connection = False
i = 0
while i < len(comp_nodes) - 1:
j = i + 1
connected_this_round = False
while j < len(comp_nodes):
best_pair = None
best_dist = None
for u in comp_nodes[i]:
for v in comp_nodes[j]:
if u == v:
continue
d = cheb(u, v)
if d <= relax and (best_dist is None or d < best_dist):
best_pair = (u, v)
best_dist = d
if best_pair is not None:
u, v = best_pair
if not self.graph.has_edge(u, v):
if not self._would_create_intersection(u, v):
self.graph.add_edge(u, v)
else:
self.graph.add_edge(u, v)
made_connection = True
connected_this_round = True
comp_nodes[i].extend(comp_nodes[j])
comp_nodes.pop(j)
break
else:
j += 1
if not connected_this_round:
i += 1
if not made_connection:
relax += 1
else:
comps = list(nx.connected_components(self.graph))
comp_nodes = [list(c) for c in comps]