| """Clustering pipelines""" |
|
|
|
|
| import random |
| from enum import Enum |
| from typing import Optional, Tuple |
|
|
| import numpy as np |
| from pyannote_audio_utils.core import SlidingWindow, SlidingWindowFeature |
| from pyannote_audio_utils.pipeline import Pipeline |
| from pyannote_audio_utils.pipeline.parameter import Categorical, Integer, Uniform |
| from scipy.cluster.hierarchy import fcluster, linkage |
| from scipy.optimize import linear_sum_assignment |
| from scipy.spatial.distance import cdist |
|
|
|
|
| class BaseClustering(Pipeline): |
    def __init__(
        self,
        metric: str = "cosine",
        max_num_embeddings: int = 1000,
        constrained_assignment: bool = False,
    ):
        super().__init__()
        self.metric = metric
        self.max_num_embeddings = max_num_embeddings
        self.constrained_assignment = constrained_assignment

    def set_num_clusters(
        self,
        num_embeddings: int,
        num_clusters: Optional[int] = None,
        min_clusters: Optional[int] = None,
        max_clusters: Optional[int] = None,
    ):
        min_clusters = num_clusters or min_clusters or 1
        min_clusters = max(1, min(num_embeddings, min_clusters))
        max_clusters = num_clusters or max_clusters or num_embeddings
        max_clusters = max(1, min(num_embeddings, max_clusters))

        if min_clusters > max_clusters:
            raise ValueError(
                f"min_clusters must be smaller than (or equal to) max_clusters "
                f"(here: min_clusters={min_clusters:g} and max_clusters={max_clusters:g})."
            )

        if min_clusters == max_clusters:
            num_clusters = min_clusters

        return num_clusters, min_clusters, max_clusters

    def filter_embeddings(
        self,
        embeddings: np.ndarray,
        segmentations: Optional[SlidingWindowFeature] = None,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Filter out NaN embeddings and downsample embeddings

        Parameters
        ----------
        embeddings : (num_chunks, num_speakers, dimension) array
            Sequence of embeddings.
        segmentations : (num_chunks, num_frames, num_speakers) array
            Binary segmentations.

        Returns
        -------
        filtered_embeddings : (num_embeddings, dimension) array
        chunk_idx : (num_embeddings, ) array
        speaker_idx : (num_embeddings, ) array
        """

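        # a speaker is considered active in a chunk if it is active in at least one frame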
        active = np.sum(segmentations.data, axis=1) > 0
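        # an embedding is valid if it does not contain any NaN value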
        valid = ~np.any(np.isnan(embeddings), axis=2)

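        # keep (chunk, speaker) pairs that are both active and valid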
        chunk_idx, speaker_idx = np.where(active * valid)

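        # randomly downsample to at most max_num_embeddings embeddings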
        num_embeddings = len(chunk_idx)
        if num_embeddings > self.max_num_embeddings:
            indices = list(range(num_embeddings))
            random.shuffle(indices)
            indices = sorted(indices[: self.max_num_embeddings])
            chunk_idx = chunk_idx[indices]
            speaker_idx = speaker_idx[indices]

        return embeddings[chunk_idx, speaker_idx], chunk_idx, speaker_idx

    def constrained_argmax(self, soft_clusters: np.ndarray) -> np.ndarray:
        soft_clusters = np.nan_to_num(soft_clusters, nan=np.nanmin(soft_clusters))
        num_chunks, num_speakers, num_clusters = soft_clusters.shape

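        # -2 marks speakers left without a cluster by the per-chunk assignment below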
        hard_clusters = -2 * np.ones((num_chunks, num_speakers), dtype=np.int8)

        for c, cost in enumerate(soft_clusters):
            speakers, clusters = linear_sum_assignment(cost, maximize=True)
            for s, k in zip(speakers, clusters):
                hard_clusters[c, s] = k

        return hard_clusters

    def assign_embeddings(
        self,
        embeddings: np.ndarray,
        train_chunk_idx: np.ndarray,
        train_speaker_idx: np.ndarray,
        train_clusters: np.ndarray,
        constrained: bool = False,
    ):
        """Assign embeddings to the closest centroid

        Cluster centroids are computed as the average of the train embeddings
        previously assigned to them.

        Parameters
        ----------
        embeddings : (num_chunks, num_speakers, dimension)-shaped array
            Complete set of embeddings.
        train_chunk_idx : (num_embeddings,)-shaped array
        train_speaker_idx : (num_embeddings,)-shaped array
            Indices of the subset of embeddings used for "training".
        train_clusters : (num_embeddings,)-shaped array
            Clusters of the above subset.
        constrained : bool, optional
            Use constrained_argmax instead of (default) argmax.

        Returns
        -------
        hard_clusters : (num_chunks, num_speakers)-shaped array
        soft_clusters : (num_chunks, num_speakers, num_clusters)-shaped array
        centroids : (num_clusters, dimension)-shaped array
            Cluster centroids.
        """

        num_clusters = np.max(train_clusters) + 1
        num_chunks, num_speakers, dimension = embeddings.shape

        train_embeddings = embeddings[train_chunk_idx, train_speaker_idx]

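        # centroid of a cluster = average of the train embeddings assigned to it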
        centroids = np.vstack(
            [
                np.mean(train_embeddings[train_clusters == k], axis=0)
                for k in range(num_clusters)
            ]
        )

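        # distance between each embedding and each centroid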
        e2k_distance = cdist(
            embeddings.reshape([-1, dimension]),
            centroids,
            metric=self.metric,
        ).reshape([num_chunks, num_speakers, -1])
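        # convert distances into similarity scores (2 - distance, as cosine distance lives in [0, 2])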
        soft_clusters = 2 - e2k_distance

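        # assign each embedding to the cluster with the most similar centroid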
        if constrained:
            hard_clusters = self.constrained_argmax(soft_clusters)
        else:
            hard_clusters = np.argmax(soft_clusters, axis=2)

        return hard_clusters, soft_clusters, centroids

    def __call__(
        self,
        embeddings: np.ndarray,
        segmentations: Optional[SlidingWindowFeature] = None,
        num_clusters: Optional[int] = None,
        min_clusters: Optional[int] = None,
        max_clusters: Optional[int] = None,
        **kwargs,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Apply clustering

        Parameters
        ----------
        embeddings : (num_chunks, num_speakers, dimension) array
            Sequence of embeddings.
        segmentations : (num_chunks, num_frames, num_speakers) array
            Binary segmentations.
        num_clusters : int, optional
            Number of clusters, when known. Default behavior is to rely on
            the internal threshold hyper-parameter to decide on the number
            of clusters.
        min_clusters : int, optional
            Minimum number of clusters. Has no effect when `num_clusters` is provided.
        max_clusters : int, optional
            Maximum number of clusters. Has no effect when `num_clusters` is provided.

        Returns
        -------
        hard_clusters : (num_chunks, num_speakers) array
            Hard cluster assignment (hard_clusters[c, s] = k means that the sth speaker
            of the cth chunk is assigned to the kth cluster).
        soft_clusters : (num_chunks, num_speakers, num_clusters) array
            Soft cluster assignment (the higher soft_clusters[c, s, k], the more likely
            the sth speaker of the cth chunk belongs to the kth cluster).
        centroids : (num_clusters, dimension) array
            Centroid vectors of each cluster.
        """

        train_embeddings, train_chunk_idx, train_speaker_idx = self.filter_embeddings(
            embeddings,
            segmentations=segmentations,
        )

        num_embeddings, _ = train_embeddings.shape

        num_clusters, min_clusters, max_clusters = self.set_num_clusters(
            num_embeddings,
            num_clusters=num_clusters,
            min_clusters=min_clusters,
            max_clusters=max_clusters,
        )

        if max_clusters < 2:
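            # at most one cluster: skip clustering and assign everything to cluster 0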
            num_chunks, num_speakers, _ = embeddings.shape
            hard_clusters = np.zeros((num_chunks, num_speakers), dtype=np.int8)
            soft_clusters = np.ones((num_chunks, num_speakers, 1))
            centroids = np.mean(train_embeddings, axis=0, keepdims=True)
            return hard_clusters, soft_clusters, centroids

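        # cluster the filtered ("train") embeddings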
        train_clusters = self.cluster(
            train_embeddings,
            min_clusters,
            max_clusters,
            num_clusters=num_clusters,
        )

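        # assign every embedding (including the filtered-out ones) to the closest centroid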
        hard_clusters, soft_clusters, centroids = self.assign_embeddings(
            embeddings,
            train_chunk_idx,
            train_speaker_idx,
            train_clusters,
            constrained=self.constrained_assignment,
        )

        return hard_clusters, soft_clusters, centroids


class AgglomerativeClustering(BaseClustering):
    """Agglomerative clustering

    Parameters
    ----------
    metric : {"cosine", "euclidean", ...}, optional
        Distance metric to use. Defaults to "cosine".

    Hyper-parameters
    ----------------
    method : {"average", "centroid", "complete", "median", "single", "ward", "weighted"}
        Linkage method.
    threshold : float in range [0.0, 2.0]
        Clustering threshold.
    min_cluster_size : int in range [1, 20]
        Minimum cluster size.
    """

    def __init__(
        self,
        metric: str = "cosine",
        max_num_embeddings: int = np.inf,
        constrained_assignment: bool = False,
    ):
        super().__init__(
            metric=metric,
            max_num_embeddings=max_num_embeddings,
            constrained_assignment=constrained_assignment,
        )

        self.threshold = Uniform(0.0, 2.0)
        self.method = Categorical(
            ["average", "centroid", "complete", "median", "single", "ward", "weighted"]
        )

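        # clusters smaller than this are reassigned to the closest large cluster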
        self.min_cluster_size = Integer(1, 20)

    def cluster(
        self,
        embeddings: np.ndarray,
        min_clusters: int,
        max_clusters: int,
        num_clusters: Optional[int] = None,
    ):
        """Cluster embeddings

        Parameters
        ----------
        embeddings : (num_embeddings, dimension) array
            Embeddings
        min_clusters : int
            Minimum number of clusters
        max_clusters : int
            Maximum number of clusters
        num_clusters : int, optional
            Actual number of clusters. Default behavior is to estimate it based
            on values provided for `min_clusters`, `max_clusters`, and `threshold`.

        Returns
        -------
        clusters : (num_embeddings, ) array
            0-indexed cluster indices.
        """

        num_embeddings, _ = embeddings.shape

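        # cap min_cluster_size at roughly 10% of the number of embeddings (at least 1)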
        min_cluster_size = min(
            self.min_cluster_size, max(1, round(0.1 * num_embeddings))
        )

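        # linkage complains when there is just one embedding to cluster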
        if num_embeddings == 1:
            return np.zeros((1,), dtype=np.uint8)

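        # "centroid", "median", and "ward" linkage are only defined for the euclidean metric:
        # unit-normalize embeddings so that euclidean distance tracks cosine distance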
| if self.metric == "cosine" and self.method in ["centroid", "median", "ward"]: |
| with np.errstate(divide="ignore", invalid="ignore"): |
| embeddings /= np.linalg.norm(embeddings, axis=-1, keepdims=True) |
| dendrogram: np.ndarray = linkage( |
| embeddings, method=self.method, metric="euclidean" |
| ) |
|
|
| |
| else: |
| dendrogram: np.ndarray = linkage( |
| embeddings, method=self.method, metric=self.metric |
| ) |
|
|
| |
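        # cut the dendrogram at the distance threshold (clusters are 0-indexed)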
        clusters = fcluster(dendrogram, self.threshold, criterion="distance") - 1

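        # split clusters into large and small ones based on their number of embeddings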
        cluster_unique, cluster_counts = np.unique(
            clusters,
            return_counts=True,
        )
        large_clusters = cluster_unique[cluster_counts >= min_cluster_size]
        num_large_clusters = len(large_clusters)

        if num_large_clusters < min_clusters:
            num_clusters = min_clusters

        elif num_large_clusters > max_clusters:
            num_clusters = max_clusters

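        # when a target number of clusters is known but not reached, search the dendrogram
        # for the iteration whose number of large clusters is closest to that target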
        if num_clusters is not None and num_large_clusters != num_clusters:
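            # switch stopping criterion from "inter-cluster distance" to "iteration index"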
            _dendrogram = np.copy(dendrogram)
            _dendrogram[:, 2] = np.arange(num_embeddings - 1)

            best_iteration = num_embeddings - 1
            best_num_large_clusters = 1

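            # traverse the dendrogram, starting from the iteration closest to the threshold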
            for iteration in np.argsort(np.abs(dendrogram[:, 2] - self.threshold)):
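                # only consider iterations that may change the number of large clusters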
                new_cluster_size = _dendrogram[iteration, 3]
                if new_cluster_size < min_cluster_size:
                    continue

                clusters = fcluster(_dendrogram, iteration, criterion="distance") - 1
                cluster_unique, cluster_counts = np.unique(clusters, return_counts=True)
                large_clusters = cluster_unique[cluster_counts >= min_cluster_size]
                num_large_clusters = len(large_clusters)

                if abs(num_large_clusters - num_clusters) < abs(
                    best_num_large_clusters - num_clusters
                ):
                    best_iteration = iteration
                    best_num_large_clusters = num_large_clusters

                if num_large_clusters == num_clusters:
                    break

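            # fall back to the best iteration when the target number of clusters was not reached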
            if best_num_large_clusters != num_clusters:
                clusters = (
                    fcluster(_dendrogram, best_iteration, criterion="distance") - 1
                )
                cluster_unique, cluster_counts = np.unique(clusters, return_counts=True)
                large_clusters = cluster_unique[cluster_counts >= min_cluster_size]
                num_large_clusters = len(large_clusters)
                print(
                    f"Found only {num_large_clusters} clusters. Using a smaller value "
                    f"than {min_cluster_size} for `min_cluster_size` might help."
                )

        if num_large_clusters == 0:
            clusters[:] = 0
            return clusters

        small_clusters = cluster_unique[cluster_counts < min_cluster_size]
        if len(small_clusters) == 0:
            return clusters

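        # reassign each small cluster to the large cluster with the closest centroid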
        large_centroids = np.vstack(
            [
                np.mean(embeddings[clusters == large_k], axis=0)
                for large_k in large_clusters
            ]
        )
        small_centroids = np.vstack(
            [
                np.mean(embeddings[clusters == small_k], axis=0)
                for small_k in small_clusters
            ]
        )
        centroids_cdist = cdist(large_centroids, small_centroids, metric=self.metric)
        for small_k, large_k in enumerate(np.argmin(centroids_cdist, axis=0)):
            clusters[clusters == small_clusters[small_k]] = large_clusters[large_k]

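        # re-number clusters contiguously from 0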
        _, clusters = np.unique(clusters, return_inverse=True)
        return clusters


class Clustering(Enum):
    AgglomerativeClustering = AgglomerativeClustering
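

# Minimal usage sketch (assuming the pyannote_audio_utils Pipeline API mirrors
# pyannote.pipeline, i.e. hyper-parameters are frozen with `instantiate`); the
# parameter values below are purely illustrative:
#
#     clustering = Clustering["AgglomerativeClustering"].value(metric="cosine")
#     clustering.instantiate(
#         {"method": "centroid", "threshold": 0.7, "min_cluster_size": 15}
#     )
#     hard_clusters, soft_clusters, centroids = clustering(
#         embeddings,                   # (num_chunks, num_speakers, dimension) array
#         segmentations=segmentations,  # (num_chunks, num_frames, num_speakers) SlidingWindowFeature
#         min_clusters=2,
#         max_clusters=10,
#     )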