Spaces:
Runtime error
Runtime error
| import re | |
| import os | |
| import nltk | |
| import torch | |
| import pickle | |
| import torchaudio | |
| import numpy as np | |
| import gradio as gr | |
| from google.cloud import storage | |
| from TTS.tts.models.xtts import Xtts | |
| from nltk.tokenize import sent_tokenize | |
| from huggingface_hub import hf_hub_download | |
| from TTS.tts.configs.xtts_config import XttsConfig | |
def _download_starting_files() -> None:
    """
    Fetches the files needed at start-up into the local 'assets' folder:
    the credentials file from the Hugging Face Hub dataset and the two
    embedding files from the 'embeddings-bella' storage bucket
    """
    os.makedirs('assets', exist_ok=True)

    # Credentials file comes from the dataset repo named by the DATA variable
    hf_hub_download(
        repo_id=os.environ.get('DATA'),
        repo_type='dataset',
        filename="credentials.json",
        token=os.environ.get('HUB_TOKEN'),
        local_dir="assets",
    )

    # The storage client is built from the path held in GOOGLE_APPLICATION_CREDENTIALS
    key_file = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
    client = storage.Client.from_service_account_json(key_file)
    embeddings_bucket = client.get_bucket('embeddings-bella')

    # Download both embeddings, keeping the same file name locally
    for blob_name in ("gpt_cond_latent.npy", "speaker_embedding.npy"):
        embeddings_bucket.blob(blob_name).download_to_filename(f'assets/{blob_name}')
def _load_array(filename):
    """
    Opens a file in binary mode and returns the object unpickled from it

    NOTE(review): unpickling can execute arbitrary code, so this must only be
    used on files from a trusted origin
    """
    with open(filename, 'rb') as stream:
        loaded = pickle.Unpickler(stream).load()
    return loaded
# Get embeddings
_download_starting_files()
# NOTE(review): presumably acknowledges the Coqui terms of service so that no
# interactive prompt is shown - confirm against the TTS package
os.environ['COQUI_TOS_AGREED'] = '1'

# Tokenizer data required by nltk's sent_tokenize
nltk.download('punkt')

# Pick the device first so the checkpoint options can depend on it
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Used to generate audio based on a sample
model_path = os.path.join("tts_model")
config = XttsConfig()
config.load_json(os.path.join(model_path, "config.json"))
model = Xtts.init_from_config(config)
model.load_checkpoint(
    config,
    checkpoint_path=os.path.join(model_path, "model.pth"),
    vocab_path=os.path.join(model_path, "vocab.json"),
    eval=True,
    # DeepSpeed inference needs a GPU; requesting it unconditionally breaks
    # start-up on CPU-only hardware even though a CPU fallback exists below
    use_deepspeed=(device == 'cuda'),
)
model.to(device)

# Speaker latent
path_latents = 'assets/gpt_cond_latent.npy'
gpt_cond_latent = _load_array(path_latents)

# Speaker embedding
path_embedding = 'assets/speaker_embedding.npy'
speaker_embedding = _load_array(path_embedding)
def get_audio(text: str, language: str = 'es') -> gr.Audio:
    """
    Generates the speech for the given text and returns a Gradio audio component
    that points to the generated file (output.wav)
    :param text: used to generate the audio
    :param language: 'es', 'en' or 'pt'
    :return: non-interactive, visible gr.Audio whose value is 'output.wav'
    """
    # Synthesises the text and writes it to output.wav
    _save_audio(text, language)

    audio_component = gr.Audio(value='output.wav', interactive=False, visible=True)
    return audio_component
def _save_audio(answer: str, language: str) -> None:
    """
    Cleans the answer and breaks it into sentences, synthesises every sentence
    separately, joins the resulting waveforms one after another and writes the
    result to output.wav
    """
    # One tensor per cleaned sentence, in the original order
    waveforms = [
        torch.tensor(_get_voice(chunk, language))
        for chunk in _get_clean_answer(answer, language)
    ]

    # Join along the first dimension, then add a leading dimension before saving
    merged = torch.cat(waveforms, dim=0).unsqueeze(0)
    torchaudio.save('output.wav', merged, 24000)
def _get_voice(sentence: str, language: str) -> np.ndarray:
    """
    Runs the module-level model on one sentence with the preloaded speaker
    latent and embedding, and returns the 'wav' entry of the inference output
    """
    inference_options = {
        'language': language,
        'gpt_cond_latent': gpt_cond_latent,
        'speaker_embedding': speaker_embedding,
        'temperature': 0.1,
    }
    result = model.inference(sentence, **inference_options)
    return result['wav']
def _get_clean_answer(answer: str, language: str) -> list[str]:
    """
    Cleans the answer and returns it as a list of sentences, none of which is
    handed over longer than the per-language character limit without being
    passed through split_sentence first. Links are replaced by a spoken phrase
    """
    # Spoken replacement for links and character limit for each language;
    # anything other than 'en' / 'es' uses the Portuguese settings
    per_language = {
        'en': ('the following link', 250),
        'es': ('el siguiente link', 239),
    }
    link_phrase, max_characters = per_language.get(language, ('o seguinte link', 203))

    cleaned = re.sub(r'http[s]?://\S+', link_phrase, answer)
    # Change the name from Bella to Bela
    cleaned = cleaned.replace('Bella', 'Bela')
    # Remove Florida and zipcode
    cleaned = re.sub(r', FL \d+', "", cleaned)

    # Tokenize with nltk and break up whatever is still over the limit
    result = []
    for piece in sent_tokenize(cleaned):
        if len(piece) <= max_characters:
            result.append(piece)
        else:
            result.extend(split_sentence(piece, max_characters))
    return result
def split_sentence(sentence: str, max_characters: int) -> list[str]:
    """
    Returns the sentence split into smaller pieces. The split point is the nearest
    comma to the middle of the sentence; if there is no comma the nearest space is
    used, and if there is no space either the sentence is cut in the middle. Every
    piece that is still longer than max_characters is split again the same way.
    The separator used for the split is dropped, and pieces that are empty or only
    whitespace are not returned
    :param sentence: text to split
    :param max_characters: maximum length allowed for a piece before it is split again
    :return: list with the pieces in their original order
    """
    # A sentence this short cannot be divided any further; stopping here also
    # guarantees the recursion ends for any value of max_characters
    if len(sentence) < 2:
        return [sentence] if sentence.strip() else []

    middle = len(sentence) // 2

    # Prefer commas as split points, fall back to spaces
    candidates = [i for i, c in enumerate(sentence) if c == ',']
    if not candidates:
        candidates = [i for i, c in enumerate(sentence) if c == ' ']

    if candidates:
        # Nearest index to the middle
        split_point = min(candidates, key=lambda i: abs(i - middle))
        left = sentence[:split_point]
        right = sentence[split_point + 1:]
        if sentence[split_point] == ',':
            # Drop whatever whitespace follows the comma instead of blindly skipping
            # one character, which would eat a letter in text such as "a,b"
            right = right.lstrip()
    else:
        # No commas or spaces, split it in the middle
        left, right = sentence[:middle], sentence[middle:]

    pieces = []
    for piece in (left, right):
        # Skip pieces with nothing to pronounce (e.g. after a trailing comma)
        if not piece.strip():
            continue
        if len(piece) > max_characters:
            pieces.extend(split_sentence(piece, max_characters))
        else:
            pieces.append(piece)
    return pieces