Spaces:
Running
Running
| #!/usr/bin/python3 | |
| # Copyright (c) 2021 LALAL.AI | |
| # | |
| # Permission is hereby granted, free of charge, to any person obtaining a copy | |
| # of this software and associated documentation files (the "Software"), to deal | |
| # in the Software without restriction, including without limitation the rights | |
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
| # copies of the Software, and to permit persons to whom the Software is | |
| # furnished to do so, subject to the following conditions: | |
| # | |
| # The above copyright notice and this permission notice shall be included in all | |
| # copies or substantial portions of the Software. | |
| # | |
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
| # SOFTWARE. | |
| import cgi | |
| import json | |
| import os | |
| import sys | |
| import time | |
| from argparse import ArgumentParser | |
| from urllib.parse import quote, unquote, urlencode | |
| from urllib.request import urlopen, Request | |
| CURRENT_DIR_PATH = os.path.dirname(os.path.realpath(__file__)) | |
| URL_API = "https://www.lalal.ai/api/" | |
| def update_percent(pct): | |
| pct = str(pct) | |
| sys.stdout.write("\b" * len(pct)) | |
| sys.stdout.write(" " * len(pct)) | |
| sys.stdout.write("\b" * len(pct)) | |
| sys.stdout.write(pct) | |
| sys.stdout.flush() | |
| def make_content_disposition(filename, disposition="attachment"): | |
| try: | |
| filename.encode("ascii") | |
| file_expr = f'filename="{filename}"' | |
| except UnicodeEncodeError: | |
| quoted = quote(filename) | |
| file_expr = f"filename*=utf-8''{quoted}" | |
| return f"{disposition}; {file_expr}" | |
| def upload_file(file_path, license): | |
| url_for_upload = URL_API + "upload/" | |
| _, filename = os.path.split(file_path) | |
| headers = { | |
| "Content-Disposition": make_content_disposition(filename), | |
| "Authorization": f"license {license}", | |
| } | |
| with open(file_path, "rb") as f: | |
| request = Request(url_for_upload, f, headers) | |
| with urlopen(request) as response: | |
| upload_result = json.load(response) | |
| if upload_result["status"] == "success": | |
| return upload_result["id"] | |
| else: | |
| raise RuntimeError(upload_result["error"]) | |
| def split_file(file_id, license, stem, filter_type, splitter): | |
| url_for_split = URL_API + "split/" | |
| headers = { | |
| "Authorization": f"license {license}", | |
| } | |
| query_args = { | |
| "id": file_id, | |
| "stem": stem, | |
| "filter": filter_type, | |
| "splitter": splitter, | |
| } | |
| encoded_args = urlencode(query_args).encode("utf-8") | |
| request = Request(url_for_split, encoded_args, headers=headers) | |
| with urlopen(request) as response: | |
| split_result = json.load(response) | |
| if split_result["status"] == "error": | |
| raise RuntimeError(split_result["error"]) | |
| def check_file(file_id): | |
| url_for_check = URL_API + "check/?" | |
| query_args = {"id": file_id} | |
| encoded_args = urlencode(query_args) | |
| is_queueup = False | |
| while True: | |
| with urlopen(url_for_check + encoded_args) as response: | |
| check_result = json.load(response) | |
| if check_result["status"] == "error": | |
| raise RuntimeError(check_result["error"]) | |
| task_state = check_result["task"]["state"] | |
| if task_state == "error": | |
| raise RuntimeError(check_result["task"]["error"]) | |
| if task_state == "progress": | |
| progress = int(check_result["task"]["progress"]) | |
| if progress == 0 and not is_queueup: | |
| print("Queue up...") | |
| is_queueup = True | |
| elif progress > 0: | |
| update_percent(f"Progress: {progress}%") | |
| if task_state == "success": | |
| update_percent("Progress: 100%\n") | |
| stem_track_url = check_result["split"]["stem_track"] | |
| back_track_url = check_result["split"]["back_track"] | |
| return stem_track_url, back_track_url | |
| time.sleep(15) | |
| def get_filename_from_content_disposition(header): | |
| _, params = cgi.parse_header(header) | |
| filename = params.get("filename") | |
| if filename: | |
| return filename | |
| filename = params.get("filename*") | |
| if filename: | |
| encoding, quoted = filename.split("''") | |
| unquoted = unquote(quoted, encoding) | |
| return unquoted | |
| raise ValueError("Invalid header Content-Disposition") | |
| def download_file(url_for_download, output_path): | |
| with urlopen(url_for_download) as response: | |
| filename = get_filename_from_content_disposition( | |
| response.headers["Content-Disposition"] | |
| ) | |
| file_path = os.path.join(output_path, filename) | |
| with open(file_path, "wb") as f: | |
| while True: | |
| chunk = response.read(8196) | |
| if not chunk: | |
| break | |
| f.write(chunk) | |
| return file_path | |
| def batch_process_for_file(input_path, output_path, stem, filter_type, splitter): | |
| license = os.environ.get("LALALAI_LICENCE") | |
| try: | |
| print(f'Uploading the file "{input_path}"...') | |
| file_id = upload_file(file_path=input_path, license=license) | |
| print( | |
| f'The file "{input_path}" has been successfully uploaded (file id: {file_id})' | |
| ) | |
| print(f'Processing the file "{input_path}"...') | |
| split_file(file_id, license, stem, filter_type, splitter) | |
| stem_track_url, back_track_url = check_file(file_id) | |
| print(f'Downloading the stem track file "{stem_track_url}"...') | |
| stem_file = download_file(stem_track_url, output_path) | |
| print(f'The stem track file has been downloaded to "{stem_file}"') | |
| print(f'Downloading the back track file "{back_track_url}"...') | |
| back_track_file = download_file(back_track_url, output_path) | |
| print(f'The back track file has been downloaded to "{back_track_file}"') | |
| print(f'The file "{input_path}" has been successfully split') | |
| return stem_file | |
| except Exception as err: | |
| print(f'Cannot process the file "{input_path}": {err}') | |
| def batch_process(input_path, output_path, stem, filter_type, splitter): | |
| if type(input_path) is list: | |
| downloaded_files = [] | |
| for path in input_path: | |
| if os.path.isfile(path): | |
| downloaded_file = batch_process_for_file(path, output_path, stem, filter_type, splitter) | |
| downloaded_files.append(downloaded_file) | |
| return downloaded_files | |
| if os.path.isfile(input_path): | |
| downloaded_file = batch_process_for_file(input_path, output_path, stem, filter_type, splitter) | |
| return downloaded_file | |
| else: | |
| downloaded_files = [] | |
| for path in os.listdir(input_path): | |
| path = os.path.join(input_path, path) | |
| if os.path.isfile(path): | |
| downloaded_file = batch_process_for_file(path, output_path, stem, filter_type, splitter) | |
| downloaded_files.append(downloaded_file) | |
| return downloaded_files | |
| def main(): | |
| parser = ArgumentParser(description="Lalalai splitter") | |
| parser.add_argument( | |
| "--input", type=str, required=True, help="Input directory or a file" | |
| ) | |
| parser.add_argument( | |
| "--output", type=str, default=CURRENT_DIR_PATH, help="Output directory" | |
| ) | |
| parser.add_argument( | |
| "--stem", | |
| type=str, | |
| default="vocals", | |
| choices=[ | |
| "vocals", | |
| "drum", | |
| "bass", | |
| "piano", | |
| "electric_guitar", | |
| "acoustic_guitar", | |
| "synthesizer", | |
| "voice", | |
| "strings", | |
| "wind", | |
| ], | |
| help='Stem option. Stems "voice", "strings", "wind" are not supported by Cassiopeia', | |
| ) | |
| parser.add_argument( | |
| "--filter", | |
| type=int, | |
| default=1, | |
| choices=[0, 1, 2], | |
| help="0 (mild), 1 (normal), 2 (aggressive)", | |
| ) | |
| parser.add_argument( | |
| "--splitter", | |
| type=str, | |
| default="phoenix", | |
| choices=["phoenix", "cassiopeia"], | |
| help="The type of neural network used to split audio", | |
| ) | |
| args = parser.parse_args() | |
| os.makedirs(args.output, exist_ok=True) | |
| batch_process(args.input, args.output, args.stem, args.filter, args.splitter) | |
| if __name__ == "__main__": | |
| try: | |
| main() | |
| except Exception as err: | |
| print(err) | |