# demorrha / pages/main.py
# 0x07CB
# fix: Suppression du paramètre voice inutile dans les appels à process_tts_message
# 10ca27f unverified
# Standard libraries
#import base64
import io
#import json
import os
import uuid
#import re
import tempfile
import time
from datetime import datetime
#from os import getenv
#from typing import Any
#from typing import Dict
#from typing import IO
from typing import List
from typing import Optional
from typing import Tuple
#from typing import Union
from typing import AnyStr
#from io import BytesIO
#from copy import deepcopy
import hashlib
# Third-party libraries
import requests
import streamlit as st
#import streamlit.components.v1 as components
#from audiorecorder import audiorecorder
from openai import OpenAI
from pydub import AudioSegment
import warnings
# Ignore DeprecationWarning
warnings.filterwarnings("ignore", category=DeprecationWarning)
from dotenv import load_dotenv
# Charger les variables d'environnement depuis le fichier .env
load_dotenv()
from var_app import __version__
from var_app import LANGUAGES_EMOJI
from var_app import SUPPORTED_LANGUAGES
from var_app import CHAT_FILES_UPLOAD_ALLOWED_TYPES
from core.core import translations
from core.core import get_translation
from core.converter import convert_iso6391_to_language_name
from core.converter import convert_language_name_to_iso6391
from core.files import read_file
from core.text_to_speech import openai_tts
from core.DetectLanguage import detect_language
from core.speech_to_text import huggingface_endpoints_stt
from core.speech_to_text import transcribe_audio
from core.audio_files import concatenate_audio_files
from core.audio_files import split_audio
from core.text_to_speech import process_tts_message
from core.files import load_ui_language
from core.core import process_message
from core.core import init_process_mode
from core.moderation import api_moderation_openai_text
from core.audio_isolation import isolate_audio
def init_langs_for_processing(
    target_language: Optional[AnyStr] = "en",
    interface_language: Optional[AnyStr] = "English",
    language_detected: Optional[AnyStr] = None
) -> Tuple[str, str]:
    """Build the system and operation prompts for one target language.

    Args:
        target_language: Language to produce; callers in this file pass the
            ``"iso-639-1"`` value of a selected language.
        interface_language: Name of the interface language. It is converted
            with ``convert_language_name_to_iso6391`` when it serves as the
            source language.
        language_detected: Source language reported for the message, if any.
            It is ignored when it is ``None``/empty or when it merely repeats
            ``interface_language`` (several callers pass the interface
            language name in this slot).

    Returns:
        The ``(system_prompt, operation_prompt)`` pair produced by
        ``init_process_mode``.
    """
    # The former test (`"language_detected" in language_detected`) was a
    # substring check: it raised TypeError for the default None and was
    # otherwise practically never true, so the detected language was dropped.
    use_detected_language = (
        bool(language_detected) and language_detected != interface_language
    )
    if use_detected_language:
        source_language = language_detected
    else:
        source_language = convert_language_name_to_iso6391(interface_language)

    system_prompt, operation_prompt = init_process_mode(
        from_lang=source_language,
        to_lang=target_language
    )
    return (system_prompt, operation_prompt)
#def detection_langue_du_message_utilisateur(
# user_input: str,
# detect_lang_from_text: Optional[bool] = True,
# ):
# # Traitement du message texte de l'utilisateur
# if (detect_lang_from_text):
# language_detected = detect_language(
# input_text = user_input,
# temperature = 0.01,
# context_window = 512,
# model="gpt-4o-mini"
# )
# return {
# "text":f"{user_input}",
# "language": language_detected.strip()
# }
# else:
# return {
# "text":f"{user_input}",
# "language": "default"
# }
def user_message_moderation_check(
    user_message: Optional[str] = None
) -> bool:
    """Run the moderation API on a user message and report problems in the UI.

    Args:
        user_message: Text submitted by the user.

    Returns:
        ``True`` when the message was neither flagged nor hit a moderation
        error, ``False`` otherwise. An error is displayed with ``st.error``
        in the ``False`` cases. Callers that must not process a rejected
        message should check this value; returning from this function does
        not by itself interrupt the caller.
    """
    moderation_result = api_moderation_openai_text(user_message)

    if moderation_result.get("flagged"):
        st.error("Votre message a été jugé inapproprié et ne peut pas être traité.")
        return False

    # The moderation call itself failed: surface the reported error.
    if "error" in moderation_result:
        st.error(moderation_result["error"])
        return False

    return True
def hash_file(file):
    """Return the hexadecimal MD5 digest of a binary file-like object.

    The stream is rewound before hashing so the digest always covers the
    whole content, even if the object was partly read beforehand, and it is
    rewound again afterwards so the caller can read it from the start.

    Args:
        file: Seekable binary file-like object (``read`` must return bytes).

    Returns:
        The MD5 digest of the full content as a hexadecimal string.
    """
    hasher = hashlib.md5()
    file.seek(0)
    # Read in fixed-size chunks to avoid holding a second full copy in memory.
    while True:
        chunk = file.read(65536)
        if not chunk:
            break
        hasher.update(chunk)
    file.seek(0)
    return hasher.hexdigest()
def callback_change_edited_text(key, value, modified_text):
    """Copy an edited widget value back into the tracked uploaded file.

    Args:
        key: Key of the entry in ``st.session_state.changed_uploaded_files``.
        value: Mapping describing the file; only ``value["type"]`` is read.
        modified_text: Session-state key holding the edited text.

    For ``"txt"`` files the edited text is stored, encoded, under
    ``"bytes_data"``; for ``"wav"``/``"mp3"`` files it is stored as is under
    ``"audio_transcription"``. Other types are left untouched.
    """
    file_type = value["type"]

    if file_type == "txt":
        edited_text = st.session_state[modified_text]
        st.session_state.changed_uploaded_files[key]["bytes_data"] = edited_text.encode()
        return

    if file_type in ("wav", "mp3"):
        edited_text = st.session_state[modified_text]
        st.session_state.changed_uploaded_files[key]["audio_transcription"] = edited_text
def save_attachment(attachment):
    """Save an attachment under ``attachments/`` and return the file path.

    The directory is created on demand. The stored file gets a random UUID
    name that keeps the extension of ``attachment.name``; its content is
    taken from ``attachment.getbuffer()``.
    """
    target_dir = 'attachments'
    os.makedirs(target_dir, exist_ok=True)

    # Unique name: random UUID + original extension.
    _, extension = os.path.splitext(attachment.name)
    stored_path = os.path.join(target_dir, f"{uuid.uuid4()}{extension}")

    with open(stored_path, 'wb') as out_file:
        out_file.write(attachment.getbuffer())

    return stored_path
# Placed near the top of the file, right after the imports:
# page-level Streamlit configuration (title with the app version, icon,
# wide layout, sidebar collapsed on load).
st.set_page_config(
    page_title=f"DEMORRHA - (v{__version__})",
    page_icon="👹",
    layout="wide",
    initial_sidebar_state="collapsed"
)
@st.dialog("STT Settings")
def stt_settings(state__stt_voice_isolation):
    """Dialog used to edit the speech-to-text settings.

    Args:
        state__stt_voice_isolation: Current value of the voice-isolation
            option, used as the checkbox's initial state.

    On "Submit" the choice is written to
    ``st.session_state.stt_voice_isolation`` and the app is rerun.
    """
    expander_title = f"{get_translation('parametres_stt')}"
    with st.expander(expander_title, expanded=True, icon="🎤"):
        isolation_choice = st.checkbox(
            get_translation("isolation_voix"),
            value=state__stt_voice_isolation,
        )

        # Nothing to persist until the user confirms.
        if not st.button("Submit"):
            return

        st.session_state.stt_voice_isolation = isolation_choice
        st.rerun()
@st.dialog("TTS Settings")
def tts_settings(name__tts_voice,
                 state__tts_with_text,
                 state__tts_with_audio,
                 state__autoplay_tts):
    """Dialog used to edit the text-to-speech settings.

    Args:
        name__tts_voice: Currently selected voice name; pre-selected in the
            voice list. An unknown name falls back to the first voice instead
            of raising ``ValueError``.
        state__tts_with_text: Current state of "TTS for text input".
        state__tts_with_audio: Current state of "TTS for recorded audio".
        state__autoplay_tts: Current state of the autoplay option.

    On "Submit" the four choices are written to ``st.session_state``
    (``autoplay_tts``, ``enable_tts_for_input_from_audio_record``,
    ``enable_tts_for_input_from_text_field``, ``tts_voice``) and the app is
    rerun.
    """
    # Single source of truth for the selectable voices.
    available_voices = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
    try:
        current_voice_index = available_voices.index(name__tts_voice)
    except ValueError:
        current_voice_index = 0

    with st.expander(f"{get_translation('parametres_tts')}",
                     expanded=True,
                     icon="🔊"):
        set__tts_voice = st.selectbox(
            get_translation("choix_voix_tts"),
            options=available_voices,
            index=current_voice_index
        )
        set__tts_with_text = st.checkbox(
            get_translation("activer_tts_texte"),
            value=state__tts_with_text
        )
        set__tts_with_audio = st.checkbox(
            get_translation("activer_tts_audio"),
            value=state__tts_with_audio
        )
        set__autoplay_tts = st.checkbox(
            get_translation("lecture_auto_tts"),
            value=state__autoplay_tts
        )
        if st.button("Submit"):
            st.session_state.autoplay_tts = set__autoplay_tts
            st.session_state.enable_tts_for_input_from_audio_record = set__tts_with_audio
            st.session_state.enable_tts_for_input_from_text_field = set__tts_with_text
            st.session_state.tts_voice = set__tts_voice
            st.rerun()
@st.fragment
def recorder_released():
    """Process a finished voice recording: transcribe, translate, optionally speak.

    Used as the ``on_change`` callback of the ``rec_widget`` audio input.
    The recording is read from ``st.session_state.rec_widget``, optionally
    passed through ``isolate_audio``, exported to a temporary MP3 file and
    transcribed. The transcription is then processed once per entry of
    ``st.session_state.selected_languages`` and streamed into an assistant
    chat message. When ``enable_tts_for_input_from_audio_record`` is set, the
    generated audio clips are concatenated and offered for playback and
    download.

    Returns nothing: results are rendered with Streamlit and stored in
    ``st.session_state``. Exceptions raised from the export step onwards are
    reported with ``st.error`` instead of propagating.
    """
    if "audio_list" not in st.session_state:
        st.session_state.audio_list = []

    # `.get` keeps this safe when the widget key does not exist yet; an empty
    # value means there is no recording to process.
    recording = st.session_state.get("rec_widget")
    if not recording:
        return

    audio = AudioSegment.from_wav(io.BytesIO(recording.getvalue()))
    st.write(f"Frame rate: {audio.frame_rate}, Frame width: {audio.frame_width}, Duration: {audio.duration_seconds} seconds")

    if st.session_state.stt_voice_isolation:
        audio = isolate_audio(audio)

    try:
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp_file:
            try:
                audio.export(tmp_file, format="mp3")
                tmp_file.close()
                # Transcribe the exported audio.
                st.session_state.transcription = huggingface_endpoints_stt(tmp_file)
            finally:
                # delete=False leaves the file on disk: remove it once the
                # transcription attempt is over. Cleanup is best effort.
                tmp_file.close()
                try:
                    os.remove(tmp_file.name)
                except OSError:
                    pass

        # Drop the reference to the decoded audio.
        audio = AudioSegment.empty()

        st.markdown(
            f"🌐 {get_translation('langue_interface')} {st.session_state.interface_language}({convert_language_name_to_iso6391(st.session_state.interface_language)})"
        )
        st.markdown(
            f"🎤 {get_translation('transcription_audio')} {st.session_state.transcription}"
        )

        st.session_state.audio_list = []
        for cursor_selected_lang in st.session_state.selected_languages:
            st.session_state.target_language = cursor_selected_lang["iso-639-1"]
            st.session_state.full_response = ""

            # No language detection is run on recordings: the interface
            # language is passed as the source language.
            st.session_state.system_prompt, st.session_state.operation_prompt = init_langs_for_processing(
                target_language=st.session_state.target_language,
                interface_language=st.session_state.interface_language,
                language_detected=st.session_state.interface_language
            )

            with st.chat_message("assistant", avatar="👻"):
                message_placeholder = st.empty()
                st.session_state.response_generator = process_message(
                    st.session_state.transcription,
                    st.session_state.operation_prompt,
                    st.session_state.system_prompt
                )
                for response_chunk in st.session_state.response_generator:
                    message_placeholder.markdown(response_chunk)
                st.session_state.end_response = st.session_state.response_generator.close()

                # NOTE(review): full_response is reset above and never assigned
                # in this function; presumably process_message fills
                # st.session_state.full_response while streaming - confirm.
                if st.session_state.full_response != "":
                    message_placeholder.markdown(st.session_state.full_response)
                    if st.session_state.enable_tts_for_input_from_audio_record:
                        st.session_state.tts_audio, st.session_state.tts_duration = process_tts_message(
                            st.session_state.full_response
                        )
                        if st.session_state.tts_audio:
                            st.session_state.audio_list.append(
                                (st.session_state.tts_audio,
                                 st.session_state.tts_duration)
                            )

        # One combined clip for all target languages.
        if st.session_state.audio_list:
            st.session_state.final_audio = concatenate_audio_files(st.session_state.audio_list)
            with st.container(border=True):
                # Build a unique file name from the language codes and a timestamp.
                st.session_state.timestamp = time.strftime("%Y%m%d-%H%M%S")
                st.session_state.langues = "_".join([lang["iso-639-1"] for lang in st.session_state.selected_languages])
                st.session_state.nom_fichier = f"reponse_audio_{st.session_state.langues}_{st.session_state.timestamp}.mp3"
                st.audio(st.session_state.final_audio,
                         format="audio/mpeg",
                         autoplay=st.session_state.autoplay_tts)
                st.download_button(
                    label=f"📥 {get_translation('telecharger_audio')}",
                    data=st.session_state.final_audio,
                    file_name=st.session_state.nom_fichier,
                    mime="audio/mpeg",
                    use_container_width=True,
                    type="primary",
                    key=f"download_button_{st.session_state.langues}_{st.session_state.timestamp}",
                )
    except Exception as e:
        st.error(f"[AUDIO] - {get_translation('erreur_importation_audio')}: {str(e)}")
def main_page():
    """Render the main page of the application.

    Seeds the session-state defaults, builds the sidebar (settings dialogs,
    "about" box, target-language selector) and the three input tabs (text,
    audio recording, file upload), then processes the submitted text message
    and the uploaded text files once per selected target language.
    """
    # Session-state defaults; a key is only written when it is still missing.
    session_defaults = {
        "audio_list": [],
        "ui_chat_input_disabled": False,
        "ui_audio_input_disabled": False,
        "ui_filesuploader_disabled": False,
        # Modified uploaded files
        "changed_uploaded_files": {},
        # Edited content of the files
        "edited_texts": {},
        # Audio files
        "audio_files": [],
        "ui_loaded": False,
        "language_detected": None,
        "process_mode": "translation",
        "target_language": "en",
        "selected_languages": [
            {"language": "English", "iso-639-1": "en"}
        ],
        "interface_language_select": "English",  # default language
        "stt_voice_isolation": False,
        "enable_tts_for_input_from_audio_record": False,
        "autoplay_tts": False,
        "enable_tts_for_input_from_text_field": False,
        "tts_voice": "onyx",
        # Message history
        "messages": [],
    }
    for state_key, default_value in session_defaults.items():
        if state_key not in st.session_state:
            st.session_state[state_key] = default_value

    def set_session_selected_languages(
        selected_language_names: Optional[List[AnyStr]] = None
    ) -> None:
        """Store the selected target languages (name + ISO 639-1 code)."""
        # None instead of a list literal: avoids a shared mutable default.
        if selected_language_names is None:
            selected_language_names = ["English"]
        st.session_state.selected_languages = [
            {"language": lang, "iso-639-1": convert_language_name_to_iso6391(lang)}
            for lang in selected_language_names
        ]

    def on_languages_change() -> None:
        """Callback fired when the target-language selection changes."""
        selected_language_names: List[str] = st.session_state.language_selector
        set_session_selected_languages(selected_language_names)

    # Sidebar configuration
    with st.sidebar:
        st.logo("img/logo_2.png", icon_image="img/logo_2.png")
        st.header(get_translation("sidebar_titre"))
        st.write("#### Settings")
        if st.button("Speech-To-Text"):
            stt_settings(state__stt_voice_isolation=st.session_state.stt_voice_isolation)
        if st.button("Text-To-Speech"):
            tts_settings(
                name__tts_voice=st.session_state.tts_voice,
                state__tts_with_text=st.session_state.enable_tts_for_input_from_text_field,
                state__tts_with_audio=st.session_state.enable_tts_for_input_from_audio_record,
                state__autoplay_tts=st.session_state.autoplay_tts
            )
        with st.expander(f"{get_translation('a_propos')}",
                         expanded=False,
                         icon="ℹ️"):
            st.subheader(f"version: {__version__}")
            st.info(get_translation("info_app"))
        with st.expander(f"{get_translation('selection_langue')}",
                         expanded=True,
                         icon="🌐"):
            # Multi-selection of the target languages
            st.multiselect(
                label=get_translation("langues_destination"),
                placeholder=get_translation("placeholder_langues"),
                options=SUPPORTED_LANGUAGES,
                default=["English"],
                key="language_selector",
                max_selections=4,
                on_change=on_languages_change,
                format_func=lambda lang: f"{LANGUAGES_EMOJI.get(lang, '')} {lang}"
            )

    if st.session_state.get('show_report_form', False):
        # Placeholder: the report form is not rendered (show_report_form() is disabled).
        return

    with st.container(border=True):
        chat_input_tabs1, chat_input_tabs2, chat_input_tabs3 = st.tabs(["text_input", "audio_input", "file_upload_input"])
        st.session_state.ui_chat_input_disabled = False
        st.session_state.ui_audio_input_disabled = False
        st.session_state.ui_filesuploader_disabled = False

        with chat_input_tabs1:
            with st.container(border=True):
                # Text chat input
                st.session_state.user_input = st.chat_input(
                    get_translation("entrez_message"),
                    disabled=st.session_state.ui_chat_input_disabled
                )
                if st.session_state.user_input:
                    # Mark the inputs as disabled while processing
                    st.session_state.ui_chat_input_disabled = True
                    st.session_state.ui_audio_input_disabled = True
                    st.session_state.ui_filesuploader_disabled = True
                    try:
                        # NOTE(review): the check only displays an error; its
                        # outcome is not used here, so a rejected message is
                        # still processed below.
                        user_message_moderation_check(st.session_state.user_input)

                        # Detect the language of the message
                        language_detected = detect_language(
                            input_text=st.session_state.user_input,
                            temperature=0.01,
                            context_window=512,
                            model="gpt-4o-mini"
                        )

                        # Reset the previous state
                        st.session_state.full_response = ""
                        st.session_state.audio_list = []

                        with st.chat_message("user", avatar="👤"):
                            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                            st.write(timestamp)
                            with st.container(border=True):
                                st.subheader(f"🌐 lang: {convert_iso6391_to_language_name(language_detected)}")
                                st.markdown(st.session_state.user_input)

                        # One response per selected language
                        for lang in st.session_state.selected_languages:
                            st.session_state.target_language = lang["iso-639-1"]
                            # NOTE(review): interface_language is read here but
                            # is not initialised in this function - confirm
                            # where it is set.
                            st.session_state.system_prompt, st.session_state.operation_prompt = init_langs_for_processing(
                                target_language=st.session_state.target_language,
                                interface_language=st.session_state.interface_language,
                                language_detected=language_detected
                            )
                            with st.chat_message("assistant", avatar="👻"):
                                with st.status(f"Processing response in {lang['language']}...", expanded=True) as status:
                                    message_placeholder = st.empty()
                                    st.session_state.response_generator = process_message(
                                        st.session_state.user_input,
                                        st.session_state.operation_prompt,
                                        st.session_state.system_prompt
                                    )
                                    full_response = ""
                                    for response_chunk in st.session_state.response_generator:
                                        full_response += response_chunk
                                        message_placeholder.markdown(full_response)
                                    st.session_state.response_generator.close()
                                    st.session_state.full_response = full_response

                                    # Generate the audio when TTS is enabled
                                    if st.session_state.enable_tts_for_input_from_text_field:
                                        status.update(label=f"Generating audio in {lang['language']}...")
                                        tts_audio, tts_duration = process_tts_message(
                                            full_response
                                        )
                                        if tts_audio:
                                            st.session_state.audio_list.append(
                                                (tts_audio, tts_duration)
                                            )
                                    status.update(label="Done!", state="complete")

                        # Build the combined audio when there is any
                        if st.session_state.audio_list:
                            with st.container(border=True):
                                st.session_state.final_audio = concatenate_audio_files(st.session_state.audio_list)
                                # Unique file name: language codes + timestamp
                                st.session_state.timestamp = time.strftime("%Y%m%d-%H%M%S")
                                st.session_state.langues = "_".join([lang["iso-639-1"] for lang in st.session_state.selected_languages])
                                st.session_state.nom_fichier = f"reponse_audio_{st.session_state.langues}_{st.session_state.timestamp}.mp3"
                                st.audio(
                                    st.session_state.final_audio,
                                    format="audio/mpeg",
                                    autoplay=st.session_state.autoplay_tts
                                )
                                st.download_button(
                                    label=f"📥 {get_translation('telecharger_audio')}",
                                    data=st.session_state.final_audio,
                                    file_name=st.session_state.nom_fichier,
                                    mime="audio/mpeg",
                                    use_container_width=True,
                                    type="primary",
                                    key=f"download_button_{st.session_state.langues}_{st.session_state.timestamp}"
                                )
                    except Exception as e:
                        st.error(f"Une erreur s'est produite : {str(e)}")
                    finally:
                        # Re-enable the inputs
                        st.session_state.ui_chat_input_disabled = False
                        st.session_state.ui_audio_input_disabled = False
                        st.session_state.ui_filesuploader_disabled = False

        with chat_input_tabs3:
            with st.container(border=True):
                # File upload input
                st.session_state.uploaded_files = st.file_uploader(
                    "Choose files to upload",
                    accept_multiple_files=True,
                    type=CHAT_FILES_UPLOAD_ALLOWED_TYPES,
                    key="chat_files_upload",
                    disabled=st.session_state.ui_filesuploader_disabled
                )

        with chat_input_tabs2:
            with st.container(border=True):
                # Audio recording input
                st.audio_input(
                    "Record a voice message",
                    on_change=recorder_released,
                    key="rec_widget",
                    disabled=st.session_state.ui_audio_input_disabled
                )

    with st.container(border=True):
        if st.session_state.user_input:
            # NOTE(review): this is a second processing pass over the same
            # message already handled in the text tab above (this time with
            # the interface language as source and without TTS). Confirm
            # whether both passes are intended.
            user_message_moderation_check(st.session_state.user_input)

            # Reset the previous state
            st.session_state.full_response = ""
            st.session_state.audio_list = []

            with st.chat_message("user", avatar="👤"):
                timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                st.write(timestamp)
                with st.container(border=True):
                    st.subheader(f"🌐 lang: {st.session_state.interface_language}")
                    st.markdown(st.session_state.user_input)

            for lang in st.session_state.selected_languages:
                st.session_state.target_language = lang["iso-639-1"]
                st.session_state.system_prompt, st.session_state.operation_prompt = init_langs_for_processing(
                    target_language=st.session_state.target_language,
                    interface_language=st.session_state.interface_language,
                    language_detected=st.session_state.interface_language
                )
                with st.chat_message("assistant", avatar="👻"):
                    message_placeholder = st.empty()
                    st.session_state.response_generator = process_message(
                        st.session_state.user_input,
                        st.session_state.operation_prompt,
                        st.session_state.system_prompt
                    )
                    for response_chunk in st.session_state.response_generator:
                        message_placeholder.markdown(response_chunk)
                    st.session_state.response_generator.close()

        if st.session_state.uploaded_files:
            # Process the uploaded files
            for uploaded_file in st.session_state.uploaded_files:
                # Called for its side effect (a copy is written to disk); the
                # returned path is not needed here.
                save_attachment(uploaded_file)

                with st.chat_message("user", avatar="👤"):
                    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    st.write(timestamp)
                    st.subheader(f"Fichier téléchargé: {uploaded_file.name}")

                if uploaded_file.type.startswith("text"):
                    uploaded_file.seek(0)
                    content = uploaded_file.read().decode("utf-8")
                    with st.chat_message("user", avatar="👤"):
                        st.markdown(content)

                    for lang in st.session_state.selected_languages:
                        st.session_state.target_language = lang["iso-639-1"]
                        st.session_state.system_prompt, st.session_state.operation_prompt = init_langs_for_processing(
                            target_language=st.session_state.target_language,
                            interface_language=st.session_state.interface_language,
                            language_detected=st.session_state.interface_language
                        )
                        with st.chat_message("assistant", avatar="👻"):
                            message_placeholder = st.empty()
                            st.session_state.response_generator = process_message(
                                content,
                                st.session_state.operation_prompt,
                                st.session_state.system_prompt
                            )
                            for response_chunk in st.session_state.response_generator:
                                message_placeholder.markdown(response_chunk)
                            st.session_state.response_generator.close()
                else:
                    with st.chat_message("assistant", avatar="👻"):
                        st.markdown("Type de fichier non supporté pour le traitement. Seul le texte est supporté.")
# Render the page when this script is executed.
main_page()