Spaces:
Paused
Paused
| import streamlit as st | |
| from openai import OpenAI | |
| from os import getenv | |
| from audiorecorder import audiorecorder | |
| import tempfile | |
| import base64 | |
| from pydub import AudioSegment | |
| import os | |
| import io | |
| import time | |
| import re | |
| # Dictionnaire pour stocker les traductions | |
| traductions = { | |
| "Français": { | |
| "titre": "------- DEMORRHA -------", | |
| "entrez_message": "Entrez votre message ici:", | |
| "enregistrez_message": "Ou enregistrez votre message audio :", | |
| "cliquez_enregistrer": "Cliquez pour enregistrer", | |
| "cliquez_arreter": "Cliquez pour arrêter l'enregistrement", | |
| "cliquez_pause": "Cliquez pour mettre en pause", | |
| "langue_detectee": "Langue détectée : {}", | |
| "transcription": "Transcription : {}", | |
| "sidebar_titre": "DEMORRHA - v1", | |
| "a_propos": "À propos", | |
| "info_app": "\n".join([ | |
| "Cette application utilise Streamlit et l'API d'OpenAI pour créer un chat interactif avec des modèles de langages avancés dans le but de fournir un outil permettant la communication entre les êtres humains.", | |
| "Cet outil a pour objectif de montrer la voie dans un acte saint de la volonté de son auteur : ", | |
| "Abattre les barrières linguistiques entre les hommes." | |
| ]), | |
| "selection_langue": "Sélection de la langue", | |
| "langues_destination": "Choisissez les langues de destination", | |
| "placeholder_langues": "Sélectionnez une à quatre langue(s)", | |
| "parametres_tts": "Paramètres TTS", | |
| "choix_voix_tts": "Choisissez la voix TTS", | |
| "activer_tts_texte": "Activer TTS pour les entrées textuelles", | |
| "activer_tts_audio": "Activer TTS pour les entrées audio", | |
| "erreur_fichier_non_trouve": "Erreur : Le fichier '{}' n'a pas été trouvé.", | |
| "erreur_lecture_fichier": "Une erreur s'est produite lors de la lecture du fichier : {}", | |
| "langue_interface": "Langue de l'interface", | |
| "choix_langue_interface": "Choisissez la langue de l'interface", | |
| "envoyer": "Envoyer", | |
| "effacer": "Effacer", | |
| "historique_conversation": "Historique de la conversation", | |
| "effacer_historique": "Effacer l'historique", | |
| "erreur_api": "Une erreur s'est produite lors de l'appel à l'API : {}", | |
| "erreur_traitement": "Une erreur s'est produite lors du traitement de votre message : {}", | |
| "erreur_tts": "Une erreur s'est produite lors de la génération du texte en parole : {}", | |
| "chargement": "Chargement en cours...", | |
| "aucun_message": "Aucun message pour le moment. Commencez la conversation !" | |
| }, | |
| "English": { | |
| "titre": "------- DEMORRHA -------", | |
| "entrez_message": "Enter your message here:", | |
| "enregistrez_message": "Or record your audio message:", | |
| "cliquez_enregistrer": "Click to record", | |
| "cliquez_arreter": "Click to stop recording", | |
| "cliquez_pause": "Click to pause", | |
| "langue_detectee": "Detected language: {}", | |
| "transcription": "Transcription: {}", | |
| "sidebar_titre": "DEMORRHA - v1", | |
| "a_propos": "About", | |
| "info_app": "\n".join([ | |
| "This application uses Streamlit and the OpenAI API to create an interactive chat with advanced language models in order to provide a tool for communication between humans.", | |
| "This tool aims to lead the way in a holy act of its author's will:", | |
| "Breaking down language barriers between people." | |
| ]), | |
| "selection_langue": "Language selection", | |
| "langues_destination": "Choose the destination languages", | |
| "placeholder_langues": "Select one to four language(s)", | |
| "parametres_tts": "TTS settings", | |
| "choix_voix_tts": "Choose the TTS voice", | |
| "activer_tts_texte": "Enable TTS for text input", | |
| "activer_tts_audio": "Enable TTS for audio input", | |
| "erreur_fichier_non_trouve": "Error: The file '{}' was not found.", | |
| "erreur_lecture_fichier": "An error occurred while reading the file: {}", | |
| "langue_interface": "Interface language", | |
| "choix_langue_interface": "Choose the interface language", | |
| "envoyer": "Send", | |
| "effacer": "Clear", | |
| "historique_conversation": "Conversation history", | |
| "effacer_historique": "Clear history", | |
| "erreur_api": "An error occurred during the API call: {}", | |
| "erreur_traitement": "An error occurred while processing your message: {}", | |
| "erreur_tts": "An error occurred during text-to-speech generation: {}", | |
| "chargement": "Loading...", | |
| "aucun_message": "No messages yet. Start the conversation!" | |
| } | |
| } | |
| # Fonction pour obtenir les traductions en fonction de la langue sélectionnée | |
| def _(key): | |
| return traductions[st.session_state.interface_language][key] | |
| # Configuration du client OpenAI avec la clé API | |
| client = OpenAI(api_key=getenv("OPENAI_API_KEY")) | |
| # Fonction pour lire et retourner le contenu de fichiers textes | |
| def lire_fichier(nom_fichier): | |
| try: | |
| with open(nom_fichier, 'r', encoding='utf-8') as fichier: | |
| contenu = fichier.read() | |
| return contenu | |
| except FileNotFoundError: | |
| return _("erreur_fichier_non_trouve").format(nom_fichier) | |
| except Exception as e: | |
| return _("erreur_lecture_fichier").format(str(e)) | |
| # Fonction pour diviser un fichier audio en segments de 25 Mo ou moins | |
| def split_audio(audio_file, max_size_mb=25): | |
| audio = AudioSegment.from_wav(audio_file) | |
| duration_ms = len(audio) | |
| segment_duration_ms = int((max_size_mb * 1024 * 1024 * 8) / (audio.frame_rate * audio.sample_width * audio.channels)) | |
| segments = [] | |
| for start in range(0, duration_ms, segment_duration_ms): | |
| end = min(start + segment_duration_ms, duration_ms) | |
| segment = audio[start:end] | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_segment: | |
| segment.export(temp_segment.name, format="wav") | |
| segments.append(temp_segment.name) | |
| return segments | |
| # Fonction modifiée pour transcrire l'audio en texte | |
| def transcribe_audio(audio_file, language=None): | |
| max_size_mb = 25 | |
| file_size_mb = os.path.getsize(audio_file.name) / (1024 * 1024) | |
| if file_size_mb > max_size_mb: | |
| segments = split_audio(audio_file.name, max_size_mb) | |
| full_transcript = "" | |
| for segment in segments: | |
| with open(segment, "rb") as audio_segment: | |
| transcript = client.audio.transcriptions.create( | |
| model="whisper-1", | |
| file=audio_segment, | |
| language=language | |
| ) | |
| full_transcript += transcript.text + " " | |
| os.unlink(segment) # Supprimer le fichier temporaire | |
| return full_transcript.strip() | |
| else: | |
| with open(audio_file.name, "rb") as audio_file: | |
| transcript = client.audio.transcriptions.create( | |
| model="whisper-1", | |
| file=audio_file, | |
| language=language | |
| ) | |
| return transcript.text | |
| # Fonction pour détecter la langue d'un texte donné | |
| def language_detection(input_text, temperature=0.01): | |
| system_prompt = "".join([ | |
| "Je souhaite que vous agissiez en tant que fonction linguistique.", | |
| "Je m'exprimerai dans n'importe quelle langue, et vous en détecterez la langue.", | |
| "Vous fournirez le résultat de votre détection au format ISO-639-1.", | |
| "Votre réponse doit représenter l'argument `language` et contenir seulement sa valeur de type chaîne de caractères.", | |
| "La langue de l'audio d'entrée. Fournir la langue d'entrée au format ISO-639-1 améliorera la précision et la latence." | |
| ]) | |
| response = client.chat.completions.create( | |
| model="gpt-4o", | |
| temperature=temperature, | |
| messages=[ | |
| { | |
| "role": "system", | |
| "content": system_prompt | |
| }, | |
| { | |
| "role": "user", | |
| "content": f"{input_text}" | |
| } | |
| ] | |
| ) | |
| return response.choices[0].message.content | |
| def get_duration_pydub(audio_file): | |
| audio = AudioSegment.from_file(audio_file) | |
| return audio.duration_seconds | |
| # Fonction pour convertir du texte en parole | |
| def text_to_speech(text): | |
| response = client.audio.speech.create( | |
| model="tts-1", | |
| voice=st.session_state.tts_voice, | |
| input=text | |
| ) | |
| # Sauvegarder l'audio dans un fichier temporaire | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio: | |
| response.stream_to_file(temp_audio.name) | |
| # Lire le contenu du fichier audio | |
| with open(temp_audio.name, "rb") as audio_file: | |
| audio_bytes = audio_file.read() | |
| # Lire la durée de l'audio en secondes | |
| audio_duration = get_duration_pydub(temp_audio.name) | |
| return audio_bytes, audio_duration | |
| def concatenate_audio_files(audio_list): | |
| # Créer un segment audio vide | |
| final_audio = AudioSegment.empty() | |
| # Charger les effets sonores | |
| begin_sound = AudioSegment.from_mp3("sound-effects/voice-message-play-begin/voice-message-play-begin-1.mp3") | |
| end_sound = AudioSegment.from_mp3("sound-effects/voice-message-play-ending/voice-message-play-ending-1.mp3") | |
| # Silence de 5 secondes | |
| silence = AudioSegment.silent(duration=5000) # 5000 ms = 5 secondes | |
| for audio_bytes, _ in audio_list: | |
| # Convertir les bytes en un segment audio | |
| segment = AudioSegment.from_mp3(io.BytesIO(audio_bytes)) | |
| # Ajouter le son de début, le segment TTS, le son de fin, et le silence au final_audio | |
| final_audio += begin_sound + segment + end_sound + silence | |
| # Convertir le segment audio final en bytes | |
| buffer = io.BytesIO() | |
| final_audio.export(buffer, format="mp3") | |
| return buffer.getvalue() | |
| # Fonction pour traiter les messages de l'utilisateur et générer une réponse | |
| def process_message(message, | |
| operation_prompt="", | |
| tts_enabled=False): | |
| payload_content = f'{operation_prompt} :\n\"\"\"\n{message}\n\"\"\"' | |
| st.session_state.messages.append({"role": "user", "content": payload_content}) | |
| with st.chat_message("user"): | |
| st.markdown(message) | |
| with st.chat_message("assistant"): | |
| message_placeholder = st.empty() | |
| full_response = "" | |
| for response in client.chat.completions.create( | |
| model="gpt-4o", | |
| messages=st.session_state.messages, | |
| stream=True, | |
| temperature=0.1, | |
| ): | |
| full_response += (response.choices[0].delta.content or "") | |
| message_placeholder.markdown(full_response + "▌") | |
| # Utiliser un regex pour retirer les trois premières et dernières doubles quotes | |
| full_response = re.sub(r'^"{3}|"{3}$', '', full_response.strip()) | |
| message_placeholder.markdown(full_response) | |
| st.session_state.messages.append({"role": "assistant", "content": full_response}) | |
| if tts_enabled: | |
| tts_audio, tts_duration = text_to_speech(full_response) | |
| return tts_audio, tts_duration | |
| return None, None | |
| #st.audio(tts_audio, format="audio/mp3", autoplay=True) | |
| # Classe pour stocker les prompts système globaux | |
| class GlobalSystemPrompts: | |
| # Méthode pour récupérer le prompt système pour la fonctionnalité Linguascribe | |
| def linguascribe(): | |
| SYSTEM_PROMPT = f"{lire_fichier('linguascribe.prompt')}" | |
| return SYSTEM_PROMPT | |
| # Variables globales pour les prompts | |
| SYSTEM_PROMPT="" | |
| OP_PROMPT="" | |
| # Fonction pour configurer le mode de traduction | |
| def set_mode_translation(from_lang, dest_lang): | |
| global SYSTEM_PROMPT | |
| global OP_PROMPT | |
| SYSTEM_PROMPT=GlobalSystemPrompts.linguascribe() | |
| OP_PROMPT = f"Translate({from_lang} to {dest_lang})" | |
| # Liste des langues supportées par l'application | |
| SUPPORTED_LANGUAGES=["Afrikaans", "Arabic", "Armenian", "Azerbaijani", "Belarusian", "Bosnian", "Bulgarian", "Catalan", "Chinese", "Croatian", "Czech", "Danish", "Dutch", "English", "Estonian", "Finnish", "French", "Galician", "German", "Greek", "Hebrew", "Hindi", "Hungarian", "Icelandic", "Indonesian", "Italian", "Japanese", "Kannada", "Kazakh", "Korean", "Latvian", "Lithuanian", "Macedonian", "Malay", "Marathi", "Maori", "Nepali", "Norwegian", "Persian", "Polish", "Portuguese", "Romanian", "Russian", "Serbian", "Slovak", "Slovenian", "Spanish", "Swahili", "Swedish", "Tagalog", "Tamil", "Thai", "Turkish", "Ukrainian", "Urdu", "Vietnamese", "Welsh"] | |
| # Fonction pour convertir le nom d'une langue en code ISO 639-1 | |
| def convert_language_name_to_iso6391(language_data): | |
| # Dictionnaire de correspondance entre les noms de langues et les codes ISO 639-1 | |
| language_to_iso = { | |
| "Afrikaans": "af", "Arabic": "ar", "Armenian": "hy", "Azerbaijani": "az", | |
| "Belarusian": "be", "Bosnian": "bs", "Bulgarian": "bg", "Catalan": "ca", | |
| "Chinese": "zh", "Croatian": "hr", "Czech": "cs", "Danish": "da", | |
| "Dutch": "nl", "English": "en", "Estonian": "et", "Finnish": "fi", | |
| "French": "fr", "Galician": "gl", "German": "de", "Greek": "el", | |
| "Hebrew": "he", "Hindi": "hi", "Hungarian": "hu", "Icelandic": "is", | |
| "Indonesian": "id", "Italian": "it", "Japanese": "ja", "Kannada": "kn", | |
| "Kazakh": "kk", "Korean": "ko", "Latvian": "lv", "Lithuanian": "lt", | |
| "Macedonian": "mk", "Malay": "ms", "Marathi": "mr", "Maori": "mi", | |
| "Nepali": "ne", "Norwegian": "no", "Persian": "fa", "Polish": "pl", | |
| "Portuguese": "pt", "Romanian": "ro", "Russian": "ru", "Serbian": "sr", | |
| "Slovak": "sk", "Slovenian": "sl", "Spanish": "es", "Swahili": "sw", | |
| "Swedish": "sv", "Tagalog": "tl", "Tamil": "ta", "Thai": "th", | |
| "Turkish": "tr", "Ukrainian": "uk", "Urdu": "ur", "Vietnamese": "vi", | |
| "Welsh": "cy" | |
| } | |
| # Vérifier si language_data est un dictionnaire | |
| if isinstance(language_data, dict): | |
| language_name = language_data.get('language') | |
| else: | |
| language_name = language_data | |
| # Retourne le code ISO 639-1 correspondant au nom de la langue | |
| return language_to_iso.get(language_name, "en") # Par défaut, retourne 'en' si la langue n'est pas trouvée | |
| # Fonction principale de l'application | |
| def main(): | |
| st.title("------- DEMORRHA -------") | |
| # Initialisation des variables d'état de la session | |
| if "language_detected" not in st.session_state: | |
| st.session_state["language_detected"] = None | |
| if "process_mode" not in st.session_state: | |
| st.session_state["process_mode"] = "translation" | |
| if "target_language" not in st.session_state: | |
| st.session_state.target_language = "en" | |
| if "selected_languages" not in st.session_state: | |
| st.session_state.selected_languages = [{"language": "English", "iso-639-1": "en"}] | |
| if "enable_tts_for_input_from_text_field" not in st.session_state: | |
| st.session_state["enable_tts_for_input_from_text_field"] = True | |
| if "enable_tts_for_input_from_audio_record" not in st.session_state: | |
| st.session_state["enable_tts_for_input_from_audio_record"] = True | |
| if "interface_language" not in st.session_state: | |
| st.session_state.interface_language = "Français" # Langue par défaut | |
| def init_process_mode(): | |
| # Configuration du mode de traduction si nécessaire | |
| if "translation" == st.session_state["process_mode"]: | |
| set_mode_translation(from_lang=st.session_state.language_detected, dest_lang=st.session_state.target_language) | |
| init_process_mode() | |
| # Initialisation de l'historique des messages avec le prompt système | |
| if "messages" not in st.session_state: | |
| st.session_state.messages = [] | |
| # Vérifier si un message système existe déjà dans st.session_state.messages | |
| if not any(message["role"] == "system" for message in st.session_state.messages): | |
| st.session_state.messages.insert(0, {"role": "system", "content": SYSTEM_PROMPT}) | |
| # Interface utilisateur pour le chat textuel | |
| if user_input := st.chat_input(_("entrez_message")): | |
| # Traitement du message textuel de l'utilisateur | |
| if None == st.session_state.language_detected: | |
| st.session_state.language_detected = language_detection(input_text=user_input, temperature=0.01) | |
| audio_list = [] | |
| for cursor_selected_lang in st.session_state.selected_languages: | |
| st.session_state.target_language = cursor_selected_lang["iso-639-1"] | |
| # Initialisation du mode de traitement pour la langue cible actuelle | |
| init_process_mode() | |
| # Traitement du message de l'utilisateur pour la langue cible actuelle | |
| tts_audio, tts_duration = process_message( | |
| user_input, | |
| operation_prompt=f"{OP_PROMPT}", | |
| tts_enabled=st.session_state.enable_tts_for_input_from_text_field | |
| ) | |
| if tts_audio is not None: | |
| audio_list.append((tts_audio, tts_duration)) | |
| if audio_list: | |
| final_audio = concatenate_audio_files(audio_list) | |
| st.audio(final_audio, | |
| format="audio/mp3", | |
| autoplay=True) | |
| with st.container(border=True): | |
| # Interface utilisateur pour l'enregistrement audio | |
| st.write(_("enregistrez_message")) | |
| audio = audiorecorder(start_prompt=_("cliquez_enregistrer"), | |
| stop_prompt=_("cliquez_arreter"), | |
| pause_prompt=_("cliquez_pause"), | |
| show_visualizer=True, | |
| key="vocal_chat_input" | |
| ) | |
| # Traitement de l'entrée audio de l'utilisateur | |
| if len(audio) > 0: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio: | |
| audio.export(temp_audio.name, format="wav") | |
| transcription = transcribe_audio(temp_audio, language=st.session_state.language_detected) | |
| os.unlink(temp_audio.name) # Supprimer le fichier temporaire | |
| if None == st.session_state.language_detected: | |
| st.session_state.language_detected = language_detection(input_text=transcription, temperature=0.01) | |
| st.write(_("langue_detectee").format(st.session_state.language_detected)) | |
| st.write(_("transcription").format(transcription)) | |
| audio_list = [] | |
| for cursor_selected_lang in st.session_state.selected_languages: | |
| st.session_state.target_language = cursor_selected_lang["iso-639-1"] | |
| # Initialisation du mode de traitement pour la langue cible actuelle | |
| init_process_mode() | |
| # Traitement du message de l'utilisateur pour la langue cible actuelle | |
| tts_audio, tts_duration = process_message( | |
| transcription, | |
| operation_prompt=f"{OP_PROMPT}", | |
| tts_enabled=st.session_state.enable_tts_for_input_from_audio_record | |
| ) | |
| if tts_audio is not None: | |
| audio_list.append((tts_audio, tts_duration)) | |
| if audio_list: | |
| final_audio = concatenate_audio_files(audio_list) | |
| st.audio(final_audio, | |
| format="audio/mp3", | |
| autoplay=True) | |
| # Configuration de la barre latérale | |
| with st.sidebar: | |
| st.header(_("sidebar_titre")) | |
| st.markdown("## " + _("a_propos")) | |
| st.info(_("info_app")) | |
| # Fonction de rappel pour le changement de(s) langue(s) de destination selectionnée(s) | |
| def on_languages_change(): | |
| selected_language_names = st.session_state.language_selector | |
| st.session_state.selected_languages = [ | |
| {"language": lang, "iso-639-1": convert_language_name_to_iso6391(lang)} | |
| for lang in selected_language_names | |
| ] | |
| with st.container(border=True): | |
| st.subheader(_("langue_interface")) | |
| # Sélection de la langue de l'interface | |
| st.selectbox( | |
| label=_("choix_langue_interface"), | |
| options=list(traductions.keys()), | |
| key="interface_language", | |
| index=list(traductions.keys()).index("Français") if "interface_language" not in st.session_state else list(traductions.keys()).index(st.session_state.interface_language) | |
| ) | |
| with st.container(border=True): | |
| # Conteneur pour la sélection de la langue | |
| st.subheader(_("selection_langue")) | |
| # Sélection multiple des langues de destination | |
| st.multiselect( | |
| label=_("langues_destination"), | |
| placeholder=_("placeholder_langues"), | |
| options=SUPPORTED_LANGUAGES, | |
| default=["English"], | |
| key="language_selector", | |
| max_selections=4, | |
| on_change=on_languages_change | |
| ) | |
| with st.container(border=True): | |
| st.subheader(_("parametres_tts")) | |
| st.selectbox( | |
| _("choix_voix_tts"), | |
| options=["alloy", "echo", "fable", "onyx", "nova", "shimmer"], | |
| index=3, # "onyx" est à l'index 3 | |
| key="tts_voice" | |
| ) | |
| st.checkbox( | |
| _("activer_tts_texte"), | |
| key="enable_tts_for_input_from_text_field" | |
| ) | |
| st.checkbox( | |
| _("activer_tts_audio"), | |
| key="enable_tts_for_input_from_audio_record" | |
| ) | |
| # Point d'entrée de l'application | |
| if __name__ == "__main__": | |
| main() | |