Spaces:
Paused
Paused
File size: 8,899 Bytes
17f055f a085f2e 00f58d9 a085f2e 28363fc 17f055f 9237b52 2978bdc 17f055f 09220d1 17f055f d0549a7 17f055f 2978bdc 17f055f a085f2e 709c374 a085f2e 28363fc a085f2e 28363fc a085f2e 28363fc a085f2e 28363fc a085f2e 28363fc a085f2e 28363fc a085f2e cbefa53 25c0823 cbefa53 25c0823 cbefa53 a085f2e 17f055f 9237b52 00f58d9 9237b52 00f58d9 9237b52 17f055f 9237b52 17f055f 9237b52 a085f2e 9237b52 a085f2e 9237b52 a085f2e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
#coding: utf-8
# Importation des bibliothèques nécessaires selon les bonnes pratiques PEP8
import requests # Pour envoyer des requêtes HTTP à l'API
import json # Pour traiter les réponses JSON de l'API
from os import getenv
from pydub import AudioSegment
from openai import OpenAI
from io import BytesIO
#from typing import Any
#from typing import Dict
from typing import IO
from typing import List
from typing import Optional
#from typing import Tuple
from typing import Union
import os
import streamlit as st
from core.DetectLanguage import detect_language
def huggingface_endpoints_stt(
filepath: Union[str, IO]
) -> str:
"""
Transcrit un fichier audio en texte en utilisant l'API Hugging Face.
Args:
filepath (Union[str, IO]): Chemin vers le fichier audio ou objet IO contenant l'audio à transcrire.
Returns:
str: Le texte transcrit à partir de l'audio.
Raises:
Exception: Si une erreur survient lors de l'appel à l'API Hugging Face.
"""
file_path = filepath if isinstance(filepath, str) else filepath.name
# Définir l'URL de l'endpoint d'inférence sur Hugging Face
API_URL = f"{getenv('HF_WHISPER_ENDPOINT')}"
# Inclure votre token d'accès Hugging Face dans les en-têtes de la requête
headers = {
"Authorization": f"Bearer {getenv('HF_API_TOKEN')}"
}
# Ajouter le type de contenu audio à l'en-tête de la requête
ext = file_path.split('.')[-1].lower()
if ext == "mp3":
mime_type = "audio/mpeg"
else:
mime_type = f"audio/{ext}"
headers["Content-Type"] = mime_type
# Ouvrir le fichier audio en mode binaire
with open(file_path, "rb") as audio:
# Envoyer une requête POST à l'API avec le fichier audio
response = requests.post(API_URL, headers=headers, data=audio)
# Vérifier si la requête a réussi (code 200)
if response.status_code == 200:
# Extraire la transcription du texte de la réponse JSON
transcription = json.loads(response.content.decode("utf-8"))
return transcription.get("text", "Pas de transcription disponible.")
else:
# En cas d'erreur, afficher le code de statut et le message
raise Exception(f"Erreur API: {response.status_code}, {response.text}")
# ############################################################
def transcribe_audio(
filepath: Union[str, IO],
language: Optional[str] = None
) -> str:
"""
Transcrit un fichier audio en texte.
Args:
filepath (Union[str, IO]): Chemin vers le fichier audio ou objet IO.
language (Optional[str]): Code de langue ISO 639-1 pour la transcription.
Returns:
str: Le texte transcrit ou une chaîne vide en cas d'erreur.
Raises:
ValueError: Si le fichier audio est invalide ou vide.
IOError: Si une erreur se produit lors de la lecture du fichier.
"""
if not filepath:
st.error("Erreur : Aucun fichier audio fourni")
return ""
try:
# Vérifier si le fichier existe et est accessible
if isinstance(filepath, str) and not os.path.exists(filepath):
raise FileNotFoundError(f"Le fichier {filepath} n'existe pas")
# Vérifier la taille du fichier
file_size = os.path.getsize(filepath) if isinstance(filepath, str) else filepath.tell()
if file_size == 0:
raise ValueError("Le fichier audio est vide")
# Transcription avec Hugging Face
try:
transcription = huggingface_endpoints_stt(filepath)
if transcription:
return transcription
except Exception as hf_error:
st.warning(f"Erreur avec l'endpoint Hugging Face, tentative avec OpenAI : {hf_error}")
# Transcription avec OpenAI comme fallback
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
audio_file = open(filepath if isinstance(filepath, str) else filepath.name, "rb")
transcription = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
language=language
)
return transcription.text
except FileNotFoundError as e:
st.error(f"Erreur : {e}")
return ""
except ValueError as e:
st.error(f"Erreur : {e}")
return ""
except IOError as e:
st.error(f"Erreur lors de la lecture du fichier audio : {e}")
return ""
except Exception as e:
st.error(f"Une erreur inattendue s'est produite lors de la transcription : {e}")
return ""
# ############################################################
def translate_audio(filepath: Union[str, IO]) -> str:
"""
Traduit un fichier audio temporaire en Anglais.
Args:
filepath Chemin vers le fichier audio temporaire à traduire.
Returns:
str: Le texte traduit.
"""
max_size_mb = 25
translated_text = []
client = OpenAI(api_key=getenv("OPENAI_API_KEY"))
try:
with open(filepath if isinstance(filepath, str) else filepath.name, "rb") as f:
# filepath peut etre un chemin vers un fichier audio ou un objet IO
f.seek(0)
audio = AudioSegment.from_file(f)
duration_ms = len(audio)
segment_duration_ms = int(
(max_size_mb * 1024 * 1024 * 8) /
(audio.frame_rate * audio.sample_width * audio.channels)
)
for start in range(0, duration_ms, segment_duration_ms):
end = min(start + segment_duration_ms, duration_ms)
segment = audio[start:end]
buffer = BytesIO()
segment.export(buffer, format="mp3")
buffer.seek(0)
translation = client.audio.translations.create(
model="whisper-1",
file=("audio.mp3", buffer)
)
translated_text.append(translation)
return " ".join(translated_text)
except Exception as e:
print(f"Erreur lors de la traduction de l'audio : {e}")
return ""
# ############################################################
class SpeechToText(object):
def __init__(self,
api_key: str):
self.api_key = api_key
self.client = OpenAI(api_key=self.api_key)
def aquire_audio(self,
filepath: Union[str, IO, List[Union[str, IO]]]):
"""
Integrer la detection de langue :
Ajoute un appel a la fonction detect_language juste apres l'aquisition de l'audio et avant de choisir entre transcrire ou traduire.
"""
if isinstance(filepath, str):
file_paths = [filepath]
elif isinstance(filepath, IO):
file_paths = [filepath.name]
else:
file_paths = [f'{file_path}' if isinstance(filepath, List) and isinstance(file_path, str) else file_path.name for file_path in filepath]
# create the list 'file_streams'
file_streams = [open(filepath, "rb") for filepath in file_paths]
def create_assistant():
return self.client.beta.assistants.create(
name="Audio Language Detector",
instructions=" ".join([
"Act as an language detection function for an audio file.",
"You are the assistant designed to detect the language of an audio file.",
"This assistant is designed to detect the language of an audio file.",
"You receive an audio file as input, and you analyze it to determine the language spoken in the audio.",
"The assistant will return the detected language of the audio in ISO 639-1 format.",
""
]),
model="gpt-4o",
tools=[{"type": "file_search"}]
)
def create_vector_store():
return self.client.beta.vector_stores.create(
name="Audio Language Detection"
)
assistant = create_assistant()
vectore_store = create_vector_store()
file_batch = self.client.beta.vector_stores.file_batches.upload_and_poll(
vector_store_id=vectore_store.id,
files=file_streams
)
# update the assistant to use the vector store
assistant = self.client.beta.assistants.update(
assistant_id=assistant.id,
tool_ressources={"file_search": {"vector_store_ids": [vectore_store.id]}}
)
## Create a thread
### Upload the user provided audio
message_file = self.client.files.create(
file=open(file_paths[0], "rb")
)
|