| | """ |
| | Tera.VO - Audio Processing Module |
| | Handles mel spectrogram computation, audio normalization, and vocoder synthesis. |
| | """ |
| |
|
| | import numpy as np |
| | import tensorflow as tf |
| | from scipy.io import wavfile |
| | from scipy.signal import lfilter |
| | import librosa |
| | import soundfile as sf |
| | import io |
| |
|
| |
|
class AudioProcessor:
    """Audio processing for the Tera.VO TTS system.

    Converts waveforms into normalized log-mel spectrograms and inverts such
    spectrograms back into waveforms with Griffin-Lim. Every constructor
    argument is stored on the instance under the same name.
    """

    def __init__(
        self,
        sample_rate=22050,
        n_fft=1024,
        hop_length=256,
        win_length=1024,
        n_mels=80,
        mel_fmin=0,
        mel_fmax=8000,
        preemphasis=0.97,
        ref_level_db=20,
        min_level_db=-100,
        max_abs_value=4.0,
        griffin_lim_iters=60,
    ):
        """Store the analysis parameters and build the mel filterbank.

        Args:
            sample_rate: Sampling rate used for loading, saving and the
                mel filterbank.
            n_fft: FFT size passed to the STFT and the mel filterbank.
            hop_length: Hop between successive STFT frames, in samples.
            win_length: STFT window length, in samples.
            n_mels: Number of mel bands.
            mel_fmin: Lowest frequency of the mel filterbank.
            mel_fmax: Highest frequency of the mel filterbank.
            preemphasis: Coefficient of the first-order pre-emphasis filter.
            ref_level_db: Reference level subtracted in ``_amp_to_db`` and
                added back in ``_db_to_amp``.
            min_level_db: Decibel value mapped to ``-max_abs_value`` by
                ``_normalize``.
            max_abs_value: Normalized spectrogram values are clipped to
                ``[-max_abs_value, max_abs_value]``.
            griffin_lim_iters: Number of inverse STFTs performed by
                ``_griffin_lim``.
        """
        self.sample_rate = sample_rate
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.win_length = win_length
        self.n_mels = n_mels
        self.mel_fmin = mel_fmin
        self.mel_fmax = mel_fmax
        self.preemphasis = preemphasis
        self.ref_level_db = ref_level_db
        self.min_level_db = min_level_db
        self.max_abs_value = max_abs_value
        self.griffin_lim_iters = griffin_lim_iters

        # Linear-frequency -> mel projection matrix, built once and reused by
        # both melspectrogram() and inv_melspectrogram().
        self.mel_basis = librosa.filters.mel(
            sr=self.sample_rate,
            n_fft=self.n_fft,
            n_mels=self.n_mels,
            fmin=self.mel_fmin,
            fmax=self.mel_fmax,
        )

    @staticmethod
    def _peak_normalize(wav, peak=0.95):
        """Scale ``wav`` so that its largest absolute sample equals ``peak``.

        Empty and all-zero (silent) input is returned unscaled: dividing by a
        zero peak would fill the signal with NaNs, and ``np.max`` raises on
        an empty array.

        Args:
            wav: Waveform samples (array-like).
            peak: Target absolute peak value.

        Returns:
            The scaled waveform as a NumPy array.
        """
        wav = np.asarray(wav)
        if wav.size == 0:
            return wav
        max_abs = np.max(np.abs(wav))
        if max_abs == 0:
            return wav
        return wav / max_abs * peak

    def load_wav(self, path):
        """Load an audio file at ``sample_rate`` and peak-normalize it to 0.95.

        Args:
            path: Location of the audio file, passed to ``librosa.load``.

        Returns:
            The loaded waveform; silent or empty audio is returned unscaled.
        """
        wav, _ = librosa.load(path, sr=self.sample_rate)
        return self._peak_normalize(wav)

    def save_wav(self, wav, path):
        """Peak-normalize ``wav`` to 0.95 and write it to ``path``.

        Args:
            wav: Waveform samples.
            path: Destination passed to ``soundfile.write``.
        """
        sf.write(path, self._peak_normalize(wav), self.sample_rate)

    def preemphasize(self, wav):
        """Apply the pre-emphasis filter ``y[n] = x[n] - preemphasis * x[n-1]``."""
        return lfilter([1, -self.preemphasis], [1], wav)

    def inv_preemphasize(self, wav):
        """Undo ``preemphasize`` by applying the inverse (recursive) filter."""
        return lfilter([1], [1, -self.preemphasis], wav)

    def melspectrogram(self, wav):
        """Compute the normalized log-mel spectrogram of ``wav``.

        The waveform is pre-emphasized, transformed with an STFT, projected
        onto the mel filterbank, converted to decibels and normalized.

        Args:
            wav: Waveform samples.

        Returns:
            Array of shape ``(frames, n_mels)`` with values in
            ``[-max_abs_value, max_abs_value]``.
        """
        emphasized = self.preemphasize(wav)
        stft = librosa.stft(
            y=emphasized,
            n_fft=self.n_fft,
            hop_length=self.hop_length,
            win_length=self.win_length,
        )
        magnitudes = np.abs(stft)
        mel = np.dot(self.mel_basis, magnitudes)
        # Transpose so that time is the leading axis.
        return self._normalize(self._amp_to_db(mel)).T

    def inv_melspectrogram(self, mel):
        """Convert a normalized mel spectrogram back to a waveform.

        Args:
            mel: Array of shape ``(frames, n_mels)`` as produced by
                ``melspectrogram``.

        Returns:
            The reconstructed waveform, peak-normalized to 0.95 (left
            unscaled when the reconstruction is silent or empty).
        """
        mel = self._db_to_amp(self._denormalize(mel.T))

        # Approximate the linear-frequency magnitudes with the pseudo-inverse
        # of the mel filterbank; the floor keeps every magnitude positive.
        inv_mel_basis = np.linalg.pinv(self.mel_basis)
        magnitudes = np.maximum(1e-10, np.dot(inv_mel_basis, mel))

        wav = self._griffin_lim(magnitudes)
        wav = self.inv_preemphasize(wav)
        return self._peak_normalize(wav)

    def _griffin_lim(self, magnitudes):
        """Reconstruct a waveform from STFT magnitudes with Griffin-Lim.

        Starts from random phases and alternates between the time and
        frequency domains, keeping ``magnitudes`` fixed and re-estimating
        only the phase. Performs ``griffin_lim_iters`` inverse STFTs in
        total; with ``griffin_lim_iters <= 1`` the result is the inverse
        STFT of the random-phase spectrogram.

        Args:
            magnitudes: Linear-frequency magnitude spectrogram.

        Returns:
            The reconstructed waveform.
        """
        angles = np.exp(2j * np.pi * np.random.rand(*magnitudes.shape))
        complex_spec = magnitudes * angles

        # The first inversion happens outside the loop so a waveform is
        # always available, even when no refinement iteration is requested.
        wav = librosa.istft(
            complex_spec,
            hop_length=self.hop_length,
            win_length=self.win_length,
        )
        for _ in range(self.griffin_lim_iters - 1):
            stft = librosa.stft(
                wav,
                n_fft=self.n_fft,
                hop_length=self.hop_length,
                win_length=self.win_length,
            )
            # Keep the target magnitudes, adopt the newly estimated phase.
            angles = np.exp(1j * np.angle(stft))
            complex_spec = magnitudes * angles
            wav = librosa.istft(
                complex_spec,
                hop_length=self.hop_length,
                win_length=self.win_length,
            )

        return wav

    def _amp_to_db(self, x):
        """Convert amplitudes to decibels relative to ``ref_level_db``.

        Amplitudes are floored at 1e-5 so the logarithm stays finite.
        """
        return 20 * np.log10(np.maximum(1e-5, x)) - self.ref_level_db

    def _db_to_amp(self, x):
        """Convert decibels (relative to ``ref_level_db``) to amplitudes."""
        return np.power(10.0, (x + self.ref_level_db) * 0.05)

    def _normalize(self, S):
        """Map decibel values to ``[-max_abs_value, max_abs_value]``.

        ``min_level_db`` maps to ``-max_abs_value`` and 0 dB maps to
        ``+max_abs_value``; values outside that range are clipped.
        """
        return np.clip(
            (2 * self.max_abs_value) * ((S - self.min_level_db) / (-self.min_level_db))
            - self.max_abs_value,
            -self.max_abs_value,
            self.max_abs_value,
        )

    def _denormalize(self, D):
        """Invert ``_normalize``, clipping the input to the valid range first."""
        return (
            ((np.clip(D, -self.max_abs_value, self.max_abs_value) + self.max_abs_value)
             * (-self.min_level_db) / (2 * self.max_abs_value))
            + self.min_level_db
        )

    def wav_to_bytes(self, wav):
        """Encode ``wav`` as an in-memory WAV file (e.g. for Gradio output).

        Args:
            wav: Waveform samples.

        Returns:
            The bytes of a WAV file sampled at ``sample_rate``.
        """
        buffer = io.BytesIO()
        sf.write(buffer, wav, self.sample_rate, format='WAV')
        buffer.seek(0)
        return buffer.read()
| |
|
| |
|
class TextProcessor:
    """Text normalization and character-level encoding for TTS input.

    Text is cleaned (optional transliteration, number expansion,
    abbreviation expansion) and then mapped character by character to
    integer IDs from a fixed symbol table.
    """

    # Abbreviation -> spoken form. Matching is case-sensitive.
    _ABBREVIATIONS = {
        'Mr.': 'Mister',
        'Mrs.': 'Misses',
        'Dr.': 'Doctor',
        'Prof.': 'Professor',
        'Jr.': 'Junior',
        'Sr.': 'Senior',
        'St.': 'Saint',
        'etc.': 'et cetera',
        'vs.': 'versus',
        'i.e.': 'that is',
        'e.g.': 'for example',
    }

    # Punctuation removed from the edges of a token before checking whether
    # the token is a number.
    _NUMBER_EDGE_PUNCT = '.,!?;:'

    def __init__(self, max_text_length=200):
        """Create the processor and build its symbol table.

        Args:
            max_text_length: Default target length used by ``pad_sequence``.
        """
        self.max_text_length = max_text_length
        self._build_char_map()

    def _build_char_map(self):
        """Build the symbol table and the lookups in both directions.

        Sets ``pad``, ``eos``, ``characters``, ``char_to_id``,
        ``id_to_char`` and ``vocab_size``. The pad symbol gets ID 0 and the
        end-of-sequence symbol gets ID 1.
        """
        self.pad = '_'
        self.eos = '~'
        self.characters = (
            self.pad + self.eos
            + 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
            + "!'(),.:;? "
            + '-'
            + '0123456789'
        )
        self.char_to_id = {c: i for i, c in enumerate(self.characters)}
        self.id_to_char = dict(enumerate(self.characters))
        self.vocab_size = len(self.characters)

    def text_to_sequence(self, text):
        """Convert ``text`` to a list of character IDs ending with the EOS ID.

        The text is cleaned first; characters that are not in the symbol
        table are dropped.

        Args:
            text: Input string.

        Returns:
            List of integer IDs, always terminated by the EOS ID.
        """
        cleaned = self._clean_text(text)
        sequence = [self.char_to_id[ch] for ch in cleaned if ch in self.char_to_id]
        sequence.append(self.char_to_id[self.eos])
        return sequence

    def sequence_to_text(self, sequence):
        """Convert character IDs back to text, skipping unknown IDs.

        Pad and EOS IDs are rendered as their symbols (``_`` and ``~``).
        """
        return ''.join(self.id_to_char.get(char_id, '') for char_id in sequence)

    def _clean_text(self, text):
        """Normalize ``text`` before encoding.

        Applies, in order: transliteration with ``unidecode`` (skipped when
        the package is not installed), number expansion and abbreviation
        expansion.
        """
        try:
            from unidecode import unidecode
        except ImportError:
            # Optional dependency: without it the text is left as-is.
            pass
        else:
            text = unidecode(text)

        text = self._expand_numbers(text)
        text = self._expand_abbreviations(text)
        return text

    def _expand_numbers(self, text):
        """Spell out whitespace-separated integer tokens as words.

        A token counts as a number when, after stripping ``.,!?;:`` from its
        edges, only decimal digits remain. Requires the optional ``inflect``
        package; when it is not installed ``text`` is returned unchanged.
        When it is installed, the result is re-joined with single spaces.

        Args:
            text: Input string.

        Returns:
            The text with integer tokens replaced by their spoken form.
        """
        try:
            import inflect
        except ImportError:
            return text

        engine = inflect.engine()
        expanded_words = []
        for word in text.split():
            core = word.strip(self._NUMBER_EDGE_PUNCT)
            # isdecimal() (unlike isdigit()) only accepts characters that
            # int() can parse, so e.g. a superscript "2" is left alone.
            if core.isdecimal():
                try:
                    spoken = engine.number_to_words(int(core))
                except Exception:
                    # Best effort: a token inflect cannot convert stays as
                    # written instead of disabling expansion for the rest.
                    spoken = None
                if spoken is not None:
                    word = word.replace(core, spoken)
            expanded_words.append(word)
        return ' '.join(expanded_words)

    def _expand_abbreviations(self, text):
        """Replace common abbreviations with their spoken form.

        An abbreviation is only expanded when it is not preceded by a word
        character, so ordinary words that merely end in the same letters
        (for example "revs.") are left untouched.

        Args:
            text: Input string.

        Returns:
            The text with known abbreviations expanded.
        """
        import re

        # Longest first so that a longer abbreviation always wins over a
        # shorter one that shares its prefix.
        alternatives = '|'.join(
            re.escape(abbr)
            for abbr in sorted(self._ABBREVIATIONS, key=len, reverse=True)
        )
        pattern = r'(?<!\w)(?:' + alternatives + r')'
        return re.sub(pattern, lambda m: self._ABBREVIATIONS[m.group(0)], text)

    def pad_sequence(self, sequence, max_len=None):
        """Pad or truncate ``sequence`` to exactly ``max_len`` IDs.

        Args:
            sequence: Sequence of character IDs.
            max_len: Target length; defaults to ``max_text_length``.

        Returns:
            A new list of length ``max_len``, right-padded with the pad ID.
        """
        if max_len is None:
            max_len = self.max_text_length
        ids = list(sequence)
        if len(ids) >= max_len:
            # NOTE: truncation also removes a trailing EOS ID, if any.
            return ids[:max_len]
        pad_id = self.char_to_id[self.pad]
        return ids + [pad_id] * (max_len - len(ids))