# william590y's picture
# Upload folder using huggingface_hub
# 151b875 verified
"""
Utilities for operating on encoded Midi sequences.
"""
from collections import defaultdict
from anticipation.config import *
from anticipation.vocab import *
def print_tokens(tokens):
    """Pretty-print a token sequence, one (index, time, dur, instrument, pitch) row per event."""
    print('---------------------')
    triples = zip(tokens[0::3], tokens[1::3], tokens[2::3])
    for idx, (time_tok, dur_tok, note_tok) in enumerate(triples):
        if note_tok == SEPARATOR:
            # a separator occupies all three slots of its triple
            assert time_tok == SEPARATOR and dur_tok == SEPARATOR
            print(idx, 'SEPARATOR')
        elif note_tok == REST:
            # rests carry an (un-anticipated) arrival time and a zero duration
            assert time_tok < CONTROL_OFFSET
            assert dur_tok == DUR_OFFSET+0
            print(idx, time_tok, 'REST')
        elif note_tok < CONTROL_OFFSET:
            # ordinary event: strip the event-vocabulary offsets
            t = time_tok - TIME_OFFSET
            d = dur_tok - DUR_OFFSET
            n = note_tok - NOTE_OFFSET
            print(idx, t, d, n//2**7, n % 2**7)
        else:
            # anticipated control: strip the control-vocabulary offsets
            t = time_tok - ATIME_OFFSET
            d = dur_tok - ADUR_OFFSET
            n = note_tok - ANOTE_OFFSET
            print(idx, t, d, n//2**7, n % 2**7, '(A)')
def clip(tokens, start, end, clip_duration=True, seconds=True):
    """Keep only tokens whose onsets lie in [start, end]; optionally truncate durations at end.

    If seconds is True, start/end are given in seconds and converted to time-token units.
    """
    if seconds:
        start = int(TIME_RESOLUTION*start)
        end = int(TIME_RESOLUTION*end)

    result = []
    for time_tok, dur_tok, note_tok in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
        # events and anticipated controls use different vocabulary offsets
        anticipated = note_tok >= CONTROL_OFFSET
        onset = time_tok - (ATIME_OFFSET if anticipated else TIME_OFFSET)
        length = dur_tok - (ADUR_OFFSET if anticipated else DUR_OFFSET)

        if not (start <= onset <= end):
            continue

        # truncate extended notes
        if clip_duration and onset + length > end:
            dur_tok -= onset + length - end

        result += [time_tok, dur_tok, note_tok]

    return result
def mask(tokens, start, end):
    """Remove tokens whose onsets fall strictly inside (start, end), given in seconds."""
    kept = []
    for time_tok, dur_tok, note_tok in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
        # choose the offset matching the token's vocabulary (event vs. anticipated control)
        offset = TIME_OFFSET if note_tok < CONTROL_OFFSET else ATIME_OFFSET
        onset = (time_tok - offset)/float(TIME_RESOLUTION)
        if not (start < onset < end):
            kept += [time_tok, dur_tok, note_tok]
    return kept
def delete(tokens, criterion):
    """Drop every (time, dur, note) triple for which criterion(triple) is truthy."""
    triples = zip(tokens[0::3], tokens[1::3], tokens[2::3])
    return [tok for triple in triples if not criterion(triple) for tok in triple]
def sort(tokens):
    """ sort sequence of events or controls (but not both) """
    triples = list(zip(tokens[0::3], tokens[1::3], tokens[2::3]))
    # stable sort on arrival time: equal-time triples keep their original order
    triples.sort(key=lambda triple: triple[0])
    return [tok for triple in triples for tok in triple]
def split(tokens):
    """ split a sequence into events and controls """
    events, controls = [], []
    for triple in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
        # note tokens at or above CONTROL_OFFSET belong to the control vocabulary
        target = events if triple[2] < CONTROL_OFFSET else controls
        target.extend(triple)
    return events, controls
def pad(tokens, end_time=None, density=TIME_RESOLUTION):
"""
Pads tokens up to end_time (if given) with REST (a special token value defined in vocab.py)
up to a desired density. see Definition 3.5 and Example 3.6 in the paper.
"""
end_time = TIME_OFFSET+(end_time if end_time else max_time(tokens, seconds=False))
new_tokens = []
previous_time = TIME_OFFSET+0
for (time, dur, note) in zip(tokens[0::3],tokens[1::3],tokens[2::3]):
# must pad before separation, anticipation
assert note < CONTROL_OFFSET
# insert pad tokens to ensure the desired density
while time > previous_time + density:
new_tokens.extend([previous_time+density, DUR_OFFSET+0, REST])
previous_time += density
new_tokens.extend([time, dur, note])
previous_time = time
while end_time > previous_time + density:
new_tokens.extend([previous_time+density, DUR_OFFSET+0, REST])
previous_time += density
return new_tokens
def unpad(tokens):
    """Strip all REST padding triples from a sequence."""
    triples = zip(tokens[0::3], tokens[1::3], tokens[2::3])
    return [tok for triple in triples if triple[2] != REST for tok in triple]
def anticipate(events, controls, delta=DELTA*TIME_RESOLUTION):
    """
    Interleave a sequence of events with anticipated controls.

    Inputs:
      events   : a sequence of events
      controls : a sequence of time-localized controls
      delta    : the anticipation interval

    Returns:
      tokens   : interleaved events and anticipated controls
      controls : unconsumed controls (control time > max_time(events) + delta)
    """
    if len(controls) == 0:
        return events, controls

    interleaved = []
    current_time = 0
    next_control = controls[0] - ATIME_OFFSET
    for time_tok, dur_tok, note_tok in zip(events[0::3], events[1::3], events[2::3]):
        # flush every control anticipated relative to the previous event's time
        while current_time >= next_control - delta:
            interleaved.extend(controls[0:3])
            controls = controls[3:]  # consume this control
            next_control = controls[0] - ATIME_OFFSET if controls else float('inf')

        assert note_tok < CONTROL_OFFSET
        current_time = time_tok - TIME_OFFSET
        interleaved.extend([time_tok, dur_tok, note_tok])

    return interleaved, controls
def anticipate2(events, controls, map, delta=DELTA*TIME_RESOLUTION):
    """
    Interleave a sequence of events with anticipated controls, where controls represent
    the performance of a piece and events represent the score, and map is a mapping from
    score beats and downbeats to performance beats and downbeats.

    Note that ATIME_OFFSET, CONTROL_OFFSET, TIME_OFFSET offset the arrival times of
    controls and events to differentiate between them. But they are subtracted to retrieve
    the actual time of the event or control.

    Also, our map interpolates from the first to last beats in the score/performance, so we need
    to throw away tokens that are not in the domain and range of the map.

    Returns:
      tokens   : interleaved (filtered) events and anticipated controls
      controls : unconsumed (filtered) controls, flattened back into a token list
    """
    if len(controls) == 0:
        return events, controls

    # restrict events/controls to the interpolation domain/range of the map
    domain_min = map.x.min()
    domain_max = map.x.max()
    range_min = map.y.min()
    range_max = map.y.max()
    filtered_events = [t for t in zip(events[0::3], events[1::3], events[2::3])
                       if domain_min <= t[0]/TIME_RESOLUTION <= domain_max]
    filtered_controls = [t for t in zip(controls[0::3], controls[1::3], controls[2::3])
                         if range_min <= (t[0]-CONTROL_OFFSET)/TIME_RESOLUTION <= range_max]

    tokens = []
    # BUGFIX: the emptiness guard above runs before filtering, so filtering could leave
    # zero controls and the unconditional [0][0] here raised IndexError; guard it the
    # same way the in-loop update does
    control_time = filtered_controls[0][0] - ATIME_OFFSET if filtered_controls else float('inf')
    for time, dur, note in filtered_events:
        # emit each control whose performance time is within delta of the mapped score time
        while map(time / TIME_RESOLUTION)*TIME_RESOLUTION >= control_time - delta:
            tokens.extend(filtered_controls[0])
            filtered_controls = filtered_controls[1:]  # consume this control
            control_time = filtered_controls[0][0] - ATIME_OFFSET if len(filtered_controls) > 0 else float('inf')

        assert note < CONTROL_OFFSET
        tokens.extend([time, dur, note])

    # flatten the unconsumed control triples back into a token list
    controls = [item for tup in filtered_controls for item in tup]
    return tokens, controls
def sparsity(tokens):
    """Return the largest gap between consecutive arrival-time tokens in the sequence."""
    largest = 0
    last = TIME_OFFSET+0
    for time_tok, dur_tok, note_tok in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
        if note_tok == SEPARATOR:
            continue
        assert note_tok < CONTROL_OFFSET  # don't operate on interleaved sequences
        gap = time_tok - last
        if gap > largest:
            largest = gap
        last = time_tok
    return largest
def min_time(tokens, seconds=True, instr=None):
    """Earliest onset in the sequence (optionally restricted to one instrument).

    Returns seconds if seconds=True, raw time units otherwise; 0 if no matching token.
    """
    earliest = None
    for time, dur, note in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
        # stop calculating at sequence separator
        if note == SEPARATOR:
            break

        if note < CONTROL_OFFSET:
            time, note = time - TIME_OFFSET, note - NOTE_OFFSET
        else:
            time, note = time - ATIME_OFFSET, note - ANOTE_OFFSET

        # min time of a particular instrument
        if instr is not None and instr != note//2**7:
            continue

        earliest = time if earliest is None else min(earliest, time)

    if earliest is None:
        earliest = 0
    return earliest/float(TIME_RESOLUTION) if seconds else earliest
def max_time(tokens, seconds=True, instr=None):
    """Latest onset in the sequence (optionally restricted to one instrument).

    Returns seconds if seconds=True, raw time units otherwise; 0 if no matching token.
    """
    latest = 0
    for time, dur, note in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
        # keep checking for max_time, even if it appears after a separator
        # (this is important because we use this check for vocab overflow in tokenization)
        if note == SEPARATOR:
            continue

        if note < CONTROL_OFFSET:
            time, note = time - TIME_OFFSET, note - NOTE_OFFSET
        else:
            time, note = time - ATIME_OFFSET, note - ANOTE_OFFSET

        # max time of a particular instrument
        if instr is not None and instr != note//2**7:
            continue

        if time > latest:
            latest = time

    return latest/float(TIME_RESOLUTION) if seconds else latest
def get_instruments(tokens):
    """Count note tokens per instrument; returns a defaultdict {instrument: count}."""
    counts = defaultdict(int)
    # only the note slot matters here; zip truncates to this (shortest) slice anyway
    for note in tokens[2::3]:
        if note >= SPECIAL_OFFSET:
            continue
        offset = NOTE_OFFSET if note < CONTROL_OFFSET else ANOTE_OFFSET
        counts[(note - offset)//2**7] += 1
    return counts
def translate(tokens, dt, seconds=False):
    """Shift all onsets by dt (seconds if seconds=True); shifting stops after a SEPARATOR."""
    if seconds:
        dt = int(TIME_RESOLUTION*dt)

    shifted = []
    for time, dur, note in zip(tokens[0::3], tokens[1::3], tokens[2::3]):
        # stop translating after EOT
        if note == SEPARATOR:
            shifted.extend([time, dur, note])
            dt = 0
            continue

        onset = time - (TIME_OFFSET if note < CONTROL_OFFSET else ATIME_OFFSET)
        assert 0 <= onset + dt  # translation must not push an onset before time zero
        shifted.extend([time+dt, dur, note])

    return shifted
def combine(events, controls):
    """Rebase controls into the event vocabulary and merge them with events, time-sorted."""
    rebased = [token - CONTROL_OFFSET for token in controls]
    return sort(events + rebased)