"""Graph utilities for bipartite user-item graphs.""" import numpy as np from collections import defaultdict, deque from typing import List, Tuple, Set, Dict, Optional def build_adjacency(edges, N, M): """Build adjacency lists for bipartite graph. Returns: user_to_items: dict[int, list[int]] item_to_users: dict[int, list[int]] edge_dict: dict[(i,j), x_ij] """ user_to_items = defaultdict(list) item_to_users = defaultdict(list) edge_dict = {} for e in edges: i, j = int(e[0]), int(e[1]) x = e[2] if len(e) > 2 else 1 user_to_items[i].append(j) item_to_users[j].append(i) edge_dict[(i, j)] = x return dict(user_to_items), dict(item_to_users), edge_dict def bfs_neighborhood(seed_nodes, adj_func, N, M, radius, user_to_items, item_to_users): """BFS on bipartite graph from seed set. Returns dict mapping node_id -> distance from seed set. Users: 0..N-1, Items: N..N+M-1 """ distances = {} queue = deque() for node in seed_nodes: distances[node] = 0 queue.append((node, 0)) while queue: node, dist = queue.popleft() if dist >= radius: continue if node < N: # User node neighbors = [j + N for j in user_to_items.get(node, [])] else: # Item node j = node - N neighbors = list(item_to_users.get(j, [])) for nbr in neighbors: if nbr not in distances: distances[nbr] = dist + 1 queue.append((nbr, dist + 1)) return distances def get_deletion_neighborhood(edge_to_remove, user_to_items, item_to_users, N, M, radius): """Get neighborhood of deletion edge (i, j, x).""" i = int(edge_to_remove[0]) j = int(edge_to_remove[1]) seed_nodes = {i, j + N} return bfs_neighborhood(seed_nodes, None, N, M, radius, user_to_items, item_to_users) def get_blocks_at_distance(distances, N): """Group blocks by distance.""" by_distance = defaultdict(list) for node, dist in distances.items(): if node < N: by_distance[dist].append(('user', node)) else: by_distance[dist].append(('item', node - N)) return dict(by_distance) def get_user_item_sets_in_radius(distances, N, radius): """Get sets of user and item indices within radius R.""" users_in_R = set() items_in_R = set() for node, dist in distances.items(): if dist <= radius: if node < N: users_in_R.add(node) else: items_in_R.add(node - N) return users_in_R, items_in_R def generate_bounded_degree_graph(N, M, avg_degree, seed=0): """Generate bipartite graph where each user has ~avg_degree items.""" rng = np.random.RandomState(seed) edges_set = set() for i in range(N): deg = max(1, rng.poisson(avg_degree)) deg = min(deg, M, 2 * avg_degree) items = rng.choice(M, size=deg, replace=False) for j in items: edges_set.add((i, int(j))) return list(edges_set) def generate_erdos_renyi_graph(N, M, avg_degree, seed=0): """Generate Erdos-Renyi bipartite graph.""" rng = np.random.RandomState(seed) p = avg_degree / M p = min(p, 1.0) edges_set = set() for i in range(N): mask = rng.random(M) < p for j in np.where(mask)[0]: edges_set.add((i, int(j))) for i in range(N): if not any(e[0] == i for e in edges_set): j = rng.randint(M) edges_set.add((i, j)) return list(edges_set) def generate_power_law_graph(N, M, avg_degree, seed=0, alpha=2.5): """Generate bipartite graph with power-law degree distribution.""" rng = np.random.RandomState(seed) edges_set = set() item_weights = np.arange(1, M + 1, dtype=float) ** (-alpha + 1) item_weights /= item_weights.sum() for i in range(N): deg = max(1, int(rng.pareto(alpha - 1) * avg_degree / max(alpha - 2, 0.5) + 1)) deg = min(deg, M) items = rng.choice(M, size=deg, replace=False, p=item_weights) for j in items: edges_set.add((i, int(j))) return list(edges_set) def compute_graph_stats(edges, N, M): """Compute graph statistics.""" user_degrees = defaultdict(int) item_degrees = defaultdict(int) for e in edges: user_degrees[e[0]] += 1 item_degrees[e[1]] += 1 udeg = list(user_degrees.values()) if user_degrees else [0] ideg = list(item_degrees.values()) if item_degrees else [0] return { 'n_edges': len(edges), 'n_users_active': len(user_degrees), 'n_items_active': len(item_degrees), 'user_degree_mean': float(np.mean(udeg)), 'user_degree_max': int(np.max(udeg)), 'user_degree_min': int(np.min(udeg)), 'item_degree_mean': float(np.mean(ideg)), 'item_degree_max': int(np.max(ideg)), 'item_degree_min': int(np.min(ideg)), }