# niobures's picture
# Pyannote (models, models_onnx)
# 8c838e7 verified
#!/usr/bin/env python
# encoding: utf-8
#
# The MIT License (MIT)
#
# Copyright (c) 2016-2021 CNRS
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# AUTHORS
# Hervé BREDIN - http://herve.niderb.fr
"""
# Signal processing
"""
from functools import singledispatch
from itertools import zip_longest
from typing import Optional, Union
import numpy as np
import scipy.signal
from pyannote_audio_utils.core import Annotation, Segment, SlidingWindowFeature, Timeline
from pyannote_audio_utils.core.utils.generators import pairwise
@singledispatch
def binarize(
    scores,
    onset: float = 0.5,
    offset: Optional[float] = None,
    initial_state: Optional[Union[bool, np.ndarray]] = None,
):
    """(Batch) hysteresis thresholding -- generic entry point

    This is the single-dispatch fallback: it only runs when no implementation
    has been registered for the type of `scores`, and always raises.

    Parameters
    ----------
    scores : numpy.ndarray or SlidingWindowFeature
        Scores to binarize. The accepted layout is documented by the
        implementation registered for each type.
    onset : float, optional
        Onset threshold. Defaults to 0.5.
    offset : float, optional
        Offset threshold. Defaults to `onset`.
    initial_state : np.ndarray or bool, optional
        Initial state.

    Returns
    -------
    binarized : same as scores
        Binarized scores (returned by the registered implementations only).

    Raises
    ------
    NotImplementedError
        Always, since reaching this function means the type of `scores`
        is not supported.

    Reference
    ---------
    https://stackoverflow.com/questions/23289976/how-to-find-zero-crossings-with-hysteresis
    """
    unsupported_type_message = (
        "scores must be of type numpy.ndarray or SlidingWindowFeatures"
    )
    raise NotImplementedError(unsupported_type_message)
@binarize.register
def binarize_ndarray(
    scores: np.ndarray,
    onset: float = 0.5,
    offset: Optional[float] = None,
    initial_state: Optional[Union[bool, np.ndarray]] = None,
):
    """(Batch) hysteresis thresholding of a 2D array

    Each row is thresholded independently along its last axis: a frame turns
    active when its score is strictly above `onset`, inactive when strictly
    below `offset`, and otherwise keeps the state of the most recent frame
    whose state is well-defined.

    Parameters
    ----------
    scores : numpy.ndarray
        (batch_size, num_frames)-shaped scores. Non-finite values are
        sanitized with `numpy.nan_to_num` before thresholding.
    onset : float, optional
        Onset threshold. Defaults to 0.5.
    offset : float, optional
        Offset threshold. Defaults to `onset` when not provided
        (an explicit 0.0 is honored).
    initial_state : np.ndarray or bool, optional
        State assumed before the first well-defined frame. Either one bool
        shared by all rows, or a (batch_size,)-shaped boolean array.
        Defaults to comparing the first frame of each row to the average
        of `onset` and `offset`.

    Returns
    -------
    binarized : numpy.ndarray
        (batch_size, num_frames)-shaped boolean array.
    """

    # `offset or onset` would silently replace an explicit offset of 0.0
    if offset is None:
        offset = onset

    batch_size, num_frames = scores.shape

    scores = np.nan_to_num(scores)

    if initial_state is None:
        # no first frame to infer the initial state from: nothing to binarize
        if num_frames == 0:
            return np.zeros_like(scores, dtype=bool)
        initial_state = scores[:, 0] >= 0.5 * (onset + offset)

    elif isinstance(initial_state, bool):
        initial_state = initial_state * np.ones((batch_size,), dtype=bool)

    elif isinstance(initial_state, np.ndarray):
        assert initial_state.shape == (batch_size,)
        assert initial_state.dtype == bool

    # broadcast the per-row initial state to (batch_size, num_frames)
    initial_state = np.tile(initial_state, (num_frames, 1)).T

    on = scores > onset
    off_or_on = (scores < offset) | on

    # indices of frames for which the on/off state is well-defined
    # (rows with fewer such frames are right-padded with -1)
    well_defined_idx = np.array(
        list(zip_longest(*[np.nonzero(oon)[0] for oon in off_or_on], fillvalue=-1))
    ).T

    # corner case where well_defined_idx is empty
    if not well_defined_idx.size:
        return np.zeros_like(scores, dtype=bool) | initial_state

    # number of well-defined frames seen so far (current frame included):
    # `same_as - 1` points to the previous well-defined frame in
    # `well_defined_idx`, while 0 means "none yet, use the initial state"
    same_as = np.cumsum(off_or_on, axis=1)

    samples = np.tile(np.arange(batch_size), (num_frames, 1)).T

    return np.where(
        same_as, on[samples, well_defined_idx[samples, same_as - 1]], initial_state
    )
@binarize.register
def binarize_swf(
    scores: SlidingWindowFeature,
    onset: float = 0.5,
    offset: Optional[float] = None,
    initial_state: Optional[bool] = None,
):
    """(Batch) hysteresis thresholding of sliding-window scores

    The score array is rearranged so that frames lie on the last axis, handed
    to the `numpy.ndarray` implementation of `binarize`, and the result is
    rearranged back to the layout of the input.

    Parameters
    ----------
    scores : SlidingWindowFeature
        (num_chunks, num_frames, num_classes)- or (num_frames, num_classes)-shaped scores.
    onset : float, optional
        Onset threshold. Defaults to 0.5.
    offset : float, optional
        Offset threshold. Defaults to `onset` when not provided
        (an explicit 0.0 is honored).
    initial_state : np.ndarray or bool, optional
        Initial state, forwarded as is to the `numpy.ndarray` implementation.

    Returns
    -------
    binarized : SlidingWindowFeature
        Binarized scores (as floats, through multiplication by 1.0) with the
        same shape and sliding window as `scores`.

    Raises
    ------
    ValueError
        If `scores.data` is neither 2- nor 3-dimensional.
    """

    # `offset or onset` would silently replace an explicit offset of 0.0
    if offset is None:
        offset = onset

    if scores.data.ndim == 2:
        # (num_frames, num_classes) -> (num_classes, num_frames)
        data = scores.data.transpose()
        binarized = binarize(
            data, onset=onset, offset=offset, initial_state=initial_state
        )
        return SlidingWindowFeature(
            1.0 * binarized.transpose(),
            scores.sliding_window,
        )

    if scores.data.ndim == 3:
        num_chunks, num_frames, num_classes = scores.data.shape

        # Hysteresis runs along the last axis, so frames must come last:
        # (chunk, frame, class) -> (chunk * class, frame).
        # A plain reshape to (-1, num_classes) would leave classes on the
        # last axis and propagate the state across classes instead of time.
        data = scores.data.transpose(0, 2, 1).reshape(
            num_chunks * num_classes, num_frames
        )
        binarized = binarize(
            data, onset=onset, offset=offset, initial_state=initial_state
        )

        # (chunk * class, frame) -> (chunk, frame, class)
        binarized = binarized.reshape(
            num_chunks, num_classes, num_frames
        ).transpose(0, 2, 1)

        return SlidingWindowFeature(
            1.0 * binarized,
            scores.sliding_window,
        )

    raise ValueError(
        "Shape of scores must be (num_chunks, num_frames, num_classes) or (num_frames, num_classes)."
    )
class Binarize:
    """Binarize detection scores using hysteresis thresholding

    Parameters
    ----------
    onset : float, optional
        Onset threshold. Defaults to 0.5.
    offset : float, optional
        Offset threshold. Defaults to `onset` when not provided
        (an explicit 0.0 is honored).
    min_duration_on : float, optional
        Remove active regions shorter than that many seconds. Defaults to 0s.
    min_duration_off : float, optional
        Fill inactive regions shorter than that many seconds. Defaults to 0s.
    pad_onset : float, optional
        Extend active regions by moving their start time by that many seconds.
        Defaults to 0s.
    pad_offset : float, optional
        Extend active regions by moving their end time by that many seconds.
        Defaults to 0s.

    Reference
    ---------
    Gregory Gelly and Jean-Luc Gauvain. "Minimum Word Error Training of
    RNN-based Voice Activity Detection", InterSpeech 2015.
    """

    def __init__(
        self,
        onset: float = 0.5,
        offset: Optional[float] = None,
        min_duration_on: float = 0.0,
        min_duration_off: float = 0.0,
        pad_onset: float = 0.0,
        pad_offset: float = 0.0,
    ):
        super().__init__()

        self.onset = onset
        # `offset or onset` would silently replace an explicit offset of 0.0
        self.offset = onset if offset is None else offset

        self.pad_onset = pad_onset
        self.pad_offset = pad_offset

        self.min_duration_on = min_duration_on
        self.min_duration_off = min_duration_off

    def __call__(self, scores: "SlidingWindowFeature") -> "Annotation":
        """Binarize detection scores

        Parameters
        ----------
        scores : SlidingWindowFeature
            Detection scores; `scores.data` must be two-dimensional
            (it is unpacked as (num_frames, num_classes)).

        Returns
        -------
        active : Annotation
            Binarized scores: one track per active region, keyed by the class
            index and labeled with `scores.labels[k]` when labels are
            available (with the class index otherwise).
        """

        # unpacking also enforces that scores are two-dimensional
        num_frames, _ = scores.data.shape
        frames = scores.sliding_window
        timestamps = [frames[i].middle for i in range(num_frames)]

        # annotation meant to store 'active' regions
        active = Annotation()

        # no frame at all: there is no initial state to read
        if num_frames == 0:
            return active

        for k, k_scores in enumerate(scores.data.T):
            label = k if scores.labels is None else scores.labels[k]

            # initial state
            start = timestamps[0]
            is_active = k_scores[0] > self.onset

            # keep `t` bound when there is a single frame (the loop below
            # does not run), so that the final region can still be built
            t = start

            for t, y in zip(timestamps[1:], k_scores[1:]):
                # currently active
                if is_active:
                    # switching from active to inactive
                    if y < self.offset:
                        region = Segment(start - self.pad_onset, t + self.pad_offset)
                        active[region, k] = label
                        start = t
                        is_active = False

                # currently inactive
                else:
                    # switching from inactive to active
                    if y > self.onset:
                        start = t
                        is_active = True

            # if active at the end, add final region
            if is_active:
                region = Segment(start - self.pad_onset, t + self.pad_offset)
                active[region, k] = label

        # because of padding, some active regions might be overlapping: merge them.
        # also: fill same speaker gaps shorter than min_duration_off
        if self.pad_offset > 0.0 or self.pad_onset > 0.0 or self.min_duration_off > 0.0:
            active = active.support(collar=self.min_duration_off)

        # remove tracks shorter than min_duration_on
        if self.min_duration_on > 0:
            # iterate over a copy since tracks are deleted along the way
            for segment, track in list(active.itertracks()):
                if segment.duration < self.min_duration_on:
                    del active[segment, track]

        return active
class Peak:
    """Peak detection

    Splits the extent of the scores at the local maxima that exceed a
    threshold.

    Parameters
    ----------
    alpha : float, optional
        Peak threshold. Defaults to 0.5
    min_duration : float, optional
        Minimum elapsed time between two consecutive peaks. Defaults to 1 second.
    """

    def __init__(
        self,
        alpha: float = 0.5,
        min_duration: float = 1.0,
    ):
        super().__init__()
        self.alpha, self.min_duration = alpha, min_duration

    def __call__(self, scores: SlidingWindowFeature):
        """Peak detection

        Parameter
        ---------
        scores : SlidingWindowFeature
            Detection scores.

        Returns
        -------
        segmentation : Timeline
            Partition: one segment between each pair of consecutive
            boundaries (start of the first frame, peak times, final end).

        Raises
        ------
        ValueError
            If `scores.dimension` is not 1.
        """

        if scores.dimension != 1:
            raise ValueError("Peak expects one-dimensional scores.")

        window = scores.sliding_window
        total_frames = len(scores)

        # minimum distance between peaks, converted from seconds to a number
        # of frames (at least one)
        neighborhood = max(1, int(np.rint(self.min_duration / window.step)))

        # local maxima, of which only those above the threshold are kept
        candidates = scipy.signal.argrelmax(scores[:], order=neighborhood)[0]
        peak_time = np.array(
            [window[j].middle for j in candidates if scores[j] > self.alpha]
        )

        # NOTE(review): the closing boundary is the end of frame number
        # `total_frames` (one past the last scored frame) -- confirm intent
        opening = [window[0].start]
        closing = [window[total_frames].end]
        boundaries = np.hstack([opening, peak_time, closing])

        segmentation = Timeline()
        for left, right in pairwise(boundaries):
            segmentation.add(Segment(left, right))

        return segmentation