"""
Utilities for operating on encoded MIDI sequences.

A token sequence is a flat sequence whose length is a multiple of three; each
consecutive (time, duration, note) triple encodes one event, one control, a
REST, or a SEPARATOR. Triples with note < CONTROL_OFFSET are events (offsets
TIME_OFFSET / DUR_OFFSET / NOTE_OFFSET); other non-special triples are
controls (offsets ATIME_OFFSET / ADUR_OFFSET / ANOTE_OFFSET).
"""

from collections import defaultdict

from anticipation.config import *
from anticipation.vocab import *


def _triples(tokens):
    """ iterate over a flat token sequence as (time, duration, note) triples """
    return zip(tokens[0::3], tokens[1::3], tokens[2::3])


def print_tokens(tokens):
    """
    Print a human-readable dump of a token sequence, one line per triple.

    Events are printed as `index time dur instrument pitch`; controls are
    printed the same way with a trailing '(A)'. SEPARATOR and REST triples
    are printed by name. The instrument is note//128 and the pitch is the
    remainder, after removing the note offset.
    """
    print('---------------------')
    for j, (tm, dur, note) in enumerate(_triples(tokens)):
        if note == SEPARATOR:
            # a separator occupies all three slots of its triple
            assert tm == SEPARATOR and dur == SEPARATOR
            print(j, 'SEPARATOR')
            continue

        if note == REST:
            assert tm < CONTROL_OFFSET
            assert dur == DUR_OFFSET+0
            print(j, tm, 'REST')
            continue

        if note < CONTROL_OFFSET:
            tm = tm - TIME_OFFSET
            dur = dur - DUR_OFFSET
            note = note - NOTE_OFFSET
            instr = note//2**7
            pitch = note - (2**7)*instr
            print(j, tm, dur, instr, pitch)
        else:
            tm = tm - ATIME_OFFSET
            dur = dur - ADUR_OFFSET
            note = note - ANOTE_OFFSET
            instr = note//2**7
            pitch = note - (2**7)*instr
            print(j, tm, dur, instr, pitch, '(A)')


def clip(tokens, start, end, clip_duration=True, seconds=True):
    """
    Keep only the triples whose onset time lies in [start, end] (inclusive).

    Inputs:
      tokens        : a flat token sequence (events and/or controls)
      start, end    : the window; in seconds if `seconds` is True (converted
                      with TIME_RESOLUTION), otherwise already in ticks
      clip_duration : if True, shorten any kept triple that extends past
                      `end` so that it stops exactly at `end`

    Returns:
      a new list of tokens; times are NOT shifted to start at zero
    """
    if seconds:
        start = int(TIME_RESOLUTION*start)
        end = int(TIME_RESOLUTION*end)

    new_tokens = []
    for (time, dur, note) in _triples(tokens):
        if note < CONTROL_OFFSET:
            this_time = time - TIME_OFFSET
            this_dur = dur - DUR_OFFSET
        else:
            this_time = time - ATIME_OFFSET
            this_dur = dur - ADUR_OFFSET

        if this_time < start or end < this_time:
            continue

        # truncate extended notes
        if clip_duration and end < this_time + this_dur:
            dur -= this_time + this_dur - end

        new_tokens.extend([time, dur, note])

    return new_tokens


def mask(tokens, start, end):
    """
    Remove the triples whose onset time (in seconds) lies strictly between
    `start` and `end`; triples exactly at either boundary are kept.
    """
    new_tokens = []
    for (time, dur, note) in _triples(tokens):
        if note < CONTROL_OFFSET:
            this_time = (time - TIME_OFFSET)/float(TIME_RESOLUTION)
        else:
            this_time = (time - ATIME_OFFSET)/float(TIME_RESOLUTION)

        if start < this_time < end:
            continue

        new_tokens.extend([time, dur, note])

    return new_tokens


def delete(tokens, criterion):
    """
    Remove every triple for which `criterion` returns a truthy value.

    `criterion` is called with one (time, duration, note) tuple of raw
    (still offset) token values.
    """
    new_tokens = []
    for token in _triples(tokens):
        if criterion(token):
            continue

        new_tokens.extend(token)

    return new_tokens


def sort(tokens):
    """ sort sequence of events or controls (but not both) """
    # order triples by their raw time token; sorted() is stable, so triples
    # with equal times keep their original relative order
    times = tokens[0::3]
    indices = sorted(range(len(times)), key=times.__getitem__)

    sorted_tokens = []
    for idx in indices:
        sorted_tokens.extend(tokens[3*idx:3*(idx+1)])

    return sorted_tokens


def split(tokens):
    """ split a sequence into events and controls """
    events = []
    controls = []
    for (time, dur, note) in _triples(tokens):
        if note < CONTROL_OFFSET:
            events.extend([time, dur, note])
        else:
            controls.extend([time, dur, note])

    return events, controls


def pad(tokens, end_time=None, density=TIME_RESOLUTION):
    """
    Pads tokens up to end_time (if given) with REST (a special token value
    defined in vocab.py) up to a desired density.
    see Definition 3.5 and Example 3.6 in the paper.

    Inputs:
      tokens   : a sequence of events only (asserted: no controls)
      end_time : time in ticks up to which to pad; if None (or 0), the
                 maximum time found in `tokens` is used
      density  : the largest allowed gap, in ticks, between consecutive tokens

    Returns:
      a new list in which a REST triple is inserted every `density` ticks
      wherever the gap to the next token (or to end_time) exceeds `density`
    """
    end_time = TIME_OFFSET+(end_time if end_time else max_time(tokens, seconds=False))
    new_tokens = []
    previous_time = TIME_OFFSET+0
    for (time, dur, note) in _triples(tokens):
        # must pad before separation, anticipation
        assert note < CONTROL_OFFSET

        # insert pad tokens to ensure the desired density
        while time > previous_time + density:
            new_tokens.extend([previous_time+density, DUR_OFFSET+0, REST])
            previous_time += density

        new_tokens.extend([time, dur, note])
        previous_time = time

    # keep padding after the last token until end_time is within reach
    while end_time > previous_time + density:
        new_tokens.extend([previous_time+density, DUR_OFFSET+0, REST])
        previous_time += density

    return new_tokens


def unpad(tokens):
    """ remove every REST triple from a token sequence """
    new_tokens = []
    for (time, dur, note) in _triples(tokens):
        if note == REST:
            continue

        new_tokens.extend([time, dur, note])

    return new_tokens


def anticipate(events, controls, delta=DELTA*TIME_RESOLUTION):
    """
    Interleave a sequence of events with anticipated controls.

    Inputs:
      events   : a sequence of events
      controls : a sequence of time-localized controls
      delta    : the anticipation interval

    Returns:
      tokens   : interleaved events and anticipated controls
      controls : unconsumed controls (control time > max_time(events) + delta)
    """
    if len(controls) == 0:
        return events, controls

    tokens = []
    event_time = 0
    # index of the first unconsumed control token; advancing a cursor avoids
    # re-copying the remaining controls every time one is consumed
    consumed = 0
    control_time = controls[0] - ATIME_OFFSET
    for time, dur, note in _triples(events):
        # event_time is still the time of the PREVIOUS event here: a control
        # is emitted once an already-emitted event is within delta of it
        while event_time >= control_time - delta:
            tokens.extend(controls[consumed:consumed+3])
            consumed += 3  # consume this control
            if consumed < len(controls):
                control_time = controls[consumed] - ATIME_OFFSET
            else:
                control_time = float('inf')

        assert note < CONTROL_OFFSET
        event_time = time - TIME_OFFSET
        tokens.extend([time, dur, note])

    return tokens, controls[consumed:]


def anticipate2(events, controls, map, delta=DELTA*TIME_RESOLUTION):
    """
    Interleave a sequence of events with anticipated controls, where controls
    represent the performance of a piece and events represent the score, and
    map is a mapping from score beats and downbeats to performance beats and
    downbeats.

    Note that ATIME_OFFSET and TIME_OFFSET offset the arrival times of
    controls and events to differentiate between them. But they are
    subtracted to retrieve the actual time of the event or control.

    Also, our map interpolates from the first to last beats in the
    score/performance, so we need to throw away tokens that are not in the
    domain and range of the map.

    Inputs:
      events   : a sequence of events (the score)
      controls : a sequence of time-localized controls (the performance)
      map      : a callable taking a score time in seconds and returning a
                 performance time in seconds, exposing its sample points as
                 array-like attributes `x` (domain) and `y` (range)
                 (presumably a scipy interpolator -- verify against caller)
      delta    : the anticipation interval

    Returns:
      tokens   : interleaved in-domain events and anticipated in-range controls
      controls : a flat list of the in-range controls that were not consumed
    """
    if len(controls) == 0:
        return events, controls

    domain_min = map.x.min()
    domain_max = map.x.max()
    range_min = map.y.min()
    range_max = map.y.max()

    # decode times with the same offsets the rest of this module uses
    filtered_events = [
        t for t in _triples(events)
        if domain_min <= (t[0] - TIME_OFFSET)/TIME_RESOLUTION <= domain_max]
    filtered_controls = [
        t for t in _triples(controls)
        if range_min <= (t[0] - ATIME_OFFSET)/TIME_RESOLUTION <= range_max]

    tokens = []
    # index of the first unconsumed control triple
    consumed = 0
    # every control may have been filtered out; then nothing is ever due
    if filtered_controls:
        control_time = filtered_controls[0][0] - ATIME_OFFSET
    else:
        control_time = float('inf')

    for time, dur, note in filtered_events:
        # performance time (in ticks) of this score event; it does not change
        # while controls are being consumed, so compute it once
        mapped_time = map((time - TIME_OFFSET) / TIME_RESOLUTION)*TIME_RESOLUTION
        while mapped_time >= control_time - delta:
            tokens.extend(filtered_controls[consumed])
            consumed += 1  # consume this control
            if consumed < len(filtered_controls):
                control_time = filtered_controls[consumed][0] - ATIME_OFFSET
            else:
                control_time = float('inf')

        assert note < CONTROL_OFFSET
        tokens.extend([time, dur, note])

    controls = [item for tup in filtered_controls[consumed:] for item in tup]
    return tokens, controls


def sparsity(tokens):
    """
    Return the largest gap, in ticks, between consecutive tokens of an
    event-only sequence (the first gap is measured from time zero).
    SEPARATOR triples are ignored.
    """
    max_dt = 0
    previous_time = TIME_OFFSET+0
    for (time, dur, note) in _triples(tokens):
        if note == SEPARATOR:
            continue

        assert note < CONTROL_OFFSET  # don't operate on interleaved sequences

        max_dt = max(max_dt, time - previous_time)
        previous_time = time

    return max_dt


def min_time(tokens, seconds=True, instr=None):
    """
    Return the earliest onset time in a token sequence.

    Inputs:
      tokens  : a flat token sequence (events and/or controls)
      seconds : if True return seconds (ticks / TIME_RESOLUTION), else ticks
      instr   : if given, only consider triples of this instrument
                (instrument = decoded note // 128)

    Returns:
      the minimum time before the first SEPARATOR, or 0 if nothing matched
    """
    mt = None
    for time, dur, note in _triples(tokens):
        # stop calculating at sequence separator
        if note == SEPARATOR:
            break

        if note < CONTROL_OFFSET:
            time -= TIME_OFFSET
            note -= NOTE_OFFSET
        else:
            time -= ATIME_OFFSET
            note -= ANOTE_OFFSET

        # min time of a particular instrument
        if instr is not None and instr != note//2**7:
            continue

        mt = time if mt is None else min(mt, time)

    if mt is None:
        mt = 0

    return mt/float(TIME_RESOLUTION) if seconds else mt


def max_time(tokens, seconds=True, instr=None):
    """
    Return the latest onset time in a token sequence.

    Inputs:
      tokens  : a flat token sequence (events and/or controls)
      seconds : if True return seconds (ticks / TIME_RESOLUTION), else ticks
      instr   : if given, only consider triples of this instrument
                (instrument = decoded note // 128)

    Returns:
      the maximum time over the whole sequence, or 0 if nothing matched
    """
    mt = 0
    for time, dur, note in _triples(tokens):
        # keep checking for max_time, even if it appears after a separator
        # (this is important because we use this check for vocab overflow in tokenization)
        if note == SEPARATOR:
            continue

        if note < CONTROL_OFFSET:
            time -= TIME_OFFSET
            note -= NOTE_OFFSET
        else:
            time -= ATIME_OFFSET
            note -= ANOTE_OFFSET

        # max time of a particular instrument
        if instr is not None and instr != note//2**7:
            continue

        mt = max(mt, time)

    return mt/float(TIME_RESOLUTION) if seconds else mt


def get_instruments(tokens):
    """
    Count the triples belonging to each instrument.

    Returns a defaultdict(int) mapping instrument index (decoded note // 128)
    to the number of triples for it. Triples whose note token is at or above
    SPECIAL_OFFSET are skipped.
    """
    instruments = defaultdict(int)
    for time, dur, note in _triples(tokens):
        if note >= SPECIAL_OFFSET:
            continue

        if note < CONTROL_OFFSET:
            note -= NOTE_OFFSET
        else:
            note -= ANOTE_OFFSET

        instr = note//2**7
        instruments[instr] += 1

    return instruments


def translate(tokens, dt, seconds=False):
    """
    Shift the onset time of every triple by `dt`.

    Inputs:
      tokens  : a flat token sequence (events and/or controls)
      dt      : the shift; in seconds if `seconds` is True (converted with
                TIME_RESOLUTION), otherwise in ticks
      seconds : the unit of `dt`

    Returns:
      a new list of tokens. Triples up to the first SEPARATOR are shifted;
      the separator and everything after it are copied unchanged. A shift
      that would make a time negative fails an assertion.
    """
    if seconds:
        dt = int(TIME_RESOLUTION*dt)

    new_tokens = []
    for (time, dur, note) in _triples(tokens):
        # stop translating after EOT
        if note == SEPARATOR:
            new_tokens.extend([time, dur, note])
            dt = 0
            continue

        if note < CONTROL_OFFSET:
            this_time = time - TIME_OFFSET
        else:
            this_time = time - ATIME_OFFSET

        assert 0 <= this_time + dt
        new_tokens.extend([time+dt, dur, note])

    return new_tokens


def combine(events, controls):
    """
    Merge events and controls into one time-sorted sequence.

    CONTROL_OFFSET is subtracted from every control token (time, duration and
    note alike) before the merged sequence is sorted by time.
    """
    return sort(events + [token - CONTROL_OFFSET for token in controls])