Spaces:
Running
Running
from dataclasses import dataclass

import numpy as np
from sortedcontainers import SortedList
# Name of the DataFrame column that `Timeline.from_df` reads segments from.
PYANNOTE_SEGMENT = 'segment'
class Timeline:
    """Sorted collection of unique, non-empty segments.

    The segments are stored in three containers that every mutating method
    keeps in sync:

    * ``segments_set_``: a ``set`` used for membership tests and
      de-duplication,
    * ``segments_list_``: a ``SortedList`` of the segments,
    * ``segments_boundaries_``: a ``SortedList`` holding every start and end
      value (read by `extent`).

    Segments therefore have to be hashable and mutually orderable. Falsy
    (empty) segments are silently dropped on construction and on `add`.
    """

    @classmethod
    def from_df(cls, df, uri=None):
        """Build a timeline from the ``PYANNOTE_SEGMENT`` column of ``df``."""
        return cls(segments=list(df[PYANNOTE_SEGMENT]), uri=uri)

    def __init__(self, segments=None, uri=None):
        """Create a timeline.

        Parameters
        ----------
        segments : iterable of Segment, optional
            Initial segments; falsy ones are ignored and duplicates collapse.
        uri : optional
            Identifier stored as-is on ``self.uri``.
        """
        if segments is None:
            segments = ()
        segments_set = {segment for segment in segments if segment}
        self.segments_set_ = segments_set
        self.segments_list_ = SortedList(segments_set)
        self.segments_boundaries_ = SortedList(
            boundary for segment in segments_set for boundary in segment
        )
        self.uri = uri

    def __len__(self):
        """Number of segments in the timeline."""
        return len(self.segments_set_)

    def __nonzero__(self):
        # Legacy alias kept for callers that invoke it directly.
        return self.__bool__()

    def __bool__(self):
        """A timeline is truthy when it holds at least one segment."""
        return len(self.segments_set_) > 0

    def __iter__(self):
        """Iterate over the segments in sorted order."""
        return iter(self.segments_list_)

    def __getitem__(self, k):
        """Return the ``k``-th segment (or slice) in sorted order."""
        return self.segments_list_[k]

    def __eq__(self, other):
        """Two timelines are equal when they hold the same set of segments."""
        if not isinstance(other, Timeline):
            return NotImplemented
        return self.segments_set_ == other.segments_set_

    def __ne__(self, other):
        """Negation of `__eq__`."""
        if not isinstance(other, Timeline):
            return NotImplemented
        return self.segments_set_ != other.segments_set_

    def index(self, segment):
        """Position of ``segment`` in sorted order (raises if absent)."""
        return self.segments_list_.index(segment)

    def add(self, segment):
        """Add ``segment`` in place and return ``self``.

        Nothing happens when the segment is falsy or already present.
        """
        segments_set_ = self.segments_set_
        if segment in segments_set_ or not segment:
            return self
        segments_set_.add(segment)
        self.segments_list_.add(segment)
        segments_boundaries_ = self.segments_boundaries_
        segments_boundaries_.add(segment.start)
        segments_boundaries_.add(segment.end)
        return self

    def remove(self, segment):
        """Remove ``segment`` in place and return ``self``.

        Removing a segment that is not in the timeline is a no-op.
        """
        segments_set_ = self.segments_set_
        if segment not in segments_set_:
            return self
        segments_set_.remove(segment)
        self.segments_list_.remove(segment)
        segments_boundaries_ = self.segments_boundaries_
        segments_boundaries_.remove(segment.start)
        segments_boundaries_.remove(segment.end)
        return self

    def discard(self, segment):
        """Alias of `remove`."""
        return self.remove(segment)

    def __ior__(self, timeline):
        """``self |= timeline`` -- in-place union, see `update`."""
        return self.update(timeline)

    def update(self, timeline):
        """Add every segment of ``timeline`` in place and return ``self``."""
        segments_set = self.segments_set_
        segments_set |= timeline.segments_set_
        # Both sorted containers are rebuilt from scratch from the merged set.
        self.segments_list_ = SortedList(segments_set)
        self.segments_boundaries_ = SortedList(
            boundary for segment in segments_set for boundary in segment
        )
        return self

    def __or__(self, timeline):
        """``self | timeline`` -- see `union`."""
        return self.union(timeline)

    def union(self, timeline):
        """Return a new timeline with the segments of both; keeps ``self.uri``."""
        return Timeline(
            segments=self.segments_set_ | timeline.segments_set_, uri=self.uri
        )

    def co_iter(self, other):
        """Yield ``(segment, other_segment)`` pairs that intersect.

        For each segment of ``self``, only segments of ``other`` that sort
        at or before the degenerate segment ``[end, end]`` are tested with
        ``Segment.intersects``.
        """
        for segment in self.segments_list_:
            temp = Segment(start=segment.end, end=segment.end)
            for other_segment in other.segments_list_.irange(maximum=temp):
                if segment.intersects(other_segment):
                    yield segment, other_segment

    def crop_iter(self, support, mode='intersection', returns_mapping=False):
        """Lazily crop the timeline to ``support``.

        Parameters
        ----------
        support : Segment or Timeline
            Region to keep.
        mode : {'intersection', 'loose', 'strict'}
            'loose' yields every segment intersecting the support, 'strict'
            yields only segments fully contained in a support segment, and
            'intersection' yields the intersections themselves.
        returns_mapping : bool
            Only used in 'intersection' mode: yield
            ``(original_segment, cropped_segment)`` pairs instead.

        Raises
        ------
        ValueError
            Unknown ``mode``.
        TypeError
            ``support`` is neither a Segment nor a Timeline.
        """
        if mode not in {'loose', 'strict', 'intersection'}:
            raise ValueError
        if not isinstance(support, (Segment, Timeline)):
            raise TypeError

        if isinstance(support, Segment):
            # Wrap the single segment into a timeline and recurse.
            support = Timeline(
                segments=([support] if support else []), uri=self.uri
            )
            yield from self.crop_iter(
                support, mode=mode, returns_mapping=returns_mapping
            )
            return

        # Merge overlapping support segments before comparing.
        support = support.support()

        if mode == 'loose':
            for segment, _ in self.co_iter(support):
                yield segment
            return

        if mode == 'strict':
            for segment, other_segment in self.co_iter(support):
                if segment in other_segment:
                    yield segment
            return

        for segment, other_segment in self.co_iter(support):
            mapped_to = segment & other_segment
            if not mapped_to:
                continue
            if returns_mapping:
                yield segment, mapped_to
            else:
                yield mapped_to

    def crop(self, support, mode='intersection', returns_mapping=False):
        """Eager version of `crop_iter` that returns a new Timeline.

        With ``mode='intersection'`` and ``returns_mapping=True`` the result
        is ``(timeline, mapping)`` where ``mapping`` maps each cropped
        segment to the list of original segments that produced it.
        """
        if mode == 'intersection' and returns_mapping:
            segments, mapping = [], {}
            cropped = self.crop_iter(
                support, mode='intersection', returns_mapping=True
            )
            for segment, mapped_to in cropped:
                segments.append(mapped_to)
                mapping.setdefault(mapped_to, []).append(segment)
            return Timeline(segments=segments, uri=self.uri), mapping
        return Timeline(
            segments=self.crop_iter(support, mode=mode), uri=self.uri
        )

    def overlapping(self, t):
        """List of segments for which ``segment.overlaps(t)`` is true."""
        return list(self.overlapping_iter(t))

    def overlapping_iter(self, t):
        """Lazy version of `overlapping`."""
        upper = Segment(start=t, end=t)
        for segment in self.segments_list_.irange(maximum=upper):
            if segment.overlaps(t):
                yield segment

    def get_overlap(self):
        """Return the support of all pairwise intersections of distinct segments."""
        overlaps_tl = Timeline(uri=self.uri)
        for s1, s2 in self.co_iter(self):
            if s1 == s2:
                continue
            overlaps_tl.add(s1 & s2)
        return overlaps_tl.support()

    def extrude(self, removed, mode='intersection'):
        """Remove the regions covered by ``removed`` from the timeline.

        Implemented as a crop to the gaps of ``removed`` inside this
        timeline's extent. Because the crop acts on the complement,
        'loose' and 'strict' are swapped before delegating to `crop`.
        """
        if isinstance(removed, Segment):
            removed = Timeline([removed])
        if mode == "loose":
            mode = "strict"
        elif mode == "strict":
            mode = "loose"
        extent_tl = Timeline([self.extent()], uri=self.uri)
        return self.crop(removed.gaps(support=extent_tl), mode=mode)

    def __str__(self):
        """Bracketed listing with one segment per line."""
        return "[" + "\n ".join(str(s) for s in self.segments_list_) + "]"

    def __repr__(self):
        return "<Timeline(uri=%s, segments=%s)>" % (
            self.uri,
            list(self.segments_list_),
        )

    def __contains__(self, included):
        """Membership test for a Segment, or subset test for a Timeline.

        Raises
        ------
        TypeError
            ``included`` is neither a Segment nor a Timeline.
        """
        if isinstance(included, Segment):
            return included in self.segments_set_
        elif isinstance(included, Timeline):
            return self.segments_set_.issuperset(included.segments_set_)
        else:
            raise TypeError

    def empty(self):
        """Return a new, empty timeline with the same ``uri``."""
        return Timeline(uri=self.uri)

    def covers(self, other):
        """True when no gap of ``self`` (within ``other``'s extent) intersects ``other``."""
        gaps = self.gaps(support=other.extent())
        for _ in gaps.co_iter(other):
            return False
        return True

    def copy(self, segment_func=None):
        """Return a copy, optionally mapping each segment through ``segment_func``."""
        if segment_func is None:
            return Timeline(segments=self.segments_list_, uri=self.uri)
        return Timeline(
            segments=[segment_func(s) for s in self.segments_list_],
            uri=self.uri,
        )

    def extent(self):
        """Segment from the smallest to the largest boundary.

        An empty timeline yields ``Segment(start=0.0, end=0.0)``.
        """
        if self.segments_set_:
            segments_boundaries_ = self.segments_boundaries_
            return Segment(
                start=segments_boundaries_[0], end=segments_boundaries_[-1]
            )
        return Segment(start=0.0, end=0.0)

    def support_iter(self, collar=0.0):
        """Yield merged segments, in order.

        Consecutive segments are merged when they touch/overlap (their gap
        ``segment ^ new_segment`` is falsy) or when the gap is shorter than
        ``collar``.
        """
        if not self:
            return
        new_segment = self.segments_list_[0]
        for segment in self:
            possible_gap = segment ^ new_segment
            if not possible_gap or possible_gap.duration < collar:
                new_segment |= segment
            else:
                yield new_segment
                new_segment = segment
        yield new_segment

    def support(self, collar=0.):
        """Timeline built from `support_iter`."""
        return Timeline(segments=self.support_iter(collar), uri=self.uri)

    def duration(self):
        """Total duration of the support (overlaps are counted once)."""
        return sum(s.duration for s in self.support_iter())

    def gaps_iter(self, support=None):
        """Yield the non-empty gaps of the timeline within ``support``.

        Parameters
        ----------
        support : Segment or Timeline, optional
            Defaults to the timeline extent. For a Timeline, gaps are
            computed independently inside each segment of its support.

        Raises
        ------
        TypeError
            ``support`` is neither a Segment nor a Timeline.
        """
        if support is None:
            support = self.extent()
        if not isinstance(support, (Segment, Timeline)):
            raise TypeError

        if isinstance(support, Segment):
            # Walk the merged, cropped segments and emit the holes between
            # them, tracking the end of the last covered region.
            end = support.start
            for segment in self.crop(support, mode='intersection').support():
                gap = Segment(start=end, end=segment.start)
                if gap:
                    yield gap
                end = segment.end
            gap = Segment(start=end, end=support.end)
            if gap:
                yield gap
        elif isinstance(support, Timeline):
            for segment in support.support():
                yield from self.gaps_iter(support=segment)

    def gaps(self, support=None):
        """Timeline built from `gaps_iter`."""
        return Timeline(segments=self.gaps_iter(support=support), uri=self.uri)

    def segmentation(self):
        """Split the support at every segment boundary.

        Returns a timeline of the non-empty pieces between consecutive
        boundaries whose middle point is covered by the support.
        """
        support = self.support()
        timestamps = sorted(
            {boundary for segment in self for boundary in segment}
        )
        if len(timestamps) == 0:
            return Timeline(uri=self.uri)

        segments = []
        start = timestamps[0]
        for end in timestamps[1:]:
            segment = Segment(start=start, end=end)
            if segment and support.overlapping(segment.middle):
                segments.append(segment)
            start = end
        return Timeline(segments=segments, uri=self.uri)

    def _iter_uem(self):
        """Yield one ``"<uri> 1 <start> <end>\\n"`` line per segment."""
        uri = self.uri if self.uri else "<NA>"
        for segment in self:
            yield f"{uri} 1 {segment.start:.3f} {segment.end:.3f}\n"

    def to_uem(self):
        """Return all `_iter_uem` lines joined into one string."""
        return "".join(self._iter_uem())

    def write_uem(self, file):
        """Write every `_iter_uem` line to ``file`` via ``file.write``."""
        for line in self._iter_uem():
            file.write(line)

    def _repr_png_(self):
        # Rich PNG display is not provided; returning None disables it.
        return None
# Module-level precision settings read by `Segment`. `Segment.set_precision`
# rebinds both; these are the values it uses when called with no argument.
AUTO_ROUND_TIME = False
SEGMENT_PRECISION = 1e-6


@dataclass(frozen=True, order=True)
class Segment:
    """Time interval ``[start, end]``.

    Instances are immutable, hashable and ordered by ``(start, end)`` so that
    they can be stored in sets and sorted containers. A segment is falsy
    ("empty") when ``end - start`` does not exceed ``SEGMENT_PRECISION``.

    Parameters
    ----------
    start, end : float
        Interval boundaries. When ``AUTO_ROUND_TIME`` is enabled they are
        rounded to a multiple of ``SEGMENT_PRECISION`` on construction.
    """

    start: float = 0.0
    end: float = 0.0

    @staticmethod
    def set_precision(ndigits=None):
        """Set the global segment precision.

        Parameters
        ----------
        ndigits : int, optional
            Number of decimal digits to round boundaries to. ``None``
            disables rounding and resets the precision to ``1e-6``.
        """
        global AUTO_ROUND_TIME, SEGMENT_PRECISION
        if ndigits is None:
            AUTO_ROUND_TIME = False
            SEGMENT_PRECISION = 1e-6
        else:
            AUTO_ROUND_TIME = True
            SEGMENT_PRECISION = 10 ** (-ndigits)

    def __bool__(self):
        """True when the segment is longer than ``SEGMENT_PRECISION``."""
        return bool((self.end - self.start) > SEGMENT_PRECISION)

    def __post_init__(self):
        # The dataclass is frozen, so rounding has to bypass __setattr__.
        if AUTO_ROUND_TIME:
            object.__setattr__(
                self,
                'start',
                int(self.start / SEGMENT_PRECISION + 0.5) * SEGMENT_PRECISION,
            )
            object.__setattr__(
                self,
                'end',
                int(self.end / SEGMENT_PRECISION + 0.5) * SEGMENT_PRECISION,
            )

    @property
    def duration(self):
        """``end - start``, or ``0.`` for an empty segment."""
        return self.end - self.start if self else 0.

    @property
    def middle(self):
        """Mid-point of the segment."""
        return .5 * (self.start + self.end)

    def __iter__(self):
        """Yield ``start`` then ``end`` (enables ``start, end = segment``)."""
        yield self.start
        yield self.end

    def copy(self):
        """Return a new segment with the same boundaries."""
        return Segment(start=self.start, end=self.end)

    def __contains__(self, other):
        """``other in self``: True when ``other`` lies fully inside ``self``."""
        return (self.start <= other.start) and (self.end >= other.end)

    def __and__(self, other):
        """Intersection; the result is empty (falsy) when there is none."""
        return Segment(
            start=max(self.start, other.start), end=min(self.end, other.end)
        )

    def intersects(self, other):
        """True when the two segments overlap by more than the precision.

        Segments that share the same start are always reported as
        intersecting.
        """
        return (
            (self.start < other.start
             and other.start < self.end - SEGMENT_PRECISION)
            or (self.start > other.start
                and self.start < other.end - SEGMENT_PRECISION)
            or (self.start == other.start)
        )

    def overlaps(self, t):
        """True when time ``t`` lies within ``[start, end]`` (inclusive)."""
        return self.start <= t and self.end >= t

    def __or__(self, other):
        """Smallest segment covering both; an empty operand is ignored."""
        if not self:
            return other
        if not other:
            return self
        return Segment(
            start=min(self.start, other.start), end=max(self.end, other.end)
        )

    def __xor__(self, other):
        """Gap between the two segments (empty when they touch or overlap).

        Raises
        ------
        ValueError
            Either segment is empty.
        """
        if (not self) or (not other):
            raise ValueError
        return Segment(
            start=min(self.end, other.end), end=max(self.start, other.start)
        )

    def _str_helper(self, seconds):
        """Format ``seconds`` as ``[-| ]HH:MM:SS.mmm``."""
        from datetime import timedelta
        negative = seconds < 0
        td = timedelta(seconds=abs(seconds))
        hours, remainder = divmod(td.seconds + 86400 * td.days, 3600)
        minutes, seconds = divmod(remainder, 60)
        return '%s%02d:%02d:%02d.%03d' % (
            '-' if negative else ' ',
            hours,
            minutes,
            seconds,
            td.microseconds // 1000,
        )

    def __str__(self):
        """``[ HH:MM:SS.mmm -->  HH:MM:SS.mmm]``, or ``[]`` when empty."""
        if self:
            return '[%s --> %s]' % (
                self._str_helper(self.start),
                self._str_helper(self.end),
            )
        return '[]'

    def __repr__(self):
        return '<Segment(%g, %g)>' % (self.start, self.end)

    def _repr_png_(self):
        # Rich PNG display is not provided; returning None disables it.
        return None
class SlidingWindow:
    """Sliding window of fixed ``duration`` advanced by a fixed ``step``.

    Frame ``i`` is the segment
    ``[start + i * step, start + i * step + duration]``.

    Parameters
    ----------
    duration : float
        Window length; must be strictly positive.
    step : float
        Hop between consecutive windows; must be strictly positive.
    start : float
        Start time of frame 0.
    end : float, optional
        Frames starting at or after ``end`` do not exist. ``None`` means
        unbounded (stored as ``np.inf``).

    Raises
    ------
    ValueError
        ``duration <= 0``, ``step <= 0`` or ``end <= start``.
    """

    def __init__(self, duration=0.030, step=0.010, start=0.000, end=None):
        if duration <= 0:
            raise ValueError
        self.__duration = duration

        if step <= 0:
            raise ValueError
        self.__step = step

        self.__start = start

        if end is None:
            self.__end = np.inf
        else:
            if end <= start:
                raise ValueError
            self.__end = end

        # Iterator cursor; -1 means "before the first frame".
        self.__i = -1

    @property
    def start(self):
        """Start time of frame 0."""
        return self.__start

    @property
    def end(self):
        """Upper bound on frame start times (``np.inf`` when unbounded)."""
        return self.__end

    @property
    def step(self):
        """Hop between consecutive frames."""
        return self.__step

    @property
    def duration(self):
        """Length of each frame."""
        return self.__duration

    def closest_frame(self, t):
        """Index of the frame whose middle is closest to time ``t``."""
        return int(
            np.rint((t - self.__start - .5 * self.__duration) / self.__step)
        )

    def samples(self, from_duration, mode='strict'):
        """Number of frames for a chunk lasting ``from_duration``.

        ``mode`` is one of 'strict', 'loose' or 'center'; any other value
        returns ``None``.
        """
        if mode == 'strict':
            return int(np.floor((from_duration - self.duration) / self.step)) + 1
        elif mode == 'loose':
            return int(np.floor((from_duration + self.duration) / self.step))
        elif mode == 'center':
            return int(np.rint((from_duration / self.step)))

    def crop(self, focus, mode='loose', fixed=None, return_ranges=False):
        """Indices of the frames selected by ``focus``.

        Parameters
        ----------
        focus : Segment or Timeline
            Region of interest.
        mode : {'loose', 'strict', 'center'}
            'loose' keeps frames that touch the focus, 'strict' keeps frames
            fully inside it, 'center' goes from the frame closest to
            ``focus.start`` to the frame closest to ``focus.end``.
        fixed : float, optional
            When given, the number of frames is derived from this duration
            (via `samples`) instead of from ``focus.end``. Not allowed with
            a Timeline focus.
        return_ranges : bool
            Return a list of ``[first, last_exclusive]`` pairs instead of an
            array of indices.

        Returns
        -------
        numpy.ndarray of int64, or list of [int, int]

        Raises
        ------
        TypeError
            ``focus`` is neither a Segment nor a Timeline.
        ValueError
            Unknown ``mode``, or ``fixed`` combined with a Timeline focus.
        """
        if not isinstance(focus, (Segment, Timeline)):
            raise TypeError

        if isinstance(focus, Timeline):
            if fixed is not None:
                raise ValueError

            if return_ranges:
                ranges = []
                for i, s in enumerate(focus.support()):
                    rng = self.crop(
                        s, mode=mode, fixed=fixed, return_ranges=True
                    )
                    # Start a new range unless it overlaps the previous one,
                    # in which case the previous range is extended.
                    if i == 0 or rng[0][0] > ranges[-1][1]:
                        ranges += rng
                    else:
                        ranges[-1][1] = rng[0][1]
                return ranges

            per_segment = [
                self.crop(s, mode=mode, fixed=fixed, return_ranges=False)
                for s in focus.support()
            ]
            # np.hstack rejects an empty list; an empty focus selects nothing.
            if not per_segment:
                return np.array([], dtype=np.int64)
            return np.unique(np.hstack(per_segment))

        if mode == 'loose':
            i = int(np.ceil(
                (focus.start - self.duration - self.start) / self.step
            ))
            if fixed is None:
                j = int(np.floor((focus.end - self.start) / self.step))
                rng = (i, j + 1)
            else:
                n = self.samples(fixed, mode='loose')
                rng = (i, i + n)
        elif mode == 'strict':
            i = int(np.ceil((focus.start - self.start) / self.step))
            if fixed is None:
                j = int(np.floor(
                    (focus.end - self.duration - self.start) / self.step
                ))
                rng = (i, j + 1)
            else:
                n = self.samples(fixed, mode='strict')
                rng = (i, i + n)
        elif mode == 'center':
            i = self.closest_frame(focus.start)
            if fixed is None:
                j = self.closest_frame(focus.end)
                rng = (i, j + 1)
            else:
                n = self.samples(fixed, mode='center')
                rng = (i, i + n)
        else:
            raise ValueError

        if return_ranges:
            return [list(rng)]
        return np.array(range(*rng), dtype=np.int64)

    def segmentToRange(self, segment):
        """camelCase alias of `segment_to_range`."""
        return self.segment_to_range(segment)

    def segment_to_range(self, segment):
        """Return ``(first_frame_index, number_of_frames)`` for ``segment``."""
        return (
            self.closest_frame(segment.start),
            int(segment.duration / self.step) + 1,
        )

    def rangeToSegment(self, i0, n):
        """camelCase alias of `range_to_segment`."""
        return self.range_to_segment(i0, n)

    def range_to_segment(self, i0, n):
        """Segment covered by ``n`` frames starting at index ``i0``.

        The segment lasts ``n * step``. For ``i0 == 0`` it starts at the
        window start instead of half a step before the middle of frame 0.
        """
        start = self.__start + (i0 - .5) * self.__step + .5 * self.__duration
        if i0 == 0:
            start = self.start
        return Segment(start, start + (n * self.__step))

    def samplesToDuration(self, nSamples):
        """camelCase alias of `samples_to_duration`."""
        return self.samples_to_duration(nSamples)

    def samples_to_duration(self, n_samples):
        """Duration of ``range_to_segment(0, n_samples)``."""
        return self.range_to_segment(0, n_samples).duration

    def durationToSamples(self, duration):
        """camelCase alias of `duration_to_samples`."""
        return self.duration_to_samples(duration)

    def duration_to_samples(self, duration):
        """Frame count of ``segment_to_range(Segment(0, duration))``."""
        return self.segment_to_range(Segment(0, duration))[1]

    def __getitem__(self, i):
        """Frame ``i`` as a Segment, or ``None`` when it starts at/after ``end``."""
        start = self.__start + i * self.__step
        if start >= self.__end:
            return None
        return Segment(start=start, end=start + self.__duration)

    def next(self):
        """Legacy alias of `__next__`."""
        return self.__next__()

    def __next__(self):
        self.__i += 1
        window = self[self.__i]
        if window:
            return window
        else:
            raise StopIteration()

    def __iter__(self):
        """Restart iteration from frame 0; the window is its own iterator."""
        self.__i = -1
        return self

    def __len__(self):
        """Number of frames.

        Raises
        ------
        ValueError
            The window is unbounded (``end`` is infinite).
        """
        if np.isinf(self.__end):
            raise ValueError
        # Start from an estimate and advance until frames stop existing.
        i = self.closest_frame(self.__end)
        while (self[i]):
            i += 1
        length = i
        return length

    def copy(self):
        """Return a new window with the same duration, step, start and end."""
        return self.__class__(
            duration=self.duration,
            step=self.step,
            start=self.start,
            end=self.end,
        )

    def __call__(self, support, align_last=False):
        """Slide the window over ``support`` and yield the frames that fit.

        Parameters
        ----------
        support : Segment or Timeline
            Region(s) to cover; segments shorter than the window are skipped.
        align_last : bool
            When True and the last yielded frame stops short of the segment
            end, also yield one extra frame aligned with the segment end.

        Raises
        ------
        TypeError
            ``support`` is neither a Segment nor a Timeline.
        """
        if isinstance(support, Timeline):
            segments = support
        elif isinstance(support, Segment):
            segments = Timeline(segments=[support])
        else:
            raise TypeError

        for segment in segments:
            if segment.duration < self.duration:
                continue

            # Reset per segment so a frame from a previous segment is never
            # mistaken for this segment's last frame.
            last = None
            window = SlidingWindow(
                duration=self.duration,
                step=self.step,
                start=segment.start,
                end=segment.end,
            )
            for s in window:
                if s in segment:
                    yield s
                    last = s

            if align_last and (last is None or last.end < segment.end):
                yield Segment(
                    start=segment.end - self.duration, end=segment.end
                )