"""DEMORRHA — Streamlit UI for multi-language translation with STT/TTS.

Flow: the user supplies text, a voice recording, or text files; the input is
moderated, its language optionally detected, then translated into each
selected target language via ``process_message``; optionally the responses
are converted to speech and offered as a downloadable MP3.
"""

# Standard libraries
import hashlib
import io
import os
import tempfile
import time
import uuid
from datetime import datetime
from typing import AnyStr
from typing import List
from typing import Optional
from typing import Tuple

# Third-party libraries
import requests
import streamlit as st
from openai import OpenAI
from pydub import AudioSegment

import warnings

# Ignore DeprecationWarning
warnings.filterwarnings("ignore", category=DeprecationWarning)

from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

# Project-local imports
from var_app import __version__
from var_app import LANGUAGES_EMOJI
from var_app import SUPPORTED_LANGUAGES
from var_app import CHAT_FILES_UPLOAD_ALLOWED_TYPES

from core.core import translations
from core.core import get_translation
from core.core import process_message
from core.core import init_process_mode
from core.converter import convert_iso6391_to_language_name
from core.converter import convert_language_name_to_iso6391
from core.files import read_file
from core.files import load_ui_language
from core.text_to_speech import openai_tts
from core.text_to_speech import process_tts_message
from core.DetectLanguage import detect_language
from core.speech_to_text import huggingface_endpoints_stt
from core.speech_to_text import transcribe_audio
from core.audio_files import concatenate_audio_files
from core.audio_files import split_audio
from core.moderation import api_moderation_openai_text
from core.audio_isolation import isolate_audio


def init_langs_for_processing(
    target_language: Optional[AnyStr] = "en",
    interface_language: Optional[AnyStr] = "English",
    language_detected: Optional[AnyStr] = None,
) -> Tuple[str, str]:
    """Build the (system_prompt, operation_prompt) pair for one target language.

    Args:
        target_language: ISO 639-1 code of the language to translate into.
        interface_language: full name of the UI language; used as the fallback
            source language when no detected language is supplied.
        language_detected: ISO 639-1 code of the detected source language, or
            None/empty when detection was not performed.

    Returns:
        The ``(system_prompt, operation_prompt)`` tuple from
        ``init_process_mode``.
    """
    # BUGFIX: the original condition was `"language_detected" in
    # language_detected`, which tests for the literal substring
    # "language_detected" (and raises TypeError when the argument is None).
    # The intent is: use the detected language when one is available,
    # otherwise fall back to the interface language's ISO 639-1 code.
    system_prompt, operation_prompt = init_process_mode(
        from_lang=(
            language_detected
            if language_detected
            else convert_language_name_to_iso6391(interface_language)
        ),
        to_lang=target_language,
    )
    return (system_prompt, operation_prompt)


def user_message_moderation_check(
    user_message: Optional[str] = None
):
    """Run OpenAI moderation on a user message and surface problems in the UI.

    NOTE(review): the bare `return` statements only exit this helper — the
    callers ignore the (None) result and keep processing the message anyway.
    Confirm whether processing should actually stop on a flagged message.
    """
    # Call the moderation endpoint
    moderation_result = api_moderation_openai_text(user_message)
    if moderation_result.get("flagged"):
        st.error("Votre message a été jugé inapproprié et ne peut pas être traité.")
        return  # Stop this check if the message is inappropriate
    elif "error" in moderation_result:
        st.error(moderation_result["error"])
        return  # Surface moderation errors


def hash_file(file):
    """Return the MD5 hex digest of a file-like object, rewinding it after.

    Note: MD5 is used only as a cheap content fingerprint, not for security.
    """
    hasher = hashlib.md5()
    buf = file.read()
    hasher.update(buf)
    file.seek(0)  # rewind so the caller can read the file again
    return hasher.hexdigest()


def callback_change_edited_text(key, value, modified_text):
    """Persist an edited text (or audio transcription) back into
    ``st.session_state.changed_uploaded_files``.

    Args:
        key: key of the file entry in ``changed_uploaded_files``.
        value: file descriptor dict; its "type" selects which field to update.
        modified_text: session-state key of the widget holding the new text.
    """
    if value["type"] in ["txt"]:
        st.session_state.changed_uploaded_files[key]["bytes_data"] = st.session_state[modified_text].encode()
    elif value["type"] in ["wav", "mp3"]:
        st.session_state.changed_uploaded_files[key]["audio_transcription"] = st.session_state[modified_text]


def save_attachment(attachment):
    """Save an uploaded attachment to disk and return its path."""
    # Create the attachments folder if it does not exist
    attachments_dir = 'attachments'
    os.makedirs(attachments_dir, exist_ok=True)
    # Build a unique file name, keeping the original extension
    file_extension = os.path.splitext(attachment.name)[1]
    filename = f"{uuid.uuid4()}{file_extension}"
    file_path = os.path.join(attachments_dir, filename)
    # Write the file
    with open(file_path, 'wb') as f:
        f.write(attachment.getbuffer())
    return file_path


# Page configuration — must run before any other Streamlit rendering call.
st.set_page_config(
    page_title=f"DEMORRHA - (v{__version__})",
    page_icon="👹",
    layout="wide",
    initial_sidebar_state="collapsed"
)


@st.dialog("STT Settings")
def stt_settings(state__stt_voice_isolation):
    """Modal dialog for speech-to-text settings (voice-isolation toggle)."""
    with st.expander(f"{get_translation('parametres_stt')}", expanded=True, icon="🎤"):
        set__stt_voice_isolation = st.checkbox(
            get_translation("isolation_voix"),
            value=state__stt_voice_isolation
        )
        if st.button("Submit"):
            st.session_state.stt_voice_isolation = set__stt_voice_isolation
            st.rerun()


@st.dialog("TTS Settings")
def tts_settings(name__tts_voice, state__tts_with_text, state__tts_with_audio, state__autoplay_tts):
    """Modal dialog for text-to-speech settings (voice, toggles, autoplay)."""
    with st.expander(f"{get_translation('parametres_tts')}", expanded=True, icon="🔊"):
        # Single source of truth for the available OpenAI TTS voices
        voices = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
        set__tts_voice = st.selectbox(
            get_translation("choix_voix_tts"),
            options=voices,
            index=voices.index(name__tts_voice)
        )
        set__tts_with_text = st.checkbox(
            get_translation("activer_tts_texte"),
            value=state__tts_with_text
        )
        set__tts_with_audio = st.checkbox(
            get_translation("activer_tts_audio"),
            value=state__tts_with_audio
        )
        set__autoplay_tts = st.checkbox(
            get_translation("lecture_auto_tts"),
            value=state__autoplay_tts
        )
        if st.button("Submit"):
            st.session_state.autoplay_tts = set__autoplay_tts
            st.session_state.enable_tts_for_input_from_audio_record = set__tts_with_audio
            st.session_state.enable_tts_for_input_from_text_field = set__tts_with_text
            st.session_state.tts_voice = set__tts_voice
            st.rerun()


@st.fragment
def recorder_released():
    """Handle a finished voice recording.

    Transcribes the recorded audio, then streams a translated response for
    every selected target language; optionally synthesizes the responses to
    speech and offers the concatenated MP3 for download.
    """
    if "audio_list" not in st.session_state:
        st.session_state.audio_list = []

    # BUGFIX: the original left `audio_recorded` undefined when "rec_widget"
    # was absent from session_state, raising NameError below. A missing or
    # falsy widget value now simply means "nothing recorded".
    audio_recorded = bool(st.session_state.get("rec_widget"))

    if audio_recorded:
        audio = AudioSegment.from_wav(io.BytesIO(st.session_state.rec_widget.getvalue()))
        st.write(f"Frame rate: {audio.frame_rate}, Frame width: {audio.frame_width}, Duration: {audio.duration_seconds} seconds")
        if st.session_state.stt_voice_isolation:
            # Isolate the voice from background noise before transcription
            audio = isolate_audio(audio)
        try:
            # Export to a temporary MP3 so the STT endpoint can read a file.
            # delete=False: the handle is closed before the STT call reads it.
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp_file:
                audio.export(tmp_file, format="mp3")
                tmp_file.close()
                # Transcribe the audio to text
                st.session_state.transcription = huggingface_endpoints_stt(tmp_file)

            audio = AudioSegment.empty()  # release the audio buffer
            st.markdown(
                f"🌐 {get_translation('langue_interface')} {st.session_state.interface_language}({convert_language_name_to_iso6391(st.session_state.interface_language)})"
            )
            st.markdown(
                f"🎤 {get_translation('transcription_audio')} {st.session_state.transcription}"
            )
            st.session_state.audio_list = []
            for cursor_selected_lang in st.session_state.selected_languages:
                st.session_state.target_language = cursor_selected_lang["iso-639-1"]
                st.session_state.full_response = ""
                # Initialize the processing mode for the current target
                # language. language_detected=None: no detection was run on
                # the transcription, so fall back to the interface language
                # (same effective behavior as the original call).
                st.session_state.system_prompt, st.session_state.operation_prompt = init_langs_for_processing(
                    target_language=st.session_state.target_language,
                    interface_language=st.session_state.interface_language,
                    language_detected=None
                )
                with st.chat_message("assistant", avatar="👻"):
                    message_placeholder = st.empty()
                    st.session_state.response_generator = process_message(
                        st.session_state.transcription,
                        st.session_state.operation_prompt,
                        st.session_state.system_prompt
                    )
                    # BUGFIX: accumulate the streamed chunks. The original
                    # rendered only the latest chunk and never filled
                    # full_response, so the TTS branch below could never fire.
                    full_response = ""
                    for response_chunk in st.session_state.response_generator:
                        full_response += response_chunk
                        message_placeholder.markdown(full_response)
                    st.session_state.response_generator.close()
                    st.session_state.full_response = full_response

                    if st.session_state.full_response != "":
                        message_placeholder.markdown(st.session_state.full_response)
                        if st.session_state.enable_tts_for_input_from_audio_record:
                            st.session_state.tts_audio, st.session_state.tts_duration = process_tts_message(
                                st.session_state.full_response
                            )
                            if st.session_state.tts_audio:
                                st.session_state.audio_list.append(
                                    (
                                        st.session_state.tts_audio,
                                        st.session_state.tts_duration
                                    )
                                )

            if st.session_state.audio_list:
                st.session_state.final_audio = concatenate_audio_files(st.session_state.audio_list)
                with st.container(border=True):
                    # Build a unique download file name
                    st.session_state.timestamp = time.strftime("%Y%m%d-%H%M%S")
                    st.session_state.langues = "_".join(
                        [lang["iso-639-1"] for lang in st.session_state.selected_languages]
                    )
                    st.session_state.nom_fichier = f"reponse_audio_{st.session_state.langues}_{st.session_state.timestamp}.mp3"
                    st.audio(st.session_state.final_audio, format="audio/mpeg", autoplay=st.session_state.autoplay_tts)
                    st.download_button(
                        label=f"📥 {get_translation('telecharger_audio')}",
                        data=st.session_state.final_audio,
                        file_name=st.session_state.nom_fichier,
                        mime="audio/mpeg",
                        use_container_width=True,
                        type="primary",
                        key=f"download_button_{st.session_state.langues}_{st.session_state.timestamp}",
                    )
        except Exception as e:
            st.error(f"[AUDIO] - {get_translation('erreur_importation_audio')}: {str(e)}")


def main_page():
    """Main page of the application: session bootstrap, sidebar and the
    text / audio / file-upload input tabs with their processing pipelines."""
    # --- Session-state bootstrap ------------------------------------------
    if "audio_list" not in st.session_state:
        st.session_state.audio_list = []
    if "ui_chat_input_disabled" not in st.session_state:
        st.session_state.ui_chat_input_disabled = False
    if "ui_audio_input_disabled" not in st.session_state:
        st.session_state.ui_audio_input_disabled = False
    if "ui_filesuploader_disabled" not in st.session_state:
        st.session_state.ui_filesuploader_disabled = False
    # Dictionary of files modified by the user
    if 'changed_uploaded_files' not in st.session_state:
        st.session_state.changed_uploaded_files = {}
    # Dictionary of edited text content per file
    if 'edited_texts' not in st.session_state:
        st.session_state.edited_texts = {}
    # List of audio files
    if 'audio_files' not in st.session_state:
        st.session_state.audio_files = []
    if "ui_loaded" not in st.session_state:
        st.session_state["ui_loaded"] = False
    if "language_detected" not in st.session_state:
        st.session_state["language_detected"] = None
    if "process_mode" not in st.session_state:
        st.session_state["process_mode"] = "translation"
    if "target_language" not in st.session_state:
        st.session_state.target_language = "en"
    if "selected_languages" not in st.session_state:
        st.session_state.selected_languages = [
            {"language": "English", "iso-639-1": "en"}
        ]
    if "interface_language_select" not in st.session_state:
        st.session_state.interface_language_select = "English"  # default UI language
    # ROBUSTNESS: `interface_language` is read throughout this module but was
    # never initialized here — presumably load_ui_language sets it elsewhere
    # (TODO confirm). Provide a safe default so a fresh session cannot crash.
    if "interface_language" not in st.session_state:
        st.session_state.interface_language = "English"
    if "stt_voice_isolation" not in st.session_state:
        st.session_state["stt_voice_isolation"] = False
    if "enable_tts_for_input_from_audio_record" not in st.session_state:
        st.session_state["enable_tts_for_input_from_audio_record"] = False
    if "autoplay_tts" not in st.session_state:
        st.session_state["autoplay_tts"] = False
    if "enable_tts_for_input_from_text_field" not in st.session_state:
        st.session_state["enable_tts_for_input_from_text_field"] = False
    if "tts_voice" not in st.session_state:
        st.session_state["tts_voice"] = "onyx"
    # Message history
    if "messages" not in st.session_state:
        st.session_state.messages = []

    def set_session_selected_languages(
        selected_language_names: Optional[List[AnyStr]] = None,
    ) -> None:
        """Store the selected target languages as {language, iso-639-1} dicts.

        BUGFIX: the original used a mutable default argument (["English"]);
        a None sentinel with the same effective default is used instead.
        """
        if selected_language_names is None:
            selected_language_names = ["English"]
        st.session_state.selected_languages = [
            {"language": lang,
             "iso-639-1": convert_language_name_to_iso6391(lang)}
            for lang in selected_language_names
        ]

    def on_languages_change() -> None:
        """Callback for the destination-language multiselect."""
        selected_language_names: List[str] = st.session_state.language_selector
        set_session_selected_languages(selected_language_names)

    # --- Sidebar ----------------------------------------------------------
    with st.sidebar:
        st.logo("img/logo_2.png", icon_image="img/logo_2.png")
        st.header(get_translation("sidebar_titre"))
        st.write(f"#### Settings")
        if st.button(f"Speech-To-Text"):
            stt_settings(state__stt_voice_isolation=st.session_state.stt_voice_isolation)
        if st.button(f"Text-To-Speech"):
            tts_settings(
                name__tts_voice=st.session_state.tts_voice,
                state__tts_with_text=st.session_state.enable_tts_for_input_from_text_field,
                state__tts_with_audio=st.session_state.enable_tts_for_input_from_audio_record,
                state__autoplay_tts=st.session_state.autoplay_tts
            )
        with st.expander(f"{get_translation('a_propos')}", expanded=False, icon="ℹ️"):
            st.subheader(f"version: {__version__}")
            st.info(get_translation("info_app"))
        with st.expander(f"{get_translation('selection_langue')}", expanded=True, icon="🌐"):
            # Multi-select of destination languages
            st.multiselect(
                label=get_translation("langues_destination"),
                placeholder=get_translation("placeholder_langues"),
                options=SUPPORTED_LANGUAGES,
                default=["English"],
                key="language_selector",
                max_selections=4,
                on_change=on_languages_change,
                format_func=lambda lang: f"{LANGUAGES_EMOJI.get(lang, '')} {lang}"
            )

    if st.session_state.get('show_report_form', False):
        # show_report_form()
        pass
    else:
        with st.container(border=True):
            chat_input_tabs1, chat_input_tabs2, chat_input_tabs3 = st.tabs(
                ["text_input", "audio_input", "file_upload_input"]
            )
            st.session_state.ui_chat_input_disabled = False
            st.session_state.ui_audio_input_disabled = False
            st.session_state.ui_filesuploader_disabled = False

            # --- Tab 1: text chat input -----------------------------------
            with chat_input_tabs1:
                with st.container(border=True):
                    st.session_state.user_input = st.chat_input(
                        get_translation("entrez_message"),
                        disabled=st.session_state.ui_chat_input_disabled
                    )
                    if st.session_state.user_input:
                        # Temporarily disable the inputs while processing
                        st.session_state.ui_chat_input_disabled = True
                        st.session_state.ui_audio_input_disabled = True
                        st.session_state.ui_filesuploader_disabled = True
                        try:
                            # Moderation check
                            user_message_moderation_check(st.session_state.user_input)
                            # Detect the message's language
                            language_detected = detect_language(
                                input_text=st.session_state.user_input,
                                temperature=0.01,
                                context_window=512,
                                model="gpt-4o-mini"
                            )
                            # Reset previous state
                            st.session_state.full_response = ""
                            st.session_state.audio_list = []
                            with st.chat_message("user", avatar="👤"):
                                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                                st.write(timestamp)
                                with st.container(border=True):
                                    st.subheader(f"🌐 lang: {convert_iso6391_to_language_name(language_detected)}")
                                    st.markdown(st.session_state.user_input)
                            # Process each selected target language
                            for lang in st.session_state.selected_languages:
                                st.session_state.target_language = lang["iso-639-1"]
                                st.session_state.system_prompt, st.session_state.operation_prompt = init_langs_for_processing(
                                    target_language=st.session_state.target_language,
                                    interface_language=st.session_state.interface_language,
                                    language_detected=language_detected
                                )
                                with st.chat_message("assistant", avatar="👻"):
                                    with st.status(f"Processing response in {lang['language']}...", expanded=True) as status:
                                        message_placeholder = st.empty()
                                        st.session_state.response_generator = process_message(
                                            st.session_state.user_input,
                                            st.session_state.operation_prompt,
                                            st.session_state.system_prompt
                                        )
                                        full_response = ""
                                        for response_chunk in st.session_state.response_generator:
                                            full_response += response_chunk
                                            message_placeholder.markdown(full_response)
                                        st.session_state.response_generator.close()
                                        st.session_state.full_response = full_response
                                        # Generate audio if TTS is enabled
                                        if st.session_state.enable_tts_for_input_from_text_field:
                                            status.update(label=f"Generating audio in {lang['language']}...")
                                            tts_audio, tts_duration = process_tts_message(
                                                full_response
                                            )
                                            if tts_audio:
                                                st.session_state.audio_list.append(
                                                    (tts_audio, tts_duration)
                                                )
                                        status.update(label="Done!", state="complete")
                            # Build the final audio if needed
                            if st.session_state.audio_list:
                                with st.container(border=True):
                                    st.session_state.final_audio = concatenate_audio_files(st.session_state.audio_list)
                                    # Build a unique download file name
                                    st.session_state.timestamp = time.strftime("%Y%m%d-%H%M%S")
                                    st.session_state.langues = "_".join(
                                        [lang["iso-639-1"] for lang in st.session_state.selected_languages]
                                    )
                                    st.session_state.nom_fichier = f"reponse_audio_{st.session_state.langues}_{st.session_state.timestamp}.mp3"
                                    st.audio(
                                        st.session_state.final_audio,
                                        format="audio/mpeg",
                                        autoplay=st.session_state.autoplay_tts
                                    )
                                    st.download_button(
                                        label=f"📥 {get_translation('telecharger_audio')}",
                                        data=st.session_state.final_audio,
                                        file_name=st.session_state.nom_fichier,
                                        mime="audio/mpeg",
                                        use_container_width=True,
                                        type="primary",
                                        key=f"download_button_{st.session_state.langues}_{st.session_state.timestamp}"
                                    )
                        except Exception as e:
                            st.error(f"Une erreur s'est produite : {str(e)}")
                        finally:
                            # Re-enable the inputs
                            st.session_state.ui_chat_input_disabled = False
                            st.session_state.ui_audio_input_disabled = False
                            st.session_state.ui_filesuploader_disabled = False

            # --- Tab 3: file upload ---------------------------------------
            with chat_input_tabs3:
                with st.container(border=True):
                    st.session_state.uploaded_files = st.file_uploader(
                        "Choose files to upload",
                        accept_multiple_files=True,
                        type=CHAT_FILES_UPLOAD_ALLOWED_TYPES,
                        key="chat_files_upload",
                        disabled=st.session_state.ui_filesuploader_disabled
                    )

            # --- Tab 2: audio recording -----------------------------------
            with chat_input_tabs2:
                with st.container(border=True):
                    st.audio_input(
                        "Record a voice message",
                        on_change=recorder_released,
                        key="rec_widget",
                        disabled=st.session_state.ui_audio_input_disabled
                    )

        with st.container(border=True):
            # NOTE(review): this block re-processes the same user_input that
            # the text tab above already handled, so each text message
            # triggers a second round of process_message calls per rerun.
            # Kept to preserve existing behavior — confirm whether one of the
            # two paths can be removed.
            if st.session_state.user_input:
                user_message_moderation_check(st.session_state.user_input)
                # Reset previous state
                st.session_state.full_response = ""
                st.session_state.audio_list = []
                with st.chat_message("user", avatar="👤"):
                    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    st.write(timestamp)
                    with st.container(border=True):
                        st.subheader(f"🌐 lang: {st.session_state.interface_language}")
                        st.markdown(st.session_state.user_input)
                for lang in st.session_state.selected_languages:
                    st.session_state.target_language = lang["iso-639-1"]
                    # language_detected=None: no detection here, so fall back
                    # to the interface language (same effective behavior as
                    # the original call).
                    st.session_state.system_prompt, st.session_state.operation_prompt = init_langs_for_processing(
                        target_language=st.session_state.target_language,
                        interface_language=st.session_state.interface_language,
                        language_detected=None
                    )
                    with st.chat_message("assistant", avatar="👻"):
                        message_placeholder = st.empty()
                        st.session_state.response_generator = process_message(
                            st.session_state.user_input,
                            st.session_state.operation_prompt,
                            st.session_state.system_prompt
                        )
                        # BUGFIX: accumulate chunks so the placeholder shows
                        # the whole streamed response, not only the last chunk.
                        full_response = ""
                        for response_chunk in st.session_state.response_generator:
                            full_response += response_chunk
                            message_placeholder.markdown(full_response)
                        st.session_state.response_generator.close()

            if st.session_state.uploaded_files:
                # Process the uploaded files
                for uploaded_file in st.session_state.uploaded_files:
                    file_path = save_attachment(uploaded_file)
                    with st.chat_message("user", avatar="👤"):
                        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        st.write(timestamp)
                        st.subheader(f"Fichier téléchargé: {uploaded_file.name}")
                    if uploaded_file.type.startswith("text"):
                        uploaded_file.seek(0)
                        content = uploaded_file.read().decode("utf-8")
                        with st.chat_message("user", avatar="👤"):
                            st.markdown(content)
                        for lang in st.session_state.selected_languages:
                            st.session_state.target_language = lang["iso-639-1"]
                            st.session_state.system_prompt, st.session_state.operation_prompt = init_langs_for_processing(
                                target_language=st.session_state.target_language,
                                interface_language=st.session_state.interface_language,
                                language_detected=None
                            )
                            with st.chat_message("assistant", avatar="👻"):
                                message_placeholder = st.empty()
                                st.session_state.response_generator = process_message(
                                    content,
                                    st.session_state.operation_prompt,
                                    st.session_state.system_prompt
                                )
                                # BUGFIX: accumulate chunks (same streaming fix
                                # as the text path above).
                                full_response = ""
                                for response_chunk in st.session_state.response_generator:
                                    full_response += response_chunk
                                    message_placeholder.markdown(full_response)
                                st.session_state.response_generator.close()
                    else:
                        with st.chat_message("assistant", avatar="👻"):
                            st.markdown("Type de fichier non supporté pour le traitement. Seul le texte est supporté.")


main_page()