serliezer's picture
Add src/graph_utils.py
1c8033c verified
"""Graph utilities for bipartite user-item graphs."""
import numpy as np
from collections import defaultdict, deque
from typing import List, Tuple, Set, Dict, Optional
def build_adjacency(edges, N, M):
"""Build adjacency lists for bipartite graph.
Returns:
user_to_items: dict[int, list[int]]
item_to_users: dict[int, list[int]]
edge_dict: dict[(i,j), x_ij]
"""
user_to_items = defaultdict(list)
item_to_users = defaultdict(list)
edge_dict = {}
for e in edges:
i, j = int(e[0]), int(e[1])
x = e[2] if len(e) > 2 else 1
user_to_items[i].append(j)
item_to_users[j].append(i)
edge_dict[(i, j)] = x
return dict(user_to_items), dict(item_to_users), edge_dict
def bfs_neighborhood(seed_nodes, adj_func, N, M, radius, user_to_items, item_to_users):
"""BFS on bipartite graph from seed set.
Returns dict mapping node_id -> distance from seed set.
Users: 0..N-1, Items: N..N+M-1
"""
distances = {}
queue = deque()
for node in seed_nodes:
distances[node] = 0
queue.append((node, 0))
while queue:
node, dist = queue.popleft()
if dist >= radius:
continue
if node < N: # User node
neighbors = [j + N for j in user_to_items.get(node, [])]
else: # Item node
j = node - N
neighbors = list(item_to_users.get(j, []))
for nbr in neighbors:
if nbr not in distances:
distances[nbr] = dist + 1
queue.append((nbr, dist + 1))
return distances
def get_deletion_neighborhood(edge_to_remove, user_to_items, item_to_users,
N, M, radius):
"""Get neighborhood of deletion edge (i, j, x)."""
i = int(edge_to_remove[0])
j = int(edge_to_remove[1])
seed_nodes = {i, j + N}
return bfs_neighborhood(seed_nodes, None, N, M, radius, user_to_items, item_to_users)
def get_blocks_at_distance(distances, N):
"""Group blocks by distance."""
by_distance = defaultdict(list)
for node, dist in distances.items():
if node < N:
by_distance[dist].append(('user', node))
else:
by_distance[dist].append(('item', node - N))
return dict(by_distance)
def get_user_item_sets_in_radius(distances, N, radius):
"""Get sets of user and item indices within radius R."""
users_in_R = set()
items_in_R = set()
for node, dist in distances.items():
if dist <= radius:
if node < N:
users_in_R.add(node)
else:
items_in_R.add(node - N)
return users_in_R, items_in_R
def generate_bounded_degree_graph(N, M, avg_degree, seed=0):
"""Generate bipartite graph where each user has ~avg_degree items."""
rng = np.random.RandomState(seed)
edges_set = set()
for i in range(N):
deg = max(1, rng.poisson(avg_degree))
deg = min(deg, M, 2 * avg_degree)
items = rng.choice(M, size=deg, replace=False)
for j in items:
edges_set.add((i, int(j)))
return list(edges_set)
def generate_erdos_renyi_graph(N, M, avg_degree, seed=0):
"""Generate Erdos-Renyi bipartite graph."""
rng = np.random.RandomState(seed)
p = avg_degree / M
p = min(p, 1.0)
edges_set = set()
for i in range(N):
mask = rng.random(M) < p
for j in np.where(mask)[0]:
edges_set.add((i, int(j)))
for i in range(N):
if not any(e[0] == i for e in edges_set):
j = rng.randint(M)
edges_set.add((i, j))
return list(edges_set)
def generate_power_law_graph(N, M, avg_degree, seed=0, alpha=2.5):
"""Generate bipartite graph with power-law degree distribution."""
rng = np.random.RandomState(seed)
edges_set = set()
item_weights = np.arange(1, M + 1, dtype=float) ** (-alpha + 1)
item_weights /= item_weights.sum()
for i in range(N):
deg = max(1, int(rng.pareto(alpha - 1) * avg_degree / max(alpha - 2, 0.5) + 1))
deg = min(deg, M)
items = rng.choice(M, size=deg, replace=False, p=item_weights)
for j in items:
edges_set.add((i, int(j)))
return list(edges_set)
def compute_graph_stats(edges, N, M):
"""Compute graph statistics."""
user_degrees = defaultdict(int)
item_degrees = defaultdict(int)
for e in edges:
user_degrees[e[0]] += 1
item_degrees[e[1]] += 1
udeg = list(user_degrees.values()) if user_degrees else [0]
ideg = list(item_degrees.values()) if item_degrees else [0]
return {
'n_edges': len(edges),
'n_users_active': len(user_degrees),
'n_items_active': len(item_degrees),
'user_degree_mean': float(np.mean(udeg)),
'user_degree_max': int(np.max(udeg)),
'user_degree_min': int(np.min(udeg)),
'item_degree_mean': float(np.mean(ideg)),
'item_degree_max': int(np.max(ideg)),
'item_degree_min': int(np.min(ideg)),
}