Spaces:
Running
on
Zero
Running
on
Zero
| #!/usr/bin/env python3 | |
| # -*- encoding: utf-8 -*- | |
| # Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights Reserved. | |
| # MIT License (https://opensource.org/licenses/MIT) | |
| # Modified from 3D-Speaker (https://github.com/alibaba-damo-academy/3D-Speaker) | |
| import scipy | |
| import torch | |
| import sklearn | |
| import hdbscan | |
| import numpy as np | |
| from sklearn.cluster._kmeans import k_means | |
class SpectralCluster:
    r"""Spectral clustering using the unnormalized Laplacian of an affinity matrix.

    This implementation is adapted from https://github.com/speechbrain/speechbrain.

    Pipeline (see ``__call__``): cosine affinity -> per-row pruning ->
    symmetrization -> Laplacian ``D - M`` -> eigen-decomposition -> k-means on
    the leading eigenvectors.

    Args:
        min_num_spks: Smallest cluster count the eigen-gap heuristic may return.
        max_num_spks: Largest cluster count the eigen-gap heuristic may return.
        pval: Approximate fraction of entries kept in each affinity row; the
            remaining (smallest) entries are zeroed by ``p_pruning``.
    """

    def __init__(self, min_num_spks=1, max_num_spks=15, pval=0.022):
        self.min_num_spks = min_num_spks
        self.max_num_spks = max_num_spks
        self.pval = pval

    def __call__(self, X, oracle_num=None):
        """Cluster the rows of ``X`` and return one label per row.

        Args:
            X: Embeddings, one row per sample (passed to
                ``sklearn.metrics.pairwise.cosine_similarity``).
            oracle_num: If given, the number of clusters to use instead of
                estimating it from the eigen-gaps.

        Returns:
            The label array produced by k-means.
        """
        # Similarity matrix computation
        sim_mat = self.get_sim_mat(X)
        # Refining similarity matrix with pval
        pruned_sim_mat = self.p_pruning(sim_mat)
        # Symmetrization (pruning is done per row, so the result is not symmetric)
        sym_pruned_sim_mat = 0.5 * (pruned_sim_mat + pruned_sim_mat.T)
        # Laplacian calculation
        laplacian = self.get_laplacian(sym_pruned_sim_mat)
        # Get spectral embeddings
        emb, num_of_spk = self.get_spec_embs(laplacian, oracle_num)
        # Perform clustering
        return self.cluster_embs(emb, num_of_spk)

    def get_sim_mat(self, X):
        """Return the pairwise cosine-similarity matrix of the rows of ``X``."""
        return sklearn.metrics.pairwise.cosine_similarity(X, X)

    def p_pruning(self, A):
        """Zero the smallest entries of every row of ``A`` and return ``A``.

        ``A`` is modified in place. At least about 6 entries per row are kept:
        when ``A.shape[0] * self.pval < 6`` the keep-fraction is raised to
        ``6 / A.shape[0]``.

        For matrices with fewer than 6 rows that fraction exceeds 1; the
        number of pruned entries is then clamped to 0 (nothing is pruned)
        rather than becoming a negative slice bound.
        """
        n_rows = A.shape[0]
        if n_rows == 0:
            # Nothing to prune; also avoids dividing by zero below.
            return A

        if n_rows * self.pval < 6:
            pval = 6.0 / n_rows
        else:
            pval = self.pval

        # Number of entries to zero per row; never negative.
        n_elems = max(int((1 - pval) * n_rows), 0)

        # For each row in the affinity matrix
        for i in range(n_rows):
            low_indexes = np.argsort(A[i, :])[:n_elems]
            # Replace smaller similarity values by 0s
            A[i, low_indexes] = 0
        return A

    def get_laplacian(self, M):
        """Return the unnormalized Laplacian ``D - M``.

        The diagonal of ``M`` is set to zero in place first; ``D`` is the
        diagonal matrix of row sums of ``|M|``.
        """
        M[np.diag_indices(M.shape[0])] = 0
        degree = np.diag(np.sum(np.abs(M), axis=1))
        return degree - M

    def get_spec_embs(self, L, k_oracle=None):
        """Return ``(embeddings, num_of_spk)`` from the Laplacian ``L``.

        The embeddings are the first ``num_of_spk`` eigenvectors returned by
        ``scipy.linalg.eigh``. ``num_of_spk`` is ``k_oracle`` when given;
        otherwise it is chosen at the largest gap between consecutive
        eigenvalues within the ``[min_num_spks, max_num_spks]`` window.
        """
        lambdas, eig_vecs = scipy.linalg.eigh(L)

        if k_oracle is not None:
            num_of_spk = k_oracle
        else:
            lambda_gap_list = self.getEigenGaps(
                lambdas[self.min_num_spks - 1 : self.max_num_spks + 1]
            )
            if lambda_gap_list:
                num_of_spk = int(np.argmax(lambda_gap_list)) + self.min_num_spks
            else:
                # Fewer than two eigenvalues in the window (e.g. a 1x1
                # Laplacian): there is no gap to inspect, so fall back to the
                # minimum count, limited by what is available.
                num_of_spk = min(self.min_num_spks, len(lambdas))

        emb = eig_vecs[:, :num_of_spk]
        return emb, num_of_spk

    def cluster_embs(self, emb, k):
        """Run k-means with ``k`` clusters on ``emb`` and return the labels.

        No ``random_state`` is passed to ``k_means``.
        """
        _, labels, _ = k_means(emb, k)
        return labels

    def getEigenGaps(self, eig_vals):
        """Return the list of differences between consecutive eigenvalues."""
        return [
            float(eig_vals[i + 1]) - float(eig_vals[i])
            for i in range(len(eig_vals) - 1)
        ]
class UmapHdbscan:
    r"""Reduce embeddings with UMAP, then cluster them with HDBSCAN.

    Reference:
        - Siqi Zheng, Hongbin Suo. Reformulating Speaker Diarization as Community Detection With
          Emphasis On Topological Structure. ICASSP2022

    Args:
        n_neighbors: Forwarded to ``umap.UMAP``.
        n_components: Upper bound on the UMAP output dimension; the value
            actually used is ``min(n_components, X.shape[0] - 2)``.
        min_samples: Forwarded to ``hdbscan.HDBSCAN``.
        min_cluster_size: Forwarded to ``hdbscan.HDBSCAN``.
        metric: Distance metric forwarded to ``umap.UMAP``.
    """

    def __init__(
        self,
        n_neighbors=20,
        n_components=60,
        min_samples=10,
        min_cluster_size=10,
        metric="cosine",
    ):
        # UMAP settings
        self.n_neighbors = n_neighbors
        self.n_components = n_components
        self.metric = metric
        # HDBSCAN settings
        self.min_samples = min_samples
        self.min_cluster_size = min_cluster_size

    def __call__(self, X):
        """Return the HDBSCAN labels of the UMAP projection of ``X``."""
        # Imported here rather than at module level, as in the original code.
        import umap.umap_ as umap

        reducer = umap.UMAP(
            n_neighbors=self.n_neighbors,
            min_dist=0.0,
            n_components=min(self.n_components, X.shape[0] - 2),
            metric=self.metric,
        )
        projected = reducer.fit_transform(X)

        clusterer = hdbscan.HDBSCAN(
            min_samples=self.min_samples,
            min_cluster_size=self.min_cluster_size,
            allow_single_cluster=True,
        )
        return clusterer.fit_predict(projected)
class ClusterBackend(torch.nn.Module):
    r"""Perform clustering for input embeddings and output the labels.

    The constructor takes no arguments. The configuration is fixed to
    ``{"merge_thr": 0.78}``, and both a ``SpectralCluster`` and a
    ``UmapHdbscan`` instance are created with their default settings.
    """

    def __init__(self):
        super().__init__()
        self.model_config = {"merge_thr": 0.78}
        self.spectral_cluster = SpectralCluster()
        self.umap_hdbscan_cluster = UmapHdbscan()

    def forward(self, X, **params):
        """Cluster the rows of ``X`` and return one label per row.

        Args:
            X: 2-D embeddings of shape ``[N, C]``.
                NOTE(review): presumably a NumPy array, since the merge step
                uses boolean-mask indexing and ``np.stack`` on it -- confirm
                against the caller.
            **params: Optional ``oracle_num`` giving the number of clusters.

        Returns:
            An array of ``N`` labels. With fewer than 20 rows every row gets
            label 0. Otherwise spectral clustering is used when ``N < 2048``
            or ``oracle_num`` is given, and UMAP + HDBSCAN when it is not.
            Without ``oracle_num``, similar clusters are merged afterwards
            (see ``merge_by_cos``).
        """
        k = params.get("oracle_num")
        assert (
            len(X.shape) == 2
        ), "modelscope error: the shape of input should be [N, C]"

        n_samples = X.shape[0]
        if n_samples < 20:
            # Too few embeddings to cluster: assign everything to one cluster.
            return np.zeros(n_samples, dtype="int")

        if n_samples < 2048 or k is not None:
            labels = self.spectral_cluster(X, k)
        else:
            labels = self.umap_hdbscan_cluster(X)

        # Only merge when the cluster count was estimated, not prescribed.
        if k is None and "merge_thr" in self.model_config:
            labels = self.merge_by_cos(labels, X, self.model_config["merge_thr"])
        return labels

    def merge_by_cos(self, labels, embs, cos_thr):
        """Merge clusters whose centroids are close in cosine similarity.

        Repeatedly takes the pair of cluster centroids with the highest
        cosine similarity and, while that similarity is not below
        ``cos_thr``, folds the higher-numbered cluster into the lower one and
        shifts the labels above it down by one.

        Args:
            labels: Integer NumPy label array; modified in place. Negative
                labels (such as a -1 noise label) are never merged or
                renumbered.
            embs: Embeddings indexed consistently with ``labels``.
            cos_thr: Merge threshold, ``0 < cos_thr <= 1``.

        Returns:
            The same ``labels`` array after merging.
        """
        assert 0 < cos_thr <= 1
        if len(labels) == 0:
            return labels

        while True:
            spk_num = labels.max() + 1
            # Zero clusters (all labels negative) or one cluster: nothing to merge.
            if spk_num <= 1:
                break

            spk_center = np.stack(
                [embs[labels == i].mean(0) for i in range(spk_num)], axis=0
            )
            norm_spk_center = spk_center / np.linalg.norm(
                spk_center, axis=1, keepdims=True
            )
            affinity = np.matmul(norm_spk_center, norm_spk_center.T)
            # Keep the strict upper triangle: each pair once, no self-pairs.
            affinity = np.triu(affinity, 1)
            spks = np.unravel_index(np.argmax(affinity), affinity.shape)
            if affinity[spks] < cos_thr:
                break

            keep, drop = spks  # keep < drop because of the upper triangle
            labels[labels == drop] = keep
            # Close the gap left by the removed label.
            labels[labels > drop] -= 1
        return labels