| | import io
|
| | import logging
|
| | import wave
|
| | from pathlib import Path
|
| | import struct
|
| |
|
| | from flask import Flask, Response, jsonify, render_template, request, send_file, stream_with_context
|
| | from flask_cors import CORS
|
| | from piper import PiperVoice
|
| |
|
| |
|
# Configure the root logger so INFO-level records (and above) are emitted.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The Flask application; CORS is applied to it with no extra options.
app = Flask(__name__)
CORS(app)

# Loaded voices keyed by voice name; populated by get_tts_instance().
tts_instances = dict()

# "voices" directory that sits next to this file.
VOICES_DIR = Path(__file__).parent.joinpath("voices")
|
| |
|
def get_tts_instance(voice):
    """Return the cached PiperVoice for *voice*, loading it on first use.

    The model ``<voice>.onnx`` and its ``<voice>.onnx.json`` config are
    looked up, in order, in ``VOICES_DIR``, the directory containing this
    file, and the current working directory. The first location that holds
    both files is used.

    Args:
        voice: Bare voice name without directory or extension, e.g.
            ``"en_GB-alba-medium"``. The HTTP handlers pass this straight
            from the query string, so it is validated here.

    Returns:
        The loaded voice, or ``None`` when the name is not a plain file
        name, no model/config pair exists, or loading raises. Failures are
        logged and not cached, so a later call tries again.
    """
    # Only a plain file name is acceptable: anything with a directory part
    # ("../x", "/abs/x", "C:x") would let a request probe or load files
    # outside the search directories.
    if (
        not isinstance(voice, str)
        or not voice
        or voice in (".", "..")
        or "\x00" in voice
        or Path(voice).name != voice
    ):
        logger.error("Rejected invalid voice name: %r", voice)
        return None

    if voice not in tts_instances:
        logger.info("Creating new PiperVoice instance for voice: %s", voice)
        try:
            model_path = config_path = None
            search_dirs = (VOICES_DIR, Path(__file__).parent, Path())
            for directory in search_dirs:
                model_file = directory / f"{voice}.onnx"
                config_file = directory / f"{voice}.onnx.json"
                # A model without its config is unusable; keep searching.
                if model_file.exists() and config_file.exists():
                    model_path = str(model_file)
                    config_path = str(config_file)
                    logger.info("Found model at: %s", model_path)
                    logger.info("Found config at: %s", config_path)
                    break

            if model_path is None:
                logger.error(
                    "Voice model or config not found for '%s'. "
                    "Ensure both '.onnx' and '.onnx.json' are present.",
                    voice,
                )
                return None

            tts_instances[voice] = PiperVoice.load(model_path, config_path=config_path)
        except Exception as e:
            # Boundary handler: the callers turn None into an HTTP error.
            logger.error(
                "Failed to create PiperVoice instance for voice %s: %s",
                voice,
                e,
                exc_info=True,
            )
            return None
    return tts_instances[voice]
|
| |
|
@app.route('/')
def index():
    """Render the ``index.html`` template for the root URL."""
    page = render_template("index.html")
    return page
|
| |
|
@app.route('/api/tts', methods=['GET'])
def synthesize_audio_full():
    """Synthesize the whole text and return it as a single WAV download.

    Query parameters:
        text: Text to speak. Required; must contain at least one
            non-whitespace character.
        voice: Voice name handed to get_tts_instance(). Defaults to
            ``en_GB-alba-medium``.

    Returns:
        The WAV file (``audio/wav``, attachment name ``output.wav``) on
        success; a JSON error with status 400 when ``text`` is missing or
        blank; a JSON error with status 500 when the voice cannot be
        loaded or synthesis raises.
    """
    text = request.args.get('text')
    voice = request.args.get('voice', 'en_GB-alba-medium')

    # Whitespace-only text has nothing to speak; treat it like missing text
    # instead of handing it to the synthesizer.
    if not text or not text.strip():
        return jsonify({"error": "Text to synthesize is required."}), 400

    tts_instance = get_tts_instance(voice)
    if not tts_instance:
        return jsonify({"error": f"Could not load voice model for '{voice}'."}), 500

    try:
        wav_io = io.BytesIO()
        with wave.open(wav_io, 'wb') as wav_file:
            # Mono, 16-bit samples at the voice's configured sample rate.
            wav_file.setnchannels(1)
            wav_file.setsampwidth(2)
            wav_file.setframerate(tts_instance.config.sample_rate)

            for audio_chunk in tts_instance.synthesize(text):
                wav_file.writeframes(audio_chunk.audio_int16_bytes)

        # Rewind so send_file reads the buffer from the start.
        wav_io.seek(0)

        return send_file(
            wav_io,
            mimetype='audio/wav',
            as_attachment=True,
            download_name='output.wav',
        )
    except Exception as e:
        logger.error(f"Error during full synthesis: {e}", exc_info=True)
        return jsonify({"error": f"Failed to synthesize audio: {str(e)}"}), 500
|
| |
|
def _create_wav_header(sample_rate, bits_per_sample=16, channels=1):
    """Build the 44-byte PCM WAV header for a stream of unknown length."""
    # The total length is not known when streaming starts, so both size
    # fields carry the largest value a 32-bit field can hold. Adding the
    # 36 header bytes on top of that (as a finite file would) overflows the
    # field and makes struct.pack raise.
    unknown_size = 0xFFFFFFFF
    return struct.pack(
        '<4sI4s4sIHHIIHH4sI',
        b'RIFF',
        unknown_size,
        b'WAVE',
        b'fmt ',
        16,  # size of the fmt chunk body
        1,  # audio format 1 = PCM
        channels,
        sample_rate,
        sample_rate * channels * bits_per_sample // 8,  # bytes per second
        channels * bits_per_sample // 8,  # bytes per sample frame
        bits_per_sample,
        b'data',
        unknown_size,
    )


def generate_audio_stream(tts_instance, text):
    """Yield a WAV header, then raw audio chunks as they are synthesized.

    Args:
        tts_instance: Voice object exposing ``config.sample_rate`` and a
            ``synthesize(text)`` iterable whose items carry
            ``audio_int16_bytes``.
        text: Text to synthesize.

    Yields:
        bytes: First the 44-byte header (mono, 16-bit), then the
        ``audio_int16_bytes`` of every synthesized chunk.

    Errors raised while producing the stream are logged and end the stream
    early rather than propagating out of the generator.
    """
    try:
        yield _create_wav_header(tts_instance.config.sample_rate)

        for audio_chunk in tts_instance.synthesize(text):
            yield audio_chunk.audio_int16_bytes

    except Exception as e:
        # Best effort: log the failure and let the stream end here.
        logger.error(f"Error during stream generation: {e}", exc_info=True)
|
| |
|
| |
|
@app.route('/api/tts-stream', methods=['GET'])
def synthesize_audio_stream():
    """Stream synthesized audio to the client while it is being generated.

    Query parameters:
        text: Text to speak. Required; must contain at least one
            non-whitespace character.
        voice: Voice name handed to get_tts_instance(). Defaults to
            ``en_GB-alba-medium``.

    Returns:
        A streaming ``audio/wav`` response fed by generate_audio_stream();
        a JSON error with status 400 when ``text`` is missing or blank; a
        JSON error with status 500 when the voice cannot be loaded.
    """
    text = request.args.get('text')
    voice = request.args.get('voice', 'en_GB-alba-medium')

    # Whitespace-only text has nothing to speak; treat it like missing text
    # instead of opening a stream for it.
    if not text or not text.strip():
        return jsonify({"error": "Text to synthesize is required."}), 400

    tts_instance = get_tts_instance(voice)
    if not tts_instance:
        return jsonify({"error": f"Could not load voice model for '{voice}'."}), 500

    stream_generator = generate_audio_stream(tts_instance, text)
    return Response(stream_with_context(stream_generator), mimetype='audio/wav')
|
| |
|
if __name__ == '__main__':
    # Script entry point: start Flask's built-in server on port 5001 with
    # debug mode switched on.
    app.run(port=5001, debug=True)