|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
# Signal processing |
|
|
""" |
|
|
|
|
|
from functools import singledispatch |
|
|
from itertools import zip_longest |
|
|
from typing import Optional, Union |
|
|
|
|
|
import numpy as np |
|
|
import scipy.signal |
|
|
from pyannote_audio_utils.core import Annotation, Segment, SlidingWindowFeature, Timeline |
|
|
from pyannote_audio_utils.core.utils.generators import pairwise |
|
|
|
|
|
|
|
|
@singledispatch
def binarize(
    scores,
    onset: float = 0.5,
    offset: Optional[float] = None,
    initial_state: Optional[Union[bool, np.ndarray]] = None,
):
    """(Batch) hysteresis thresholding -- generic entry point

    This is the fallback of a ``functools.singledispatch`` function: it is
    only executed when no implementation has been registered for the type of
    ``scores``, and it always raises.

    Parameters
    ----------
    scores : numpy.ndarray or SlidingWindowFeature
        Scores to binarize. The expected layout is defined by the
        implementation registered for the concrete type.
    onset : float, optional
        Onset threshold. Defaults to 0.5.
    offset : float, optional
        Offset threshold. Defaults to `onset`.
    initial_state : np.ndarray or bool, optional
        Initial state.

    Returns
    -------
    binarized : same as scores
        Binarized scores (produced by the registered implementations only).

    Raises
    ------
    NotImplementedError
        Always, since reaching this body means `scores` has an unsupported
        type.

    Reference
    ---------
    https://stackoverflow.com/questions/23289976/how-to-find-zero-crossings-with-hysteresis
    """
    message = "scores must be of type numpy.ndarray or SlidingWindowFeatures"
    raise NotImplementedError(message)
|
|
|
|
|
|
|
|
@binarize.register
def binarize_ndarray(
    scores: np.ndarray,
    onset: float = 0.5,
    offset: Optional[float] = None,
    initial_state: Optional[Union[bool, np.ndarray]] = None,
):
    """(Batch) hysteresis thresholding of a 2D array

    Each row of `scores` is processed independently along its second axis.
    A frame is "on" when its score is strictly above `onset` and "off" when
    it is strictly below `offset`. Any other frame keeps the state of the
    closest preceding frame whose state is well-defined; frames located
    before the first well-defined frame of a row take `initial_state`.

    Parameters
    ----------
    scores : numpy.ndarray
        (batch_size, num_frames)-shaped scores. NaN values are replaced
        with 0 before thresholding.
    onset : float, optional
        Onset threshold. Defaults to 0.5.
    offset : float, optional
        Offset threshold. Defaults to `onset`. An explicit 0.0 is honoured.
    initial_state : np.ndarray or bool, optional
        Initial state, either one boolean shared by every row or a
        (batch_size,)-shaped boolean array. Defaults to whether the first
        frame of each row is at least halfway between `offset` and `onset`.

    Returns
    -------
    binarized : numpy.ndarray
        (batch_size, num_frames)-shaped boolean array.
    """

    # `offset or onset` would silently discard a legitimate `offset=0.0`
    if offset is None:
        offset = onset

    batch_size, num_frames = scores.shape

    scores = np.nan_to_num(scores)

    # nothing to threshold (and `scores[:, 0]` below would fail)
    if num_frames == 0:
        return np.zeros((batch_size, 0), dtype=bool)

    if initial_state is None:
        initial_state = scores[:, 0] >= 0.5 * (onset + offset)

    elif isinstance(initial_state, (bool, np.bool_)):
        initial_state = np.full((batch_size,), bool(initial_state), dtype=bool)

    elif isinstance(initial_state, np.ndarray):
        assert initial_state.shape == (batch_size,)
        assert initial_state.dtype == bool

    # repeat the per-row initial state for every frame
    initial_state = np.tile(initial_state, (num_frames, 1)).T

    on = scores > onset
    off_or_on = (scores < offset) | on

    # per row, indices of the frames whose on/off state is well-defined;
    # rows are padded with -1 so that they all have the same length
    well_defined_idx = np.array(
        list(zip_longest(*[np.nonzero(oon)[0] for oon in off_or_on], fillvalue=-1))
    ).T

    # corner case: no frame is well-defined, everything keeps the initial state
    if not well_defined_idx.size:
        return np.zeros_like(scores, dtype=bool) | initial_state

    # number of well-defined frames seen so far in the row (0 means "none yet"),
    # i.e. 1 + position of the governing frame within `well_defined_idx`
    same_as = np.cumsum(off_or_on, axis=1)

    # row index of every (row, frame) cell, used for fancy indexing below
    samples = np.tile(np.arange(batch_size), (num_frames, 1)).T

    # where `same_as` is 0 the looked-up value is meaningless and is replaced
    # by the initial state
    return np.where(
        same_as, on[samples, well_defined_idx[samples, same_as - 1]], initial_state
    )
|
|
|
|
|
|
|
|
@binarize.register
def binarize_swf(
    scores: SlidingWindowFeature,
    onset: float = 0.5,
    offset: Optional[float] = None,
    initial_state: Optional[bool] = None,
):
    """(Batch) hysteresis thresholding of a SlidingWindowFeature

    The underlying data is rearranged so that frames lie along the last axis
    and is then handed to `binarize`, so thresholding is applied over time,
    independently for each class (and for each chunk for 3D input).

    Parameters
    ----------
    scores : SlidingWindowFeature
        (num_chunks, num_frames, num_classes)- or (num_frames, num_classes)-shaped scores.
    onset : float, optional
        Onset threshold. Defaults to 0.5.
    offset : float, optional
        Offset threshold. Defaults to `onset`. An explicit 0.0 is honoured.
    initial_state : np.ndarray or bool, optional
        Initial state, forwarded unchanged to `binarize`.

    Returns
    -------
    binarized : SlidingWindowFeature
        Binarized scores with the same shape as `scores`, stored as floats
        (0.0 / 1.0) on the same sliding window.

    Raises
    ------
    ValueError
        If the data of `scores` is neither 2- nor 3-dimensional.
    """

    # `offset or onset` would silently discard a legitimate `offset=0.0`
    if offset is None:
        offset = onset

    data = scores.data

    if data.ndim == 2:
        # (num_frames, num_classes) -> (num_classes, num_frames)
        binarized = binarize(
            data.transpose(), onset=onset, offset=offset, initial_state=initial_state
        )
        # `1.0 *` turns the boolean result into floats
        return SlidingWindowFeature(
            1.0 * binarized.transpose(),
            scores.sliding_window,
        )

    if data.ndim == 3:
        num_chunks, num_frames, num_classes = data.shape

        # one row per (chunk, class) pair with frames along the last axis.
        # A plain `reshape([-1, num_classes])` would instead run the
        # hysteresis along the class axis.
        rows = data.transpose(0, 2, 1).reshape(num_chunks * num_classes, num_frames)
        binarized = binarize(
            rows, onset=onset, offset=offset, initial_state=initial_state
        )

        # back to (num_chunks, num_frames, num_classes)
        binarized = binarized.reshape(num_chunks, num_classes, num_frames).transpose(
            0, 2, 1
        )
        return SlidingWindowFeature(
            1.0 * binarized,
            scores.sliding_window,
        )

    raise ValueError(
        "Shape of scores must be (num_chunks, num_frames, num_classes) or (num_frames, num_classes)."
    )
|
|
|
|
|
|
|
|
class Binarize:
    """Binarize detection scores using hysteresis thresholding

    Parameters
    ----------
    onset : float, optional
        Onset threshold. Defaults to 0.5.
    offset : float, optional
        Offset threshold. Defaults to `onset`. An explicit 0.0 is honoured.
    min_duration_on : float, optional
        Remove active regions shorter than that many seconds. Defaults to 0s.
    min_duration_off : float, optional
        Fill inactive regions shorter than that many seconds. Defaults to 0s.
    pad_onset : float, optional
        Extend active regions by moving their start time by that many seconds.
        Defaults to 0s.
    pad_offset : float, optional
        Extend active regions by moving their end time by that many seconds.
        Defaults to 0s.

    Reference
    ---------
    Gregory Gelly and Jean-Luc Gauvain. "Minimum Word Error Training of
    RNN-based Voice Activity Detection", InterSpeech 2015.
    """

    def __init__(
        self,
        onset: float = 0.5,
        offset: Optional[float] = None,
        min_duration_on: float = 0.0,
        min_duration_off: float = 0.0,
        pad_onset: float = 0.0,
        pad_offset: float = 0.0,
    ):

        super().__init__()

        self.onset = onset
        # explicit None check: `offset or onset` would discard `offset=0.0`
        self.offset = onset if offset is None else offset

        self.pad_onset = pad_onset
        self.pad_offset = pad_offset

        self.min_duration_on = min_duration_on
        self.min_duration_off = min_duration_off

    def __call__(self, scores: SlidingWindowFeature) -> Annotation:
        """Binarize detection scores

        Parameters
        ----------
        scores : SlidingWindowFeature
            Detection scores, with (num_frames, num_classes)-shaped data.

        Returns
        -------
        active : Annotation
            Binarized scores: one track per active region, keyed by the
            class index and labelled with `scores.labels[k]` (or the class
            index itself when `scores.labels` is None).
        """

        num_frames, _ = scores.data.shape
        frames = scores.sliding_window

        # each frame is represented by the middle of its window
        timestamps = [frames[i].middle for i in range(num_frames)]

        # annotation meant to store 'active' regions
        active = Annotation()

        # no frame at all: nothing can be active (and timestamps[0] would fail)
        if num_frames == 0:
            return active

        labels = scores.labels

        for k, k_scores in enumerate(scores.data.T):

            label = k if labels is None else labels[k]

            # initial state
            start = timestamps[0]
            # keep `t` bound for the final check below even when there is a
            # single frame and the loop body never runs
            t = start
            is_active = k_scores[0] > self.onset

            for t, y in zip(timestamps[1:], k_scores[1:]):

                if is_active:
                    # currently active: switch to inactive below `offset`
                    if y < self.offset:
                        region = Segment(start - self.pad_onset, t + self.pad_offset)
                        active[region, k] = label
                        start = t
                        is_active = False

                elif y > self.onset:
                    # currently inactive: switch to active above `onset`
                    start = t
                    is_active = True

            # still active at the end: close the region at the last timestamp
            if is_active:
                region = Segment(start - self.pad_onset, t + self.pad_offset)
                active[region, k] = label

        # padding may make regions overlap, and short gaps must be filled:
        # both are delegated to `support` with `min_duration_off` as collar
        if self.pad_offset > 0.0 or self.pad_onset > 0.0 or self.min_duration_off > 0.0:
            active = active.support(collar=self.min_duration_off)

        # remove tracks shorter than min_duration_on
        # (iterate over a copy since tracks are deleted on the fly)
        if self.min_duration_on > 0:
            for segment, track in list(active.itertracks()):
                if segment.duration < self.min_duration_on:
                    del active[segment, track]

        return active
|
|
|
|
|
|
|
|
class Peak:
    """Peak detection

    Splits the extent of one-dimensional scores at the position of their
    local maxima.

    Parameters
    ----------
    alpha : float, optional
        Peak threshold. Defaults to 0.5
    min_duration : float, optional
        Minimum elapsed time between two consecutive peaks. Defaults to 1 second.
    """

    def __init__(
        self,
        alpha: float = 0.5,
        min_duration: float = 1.0,
    ):
        super().__init__()
        self.min_duration = min_duration
        self.alpha = alpha

    def __call__(self, scores: SlidingWindowFeature):
        """Peak detection

        Parameter
        ---------
        scores : SlidingWindowFeature
            Detection scores.

        Returns
        -------
        segmentation : Timeline
            Partition: consecutive segments delimited by the start of the
            first frame, the middle of every retained peak frame, and a
            final boundary.

        Raises
        ------
        ValueError
            If `scores.dimension` is not 1.
        """

        if scores.dimension != 1:
            raise ValueError("Peak expects one-dimensional scores.")

        window = scores.sliding_window
        frame_count = len(scores)

        # `min_duration` expressed as a number of frame steps (at least one),
        # used as the comparison neighbourhood of the local-maximum search
        neighbourhood = max(1, int(np.rint(self.min_duration / window.step)))
        candidates = scipy.signal.argrelmax(scores[:], order=neighbourhood)[0]

        # only local maxima strictly above `alpha` become boundaries
        kept = []
        for index in candidates:
            if scores[index] > self.alpha:
                kept.append(window[index].middle)

        # NOTE(review): the closing boundary uses index `len(scores)`, which
        # looks like one past the last frame index -- confirm this is intended
        opening = [window[0].start]
        closing = [window[frame_count].end]
        boundaries = np.hstack([opening, np.array(kept), closing])

        segmentation = Timeline()
        for start, end in pairwise(boundaries):
            segmentation.add(Segment(start, end))

        return segmentation
|
|
|