| import re |
| from typing import List, Optional, Union, Tuple |
| from math import ceil |
|
|
| import numpy as np |
| import torch |
| import scipy |
| from torch.nn.utils.rnn import pad_sequence |
|
|
| from enum import Enum |
|
|
| from transformers import AutoFeatureExtractor |
| from transformers.feature_extraction_utils import BatchFeature |
| from transformers.feature_extraction_sequence_utils import SequenceFeatureExtractor |
| from transformers.image_utils import ImageInput, make_nested_list_of_images |
| from transformers.processing_utils import ImagesKwargs, ProcessingKwargs, ProcessorMixin, Unpack, AudioKwargs |
| from transformers.tokenization_utils_base import PreTokenizedInput, TextInput |
| from transformers.utils import to_py_obj, TensorType |
| from transformers.audio_utils import AudioInput |
|
|
|
|
|
|
|
|
def speechlib_mel(sample_rate, n_fft, n_mels, fmin=None, fmax=None):
    """Create a Mel filter-bank the same as SpeechLib FbankFC.

    Filter edges are spaced uniformly on the mel axis between ``fmin`` and
    ``fmax``; every FFT bin whose mel value falls strictly inside a filter's
    (left, right) span receives a triangular weight for that filter.

    Args:
        sample_rate (int): Sample rate in Hz. number > 0 [scalar]
        n_fft (int): FFT size. int > 0 [scalar]
        n_mels (int): Number of Mel filters. int > 0 [scalar]
        fmin (float): lowest frequency (in Hz). If None use 0.0.
            float >= 0 [scalar]
        fmax (float): highest frequency (in Hz). If None use sample_rate / 2.
            float >= 0 [scalar]

    Returns:
        out (numpy.ndarray): float32 Mel transform matrix
            [shape=(n_mels, 1 + n_fft // 2)]

    Raises:
        AssertionError: if ``fmin`` is negative, or ``fmax`` is not in
            ``(fmin, sample_rate / 2]``.
    """
    bank_width = int(n_fft // 2 + 1)
    if fmax is None:
        fmax = sample_rate / 2
    if fmin is None:
        fmin = 0
    assert fmin >= 0, "fmin cannot be negtive"
    assert fmin < fmax <= sample_rate / 2, "fmax must be between (fmin, samplerate / 2]"

    def mel(f):
        # Hz -> mel
        return 1127.0 * np.log(1.0 + f / 700.0)

    def bin2mel(fft_bin):
        # FFT bin index -> mel value of that bin's centre frequency
        return 1127.0 * np.log(1.0 + fft_bin * sample_rate / (n_fft * 700.0))

    def f2bin(f):
        # Hz -> nearest FFT bin index
        return int((f * n_fft / sample_rate) + 0.5)

    # Spectrum bin range covered by the filters; the bin at fmin itself is skipped.
    klo = f2bin(fmin) + 1
    khi = max(f2bin(fmax), klo)

    # Filter edges/centres, evenly spaced in mel; `ms` is the spacing between them.
    mlo = mel(fmin)
    mhi = mel(fmax)
    m_centers = np.linspace(mlo, mhi, n_mels + 2)
    ms = (mhi - mlo) / (n_mels + 1)

    # The mel value of a bin does not depend on the filter, so compute it once
    # per bin instead of once per (filter, bin) pair.
    bin_mels = [(fft_bin, bin2mel(fft_bin)) for fft_bin in range(klo, khi)]

    matrix = np.zeros((n_mels, bank_width), dtype=np.float32)
    for m in range(n_mels):
        left = m_centers[m]
        center = m_centers[m + 1]
        right = m_centers[m + 2]
        for fft_bin, mbin in bin_mels:
            if left < mbin < right:
                matrix[m, fft_bin] = 1.0 - abs(center - mbin) / ms

    return matrix
|
|
|
|
class NemotronAudioFeatureExtractor(SequenceFeatureExtractor):
    """Turn raw waveforms into log-Mel filterbank features for the audio encoder.

    For each input waveform the extractor computes a (frames, feature_size)
    log-Mel matrix, zero-pads all matrices in the batch to a common length and
    reports how many audio placeholder tokens each clip corresponds to.
    """

    model_input_names = ["input_audio_embeds", "audio_embed_sizes", "audio_attention_mask"]

    def __init__(
        self,
        audio_compression_rate=8,
        audio_downsample_rate=1,
        audio_feat_stride=1,
        feature_size=80,
        sampling_rate=16000,
        padding_value=0.0,
        **kwargs,
    ):
        """Store the compression settings and precompute the Mel matrix and windows.

        Args:
            audio_compression_rate: first divisor (rounded up) applied to the
                frame count in `_compute_audio_embed_size`.
            audio_downsample_rate: second divisor (rounded up) applied after
                the first one in `_compute_audio_embed_size`.
            audio_feat_stride: multiplier applied to the number of feature
                frames before the embed size is computed.
            feature_size: number of Mel filters (feature dimension).
            sampling_rate: sample rate used to build the Mel filterbank.
            padding_value: forwarded to `SequenceFeatureExtractor`.
            **kwargs: forwarded to `SequenceFeatureExtractor`.
        """
        super().__init__(
            feature_size=feature_size,
            sampling_rate=sampling_rate,
            padding_value=padding_value,
            **kwargs,
        )

        self.compression_rate = audio_compression_rate
        self.qformer_compression_rate = audio_downsample_rate
        self.feat_stride = audio_feat_stride

        # How 8 kHz input is handled: "fillzero" zero-pads the upper half of the
        # spectrum, "resample" upsamples the waveform to 16 kHz instead.
        self._eightk_method = "fillzero"
        # Transposed so that (frames, bins) @ _mel -> (frames, feature_size).
        self._mel = speechlib_mel(
            self.sampling_rate,
            512,
            self.feature_size,
            fmin=None,
            fmax=self.sampling_rate // 2 - self.feature_size - 230,
        ).T

        # Analysis windows for 16 kHz (400 samples) and 8 kHz (200 samples).
        self._hamming400 = np.hamming(400)
        self._hamming200 = np.hamming(200)

    def duration_to_frames(self, duration):
        """Estimate the number of frames for `duration` seconds (10 ms per frame)."""
        frame_shift_ms = 10
        return duration * 1000 // frame_shift_ms

    def __call__(
        self,
        audios: List[AudioInput],
        sampling_rate=16000,
        return_attention_mask=True,
        padding="max_length",
        return_tensors: Optional[Union[str, TensorType]] = None,
    ):
        """Extract and batch log-Mel features for a list of waveforms.

        Args:
            audios: waveforms to process; each is passed to `_extract_features`.
            sampling_rate: sample rate (Hz) shared by every waveform in `audios`.
            return_attention_mask: include `audio_attention_mask` when one is
                built (only for batches of more than one clip).
            padding: accepted for interface compatibility; not used here, the
                batch is always zero-padded to its longest clip.
            return_tensors: tensor type forwarded to `BatchFeature`.

        Returns:
            BatchFeature with `input_audio_embeds` (zero-padded features),
            `audio_embed_sizes` (one integer per clip) and, when applicable,
            `audio_attention_mask` (True on valid frames).
        """
        embeds_per_clip = []
        embed_sizes = []
        frame_counts = []

        for audio_data in audios:
            features = self._extract_features(audio_data, sampling_rate)
            n_frames = len(features) * self.feat_stride

            embeds_per_clip.append(torch.tensor(features))
            embed_sizes.append(torch.tensor(self._compute_audio_embed_size(n_frames)).long())
            frame_counts.append(n_frames)

        padded_embeds = pad_sequence(embeds_per_clip, batch_first=True)
        stacked_sizes = torch.stack(embed_sizes, dim=0)

        # A mask is only produced for real batches; a single clip needs no padding.
        attention_mask = None
        if len(audios) > 1:
            frames = torch.tensor(frame_counts)
            attention_mask = torch.arange(0, frames.max()).unsqueeze(0) < frames.unsqueeze(1)

        data = {
            "input_audio_embeds": padded_embeds,
            "audio_embed_sizes": stacked_sizes,
        }
        if attention_mask is not None and return_attention_mask:
            data["audio_attention_mask"] = attention_mask

        return BatchFeature(data=data, tensor_type=return_tensors)

    def _extract_spectrogram(self, wav, fs):
        """Compute the magnitude spectrogram of a waveform.

        Args:
            wav (array): waveform; extra singleton dimensions are squeezed and
                a remaining 2-D input is averaged over its second axis.
            fs (int): sampling rate of the waveform in Hz. Rates above 16000
                are resampled to 16000, rates strictly between 8000 and 16000
                are resampled to 8000.

        Returns:
            spec (2D float32 array): (T, 257) magnitude spectrum, where T is
            the number of full analysis frames (0 if the waveform is shorter
            than one window). For 8 kHz input in "fillzero" mode the upper
            bins are zero.

        Raises:
            RuntimeError: if `fs` is below 8000 Hz.
        """
        # Imported here so the submodule is guaranteed to be loaded; a bare
        # `import scipy` does not import `scipy.signal` on every SciPy version.
        from scipy.signal import resample_poly

        wav = np.asarray(wav)
        if wav.ndim > 1:
            wav = np.squeeze(wav)

        # Still 2-D after squeezing: average over the second axis to get mono.
        if wav.ndim == 2:
            wav = wav.mean(1)

        # Resample by the exact rational factor target/fs. (resample_poly
        # reduces up/down by their gcd, so integer multiples such as 32 kHz or
        # 48 kHz behave exactly like a plain integer decimation.)
        if fs > 16000:
            wav = resample_poly(wav, 16000, fs)
            fs = 16000
        elif 8000 < fs < 16000:
            wav = resample_poly(wav, 8000, fs)
            fs = 8000
        elif fs < 8000:
            raise RuntimeError(f"Unsupported sample rate {fs}")

        if fs == 8000:
            if self._eightk_method == "resample":
                # Upsample 8 kHz audio to 16 kHz instead of zero-filling later.
                wav = resample_poly(wav, 2, 1)
                fs = 16000
        elif fs != 16000:
            raise RuntimeError(f"Input data using an unsupported sample rate: {fs}")

        preemphasis = 0.97

        if fs == 8000:
            n_fft = 256
            win_length = 200
            hop_length = 80
            fft_window = self._hamming200
        elif fs == 16000:
            n_fft = 512
            win_length = 400
            hop_length = 160
            fft_window = self._hamming400

        # Number of full windows that fit in the waveform.
        n_batch = (wav.shape[0] - win_length) // hop_length + 1
        if n_batch <= 0:
            # Too short for even one frame: return an empty spectrogram with
            # the same bin count the normal path produces.
            out_bins = n_fft // 2 + 1
            if fs == 8000:
                out_bins = 2 * out_bins - 1
            return np.zeros((0, out_bins), dtype=np.float32)

        y_frames = np.array(
            [wav[start : start + win_length] for start in range(0, hop_length * n_batch, hop_length)],
            dtype=np.float32,
        )

        # Per-frame pre-emphasis; the first sample of each frame uses the
        # second sample as its "previous" value. Scaled to 16-bit range.
        y_frames_prev = np.roll(y_frames, 1, axis=1)
        y_frames_prev[:, 0] = y_frames_prev[:, 1]
        y_frames = (y_frames - preemphasis * y_frames_prev) * 32768

        S = np.fft.rfft(fft_window * y_frames, n=n_fft, axis=1).astype(np.complex64)

        if fs == 8000:
            # "fillzero": drop the last bin and append zeros so the 8 kHz
            # spectrum has the same width as a 16 kHz one.
            frames, bins = S.shape
            padarray = np.zeros((frames, bins))
            S = np.concatenate((S[:, 0:-1], padarray), axis=1)

        spec = np.abs(S).astype(np.float32)
        return spec

    def _extract_features(self, wav, fs):
        """Extract log filterbank features from waveform.

        Args:
            wav (array): waveform of the input.
            fs (int): sampling rate of the waveform in Hz.

        Returns:
            log_fbank (2D float32 array): (T, feature_size) matrix of log Mel
            filterbank energies, where T is the number of frames.
        """
        spec = self._extract_spectrogram(wav, fs)
        spec_power = spec**2

        # Floor the energies at 1.0 so the log is never negative or undefined.
        fbank_power = np.clip(spec_power.dot(self._mel), 1.0, None)
        log_fbank = np.log(fbank_power).astype(np.float32)

        return log_fbank

    def _compute_audio_embed_size(self, audio_frames):
        """Return ceil(ceil(audio_frames / compression_rate) / qformer_compression_rate)."""
        # -(-a // b) is integer ceiling division.
        after_compression = -(-audio_frames // self.compression_rate)
        return -(-after_compression // self.qformer_compression_rate)
|
|
class NemotronOmniProcessor(ProcessorMixin):
    """Processor that combines a tokenizer with an audio feature extractor.

    `__call__` expands each `<start_of_audio>` marker in the prompts into the
    full audio placeholder sequence, tokenizes the prompts, and returns the
    token data together with the audio features. An image processor is held
    as an attribute, but `__call__` does not run it.
    """

    attributes = ["image_processor", "feature_extractor", "tokenizer"]
    valid_kwargs = ["chat_template", "image_seq_length"]
    image_processor_class = "AutoImageProcessor"
    feature_extractor_class = "NemotronAudioFeatureExtractor"
    tokenizer_class = "AutoTokenizer"

    def __init__(
        self,
        image_processor,
        feature_extractor,
        tokenizer,
        chat_template=None,
        image_seq_length: int = 256,
        **kwargs,
    ):
        """Set up the placeholder tokens and register the sub-processors.

        Args:
            image_processor: image processor instance (stored, not used by `__call__`).
            feature_extractor: audio feature extractor called on `audio` inputs.
            tokenizer: tokenizer used to encode the prompts.
            chat_template: forwarded to `ProcessorMixin`.
            image_seq_length: number of image tokens in `full_image_sequence`.
            **kwargs: forwarded to `ProcessorMixin`.
        """
        self.image_seq_length = image_seq_length
        # NOTE(review): the image placeholders are empty strings and the id is
        # -999, so `full_image_sequence` is only newlines — presumably image
        # input is disabled on purpose; confirm.
        self.image_token_id = -999
        self.boi_token = ''
        self.image_token = ''
        self.eoi_token = ''
        image_tokens_expanded = "".join([self.image_token] * image_seq_length)
        self.full_image_sequence = f"\n\n{self.boi_token}{image_tokens_expanded}{self.eoi_token}\n\n"

        self.audio_token_id = 128255
        self.boa_token = "<start_of_audio>"
        self.eoa_token = "<end_of_audio>"
        self.audio_token = "<audio_soft_token>"

        super().__init__(
            image_processor=image_processor,
            feature_extractor=feature_extractor,
            tokenizer=tokenizer,
            chat_template=chat_template,
            **kwargs,
        )

    def __call__(
        self,
        images: ImageInput = None,
        text: Union[TextInput, PreTokenizedInput, List[TextInput], List[PreTokenizedInput]] = None,
        videos=None,
        audio: List[AudioInput] = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
    ) -> BatchFeature:
        """Tokenize prompts and attach audio features.

        Args:
            images: accepted for interface compatibility; not processed here.
            text: a prompt string or a list of prompt strings (required).
            videos: accepted for interface compatibility; not processed here.
            audio: optional list of waveforms, one per prompt. Every
                `<start_of_audio>` marker in the matching prompt is replaced
                by the expanded audio placeholder sequence for that clip.
            return_tensors: tensor type forwarded to `BatchFeature`.

        Returns:
            BatchFeature with the tokenizer outputs, `token_type_ids`
            (1 where the id equals `image_token_id`, 2 where it equals
            `audio_token_id`, 0 elsewhere), `input_modes` (bit 0 = row has
            image tokens, bit 1 = row has audio tokens) and, if `audio` was
            given, the feature extractor outputs.

        Raises:
            ValueError: if `text` is missing or not a string / list of strings,
                or if the number of audio clips differs from the number of
                prompts.
        """
        # Everything below needs prompts to tokenize, so `text` is mandatory.
        if text is None:
            raise ValueError("`text` is required: provide a string or a list of strings (optionally with `audio`).")

        if isinstance(text, str):
            text = [text]
        elif not isinstance(text, (list, tuple)) or any(not isinstance(prompt, str) for prompt in text):
            raise ValueError("Invalid input text. Please provide a string, or a list of strings")
        else:
            text = list(text)

        audio_inputs = {}
        if audio is not None:
            audio_inputs = self.feature_extractor(audio)
            embed_sizes = audio_inputs["audio_embed_sizes"]
            # Prompts and clips are paired positionally; a mismatch would
            # otherwise silently drop prompts or audio.
            if len(embed_sizes) != len(text):
                raise ValueError(
                    f"Got {len(embed_sizes)} audio input(s) for {len(text)} text prompt(s); "
                    "provide exactly one audio per prompt."
                )

            expanded_text = []
            for prompt, embed_size in zip(text, embed_sizes):
                audio_tokens_expanded = self.audio_token * int(embed_size)
                full_audio_sequence = f"\n\n{self.boa_token}{audio_tokens_expanded}{self.eoa_token}\n\n"
                expanded_text.append(prompt.replace(self.boa_token, full_audio_sequence))
            text = expanded_text

        text_inputs = self.tokenizer(text=text, return_tensors="np")

        # Mark which positions hold image (1) or audio (2) placeholder tokens.
        array_ids = text_inputs["input_ids"]
        mm_token_type_ids = np.zeros_like(array_ids)
        mm_token_type_ids[array_ids == self.image_token_id] = 1
        mm_token_type_ids[array_ids == self.audio_token_id] = 2

        has_vision_ids = np.any(mm_token_type_ids == 1, axis=1)
        has_audio_ids = np.any(mm_token_type_ids == 2, axis=1)

        # Per-row bit field: bit 0 = vision present, bit 1 = audio present.
        input_modes = (has_audio_ids << 1) | has_vision_ids

        text_inputs = {k: v.tolist() for k, v in text_inputs.items()}
        text_inputs["token_type_ids"] = mm_token_type_ids.tolist()
        text_inputs["input_modes"] = input_modes.tolist()

        return BatchFeature(data={**text_inputs, **audio_inputs}, tensor_type=return_tensors)

    def batch_decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to the wrapped tokenizer's `batch_decode`. Please refer to the
        docstring of that method for more information.
        """
        return self.tokenizer.batch_decode(*args, **kwargs)

    def decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to the wrapped tokenizer's `decode`. Please refer to the docstring
        of that method for more information.
        """
        return self.tokenizer.decode(*args, **kwargs)

    @property
    def model_input_names(self):
        """De-duplicated, order-preserving list of input names from all sub-processors."""
        tokenizer_input_names = self.tokenizer.model_input_names + ["token_type_ids"]
        image_processor_input_names = self.image_processor.model_input_names
        # `__call__` also returns the feature extractor's outputs, so list them too.
        audio_input_names = list(self.feature_extractor.model_input_names)
        return list(dict.fromkeys(tokenizer_input_names + image_processor_input_names + audio_input_names))
|
|
# Register the audio feature extractor with AutoFeatureExtractor under its class name.
AutoFeatureExtractor.register(
    "NemotronAudioFeatureExtractor",
    NemotronAudioFeatureExtractor,
)