| """Model implementations: Poisson-Gamma VI, Gaussian-Gaussian VI, Gaussian-Gamma MAP. |
| |
| All implementations use vectorized numpy operations over edge arrays for performance. |
| """ |
| import numpy as np |
| import time |
from scipy.special import digamma

from src.utils import FitResult, relative_param_change
from src.graph_utils import build_adjacency, get_deletion_neighborhood, get_user_item_sets_in_radius


| class PoissonGammaVI: |
| """Augmented Gamma-Poisson MF with mean-field CAVI. Vectorized.""" |
| |
| def __init__(self, N, M, K, a0=0.3, b0=1.0, c0=0.3, d0=1.0, |
| max_iter=200, tol=1e-5, damping=1.0, seed=0): |
| self.N = N |
| self.M = M |
| self.K = K |
| self.a0 = a0 |
| self.b0 = b0 |
| self.c0 = c0 |
| self.d0 = d0 |
| self.max_iter = max_iter |
| self.tol = tol |
| self.damping = damping |
| self.seed = seed |
| |
| def _init_params(self, rng=None): |
| if rng is None: |
| rng = np.random.RandomState(self.seed) |
| return { |
| 'a': np.full((self.N, self.K), self.a0) + rng.gamma(1, 0.1, (self.N, self.K)), |
| 'b': np.full((self.N, self.K), self.b0) + rng.gamma(1, 0.1, (self.N, self.K)), |
| 'c': np.full((self.M, self.K), self.c0) + rng.gamma(1, 0.1, (self.M, self.K)), |
| 'd': np.full((self.M, self.K), self.d0) + rng.gamma(1, 0.1, (self.M, self.K)), |
| } |
| |
| def _prepare_edges(self, edges): |
| """Convert edge list to vectorized arrays.""" |
| n_edges = len(edges) |
| I = np.array([e[0] for e in edges], dtype=np.int32) |
| J = np.array([e[1] for e in edges], dtype=np.int32) |
| X = np.array([e[2] for e in edges], dtype=np.float64) |
| return I, J, X, n_edges |
| |
| def _cavi_sweep(self, I, J, X, n_edges, params, |
| update_users=None, update_items=None): |
| """One vectorized CAVI sweep.""" |
| a, b, c, d = params['a'], params['b'], params['c'], params['d'] |
| K = self.K |

        # Expected sufficient statistics of the Gamma factors:
        # E_q[log u_ik] = digamma(a_ik) - log(b_ik), and analogously for v.
| psi_a = digamma(a) |
| log_b = np.log(b + 1e-30) |
| psi_c = digamma(c) |
| log_d = np.log(d + 1e-30) |

        # Multinomial responsibilities over factors: r_ijk ∝ exp(E[log u_ik] + E[log v_jk]).
| log_r = psi_a[I] - log_b[I] + psi_c[J] - log_d[J] |
| log_r -= log_r.max(axis=1, keepdims=True) |
| r = np.exp(log_r) |
| r /= r.sum(axis=1, keepdims=True) + 1e-30 |

        # Expected latent counts: E[z_ijk] = x_ij * r_ijk.
| xr = X[:, None] * r |

        # Posterior means used in the rate updates: E[u_ik] = a/b, E[v_jk] = c/d.
| EV = c / d |
| EU = a / b |

        # User updates: a_ik = a0 + sum_j x_ij r_ijk, b_ik = b0 + sum_j E[v_jk].
        a_new = np.full_like(a, self.a0)
        b_new = np.full_like(b, self.b0)
        np.add.at(a_new, I, xr)
        np.add.at(b_new, I, EV[J])

        # Item updates, symmetric to the user updates.
        c_new = np.full_like(c, self.c0)
        d_new = np.full_like(d, self.d0)
        np.add.at(c_new, J, xr)
        np.add.at(d_new, J, EU[I])

        # Restrict the update to the requested user/item subsets (used by local unlearning).
| if update_users is not None: |
| mask_u = np.zeros(self.N, dtype=bool) |
| mask_u[list(update_users)] = True |
| a_out = np.where(mask_u[:, None], a_new, a) |
| b_out = np.where(mask_u[:, None], b_new, b) |
| else: |
| a_out = a_new |
| b_out = b_new |
| |
| if update_items is not None: |
| mask_v = np.zeros(self.M, dtype=bool) |
| mask_v[list(update_items)] = True |
| c_out = np.where(mask_v[:, None], c_new, c) |
| d_out = np.where(mask_v[:, None], d_new, d) |
| else: |
| c_out = c_new |
| d_out = d_new |

        # Optional damping: convex combination of the old and new parameters.
| if self.damping < 1.0: |
| alpha = self.damping |
| a_out = (1 - alpha) * a + alpha * a_out |
| b_out = (1 - alpha) * b + alpha * b_out |
| c_out = (1 - alpha) * c + alpha * c_out |
| d_out = (1 - alpha) * d + alpha * d_out |
| |
| return {'a': a_out, 'b': b_out, 'c': c_out, 'd': d_out} |
| |
| def compute_elbo(self, edges, params): |
| """Approximate ELBO (likelihood term only for speed).""" |
| I, J, X, n_edges = self._prepare_edges(edges) |
| a, b, c, d = params['a'], params['b'], params['c'], params['d'] |
| |
| EU = a / b |
| EV = c / d |
| |
| |
| pred = np.sum(EU[I] * EV[J], axis=1) |
| |
| |
| elbo = np.sum(X * np.log(pred + 1e-30) - pred) |
| |
| return float(elbo) |
| |
| def fit_full(self, edges, config=None, init_params=None): |
| t0 = time.time() |
| I, J, X, n_edges = self._prepare_edges(edges) |
| |
| if init_params is not None: |
| params = {k: v.copy() for k, v in init_params.items()} |
| else: |
| params = self._init_params() |
| |
| elbo_trace = [] |
| converged = False |
| |
| for it in range(self.max_iter): |
| old_params = {k: v.copy() for k, v in params.items()} |
| params = self._cavi_sweep(I, J, X, n_edges, params) |
| |
| change = relative_param_change(old_params, params) |
| |
| if it % 50 == 0: |
| elbo = self.compute_elbo(edges, params) |
| elbo_trace.append(elbo) |
| |
| if change < self.tol: |
| converged = True |
| break |
| |
| return FitResult( |
| params=params, objective_trace=elbo_trace, |
| n_iterations=it + 1, converged=converged, |
| runtime_sec=time.time() - t0, |
| model_family='poisson_gamma', inference_type='vi', |
| likelihood='poisson', prior='gamma', |
| ) |
| |
| def fit_without_edge(self, edges, edge_to_remove, config=None, init_params=None): |
| i_del, j_del = int(edge_to_remove[0]), int(edge_to_remove[1]) |
| filtered = [(i, j, x) for i, j, x in edges if not (i == i_del and j == j_del)] |
| return self.fit_full(filtered, config, init_params) |
| |
| def fit_local(self, edges, edge_to_remove, radius, config=None, init_params=None): |
| t0 = time.time() |
| i_del, j_del = int(edge_to_remove[0]), int(edge_to_remove[1]) |
| filtered = [(i, j, x) for i, j, x in edges if not (i == i_del and j == j_del)] |
| |
| if init_params is None: |
| raise ValueError("Local unlearning requires init_params") |
| params = {k: v.copy() for k, v in init_params.items()} |
| |
        # The deletion neighborhood is computed on the original graph (including the deleted edge).
| u2i_orig, i2u_orig, _ = build_adjacency(edges, self.N, self.M) |
| distances = get_deletion_neighborhood(edge_to_remove, u2i_orig, i2u_orig, |
| self.N, self.M, radius) |
| users_in_R, items_in_R = get_user_item_sets_in_radius(distances, self.N, radius) |

        # Only edges incident to a user or item inside the radius can affect the
        # parameters being updated; everything outside the neighborhood stays frozen.
| local_edges = [(i, j, x) for i, j, x in filtered |
| if i in users_in_R or j in items_in_R] |
| |
| I, J, X, n_edges = self._prepare_edges(local_edges) |
| |
| converged = False |
| for it in range(self.max_iter): |
| old_a = params['a'].copy() |
| old_b = params['b'].copy() |
| old_c = params['c'].copy() |
| old_d = params['d'].copy() |
| |
| params = self._cavi_sweep(I, J, X, n_edges, params, |
| update_users=users_in_R, update_items=items_in_R) |

            # Convergence is measured only on the parameters that are actually updated.
| max_change = 0.0 |
| if users_in_R: |
| ul = list(users_in_R) |
| max_change = max(max_change, |
| np.max(np.abs(params['a'][ul] - old_a[ul]) / (1 + np.abs(old_a[ul]))), |
| np.max(np.abs(params['b'][ul] - old_b[ul]) / (1 + np.abs(old_b[ul])))) |
| if items_in_R: |
| il = list(items_in_R) |
| max_change = max(max_change, |
| np.max(np.abs(params['c'][il] - old_c[il]) / (1 + np.abs(old_c[il]))), |
| np.max(np.abs(params['d'][il] - old_d[il]) / (1 + np.abs(old_d[il])))) |
| |
| if max_change < self.tol: |
| converged = True |
| break |
| |
| return FitResult( |
| params=params, objective_trace=[], |
| n_iterations=it + 1, converged=converged, |
| runtime_sec=time.time() - t0, |
| model_family='poisson_gamma', inference_type='vi', |
| likelihood='poisson', prior='gamma', |
| diagnostics={'radius': radius, 'n_users_updated': len(users_in_R), |
| 'n_items_updated': len(items_in_R)} |
| ) |
| |
| def fit_warm_start_global(self, edges, edge_to_remove, config=None, init_params=None): |
| i_del, j_del = int(edge_to_remove[0]), int(edge_to_remove[1]) |
| filtered = [(i, j, x) for i, j, x in edges if not (i == i_del and j == j_del)] |
| return self.fit_full(filtered, config, init_params) |


| class GaussianGaussianVI: |
| """Gaussian-Gaussian MF with mean-field Gaussian VI. Vectorized.""" |
| |
| def __init__(self, N, M, K, sigma_U=1.0, sigma_V=1.0, sigma_x=1.0, |
| max_iter=200, tol=1e-5, damping=1.0, seed=0): |
| self.N = N |
| self.M = M |
| self.K = K |
| self.sigma_U = sigma_U |
| self.sigma_V = sigma_V |
| self.sigma_x = sigma_x |
| self.max_iter = max_iter |
| self.tol = tol |
| self.damping = damping |
| self.seed = seed |
| |
| def _init_params(self, rng=None): |
| if rng is None: |
| rng = np.random.RandomState(self.seed) |
| return { |
| 'm_U': rng.randn(self.N, self.K) * 0.1, |
| 's_U': np.ones((self.N, self.K)) * 0.5, |
| 'm_V': rng.randn(self.M, self.K) * 0.1, |
| 's_V': np.ones((self.M, self.K)) * 0.5, |
| } |
| |
| def _prepare_edges(self, edges): |
| n_edges = len(edges) |
| I = np.array([e[0] for e in edges], dtype=np.int32) |
| J = np.array([e[1] for e in edges], dtype=np.int32) |
| X = np.array([e[2] for e in edges], dtype=np.float64) |
| return I, J, X, n_edges |
| |
| def _cavi_sweep(self, I, J, X, n_edges, params, |
| update_users=None, update_items=None): |
| """Vectorized Gaussian-Gaussian CAVI using coordinate updates.""" |
| m_U = params['m_U'].copy() |
| s_U = params['s_U'].copy() |
| m_V = params['m_V'].copy() |
| s_V = params['s_V'].copy() |
| |
| prec_x = 1.0 / (self.sigma_x ** 2) |
| prec_U = 1.0 / (self.sigma_U ** 2) |
| prec_V = 1.0 / (self.sigma_V ** 2) |
| |
        # Second moments E[v_jk^2] = m_V^2 + s_V and E[u_ik^2] = m_U^2 + s_U
        # enter the precision updates below.
| V_sq_plus_var = m_V ** 2 + s_V |
| U_sq_plus_var = m_U ** 2 + s_U |

        # q(u_ik) variance: 1 / (1/sigma_U^2 + (1/sigma_x^2) * sum_j E[v_jk^2]).
| user_prec_sum = np.zeros((self.N, self.K)) |
| np.add.at(user_prec_sum, I, V_sq_plus_var[J]) |
| |
| s_U_new = 1.0 / (prec_U + prec_x * user_prec_sum) |
| |
        # Mean updates: for coordinate k the residual excludes that coordinate's own
        # contribution, i.e. resid_k = x_ij - sum_{k' != k} m_U[i, k'] * m_V[j, k'].
| predicted = np.sum(m_U[I] * m_V[J], axis=1) |
| |
| m_U_new = np.zeros((self.N, self.K)) |
| for k in range(self.K): |
            # Add coordinate k's contribution back to form the exclusive residual.
| resid_k = X - predicted + m_U[I, k] * m_V[J, k] |
| contrib = m_V[J, k] * resid_k |
| user_sum = np.zeros(self.N) |
| np.add.at(user_sum, I, contrib) |
| m_U_new[:, k] = s_U_new[:, k] * prec_x * user_sum |

        # Item variance updates, symmetric to the user updates.
| item_prec_sum = np.zeros((self.M, self.K)) |
| np.add.at(item_prec_sum, J, U_sq_plus_var[I]) |
| |
| s_V_new = 1.0 / (prec_V + prec_x * item_prec_sum) |
| |
        # Freeze users outside the update set before the item updates, so item means are
        # computed against the user parameters that will actually be returned (this matters
        # for local unlearning, where edges of non-updated users are only partially present).
        if update_users is not None:
            mask_u = np.zeros(self.N, dtype=bool)
            mask_u[list(update_users)] = True
            m_U_new = np.where(mask_u[:, None], m_U_new, m_U)
            s_U_new = np.where(mask_u[:, None], s_U_new, s_U)

        # Item mean updates use the (masked) updated user means.
        predicted_new = np.sum(m_U_new[I] * m_V[J], axis=1)

        m_V_new = np.zeros((self.M, self.K))
        for k in range(self.K):
            resid_k = X - predicted_new + m_U_new[I, k] * m_V[J, k]
            contrib = m_U_new[I, k] * resid_k
            item_sum = np.zeros(self.M)
            np.add.at(item_sum, J, contrib)
            m_V_new[:, k] = s_V_new[:, k] * prec_x * item_sum

        if update_items is not None:
            mask_v = np.zeros(self.M, dtype=bool)
            mask_v[list(update_items)] = True
            m_V_new = np.where(mask_v[:, None], m_V_new, m_V)
            s_V_new = np.where(mask_v[:, None], s_V_new, s_V)
| |
| if self.damping < 1.0: |
| alpha = self.damping |
| m_U_new = (1 - alpha) * m_U + alpha * m_U_new |
| s_U_new = (1 - alpha) * s_U + alpha * s_U_new |
| m_V_new = (1 - alpha) * m_V + alpha * m_V_new |
| s_V_new = (1 - alpha) * s_V + alpha * s_V_new |
| |
| return {'m_U': m_U_new, 's_U': s_U_new, 'm_V': m_V_new, 's_V': s_V_new} |
| |
| def compute_objective(self, edges, params): |
| """Approximate ELBO (likelihood only).""" |
| I, J, X, _ = self._prepare_edges(edges) |
| m_U, m_V = params['m_U'], params['m_V'] |
| pred = np.sum(m_U[I] * m_V[J], axis=1) |
| mse = np.mean((X - pred) ** 2) |
| return -float(mse) |
| |
| def fit_full(self, edges, config=None, init_params=None): |
| t0 = time.time() |
| I, J, X, n_edges = self._prepare_edges(edges) |
| |
| params = {k: v.copy() for k, v in (init_params or self._init_params()).items()} |
| obj_trace = [] |
| converged = False |
| |
| for it in range(self.max_iter): |
| old_params = {k: v.copy() for k, v in params.items()} |
| params = self._cavi_sweep(I, J, X, n_edges, params) |
| change = relative_param_change(old_params, params) |
| if it % 50 == 0: |
| obj_trace.append(self.compute_objective(edges, params)) |
| if change < self.tol: |
| converged = True |
| break |
| |
| return FitResult( |
| params=params, objective_trace=obj_trace, |
| n_iterations=it + 1, converged=converged, |
| runtime_sec=time.time() - t0, |
| model_family='gaussian_gaussian', inference_type='vi', |
| likelihood='gaussian', prior='gaussian', |
| ) |
| |
| def fit_without_edge(self, edges, edge_to_remove, config=None, init_params=None): |
| i_del, j_del = int(edge_to_remove[0]), int(edge_to_remove[1]) |
| filtered = [(i, j, x) for i, j, x in edges if not (i == i_del and j == j_del)] |
| return self.fit_full(filtered, config, init_params) |
| |
| def fit_local(self, edges, edge_to_remove, radius, config=None, init_params=None): |
| t0 = time.time() |
| i_del, j_del = int(edge_to_remove[0]), int(edge_to_remove[1]) |
| filtered = [(i, j, x) for i, j, x in edges if not (i == i_del and j == j_del)] |
| |
| if init_params is None: |
| raise ValueError("Local unlearning requires init_params") |
| params = {k: v.copy() for k, v in init_params.items()} |
| |
| u2i_orig, i2u_orig, _ = build_adjacency(edges, self.N, self.M) |
| distances = get_deletion_neighborhood(edge_to_remove, u2i_orig, i2u_orig, |
| self.N, self.M, radius) |
| users_in_R, items_in_R = get_user_item_sets_in_radius(distances, self.N, radius) |

        # Restrict to edges incident to the neighborhood; parameters outside it stay frozen.
| local_edges = [(i, j, x) for i, j, x in filtered |
| if i in users_in_R or j in items_in_R] |
| I, J, X, n_edges = self._prepare_edges(local_edges) |
| converged = False |
| for it in range(self.max_iter): |
| old_params = {k: v.copy() for k, v in params.items()} |
| params = self._cavi_sweep(I, J, X, n_edges, params, |
| update_users=users_in_R, update_items=items_in_R) |
| change = relative_param_change(old_params, params) |
| if change < self.tol: |
| converged = True |
| break |
| |
| return FitResult( |
| params=params, objective_trace=[], |
| n_iterations=it + 1, converged=converged, |
| runtime_sec=time.time() - t0, |
| model_family='gaussian_gaussian', inference_type='vi', |
| likelihood='gaussian', prior='gaussian', |
| diagnostics={'radius': radius} |
| ) |
| |
| def fit_warm_start_global(self, edges, edge_to_remove, config=None, init_params=None): |
| i_del, j_del = int(edge_to_remove[0]), int(edge_to_remove[1]) |
| filtered = [(i, j, x) for i, j, x in edges if not (i == i_del and j == j_del)] |
| return self.fit_full(filtered, config, init_params) |


|
| class GaussianGammaMAP: |
| """Gaussian likelihood + Gamma prior, MAP via softplus parameterization. |
| |
| Uses Adam optimizer with gradient clipping for stable convergence. |
| """ |
| |
| def __init__(self, N, M, K, a0=0.3, b0=1.0, c0=0.3, d0=1.0, |
| sigma_x=1.0, lr=0.01, max_iter=500, tol=1e-5, seed=0, |
| grad_clip=5.0, adam_beta1=0.9, adam_beta2=0.999): |
| self.N = N |
| self.M = M |
| self.K = K |
| self.a0 = a0 |
| self.b0 = b0 |
| self.c0 = c0 |
| self.d0 = d0 |
| self.sigma_x = sigma_x |
| self.lr = lr |
| self.max_iter = max_iter |
| self.tol = tol |
| self.seed = seed |
| self.grad_clip = grad_clip |
| self.adam_beta1 = adam_beta1 |
| self.adam_beta2 = adam_beta2 |
| |
| def _softplus(self, x): |
| return np.log1p(np.exp(np.clip(x, -20, 20))) |
| |
| def _softplus_grad(self, x): |
| return 1.0 / (1.0 + np.exp(-np.clip(x, -20, 20))) |
| |
    def _inv_softplus(self, y):
        """Inverse of softplus: log(expm1(y)), with y clipped away from 0 and overflow."""
        return np.log(np.expm1(np.clip(y, 1e-8, 20)))
| |
| def _init_params(self, rng=None, edges=None): |
| if rng is None: |
| rng = np.random.RandomState(self.seed) |
| |
| if edges is not None: |
| I = np.array([e[0] for e in edges], dtype=np.int32) |
| J = np.array([e[1] for e in edges], dtype=np.int32) |
| X = np.array([e[2] for e in edges], dtype=np.float64) |
| |
| x_mean = np.abs(X).mean() |
| init_scale = np.sqrt(np.abs(x_mean) / self.K + 0.1) |
| else: |
| init_scale = 0.5 |
| U_init = np.abs(rng.randn(self.N, self.K)) * init_scale + 0.1 |
| V_init = np.abs(rng.randn(self.M, self.K)) * init_scale + 0.1 |
| return { |
| 'alpha': self._inv_softplus(U_init), |
| 'beta': self._inv_softplus(V_init), |
| } |
| |
| def _prepare_edges(self, edges): |
| n_edges = len(edges) |
| I = np.array([e[0] for e in edges], dtype=np.int32) |
| J = np.array([e[1] for e in edges], dtype=np.int32) |
| X = np.array([e[2] for e in edges], dtype=np.float64) |
| return I, J, X, n_edges |
| |
| def compute_objective(self, edges, params): |
| I, J, X, _ = self._prepare_edges(edges) |
| U = self._softplus(params['alpha']) |
| V = self._softplus(params['beta']) |
| |
| pred = np.sum(U[I] * V[J], axis=1) |
| prec_x = 1.0 / (self.sigma_x ** 2) |
| |
| obj = -0.5 * prec_x * np.sum((X - pred) ** 2) |
| obj += np.sum((self.a0 - 1) * np.log(U + 1e-30) - self.b0 * U) |
| obj += np.sum((self.c0 - 1) * np.log(V + 1e-30) - self.d0 * V) |
| return float(obj) |
| |
    def _compute_gradients(self, I, J, X, params):
        """Gradients of the MAP objective w.r.t. alpha and beta, clipped by global L2 norm."""
| U = self._softplus(params['alpha']) |
| V = self._softplus(params['beta']) |
| prec_x = 1.0 / (self.sigma_x ** 2) |
| |
| pred = np.sum(U[I] * V[J], axis=1) |
| residual = X - pred |
| |
| sp_grad_alpha = self._softplus_grad(params['alpha']) |
| sp_grad_beta = self._softplus_grad(params['beta']) |

        # dL/dU_ik = prec_x * sum_j (x_ij - pred_ij) * V_jk + (a0 - 1)/U_ik - b0;
        # the chain rule through softplus gives dL/dalpha_ik = dL/dU_ik * sigmoid(alpha_ik).
| grad_U = np.zeros_like(U) |
| for k in range(self.K): |
| contrib = prec_x * residual * V[J, k] |
| np.add.at(grad_U[:, k], I, contrib) |
| |
| prior_grad_U = (self.a0 - 1) / (U + 1e-6) - self.b0 |
| grad_U += prior_grad_U |
| grad_alpha = grad_U * sp_grad_alpha |
| |
| grad_V = np.zeros_like(V) |
| for k in range(self.K): |
| contrib = prec_x * residual * U[I, k] |
| np.add.at(grad_V[:, k], J, contrib) |
| |
| prior_grad_V = (self.c0 - 1) / (V + 1e-6) - self.d0 |
| grad_V += prior_grad_V |
| grad_beta = grad_V * sp_grad_beta |
| |
| |
| if self.grad_clip > 0: |
| gnorm_a = np.linalg.norm(grad_alpha) |
| if gnorm_a > self.grad_clip: |
| grad_alpha *= self.grad_clip / gnorm_a |
| gnorm_b = np.linalg.norm(grad_beta) |
| if gnorm_b > self.grad_clip: |
| grad_beta *= self.grad_clip / gnorm_b |
| |
| return grad_alpha, grad_beta |
| |
| def _fit_internal(self, edges, params, max_iter=None, |
| update_users=None, update_items=None): |
| """Internal fit with Adam optimizer.""" |
| t0 = time.time() |
| if max_iter is None: |
| max_iter = self.max_iter |
| I, J, X, n_edges = self._prepare_edges(edges) |

        # Adam first/second-moment accumulators.
| m_alpha = np.zeros_like(params['alpha']) |
| v_alpha = np.zeros_like(params['alpha']) |
| m_beta = np.zeros_like(params['beta']) |
| v_beta = np.zeros_like(params['beta']) |
| eps_adam = 1e-8 |
| |
| obj_trace = [] |
| converged = False |
| |
| for it in range(max_iter): |
| old_params = {k: v.copy() for k, v in params.items()} |
| |
            grad_alpha, grad_beta = self._compute_gradients(I, J, X, params)

            # Adam moment updates with bias correction.
| t_adam = it + 1 |
| m_alpha = self.adam_beta1 * m_alpha + (1 - self.adam_beta1) * grad_alpha |
| v_alpha = self.adam_beta2 * v_alpha + (1 - self.adam_beta2) * grad_alpha**2 |
| m_hat_a = m_alpha / (1 - self.adam_beta1**t_adam) |
| v_hat_a = v_alpha / (1 - self.adam_beta2**t_adam) |
| |
| m_beta = self.adam_beta1 * m_beta + (1 - self.adam_beta1) * grad_beta |
| v_beta = self.adam_beta2 * v_beta + (1 - self.adam_beta2) * grad_beta**2 |
| m_hat_b = m_beta / (1 - self.adam_beta1**t_adam) |
| v_hat_b = v_beta / (1 - self.adam_beta2**t_adam) |
| |
| step_alpha = self.lr * m_hat_a / (np.sqrt(v_hat_a) + eps_adam) |
| step_beta = self.lr * m_hat_b / (np.sqrt(v_hat_b) + eps_adam) |

            # Apply the step only to the requested user/item subsets (local unlearning).
| if update_users is not None: |
| ul = list(update_users) |
| params['alpha'][ul] += step_alpha[ul] |
| else: |
| params['alpha'] = params['alpha'] + step_alpha |
| |
| if update_items is not None: |
| il = list(update_items) |
| params['beta'][il] += step_beta[il] |
| else: |
| params['beta'] = params['beta'] + step_beta |
| |
| change = relative_param_change(old_params, params) |
| if it % 50 == 0: |
| obj_trace.append(self.compute_objective(edges, params)) |
| if change < self.tol: |
| converged = True |
| break |
| |
| return params, obj_trace, it + 1, converged, time.time() - t0 |
| |
| def fit_full(self, edges, config=None, init_params=None): |
| if init_params is not None: |
| params = {k: v.copy() for k, v in init_params.items()} |
| else: |
| params = self._init_params(edges=edges) |
| |
| params, obj_trace, n_iter, converged, runtime = self._fit_internal(edges, params) |
| |
| return FitResult( |
| params=params, objective_trace=obj_trace, |
| n_iterations=n_iter, converged=converged, |
| runtime_sec=runtime, |
| model_family='gaussian_gamma_map', inference_type='map', |
| likelihood='gaussian', prior='gamma', |
| ) |
| |
| def fit_without_edge(self, edges, edge_to_remove, config=None, init_params=None): |
| i_del, j_del = int(edge_to_remove[0]), int(edge_to_remove[1]) |
| filtered = [(i, j, x) for i, j, x in edges if not (i == i_del and j == j_del)] |
| return self.fit_full(filtered, config, init_params) |
| |
| def fit_local(self, edges, edge_to_remove, radius, config=None, init_params=None): |
| i_del, j_del = int(edge_to_remove[0]), int(edge_to_remove[1]) |
| filtered = [(i, j, x) for i, j, x in edges if not (i == i_del and j == j_del)] |
| |
| if init_params is None: |
| raise ValueError("Local unlearning requires init_params") |
| params = {k: v.copy() for k, v in init_params.items()} |
| |
| u2i_orig, i2u_orig, _ = build_adjacency(edges, self.N, self.M) |
| distances = get_deletion_neighborhood(edge_to_remove, u2i_orig, i2u_orig, |
| self.N, self.M, radius) |
| users_in_R, items_in_R = get_user_item_sets_in_radius(distances, self.N, radius) |

        # Restrict to edges incident to the neighborhood; parameters outside it stay frozen.
| local_edges = [(i, j, x) for i, j, x in filtered |
| if i in users_in_R or j in items_in_R] |
| |
| params, obj_trace, n_iter, converged, runtime = self._fit_internal( |
| local_edges, params, update_users=users_in_R, update_items=items_in_R) |
| |
| return FitResult( |
| params=params, objective_trace=obj_trace, |
| n_iterations=n_iter, converged=converged, |
| runtime_sec=runtime, |
| model_family='gaussian_gamma_map', inference_type='map', |
| likelihood='gaussian', prior='gamma', |
| diagnostics={'radius': radius, 'n_users_updated': len(users_in_R), |
| 'n_items_updated': len(items_in_R)} |
| ) |
| |
| def fit_warm_start_global(self, edges, edge_to_remove, config=None, init_params=None): |
| i_del, j_del = int(edge_to_remove[0]), int(edge_to_remove[1]) |
| filtered = [(i, j, x) for i, j, x in edges if not (i == i_del and j == j_del)] |
| return self.fit_full(filtered, config, init_params) |


| def get_model(model_family, N, M, K, **kwargs): |
| """Factory function.""" |
| if model_family == 'poisson_gamma': |
| valid = {'a0', 'b0', 'c0', 'd0', 'max_iter', 'tol', 'damping', 'seed'} |
| return PoissonGammaVI(N, M, K, **{k: v for k, v in kwargs.items() if k in valid}) |
| elif model_family == 'gaussian_gaussian': |
| valid = {'sigma_U', 'sigma_V', 'sigma_x', 'max_iter', 'tol', 'damping', 'seed'} |
| return GaussianGaussianVI(N, M, K, **{k: v for k, v in kwargs.items() if k in valid}) |
| elif model_family == 'gaussian_gamma_map': |
        valid = {'a0', 'b0', 'c0', 'd0', 'sigma_x', 'lr', 'max_iter', 'tol', 'seed',
                 'grad_clip', 'adam_beta1', 'adam_beta2'}
| return GaussianGammaMAP(N, M, K, **{k: v for k, v in kwargs.items() if k in valid}) |
| else: |
| raise ValueError(f"Unknown model family: {model_family}") |
|
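

# Minimal smoke-test sketch (hypothetical synthetic data; not part of the library API).
# It exercises the full-fit, exact-refit, and local-unlearning paths of one model family.
if __name__ == "__main__":
    rng = np.random.RandomState(0)
    N, M, K = 20, 15, 3
    # Sparse synthetic edge list of (user, item, count) triples.
    edges = [(i, j, float(rng.poisson(2.0) + 1))
             for i in range(N) for j in range(M) if rng.rand() < 0.2]

    model = get_model('poisson_gamma', N, M, K, max_iter=100, tol=1e-5, seed=0)
    full = model.fit_full(edges)

    # Delete one edge and compare an exact refit with radius-2 local unlearning,
    # both warm-started from the full fit.
    target = edges[0]
    exact = model.fit_without_edge(edges, target, init_params=full.params)
    local = model.fit_local(edges, target, radius=2, init_params=full.params)
    print(full.n_iterations, exact.n_iterations, local.n_iterations)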
|