| import numpy as np |
| from typing import List, Optional, Union |
| from transformers.feature_extraction_sequence_utils import SequenceFeatureExtractor |
| from transformers.feature_extraction_utils import BatchFeature |
| from transformers.utils import PaddingStrategy, TensorType, logging |
|
|
# Module-level logger; `XvectorFeatureExtractor.__call__` uses it to warn when no `sampling_rate` is passed.
| logger = logging.get_logger(__name__) |
|
|
|
|
class XvectorFeatureExtractor(SequenceFeatureExtractor):
    """
    Feature extractor that turns raw mono speech waveforms into padded model inputs.

    Calling an instance pads the batch through the inherited `pad` method, casts the values to `float32` and,
    when `do_normalize` is set, rescales every utterance to zero mean and unit variance.
    """

    model_input_names = ["input_values", "attention_mask"]

    def __init__(
        self,
        feature_size=1,
        sampling_rate=16000,
        padding_value=0.0,
        return_attention_mask=False,
        do_normalize=True,
        **kwargs,
    ):
        """
        Args:
            feature_size: Forwarded unchanged to `SequenceFeatureExtractor.__init__`.
            sampling_rate: Sampling rate the inputs are expected to have; `__call__` rejects any other value.
            padding_value: Forwarded to the base class and also written into padded positions after
                normalization.
            return_attention_mask: Stored on the instance. It is not read directly in this class --
                presumably the inherited `pad` uses it as the default (TODO confirm against the base class).
            do_normalize: Whether `__call__` applies zero-mean / unit-variance normalization.
            **kwargs: Forwarded unchanged to `SequenceFeatureExtractor.__init__`.
        """
        super().__init__(feature_size=feature_size, sampling_rate=sampling_rate, padding_value=padding_value, **kwargs)
        self.return_attention_mask = return_attention_mask
        self.do_normalize = do_normalize

    @staticmethod
    def zero_mean_unit_var_norm(
        input_values: List[np.ndarray],
        attention_mask: Optional[List[np.ndarray]],
        padding_value: float = 0.0,
    ) -> List[np.ndarray]:
        """
        Normalize every array in the list to zero mean and unit variance.

        Args:
            input_values: One array per utterance.
            attention_mask: Optional per-utterance masks. When given, the sum of each mask is taken as the number
                of valid samples; statistics are computed over that prefix only and the remaining positions are
                overwritten with `padding_value`. When `None`, statistics cover each full array.
            padding_value: Value written into the positions past the valid prefix.

        Returns:
            A new list with one normalized array per input array.
        """
        if attention_mask is None:
            return [(x - x.mean()) / np.sqrt(x.var() + 1e-7) for x in input_values]

        attention_mask = np.array(attention_mask, np.int32)
        normed_input_values = []

        # NOTE: `vector[:length]` treats the *first* `length` samples as the valid ones, i.e. this assumes
        # padding was appended at the end of each vector.
        for vector, length in zip(input_values, attention_mask.sum(-1)):
            valid = vector[:length]
            normed_slice = (vector - valid.mean()) / np.sqrt(valid.var() + 1e-7)
            if length < normed_slice.shape[0]:
                # Normalization shifted the padded tail away from the padding value; restore it.
                normed_slice[length:] = padding_value

            normed_input_values.append(normed_slice)

        return normed_input_values

    def __call__(
        self,
        raw_speech: Union[np.ndarray, List[float], List[np.ndarray], List[List[float]]],
        padding: Union[bool, str, PaddingStrategy] = False,
        max_length: Optional[int] = None,
        truncation: bool = False,
        pad_to_multiple_of: Optional[int] = None,
        return_attention_mask: Optional[bool] = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        sampling_rate: Optional[int] = None,
        **kwargs,
    ) -> BatchFeature:
        """
        Pad, cast and optionally normalize one utterance or a batch of utterances.

        Args:
            raw_speech: A single utterance (1-D array or list of floats) or a batch (2-D array, or a
                list/tuple whose first element is itself an array, list or tuple).
            padding, max_length, truncation, pad_to_multiple_of, return_attention_mask: Forwarded unchanged to
                the inherited `pad` method.
            return_tensors: When not `None`, the result is converted with `BatchFeature.convert_to_tensors`.
            sampling_rate: Sampling rate of `raw_speech`. Must equal `self.sampling_rate` when given; a warning
                is logged when omitted.
            **kwargs: Accepted for call-site compatibility; not used by this method.

        Returns:
            The `BatchFeature` produced by `pad`, with `"input_values"` cast to `float32` (and normalized when
            `self.do_normalize` is set) and, if `pad` returned one, `"attention_mask"` as `int32` arrays.

        Raises:
            ValueError: If `sampling_rate` differs from `self.sampling_rate`, or if `raw_speech` is a numpy
                array with more than two dimensions.
        """
        if sampling_rate is None:
            logger.warning(
                "It is strongly recommended to pass the ``sampling_rate`` argument to this function. "
                "Failing to do so can result in silent errors that might be hard to debug."
            )
        elif sampling_rate != self.sampling_rate:
            raise ValueError(
                f"The model corresponding to this feature extractor: {self} was trained using a sampling rate of"
                f" {self.sampling_rate}. Please make sure that the provided `raw_speech` input was sampled with"
                f" {self.sampling_rate} and not {sampling_rate}."
            )

        is_batched_numpy = isinstance(raw_speech, np.ndarray) and len(raw_speech.shape) > 1
        if is_batched_numpy and len(raw_speech.shape) > 2:
            raise ValueError(f"Only mono-channel audio is supported for input to {self}")
        is_batched = is_batched_numpy or (
            isinstance(raw_speech, (list, tuple)) and (isinstance(raw_speech[0], (np.ndarray, tuple, list)))
        )

        # Always hand a batch to `pad`, even for a single utterance.
        if not is_batched:
            raw_speech = [raw_speech]

        encoded_inputs = BatchFeature({"input_values": raw_speech})

        padded_inputs = self.pad(
            encoded_inputs,
            padding=padding,
            max_length=max_length,
            truncation=truncation,
            pad_to_multiple_of=pad_to_multiple_of,
            return_attention_mask=return_attention_mask,
        )

        # Cast the values to float32. Dtypes are compared with `==` rather than `is`: equality is the
        # supported way to compare numpy dtypes and does not rely on the array carrying the canonical
        # float64 dtype singleton.
        float64 = np.dtype(np.float64)
        input_values = padded_inputs["input_values"]
        if not isinstance(input_values[0], np.ndarray):
            padded_inputs["input_values"] = [np.asarray(array, dtype=np.float32) for array in input_values]
        elif not isinstance(input_values, np.ndarray):
            # A list of arrays: the first element's dtype decides whether the whole list is down-cast.
            if input_values[0].dtype == float64:
                padded_inputs["input_values"] = [array.astype(np.float32) for array in input_values]
        elif input_values.dtype == float64:
            padded_inputs["input_values"] = input_values.astype(np.float32)

        attention_mask = padded_inputs.get("attention_mask")
        if attention_mask is not None:
            padded_inputs["attention_mask"] = [np.asarray(array, dtype=np.int32) for array in attention_mask]

        if self.do_normalize:
            # Without padding every sample is valid, so the mask is dropped and statistics cover the
            # full length of each utterance.
            attention_mask = (
                attention_mask
                if self._get_padding_strategies(padding, max_length=max_length) is not PaddingStrategy.DO_NOT_PAD
                else None
            )
            padded_inputs["input_values"] = self.zero_mean_unit_var_norm(
                padded_inputs["input_values"], attention_mask=attention_mask, padding_value=self.padding_value
            )

        if return_tensors is not None:
            padded_inputs = padded_inputs.convert_to_tensors(return_tensors)

        return padded_inputs