Spaces:
Runtime error
Runtime error
| import argparse | |
| from concurrent.futures import ProcessPoolExecutor | |
| import time | |
| import subprocess as sp | |
| from pathlib import Path | |
| import typing as tp | |
| import warnings | |
| from tempfile import NamedTemporaryFile | |
| import gradio as gr | |
| from audiocraft.data.audio import audio_write | |
| from audiocraft.models import MusicGen | |
# Currently loaded MusicGen model; stays None until load_model() is first called.
MODEL = None
# Set to True by interrupt() and polled by the generation progress callback.
INTERRUPTING: bool = False
# We have to wrap subprocess call to clean a bit the log when using gr.make_waveform
_old_call = sp.call


def _call_nostderr(*args, **kwargs):
    """Invoke the original ``subprocess.call`` with its output discarded.

    Any ``stdout``/``stderr`` supplied by the caller is overridden with
    ``subprocess.DEVNULL``.

    Args:
        *args: Positional arguments forwarded to ``subprocess.call``.
        **kwargs: Keyword arguments forwarded to ``subprocess.call``.

    Returns:
        The value returned by the wrapped ``subprocess.call`` (the exit
        status of the child process).
    """
    # Avoid ffmpeg vomiting on the logs.
    kwargs['stderr'] = sp.DEVNULL
    kwargs['stdout'] = sp.DEVNULL
    # Hand the exit status back: this function replaces sp.call globally, so
    # callers that check the return code must keep seeing it.
    return _old_call(*args, **kwargs)


sp.call = _call_nostderr
# Module-wide process pool, created once at import time and entered
# immediately (``__enter__`` hands back the executor itself).
pool = ProcessPoolExecutor(max_workers=4).__enter__()
def interrupt() -> None:
    """Raise the module-level ``INTERRUPTING`` flag."""
    global INTERRUPTING
    INTERRUPTING = True
class FileCleaner:
    """Track files and delete them once they are older than a fixed lifetime.

    Files are kept in insertion order, so the oldest entry is always first
    and expiry can stop at the first entry that is still fresh.
    """

    def __init__(self, file_lifetime: float = 3600):
        """Create a cleaner.

        Args:
            file_lifetime: Age in seconds after which a tracked file is deleted.
        """
        self.file_lifetime = file_lifetime
        # (time added, path) pairs, oldest first.
        self.files: tp.List[tp.Tuple[float, Path]] = []

    def add(self, path: tp.Union[str, Path]):
        """Expire old files, then start tracking ``path``.

        Args:
            path: File to delete once it outlives ``file_lifetime``.
        """
        self._cleanup()
        self.files.append((time.time(), Path(path)))

    def _cleanup(self):
        """Delete every tracked file older than ``file_lifetime``.

        A file that has already disappeared is simply forgotten. Any other
        error from ``unlink`` propagates, but the entries handled before it
        are still dropped from the queue.
        """
        now = time.time()
        expired = 0
        try:
            for time_added, path in self.files:
                if now - time_added <= self.file_lifetime:
                    # Entries are ordered by age: everything after this is fresh.
                    break
                try:
                    # Unlink directly instead of exists()+unlink(): the file may
                    # vanish between the check and the removal.
                    path.unlink()
                except FileNotFoundError:
                    pass
                expired += 1
        finally:
            # One slice deletion instead of a pop(0) per expired entry.
            del self.files[:expired]


file_cleaner = FileCleaner()
def make_waveform(*args, **kwargs):
    """Call ``gr.make_waveform`` with warnings silenced and report its duration.

    Args:
        *args: Positional arguments forwarded to ``gr.make_waveform``.
        **kwargs: Keyword arguments forwarded to ``gr.make_waveform``.

    Returns:
        Whatever ``gr.make_waveform`` returns.
    """
    started_at = time.time()
    # Further remove some warnings.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        rendered = gr.make_waveform(*args, **kwargs)
    elapsed = time.time() - started_at
    print("Make a video took", elapsed)
    return rendered
def load_model(version='facebook/musicgen-medium'):
    """Make sure the global ``MODEL`` is the requested pretrained checkpoint.

    The model is fetched with ``MusicGen.get_pretrained`` only when nothing is
    loaded yet or when the loaded model's ``name`` differs from ``version``.

    Args:
        version: Identifier passed to ``MusicGen.get_pretrained``.
    """
    global MODEL
    print("Loading model", version)
    already_loaded = MODEL is not None and MODEL.name == version
    if already_loaded:
        return
    MODEL = MusicGen.get_pretrained(version)
def _do_predictions(texts, duration, progress=False, **gen_kwargs):
    """Generate audio for ``texts`` and render one waveform video per result.

    Each generated item is written to a temporary ``.wav`` file, a
    ``make_waveform`` job for it is submitted to the process pool, and both the
    wav and the resulting video are registered with ``file_cleaner``.

    Args:
        texts: Prompts handed to ``MODEL.generate``.
        duration: Forwarded to ``MODEL.set_generation_params``.
        progress: Forwarded to ``MODEL.generate``.
        **gen_kwargs: Extra generation parameters for
            ``MODEL.set_generation_params``.

    Returns:
        A ``(videos, wavs)`` pair of lists, one entry per generated item.
    """
    MODEL.set_generation_params(duration=duration, **gen_kwargs)
    started_at = time.time()
    generated = MODEL.generate(texts, progress=progress)
    waveforms = generated.detach().cpu().float()

    video_jobs = []
    wav_paths = []
    for waveform in waveforms:
        # delete=False: the file must outlive this context so it can be served.
        with NamedTemporaryFile("wb", suffix=".wav", delete=False) as tmp:
            wav_path = tmp.name
            audio_write(
                wav_path, waveform, MODEL.sample_rate, strategy="loudness",
                loudness_headroom_db=16, loudness_compressor=True, add_suffix=False)
            video_jobs.append(pool.submit(make_waveform, wav_path))
            wav_paths.append(wav_path)
            file_cleaner.add(wav_path)

    # Block until every rendering job submitted above has finished.
    video_paths = [job.result() for job in video_jobs]
    for video_path in video_paths:
        file_cleaner.add(video_path)
    print("generation took", time.time() - started_at)
    print("Tempfiles currently stored: ", len(file_cleaner.files))
    return video_paths, wav_paths
def predict_full(model, text, duration, bpm, topk, topp, temperature, cfg_coef, progress=gr.Progress()):
    """Validate the sampling settings and generate one lofi clip.

    The prompt sent to the model is ``text`` prefixed with "lofi" and suffixed
    with the requested bpm. The global ``INTERRUPTING`` flag is cleared at the
    start and polled on every progress update.

    Args:
        model: Model identifier passed to ``load_model``.
        text: User description of the track.
        duration: Forwarded to ``_do_predictions``.
        bpm: Value appended to the prompt.
        topk: Top-k sampling value; converted to ``int`` before use.
        topp: Top-p sampling value.
        temperature: Sampling temperature.
        cfg_coef: Classifier-free guidance coefficient.
        progress: Gradio progress tracker updated during generation.

    Returns:
        ``(video, wav, None, None)`` for the single generated clip.

    Raises:
        gr.Error: If ``temperature``, ``topk`` or ``topp`` is negative, or if
            the interrupt flag is set while generating.
    """
    global INTERRUPTING
    text = " ".join(["lofi", text, "bpm:", str(bpm)])
    INTERRUPTING = False

    # Checked in this order; the first negative value aborts the request.
    non_negative_checks = (
        (temperature, "Temperature must be >= 0."),
        (topk, "Topk must be non-negative."),
        (topp, "Topp must be non-negative."),
    )
    for value, message in non_negative_checks:
        if value < 0:
            raise gr.Error(message)

    topk = int(topk)
    load_model(model)

    def _progress(generated, to_generate):
        # Clamp so the reported step never exceeds the total.
        done = min(generated, to_generate)
        progress((done, to_generate))
        if INTERRUPTING:
            raise gr.Error("Interrupted.")

    MODEL.set_custom_progress_callback(_progress)
    videos, wavs = _do_predictions(
        [text], duration, progress=True,
        top_k=topk, top_p=topp, temperature=temperature, cfg_coef=cfg_coef)
    return videos[0], wavs[0], None, None
def ui(launch_kwargs):
    """Build the Gradio interface and launch it.

    Args:
        launch_kwargs: Keyword arguments forwarded unchanged to ``launch()``.
    """
    # Initial values of the controls, shared by the widgets below and by the
    # examples handler so both always agree.
    default_model = "facebook/musicgen-medium"
    default_bpm = 80
    default_duration = 10
    default_topk = 250
    default_topp = 0
    default_temperature = 1.0
    default_cfg_coef = 3.0

    def _predict_example(example_text, example_model, progress=gr.Progress()):
        """Run one example row through ``predict_full``.

        Example rows only carry ``(text, model)``, whereas ``predict_full``
        expects ``(model, text, duration, bpm, ...)``. Passing ``predict_full``
        directly as the examples function would therefore call it with
        swapped and missing arguments; this adapter reorders them and fills
        the remaining parameters with the controls' initial values.
        """
        video, wav = predict_full(
            example_model, example_text, default_duration, default_bpm,
            default_topk, default_topp, default_temperature, default_cfg_coef,
            progress=progress)[:2]
        return video, wav

    with gr.Blocks() as interface:
        gr.Markdown(
            """
            # Lofi University
            Generate lofi tracks to help study.
            """
        )
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    text = gr.Text(label="Describe your lofi", interactive=True)
                with gr.Row():
                    submit = gr.Button("Submit")
                    _ = gr.Button("Interrupt").click(fn=interrupt, queue=False)
                with gr.Row():
                    model = gr.Radio(["facebook/musicgen-medium", "facebook/musicgen-small",
                                      "facebook/musicgen-large"],
                                     label="Model", value=default_model, interactive=True)
                with gr.Row():
                    bpm = gr.Slider(minimum=50, maximum=150, value=default_bpm,
                                    label="BPM", interactive=True)
                with gr.Row():
                    duration = gr.Slider(minimum=1, maximum=120, value=default_duration,
                                         label="Duration", interactive=True)
                with gr.Row():
                    topk = gr.Number(label="Top-k", value=default_topk, interactive=True)
                    topp = gr.Number(label="Top-p", value=default_topp, interactive=True)
                    temperature = gr.Number(label="Temperature", value=default_temperature,
                                            interactive=True)
                    cfg_coef = gr.Number(label="Classifier Free Guidance", value=default_cfg_coef,
                                         interactive=True)
            with gr.Column():
                output = gr.Video(label="Generated Music")
                audio_output = gr.Audio(label="Generated Music (wav)", type='filepath')
        submit.click(
            predict_full,
            inputs=[model, text, duration, bpm, topk, topp, temperature, cfg_coef],
            outputs=[output, audio_output])
        gr.Examples(
            fn=_predict_example,
            examples=[
                [
                    "Dreamy synth layers with light beats",
                    "facebook/musicgen-medium",
                ],
                [
                    "Mellow piano chords are accompanied by a subtle, relaxed drum loop",
                    "facebook/musicgen-medium",
                ],
            ],
            inputs=[text, model],
            outputs=[output, audio_output]
        )
        interface.queue().launch(**launch_kwargs)
if __name__ == "__main__":
    # Command-line entry point: parse the launch options and start the UI.
    parser = argparse.ArgumentParser()
    parser.add_argument('--server_port', type=int, default=0,
                        help='Port to run the server listener on')
    parser.add_argument('--inbrowser', action='store_true', help='Open in browser')
    parser.add_argument('--share', action='store_true', help='Share the gradio UI')
    args = parser.parse_args()

    # Forward only the options that were actually set, so launch() keeps its
    # own defaults for everything else (a port of 0 counts as "not set").
    launch_kwargs = {}
    for option in ('server_port', 'inbrowser', 'share'):
        value = getattr(args, option)
        if value:
            launch_kwargs[option] = value

    ui(launch_kwargs)