| """Graph utilities for bipartite user-item graphs.""" |
| import numpy as np |
| from collections import defaultdict, deque |
| from typing import List, Tuple, Set, Dict, Optional |
|
|
|
|
| def build_adjacency(edges, N, M): |
| """Build adjacency lists for bipartite graph. |
| |
| Returns: |
| user_to_items: dict[int, list[int]] |
| item_to_users: dict[int, list[int]] |
| edge_dict: dict[(i,j), x_ij] |
| """ |
| user_to_items = defaultdict(list) |
| item_to_users = defaultdict(list) |
| edge_dict = {} |
| |
| for e in edges: |
| i, j = int(e[0]), int(e[1]) |
| x = e[2] if len(e) > 2 else 1 |
| user_to_items[i].append(j) |
| item_to_users[j].append(i) |
| edge_dict[(i, j)] = x |
| |
| return dict(user_to_items), dict(item_to_users), edge_dict |
|
|
|
|
| def bfs_neighborhood(seed_nodes, adj_func, N, M, radius, user_to_items, item_to_users): |
| """BFS on bipartite graph from seed set. |
| |
| Returns dict mapping node_id -> distance from seed set. |
| Users: 0..N-1, Items: N..N+M-1 |
| """ |
| distances = {} |
| queue = deque() |
| |
| for node in seed_nodes: |
| distances[node] = 0 |
| queue.append((node, 0)) |
| |
| while queue: |
| node, dist = queue.popleft() |
| if dist >= radius: |
| continue |
| |
| if node < N: |
| neighbors = [j + N for j in user_to_items.get(node, [])] |
| else: |
| j = node - N |
| neighbors = list(item_to_users.get(j, [])) |
| |
| for nbr in neighbors: |
| if nbr not in distances: |
| distances[nbr] = dist + 1 |
| queue.append((nbr, dist + 1)) |
| |
| return distances |
|
|
|
|
| def get_deletion_neighborhood(edge_to_remove, user_to_items, item_to_users, |
| N, M, radius): |
| """Get neighborhood of deletion edge (i, j, x).""" |
| i = int(edge_to_remove[0]) |
| j = int(edge_to_remove[1]) |
| seed_nodes = {i, j + N} |
| return bfs_neighborhood(seed_nodes, None, N, M, radius, user_to_items, item_to_users) |
|
|
|
|
| def get_blocks_at_distance(distances, N): |
| """Group blocks by distance.""" |
| by_distance = defaultdict(list) |
| for node, dist in distances.items(): |
| if node < N: |
| by_distance[dist].append(('user', node)) |
| else: |
| by_distance[dist].append(('item', node - N)) |
| return dict(by_distance) |
|
|
|
|
| def get_user_item_sets_in_radius(distances, N, radius): |
| """Get sets of user and item indices within radius R.""" |
| users_in_R = set() |
| items_in_R = set() |
| for node, dist in distances.items(): |
| if dist <= radius: |
| if node < N: |
| users_in_R.add(node) |
| else: |
| items_in_R.add(node - N) |
| return users_in_R, items_in_R |
|
|
|
|
| def generate_bounded_degree_graph(N, M, avg_degree, seed=0): |
| """Generate bipartite graph where each user has ~avg_degree items.""" |
| rng = np.random.RandomState(seed) |
| edges_set = set() |
| |
| for i in range(N): |
| deg = max(1, rng.poisson(avg_degree)) |
| deg = min(deg, M, 2 * avg_degree) |
| items = rng.choice(M, size=deg, replace=False) |
| for j in items: |
| edges_set.add((i, int(j))) |
| |
| return list(edges_set) |
|
|
|
|
| def generate_erdos_renyi_graph(N, M, avg_degree, seed=0): |
| """Generate Erdos-Renyi bipartite graph.""" |
| rng = np.random.RandomState(seed) |
| p = avg_degree / M |
| p = min(p, 1.0) |
| edges_set = set() |
| |
| for i in range(N): |
| mask = rng.random(M) < p |
| for j in np.where(mask)[0]: |
| edges_set.add((i, int(j))) |
| |
| for i in range(N): |
| if not any(e[0] == i for e in edges_set): |
| j = rng.randint(M) |
| edges_set.add((i, j)) |
| |
| return list(edges_set) |
|
|
|
|
| def generate_power_law_graph(N, M, avg_degree, seed=0, alpha=2.5): |
| """Generate bipartite graph with power-law degree distribution.""" |
| rng = np.random.RandomState(seed) |
| edges_set = set() |
| |
| item_weights = np.arange(1, M + 1, dtype=float) ** (-alpha + 1) |
| item_weights /= item_weights.sum() |
| |
| for i in range(N): |
| deg = max(1, int(rng.pareto(alpha - 1) * avg_degree / max(alpha - 2, 0.5) + 1)) |
| deg = min(deg, M) |
| items = rng.choice(M, size=deg, replace=False, p=item_weights) |
| for j in items: |
| edges_set.add((i, int(j))) |
| |
| return list(edges_set) |
|
|
|
|
| def compute_graph_stats(edges, N, M): |
| """Compute graph statistics.""" |
| user_degrees = defaultdict(int) |
| item_degrees = defaultdict(int) |
| |
| for e in edges: |
| user_degrees[e[0]] += 1 |
| item_degrees[e[1]] += 1 |
| |
| udeg = list(user_degrees.values()) if user_degrees else [0] |
| ideg = list(item_degrees.values()) if item_degrees else [0] |
| |
| return { |
| 'n_edges': len(edges), |
| 'n_users_active': len(user_degrees), |
| 'n_items_active': len(item_degrees), |
| 'user_degree_mean': float(np.mean(udeg)), |
| 'user_degree_max': int(np.max(udeg)), |
| 'user_degree_min': int(np.min(udeg)), |
| 'item_degree_mean': float(np.mean(ideg)), |
| 'item_degree_max': int(np.max(ideg)), |
| 'item_degree_min': int(np.min(ideg)), |
| } |
|
|