# Copyright 2022 The Music Spectrogram Diffusion Authors. # Copyright 2025 The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import math from typing import Any, Callable import numpy as np import torch from ....models import T5FilmDecoder from ....schedulers import DDPMScheduler from ....utils import is_onnx_available, logging from ....utils.torch_utils import randn_tensor if is_onnx_available(): from ...onnx_utils import OnnxRuntimeModel from ...pipeline_utils import AudioPipelineOutput, DiffusionPipeline from .continuous_encoder import SpectrogramContEncoder from .notes_encoder import SpectrogramNotesEncoder logger = logging.get_logger(__name__) # pylint: disable=invalid-name TARGET_FEATURE_LENGTH = 256 class SpectrogramDiffusionPipeline(DiffusionPipeline): r""" Pipeline for unconditional audio generation. This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods implemented for all pipelines (downloading, saving, running on a particular device, etc.). Args: notes_encoder ([`SpectrogramNotesEncoder`]): continuous_encoder ([`SpectrogramContEncoder`]): decoder ([`T5FilmDecoder`]): A [`T5FilmDecoder`] to denoise the encoded audio latents. scheduler ([`DDPMScheduler`]): A scheduler to be used in combination with `decoder` to denoise the encoded audio latents. melgan ([`OnnxRuntimeModel`]): """ _optional_components = ["melgan"] def __init__( self, notes_encoder: SpectrogramNotesEncoder, continuous_encoder: SpectrogramContEncoder, decoder: T5FilmDecoder, scheduler: DDPMScheduler, melgan: OnnxRuntimeModel if is_onnx_available() else Any, ) -> None: super().__init__() # From MELGAN self.min_value = math.log(1e-5) # Matches MelGAN training. self.max_value = 4.0 # Largest value for most examples self.n_dims = 128 self.register_modules( notes_encoder=notes_encoder, continuous_encoder=continuous_encoder, decoder=decoder, scheduler=scheduler, melgan=melgan, ) def scale_features(self, features, output_range=(-1.0, 1.0), clip=False): """Linearly scale features to network outputs range.""" min_out, max_out = output_range if clip: features = torch.clip(features, self.min_value, self.max_value) # Scale to [0, 1]. zero_one = (features - self.min_value) / (self.max_value - self.min_value) # Scale to [min_out, max_out]. return zero_one * (max_out - min_out) + min_out def scale_to_features(self, outputs, input_range=(-1.0, 1.0), clip=False): """Invert by linearly scaling network outputs to features range.""" min_out, max_out = input_range outputs = torch.clip(outputs, min_out, max_out) if clip else outputs # Scale to [0, 1]. zero_one = (outputs - min_out) / (max_out - min_out) # Scale to [self.min_value, self.max_value]. return zero_one * (self.max_value - self.min_value) + self.min_value def encode(self, input_tokens, continuous_inputs, continuous_mask): tokens_mask = input_tokens > 0 tokens_encoded, tokens_mask = self.notes_encoder( encoder_input_tokens=input_tokens, encoder_inputs_mask=tokens_mask ) continuous_encoded, continuous_mask = self.continuous_encoder( encoder_inputs=continuous_inputs, encoder_inputs_mask=continuous_mask ) return [(tokens_encoded, tokens_mask), (continuous_encoded, continuous_mask)] def decode(self, encodings_and_masks, input_tokens, noise_time): timesteps = noise_time if not torch.is_tensor(timesteps): timesteps = torch.tensor([timesteps], dtype=torch.long, device=input_tokens.device) elif torch.is_tensor(timesteps) and len(timesteps.shape) == 0: timesteps = timesteps[None].to(input_tokens.device) # broadcast to batch dimension in a way that's compatible with ONNX/Core ML timesteps = timesteps * torch.ones(input_tokens.shape[0], dtype=timesteps.dtype, device=timesteps.device) logits = self.decoder( encodings_and_masks=encodings_and_masks, decoder_input_tokens=input_tokens, decoder_noise_time=timesteps ) return logits @torch.no_grad() def __call__( self, input_tokens: list[list[int]], generator: torch.Generator | None = None, num_inference_steps: int = 100, return_dict: bool = True, output_type: str = "np", callback: Callable[[int, int, torch.Tensor], None] | None = None, callback_steps: int = 1, ) -> AudioPipelineOutput | tuple: if (callback_steps is None) or ( callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0) ): raise ValueError( f"`callback_steps` has to be a positive integer but is {callback_steps} of type" f" {type(callback_steps)}." ) r""" The call function to the pipeline for generation. Args: input_tokens (`list[list[int]]`): generator (`torch.Generator` or `list[torch.Generator]`, *optional*): A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make generation deterministic. num_inference_steps (`int`, *optional*, defaults to 100): The number of denoising steps. More denoising steps usually lead to a higher quality audio at the expense of slower inference. return_dict (`bool`, *optional*, defaults to `True`): Whether or not to return a [`~pipelines.AudioPipelineOutput`] instead of a plain tuple. output_type (`str`, *optional*, defaults to `"np"`): The output format of the generated audio. callback (`Callable`, *optional*): A function that calls every `callback_steps` steps during inference. The function is called with the following arguments: `callback(step: int, timestep: int, latents: torch.Tensor)`. callback_steps (`int`, *optional*, defaults to 1): The frequency at which the `callback` function is called. If not specified, the callback is called at every step. Example: ```py >>> from diffusers import SpectrogramDiffusionPipeline, MidiProcessor >>> pipe = SpectrogramDiffusionPipeline.from_pretrained("google/music-spectrogram-diffusion") >>> pipe = pipe.to("cuda") >>> processor = MidiProcessor() >>> # Download MIDI from: wget http://www.piano-midi.de/midis/beethoven/beethoven_hammerklavier_2.mid >>> output = pipe(processor("beethoven_hammerklavier_2.mid")) >>> audio = output.audios[0] ``` Returns: [`pipelines.AudioPipelineOutput`] or `tuple`: If `return_dict` is `True`, [`pipelines.AudioPipelineOutput`] is returned, otherwise a `tuple` is returned where the first element is a list with the generated audio. """ pred_mel = np.zeros([1, TARGET_FEATURE_LENGTH, self.n_dims], dtype=np.float32) full_pred_mel = np.zeros([1, 0, self.n_dims], np.float32) ones = torch.ones((1, TARGET_FEATURE_LENGTH), dtype=bool, device=self.device) for i, encoder_input_tokens in enumerate(input_tokens): if i == 0: encoder_continuous_inputs = torch.from_numpy(pred_mel[:1].copy()).to( device=self.device, dtype=self.decoder.dtype ) # The first chunk has no previous context. encoder_continuous_mask = torch.zeros((1, TARGET_FEATURE_LENGTH), dtype=bool, device=self.device) else: # The full song pipeline does not feed in a context feature, so the mask # will be all 0s after the feature converter. Because we know we're # feeding in a full context chunk from the previous prediction, set it # to all 1s. encoder_continuous_mask = ones encoder_continuous_inputs = self.scale_features( encoder_continuous_inputs, output_range=[-1.0, 1.0], clip=True ) encodings_and_masks = self.encode( input_tokens=torch.IntTensor([encoder_input_tokens]).to(device=self.device), continuous_inputs=encoder_continuous_inputs, continuous_mask=encoder_continuous_mask, ) # Sample encoder_continuous_inputs shaped gaussian noise to begin loop x = randn_tensor( shape=encoder_continuous_inputs.shape, generator=generator, device=self.device, dtype=self.decoder.dtype, ) # set step values self.scheduler.set_timesteps(num_inference_steps) # Denoising diffusion loop for j, t in enumerate(self.progress_bar(self.scheduler.timesteps)): output = self.decode( encodings_and_masks=encodings_and_masks, input_tokens=x, noise_time=t / self.scheduler.config.num_train_timesteps, # rescale to [0, 1) ) # Compute previous output: x_t -> x_t-1 x = self.scheduler.step(output, t, x, generator=generator).prev_sample mel = self.scale_to_features(x, input_range=[-1.0, 1.0]) encoder_continuous_inputs = mel[:1] pred_mel = mel.cpu().float().numpy() full_pred_mel = np.concatenate([full_pred_mel, pred_mel[:1]], axis=1) # call the callback, if provided if callback is not None and i % callback_steps == 0: callback(i, full_pred_mel) logger.info("Generated segment", i) if output_type == "np" and not is_onnx_available(): raise ValueError( "Cannot return output in 'np' format if ONNX is not available. Make sure to have ONNX installed or set 'output_type' to 'mel'." ) elif output_type == "np" and self.melgan is None: raise ValueError( "Cannot return output in 'np' format if melgan component is not defined. Make sure to define `self.melgan` or set 'output_type' to 'mel'." ) if output_type == "np": output = self.melgan(input_features=full_pred_mel.astype(np.float32)) else: output = full_pred_mel if not return_dict: return (output,) return AudioPipelineOutput(audios=output)