| """Metrics computation for unlearning experiments.""" |
| import numpy as np |
| from scipy import stats |
| from scipy.special import digamma, polygamma |
| from typing import Dict, List, Tuple, Optional |
| from collections import defaultdict |
|
|
| from src.graph_utils import build_adjacency, get_deletion_neighborhood, get_blocks_at_distance |
|
|
|
|
| |
| |
| |
|
|
def compute_block_param_vector(params: dict, node_type: str, idx: int, model_family: str) -> np.ndarray:
    """Return the flat variational-parameter vector of one user/item block.

    For the two-parameter families the user (item) block concatenates its
    pair of parameter rows; the MAP family has a single array per side.
    Raises ValueError for an unrecognized model family.
    """
    is_user = node_type == 'user'
    if model_family == 'poisson_gamma':
        shape_key, rate_key = ('a', 'b') if is_user else ('c', 'd')
        return np.concatenate([params[shape_key][idx], params[rate_key][idx]])
    if model_family == 'gaussian_gaussian':
        mean_key, var_key = ('m_U', 's_U') if is_user else ('m_V', 's_V')
        return np.concatenate([params[mean_key][idx], params[var_key][idx]])
    if model_family == 'gaussian_gamma_map':
        return params['alpha' if is_user else 'beta'][idx].copy()
    raise ValueError(f"Unknown model family: {model_family}")
|
|
|
|
def compute_all_param_vector(params: dict, model_family: str) -> np.ndarray:
    """Flatten every parameter array of the model into a single 1-D vector.

    The concatenation order per family is fixed so that vectors from
    different parameter dicts of the same family are comparable
    element-wise.  Raises ValueError for an unrecognized family.
    """
    keys_by_family = {
        'poisson_gamma': ('a', 'b', 'c', 'd'),
        'gaussian_gaussian': ('m_U', 's_U', 'm_V', 's_V'),
        'gaussian_gamma_map': ('alpha', 'beta'),
    }
    if model_family not in keys_by_family:
        raise ValueError(f"Unknown model family: {model_family}")
    return np.concatenate([params[key].ravel() for key in keys_by_family[model_family]])
|
|
|
|
def compute_deletion_influence_by_distance(full_params, exact_params, edge_to_remove,
                                           edges, N, M, model_family, max_radius=6):
    """Per-shell deletion influence Delta_u(z) = ||lambda_u^* - lambda_u^{\\z}||.

    Blocks are grouped by their graph distance from the deleted edge's
    endpoints; each shell gets mean/std/median/max statistics over the
    per-block parameter-difference norms.
    """
    user_to_items, item_to_users, _ = build_adjacency(edges, N, M)
    distances = get_deletion_neighborhood(edge_to_remove, user_to_items, item_to_users,
                                          N, M, max_radius)
    blocks_by_dist = get_blocks_at_distance(distances, N)

    summary = {}
    for dist, blocks in blocks_by_dist.items():
        deltas = [
            float(np.linalg.norm(
                compute_block_param_vector(full_params, node_type, idx, model_family)
                - compute_block_param_vector(exact_params, node_type, idx, model_family)))
            for node_type, idx in blocks
        ]
        summary[dist] = {
            'mean': float(np.mean(deltas)),
            'std': float(np.std(deltas)),
            'median': float(np.median(deltas)),
            'max': float(np.max(deltas)),
            'n_blocks': len(deltas),
            'values': deltas,
        }

    return summary
|
|
|
|
def fit_exponential_decay(influence_by_dist: dict, min_shells: int = 3, eps: float = 1e-12):
    """Fit the decay model log(Delta(r)) = alpha - mu * r by least squares.

    Parameters
    ----------
    influence_by_dist : dict
        Maps distance r -> stats dict with at least 'mean' and 'n_blocks'
        (as produced by compute_deletion_influence_by_distance).
    min_shells : int
        Minimum number of usable shells required for a fit.
    eps : float
        Shells with mean influence <= eps are dropped (log of ~0).

    Returns
    -------
    dict with keys mu_emp (= -slope), intercept, r_squared, p_value,
    std_err, n_shells, valid.  When too few shells are usable, `valid`
    is False and the numeric fields are None — the key set is identical
    in both cases so callers can index the result uniformly.
    """
    distances = sorted(influence_by_dist.keys())
    r_vals = []
    log_vals = []

    for r in distances:
        mean_inf = influence_by_dist[r]['mean']
        # Skip near-zero shells (log would blow up) and 1-block shells
        # (mean over a single block is too noisy for the regression).
        if mean_inf > eps and influence_by_dist[r]['n_blocks'] >= 2:
            r_vals.append(r)
            log_vals.append(np.log(mean_inf + eps))

    if len(r_vals) < min_shells:
        # FIX: include p_value/std_err so the invalid result exposes the
        # same keys as the valid one (previously a KeyError trap).
        return {
            'mu_emp': None,
            'intercept': None,
            'r_squared': None,
            'p_value': None,
            'std_err': None,
            'n_shells': len(r_vals),
            'valid': False,
        }

    slope, intercept, r_value, p_value, std_err = stats.linregress(
        np.asarray(r_vals, dtype=float), np.asarray(log_vals))

    return {
        'mu_emp': float(-slope),
        'intercept': float(intercept),
        'r_squared': float(r_value ** 2),
        'n_shells': len(r_vals),
        'p_value': float(p_value),
        'std_err': float(std_err),
        'valid': True,
    }
|
|
|
|
| |
| |
| |
|
|
def compute_local_error(local_params, exact_params, model_family):
    """Parameter-space error of a local update against the exact retrain.

    Err_R(z) = ||lambda^(R)_local - lambda^{\\z}||; the relative variant
    normalizes by 1 + ||lambda^{\\z}|| to stay finite for tiny references.
    """
    v_local = compute_all_param_vector(local_params, model_family)
    v_reference = compute_all_param_vector(exact_params, model_family)

    abs_err = float(np.linalg.norm(v_local - v_reference))
    denom = 1.0 + np.linalg.norm(v_reference)

    return {
        'param_error': abs_err,
        'relative_error': float(abs_err / denom),
    }
|
|
|
|
| |
| |
| |
|
|
def compute_objective_gap(model, edges_without, exact_params, approx_params):
    """Gap_R = L_{\\z}(lambda^{\\z}) - L_{\\z}(lambda^(R)).

    Uses model.compute_elbo when available, otherwise
    model.compute_objective.  Returns None (best-effort) when the model
    cannot evaluate its objective for these inputs.
    """
    try:
        # Pick the objective once instead of re-dispatching per call.
        objective = (model.compute_elbo if hasattr(model, 'compute_elbo')
                     else model.compute_objective)
        return float(objective(edges_without, exact_params)
                     - objective(edges_without, approx_params))
    except Exception:
        # Deliberate best-effort: some models expose no usable objective.
        return None
|
|
|
|
| |
| |
| |
|
|
def compute_chi_poisson_gamma(edge_to_remove, edges, params, N, M, K,
                              a0=0.3, b0=1.0, c0=0.3, d0=1.0):
    """Compute chi statistics for Poisson-Gamma model."""
    a, b, c, d = params['a'], params['b'], params['c'], params['d']

    def _positive_min(arr, fallback):
        # Smallest strictly-positive entry, or the prior fallback if none.
        positive = arr[arr > 0]
        return np.min(positive) if positive.size > 0 else fallback

    a_lo = _positive_min(a, a0)
    b_lo = _positive_min(b, b0)
    c_lo = _positive_min(c, c0)
    d_lo = _positive_min(d, d0)
    a_hi = np.max(a)
    c_hi = np.max(c)

    # Curvature-style constants, clamped away from zero for stability.
    C_x = 0.5 * polygamma(1, max(c_lo, 1e-3)) + 0.5 / max(d_lo, 1e-6)
    C_0 = 1.0 / max(d_lo, 1e-6) + c_hi / max(d_lo**2, 1e-12)
    C_tilde_x = 0.5 * polygamma(1, max(a_lo, 1e-3)) + 0.5 / max(b_lo, 1e-6)
    C_tilde_0 = 1.0 / max(b_lo, 1e-6) + a_hi / max(b_lo**2, 1e-12)

    # Adjacency and observed counts indexed by (user, item).
    items_of = defaultdict(list)
    users_of = defaultdict(list)
    counts = {}
    for u, v, x in edges:
        items_of[u].append(v)
        users_of[v].append(u)
        counts[(u, v)] = x

    u_del, v_del, _ = edge_to_remove

    neighbor_items = items_of.get(u_del, [])
    neighbor_users = users_of.get(v_del, [])

    chi_i = sum(C_x * counts.get((u_del, v), 0) + C_0 for v in neighbor_items)
    chi_tilde_j = sum(C_tilde_x * counts.get((u, v_del), 0) + C_tilde_0
                      for u in neighbor_users)

    seed_degree = len(neighbor_items) + len(neighbor_users)
    seed_count_sum = (sum(counts.get((u_del, v), 0) for v in neighbor_items)
                      + sum(counts.get((u, v_del), 0) for u in neighbor_users))

    return {
        'chi_i': float(chi_i),
        'chi_tilde_j': float(chi_tilde_j),
        'chi_max': float(max(chi_i, chi_tilde_j)),
        'chi_sum': float(chi_i + chi_tilde_j),
        'seed_degree': int(seed_degree),
        'seed_count_sum': float(seed_count_sum),
        'C_x': float(C_x),
        'C_0': float(C_0),
        'C_tilde_x': float(C_tilde_x),
        'C_tilde_0': float(C_tilde_0),
    }
|
|
|
|
def compute_chi_gaussian(edge_to_remove, edges, params, N, M, K, sigma_x,
                         model_family='gaussian_gaussian'):
    """Compute interaction proxy (chi) for Gaussian observation models.

    chi_i sums the precision-weighted second moments of the deleted
    user's neighboring item factors; chi_tilde_j does the symmetric sum
    for the deleted item.  Raises ValueError for an unknown family.
    """
    user_to_items = defaultdict(list)
    item_to_users = defaultdict(list)
    # FIX: dropped the unused edge_dict construction and the dead
    # `from src.model import GaussianGammaMAP` import — neither was read.
    for i, j, x in edges:
        user_to_items[i].append(j)
        item_to_users[j].append(i)

    i_del, j_del, _ = edge_to_remove
    prec_x = 1.0 / (sigma_x ** 2)

    if model_family == 'gaussian_gaussian':
        m_U, s_U = params['m_U'], params['s_U']
        m_V, s_V = params['m_V'], params['s_V']
        # E[||v_j||^2] = ||m_V[j]||^2 + sum(s_V[j]) under the variational posterior.
        chi_i = sum(prec_x * np.sum(m_V[j]**2 + s_V[j]) for j in user_to_items.get(i_del, []))
        chi_tilde_j = sum(prec_x * np.sum(m_U[i]**2 + s_U[i]) for i in item_to_users.get(j_del, []))
    elif model_family == 'gaussian_gamma_map':
        # Softplus maps the unconstrained MAP parameters to positive factors.
        sp = lambda t: np.log1p(np.exp(np.clip(t, -20, 20)))
        U = sp(params['alpha'])
        V = sp(params['beta'])
        chi_i = sum(prec_x * np.sum(V[j]**2) for j in user_to_items.get(i_del, []))
        chi_tilde_j = sum(prec_x * np.sum(U[i]**2) for i in item_to_users.get(j_del, []))
    else:
        # FIX: previously an unknown family fell through to a NameError on
        # chi_i; fail with the module's standard error instead.
        raise ValueError(f"Unknown model family: {model_family}")

    chi_max = max(chi_i, chi_tilde_j)
    chi_sum = chi_i + chi_tilde_j

    seed_degree = len(user_to_items.get(i_del, [])) + len(item_to_users.get(j_del, []))

    return {
        'chi_i': float(chi_i),
        'chi_tilde_j': float(chi_tilde_j),
        'chi_max': float(chi_max),
        'chi_sum': float(chi_sum),
        'seed_degree': int(seed_degree),
    }
|
|
|
|
| |
| |
| |
|
|
def compute_gradient_interference(full_params, exact_params, local_params, model_family):
    """Interference proxy between the deletion and retain directions.

    g_del = lambda^* - lambda^{\\z}
    g_ret = lambda^{\\z} - lambda^(R)_local
    Reports I(z) = |<g_del, g_ret>| both raw and normalized to a cosine.
    """
    theta_full = compute_all_param_vector(full_params, model_family)
    theta_exact = compute_all_param_vector(exact_params, model_family)
    theta_local = compute_all_param_vector(local_params, model_family)

    g_del = theta_full - theta_exact
    g_ret = theta_exact - theta_local

    raw = float(np.abs(np.dot(g_del, g_ret)))
    n_del = np.linalg.norm(g_del)
    n_ret = np.linalg.norm(g_ret)

    return {
        'interference_raw': raw,
        # Small epsilon guards the zero-gradient case.
        'interference_cosine': float(raw / (n_del * n_ret + 1e-12)),
        'g_del_norm': float(n_del),
        'g_ret_norm': float(n_ret),
    }
|
|
|
|
| |
| |
| |
|
|
def compute_all_metrics(full_params, exact_params, local_params_by_radius,
                        warm_start_params, one_step_params,
                        edge_to_remove, edges, N, M, K,
                        model_family, model=None, radii=(1, 2, 3, 4),
                        model_kwargs=None):
    """Compute all per-deletion metrics for one removed edge.

    Aggregates: influence-by-distance shells and their exponential decay
    fit, seed-edge chi statistics, local-update error/interference at
    each radius in `radii`, and warm-start / one-step baseline errors.

    FIX: `radii` default changed from a mutable list to an immutable
    tuple (mutable-default-argument anti-pattern); iteration, `max` and
    membership use are unchanged for callers.
    """
    results = {}

    # 1) Deletion influence per graph-distance shell; go two shells past
    #    the largest local radius so the decay fit has extra points.
    influence = compute_deletion_influence_by_distance(
        full_params, exact_params, edge_to_remove, edges, N, M, model_family,
        max_radius=max(radii) + 2)
    results['influence_by_distance'] = {str(k): v['mean'] for k, v in influence.items()}
    results['influence_by_distance_full'] = influence

    # 2) Empirical exponential decay rate of the influence profile.
    decay = fit_exponential_decay(influence)
    results['empirical_decay_mu'] = decay['mu_emp']
    results['empirical_decay_r2'] = decay['r_squared']
    results['decay_valid'] = decay['valid']

    # 3) Seed-edge interaction strength (chi) for the deleted edge.
    if model_kwargs is None:
        model_kwargs = {}

    if model_family == 'poisson_gamma':
        chi = compute_chi_poisson_gamma(
            edge_to_remove, edges, full_params, N, M, K,
            a0=model_kwargs.get('a0', 0.3), b0=model_kwargs.get('b0', 1.0),
            c0=model_kwargs.get('c0', 0.3), d0=model_kwargs.get('d0', 1.0))
    else:
        chi = compute_chi_gaussian(
            edge_to_remove, edges, full_params, N, M, K,
            sigma_x=model_kwargs.get('sigma_x', 1.0), model_family=model_family)

    results['chi_seed_max'] = chi['chi_max']
    results['chi_seed_sum'] = chi['chi_sum']
    results['seed_degree'] = chi['seed_degree']

    # 4) Local-update error and gradient interference at each radius.
    for R in radii:
        if R in local_params_by_radius:
            err = compute_local_error(local_params_by_radius[R], exact_params, model_family)
            results[f'error_R{R}'] = err['param_error']
            results[f'rel_error_R{R}'] = err['relative_error']

            interf = compute_gradient_interference(
                full_params, exact_params, local_params_by_radius[R], model_family)
            results[f'interference_raw_R{R}'] = interf['interference_raw']
            results[f'interference_cosine_R{R}'] = interf['interference_cosine']

    # 5) Baseline comparisons (optional inputs may be None).
    if warm_start_params is not None:
        ws_err = compute_local_error(warm_start_params, exact_params, model_family)
        results['error_warm_start'] = ws_err['param_error']
        results['rel_error_warm_start'] = ws_err['relative_error']

    if one_step_params is not None:
        os_err = compute_local_error(one_step_params, exact_params, model_family)
        results['error_one_step'] = os_err['param_error']
        results['rel_error_one_step'] = os_err['relative_error']

    return results
|
|
|
|
| |
| |
| |
|
|
def bootstrap_ci(values, n_boot=1000, ci=0.95, seed=42):
    """Percentile-bootstrap confidence interval for the mean of `values`.

    Non-finite entries are dropped first.  With fewer than 3 finite
    values the CI/SE are NaN (mean is 0 when nothing survives).
    """
    data = np.array(values)
    data = data[np.isfinite(data)]
    n = len(data)

    if n < 3:
        return {'mean': float(np.mean(data)) if n > 0 else 0,
                'ci_low': float('nan'), 'ci_high': float('nan'),
                'se': float('nan'), 'n': n}

    rng = np.random.RandomState(seed)
    resampled_means = np.array([
        np.mean(rng.choice(data, n, replace=True))
        for _ in range(n_boot)
    ])

    tail = (1 - ci) / 2

    return {
        'mean': float(np.mean(data)),
        'ci_low': float(np.percentile(resampled_means, 100 * tail)),
        'ci_high': float(np.percentile(resampled_means, 100 * (1 - tail))),
        'se': float(np.std(resampled_means)),
        'n': n,
    }
|
|
|
|
def compute_bootstrap_summary(df, group_cols, metric_cols, n_boot=1000):
    """Bootstrap-CI summary table for `metric_cols`, grouped by `group_cols`.

    Group columns missing from `df` are ignored; with none available the
    whole frame is summarized as a single group.  Returns one row per
    group with mean/ci_low/ci_high/se/n per metric column.
    """
    import pandas as pd

    present = [c for c in group_cols if c in df.columns]
    grouped = df.groupby(present) if present else [('all', df)]

    summary_rows = []
    for name, sub in grouped:
        record = {}
        # Re-attach the grouping key(s) as regular columns.
        if isinstance(name, tuple):
            record.update(zip(present, name))
        elif present:
            record[present[0]] = name

        for metric in metric_cols:
            if metric not in sub.columns:
                continue
            ci_stats = bootstrap_ci(sub[metric].dropna().values, n_boot=n_boot)
            record[f'{metric}_mean'] = ci_stats['mean']
            record[f'{metric}_ci_low'] = ci_stats['ci_low']
            record[f'{metric}_ci_high'] = ci_stats['ci_high']
            record[f'{metric}_se'] = ci_stats['se']
            record[f'{metric}_n'] = ci_stats['n']

        summary_rows.append(record)

    return pd.DataFrame(summary_rows)
|
|