# NOTE(review): removed non-Python paste artifacts ("Spaces:" / "Runtime error"
# lines) that were prepended by the code-sharing tool and would break parsing.
| import os | |
| import pandas as pd | |
| from pydub import AudioSegment | |
| import numpy as np | |
| from moviepy.editor import * | |
| import time | |
| import pickle | |
| import audioread | |
| import librosa # install numba==0.49.1 | |
| # setup A: numba 0.51.2, librosa 0.6.3, llvmlite: 0.34.0 | |
| # setupB: numba==0.49.1, llvmlite-0.32.1 | |
| from src.music.config import RATE_AUDIO_SAVE | |
| import hashlib | |
| import unicodedata | |
| import re | |
| # from src.music.piano_detection_model.piano_detection_model import SR | |
def clean_removed_mp3_from_csv(path):
    """Drop rows from meta_data.csv whose 'filename' no longer exists in the folder.

    Args:
        path: folder path, expected to end with '/' (paths are built by plain
              string concatenation throughout this module).

    Side effects: rewrites meta_data.csv in place.
    """
    print(f"Cleaning meta_data.csv using files from the folder, in {path}")
    files = set(os.listdir(path))  # set for O(1) membership tests
    meta_data = pd.read_csv(path + 'meta_data.csv')
    # Boolean-mask filtering replaces the manual index-collection + drop loop.
    meta_data = meta_data[meta_data['filename'].isin(files)]
    meta_data.to_csv(path + 'meta_data.csv', index=False)
    print('\tDone.')
def clean_removed_csv_from_folder(path):
    """Report files in the folder whose hash (filename minus 4-char extension)
    is absent from meta_data.csv's 'hash' column.

    Deletion itself is disabled (the os.remove call is commented out), so this
    is a dry run: it only prints a running count of orphan files.

    Args:
        path: folder path, expected to end with '/'.
    """
    print(f"Cleaning files from folder using meta_data.csv listed file, in {path}")
    files = os.listdir(path)
    meta_data = pd.read_csv(path + 'meta_data.csv')
    hashes = set(meta_data['hash'])
    count = 0
    for f in files:
        if f not in ('meta_data.csv', 'url.txt'):
            # f[:-4] strips a 4-character extension such as '.mp3'; assumes all
            # data files are named '<hash>.<3-char-ext>' -- TODO confirm.
            if f[:-4] not in hashes:
                count += 1
                print(count)
                # os.remove(path + f)  # deliberately disabled: dry-run only
    # removed dead debug anchor `stop = 1`
    print('\tDone.')
| # def convert_mp3_to_mono_16k(path): | |
| # print(f"\n\n\t\tConverting mp3 to mono and 16k sample rate, in {path}\n") | |
| # if '.mp3' == path[-4:]: | |
| # audio = AudioFileClip(path) | |
| # audio.write_audiofile(path[:-4] + '.mp3', | |
| # verbose=False, | |
| # logger=None, | |
| # fps=FPS, | |
| # ffmpeg_params=["-ac", "1"]) | |
| # else: | |
| # list_files = os.listdir(path) | |
| # for i, f in enumerate(list_files): | |
| # print(compute_progress(i, len(list_files))) | |
| # if ".mp3" in f: | |
| # audio = AudioFileClip(path + f) | |
| # audio.write_audiofile(path + f[:-4] + '.mp3', | |
| # verbose=False, | |
| # logger=None, | |
| # fps=FPS, # 16000 sr | |
| # ffmpeg_params=["-ac", "1"] # make it mono | |
| # ) | |
| # print('\tDone.') | |
def load_audio(path, sr=22050, mono=True, offset=0.0, duration=None,
               dtype=np.float32, res_type='kaiser_best',
               backends=[audioread.ffdec.FFmpegAudioFile]):
    """Load audio. Copied from librosa.core.load() except that ffmpeg backend is
    always used in this function. Code from piano_transcription_inference

    Args:
        path: audio file path; resolved through os.path.realpath.
        sr: target sample rate to resample to; if None, the native rate is kept.
        mono: if True, downmix multi-channel audio to a single channel.
        offset: start reading at this time, in seconds.
        duration: read at most this many seconds; None reads to the end.
        dtype: numpy dtype of the returned samples.
        res_type: resampling algorithm name forwarded to librosa.
        backends: audioread backends to try.
            NOTE(review): mutable default argument -- shared across calls,
            but never mutated here, so it is safe in practice.

    Returns:
        (y, sr): samples as a numpy array (shape (channels, n) when
        multi-channel and mono=False) and the effective sample rate.
    """
    y = []
    with audioread.audio_open(os.path.realpath(path), backends=backends) as input_file:
        sr_native = input_file.samplerate
        n_channels = input_file.channels
        # Offsets are counted in interleaved samples, hence the * n_channels.
        s_start = int(np.round(sr_native * offset)) * n_channels
        if duration is None:
            # np.inf end marker: the "crop" and "stop" branches below never fire.
            s_end = np.inf
        else:
            s_end = s_start + (int(np.round(sr_native * duration))
                               * n_channels)
        n = 0
        for frame in input_file:
            frame = librosa.core.audio.util.buf_to_float(frame, dtype=dtype)
            n_prev = n
            n = n + len(frame)
            if n < s_start:
                # offset is after the current frame
                # keep reading
                continue
            if s_end < n_prev:
                # we're off the end. stop reading
                break
            if s_end < n:
                # the end is in this frame. crop.
                frame = frame[:s_end - n_prev]
            if n_prev <= s_start <= n:
                # beginning is in this frame
                frame = frame[(s_start - n_prev):]
            # tack on the current frame
            y.append(frame)
    if y:
        y = np.concatenate(y)
        if n_channels > 1:
            # De-interleave: (n*channels,) -> (n, channels) -> (channels, n).
            y = y.reshape((-1, n_channels)).T
            if mono:
                y = librosa.core.audio.to_mono(y)
        if sr is not None:
            # NOTE(review): positional (y, orig_sr, target_sr) matches older
            # librosa resample signatures -- confirm against installed version
            # (header comments mention librosa 0.6.3).
            y = librosa.core.audio.resample(y, sr_native, sr, res_type=res_type)
        else:
            sr = sr_native
    # Final cleanup for dtype and contiguity
    y = np.ascontiguousarray(y, dtype=dtype)
    return (y, sr)
def compute_progress(iter, total):
    """Return progress through *total* items as a percent string, e.g. '42%'."""
    percent = int((iter + 1) / total * 100)
    return "{}%".format(percent)
def compute_progress_and_eta(times, iter, total, n_av=3000):
    """Format a 'Progress: X%, ETA: hHmMsS.' message from per-item durations.

    Args:
        times: sequence of per-item processing times in seconds; only the
            last n_av entries contribute to the running average.
        iter: zero-based index of the current item.
        total: total number of items.
        n_av: size of the averaging window.

    Returns:
        A human-readable progress/ETA string.
    """
    av_time = np.mean(times[-n_av:])
    progress = int(((iter + 1) / total) * 100)
    remaining = av_time * (total - iter)  # seconds left, hoisted once
    eta_h = int(remaining // 3600)
    eta_m = int((remaining - eta_h * 3600) // 60)
    eta_s = int(remaining - eta_h * 3600 - eta_m * 60)
    return f"Progress: {progress}%, ETA: {eta_h}H{eta_m}M{eta_s}S."
def crop_mp3_from_meta_data_constraints(path, clean_constraints=True):
    """Crop each mp3 in *path* to the [constraint_start, constraint_end] window
    recorded in meta_data.csv, overwriting the mp3 in place.

    Files whose constraints already span the full length (start == 0 and
    end == length) are skipped. When clean_constraints is True, the row's
    constraints are reset to the full track and meta_data.csv is rewritten
    after every processed file, so an interrupted run does not re-crop.

    Args:
        path: folder path, expected to end with '/'.
        clean_constraints: reset constraints in the csv after cropping.
    """
    print(f"Cropping mp3 using constraints from meta_data.csv, in {path}")
    meta_data = pd.read_csv(path + 'meta_data.csv')
    constraint_start = meta_data['constraint_start'].copy()
    length = meta_data['length'].copy()
    constraint_end = meta_data['constraint_end'].copy()
    filenames = meta_data['filename'].copy()
    times = [5]  # seed value so the first ETA estimate is defined
    for i, c_start, c_end, fn, l in zip(range(len(constraint_start)), constraint_start, constraint_end, filenames, length):
        if c_start != 0 or c_end != l:
            i_time = time.time()
            print(compute_progress_and_eta(times, i, len(constraint_start), n_av=100))
            song = AudioSegment.from_mp3(path + fn)
            # pydub slices in milliseconds; constraints are presumably in
            # seconds -- TODO confirm units against the csv producer.
            extract = song[c_start*1000:c_end*1000]
            extract.export(path + fn, format="mp3")
            if clean_constraints:
                constraint_start[i] = 0
                constraint_end[i] = length[i]
                meta_data['constraint_start'] = constraint_start
                meta_data['constraint_end'] = constraint_end
                # Rewritten every iteration: slow but crash-safe.
                meta_data.to_csv(path + 'meta_data.csv', index=False)
            times.append(time.time() - i_time)
    print('\tDone.')
def get_all_subfiles_with_extension(path, max_depth=3, extension='.*', current_depth=0):
    """Recursively collect file paths under *path* matching *extension*.

    Args:
        path: folder path, expected to end with '/'.
        max_depth: maximum recursion depth into subfolders.
        extension: '.*' for all files, a single extension string like '.mp3',
            or a list of such strings.
        current_depth: internal recursion counter; leave at the default.

    Returns:
        List of full paths (str), built by string concatenation like the
        rest of this module.

    Raises:
        ValueError: if extension is neither a str nor a list.
    """
    entries = os.listdir(path)  # listed once instead of twice
    folders = [f for f in entries if os.path.isdir(path + f)]
    # get all files in current folder with a given extension
    if isinstance(extension, list):
        assert all([isinstance(e, str) for e in extension]), 'extension can be a str or a list'
        files = [path + f for f in entries
                 if os.path.isfile(path + f) and any(f[-len(ext):] == ext for ext in extension)]
    elif isinstance(extension, str):
        assert extension[0] == '.', 'extension should be an extension or a list of extensions'
        if extension == '.*':
            files = [path + f for f in entries if os.path.isfile(path + f)]
        else:
            files = [path + f for f in entries
                     if os.path.isfile(path + f) and f.endswith(extension)]
    else:
        # was: print(...) followed by a bare ValueError; the message belongs
        # in the exception itself.
        raise ValueError('extension should be either a str or a list')
    if current_depth < max_depth:
        for fold in folders:
            files += get_all_subfiles_with_extension(path + fold + '/', max_depth=max_depth,
                                                     extension=extension, current_depth=current_depth + 1)
    return files
def get_out_path(in_path, in_word, out_word, out_extension, exclude_paths=()):
    """Derive an output path from *in_path* by swapping *in_word* for *out_word*
    in the folder chain and renaming the file accordingly.

    Expected layout: .../<in_word>/<playlist_folder>/.../<file> -- the folder
    right after in_word is treated as the playlist folder.

    Args:
        in_path: '/'-separated input file path.
        in_word: folder name to replace (e.g. an underscore-joined keyword).
        out_word: replacement keyword; 'midi' gets appended instead of
            substituted into folder/file names.
        out_extension: extension (with dot) for the output filename.
        exclude_paths: playlist folder names to skip entirely.

    Returns:
        (out_path, to_exclude, playlist_name); (None, True, None) when the
        playlist folder is excluded.

    NOTE(review): if in_word never appears in in_path, playlist_index is
    unbound and this raises NameError -- callers apparently guarantee a match.
    """
    splitted_in_path = in_path.split('/')
    for i in range(len(splitted_in_path)):
        if splitted_in_path[i] == in_word:
            splitted_in_path[i] = out_word
            # playlist folder is the component right after the matched word;
            # a later match overwrites an earlier one.
            playlist_index = i + 1
    file_index = len(splitted_in_path) - 1
    if splitted_in_path[playlist_index] in exclude_paths:
        to_exclude = True
        return None, to_exclude, None
    else:
        to_exclude = False
    if out_word != 'midi':
        # Replace the trailing in_word-derived suffix of the playlist folder
        # (as many '_'-separated parts as in_word has) with out_word.
        splitted_in_path[playlist_index] = '_'.join(splitted_in_path[playlist_index].split('_')[:-len(in_word.split('_'))]) + '_' + out_word
    else:
        splitted_in_path[playlist_index] += '_' + out_word
    if 'fake' not in splitted_in_path:
        # Create the output playlist folder unless this is a 'fake' dry-run path.
        os.makedirs('/'.join(splitted_in_path[:playlist_index + 1]), exist_ok=True)
    if out_word != 'midi':
        new_filename = '_'.join(splitted_in_path[file_index].split('_')[:-len(in_word.split('_'))]) + '_' + out_word + out_extension
    else:
        # For midi: strip extension parts instead of '_' parts before appending.
        new_filename = '.'.join(splitted_in_path[file_index].split('.')[:-len(in_word.split('_'))]) + '_' + out_word + out_extension
    splitted_in_path[file_index] = new_filename
    # Flatten any intermediate folders: keep only .../playlist/filename.
    splitted_in_path = splitted_in_path[:playlist_index + 1] + [splitted_in_path[file_index]]
    out_path = '/'.join(splitted_in_path)
    return out_path, to_exclude, splitted_in_path[playlist_index]
def set_all_seeds(seed):
    """Seed the random, numpy and torch RNGs so runs are reproducible."""
    import random

    import numpy as np
    import torch

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
def get_paths_in_and_out(in_path, in_word, in_extension, out_word, out_extension, max_depth, exclude_paths=()):
    """Map every *in_extension* file under *in_path* to its output path.

    Finds all matching files up to max_depth, derives each output path via
    get_out_path (replacing in_word with out_word), and drops files whose
    playlist folder is in exclude_paths.

    Returns:
        (all_in_paths, all_out_paths, all_playlists) -- three aligned lists.
    """
    all_in_paths = get_all_subfiles_with_extension(in_path, max_depth=max_depth, extension=in_extension)
    indexes_not_transcribed = []
    all_out_paths = []
    all_playlists = []
    # Loop variable renamed so it no longer shadows the in_path parameter.
    for i_path, current_in_path in enumerate(all_in_paths):
        out_path, to_exclude, playlist = get_out_path(in_path=current_in_path, in_word=in_word, out_word=out_word,
                                                      out_extension=out_extension, exclude_paths=exclude_paths)
        if not to_exclude:
            indexes_not_transcribed.append(i_path)
            all_out_paths.append(out_path)
            all_playlists.append(playlist)
    keep = set(indexes_not_transcribed)  # set membership: O(n) filter instead of O(n^2)
    all_in_paths = [p for i, p in enumerate(all_in_paths) if i in keep]
    assert len(all_out_paths) == len(all_in_paths)
    return all_in_paths, all_out_paths, all_playlists
def get_path_and_filter_existing(in_path, in_word, in_extension, out_word, out_extension, max_depth, exclude_paths=()):
    """Like get_paths_in_and_out, but also skip files whose output already exists.

    Finds all *in_extension* files under *in_path* up to max_depth, derives
    each output path via get_out_path, and keeps only files that are neither
    excluded (playlist in exclude_paths) nor already processed (output file
    exists on disk).

    Returns:
        (all_in_paths, all_out_paths, all_playlists) -- three aligned lists.
    """
    all_in_paths = get_all_subfiles_with_extension(in_path, max_depth=max_depth, extension=in_extension)
    indexes_to_process = []
    all_out_paths = []
    all_playlists = []
    for i_path, current_in_path in enumerate(all_in_paths):
        out_path, to_exclude, playlist = get_out_path(in_path=current_in_path, in_word=in_word, out_word=out_word,
                                                      out_extension=out_extension, exclude_paths=exclude_paths)
        if not to_exclude:
            if not os.path.exists(out_path):
                indexes_to_process.append(i_path)
                all_out_paths.append(out_path)
                all_playlists.append(playlist)
    # Plain list comprehension instead of the numpy round-trip: np.array()
    # indexing returned np.str_ elements rather than plain str.
    keep = set(indexes_to_process)
    all_in_paths = [p for i, p in enumerate(all_in_paths) if i in keep]
    assert len(all_out_paths) == len(all_in_paths)
    return all_in_paths, all_out_paths, all_playlists
def md5sum(filename, blocksize=65536):
    """Return the hex MD5 digest of *filename*, read in *blocksize* chunks.

    Args:
        filename: path of the file to hash.
        blocksize: chunk size in bytes; keeps memory bounded for large files.
    """
    digest = hashlib.md5()  # renamed from 'hash' to stop shadowing the builtin
    with open(filename, "rb") as f:
        for block in iter(lambda: f.read(blocksize), b""):
            digest.update(block)
    return digest.hexdigest()
# Pre-compiled pattern matching common emoji / pictograph code-point ranges;
# used below by slugify() to strip emojis from names.
emoji_pattern = re.compile("["
                           u"\U0001F600-\U0001F64F"  # emoticons
                           u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                           u"\U0001F680-\U0001F6FF"  # transport & map symbols
                           u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                           "]+", flags=re.UNICODE)
def slugify(value, allow_unicode=False):
    """
    Taken from https://github.com/django/django/blob/master/django/utils/text.py
    Convert to ASCII if 'allow_unicode' is False. Convert spaces or repeated
    dashes to single dashes. Remove characters that aren't alphanumerics,
    underscores, or hyphens. Convert to lowercase. Also strip leading and
    trailing whitespace, dashes, and underscores.

    Unlike Django's version, words are joined with underscores and emojis
    are stripped via the module-level emoji_pattern.
    """
    text = str(value).lower()
    if allow_unicode:
        text = unicodedata.normalize('NFKC', text)
    else:
        text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('ascii')
    # The second .lower() is not redundant: compatibility normalization can
    # reintroduce uppercase characters (e.g. modifier letters -> 'A').
    text = re.sub(r'[^\w\s-]', '', text.lower())
    text = emoji_pattern.sub(r'', text)
    return re.sub(r'[-\s]+', '_', text).strip('-_')
if __name__ == '__main__':
    # Hard-coded dataset location -- adjust to the local data layout.
    path = "/home/cedric/Documents/pianocktail/data/midi/street_piano/"
    # for folder in ['my_sheet_music_transcriptions']:#os.listdir(path):
    #     print('\n\n\t\t', folder)
    #     convert_mp4_to_mp3(path + folder + '/')
    # Dry-run report of folder files missing from meta_data.csv
    # (actual deletion inside the function is commented out).
    clean_removed_csv_from_folder(path)
    # folder = 'street_piano/'
    # for folder in ['street_piano/']:
    #     clean_removed_mp3_from_csv(path + folder)