| | import sys |
| | import io, os, stat |
| | import subprocess |
| | import random |
| | from zipfile import ZipFile |
| | import uuid |
| | import time |
| | import torch |
| | import torchaudio |
| |
|
| | |
# Fetch the UniDic dictionary data using the interpreter that is running this
# script.  A bare `python` is looked up on PATH and may be a different
# installation; passing an argv list also avoids going through a shell.
# check=False keeps the step best-effort, as the previous os.system call was
# (its exit status was ignored).
subprocess.run([sys.executable or "python", "-m", "unidic", "download"], check=False)

# Set before the TTS imports further down.
# NOTE(review): presumably read by the TTS library to skip its interactive
# licence prompt -- confirm against the TTS package.
os.environ["COQUI_TOS_AGREED"] = "1"
| |
|
| | |
| | |
| | import langid |
| | import base64 |
| | import csv |
| | from io import StringIO |
| | import datetime |
| | import re |
| |
|
| | import gradio as gr |
| | from scipy.io.wavfile import write |
| | from pydub import AudioSegment |
| |
|
| | from TTS.api import TTS |
| | from TTS.tts.configs.xtts_config import XttsConfig |
| | from TTS.tts.models.xtts import Xtts |
| |
|
# Hub access token taken from the environment; None when HF_TOKEN is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")

from huggingface_hub import HfApi, hf_hub_download

# Hub client used by predict() for its get_space_runtime / restart_space calls.
api = HfApi(token=HF_TOKEN)
# Repository the model files are downloaded from; predict() also passes it to
# the Space runtime/restart calls.
repo_id = "tartuNLP/XTTS-v2-multi"

print("Export newer ffmpeg binary for denoise filter")

# Use a context manager so the archive handle is closed as soon as extraction
# finishes instead of being left open until garbage collection.
with ZipFile("ffmpeg.zip") as ffmpeg_archive:
    ffmpeg_archive.extractall()

print("Make ffmpeg binary executable")
# NOTE(review): predict() invokes `ffmpeg` by bare name, which is resolved via
# PATH rather than this extracted ./ffmpeg -- confirm which binary is intended.
st = os.stat("ffmpeg")
os.chmod("ffmpeg", st.st_mode | stat.S_IEXEC)
| |
|
| | |
print("Downloading Coqui XTTS V2")

# Local directory and file names of the three artefacts the model needs.
model_folder = "model/"
config_file = "config.json"
model_path = "model.pth"
vocab_path = "vocab.json"

# Download each artefact from the Hub only when it is not already on disk.
for required_file in (config_file, model_path, vocab_path):
    if os.path.exists(model_folder + required_file):
        continue
    hf_hub_download(
        repo_id=repo_id,
        filename=required_file,
        local_dir=model_folder,
    )

print("XTTS downloaded")
| |
|
# Build the XTTS model from the downloaded JSON config, then load the
# checkpoint weights and tokenizer vocabulary from the same folder.
config = XttsConfig()
config.load_json(os.path.join(model_folder, config_file))

model = Xtts.init_from_config(config)
model.load_checkpoint(
    config,
    vocab_path=os.path.join(model_folder, vocab_path),
    checkpoint_path=os.path.join(model_folder, model_path),
    use_deepspeed=False,
    eval=True,
)

# State recorded by predict() when it sees a CUDA "device-side assert"
# RuntimeError: a flag plus the prompt and language of the failing request.
DEVICE_ASSERT_DETECTED = 0
DEVICE_ASSERT_PROMPT = DEVICE_ASSERT_LANG = None

# Language codes predict() accepts, taken from the loaded model config.
supported_languages = config.languages
| |
|
# Value returned whenever a request is rejected or fails: one slot per output
# component wired to predict() in the UI (audio, metrics text, reference audio).
_NO_RESULT = (None, None, None)

# Sample rate passed to torchaudio.save and used for the real-time factor.
_OUTPUT_SAMPLE_RATE = 24000

_REFERENCE_WARNING = (
    "It appears something wrong with reference, did you unmute your microphone?"
)


def _restart_space_unless_building():
    """Ask the Hub to restart the Space named by ``repo_id`` unless it is building."""
    # NOTE(review): ``repo_id`` is also the repository the model files are
    # downloaded from -- confirm it really identifies the Space to restart.
    space = api.get_space_runtime(repo_id=repo_id)
    if space.stage != "BUILDING":
        api.restart_space(repo_id=repo_id)
    else:
        print("TRIED TO RESTART but space is building")


def _clean_reference_audio(speaker_wav):
    """Filter the reference clip with ffmpeg and return the path to use.

    Applies a low-pass/high-pass pair and trims silence from both ends,
    writing the result next to the input under a unique name.  If ffmpeg
    fails or cannot be started, the original path is returned unchanged.
    """
    lowpassfilter = trim = True

    filters = []
    if lowpassfilter:
        filters += ["lowpass=8000", "highpass=75"]
    if trim:
        # Trim one end, reverse, trim again, so both ends lose their silence.
        silence_trim = (
            "silenceremove=start_periods=1:start_silence=0:start_threshold=0.02"
        )
        filters += ["areverse", silence_trim, "areverse", silence_trim]

    # The ".wav" suffix tells ffmpeg which output format to write.
    out_filename = speaker_wav + str(uuid.uuid4()) + ".wav"

    # Build argv as a list so paths containing spaces stay a single argument,
    # and join the filters so the chain does not end in a dangling comma.
    # NOTE(review): bare "ffmpeg" is resolved via PATH, not the ./ffmpeg
    # binary unpacked at start-up -- confirm which one is intended.
    command = ["ffmpeg", "-y", "-i", speaker_wav]
    if filters:
        command += ["-af", ",".join(filters)]
    command.append(out_filename)

    try:
        subprocess.run(command, capture_output=False, text=True, check=True)
    except (subprocess.CalledProcessError, OSError):
        # Best effort: a failed or missing ffmpeg must not block synthesis.
        print("Error: failed filtering, use original microphone input")
        return speaker_wav

    print("Filtered microphone input")
    return out_filename


def predict(
    prompt,
    language,
    speaker_wav,
    voice_cleanup,
    agree,
):
    """Synthesise ``prompt`` in the voice of the reference clip.

    Args:
        prompt: Text to speak; must be between 2 and 200 characters.
        language: Language code; must be one of ``supported_languages``.
        speaker_wav: File path of the reference audio to clone.
        voice_cleanup: When truthy, the reference is filtered with ffmpeg first.
        agree: Must be truthy (terms accepted) for synthesis to run.

    Returns:
        ``("output.wav", metrics_text, speaker_wav)`` on success, where
        ``speaker_wav`` is the reference path actually used; otherwise
        ``(None, None, None)`` after a ``gr.Warning`` has been raised for
        the user.

    Side effects:
        Writes ``output.wav`` in the working directory, may write a filtered
        copy of the reference audio, and restarts the Space after a CUDA
        device-side assert.
    """
    global DEVICE_ASSERT_DETECTED, DEVICE_ASSERT_PROMPT, DEVICE_ASSERT_LANG

    # ---- Validate the request before doing any expensive work -------------
    if not agree:
        gr.Warning("Please accept the Terms & Condition!")
        return _NO_RESULT

    if language not in supported_languages:
        gr.Warning(
            f"Language you put {language} in is not in is not in our Supported Languages, please choose from dropdown"
        )
        return _NO_RESULT

    if prompt is None or len(prompt) < 2:
        gr.Warning("Please give a longer prompt text")
        return _NO_RESULT
    if len(prompt) > 200:
        gr.Warning(
            "Text length limited to 200 characters for this demo, please try shorter text. You can clone this space and edit code for your own usage"
        )
        return _NO_RESULT

    if not speaker_wav:
        # No reference clip at all: nothing to filter or to clone from.
        gr.Warning(_REFERENCE_WARNING)
        return _NO_RESULT

    # The detected language is only logged; synthesis uses the chosen one.
    language_predicted = langid.classify(prompt)[0].strip()
    if language_predicted == "zh":
        language_predicted = "zh-cn"
    print(f"Detected language:{language_predicted}, Chosen language:{language}")

    if voice_cleanup:
        speaker_wav = _clean_reference_audio(speaker_wav)

    if DEVICE_ASSERT_DETECTED:
        # An earlier request left CUDA in an unrecoverable state; log what
        # caused it and try again to get the Space restarted.
        print(
            f"Unrecoverable exception caused by language:{DEVICE_ASSERT_LANG} prompt:{DEVICE_ASSERT_PROMPT}"
        )
        _restart_space_unless_building()

    metrics_text = ""

    # ---- Speaker conditioning ---------------------------------------------
    try:
        gpt_cond_latent, speaker_embedding = model.get_conditioning_latents(
            audio_path=speaker_wav,
            gpt_cond_len=model.config.gpt_cond_len,
            gpt_cond_chunk_len=model.config.gpt_cond_chunk_len,
            max_ref_length=model.config.max_ref_len,
            sound_norm_refs=model.config.sound_norm_refs,
        )
    except Exception as e:
        print("Speaker encoding error", str(e))
        gr.Warning(_REFERENCE_WARNING)
        return _NO_RESULT

    # Put a space before sentence-final punctuation and double it
    # ("Hi." -> "Hi .."); raw string avoids invalid-escape warnings.
    prompt = re.sub(r"([^\x00-\x7F]|\w)([.。?])", r"\1 \2\2", prompt)

    # ---- Streaming synthesis ----------------------------------------------
    try:
        print("I: Generating new audio in streaming mode...")
        t0 = time.time()
        chunks = model.inference_stream(
            prompt,
            language,
            gpt_cond_latent,
            speaker_embedding,
            repetition_penalty=7.0,
            temperature=0.85,
        )

        wav_chunks = []
        for i, chunk in enumerate(chunks):
            if i == 0:
                first_chunk_time = time.time() - t0
                metrics_text += f"Latency to first audio chunk: {round(first_chunk_time*1000)} milliseconds\n"
            wav_chunks.append(chunk)
            print(f"Received chunk {i} of audio length {chunk.shape[-1]}")
        inference_time = time.time() - t0
        print(
            f"I: Time to generate audio: {round(inference_time*1000)} milliseconds"
        )

        wav = torch.cat(wav_chunks, dim=0)
        print(wav.shape)
        real_time_factor = (time.time() - t0) / wav.shape[0] * _OUTPUT_SAMPLE_RATE
        print(f"Real-time factor (RTF): {real_time_factor}")
        metrics_text += f"Real-time factor (RTF): {real_time_factor:.2f}\n"

        # NOTE(review): this fixed path is shared by every request.
        torchaudio.save(
            "output.wav", wav.squeeze().unsqueeze(0).cpu(), _OUTPUT_SAMPLE_RATE
        )

    except RuntimeError as e:
        message = str(e)

        if "device-side assert" not in message:
            if "Failed to decode" in message:
                print("Speaker encoding error", message)
                gr.Warning(_REFERENCE_WARNING)
            else:
                print("RuntimeError: non device-side assert error:", message)
                gr.Warning("Something unexpected happened please retry again.")
            return _NO_RESULT

        # CUDA device-side assert: nothing can be recovered in-process.
        print(
            f"Exit due to: Unrecoverable exception caused by language:{language} prompt:{prompt}",
            flush=True,
        )
        gr.Warning("Unhandled Exception encounter, please retry in a minute")
        print("Cuda device-assert Runtime encountered need restart")
        if not DEVICE_ASSERT_DETECTED:
            DEVICE_ASSERT_DETECTED = 1
            DEVICE_ASSERT_PROMPT = prompt
            DEVICE_ASSERT_LANG = language

        # Log the failing request; uploading it to a dataset is not done here.
        error_time = datetime.datetime.now().strftime("%d-%m-%Y-%H:%M:%S")
        error_data = [
            str(item)
            for item in (
                error_time,
                prompt,
                language,
                speaker_wav,
                voice_cleanup,
                agree,
            )
        ]
        print(error_data)
        print(speaker_wav)

        _restart_space_unless_building()

        # Do not fall through to the success return: no audio was written for
        # this request, and "output.wav" may hold an earlier request's result.
        return _NO_RESULT

    return (
        "output.wav",
        metrics_text,
        speaker_wav,
    )
| |
|
| |
|
# Display title for the demo.
# NOTE(review): not referenced by the UI code visible in this file.
title = "Coqui🐸 XTTSv2-est"

# HTML/Markdown blurb rendered near the top of the page via gr.Markdown.
description = """

<br/>

This demo is running a fine-tuned XTTS model. <a href="https://huggingface.co/coqui/XTTS-v2">XTTS</a> is a multilingual text-to-speech and voice-cloning model. This demo features zero-shot voice cloning.

Supported languages: Finnish: fi, English: en, Estonian: et, German: de, Russian: ru

<br/>
"""

# Licence / data-collection footer markup.
# NOTE(review): not referenced by the UI code visible in this file.
article = """
<div style='margin:20px auto;'>
<p>By using this demo you agree to the terms of the Coqui Public Model License at https://coqui.ai/cpml</p>
<p>We collect data only for error cases for improvement.</p>
</div>
"""
# Rows for gr.Examples.  Each row supplies exactly one value per component in
# the Examples `inputs` list, in order: prompt text, language code, reference
# audio path, "cleanup reference voice" flag, "agree to terms" flag.
# The rows previously carried six values for five inputs, so the trailing
# `True` had no matching component and the Agree checkbox received `False`.
examples = [
    [text, language_code, "examples/female.wav", False, True]
    for text, language_code in (
        ("Once when I was six years old I saw a magnificent picture", "en"),
        ("Als ich sechs war, sah ich einmal ein wunderbares Bild", "de"),
        ("Kunagi, kui olin kuueaastane, nägin ma ühte imelist pilti", "et"),
        ("Когда мне было шесть лет, я увидел однажды удивительную картинку", "ru"),
        ("Kerran ollessani kuusivuotias näin mahtavan kuvan", "fi"),
    )
]
| |
|
| |
|
| |
|
with gr.Blocks(analytics_enabled=False) as demo:
    # Header row: logo in the first column, an empty second column.
    with gr.Row():
        with gr.Column():
            gr.Markdown(
                '\n## <img src="https://raw.githubusercontent.com/coqui-ai/TTS/main/images/coqui-log-green-TTS.png" height="56"/>\n'
            )
        with gr.Column():
            pass

    # Description blurb.
    with gr.Row():
        with gr.Column():
            gr.Markdown(description)

    # Main row: request controls on the left, results on the right.
    with gr.Row():
        with gr.Column():
            input_text_gr = gr.Textbox(
                label="Text Prompt",
                info="One or two sentences at a time is better. Up to 200 text characters.",
                value="Tere, olen sinu hääle kloon. Ürita mulle lindistada võimalikult hea kvaliteediga klipp, et oskaksin su kõnet paremini jäljendada.",
                lines=2,
            )
            language_gr = gr.Dropdown(
                label="Language",
                info="Select an output language for the synthesised speech",
                choices=["de", "fi", "en", "et", "ru"],
                value="et",
                multiselect=False,
                interactive=True,
            )
            ref_gr = gr.Audio(
                label="Reference Audio",
                value="examples/female.wav",
                sources=["microphone", "upload"],
                type="filepath",
            )
            clean_ref_gr = gr.Checkbox(
                label="Cleanup Reference Voice",
                info="This check can improve output if your microphone or reference voice is noisy",
                value=False,
            )
            tos_gr = gr.Checkbox(
                label="Agree",
                info="I agree to the terms of the CPML: https://coqui.ai/cpml",
                value=False,
            )

            tts_button = gr.Button("Send", visible=True, elem_id="send-btn")

        with gr.Column():
            audio_gr = gr.Audio(label="Synthesised Audio", autoplay=True)
            out_text_gr = gr.Text(label="Metrics")
            ref_audio_gr = gr.Audio(label="Reference Audio Used")

    # Components fed to and filled by predict(), in its argument/return order.
    predict_inputs = (input_text_gr, language_gr, ref_gr, clean_ref_gr, tos_gr)
    predict_outputs = (audio_gr, out_text_gr, ref_audio_gr)

    with gr.Row():
        gr.Examples(
            examples,
            label="Examples",
            inputs=list(predict_inputs),
            outputs=list(predict_outputs),
            fn=predict,
            cache_examples=False,
        )

    tts_button.click(
        predict,
        list(predict_inputs),
        outputs=list(predict_outputs),
    )
| |
|
# Script entry point: enable request queuing, then start the Gradio server.
if __name__ == "__main__":
    demo.queue()
    demo.launch(show_api=True, debug=True)