Apex / tts_engine.py
ernestmindres's picture
Update tts_engine.py
551e973 verified
import os
import sys
import subprocess
import io
import wave
from typing import Generator
# Configuration de la sortie audio (doit être WAV 16-bit 22050 Hz pour le streaming)
SAMPLE_RATE = 22050
BITS_PER_SAMPLE = 16
CHANNELS = 1
# --- Fonctions de compatibilité (inchangées) ---
def get_tts_engine():
return True
def reset_tts_engine():
pass
# --- Logique de Langues/Voix ---
def get_available_languages():
# Nous utilisons les voix espeak-ng par défaut pour le français
return {
'Français (Qualité Optimale)': 'fr-fr',
'Anglais (US)': 'en-us',
'Espagnol': 'es',
'Allemand': 'de',
'Italien': 'it',
}
# --- Logique de Génération Audio EN MODE STREAMING ---
def split_text_into_chunks(text, max_chars=2000):
"""Découpe le texte en morceaux pour la synthèse, améliorant la réactivité."""
# Cette logique simple est suffisante, mais peut être améliorée (sur les points/virgules)
chunks = []
while text:
chunk = text[:max_chars]
text = text[max_chars:]
chunks.append(chunk)
return chunks
def generate_wav_header(data_size):
"""Génère l'en-tête WAV pour un flux audio brut."""
header = io.BytesIO()
# RIFF chunk
header.write(b'RIFF')
header.write((data_size + 36).to_bytes(4, byteorder='little')) # ChunkSize
header.write(b'WAVE')
# fmt chunk
header.write(b'fmt ')
# 💥 CORRECTION ici (enlever un point)
header.write((16).to_bytes(4, byteorder='little')) # Subchunk1Size
# 💥 CORRECTION ici (enlever un point)
header.write((1).to_bytes(2, byteorder='little')) # AudioFormat (PCM=1)
header.write(CHANNELS.to_bytes(2, byteorder='little'))
header.write(SAMPLE_RATE.to_bytes(4, byteorder='little'))
# ByteRate = SampleRate * NumChannels * BitsPerSample/8
byte_rate = SAMPLE_RATE * CHANNELS * BITS_PER_SAMPLE // 8
header.write(byte_rate.to_bytes(4, byteorder='little'))
# BlockAlign = NumChannels * BitsPerSample/8
block_align = CHANNELS * BITS_PER_SAMPLE // 8
header.write(block_align.to_bytes(2, byteorder='little'))
header.write(BITS_PER_SAMPLE.to_bytes(2, byteorder='little'))
# data chunk
header.write(b'data')
header.write(data_size.to_bytes(4, byteorder='little'))
return header.getvalue()
def stream_text_to_audio(text: str, voice_id: str) -> Generator[bytes, None, None]:
"""
Convertit le texte en audio et le *yield* (renvoie) par morceaux (streaming).
La première sortie est l'en-tête WAV, suivie des données audio brutes.
PARAMÈTRES ESPEAK-NG AJUSTÉS POUR AMÉLIORER LA CLARTÉ ET LE RYTHME :
-s 150: Vitesse de 150 mots par minute (par défaut 175). Plus lent pour plus de clarté.
-p 60: Hauteur (pitch) de 60 (par défaut 50). Peut réduire l'effet monotone.
"""
# 1. Découpage du texte en morceaux pour la synthèse
chunks = split_text_into_chunks(text)
full_audio_data = io.BytesIO()
total_audio_length = 0
# 2. Générer et collecter chaque morceau audio
for chunk in chunks:
# espeak-ng -v [VOIX] -s [VITESSE] -p [HAUTEUR] --stdout "[TEXTE]"
command = [
"espeak-ng",
"-v", voice_id,
# --- AJUSTEMENTS DE VOIX ---
"-s", "150", # Vitesse: 150 Mots/Minute
"-p", "60", # Hauteur: 60/99
# ---------------------------
"--stdout", # Écrit le WAV sur la sortie standard
chunk
]
try:
# Exécution de la commande et capture de la sortie binaire
# Attention: subprocess.run() ne streame pas la sortie immédiatement,
# il la collecte en mémoire pour les chunks.
result = subprocess.run(command, check=True, capture_output=True, timeout=10)
# Le résultat est un fichier WAV complet pour le chunk
chunk_wav_data = result.stdout
# Ouvrir le chunk WAV pour extraire les données audio brutes (après l'en-tête de 44 octets)
if len(chunk_wav_data) < 44:
continue
# Simplement ajouter les données audio brutes (après l'en-tête)
raw_audio_data = chunk_wav_data[44:]
full_audio_data.write(raw_audio_data)
total_audio_length += len(raw_audio_data)
except subprocess.CalledProcessError as e:
# Renvoyer l'erreur de la commande si elle a échoué
error_message = f"La commande espeak-ng a échoué. Vérifiez la voix '{voice_id}'. Erreur: {e.stderr.decode()}"
sys.stderr.write(f"ERROR: {error_message}\n")
raise Exception(error_message)
except FileNotFoundError:
raise Exception("Le programme 'espeak-ng' n'a pas été trouvé. Vérifiez le Dockerfile.")
# 3. Réinitialiser la position de lecture
full_audio_data.seek(0)
# 4. Générer l'en-tête WAV final avec la taille totale
# NOTE: generate_wav_header() DOIT être corrigée pour utiliser 16.to_bytes() et 1.to_bytes()
final_header = generate_wav_header(total_audio_length)
# 5. Yield l'en-tête
yield final_header
# 6. Yield les données audio brutes par morceaux
chunk_size = 4096 # Taille de chaque morceau envoyé au client
while True:
data = full_audio_data.read(chunk_size)
if not data:
break
yield data
def text_to_audio_file(text, voice_id, output_path="output.wav"):
"""Ancienne fonction, non utilisée dans cette nouvelle approche."""
raise NotImplementedError("Utilisez 'stream_text_to_audio' pour le streaming.")