demorrha / core /audio_files.py
0x07CB
refactor: Amélioration de la robustesse de la concaténation audio avec gestion des erreurs et nettoyage des fichiers temporaires
5de0e95 unverified
#coding: utf-8
from pydub import AudioSegment
#from openai import OpenAI
#from io import BytesIO
#from typing import Any
#from typing import Dict
#from typing import IO
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import base64
import io
import tempfile
import os
import streamlit as st
def concatenate_audio_files(audio_list: List[Tuple[Union[bytes, str], float]]) -> Optional[bytes]:
"""
Concatène une liste de fichiers audio avec des effets sonores.
Args:
audio_list (List[Tuple[Union[bytes, str], float]]): Liste de tuples contenant les données audio
(en bytes ou base64) et leur durée.
Returns:
Optional[bytes]: Données audio concaténées ou None en cas d'erreur.
Raises:
ValueError: Si la liste d'audio est vide ou invalide.
IOError: Si une erreur se produit lors de la lecture/écriture des fichiers.
"""
if not audio_list:
st.error("Erreur : Aucun fichier audio à concaténer")
return None
final_audio = AudioSegment.empty()
temp_files = [] # Pour le nettoyage des fichiers temporaires
try:
# Charger les effets sonores
try:
begin_sound = AudioSegment.from_mp3(
"sound-effects/voice-message-play-begin/voice-message-play-begin-1.mp3"
)
end_sound = AudioSegment.from_mp3(
"sound-effects/voice-message-play-ending/voice-message-play-ending-1.mp3"
)
except IOError as e:
st.warning("Impossible de charger les effets sonores, continuation sans effets")
begin_sound = end_sound = AudioSegment.empty()
# 1.5 secondes de silence
silence = AudioSegment.silent(duration=1500)
for audio_data, _ in audio_list:
try:
# Convertir en bytes si c'est une chaîne base64
if isinstance(audio_data, str):
try:
audio_bytes = base64.b64decode(audio_data)
except Exception as e:
st.error(f"Erreur de décodage base64 : {e}")
continue
else:
audio_bytes = audio_data
# Créer un fichier temporaire pour l'audio
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
temp_files.append(temp_file.name)
temp_file.write(audio_bytes)
temp_file.close()
# Convertir les octets en un segment audio
segment = AudioSegment.from_mp3(temp_file.name)
# Ajouter le son de début, le segment TTS, le son de fin et le silence
final_audio += begin_sound + segment + end_sound + silence
except Exception as e:
st.warning(f"Erreur lors du traitement d'un segment audio : {e}")
continue
if len(final_audio) == 0:
raise ValueError("Aucun segment audio n'a pu être traité correctement")
# Convertir le segment audio final en octets
buffer = io.BytesIO()
final_audio.export(buffer, format="mp3")
return buffer.getvalue()
except ValueError as e:
st.error(f"Erreur de validation : {e}")
return None
except IOError as e:
st.error(f"Erreur lors de la lecture ou de l'écriture des fichiers audio : {e}")
return None
except Exception as e:
st.error(f"Une erreur inattendue s'est produite : {e}")
return None
finally:
# Nettoyage des fichiers temporaires
for temp_file in temp_files:
try:
os.remove(temp_file)
except Exception:
pass
def split_audio(audio_file, max_size_mb: int = 25) -> List[bytes]:
"""
Divise un fichier audio en segments de taille maximale spécifiée.
Args:
audio_file: Fichier audio ouvert en mode binaire.
max_size_mb (int): Taille maximale de chaque segment en Mo.
Returns:
List[bytes]: Liste des segments audio divisés sous forme de bytes.
"""
try:
audio_file.seek(0)
audio = AudioSegment.from_file(audio_file)
duration_ms = len(audio)
segment_duration_ms = int(
(max_size_mb * 1024 * 1024 * 8) /
(audio.frame_rate * audio.sample_width * audio.channels)
)
segments = []
for start in range(0, duration_ms, segment_duration_ms):
end = min(start + segment_duration_ms, duration_ms)
segment = audio[start:end]
with io.BytesIO() as buffer:
segment.export(buffer, format="mp3")
segments.append(buffer.getvalue())
return segments
except Exception as e:
print(f"Une erreur s'est produite lors de la division de l'audio : {e}")
return []