Spaces:
Paused
Paused
| import streamlit as st | |
| from openai import OpenAI | |
| from os import getenv | |
| from audiorecorder import audiorecorder | |
| import tempfile | |
| import base64 | |
| from pydub import AudioSegment | |
| import os | |
| import io | |
| import time | |
| import re | |
| from typing import Union, Optional, Any | |
| import json | |
def load_lang_ui(filepath: Optional[str] = "ui_lang_support.json") -> dict:
    """Load the interface translation table from a JSON file.

    Args:
        filepath: Path of the UTF-8 JSON file holding the translations.

    Returns:
        The decoded JSON object, or an empty dict when the file is missing,
        is not valid JSON, does not contain a JSON object, or cannot be read
        for any other reason.
    """
    # Error messages are plain text on purpose: this loader is what builds the
    # translation table, so the translation helper `_` cannot be used here
    # (it is not even defined yet when the module-level call runs).
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            data = json.load(file)
    except FileNotFoundError:
        print(f"Translation file not found: {filepath}")
        return {}
    except json.JSONDecodeError as e:
        print(f"Translation file {filepath} is not valid JSON: {e}")
        return {}
    except Exception as e:
        print(f"Could not read translation file {filepath}: {e}")
        return {}

    if not isinstance(data, dict):
        print(f"Translation file {filepath} does not contain a JSON object")
        return {}
    return data
# Translation table, loaded once at import time.
# The result of load_lang_ui() is indexed as traductions[language][key] by `_`.
traductions = load_lang_ui()
# Translation lookup for the currently selected interface language
def _(key):
    """Return the interface string stored under `key`.

    The language is read from `st.session_state["interface_language"]`.
    When that entry, the language, or the key is missing, the key itself is
    returned so the interface still renders instead of raising.

    Args:
        key: Identifier of the interface string.

    Returns:
        The translated string, or `key` when no translation is available.
    """
    language = st.session_state.get("interface_language")
    return traductions.get(language, {}).get(key, key)
# OpenAI client shared by every API call in this module; the key is read from
# the OPENAI_API_KEY environment variable.
client = OpenAI(api_key=getenv("OPENAI_API_KEY"))
# Read a text file and return its content
def lire_fichier(nom_fichier):
    """Return the UTF-8 text content of `nom_fichier`.

    No exception is raised for read failures: the translated error message
    is returned in place of the content.

    Args:
        nom_fichier: Path of the file to read.

    Returns:
        The file content, or a translated error message when the file does
        not exist or cannot be read.
    """
    try:
        with open(nom_fichier, 'r', encoding='utf-8') as handle:
            return handle.read()
    except FileNotFoundError:
        template = _("erreur_fichier_non_trouve")
        return template.format(nom_fichier)
    except Exception as error:
        template = _("erreur_lecture_fichier")
        return template.format(str(error))
# Split a WAV file into segments that each stay under a size limit
def split_audio(audio_file, max_size_mb=25):
    """Split a WAV file into consecutive segments of at most `max_size_mb`.

    Args:
        audio_file: Path (or file object) of the WAV file to split.
        max_size_mb: Maximum size of each exported segment, in mebibytes.

    Returns:
        A list of paths to temporary ``.wav`` files, in playback order.
        The caller is responsible for deleting them.
    """
    audio = AudioSegment.from_wav(audio_file)
    duration_ms = len(audio)  # pydub lengths and slices are in milliseconds

    # Uncompressed PCM throughput; sample_width is expressed in bytes.
    bytes_per_second = audio.frame_rate * audio.sample_width * audio.channels
    # Keep a little headroom for the WAV container header.
    header_headroom_bytes = 1024
    max_payload_bytes = max(max_size_mb * 1024 * 1024 - header_headroom_bytes, 0)
    # bytes / (bytes per second) gives seconds; convert to milliseconds.
    # At least 1 ms so that range() never receives a zero step.
    segment_duration_ms = max(1, int(max_payload_bytes * 1000 / bytes_per_second))

    segments = []
    for start in range(0, duration_ms, segment_duration_ms):
        end = min(start + segment_duration_ms, duration_ms)
        segment = audio[start:end]
        # Reserve a unique path, then release the handle before exporting.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_segment:
            segment_path = temp_segment.name
        # export() hands back the output file object; close it right away.
        segment.export(segment_path, format="wav").close()
        segments.append(segment_path)
    return segments
# Transcribe an audio file to text
def transcribe_audio(audio_file, language=None):
    """Transcribe an audio file with the `whisper-1` model.

    Files larger than 25 MB are split with `split_audio` and transcribed
    segment by segment; the partial transcripts are joined with spaces.

    Args:
        audio_file: Object exposing the file path as `.name` (only the path
            is used; the object itself is never read).
        language: Language hint forwarded to the transcription API.

    Returns:
        The transcribed text.
    """
    max_size_mb = 25
    source_path = audio_file.name

    def transcribe_path(path):
        # One API call per file; the handle is closed as soon as it returns.
        with open(path, "rb") as stream:
            transcript = client.audio.transcriptions.create(
                model="whisper-1",
                file=stream,
                language=language
            )
        return transcript.text

    file_size_mb = os.path.getsize(source_path) / (1024 * 1024)
    if file_size_mb <= max_size_mb:
        return transcribe_path(source_path)

    segments = split_audio(source_path, max_size_mb)
    try:
        parts = [transcribe_path(segment) for segment in segments]
    finally:
        # Remove every temporary segment, even when a request failed midway.
        for segment in segments:
            try:
                os.unlink(segment)
            except OSError:
                pass
    return " ".join(parts).strip()
# Detect the language of a given text
def language_detection(input_text, temperature=0.01):
    """Ask the chat model for the ISO-639-1 code of `input_text`'s language.

    The model answer is normalised (surrounding whitespace, quotes and
    backticks removed, lower-cased) because callers reuse it as the
    `language` argument of later requests.

    Args:
        input_text: Text whose language must be identified.
        temperature: Sampling temperature for the completion.

    Returns:
        The normalised model answer, or None when the answer is empty.
    """
    system_prompt = "".join([
        "Je souhaite que vous agissiez en tant que fonction linguistique.",
        "Je m'exprimerai dans n'importe quelle langue, et vous en détecterez la langue.",
        "Vous fournirez le résultat de votre détection au format ISO-639-1.",
        "Votre réponse doit représenter l'argument `language` et contenir seulement sa valeur de type chaîne de caractères.",
        "La langue de l'audio d'entrée. Fournir la langue d'entrée au format ISO-639-1 améliorera la précision et la latence."
    ])
    response = client.chat.completions.create(
        model="gpt-4o",
        temperature=temperature,
        messages=[
            {
                "role": "system",
                "content": system_prompt
            },
            {
                "role": "user",
                "content": f"{input_text}"
            }
        ]
    )
    raw_answer = response.choices[0].message.content
    if not raw_answer:
        return None
    # The prompt mentions `language` in backticks, so the answer may come back
    # quoted or back-ticked; strip that decoration before returning.
    normalized = raw_answer.strip().strip('`"\'').strip().lower()
    return normalized or None
def get_duration_pydub(audio_file):
    """Return the duration, in seconds, of the audio loaded from `audio_file`."""
    return AudioSegment.from_file(audio_file).duration_seconds
# Convert text to speech
def text_to_speech(text):
    """Synthesise `text` with the `tts-1` model.

    The voice is read from `st.session_state.tts_voice`.

    Args:
        text: Text to speak.

    Returns:
        A tuple `(audio_bytes, audio_duration)`: the MP3 content and its
        duration in seconds.
    """
    response = client.audio.speech.create(
        model="tts-1",
        voice=st.session_state.tts_voice,
        input=text
    )
    # Reserve a unique path, then release the handle before writing to it.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
        temp_path = temp_audio.name
    try:
        response.stream_to_file(temp_path)
        with open(temp_path, "rb") as audio_file:
            audio_bytes = audio_file.read()
        audio_duration = get_duration_pydub(temp_path)
    finally:
        # The file was created with delete=False, so remove it explicitly.
        os.unlink(temp_path)
    return audio_bytes, audio_duration
def concatenate_audio_files(audio_list):
    """Chain several MP3 clips into one MP3, framed by sound effects.

    Each clip is preceded by the "begin" effect and followed by the "ending"
    effect and 5 seconds of silence.

    Args:
        audio_list: Iterable of `(mp3_bytes, duration)` pairs; the duration
            is not used.

    Returns:
        The bytes of the combined MP3.
    """
    intro = AudioSegment.from_mp3("sound-effects/voice-message-play-begin/voice-message-play-begin-1.mp3")
    outro = AudioSegment.from_mp3("sound-effects/voice-message-play-ending/voice-message-play-ending-1.mp3")
    pause = AudioSegment.silent(duration=5000)  # milliseconds

    combined = AudioSegment.empty()
    for mp3_bytes, _duration in audio_list:
        speech = AudioSegment.from_mp3(io.BytesIO(mp3_bytes))
        combined += intro + speech + outro + pause

    output = io.BytesIO()
    combined.export(output, format="mp3")
    return output.getvalue()
# Process a user message and generate the assistant reply
def process_message(message,
                    operation_prompt="",
                    tts_enabled=False):
    """Send `message` to the chat model, stream the reply and optionally speak it.

    The message is wrapped in triple quotes after `operation_prompt` and
    appended to `st.session_state.messages`; the cleaned reply is appended
    as the assistant turn.

    Args:
        message: Text entered or dictated by the user.
        operation_prompt: Instruction placed before the quoted message.
        tts_enabled: When true, the reply is also converted to speech.

    Returns:
        `(tts_audio, tts_duration)` when `tts_enabled` is true, otherwise
        `(None, None)`.
    """
    payload_content = f'{operation_prompt} :\n\"\"\"\n{message}\n\"\"\"'
    st.session_state.messages.append({"role": "user", "content": payload_content})
    with st.chat_message("user"):
        st.markdown(message)
    with st.chat_message("assistant"):
        message_placeholder = st.empty()
        full_response = ""
        for response in client.chat.completions.create(
            model="gpt-4o",
            messages=st.session_state.messages,
            stream=True,
            temperature=0.1,
        ):
            # Some stream chunks carry no choice at all; skip them.
            if not response.choices:
                continue
            full_response += (response.choices[0].delta.content or "")
            message_placeholder.markdown(full_response + "▌")
        # Drop a leading and/or trailing triple double quote, then the
        # whitespace that the removed quotes leave behind.
        full_response = re.sub(r'^"{3}|"{3}$', '', full_response.strip()).strip()
        message_placeholder.markdown(full_response)
    st.session_state.messages.append({"role": "assistant", "content": full_response})
    if tts_enabled:
        tts_audio, tts_duration = text_to_speech(full_response)
        return tts_audio, tts_duration
    return None, None
# Namespace for the global system prompts
class GlobalSystemPrompts:
    """Accessors for the system prompts used by the application."""

    @staticmethod
    def linguascribe():
        """Return the Linguascribe system prompt read from 'linguascribe.prompt'.

        Declared as a static method so it can be called on the class or on
        an instance alike.
        """
        SYSTEM_PROMPT = f"{lire_fichier('linguascribe.prompt')}"
        return SYSTEM_PROMPT
# Module-level prompt state, overwritten by set_mode_translation():
# SYSTEM_PROMPT is the system message content, OP_PROMPT the per-message
# operation instruction.
SYSTEM_PROMPT=""
OP_PROMPT=""
# Configure the translation mode
def set_mode_translation(from_lang, dest_lang):
    """Set the module-level prompts for a translation between two languages.

    Args:
        from_lang: Source language, inserted as-is in the operation prompt.
        dest_lang: Target language, inserted as-is in the operation prompt.
    """
    global SYSTEM_PROMPT, OP_PROMPT
    linguascribe_prompt = GlobalSystemPrompts.linguascribe()
    SYSTEM_PROMPT = linguascribe_prompt
    OP_PROMPT = f"Translate({from_lang} to {dest_lang})"
# Languages offered as translation targets (English names, in display order)
SUPPORTED_LANGUAGES = [
    "Afrikaans", "Arabic", "Armenian", "Azerbaijani",
    "Belarusian", "Bosnian", "Bulgarian", "Catalan",
    "Chinese", "Croatian", "Czech", "Danish",
    "Dutch", "English", "Estonian", "Finnish",
    "French", "Galician", "German", "Greek",
    "Hebrew", "Hindi", "Hungarian", "Icelandic",
    "Indonesian", "Italian", "Japanese", "Kannada",
    "Kazakh", "Korean", "Latvian", "Lithuanian",
    "Macedonian", "Malay", "Marathi", "Maori",
    "Nepali", "Norwegian", "Persian", "Polish",
    "Portuguese", "Romanian", "Russian", "Serbian",
    "Slovak", "Slovenian", "Spanish", "Swahili",
    "Swedish", "Tagalog", "Tamil", "Thai",
    "Turkish", "Ukrainian", "Urdu", "Vietnamese",
    "Welsh",
]
# Mapping between English language names and ISO 639-1 codes
_LANGUAGE_TO_ISO6391 = {
    "Afrikaans": "af", "Arabic": "ar", "Armenian": "hy", "Azerbaijani": "az",
    "Belarusian": "be", "Bosnian": "bs", "Bulgarian": "bg", "Catalan": "ca",
    "Chinese": "zh", "Croatian": "hr", "Czech": "cs", "Danish": "da",
    "Dutch": "nl", "English": "en", "Estonian": "et", "Finnish": "fi",
    "French": "fr", "Galician": "gl", "German": "de", "Greek": "el",
    "Hebrew": "he", "Hindi": "hi", "Hungarian": "hu", "Icelandic": "is",
    "Indonesian": "id", "Italian": "it", "Japanese": "ja", "Kannada": "kn",
    "Kazakh": "kk", "Korean": "ko", "Latvian": "lv", "Lithuanian": "lt",
    "Macedonian": "mk", "Malay": "ms", "Marathi": "mr", "Maori": "mi",
    "Nepali": "ne", "Norwegian": "no", "Persian": "fa", "Polish": "pl",
    "Portuguese": "pt", "Romanian": "ro", "Russian": "ru", "Serbian": "sr",
    "Slovak": "sk", "Slovenian": "sl", "Spanish": "es", "Swahili": "sw",
    "Swedish": "sv", "Tagalog": "tl", "Tamil": "ta", "Thai": "th",
    "Turkish": "tr", "Ukrainian": "uk", "Urdu": "ur", "Vietnamese": "vi",
    "Welsh": "cy"
}
# Same mapping keyed by case-folded name, built once for tolerant lookups.
_ISO6391_BY_FOLDED_NAME = {
    name.casefold(): code for name, code in _LANGUAGE_TO_ISO6391.items()
}


# Convert a language name to its ISO 639-1 code
def convert_language_name_to_iso6391(language_data):
    """Return the ISO 639-1 code for an English language name.

    The lookup ignores case and surrounding whitespace.

    Args:
        language_data: Either the language name, or a dict holding the name
            under the 'language' key.

    Returns:
        The two-letter code, or 'en' when the name is missing, is not a
        string, or is not a known language.
    """
    if isinstance(language_data, dict):
        language_name = language_data.get('language')
    else:
        language_name = language_data
    if not isinstance(language_name, str):
        return "en"
    return _ISO6391_BY_FOLDED_NAME.get(language_name.strip().casefold(), "en")
# Main function of the application
def main():
    """Render the Streamlit page and run the translate / speak pipeline.

    The page offers a chat text input, an audio recorder and a sidebar with
    the interface language, the target languages and the TTS settings. Each
    user message (typed or dictated) is translated into every selected
    target language; when TTS is enabled the spoken translations are merged
    into a single clip that is played automatically.
    """
    st.title("------- DEMORRHA -------")

    # Seed the session state on first run only; existing values are kept.
    session_defaults = {
        "language_detected": None,
        "process_mode": "translation",
        "target_language": "en",
        "selected_languages": [{"language": "English", "iso-639-1": "en"}],
        "enable_tts_for_input_from_text_field": True,
        "enable_tts_for_input_from_audio_record": True,
        "interface_language": "Français",  # default interface language
    }
    for state_key, default_value in session_defaults.items():
        if state_key not in st.session_state:
            st.session_state[state_key] = default_value

    def init_process_mode():
        # Refresh the module-level prompts for the current language pair.
        if st.session_state["process_mode"] == "translation":
            set_mode_translation(from_lang=st.session_state.language_detected,
                                 dest_lang=st.session_state.target_language)

    init_process_mode()

    # Message history, always starting with the system prompt.
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if not any(message["role"] == "system" for message in st.session_state.messages):
        st.session_state.messages.insert(0, {"role": "system", "content": SYSTEM_PROMPT})

    def ensure_language_detected(text):
        # NOTE(review): in the visible code nothing resets `language_detected`,
        # so the language of the first message is reused for the whole session.
        if st.session_state.language_detected is None:
            st.session_state.language_detected = language_detection(input_text=text, temperature=0.01)

    def translate_to_selected_languages(text, tts_enabled):
        # Translate `text` into each selected language, then play the merged
        # speech when at least one clip was produced.
        audio_list = []
        for cursor_selected_lang in st.session_state.selected_languages:
            st.session_state.target_language = cursor_selected_lang["iso-639-1"]
            # Prompts depend on the target language, so refresh them first.
            init_process_mode()
            tts_audio, tts_duration = process_message(
                text,
                operation_prompt=f"{OP_PROMPT}",
                tts_enabled=tts_enabled
            )
            if tts_audio is not None:
                audio_list.append((tts_audio, tts_duration))
        if audio_list:
            final_audio = concatenate_audio_files(audio_list)
            st.audio(final_audio,
                     format="audio/mp3",
                     autoplay=True)

    # Text chat input
    if user_input := st.chat_input(_("entrez_message")):
        ensure_language_detected(user_input)
        translate_to_selected_languages(
            user_input,
            st.session_state.enable_tts_for_input_from_text_field
        )

    with st.container(border=True):
        # Audio recording input
        st.write(_("enregistrez_message"))
        audio = audiorecorder(start_prompt=_("cliquez_enregistrer"),
                              stop_prompt=_("cliquez_arreter"),
                              pause_prompt=_("cliquez_pause"),
                              show_visualizer=True,
                              key="vocal_chat_input"
                              )
        # NOTE(review): presumably the recorder keeps returning the last
        # recording on every rerun, which would re-process it — confirm.
        if len(audio) > 0:
            # Reserve a unique path, then release the handle before exporting.
            temp_audio = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
            temp_audio.close()
            try:
                audio.export(temp_audio.name, format="wav")
                transcription = transcribe_audio(temp_audio, language=st.session_state.language_detected)
            finally:
                # Remove the temporary file even when transcription fails.
                os.unlink(temp_audio.name)
            ensure_language_detected(transcription)
            st.write(_("langue_detectee").format(st.session_state.language_detected))
            st.write(_("transcription").format(transcription))
            translate_to_selected_languages(
                transcription,
                st.session_state.enable_tts_for_input_from_audio_record
            )

    # Sidebar configuration
    with st.sidebar:
        st.header(_("sidebar_titre"))
        st.markdown("## " + _("a_propos"))
        st.info(_("info_app"))

        def on_languages_change():
            # Callback: mirror the multiselect value into `selected_languages`.
            selected_language_names = st.session_state.language_selector
            st.session_state.selected_languages = [
                {"language": lang, "iso-639-1": convert_language_name_to_iso6391(lang)}
                for lang in selected_language_names
            ]

        with st.container(border=True):
            st.subheader(_("langue_interface"))
            # Interface language selector. `interface_language` is always set
            # by the defaults above; fall back to the first option when it is
            # not one of the loaded languages.
            interface_languages = list(traductions.keys())
            current_language = st.session_state.interface_language
            st.selectbox(
                label=_("choix_langue_interface"),
                options=interface_languages,
                key="interface_language",
                index=interface_languages.index(current_language) if current_language in interface_languages else 0
            )

        with st.container(border=True):
            st.subheader(_("selection_langue"))
            # Target languages (multiple selection)
            st.multiselect(
                label=_("langues_destination"),
                placeholder=_("placeholder_langues"),
                options=SUPPORTED_LANGUAGES,
                default=["English"],
                key="language_selector",
                max_selections=4,
                on_change=on_languages_change
            )

        with st.container(border=True):
            st.subheader(_("parametres_tts"))
            st.selectbox(
                _("choix_voix_tts"),
                options=["alloy", "echo", "fable", "onyx", "nova", "shimmer"],
                index=3,  # "onyx"
                key="tts_voice"
            )
            st.checkbox(
                _("activer_tts_texte"),
                key="enable_tts_for_input_from_text_field"
            )
            st.checkbox(
                _("activer_tts_audio"),
                key="enable_tts_for_input_from_audio_record"
            )
# Application entry point: run main() only when the file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()