demorrha / core /speech_to_text.py
0x07CB
refactor: Amélioration de la robustesse de la transcription audio avec gestion des erreurs et fallback
28363fc unverified
#coding: utf-8
# Importation des bibliothèques nécessaires selon les bonnes pratiques PEP8
import requests # Pour envoyer des requêtes HTTP à l'API
import json # Pour traiter les réponses JSON de l'API
from os import getenv
from pydub import AudioSegment
from openai import OpenAI
from io import BytesIO
#from typing import Any
#from typing import Dict
from typing import IO
from typing import List
from typing import Optional
#from typing import Tuple
from typing import Union
import os
import streamlit as st
from core.DetectLanguage import detect_language
def huggingface_endpoints_stt(
filepath: Union[str, IO]
) -> str:
"""
Transcrit un fichier audio en texte en utilisant l'API Hugging Face.
Args:
filepath (Union[str, IO]): Chemin vers le fichier audio ou objet IO contenant l'audio à transcrire.
Returns:
str: Le texte transcrit à partir de l'audio.
Raises:
Exception: Si une erreur survient lors de l'appel à l'API Hugging Face.
"""
file_path = filepath if isinstance(filepath, str) else filepath.name
# Définir l'URL de l'endpoint d'inférence sur Hugging Face
API_URL = f"{getenv('HF_WHISPER_ENDPOINT')}"
# Inclure votre token d'accès Hugging Face dans les en-têtes de la requête
headers = {
"Authorization": f"Bearer {getenv('HF_API_TOKEN')}"
}
# Ajouter le type de contenu audio à l'en-tête de la requête
ext = file_path.split('.')[-1].lower()
if ext == "mp3":
mime_type = "audio/mpeg"
else:
mime_type = f"audio/{ext}"
headers["Content-Type"] = mime_type
# Ouvrir le fichier audio en mode binaire
with open(file_path, "rb") as audio:
# Envoyer une requête POST à l'API avec le fichier audio
response = requests.post(API_URL, headers=headers, data=audio)
# Vérifier si la requête a réussi (code 200)
if response.status_code == 200:
# Extraire la transcription du texte de la réponse JSON
transcription = json.loads(response.content.decode("utf-8"))
return transcription.get("text", "Pas de transcription disponible.")
else:
# En cas d'erreur, afficher le code de statut et le message
raise Exception(f"Erreur API: {response.status_code}, {response.text}")
# ############################################################
def transcribe_audio(
filepath: Union[str, IO],
language: Optional[str] = None
) -> str:
"""
Transcrit un fichier audio en texte.
Args:
filepath (Union[str, IO]): Chemin vers le fichier audio ou objet IO.
language (Optional[str]): Code de langue ISO 639-1 pour la transcription.
Returns:
str: Le texte transcrit ou une chaîne vide en cas d'erreur.
Raises:
ValueError: Si le fichier audio est invalide ou vide.
IOError: Si une erreur se produit lors de la lecture du fichier.
"""
if not filepath:
st.error("Erreur : Aucun fichier audio fourni")
return ""
try:
# Vérifier si le fichier existe et est accessible
if isinstance(filepath, str) and not os.path.exists(filepath):
raise FileNotFoundError(f"Le fichier {filepath} n'existe pas")
# Vérifier la taille du fichier
file_size = os.path.getsize(filepath) if isinstance(filepath, str) else filepath.tell()
if file_size == 0:
raise ValueError("Le fichier audio est vide")
# Transcription avec Hugging Face
try:
transcription = huggingface_endpoints_stt(filepath)
if transcription:
return transcription
except Exception as hf_error:
st.warning(f"Erreur avec l'endpoint Hugging Face, tentative avec OpenAI : {hf_error}")
# Transcription avec OpenAI comme fallback
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
audio_file = open(filepath if isinstance(filepath, str) else filepath.name, "rb")
transcription = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
language=language
)
return transcription.text
except FileNotFoundError as e:
st.error(f"Erreur : {e}")
return ""
except ValueError as e:
st.error(f"Erreur : {e}")
return ""
except IOError as e:
st.error(f"Erreur lors de la lecture du fichier audio : {e}")
return ""
except Exception as e:
st.error(f"Une erreur inattendue s'est produite lors de la transcription : {e}")
return ""
# ############################################################
def translate_audio(filepath: Union[str, IO]) -> str:
"""
Traduit un fichier audio temporaire en Anglais.
Args:
filepath Chemin vers le fichier audio temporaire à traduire.
Returns:
str: Le texte traduit.
"""
max_size_mb = 25
translated_text = []
client = OpenAI(api_key=getenv("OPENAI_API_KEY"))
try:
with open(filepath if isinstance(filepath, str) else filepath.name, "rb") as f:
# filepath peut etre un chemin vers un fichier audio ou un objet IO
f.seek(0)
audio = AudioSegment.from_file(f)
duration_ms = len(audio)
segment_duration_ms = int(
(max_size_mb * 1024 * 1024 * 8) /
(audio.frame_rate * audio.sample_width * audio.channels)
)
for start in range(0, duration_ms, segment_duration_ms):
end = min(start + segment_duration_ms, duration_ms)
segment = audio[start:end]
buffer = BytesIO()
segment.export(buffer, format="mp3")
buffer.seek(0)
translation = client.audio.translations.create(
model="whisper-1",
file=("audio.mp3", buffer)
)
translated_text.append(translation)
return " ".join(translated_text)
except Exception as e:
print(f"Erreur lors de la traduction de l'audio : {e}")
return ""
# ############################################################
class SpeechToText(object):
def __init__(self,
api_key: str):
self.api_key = api_key
self.client = OpenAI(api_key=self.api_key)
def aquire_audio(self,
filepath: Union[str, IO, List[Union[str, IO]]]):
"""
Integrer la detection de langue :
Ajoute un appel a la fonction detect_language juste apres l'aquisition de l'audio et avant de choisir entre transcrire ou traduire.
"""
if isinstance(filepath, str):
file_paths = [filepath]
elif isinstance(filepath, IO):
file_paths = [filepath.name]
else:
file_paths = [f'{file_path}' if isinstance(filepath, List) and isinstance(file_path, str) else file_path.name for file_path in filepath]
# create the list 'file_streams'
file_streams = [open(filepath, "rb") for filepath in file_paths]
def create_assistant():
return self.client.beta.assistants.create(
name="Audio Language Detector",
instructions=" ".join([
"Act as an language detection function for an audio file.",
"You are the assistant designed to detect the language of an audio file.",
"This assistant is designed to detect the language of an audio file.",
"You receive an audio file as input, and you analyze it to determine the language spoken in the audio.",
"The assistant will return the detected language of the audio in ISO 639-1 format.",
""
]),
model="gpt-4o",
tools=[{"type": "file_search"}]
)
def create_vector_store():
return self.client.beta.vector_stores.create(
name="Audio Language Detection"
)
assistant = create_assistant()
vectore_store = create_vector_store()
file_batch = self.client.beta.vector_stores.file_batches.upload_and_poll(
vector_store_id=vectore_store.id,
files=file_streams
)
# update the assistant to use the vector store
assistant = self.client.beta.assistants.update(
assistant_id=assistant.id,
tool_ressources={"file_search": {"vector_store_ids": [vectore_store.id]}}
)
## Create a thread
### Upload the user provided audio
message_file = self.client.files.create(
file=open(file_paths[0], "rb")
)