import numpy as np import scipy import sklearn from sklearn.cluster._kmeans import k_means from sklearn.metrics.pairwise import cosine_similarity import fastcluster from scipy.cluster.hierarchy import fcluster from scipy.spatial.distance import squareform try: import umap, hdbscan except ImportError: raise ImportError( "Package \"umap\" or \"hdbscan\" not found. \ Please install them first by \"pip install umap-learn hdbscan\"." ) class SpectralCluster: """A spectral clustering method using unnormalized Laplacian of affinity matrix. This implementation is adapted from https://github.com/speechbrain/speechbrain. """ def __init__(self, min_num_spks=1, max_num_spks=10, pval=0.02, min_pnum=6, oracle_num=None): self.min_num_spks = min_num_spks self.max_num_spks = max_num_spks self.min_pnum = min_pnum self.pval = pval self.k = oracle_num def __call__(self, X, **kwargs): pval = kwargs.get('pval', None) oracle_num = kwargs.get('speaker_num', None) # Similarity matrix computation sim_mat = self.get_sim_mat(X) # Refining similarity matrix with pval prunned_sim_mat = self.p_pruning(sim_mat, pval) # Symmetrization sym_prund_sim_mat = 0.5 * (prunned_sim_mat + prunned_sim_mat.T) # Laplacian calculation laplacian = self.get_laplacian(sym_prund_sim_mat) # Get Spectral Embeddings emb, num_of_spk = self.get_spec_embs(laplacian, oracle_num) # Perform clustering labels = self.cluster_embs(emb, num_of_spk) return labels def get_sim_mat(self, X): # Cosine similarities M = cosine_similarity(X, X) return M def p_pruning(self, A, pval=None): if pval is None: pval = self.pval n_elems = int((1 - pval) * A.shape[0]) n_elems = min(n_elems, A.shape[0]-self.min_pnum) # For each row in a affinity matrix for i in range(A.shape[0]): low_indexes = np.argsort(A[i, :]) low_indexes = low_indexes[0:n_elems] # Replace smaller similarity values by 0s A[i, low_indexes] = 0 return A def get_laplacian(self, M): M[np.diag_indices(M.shape[0])] = 0 D = np.sum(np.abs(M), axis=1) D = np.diag(D) L = D - M return L def get_spec_embs(self, L, k_oracle=None): if k_oracle is None: k_oracle = self.k lambdas, eig_vecs = scipy.sparse.linalg.eigsh(L, k=min(self.max_num_spks+1, L.shape[0]), which='SM') if k_oracle is not None: num_of_spk = k_oracle else: lambda_gap_list = self.getEigenGaps( lambdas[self.min_num_spks - 1:self.max_num_spks + 1]) num_of_spk = np.argmax(lambda_gap_list) + self.min_num_spks emb = eig_vecs[:, :num_of_spk] return emb, num_of_spk def cluster_embs(self, emb, k): # k-means _, labels, _ = k_means(emb, k) return labels def getEigenGaps(self, eig_vals): eig_vals_gap_list = [] for i in range(len(eig_vals) - 1): gap = float(eig_vals[i + 1]) - float(eig_vals[i]) eig_vals_gap_list.append(gap) return eig_vals_gap_list class UmapHdbscan: """ Reference: - Siqi Zheng, Hongbin Suo. Reformulating Speaker Diarization as Community Detection With Emphasis On Topological Structure. ICASSP2022 """ def __init__(self, n_neighbors=20, n_components=60, min_samples=20, min_cluster_size=10, metric='euclidean'): self.n_neighbors = n_neighbors self.n_components = n_components self.min_samples = min_samples self.min_cluster_size = min_cluster_size self.metric = metric def __call__(self, X, **kwargs): umap_X = umap.UMAP( n_neighbors=self.n_neighbors, min_dist=0.0, n_components=min(self.n_components, X.shape[0]-2), metric=self.metric, ).fit_transform(X) labels = hdbscan.HDBSCAN(min_samples=self.min_samples, min_cluster_size=self.min_cluster_size).fit_predict(umap_X) return labels class AHCluster: """ Agglomerative Hierarchical Clustering, a bottom-up approach which iteratively merges the closest clusters until a termination condition is reached. This implementation is adapted from https://github.com/BUTSpeechFIT/VBx. """ def __init__(self, fix_cos_thr=0.4): self.fix_cos_thr = fix_cos_thr def __call__(self, X, **kwargs): scr_mx = cosine_similarity(X) scr_mx = squareform(-scr_mx, checks=False) lin_mat = fastcluster.linkage(scr_mx, method='average', preserve_input='False') adjust = abs(lin_mat[:, 2].min()) lin_mat[:, 2] += adjust labels = fcluster(lin_mat, -self.fix_cos_thr + adjust, criterion='distance') - 1 return labels class CommonClustering: """Perfom clustering for input embeddings and output the labels. """ def __init__(self, cluster_type, cluster_line=40, mer_cos=None, min_cluster_size=4, **kwargs): self.cluster_type = cluster_type self.cluster_line = cluster_line self.min_cluster_size = min_cluster_size self.mer_cos = mer_cos if self.cluster_type == 'spectral': self.cluster = SpectralCluster(**kwargs) elif self.cluster_type == 'umap_hdbscan': kwargs['min_cluster_size'] = min_cluster_size self.cluster = UmapHdbscan(**kwargs) elif self.cluster_type == 'AHC': self.cluster = AHCluster(**kwargs) else: raise ValueError( '%s is not currently supported.' % self.cluster_type ) if self.cluster_type != 'AHC': self.cluster_for_short = AHCluster() else: self.cluster_for_short = self.cluster def __call__(self, X, **kwargs): # clustering and return the labels assert len(X.shape) == 2, 'Shape of input should be [N, C]' if X.shape[0] <= 1: return np.zeros(X.shape[0], dtype=int) if X.shape[0] < self.cluster_line: labels = self.cluster_for_short(X) else: labels = self.cluster(X, **kwargs) # remove extremely minor cluster labels = self.filter_minor_cluster(labels, X, self.min_cluster_size) # merge similar speaker if self.mer_cos is not None: labels = self.merge_by_cos(labels, X, self.mer_cos) return labels def filter_minor_cluster(self, labels, x, min_cluster_size): cset = np.unique(labels) csize = np.array([(labels == i).sum() for i in cset]) minor_idx = np.where(csize <= self.min_cluster_size)[0] if len(minor_idx) == 0: return labels minor_cset = cset[minor_idx] major_idx = np.where(csize > self.min_cluster_size)[0] if len(major_idx) == 0: return np.zeros_like(labels) major_cset = cset[major_idx] major_center = np.stack([x[labels == i].mean(0) \ for i in major_cset]) for i in range(len(labels)): if labels[i] in minor_cset: cos_sim = cosine_similarity(x[i][np.newaxis], major_center) labels[i] = major_cset[cos_sim.argmax()] return labels def merge_by_cos(self, labels, x, cos_thr): # merge the similar speakers by cosine similarity assert cos_thr > 0 and cos_thr <= 1 while True: cset = np.unique(labels) if len(cset) == 1: break centers = np.stack([x[labels == i].mean(0) \ for i in cset]) affinity = cosine_similarity(centers, centers) affinity = np.triu(affinity, 1) idx = np.unravel_index(np.argmax(affinity), affinity.shape) if affinity[idx] < cos_thr: break c1, c2 = cset[np.array(idx)] labels[labels==c2]=c1 return labels class JointClustering: """Perfom joint clustering for input audio and visual embeddings and output the labels. """ def __init__(self, audio_cluster, vision_cluster): self.audio_cluster = audio_cluster self.vision_cluster = vision_cluster def __call__(self, audioX, visionX, audioT, visionT, conf): # audio-only and video-only clustering alabels = self.audio_cluster(audioX) vlabels = self.vision_cluster(visionX) alabels = self.arrange_labels(alabels) vlist, vspk_embs, vspk_dur = self.get_vlist_embs(audioX, alabels, vlabels, audioT, visionT, conf) # modify alabels according to vlabels aspk_num = alabels.max()+1 for i in range(aspk_num): aspki_index = np.where(alabels==i)[0] aspki_embs = audioX[alabels==i] aspkiT_part = np.array(audioT)[alabels==i] overlap_vspk = self.overlap_spks(self.cast_overlap(aspkiT_part), vlist, vspk_dur) if len(overlap_vspk) > 1: centers = np.stack([vspk_embs[s] for s in overlap_vspk]) distribute_labels = self.distribute_embs(aspki_embs, centers) for j in range(distribute_labels.max()+1): for loc in aspki_index[distribute_labels==j]: alabels[loc] = overlap_vspk[j] elif len(overlap_vspk) == 1: for loc in aspki_index: alabels[loc] = overlap_vspk[0] alabels = self.arrange_labels(alabels) return alabels def overlap_spks(self, times, vlist, vspk_dur=None): # get the vspk that overlaps with times. overlap_dur = {} for [a_st, a_ed] in times: for [v_st, v_ed, v_id] in vlist: if a_ed > v_st and v_ed > a_st: if v_id not in overlap_dur: overlap_dur[v_id]=0 overlap_dur[v_id] += min(a_ed, v_ed) - max(a_st, v_st) vspk_list = [] for v_id, dur in overlap_dur.items(): # set the criteria for confirming overlap. if (vspk_dur is None and dur > 0.5) or (vspk_dur is not None and dur > min(vspk_dur[v_id]*0.5, 0.5)): vspk_list.append(v_id) return vspk_list def distribute_embs(self, embs, centers): # embs: [n, D]. centers: [k, D] norm_centers = centers / np.linalg.norm(centers, axis=1, keepdims=True) norm_embs = embs / np.linalg.norm(embs, axis=1, keepdims=True) similarity = np.matmul(norm_embs, norm_centers.T) # [n, k] argsort = np.argsort(similarity, axis=-1) return argsort[:, -1] def get_vlist_embs(self, audioX, alabels, vlabels, audioT, visionT, conf): assert len(vlabels) == len(visionT) vlist = [] for i, ti in enumerate(visionT): if len(vlist)==0 or vlabels[i] != vlist[-1][2] or ti - visionT[i-1] > conf.face_det_stride*0.04 + 1e-4: if len(vlist) > 0 and vlist[-1][1] - vlist[-1][0] < 1e-4: # remove too short intervals. vlist.pop() vlist.append([ti, ti, vlabels[i]]) else: vlist[-1][1] = ti # adjust vision labels vlabels_arrange = self.arrange_labels([i[2] for i in vlist], a_st=alabels.max()+1) vlist = [[i[0], i[1], j] for i, j in zip(vlist, vlabels_arrange)] # get audio spk embs aligning with 'vlist' vspk_embs = {} for [v_st, v_ed, v_id] in vlist: for i, [a_st, a_ed] in enumerate(audioT): if a_ed >= v_st and v_ed >= a_st: if min(a_ed, v_ed) - max(a_st, v_st) > 1: if v_id not in vspk_embs: vspk_embs[v_id] = [] vspk_embs[v_id].append(audioX[i]) for k in vspk_embs: vspk_embs[k] = np.stack(vspk_embs[k]).mean(0) vlist_new = [] for i in vlist: if i[2] in vspk_embs: vlist_new.append(i) # get duration of v_spk vspk_dur = {} for i in vlist_new: if i[2] not in vspk_dur: vspk_dur[i[2]]=0 vspk_dur[i[2]] += i[1]-i[0] return vlist_new, vspk_embs, vspk_dur def cast_overlap(self, input_time): if len(input_time)==0: return input_time output_time = [] for i in range(0, len(input_time)-1): if i == 0 or output_time[-1][1] < input_time[i][0]: output_time.append(input_time[i]) else: output_time[-1][1] = input_time[i][1] return output_time def arrange_labels(self, labels, a_st=0): # arrange labels in order from 0. new_labels = [] labels_dict = {} idx = a_st for i in labels: if i not in labels_dict: labels_dict[i] = idx idx += 1 new_labels.append(labels_dict[i]) return np.array(new_labels)