|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
########## |
|
|
Annotation |
|
|
########## |
|
|
|
|
|
.. plot:: pyplots/annotation.py |
|
|
|
|
|
:class:`pyannote.core.Annotation` instances are ordered sets of non-empty |
|
|
tracks: |
|
|
|
|
|
- ordered, because segments are sorted by start time (and end time in case of tie) |
|
|
- set, because one cannot add twice the same track |
|
|
- non-empty, because one cannot add empty track |
|
|
|
|
|
A track is a (support, name) pair where `support` is a Segment instance, |
|
|
and `name` is an additional identifier so that it is possible to add multiple |
|
|
tracks with the same support. |
|
|
|
|
|
To define the annotation depicted above: |
|
|
|
|
|
.. code-block:: ipython |
|
|
|
|
|
In [1]: from pyannote.core import Annotation, Segment |
|
|
|
|
|
In [6]: annotation = Annotation() |
|
|
...: annotation[Segment(1, 5)] = 'Carol' |
|
|
...: annotation[Segment(6, 8)] = 'Bob' |
|
|
...: annotation[Segment(12, 18)] = 'Carol' |
|
|
...: annotation[Segment(7, 20)] = 'Alice' |
|
|
...: |
|
|
|
|
|
which is actually a shortcut for |
|
|
|
|
|
.. code-block:: ipython |
|
|
|
|
|
In [6]: annotation = Annotation() |
|
|
...: annotation[Segment(1, 5), '_'] = 'Carol' |
|
|
...: annotation[Segment(6, 8), '_'] = 'Bob' |
|
|
...: annotation[Segment(12, 18), '_'] = 'Carol' |
|
|
...: annotation[Segment(7, 20), '_'] = 'Alice' |
|
|
...: |
|
|
|
|
|
where all tracks share the same (default) name ``'_'``. |
|
|
|
|
|
In case two tracks share the same support, use a different track name: |
|
|
|
|
|
.. code-block:: ipython |
|
|
|
|
|
In [6]: annotation = Annotation(uri='my_video_file', modality='speaker') |
|
|
...: annotation[Segment(1, 5), 1] = 'Carol' # track name = 1 |
|
|
...: annotation[Segment(1, 5), 2] = 'Bob' # track name = 2 |
|
|
...: annotation[Segment(12, 18)] = 'Carol' |
|
|
...: |
|
|
|
|
|
The track name does not have to be unique over the whole set of tracks. |
|
|
|
|
|
.. note:: |
|
|
|
|
|
The optional *uri* and *modality* keywords argument can be used to remember |
|
|
which document and modality (e.g. speaker or face) it describes. |
|
|
|
|
|
Several convenient methods are available. Here are a few examples: |
|
|
|
|
|
.. code-block:: ipython |
|
|
|
|
|
In [9]: annotation.labels() # sorted list of labels |
|
|
Out[9]: ['Bob', 'Carol'] |
|
|
|
|
|
In [10]: annotation.chart() # label duration chart |
|
|
Out[10]: [('Carol', 10), ('Bob', 4)] |
|
|
|
|
|
In [11]: list(annotation.itertracks()) |
|
|
Out[11]: [(<Segment(1, 5)>, 1), (<Segment(1, 5)>, 2), (<Segment(12, 18)>, u'_')] |
|
|
|
|
|
In [12]: annotation.label_timeline('Carol') |
|
|
Out[12]: <Timeline(uri=my_video_file, segments=[<Segment(1, 5)>, <Segment(12, 18)>])> |
|
|
|
|
|
See :class:`pyannote.core.Annotation` for the complete reference. |
|
|
""" |
|
|
import itertools |
|
|
import warnings |
|
|
from collections import defaultdict |
|
|
from typing import ( |
|
|
Hashable, |
|
|
Optional, |
|
|
Dict, |
|
|
Union, |
|
|
Iterable, |
|
|
List, |
|
|
Set, |
|
|
TextIO, |
|
|
Tuple, |
|
|
Iterator, |
|
|
Text, |
|
|
TYPE_CHECKING, |
|
|
) |
|
|
|
|
|
import numpy as np |
|
|
from sortedcontainers import SortedDict |
|
|
|
|
|
from . import ( |
|
|
PYANNOTE_SEGMENT, |
|
|
PYANNOTE_TRACK, |
|
|
PYANNOTE_LABEL, |
|
|
) |
|
|
from .segment import Segment, SlidingWindow |
|
|
from .timeline import Timeline |
|
|
from .feature import SlidingWindowFeature |
|
|
from .utils.generators import string_generator, int_generator |
|
|
from .utils.types import Label, Key, Support, LabelGenerator, TrackName, CropMode |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
class Annotation: |
|
|
"""Annotation |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
uri : string, optional |
|
|
name of annotated resource (e.g. audio or video file) |
|
|
modality : string, optional |
|
|
name of annotated modality |
|
|
|
|
|
Returns |
|
|
------- |
|
|
annotation : Annotation |
|
|
New annotation |
|
|
|
|
|
""" |
|
|
|
|
|
@classmethod |
|
|
def from_df( |
|
|
cls, |
|
|
df: "pd.DataFrame", |
|
|
uri: Optional[str] = None, |
|
|
modality: Optional[str] = None, |
|
|
) -> "Annotation": |
|
|
|
|
|
df = df[[PYANNOTE_SEGMENT, PYANNOTE_TRACK, PYANNOTE_LABEL]] |
|
|
return Annotation.from_records(df.itertuples(index=False), uri, modality) |
|
|
|
|
|
def __init__(self, uri: Optional[str] = None, modality: Optional[str] = None): |
|
|
|
|
|
self._uri: Optional[str] = uri |
|
|
self.modality: Optional[str] = modality |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self._tracks: Dict[Segment, Dict[TrackName, Label]] = SortedDict() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self._labels: Dict[Label, Timeline] = {} |
|
|
self._labelNeedsUpdate: Dict[Label, bool] = {} |
|
|
|
|
|
|
|
|
self._timeline: Timeline = None |
|
|
self._timelineNeedsUpdate: bool = True |
|
|
|
|
|
@property |
|
|
def uri(self): |
|
|
return self._uri |
|
|
|
|
|
@uri.setter |
|
|
def uri(self, uri: str): |
|
|
|
|
|
for label in self.labels(): |
|
|
timeline = self.label_timeline(label, copy=False) |
|
|
timeline.uri = uri |
|
|
timeline = self.get_timeline(copy=False) |
|
|
timeline.uri = uri |
|
|
self._uri = uri |
|
|
|
|
|
def _updateLabels(self): |
|
|
|
|
|
|
|
|
update = set( |
|
|
label for label, update in self._labelNeedsUpdate.items() if update |
|
|
) |
|
|
|
|
|
|
|
|
_segments = {label: [] for label in update} |
|
|
for segment, track, label in self.itertracks(yield_label=True): |
|
|
if label in update: |
|
|
_segments[label].append(segment) |
|
|
|
|
|
|
|
|
for label in update: |
|
|
if _segments[label]: |
|
|
self._labels[label] = Timeline(segments=_segments[label], uri=self.uri) |
|
|
self._labelNeedsUpdate[label] = False |
|
|
else: |
|
|
self._labels.pop(label, None) |
|
|
self._labelNeedsUpdate.pop(label, None) |
|
|
|
|
|
def __len__(self): |
|
|
"""Number of segments |
|
|
|
|
|
>>> len(annotation) # annotation contains three segments |
|
|
3 |
|
|
""" |
|
|
return len(self._tracks) |
|
|
|
|
|
def __nonzero__(self): |
|
|
return self.__bool__() |
|
|
|
|
|
def __bool__(self): |
|
|
"""Emptiness |
|
|
|
|
|
>>> if annotation: |
|
|
... # annotation is not empty |
|
|
... else: |
|
|
... # annotation is empty |
|
|
""" |
|
|
return len(self._tracks) > 0 |
|
|
|
|
|
def itersegments(self): |
|
|
"""Iterate over segments (in chronological order) |
|
|
|
|
|
>>> for segment in annotation.itersegments(): |
|
|
... # do something with the segment |
|
|
|
|
|
See also |
|
|
-------- |
|
|
:class:`pyannote.core.Segment` describes how segments are sorted. |
|
|
""" |
|
|
return iter(self._tracks) |
|
|
|
|
|
def itertracks( |
|
|
self, yield_label: bool = False |
|
|
) -> Iterator[Union[Tuple[Segment, TrackName], Tuple[Segment, TrackName, Label]]]: |
|
|
"""Iterate over tracks (in chronological order) |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
yield_label : bool, optional |
|
|
When True, yield (segment, track, label) tuples, such that |
|
|
annotation[segment, track] == label. Defaults to yielding |
|
|
(segment, track) tuple. |
|
|
|
|
|
Examples |
|
|
-------- |
|
|
|
|
|
>>> for segment, track in annotation.itertracks(): |
|
|
... # do something with the track |
|
|
|
|
|
>>> for segment, track, label in annotation.itertracks(yield_label=True): |
|
|
... # do something with the track and its label |
|
|
""" |
|
|
|
|
|
for segment, tracks in self._tracks.items(): |
|
|
for track, lbl in sorted( |
|
|
tracks.items(), key=lambda tl: (str(tl[0]), str(tl[1])) |
|
|
): |
|
|
if yield_label: |
|
|
yield segment, track, lbl |
|
|
else: |
|
|
yield segment, track |
|
|
|
|
|
def _updateTimeline(self): |
|
|
self._timeline = Timeline(segments=self._tracks, uri=self.uri) |
|
|
self._timelineNeedsUpdate = False |
|
|
|
|
|
def get_timeline(self, copy: bool = True) -> Timeline: |
|
|
"""Get timeline made of all annotated segments |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
copy : bool, optional |
|
|
Defaults (True) to returning a copy of the internal timeline. |
|
|
Set to False to return the actual internal timeline (faster). |
|
|
|
|
|
Returns |
|
|
------- |
|
|
timeline : Timeline |
|
|
Timeline made of all annotated segments. |
|
|
|
|
|
Note |
|
|
---- |
|
|
In case copy is set to False, be careful **not** to modify the returned |
|
|
timeline, as it may lead to weird subsequent behavior of the annotation |
|
|
instance. |
|
|
|
|
|
""" |
|
|
if self._timelineNeedsUpdate: |
|
|
self._updateTimeline() |
|
|
if copy: |
|
|
return self._timeline.copy() |
|
|
return self._timeline |
|
|
|
|
|
def __eq__(self, other: "Annotation"): |
|
|
"""Equality |
|
|
|
|
|
>>> annotation == other |
|
|
|
|
|
Two annotations are equal if and only if their tracks and associated |
|
|
labels are equal. |
|
|
""" |
|
|
pairOfTracks = itertools.zip_longest( |
|
|
self.itertracks(yield_label=True), other.itertracks(yield_label=True) |
|
|
) |
|
|
return all(t1 == t2 for t1, t2 in pairOfTracks) |
|
|
|
|
|
def __ne__(self, other: "Annotation"): |
|
|
"""Inequality""" |
|
|
pairOfTracks = itertools.zip_longest( |
|
|
self.itertracks(yield_label=True), other.itertracks(yield_label=True) |
|
|
) |
|
|
|
|
|
return any(t1 != t2 for t1, t2 in pairOfTracks) |
|
|
|
|
|
def __contains__(self, included: Union[Segment, Timeline]): |
|
|
"""Inclusion |
|
|
|
|
|
Check whether every segment of `included` does exist in annotation. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
included : Segment or Timeline |
|
|
Segment or timeline being checked for inclusion |
|
|
|
|
|
Returns |
|
|
------- |
|
|
contains : bool |
|
|
True if every segment in `included` exists in timeline, |
|
|
False otherwise |
|
|
|
|
|
""" |
|
|
return included in self.get_timeline(copy=False) |
|
|
|
|
|
def _iter_rttm(self) -> Iterator[Text]: |
|
|
"""Generate lines for an RTTM file for this annotation |
|
|
|
|
|
Returns |
|
|
------- |
|
|
iterator: Iterator[str] |
|
|
An iterator over RTTM text lines |
|
|
""" |
|
|
uri = self.uri if self.uri else "<NA>" |
|
|
if isinstance(uri, Text) and " " in uri: |
|
|
msg = ( |
|
|
f"Space-separated RTTM file format does not allow file URIs " |
|
|
f'containing spaces (got: "{uri}").' |
|
|
) |
|
|
raise ValueError(msg) |
|
|
for segment, _, label in self.itertracks(yield_label=True): |
|
|
if isinstance(label, Text) and " " in label: |
|
|
msg = ( |
|
|
f"Space-separated RTTM file format does not allow labels " |
|
|
f'containing spaces (got: "{label}").' |
|
|
) |
|
|
raise ValueError(msg) |
|
|
yield ( |
|
|
f"SPEAKER {uri} 1 {segment.start:.3f} {segment.duration:.3f} " |
|
|
f"<NA> <NA> {label} <NA> <NA>\n" |
|
|
) |
|
|
|
|
|
def to_rttm(self) -> Text: |
|
|
"""Serialize annotation as a string using RTTM format |
|
|
|
|
|
Returns |
|
|
------- |
|
|
serialized: str |
|
|
RTTM string |
|
|
""" |
|
|
return "".join([line for line in self._iter_rttm()]) |
|
|
|
|
|
def write_rttm(self, file: TextIO): |
|
|
"""Dump annotation to file using RTTM format |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
file : file object |
|
|
|
|
|
Usage |
|
|
----- |
|
|
>>> with open('file.rttm', 'w') as file: |
|
|
... annotation.write_rttm(file) |
|
|
""" |
|
|
for line in self._iter_rttm(): |
|
|
file.write(line) |
|
|
|
|
|
def _iter_lab(self) -> Iterator[Text]: |
|
|
"""Generate lines for a LAB file for this annotation |
|
|
|
|
|
Returns |
|
|
------- |
|
|
iterator: Iterator[str] |
|
|
An iterator over LAB text lines |
|
|
""" |
|
|
for segment, _, label in self.itertracks(yield_label=True): |
|
|
if isinstance(label, Text) and " " in label: |
|
|
msg = ( |
|
|
f"Space-separated LAB file format does not allow labels " |
|
|
f'containing spaces (got: "{label}").' |
|
|
) |
|
|
raise ValueError(msg) |
|
|
yield f"{segment.start:.3f} {segment.start + segment.duration:.3f} {label}\n" |
|
|
|
|
|
def to_lab(self) -> Text: |
|
|
"""Serialize annotation as a string using LAB format |
|
|
|
|
|
Returns |
|
|
------- |
|
|
serialized: str |
|
|
LAB string |
|
|
""" |
|
|
return "".join([line for line in self._iter_lab()]) |
|
|
|
|
|
def write_lab(self, file: TextIO): |
|
|
"""Dump annotation to file using LAB format |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
file : file object |
|
|
|
|
|
Usage |
|
|
----- |
|
|
>>> with open('file.lab', 'w') as file: |
|
|
... annotation.write_lab(file) |
|
|
""" |
|
|
for line in self._iter_lab(): |
|
|
file.write(line) |
|
|
|
|
|
def crop(self, support: Support, mode: CropMode = "intersection") -> "Annotation": |
|
|
"""Crop annotation to new support |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
support : Segment or Timeline |
|
|
If `support` is a `Timeline`, its support is used. |
|
|
mode : {'strict', 'loose', 'intersection'}, optional |
|
|
Controls how segments that are not fully included in `support` are |
|
|
handled. 'strict' mode only keeps fully included segments. 'loose' |
|
|
mode keeps any intersecting segment. 'intersection' mode keeps any |
|
|
intersecting segment but replace them by their actual intersection. |
|
|
|
|
|
Returns |
|
|
------- |
|
|
cropped : Annotation |
|
|
Cropped annotation |
|
|
|
|
|
Note |
|
|
---- |
|
|
In 'intersection' mode, the best is done to keep the track names |
|
|
unchanged. However, in some cases where two original segments are |
|
|
cropped into the same resulting segments, conflicting track names are |
|
|
modified to make sure no track is lost. |
|
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
if isinstance(support, Segment): |
|
|
support = Timeline(segments=[support], uri=self.uri) |
|
|
return self.crop(support, mode=mode) |
|
|
|
|
|
elif isinstance(support, Timeline): |
|
|
|
|
|
|
|
|
support = support.support() |
|
|
cropped = self.__class__(uri=self.uri, modality=self.modality) |
|
|
|
|
|
if mode == "loose": |
|
|
|
|
|
_tracks = {} |
|
|
_labels = set([]) |
|
|
|
|
|
for segment, _ in self.get_timeline(copy=False).co_iter(support): |
|
|
tracks = dict(self._tracks[segment]) |
|
|
_tracks[segment] = tracks |
|
|
_labels.update(tracks.values()) |
|
|
|
|
|
cropped._tracks = SortedDict(_tracks) |
|
|
|
|
|
cropped._labelNeedsUpdate = {label: True for label in _labels} |
|
|
cropped._labels = {label: None for label in _labels} |
|
|
|
|
|
cropped._timelineNeedsUpdate = True |
|
|
cropped._timeline = None |
|
|
|
|
|
return cropped |
|
|
|
|
|
elif mode == "strict": |
|
|
|
|
|
_tracks = {} |
|
|
_labels = set([]) |
|
|
|
|
|
for segment, other_segment in self.get_timeline(copy=False).co_iter( |
|
|
support |
|
|
): |
|
|
|
|
|
if segment not in other_segment: |
|
|
continue |
|
|
|
|
|
tracks = dict(self._tracks[segment]) |
|
|
_tracks[segment] = tracks |
|
|
_labels.update(tracks.values()) |
|
|
|
|
|
cropped._tracks = SortedDict(_tracks) |
|
|
|
|
|
cropped._labelNeedsUpdate = {label: True for label in _labels} |
|
|
cropped._labels = {label: None for label in _labels} |
|
|
|
|
|
cropped._timelineNeedsUpdate = True |
|
|
cropped._timeline = None |
|
|
|
|
|
return cropped |
|
|
|
|
|
elif mode == "intersection": |
|
|
|
|
|
for segment, other_segment in self.get_timeline(copy=False).co_iter( |
|
|
support |
|
|
): |
|
|
|
|
|
intersection = segment & other_segment |
|
|
for track, label in self._tracks[segment].items(): |
|
|
track = cropped.new_track(intersection, candidate=track) |
|
|
cropped[intersection, track] = label |
|
|
|
|
|
return cropped |
|
|
|
|
|
else: |
|
|
raise NotImplementedError("unsupported mode: '%s'" % mode) |
|
|
|
|
|
def extrude( |
|
|
self, removed: Support, mode: CropMode = "intersection" |
|
|
) -> "Annotation": |
|
|
"""Remove segments that overlap `removed` support. |
|
|
|
|
|
A simple illustration: |
|
|
|
|
|
annotation |
|
|
A |------| |------| |
|
|
B |----------| |
|
|
C |--------------| |------| |
|
|
|
|
|
removed `Timeline` |
|
|
|-------| |-----------| |
|
|
|
|
|
extruded Annotation with mode="intersection" |
|
|
B |---| |
|
|
C |--| |------| |
|
|
|
|
|
extruded Annotation with mode="loose" |
|
|
C |------| |
|
|
|
|
|
extruded Annotation with mode="strict" |
|
|
A |------| |
|
|
B |----------| |
|
|
C |--------------| |------| |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
removed : Segment or Timeline |
|
|
If `support` is a `Timeline`, its support is used. |
|
|
mode : {'strict', 'loose', 'intersection'}, optional |
|
|
Controls how segments that are not fully included in `removed` are |
|
|
handled. 'strict' mode only removes fully included segments. 'loose' |
|
|
mode removes any intersecting segment. 'intersection' mode removes |
|
|
the overlapping part of any intersecting segment. |
|
|
|
|
|
Returns |
|
|
------- |
|
|
extruded : Annotation |
|
|
Extruded annotation |
|
|
|
|
|
Note |
|
|
---- |
|
|
In 'intersection' mode, the best is done to keep the track names |
|
|
unchanged. However, in some cases where two original segments are |
|
|
cropped into the same resulting segments, conflicting track names are |
|
|
modified to make sure no track is lost. |
|
|
|
|
|
""" |
|
|
if isinstance(removed, Segment): |
|
|
removed = Timeline([removed]) |
|
|
|
|
|
extent_tl = Timeline([self.get_timeline().extent()], uri=self.uri) |
|
|
truncating_support = removed.gaps(support=extent_tl) |
|
|
|
|
|
if mode == "loose": |
|
|
mode = "strict" |
|
|
elif mode == "strict": |
|
|
mode = "loose" |
|
|
return self.crop(truncating_support, mode=mode) |
|
|
|
|
|
def get_overlap(self, labels: Optional[Iterable[Label]] = None) -> "Timeline": |
|
|
"""Get overlapping parts of the annotation. |
|
|
|
|
|
A simple illustration: |
|
|
|
|
|
annotation |
|
|
A |------| |------| |----| |
|
|
B |--| |-----| |----------| |
|
|
C |--------------| |------| |
|
|
|
|
|
annotation.get_overlap() |
|
|
|------| |-----| |--------| |
|
|
|
|
|
annotation.get_overlap(for_labels=["A", "B"]) |
|
|
|--| |--| |----| |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
labels : optional list of labels |
|
|
Labels for which to consider the overlap |
|
|
|
|
|
Returns |
|
|
------- |
|
|
overlap : `pyannote.core.Timeline` |
|
|
Timeline of the overlaps. |
|
|
""" |
|
|
if labels: |
|
|
annotation = self.subset(labels) |
|
|
else: |
|
|
annotation = self |
|
|
|
|
|
overlaps_tl = Timeline(uri=annotation.uri) |
|
|
for (s1, t1), (s2, t2) in annotation.co_iter(annotation): |
|
|
|
|
|
if self[s1, t1] == self[s2, t2]: |
|
|
continue |
|
|
overlaps_tl.add(s1 & s2) |
|
|
return overlaps_tl.support() |
|
|
|
|
|
def get_tracks(self, segment: Segment) -> Set[TrackName]: |
|
|
"""Query tracks by segment |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
segment : Segment |
|
|
Query |
|
|
|
|
|
Returns |
|
|
------- |
|
|
tracks : set |
|
|
Set of tracks |
|
|
|
|
|
Note |
|
|
---- |
|
|
This will return an empty set if segment does not exist. |
|
|
""" |
|
|
return set(self._tracks.get(segment, {}).keys()) |
|
|
|
|
|
def has_track(self, segment: Segment, track: TrackName) -> bool: |
|
|
"""Check whether a given track exists |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
segment : Segment |
|
|
Query segment |
|
|
track : |
|
|
Query track |
|
|
|
|
|
Returns |
|
|
------- |
|
|
exists : bool |
|
|
True if track exists for segment |
|
|
""" |
|
|
return track in self._tracks.get(segment, {}) |
|
|
|
|
|
def copy(self) -> "Annotation": |
|
|
"""Get a copy of the annotation |
|
|
|
|
|
Returns |
|
|
------- |
|
|
annotation : Annotation |
|
|
Copy of the annotation |
|
|
""" |
|
|
|
|
|
|
|
|
copied = self.__class__(uri=self.uri, modality=self.modality) |
|
|
|
|
|
|
|
|
_tracks, _labels = [], set([]) |
|
|
for key, value in self._tracks.items(): |
|
|
_labels.update(value.values()) |
|
|
_tracks.append((key, dict(value))) |
|
|
|
|
|
copied._tracks = SortedDict(_tracks) |
|
|
|
|
|
copied._labels = {label: None for label in _labels} |
|
|
copied._labelNeedsUpdate = {label: True for label in _labels} |
|
|
|
|
|
copied._timeline = None |
|
|
copied._timelineNeedsUpdate = True |
|
|
|
|
|
return copied |
|
|
|
|
|
def new_track( |
|
|
self, |
|
|
segment: Segment, |
|
|
candidate: Optional[TrackName] = None, |
|
|
prefix: Optional[str] = None, |
|
|
) -> TrackName: |
|
|
"""Generate a new track name for given segment |
|
|
|
|
|
Ensures that the returned track name does not already |
|
|
exist for the given segment. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
segment : Segment |
|
|
Segment for which a new track name is generated. |
|
|
candidate : any valid track name, optional |
|
|
When provided, try this candidate name first. |
|
|
prefix : str, optional |
|
|
Track name prefix. Defaults to the empty string ''. |
|
|
|
|
|
Returns |
|
|
------- |
|
|
name : str |
|
|
New track name |
|
|
""" |
|
|
|
|
|
|
|
|
existing_tracks = set(self._tracks.get(segment, {})) |
|
|
|
|
|
|
|
|
|
|
|
if (candidate is not None) and (candidate not in existing_tracks): |
|
|
return candidate |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if prefix is None: |
|
|
prefix = "" |
|
|
|
|
|
|
|
|
|
|
|
count = 0 |
|
|
while ("%s%d" % (prefix, count)) in existing_tracks: |
|
|
count += 1 |
|
|
|
|
|
|
|
|
return "%s%d" % (prefix, count) |
|
|
|
|
|
def __str__(self): |
|
|
"""Human-friendly representation""" |
|
|
|
|
|
return "\n".join( |
|
|
["%s %s %s" % (s, t, l) for s, t, l in self.itertracks(yield_label=True)] |
|
|
) |
|
|
|
|
|
def __delitem__(self, key: Key): |
|
|
"""Delete one track |
|
|
|
|
|
>>> del annotation[segment, track] |
|
|
|
|
|
Delete all tracks of a segment |
|
|
|
|
|
>>> del annotation[segment] |
|
|
""" |
|
|
|
|
|
|
|
|
if isinstance(key, Segment): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tracks = self._tracks.pop(key) |
|
|
|
|
|
|
|
|
self._timelineNeedsUpdate = True |
|
|
|
|
|
|
|
|
for track, label in tracks.items(): |
|
|
self._labelNeedsUpdate[label] = True |
|
|
|
|
|
|
|
|
elif isinstance(key, tuple) and len(key) == 2: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tracks = self._tracks[key[0]] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
label = tracks.pop(key[1]) |
|
|
|
|
|
|
|
|
self._labelNeedsUpdate[label] = True |
|
|
|
|
|
|
|
|
|
|
|
if not tracks: |
|
|
self._tracks.pop(key[0]) |
|
|
self._timelineNeedsUpdate = True |
|
|
|
|
|
else: |
|
|
raise NotImplementedError( |
|
|
"Deletion only works with Segment or (Segment, track) keys." |
|
|
) |
|
|
|
|
|
|
|
|
def __getitem__(self, key: Key) -> Label: |
|
|
"""Get track label |
|
|
|
|
|
>>> label = annotation[segment, track] |
|
|
|
|
|
Note |
|
|
---- |
|
|
``annotation[segment]`` is equivalent to ``annotation[segment, '_']`` |
|
|
|
|
|
""" |
|
|
|
|
|
if isinstance(key, Segment): |
|
|
key = (key, "_") |
|
|
|
|
|
return self._tracks[key[0]][key[1]] |
|
|
|
|
|
|
|
|
def __setitem__(self, key: Key, label: Label): |
|
|
"""Add new or update existing track |
|
|
|
|
|
>>> annotation[segment, track] = label |
|
|
|
|
|
If (segment, track) does not exist, it is added. |
|
|
If (segment, track) already exists, it is updated. |
|
|
|
|
|
Note |
|
|
---- |
|
|
``annotation[segment] = label`` is equivalent to ``annotation[segment, '_'] = label`` |
|
|
|
|
|
Note |
|
|
---- |
|
|
If `segment` is empty, it does nothing. |
|
|
""" |
|
|
|
|
|
if isinstance(key, Segment): |
|
|
key = (key, "_") |
|
|
|
|
|
segment, track = key |
|
|
|
|
|
|
|
|
if not segment: |
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
if segment not in self._tracks: |
|
|
self._tracks[segment] = {} |
|
|
self._timelineNeedsUpdate = True |
|
|
|
|
|
|
|
|
|
|
|
if track in self._tracks[segment]: |
|
|
old_label = self._tracks[segment][track] |
|
|
self._labelNeedsUpdate[old_label] = True |
|
|
|
|
|
|
|
|
self._tracks[segment][track] = label |
|
|
self._labelNeedsUpdate[label] = True |
|
|
|
|
|
def empty(self) -> "Annotation": |
|
|
"""Return an empty copy |
|
|
|
|
|
Returns |
|
|
------- |
|
|
empty : Annotation |
|
|
Empty annotation using the same 'uri' and 'modality' attributes. |
|
|
|
|
|
""" |
|
|
return self.__class__(uri=self.uri, modality=self.modality) |
|
|
|
|
|
def labels(self) -> List[Label]: |
|
|
"""Get sorted list of labels |
|
|
|
|
|
Returns |
|
|
------- |
|
|
labels : list |
|
|
Sorted list of labels |
|
|
""" |
|
|
if any([lnu for lnu in self._labelNeedsUpdate.values()]): |
|
|
self._updateLabels() |
|
|
return sorted(self._labels, key=str) |
|
|
|
|
|
def get_labels( |
|
|
self, segment: Segment, unique: bool = True |
|
|
) -> Union[Set[Label], List[Label]]: |
|
|
"""Query labels by segment |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
segment : Segment |
|
|
Query |
|
|
unique : bool, optional |
|
|
When False, return the list of (possibly repeated) labels. |
|
|
Defaults to returning the set of labels. |
|
|
|
|
|
Returns |
|
|
------- |
|
|
labels : set or list |
|
|
Set (resp. list) of labels for `segment` if it exists, empty set (resp. list) otherwise |
|
|
if unique (resp. if not unique). |
|
|
|
|
|
Examples |
|
|
-------- |
|
|
>>> annotation = Annotation() |
|
|
>>> segment = Segment(0, 2) |
|
|
>>> annotation[segment, 'speaker1'] = 'Bernard' |
|
|
>>> annotation[segment, 'speaker2'] = 'John' |
|
|
>>> print sorted(annotation.get_labels(segment)) |
|
|
set(['Bernard', 'John']) |
|
|
>>> print annotation.get_labels(Segment(1, 2)) |
|
|
set([]) |
|
|
|
|
|
""" |
|
|
|
|
|
labels = self._tracks.get(segment, {}).values() |
|
|
|
|
|
if unique: |
|
|
return set(labels) |
|
|
|
|
|
return list(labels) |
|
|
|
|
|
def subset(self, labels: Iterable[Label], invert: bool = False) -> "Annotation": |
|
|
"""Filter annotation by labels |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
labels : iterable |
|
|
List of filtered labels |
|
|
invert : bool, optional |
|
|
If invert is True, extract all but requested labels |
|
|
|
|
|
Returns |
|
|
------- |
|
|
filtered : Annotation |
|
|
Filtered annotation |
|
|
""" |
|
|
|
|
|
labels = set(labels) |
|
|
|
|
|
if invert: |
|
|
labels = set(self.labels()) - labels |
|
|
else: |
|
|
labels = labels & set(self.labels()) |
|
|
|
|
|
sub = self.__class__(uri=self.uri, modality=self.modality) |
|
|
|
|
|
_tracks, _labels = {}, set([]) |
|
|
for segment, tracks in self._tracks.items(): |
|
|
sub_tracks = { |
|
|
track: label for track, label in tracks.items() if label in labels |
|
|
} |
|
|
if sub_tracks: |
|
|
_tracks[segment] = sub_tracks |
|
|
_labels.update(sub_tracks.values()) |
|
|
|
|
|
sub._tracks = SortedDict(_tracks) |
|
|
|
|
|
sub._labelNeedsUpdate = {label: True for label in _labels} |
|
|
sub._labels = {label: None for label in _labels} |
|
|
|
|
|
sub._timelineNeedsUpdate = True |
|
|
sub._timeline = None |
|
|
|
|
|
return sub |
|
|
|
|
|
def update(self, annotation: "Annotation", copy: bool = False) -> "Annotation": |
|
|
"""Add every track of an existing annotation (in place) |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
annotation : Annotation |
|
|
Annotation whose tracks are being added |
|
|
copy : bool, optional |
|
|
Return a copy of the annotation. Defaults to updating the |
|
|
annotation in-place. |
|
|
|
|
|
Returns |
|
|
------- |
|
|
self : Annotation |
|
|
Updated annotation |
|
|
|
|
|
Note |
|
|
---- |
|
|
Existing tracks are updated with the new label. |
|
|
""" |
|
|
|
|
|
result = self.copy() if copy else self |
|
|
|
|
|
|
|
|
for segment, track, label in annotation.itertracks(yield_label=True): |
|
|
result[segment, track] = label |
|
|
|
|
|
return result |
|
|
|
|
|
def label_timeline(self, label: Label, copy: bool = True) -> Timeline: |
|
|
"""Query segments by label |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
label : object |
|
|
Query |
|
|
copy : bool, optional |
|
|
Defaults (True) to returning a copy of the internal timeline. |
|
|
Set to False to return the actual internal timeline (faster). |
|
|
|
|
|
Returns |
|
|
------- |
|
|
timeline : Timeline |
|
|
Timeline made of all segments for which at least one track is |
|
|
annotated as label |
|
|
|
|
|
Note |
|
|
---- |
|
|
If label does not exist, this will return an empty timeline. |
|
|
|
|
|
Note |
|
|
---- |
|
|
In case copy is set to False, be careful **not** to modify the returned |
|
|
timeline, as it may lead to weird subsequent behavior of the annotation |
|
|
instance. |
|
|
|
|
|
""" |
|
|
if label not in self.labels(): |
|
|
return Timeline(uri=self.uri) |
|
|
|
|
|
if self._labelNeedsUpdate[label]: |
|
|
self._updateLabels() |
|
|
|
|
|
if copy: |
|
|
return self._labels[label].copy() |
|
|
|
|
|
return self._labels[label] |
|
|
|
|
|
def label_support(self, label: Label) -> Timeline: |
|
|
"""Label support |
|
|
|
|
|
Equivalent to ``Annotation.label_timeline(label).support()`` |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
label : object |
|
|
Query |
|
|
|
|
|
Returns |
|
|
------- |
|
|
support : Timeline |
|
|
Label support |
|
|
|
|
|
See also |
|
|
-------- |
|
|
:func:`~pyannote.core.Annotation.label_timeline` |
|
|
:func:`~pyannote.core.Timeline.support` |
|
|
|
|
|
""" |
|
|
return self.label_timeline(label, copy=False).support() |
|
|
|
|
|
def label_duration(self, label: Label) -> float: |
|
|
"""Label duration |
|
|
|
|
|
Equivalent to ``Annotation.label_timeline(label).duration()`` |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
label : object |
|
|
Query |
|
|
|
|
|
Returns |
|
|
------- |
|
|
duration : float |
|
|
Duration, in seconds. |
|
|
|
|
|
See also |
|
|
-------- |
|
|
:func:`~pyannote.core.Annotation.label_timeline` |
|
|
:func:`~pyannote.core.Timeline.duration` |
|
|
|
|
|
""" |
|
|
|
|
|
return self.label_timeline(label, copy=False).duration() |
|
|
|
|
|
def chart(self, percent: bool = False) -> List[Tuple[Label, float]]: |
|
|
"""Get labels chart (from longest to shortest duration) |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
percent : bool, optional |
|
|
Return list of (label, percentage) tuples. |
|
|
Defaults to returning list of (label, duration) tuples. |
|
|
|
|
|
Returns |
|
|
------- |
|
|
chart : list |
|
|
List of (label, duration), sorted by duration in decreasing order. |
|
|
""" |
|
|
|
|
|
chart = sorted( |
|
|
((L, self.label_duration(L)) for L in self.labels()), |
|
|
key=lambda x: x[1], |
|
|
reverse=True, |
|
|
) |
|
|
|
|
|
if percent: |
|
|
total = np.sum([duration for _, duration in chart]) |
|
|
chart = [(label, duration / total) for (label, duration) in chart] |
|
|
|
|
|
return chart |
|
|
|
|
|
def argmax(self, support: Optional[Support] = None) -> Optional[Label]: |
|
|
"""Get label with longest duration |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
support : Segment or Timeline, optional |
|
|
Find label with longest duration within provided support. |
|
|
Defaults to whole extent. |
|
|
|
|
|
Returns |
|
|
------- |
|
|
label : any existing label or None |
|
|
Label with longest intersection |
|
|
|
|
|
Examples |
|
|
-------- |
|
|
>>> annotation = Annotation(modality='speaker') |
|
|
>>> annotation[Segment(0, 10), 'speaker1'] = 'Alice' |
|
|
>>> annotation[Segment(8, 20), 'speaker1'] = 'Bob' |
|
|
>>> print "%s is such a talker!" % annotation.argmax() |
|
|
Bob is such a talker! |
|
|
>>> segment = Segment(22, 23) |
|
|
>>> if not annotation.argmax(support): |
|
|
... print "No label intersecting %s" % segment |
|
|
No label intersection [22 --> 23] |
|
|
|
|
|
""" |
|
|
|
|
|
cropped = self |
|
|
if support is not None: |
|
|
cropped = cropped.crop(support, mode="intersection") |
|
|
|
|
|
if not cropped: |
|
|
return None |
|
|
|
|
|
return max( |
|
|
((_, cropped.label_duration(_)) for _ in cropped.labels()), |
|
|
key=lambda x: x[1], |
|
|
)[0] |
|
|
|
|
|
def rename_tracks(self, generator: LabelGenerator = "string") -> "Annotation": |
|
|
"""Rename all tracks |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
generator : 'string', 'int', or iterable, optional |
|
|
If 'string' (default) rename tracks to 'A', 'B', 'C', etc. |
|
|
If 'int', rename tracks to 0, 1, 2, etc. |
|
|
If iterable, use it to generate track names. |
|
|
|
|
|
Returns |
|
|
------- |
|
|
renamed : Annotation |
|
|
Copy of the original annotation where tracks are renamed. |
|
|
|
|
|
Example |
|
|
------- |
|
|
>>> annotation = Annotation() |
|
|
>>> annotation[Segment(0, 1), 'a'] = 'a' |
|
|
>>> annotation[Segment(0, 1), 'b'] = 'b' |
|
|
>>> annotation[Segment(1, 2), 'a'] = 'a' |
|
|
>>> annotation[Segment(1, 3), 'c'] = 'c' |
|
|
>>> print(annotation) |
|
|
[ 00:00:00.000 --> 00:00:01.000] a a |
|
|
[ 00:00:00.000 --> 00:00:01.000] b b |
|
|
[ 00:00:01.000 --> 00:00:02.000] a a |
|
|
[ 00:00:01.000 --> 00:00:03.000] c c |
|
|
>>> print(annotation.rename_tracks(generator='int')) |
|
|
[ 00:00:00.000 --> 00:00:01.000] 0 a |
|
|
[ 00:00:00.000 --> 00:00:01.000] 1 b |
|
|
[ 00:00:01.000 --> 00:00:02.000] 2 a |
|
|
[ 00:00:01.000 --> 00:00:03.000] 3 c |
|
|
""" |
|
|
|
|
|
renamed = self.__class__(uri=self.uri, modality=self.modality) |
|
|
|
|
|
if generator == "string": |
|
|
generator = string_generator() |
|
|
elif generator == "int": |
|
|
generator = int_generator() |
|
|
|
|
|
|
|
|
for s, _, label in self.itertracks(yield_label=True): |
|
|
renamed[s, next(generator)] = label |
|
|
return renamed |
|
|
|
|
|
def rename_labels( |
|
|
self, |
|
|
mapping: Optional[Dict] = None, |
|
|
generator: LabelGenerator = "string", |
|
|
copy: bool = True, |
|
|
) -> "Annotation": |
|
|
"""Rename labels |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
mapping : dict, optional |
|
|
{old_name: new_name} mapping dictionary. |
|
|
generator : 'string', 'int' or iterable, optional |
|
|
If 'string' (default) rename label to 'A', 'B', 'C', ... If 'int', |
|
|
rename to 0, 1, 2, etc. If iterable, use it to generate labels. |
|
|
copy : bool, optional |
|
|
Set to True to return a copy of the annotation. Set to False to |
|
|
update the annotation in-place. Defaults to True. |
|
|
|
|
|
Returns |
|
|
------- |
|
|
renamed : Annotation |
|
|
Annotation where labels have been renamed |
|
|
|
|
|
Note |
|
|
---- |
|
|
Unmapped labels are kept unchanged. |
|
|
|
|
|
Note |
|
|
---- |
|
|
Parameter `generator` has no effect when `mapping` is provided. |
|
|
|
|
|
""" |
|
|
|
|
|
if mapping is None: |
|
|
if generator == "string": |
|
|
generator = string_generator() |
|
|
elif generator == "int": |
|
|
generator = int_generator() |
|
|
|
|
|
mapping = {label: next(generator) for label in self.labels()} |
|
|
|
|
|
renamed = self.copy() if copy else self |
|
|
|
|
|
for old_label, new_label in mapping.items(): |
|
|
renamed._labelNeedsUpdate[old_label] = True |
|
|
renamed._labelNeedsUpdate[new_label] = True |
|
|
|
|
|
for segment, tracks in self._tracks.items(): |
|
|
new_tracks = { |
|
|
track: mapping.get(label, label) for track, label in tracks.items() |
|
|
} |
|
|
renamed._tracks[segment] = new_tracks |
|
|
|
|
|
return renamed |
|
|
|
|
|
def relabel_tracks(self, generator: LabelGenerator = "string") -> "Annotation": |
|
|
"""Relabel tracks |
|
|
|
|
|
Create a new annotation where each track has a unique label. |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
generator : 'string', 'int' or iterable, optional |
|
|
If 'string' (default) relabel tracks to 'A', 'B', 'C', ... If 'int' |
|
|
relabel to 0, 1, 2, ... If iterable, use it to generate labels. |
|
|
|
|
|
Returns |
|
|
------- |
|
|
renamed : Annotation |
|
|
New annotation with relabeled tracks. |
|
|
""" |
|
|
|
|
|
if generator == "string": |
|
|
generator = string_generator() |
|
|
elif generator == "int": |
|
|
generator = int_generator() |
|
|
|
|
|
relabeled = self.empty() |
|
|
for s, t, _ in self.itertracks(yield_label=True): |
|
|
relabeled[s, t] = next(generator) |
|
|
|
|
|
return relabeled |
|
|
|
|
|
def support(self, collar: float = 0.0) -> "Annotation": |
|
|
"""Annotation support |
|
|
|
|
|
The support of an annotation is an annotation where contiguous tracks |
|
|
with same label are merged into one unique covering track. |
|
|
|
|
|
A picture is worth a thousand words:: |
|
|
|
|
|
collar |
|
|
|---| |
|
|
|
|
|
annotation |
|
|
|--A--| |--A--| |-B-| |
|
|
|-B-| |--C--| |----B-----| |
|
|
|
|
|
annotation.support(collar) |
|
|
|------A------| |------B------| |
|
|
|-B-| |--C--| |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
collar : float, optional |
|
|
Merge tracks with same label and separated by less than `collar` |
|
|
seconds. This is why 'A' tracks are merged in above figure. |
|
|
Defaults to 0. |
|
|
|
|
|
Returns |
|
|
------- |
|
|
support : Annotation |
|
|
Annotation support |
|
|
|
|
|
Note |
|
|
---- |
|
|
Track names are lost in the process. |
|
|
""" |
|
|
|
|
|
generator = string_generator() |
|
|
|
|
|
|
|
|
|
|
|
support = self.empty() |
|
|
for label in self.labels(): |
|
|
|
|
|
|
|
|
timeline = self.label_timeline(label, copy=True) |
|
|
|
|
|
|
|
|
timeline = timeline.support(collar) |
|
|
|
|
|
|
|
|
for segment in timeline.support(): |
|
|
support[segment, next(generator)] = label |
|
|
|
|
|
return support |
|
|
|
|
|
def co_iter( |
|
|
self, other: "Annotation" |
|
|
) -> Iterator[Tuple[Tuple[Segment, TrackName], Tuple[Segment, TrackName]]]: |
|
|
"""Iterate over pairs of intersecting tracks |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
other : Annotation |
|
|
Second annotation |
|
|
|
|
|
Returns |
|
|
------- |
|
|
iterable : (Segment, object), (Segment, object) iterable |
|
|
Yields pairs of intersecting tracks, in chronological (then |
|
|
alphabetical) order. |
|
|
|
|
|
See also |
|
|
-------- |
|
|
:func:`~pyannote.core.Timeline.co_iter` |
|
|
|
|
|
""" |
|
|
timeline = self.get_timeline(copy=False) |
|
|
other_timeline = other.get_timeline(copy=False) |
|
|
for s, S in timeline.co_iter(other_timeline): |
|
|
tracks = sorted(self.get_tracks(s), key=str) |
|
|
other_tracks = sorted(other.get_tracks(S), key=str) |
|
|
for t, T in itertools.product(tracks, other_tracks): |
|
|
yield (s, t), (S, T) |
|
|
|
|
|
def __mul__(self, other: "Annotation") -> np.ndarray: |
|
|
"""Cooccurrence (or confusion) matrix |
|
|
|
|
|
>>> matrix = annotation * other |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
other : Annotation |
|
|
Second annotation |
|
|
|
|
|
Returns |
|
|
------- |
|
|
cooccurrence : (n_self, n_other) np.ndarray |
|
|
Cooccurrence matrix where `n_self` (resp. `n_other`) is the number |
|
|
of labels in `self` (resp. `other`). |
|
|
""" |
|
|
|
|
|
if not isinstance(other, Annotation): |
|
|
raise TypeError( |
|
|
"computing cooccurrence matrix only works with Annotation " "instances." |
|
|
) |
|
|
|
|
|
i_labels = self.labels() |
|
|
j_labels = other.labels() |
|
|
|
|
|
I = {label: i for i, label in enumerate(i_labels)} |
|
|
J = {label: j for j, label in enumerate(j_labels)} |
|
|
|
|
|
matrix = np.zeros((len(I), len(J))) |
|
|
|
|
|
|
|
|
for (segment, track), (other_segment, other_track) in self.co_iter(other): |
|
|
i = I[self[segment, track]] |
|
|
j = J[other[other_segment, other_track]] |
|
|
duration = (segment & other_segment).duration |
|
|
matrix[i, j] += duration |
|
|
|
|
|
return matrix |
|
|
|
|
|
def discretize( |
|
|
self, |
|
|
support: Optional[Segment] = None, |
|
|
resolution: Union[float, SlidingWindow] = 0.01, |
|
|
labels: Optional[List[Hashable]] = None, |
|
|
duration: Optional[float] = None, |
|
|
): |
|
|
"""Discretize |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
support : Segment, optional |
|
|
Part of annotation to discretize. |
|
|
Defaults to annotation full extent. |
|
|
resolution : float or SlidingWindow, optional |
|
|
Defaults to 10ms frames. |
|
|
labels : list of labels, optional |
|
|
Defaults to self.labels() |
|
|
duration : float, optional |
|
|
Overrides support duration and ensures that the number of |
|
|
returned frames is fixed (which might otherwise not be the case |
|
|
because of rounding errors). |
|
|
|
|
|
Returns |
|
|
------- |
|
|
discretized : SlidingWindowFeature |
|
|
(num_frames, num_labels)-shaped binary features. |
|
|
""" |
|
|
|
|
|
if support is None: |
|
|
support = self.get_timeline().extent() |
|
|
start_time, end_time = support |
|
|
|
|
|
cropped = self.crop(support, mode="intersection") |
|
|
|
|
|
if labels is None: |
|
|
labels = cropped.labels() |
|
|
|
|
|
if isinstance(resolution, SlidingWindow): |
|
|
resolution = SlidingWindow( |
|
|
start=start_time, step=resolution.step, duration=resolution.duration |
|
|
) |
|
|
else: |
|
|
resolution = SlidingWindow( |
|
|
start=start_time, step=resolution, duration=resolution |
|
|
) |
|
|
|
|
|
start_frame = resolution.closest_frame(start_time) |
|
|
if duration is None: |
|
|
end_frame = resolution.closest_frame(end_time) |
|
|
num_frames = end_frame - start_frame |
|
|
else: |
|
|
num_frames = int(round(duration / resolution.step)) |
|
|
|
|
|
data = np.zeros((num_frames, len(labels)), dtype=np.uint8) |
|
|
for k, label in enumerate(labels): |
|
|
segments = cropped.label_timeline(label) |
|
|
for start, stop in resolution.crop( |
|
|
segments, mode="center", return_ranges=True |
|
|
): |
|
|
data[max(0, start) : min(stop, num_frames), k] += 1 |
|
|
data = np.minimum(data, 1, out=data) |
|
|
|
|
|
return SlidingWindowFeature(data, resolution, labels=labels) |
|
|
|
|
|
@classmethod |
|
|
def from_records( |
|
|
cls, |
|
|
records: Iterator[Tuple[Segment, TrackName, Label]], |
|
|
uri: Optional[str] = None, |
|
|
modality: Optional[str] = None, |
|
|
) -> "Annotation": |
|
|
"""Annotation |
|
|
|
|
|
Parameters |
|
|
---------- |
|
|
records : iterator of tuples |
|
|
(segment, track, label) tuples |
|
|
uri : string, optional |
|
|
name of annotated resource (e.g. audio or video file) |
|
|
modality : string, optional |
|
|
name of annotated modality |
|
|
|
|
|
Returns |
|
|
------- |
|
|
annotation : Annotation |
|
|
New annotation |
|
|
|
|
|
""" |
|
|
annotation = cls(uri=uri, modality=modality) |
|
|
tracks = defaultdict(dict) |
|
|
labels = set() |
|
|
for segment, track, label in records: |
|
|
tracks[segment][track] = label |
|
|
labels.add(label) |
|
|
annotation._tracks = SortedDict(tracks) |
|
|
annotation._labels = {label: None for label in labels} |
|
|
annotation._labelNeedsUpdate = {label: True for label in annotation._labels} |
|
|
annotation._timeline = None |
|
|
annotation._timelineNeedsUpdate = True |
|
|
|
|
|
return annotation |
|
|
|
|
|
def _repr_png_(self): |
|
|
"""IPython notebook support |
|
|
|
|
|
See also |
|
|
-------- |
|
|
:mod:`pyannote.core.notebook` |
|
|
""" |
|
|
from .notebook import MATPLOTLIB_IS_AVAILABLE, MATPLOTLIB_WARNING |
|
|
|
|
|
if not MATPLOTLIB_IS_AVAILABLE: |
|
|
warnings.warn(MATPLOTLIB_WARNING.format(klass=self.__class__.__name__)) |
|
|
return None |
|
|
|
|
|
from .notebook import repr_annotation |
|
|
return repr_annotation(self) |
|
|
|