#coding: utf-8 # Importation des bibliothèques nécessaires selon les bonnes pratiques PEP8 import requests # Pour envoyer des requêtes HTTP à l'API import json # Pour traiter les réponses JSON de l'API from os import getenv from pydub import AudioSegment from openai import OpenAI from io import BytesIO #from typing import Any #from typing import Dict from typing import IO from typing import List from typing import Optional #from typing import Tuple from typing import Union import os import streamlit as st from core.DetectLanguage import detect_language def huggingface_endpoints_stt( filepath: Union[str, IO] ) -> str: """ Transcrit un fichier audio en texte en utilisant l'API Hugging Face. Args: filepath (Union[str, IO]): Chemin vers le fichier audio ou objet IO contenant l'audio à transcrire. Returns: str: Le texte transcrit à partir de l'audio. Raises: Exception: Si une erreur survient lors de l'appel à l'API Hugging Face. """ file_path = filepath if isinstance(filepath, str) else filepath.name # Définir l'URL de l'endpoint d'inférence sur Hugging Face API_URL = f"{getenv('HF_WHISPER_ENDPOINT')}" # Inclure votre token d'accès Hugging Face dans les en-têtes de la requête headers = { "Authorization": f"Bearer {getenv('HF_API_TOKEN')}" } # Ajouter le type de contenu audio à l'en-tête de la requête ext = file_path.split('.')[-1].lower() if ext == "mp3": mime_type = "audio/mpeg" else: mime_type = f"audio/{ext}" headers["Content-Type"] = mime_type # Ouvrir le fichier audio en mode binaire with open(file_path, "rb") as audio: # Envoyer une requête POST à l'API avec le fichier audio response = requests.post(API_URL, headers=headers, data=audio) # Vérifier si la requête a réussi (code 200) if response.status_code == 200: # Extraire la transcription du texte de la réponse JSON transcription = json.loads(response.content.decode("utf-8")) return transcription.get("text", "Pas de transcription disponible.") else: # En cas d'erreur, afficher le code de statut et le message raise Exception(f"Erreur API: {response.status_code}, {response.text}") # ############################################################ def transcribe_audio( filepath: Union[str, IO], language: Optional[str] = None ) -> str: """ Transcrit un fichier audio en texte. Args: filepath (Union[str, IO]): Chemin vers le fichier audio ou objet IO. language (Optional[str]): Code de langue ISO 639-1 pour la transcription. Returns: str: Le texte transcrit ou une chaîne vide en cas d'erreur. Raises: ValueError: Si le fichier audio est invalide ou vide. IOError: Si une erreur se produit lors de la lecture du fichier. """ if not filepath: st.error("Erreur : Aucun fichier audio fourni") return "" try: # Vérifier si le fichier existe et est accessible if isinstance(filepath, str) and not os.path.exists(filepath): raise FileNotFoundError(f"Le fichier {filepath} n'existe pas") # Vérifier la taille du fichier file_size = os.path.getsize(filepath) if isinstance(filepath, str) else filepath.tell() if file_size == 0: raise ValueError("Le fichier audio est vide") # Transcription avec Hugging Face try: transcription = huggingface_endpoints_stt(filepath) if transcription: return transcription except Exception as hf_error: st.warning(f"Erreur avec l'endpoint Hugging Face, tentative avec OpenAI : {hf_error}") # Transcription avec OpenAI comme fallback client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) audio_file = open(filepath if isinstance(filepath, str) else filepath.name, "rb") transcription = client.audio.transcriptions.create( model="whisper-1", file=audio_file, language=language ) return transcription.text except FileNotFoundError as e: st.error(f"Erreur : {e}") return "" except ValueError as e: st.error(f"Erreur : {e}") return "" except IOError as e: st.error(f"Erreur lors de la lecture du fichier audio : {e}") return "" except Exception as e: st.error(f"Une erreur inattendue s'est produite lors de la transcription : {e}") return "" # ############################################################ def translate_audio(filepath: Union[str, IO]) -> str: """ Traduit un fichier audio temporaire en Anglais. Args: filepath Chemin vers le fichier audio temporaire à traduire. Returns: str: Le texte traduit. """ max_size_mb = 25 translated_text = [] client = OpenAI(api_key=getenv("OPENAI_API_KEY")) try: with open(filepath if isinstance(filepath, str) else filepath.name, "rb") as f: # filepath peut etre un chemin vers un fichier audio ou un objet IO f.seek(0) audio = AudioSegment.from_file(f) duration_ms = len(audio) segment_duration_ms = int( (max_size_mb * 1024 * 1024 * 8) / (audio.frame_rate * audio.sample_width * audio.channels) ) for start in range(0, duration_ms, segment_duration_ms): end = min(start + segment_duration_ms, duration_ms) segment = audio[start:end] buffer = BytesIO() segment.export(buffer, format="mp3") buffer.seek(0) translation = client.audio.translations.create( model="whisper-1", file=("audio.mp3", buffer) ) translated_text.append(translation) return " ".join(translated_text) except Exception as e: print(f"Erreur lors de la traduction de l'audio : {e}") return "" # ############################################################ class SpeechToText(object): def __init__(self, api_key: str): self.api_key = api_key self.client = OpenAI(api_key=self.api_key) def aquire_audio(self, filepath: Union[str, IO, List[Union[str, IO]]]): """ Integrer la detection de langue : Ajoute un appel a la fonction detect_language juste apres l'aquisition de l'audio et avant de choisir entre transcrire ou traduire. """ if isinstance(filepath, str): file_paths = [filepath] elif isinstance(filepath, IO): file_paths = [filepath.name] else: file_paths = [f'{file_path}' if isinstance(filepath, List) and isinstance(file_path, str) else file_path.name for file_path in filepath] # create the list 'file_streams' file_streams = [open(filepath, "rb") for filepath in file_paths] def create_assistant(): return self.client.beta.assistants.create( name="Audio Language Detector", instructions=" ".join([ "Act as an language detection function for an audio file.", "You are the assistant designed to detect the language of an audio file.", "This assistant is designed to detect the language of an audio file.", "You receive an audio file as input, and you analyze it to determine the language spoken in the audio.", "The assistant will return the detected language of the audio in ISO 639-1 format.", "" ]), model="gpt-4o", tools=[{"type": "file_search"}] ) def create_vector_store(): return self.client.beta.vector_stores.create( name="Audio Language Detection" ) assistant = create_assistant() vectore_store = create_vector_store() file_batch = self.client.beta.vector_stores.file_batches.upload_and_poll( vector_store_id=vectore_store.id, files=file_streams ) # update the assistant to use the vector store assistant = self.client.beta.assistants.update( assistant_id=assistant.id, tool_ressources={"file_search": {"vector_store_ids": [vectore_store.id]}} ) ## Create a thread ### Upload the user provided audio message_file = self.client.files.create( file=open(file_paths[0], "rb") )