Spaces:
Paused
Paused
0x07CB
refactor: Amélioration de la robustesse de la concaténation audio avec gestion des erreurs et nettoyage des fichiers temporaires
5de0e95
unverified
| #coding: utf-8 | |
| from pydub import AudioSegment | |
| #from openai import OpenAI | |
| #from io import BytesIO | |
| #from typing import Any | |
| #from typing import Dict | |
| #from typing import IO | |
| from typing import List | |
| from typing import Optional | |
| from typing import Tuple | |
| from typing import Union | |
| import base64 | |
| import io | |
| import tempfile | |
| import os | |
| import streamlit as st | |
| def concatenate_audio_files(audio_list: List[Tuple[Union[bytes, str], float]]) -> Optional[bytes]: | |
| """ | |
| Concatène une liste de fichiers audio avec des effets sonores. | |
| Args: | |
| audio_list (List[Tuple[Union[bytes, str], float]]): Liste de tuples contenant les données audio | |
| (en bytes ou base64) et leur durée. | |
| Returns: | |
| Optional[bytes]: Données audio concaténées ou None en cas d'erreur. | |
| Raises: | |
| ValueError: Si la liste d'audio est vide ou invalide. | |
| IOError: Si une erreur se produit lors de la lecture/écriture des fichiers. | |
| """ | |
| if not audio_list: | |
| st.error("Erreur : Aucun fichier audio à concaténer") | |
| return None | |
| final_audio = AudioSegment.empty() | |
| temp_files = [] # Pour le nettoyage des fichiers temporaires | |
| try: | |
| # Charger les effets sonores | |
| try: | |
| begin_sound = AudioSegment.from_mp3( | |
| "sound-effects/voice-message-play-begin/voice-message-play-begin-1.mp3" | |
| ) | |
| end_sound = AudioSegment.from_mp3( | |
| "sound-effects/voice-message-play-ending/voice-message-play-ending-1.mp3" | |
| ) | |
| except IOError as e: | |
| st.warning("Impossible de charger les effets sonores, continuation sans effets") | |
| begin_sound = end_sound = AudioSegment.empty() | |
| # 1.5 secondes de silence | |
| silence = AudioSegment.silent(duration=1500) | |
| for audio_data, _ in audio_list: | |
| try: | |
| # Convertir en bytes si c'est une chaîne base64 | |
| if isinstance(audio_data, str): | |
| try: | |
| audio_bytes = base64.b64decode(audio_data) | |
| except Exception as e: | |
| st.error(f"Erreur de décodage base64 : {e}") | |
| continue | |
| else: | |
| audio_bytes = audio_data | |
| # Créer un fichier temporaire pour l'audio | |
| temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') | |
| temp_files.append(temp_file.name) | |
| temp_file.write(audio_bytes) | |
| temp_file.close() | |
| # Convertir les octets en un segment audio | |
| segment = AudioSegment.from_mp3(temp_file.name) | |
| # Ajouter le son de début, le segment TTS, le son de fin et le silence | |
| final_audio += begin_sound + segment + end_sound + silence | |
| except Exception as e: | |
| st.warning(f"Erreur lors du traitement d'un segment audio : {e}") | |
| continue | |
| if len(final_audio) == 0: | |
| raise ValueError("Aucun segment audio n'a pu être traité correctement") | |
| # Convertir le segment audio final en octets | |
| buffer = io.BytesIO() | |
| final_audio.export(buffer, format="mp3") | |
| return buffer.getvalue() | |
| except ValueError as e: | |
| st.error(f"Erreur de validation : {e}") | |
| return None | |
| except IOError as e: | |
| st.error(f"Erreur lors de la lecture ou de l'écriture des fichiers audio : {e}") | |
| return None | |
| except Exception as e: | |
| st.error(f"Une erreur inattendue s'est produite : {e}") | |
| return None | |
| finally: | |
| # Nettoyage des fichiers temporaires | |
| for temp_file in temp_files: | |
| try: | |
| os.remove(temp_file) | |
| except Exception: | |
| pass | |
| def split_audio(audio_file, max_size_mb: int = 25) -> List[bytes]: | |
| """ | |
| Divise un fichier audio en segments de taille maximale spécifiée. | |
| Args: | |
| audio_file: Fichier audio ouvert en mode binaire. | |
| max_size_mb (int): Taille maximale de chaque segment en Mo. | |
| Returns: | |
| List[bytes]: Liste des segments audio divisés sous forme de bytes. | |
| """ | |
| try: | |
| audio_file.seek(0) | |
| audio = AudioSegment.from_file(audio_file) | |
| duration_ms = len(audio) | |
| segment_duration_ms = int( | |
| (max_size_mb * 1024 * 1024 * 8) / | |
| (audio.frame_rate * audio.sample_width * audio.channels) | |
| ) | |
| segments = [] | |
| for start in range(0, duration_ms, segment_duration_ms): | |
| end = min(start + segment_duration_ms, duration_ms) | |
| segment = audio[start:end] | |
| with io.BytesIO() as buffer: | |
| segment.export(buffer, format="mp3") | |
| segments.append(buffer.getvalue()) | |
| return segments | |
| except Exception as e: | |
| print(f"Une erreur s'est produite lors de la division de l'audio : {e}") | |
| return [] | |