# NOTE: removed non-source residue from a web-page scrape ("Spaces: / Running / Running").
#!flask/bin/python
"""TTS demo server."""

import argparse
import io
import json
import logging
import os
import sys
import warnings
from pathlib import Path
from threading import Lock
from urllib.parse import parse_qs

import torch
import torchaudio

try:
    from flask import Flask, render_template, render_template_string, request, send_file
except ImportError as e:
    # Flask is an optional dependency shipped in the [server] extra.
    # (fixed duplicated word "requires requires" in the original message)
    msg = "Server requires flask, use `pip install coqui-tts[server]`"
    raise ImportError(msg) from e

from TTS.api import TTS
from TTS.utils.generic_utils import ConsoleFormatter, setup_logger
from TTS.utils.manage import ModelManager

logger = logging.getLogger(__name__)
setup_logger("TTS", level=logging.INFO, stream=sys.stdout, formatter=ConsoleFormatter())
def create_argparser() -> argparse.ArgumentParser:
    """Build the command-line interface for the demo server.

    Returns:
        argparse.ArgumentParser: parser covering model selection, custom
        model/vocoder paths, device placement, and Flask server options.
    """
    p = argparse.ArgumentParser()
    p.add_argument(
        "--list_models",
        action="store_true",
        help="list available pre-trained tts and vocoder models.",
    )
    p.add_argument(
        "--model_name",
        default="tts_models/en/ljspeech/tacotron2-DDC",
        type=str,
        help="Name of one of the pre-trained tts models in format <language>/<dataset>/<model_name>",
    )
    p.add_argument("--vocoder_name", default=None, type=str, help="Name of one of the released vocoder models.")
    p.add_argument("--speaker_idx", default=None, type=str, help="Default speaker ID for multi-speaker models.")

    # Args for running custom models
    p.add_argument("--config_path", default=None, type=str, help="Path to model config file.")
    p.add_argument("--model_path", default=None, type=str, help="Path to model file.")
    p.add_argument(
        "--vocoder_path",
        default=None,
        type=str,
        help="Path to vocoder model file. If it is not defined, model uses GL as vocoder. Please make sure that you installed vocoder library before (WaveRNN).",
    )
    p.add_argument("--vocoder_config_path", default=None, type=str, help="Path to vocoder model config file.")
    p.add_argument("--speakers_file_path", default=None, type=str, help="JSON file for multi-speaker model.")
    p.add_argument("--port", default=5002, type=int, help="port to listen on.")
    p.add_argument("--device", default="cpu", type=str, help="Device to run model on.")
    p.add_argument("--use_cuda", default=False, action=argparse.BooleanOptionalAction, help="true to use CUDA.")
    p.add_argument(
        "--debug", default=False, action=argparse.BooleanOptionalAction, help="true to enable Flask debug mode."
    )
    p.add_argument(
        "--show_details", default=False, action=argparse.BooleanOptionalAction, help="Generate model detail page."
    )
    p.add_argument("--language_idx", default="en", type=str, help="Default language ID for multilingual models.")
    return p
# parse the args
args = create_argparser().parse_args()

manager = ModelManager(models_file=TTS.get_models_file_path())

# update in-use models to the specified released models.
model_path = None
config_path = None
speakers_file_path = None
vocoder_path = None
vocoder_config_path = None

# CASE1: list pre-trained TTS models
if args.list_models:
    manager.list_models()
    sys.exit(0)

device = args.device
if args.use_cuda:
    warnings.warn("`--use_cuda` is deprecated, use `--device cuda` instead.", DeprecationWarning, stacklevel=2)
    # Keep the deprecated flag functional: without this, `--use_cuda` only
    # warned and the model silently stayed on the default device.
    device = "cuda"

# CASE2: load models
model_name = args.model_name if args.model_path is None else None
api = TTS(
    model_name=model_name,
    model_path=args.model_path,
    config_path=args.config_path,
    vocoder_name=args.vocoder_name,
    vocoder_path=args.vocoder_path,
    vocoder_config_path=args.vocoder_config_path,
    speakers_file_path=args.speakers_file_path,
    # language_ids_file_path=args.language_ids_file_path,
).to(device)

# TODO: set this from SpeakerManager
use_gst = api.synthesizer.tts_config.get("use_gst", False)

app = Flask(__name__)
| def style_wav_uri_to_dict(style_wav: str) -> str | dict: | |
| """Transform an uri style_wav, in either a string (path to wav file to be use for style transfer) | |
| or a dict (gst tokens/values to be use for styling) | |
| Args: | |
| style_wav (str): uri | |
| Returns: | |
| Union[str, dict]: path to file (str) or gst style (dict) | |
| """ | |
| if style_wav: | |
| if os.path.isfile(style_wav) and style_wav.endswith(".wav"): | |
| return style_wav # style_wav is a .wav file located on the server | |
| style_wav = json.loads(style_wav) | |
| return style_wav # style_wav is a gst dictionary with {token1_id : token1_weigth, ...} | |
| return None | |
def index():
    """Render the main demo page with the loaded model's capabilities."""
    context = {
        "show_details": args.show_details,
        "use_multi_speaker": api.is_multi_speaker,
        "use_multi_language": api.is_multi_lingual,
        "speaker_ids": api.speakers,
        "language_ids": api.languages,
        "use_gst": use_gst,
        "supports_cloning": api.synthesizer.tts_config.supports_cloning,
    }
    return render_template("index.html", **context)
def details():
    """Render the details page showing model/vocoder configs and CLI args."""
    tts_cfg = api.synthesizer.tts_config
    voc_cfg = api.synthesizer.vocoder_config or None
    return render_template(
        "details.html",
        show_details=args.show_details,
        model_config=tts_cfg,
        vocoder_config=voc_cfg,
        args=args.__dict__,
    )
| lock = Lock() | |
def tts():
    """Synthesize speech for the requested text and return it as a WAV file.

    Inputs come from either HTTP headers (``text``, ``speaker-id``,
    ``language-id``, ``style-wav``, ``speaker-wav``) or query/form values
    (underscored variants); headers take precedence. Returns a JSON error
    with status 400 for empty text and 500 on synthesis failure.
    """
    with lock:
        text = request.headers.get("text") or request.values.get("text", "")
        # Speaker/language selection only applies to multi-speaker /
        # multilingual models; otherwise force None.
        speaker_idx = (
            request.headers.get("speaker-id") or request.values.get("speaker_id", args.speaker_idx)
            if api.is_multi_speaker
            else None
        )
        # Handle empty speaker_id for voice cloning scenarios
        if speaker_idx == "":
            speaker_idx = None
        language_idx = (
            request.headers.get("language-id") or request.values.get("language_id", args.language_idx)
            if api.is_multi_lingual
            else None
        )
        # Handle empty language_id
        if language_idx == "":
            language_idx = None
        style_wav = request.headers.get("style-wav") or request.values.get("style_wav", "")
        # Either a server-side .wav path or a JSON dict of GST token weights.
        style_wav = style_wav_uri_to_dict(style_wav)
        speaker_wav = request.headers.get("speaker-wav") or request.values.get("speaker_wav", "")
        # Basic validation
        if not text.strip():
            return {"error": "Text parameter is required"}, 400
        logger.info("Model input: %s", text)
        logger.info("Speaker idx: %s", speaker_idx)
        logger.info("Speaker wav: %s", speaker_wav)
        logger.info("Language idx: %s", language_idx)
        try:
            wavs = api.tts(
                text, speaker=speaker_idx, language=language_idx, style_wav=style_wav, speaker_wav=speaker_wav
            )
        except Exception as e:
            logger.error("TTS synthesis failed: %s", str(e))
            return {"error": f"TTS synthesis failed: {str(e)}"}, 500
        out = io.BytesIO()
        api.synthesizer.save_wav(wavs, out)
        # NOTE(review): no out.seek(0) before send_file here (the OpenAI
        # endpoint below does seek) — presumably save_wav rewinds or
        # send_file tolerates the position; confirm.
        return send_file(out, mimetype="audio/wav")
# Basic MaryTTS compatibility layer
def mary_tts_api_locales():
    """MaryTTS-compatible /locales endpoint"""
    # NOTE: We currently assume there is only one model active at the same time
    if args.model_name is None:
        model_details = ["", "en", "", "default"]
    else:
        model_details = args.model_name.split("/")
    return render_template_string("{{ locale }}\n", locale=model_details[1])
def mary_tts_api_voices():
    """MaryTTS-compatible /voices endpoint"""
    # NOTE: We currently assume there is only one model active at the same time
    model_details = ["", "en", "", "default"] if args.model_name is None else args.model_name.split("/")
    locale = model_details[1]
    if api.is_multi_speaker:
        # One "<speaker> <locale> <gender>" line per available speaker.
        return render_template_string(
            "{% for speaker in speakers %}{{ speaker }} {{ locale }} {{ gender }}\n{% endfor %}",
            speakers=api.speakers,
            locale=locale,
            gender="u",
        )
    return render_template_string(
        "{{ name }} {{ locale }} {{ gender }}\n", name=model_details[3], locale=locale, gender="u"
    )
def mary_tts_api_process():
    """MaryTTS-compatible /process endpoint.

    Reads ``INPUT_TEXT`` and ``VOICE`` from a form-encoded POST body or from
    GET query parameters, synthesizes, and streams the result as WAV.
    """
    with lock:
        if request.method == "POST":
            # MaryTTS clients send a form-encoded body; parse it manually.
            data = parse_qs(request.get_data(as_text=True))
            speaker_idx = data.get("VOICE", [args.speaker_idx])[0]
            # NOTE: we ignore parameter LOCALE for now since we have only one active model
            text = data.get("INPUT_TEXT", [""])[0]
        else:
            text = request.args.get("INPUT_TEXT", "")
            speaker_idx = request.args.get("VOICE", args.speaker_idx)
        logger.info("Model input: %s", text)
        logger.info("Speaker idx: %s", speaker_idx)
        wavs = api.tts(text, speaker=speaker_idx)
        out = io.BytesIO()
        api.synthesizer.save_wav(wavs, out)
        # NOTE(review): no out.seek(0) before send_file, matching the /tts
        # handler; the OpenAI endpoint seeks explicitly — confirm save_wav
        # leaves the buffer position at 0.
        return send_file(out, mimetype="audio/wav")
# OpenAI-compatible Speech API
def openai_tts():
    """
    POST /v1/audio/speech
    {
        "model": "tts-1",          # ignored, defaults to args.model_name
        "voice": "alloy",          # required: a speaker ID or a file/folder for voice cloning
        "input": "Hello world!",   # required text to speak
        "response_format": "wav"   # optional: mp3 (default), wav, opus, aac, flac, pcm
    }
    """
    payload = request.get_json(force=True)
    logger.info(payload)
    text = payload.get("input") or ""
    speaker_idx = payload.get("voice", args.speaker_idx) if api.is_multi_speaker else None
    fmt = payload.get("response_format", "mp3").lower()  # OpenAI default is .mp3
    speed = payload.get("speed", 1.0)
    language_idx = args.language_idx if api.is_multi_lingual else None
    speaker_wav = None
    if speaker_idx is not None:
        # When "voice" names an existing file/folder and the model supports
        # cloning, treat it as reference audio instead of a speaker ID.
        voice_path = Path(speaker_idx)
        if voice_path.exists() and api.synthesizer.tts_config.supports_cloning:
            speaker_wav = str(voice_path) if voice_path.is_file() else [str(w) for w in voice_path.glob("*.wav")]
            speaker_idx = None
    # here we ignore payload["model"] since its loaded at startup

    def _save_audio(waveform, sample_rate, format_args):
        # Re-encode the decoded waveform into the requested container/codec.
        buf = io.BytesIO()
        torchaudio.save(buf, waveform, sample_rate, **format_args)
        buf.seek(0)
        return buf

    def _save_pcm(waveform):
        """Raw PCM (16-bit little-endian)."""
        # assumes waveform is float in [-1, 1] — TODO confirm
        waveform_int16 = (waveform * 32767).to(torch.int16)
        buf = io.BytesIO()
        buf.write(waveform_int16.numpy().tobytes())
        buf.seek(0)
        return buf

    with lock:
        logger.info("Model input: %s", text)
        logger.info("Speaker idx: %s", speaker_idx)
        logger.info("Speaker wav: %s", speaker_wav)
        logger.info("Language idx: %s", language_idx)
        wavs = api.tts(text, speaker=speaker_idx, language=language_idx, speaker_wav=speaker_wav, speed=speed)
        # Synthesize to an in-memory WAV first; other formats are produced by
        # decoding this WAV and re-encoding with torchaudio.
        out = io.BytesIO()
        api.synthesizer.save_wav(wavs, out)
        out.seek(0)
        waveform, sample_rate = torchaudio.load(out)
        mimetypes = {
            "wav": "audio/wav",
            "mp3": "audio/mpeg",
            "opus": "audio/ogg",
            "aac": "audio/aac",
            "flac": "audio/flac",
            "pcm": "audio/L16",
        }
        mimetype = mimetypes.get(fmt, "audio/mpeg")
        if fmt == "wav":
            # WAV needs no re-encoding: rewind and send the original buffer.
            out.seek(0)
            return send_file(out, mimetype=mimetype)
        format_dispatch = {
            "mp3": lambda: _save_audio(waveform, sample_rate, {"format": "mp3"}),
            "opus": lambda: _save_audio(waveform, sample_rate, {"format": "ogg", "encoding": "opus"}),
            "aac": lambda: _save_audio(waveform, sample_rate, {"format": "mp4", "encoding": "aac"}),  # m4a container
            "flac": lambda: _save_audio(waveform, sample_rate, {"format": "flac"}),
            "pcm": lambda: _save_pcm(waveform),
        }
        # Check if format is supported
        if fmt not in format_dispatch:
            return "Unsupported format", 400
        # Generate and send file
        audio_buffer = format_dispatch[fmt]()
        return send_file(audio_buffer, mimetype=mimetype)
def main():
    """Start the Flask development server."""
    # host="::" binds the IPv6 wildcard address (covers IPv4 too on
    # dual-stack systems — TODO confirm for the deployment target).
    app.run(debug=args.debug, host="::", port=args.port)


if __name__ == "__main__":
    main()