| | import copy |
| | from typing import Any, Dict, List, Optional, Union |
| |
|
| | import numpy as np |
| | import torch |
| | from torchaudio.transforms import MelSpectrogram |
| | from transformers import Wav2Vec2PhonemeCTCTokenizer |
| | from transformers.feature_extraction_sequence_utils import SequenceFeatureExtractor |
| | from transformers.feature_extraction_utils import BatchFeature |
| | from transformers.processing_utils import ProcessorMixin |
| | from transformers.utils import TensorType, logging |
| |
|
| | logger = logging.get_logger(__name__) |
| | AudioType = Union[np.ndarray, List[float], List[np.ndarray], List[List[float]]] |
| |
|
| |
|
class Tacotron2FeatureExtractor(SequenceFeatureExtractor):
    """
    Feature extractor for Tacotron2.

    Converts raw waveforms into log-mel spectrograms (``mel_specgram``) and can
    additionally return per-example frame counts (``mel_specgram_length``) and
    the stop-token training targets (``gate_padded``).
    """

    model_input_names = ["mel_specgram", "mel_specgram_length", "gate_padded"]

    def __init__(
        self,
        feature_size: int = 80,
        sampling_rate: int = 22050,
        n_fft: int = 1024,
        hop_length: int = 256,
        win_length: int = 1024,
        mel_fmin: float = 0.0,
        mel_fmax: float = 8000.0,
        padding_value: float = 0.0,
        **kwargs,
    ):
        """
        Args:
            feature_size: Number of mel filterbank channels in the output.
            sampling_rate: Sampling rate (Hz) the model expects its audio in.
            n_fft: FFT size of the underlying STFT.
            hop_length: Samples between successive STFT frames.
            win_length: Analysis window length in samples.
            mel_fmin: Lowest mel filterbank frequency (Hz).
            mel_fmax: Highest mel filterbank frequency (Hz).
            padding_value: Fill value used when padding batched spectrograms.
        """
        super().__init__(
            feature_size=feature_size,
            sampling_rate=sampling_rate,
            padding_value=padding_value,
            **kwargs,
        )
        self.feature_size = feature_size
        self.sampling_rate = sampling_rate
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.win_length = win_length
        self.mel_fmin = mel_fmin
        self.mel_fmax = mel_fmax

    def mel_specgram(self, waveform: torch.Tensor) -> torch.Tensor:
        """Compute the log-mel spectrogram of a single 1-D waveform.

        Returns:
            Tensor of shape ``(time, n_mels)`` — the final ``permute`` puts
            frames on the first axis so sequences can be padded along it.
        """
        # Build the torchaudio transform lazily and cache it on the instance;
        # `to_dict` pops the cached module so it never ends up in a config.
        if not hasattr(self, "_mel_specgram"):
            self._mel_specgram = MelSpectrogram(
                sample_rate=self.sampling_rate,
                n_fft=self.n_fft,
                win_length=self.win_length,
                hop_length=self.hop_length,
                f_min=self.mel_fmin,
                f_max=self.mel_fmax,
                n_mels=self.feature_size,
                mel_scale="slaney",
                normalized=False,
                power=1,
                norm="slaney",
            )
        melspectrogram = self._mel_specgram(waveform)
        # Clamp before the log so silent frames don't produce -inf.
        output = torch.log(torch.clamp(melspectrogram, min=1e-5))
        return output.permute(1, 0)

    def __call__(
        self,
        audio: AudioType,
        sampling_rate: Optional[int] = None,
        padding: Union[bool, str] = True,
        return_tensors: Optional[Union[str, TensorType]] = None,
        return_length: bool = False,
        return_gate_padded: bool = False,
        **kwargs,
    ) -> BatchFeature:
        """
        Featurize one waveform or a batch of waveforms.

        Args:
            audio: A single waveform (ndarray or list of floats) or a batch
                (list of ndarrays / lists of floats).
            sampling_rate: Sampling rate of `audio`; validated against the
                extractor's configured rate when provided.
            padding: Padding strategy forwarded to ``self.pad``.
            return_tensors: Optional tensor framework for the output.
            return_length: If True, include per-example frame counts under
                ``mel_specgram_length``.
            return_gate_padded: If True, include stop-token targets under
                ``gate_padded`` (1 at and after each sequence's last frame).

        Returns:
            `BatchFeature` with ``mel_specgram`` of shape
            ``(batch, n_mels, time)`` and the optional extra keys above.

        Raises:
            ValueError: If `sampling_rate` disagrees with the configured rate.
        """
        if sampling_rate is not None:
            if sampling_rate != self.sampling_rate:
                raise ValueError(
                    f"The model corresponding to this feature extractor: {self} was trained using a sampling rate of"
                    f" {self.sampling_rate}. Please make sure that the provided `audio` input was sampled with"
                    f" {self.sampling_rate} and not {sampling_rate}."
                )
        else:
            logger.warning(
                "It is strongly recommended to pass the `sampling_rate` argument to this function. "
                "Failing to do so can result in silent errors that might be hard to debug."
            )

        is_batched = bool(
            isinstance(audio, (list, tuple))
            and isinstance(audio[0], (np.ndarray, tuple, list))
        )

        if is_batched:
            audio = [np.asarray(speech, dtype=np.float32) for speech in audio]
        elif not isinstance(audio, np.ndarray):
            audio = np.asarray(audio, dtype=np.float32)
        elif audio.dtype != np.float32:
            # Fix: the original only converted float64 (and via a fragile `is`
            # identity check). Any non-float32 array (e.g. int16 PCM) would
            # reach torch.from_numpy unconverted; normalize all dtypes here.
            audio = audio.astype(np.float32)

        # Always work on a list of waveforms internally.
        if not is_batched:
            audio = [audio]

        features = [
            self.mel_specgram(torch.from_numpy(one_waveform)).numpy()
            for one_waveform in audio
        ]

        encoded_inputs = BatchFeature({"mel_specgram": features})

        # The attention mask doubles as the source for the gate targets below.
        padded_inputs = self.pad(
            encoded_inputs,
            padding=padding,
            return_attention_mask=return_gate_padded,
            **kwargs,
        )

        if return_length:
            mel_specgram_length = [mel.shape[0] for mel in features]
            # Unbatched input with no tensor conversion: return a scalar length.
            if len(mel_specgram_length) == 1 and return_tensors is None:
                mel_specgram_length = mel_specgram_length[0]
            padded_inputs["mel_specgram_length"] = mel_specgram_length

        if return_gate_padded:
            # Fix: `pad` can return the attention mask as nested Python lists
            # (the return_tensors=None path), on which `1 - mask` raises;
            # coerce to an ndarray so the arithmetic works either way.
            attention_mask = np.asarray(padded_inputs.pop("attention_mask"))
            # Invert the mask (1 on padding), shift left by one so the gate
            # fires on the final real frame, and force the last column to 1.
            gate_padded = 1 - attention_mask
            gate_padded = np.roll(gate_padded, -1, axis=1)
            gate_padded[:, -1] = 1
            padded_inputs["gate_padded"] = gate_padded.astype(np.float32)

        mel_specgram = padded_inputs["mel_specgram"]
        # `pad` may have converted the features to plain lists; restore ndarrays.
        if isinstance(mel_specgram[0], list):
            padded_inputs["mel_specgram"] = [
                np.asarray(feature, dtype=np.float32) for feature in mel_specgram
            ]

        # Swap back to (n_mels, time), the layout the model consumes.
        padded_inputs["mel_specgram"] = [
            spec.transpose(1, 0) for spec in padded_inputs["mel_specgram"]
        ]

        if return_tensors is not None:
            padded_inputs = padded_inputs.convert_to_tensors(return_tensors)

        return padded_inputs

    def to_dict(self) -> Dict[str, Any]:
        """
        Serializes this instance to a Python dictionary.

        Returns:
            `Dict[str, Any]`: Dictionary of all the attributes that make up this feature extractor instance.
        """
        output = copy.deepcopy(self.__dict__)
        output["feature_extractor_type"] = self.__class__.__name__
        # Drop the cached (non-serializable) torchaudio transform, if built.
        output.pop("_mel_specgram", None)

        return output
| |
|
| |
|
class Tacotron2Processor(ProcessorMixin):
    """
    Wraps a Tacotron2 feature extractor and a phoneme tokenizer behind a
    single callable, so text and/or audio can be prepared in one step.
    """

    feature_extractor_class = "AutoFeatureExtractor"
    tokenizer_class = "Wav2Vec2PhonemeCTCTokenizer"

    def __init__(self, feature_extractor, tokenizer):
        # NOTE(review): attributes are set directly instead of going through
        # ProcessorMixin.__init__ — presumably to skip its class checks;
        # confirm before "fixing" this.
        self.feature_extractor = feature_extractor
        self.tokenizer = tokenizer
        self.current_processor = self.feature_extractor

    def __call__(
        self,
        text: Optional[str] = None,
        audio: Optional[AudioType] = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        return_length: bool = True,
        **kwargs,
    ) -> Any:
        """
        Tokenize `text` and/or featurize `audio`.

        Returns the tokenizer output, the feature-extractor output, or — when
        both inputs are given — a single `BatchFeature` merging the two.

        Raises:
            ValueError: If neither `text` nor `audio` is provided.
        """
        if text is None and audio is None:
            raise ValueError(
                "You have to specify either text or audio. Both cannot be none."
            )

        encoding = (
            self.tokenizer(
                text,
                return_tensors=return_tensors,
                padding=True,
                return_attention_mask=False,
                return_length=return_length,
            )
            if text is not None
            else None
        )

        features = (
            self.feature_extractor(
                audio,
                return_tensors=return_tensors,
                return_length=return_length,
                **kwargs,
            )
            if audio is not None
            else None
        )

        if encoding is not None and features is not None:
            return BatchFeature({**features, **encoding})
        return encoding if features is None else features

    def batch_decode(self, *args, **kwargs):
        """
        Forward all arguments to the tokenizer's
        [`~PreTrainedTokenizer.batch_decode`]; see its docstring for details.
        """
        return self.tokenizer.batch_decode(*args, **kwargs)

    def decode(self, *args, **kwargs):
        """
        Forward all arguments to the tokenizer's
        [`~PreTrainedTokenizer.decode`]; see its docstring for details.
        """
        return self.tokenizer.decode(*args, **kwargs)
| |
|