Spaces:
Runtime error
Runtime error
| from pyannote.audio import Pipeline | |
| from pydub import AudioSegment | |
| import whisperx | |
| import torch | |
| import json | |
| import os | |
| from fastapi import FastAPI, File, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydub.utils import mediainfo | |
| import subprocess | |
| from dotenv import load_dotenv | |
| # cargar el archivo .env | |
| load_dotenv() | |
| # Inicializar la aplicaci贸n | |
| app = FastAPI() | |
| # Configurar CORS | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Carga de token y verificaci贸n de disponibilidad de GPU | |
| hf_token = os.getenv("HF_TOKEN") | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| # Tipo de c贸mputo #"float32" o "float16" si se desea utilizar la aceleraci贸n de la GPU o "float32" si se desea utilizar la CPU. | |
| compute_type = "float16" if device == "cuda" else "float32" | |
| # Transcripci贸n | |
| print("Descargando el modelo...") | |
| model = whisperx.load_model( | |
| "large-v2", device, language="es", compute_type=compute_type | |
| ) | |
| print("隆Modelo descargado con 茅xito!") | |
| def transcribe_wav(audio): | |
| """ | |
| Transcribe un archivo de audio WAV y devuelve el texto transrito. | |
| """ | |
| resultado = model.transcribe(audio, language="es", batch_size=28) | |
| return "\n".join([segment["text"] for segment in resultado["segments"]]) | |
| def segment_and_transcribe_audio(audio_entrada: str) -> dict: | |
| """ | |
| Segmenta y transcribe el archivo de audio de entrada. | |
| Parameters: | |
| :param audio_entrada: str: Ruta al archivo de audio de entrada. | |
| Returns: | |
| dict: Diccionario que contiene las transcripciones para cada segmento de parlante. | |
| """ | |
| transcriptions = {} | |
| if not os.path.exists(audio_entrada): | |
| raise FileNotFoundError("El archivo de audio no existe.") | |
| audio_info = mediainfo(audio_entrada) | |
| duration_s = audio_info.get("duration") | |
| if duration_s is None: | |
| raise ValueError("No se pudo obtener la duraci贸n del archivo de audio.") | |
| duration_s = float(duration_s) | |
| print(f"Duraci贸n del audio: {duration_s} s") | |
| if duration_s < 10: | |
| raise ValueError( | |
| f"La duraci贸n del audio es demasiado corta para realizar una transcripci贸n. Duraci贸n: {duration_s} s" | |
| ) | |
| # Cargar el archivo de audio usando whisperx.load_audio | |
| audio = whisperx.load_audio(audio_entrada) | |
| # Realizar la transcripci贸n completa del audio | |
| transcripcion_completa = transcribe_wav(audio) | |
| transcriptions["full"] = transcripcion_completa | |
| # Inicializar el pipeline de diarizaci贸n de parlantes | |
| pipeline = Pipeline.from_pretrained( | |
| "pyannote/speaker-diarization-3.1", use_auth_token=hf_token | |
| ) | |
| pipeline.to(torch.device(device)) | |
| # Realizar la diarizaci贸n de los parlantes en el audio | |
| with open(audio_entrada, "rb") as f: | |
| diarization = pipeline(f) | |
| # Escribir los resultados de la diarizaci贸n en un archivo de texto | |
| with open("audio.txt", "w") as lab: | |
| diarization.write_lab(lab) | |
| # Leer los resultados de la diarizaci贸n desde el archivo de texto | |
| with open("audio.txt", "r") as file: | |
| lines = file.readlines() | |
| # Procesar los resultados de la diarizaci贸n | |
| data = [] | |
| for line in lines: | |
| parts = line.strip().split() | |
| data.append( | |
| {"start": float(parts[0]), "end": float(parts[1]), "speaker": parts[2]} | |
| ) | |
| # Guardar los resultados de la diarizaci贸n en un archivo JSON | |
| with open("audio.json", "w") as file: | |
| json.dump(data, file, indent=4) | |
| # Cargar el archivo de audio original usando AudioSegment | |
| audio_segment = AudioSegment.from_wav(audio_entrada) | |
| # Leer los resultados de la diarizaci贸n desde el archivo JSON | |
| with open("audio.json", "r") as file: | |
| diarization = json.load(file) | |
| # Segmentar el audio en base a los resultados de la diarizaci贸n | |
| segments = {} | |
| for segment in diarization: | |
| speaker = segment["speaker"] | |
| if speaker not in segments: | |
| segments[speaker] = [] | |
| start = int(segment["start"] * 1000) | |
| end = int(segment["end"] * 1000) | |
| segments[speaker].append(audio_segment[start:end]) | |
| # Transcribir cada segmento de parlante y guardar los resultados | |
| for speaker, segs in segments.items(): | |
| result = sum(segs, AudioSegment.empty()) | |
| result.export(f"{speaker}.wav", format="wav") | |
| transcriptions[speaker] = transcribe_wav(f"{speaker}.wav") | |
| # Eliminar los archivos temporales creados | |
| os.remove("audio.txt") | |
| os.remove(audio_entrada) | |
| os.remove("audio.json") | |
| for speaker in segments.keys(): | |
| os.remove(f"{speaker}.wav") | |
| return transcriptions | |
| def read_root(): | |
| return {"message": "隆Bienvenido a la API de transcripci贸n de audio!"} | |
| def transcribe_audio(audio: UploadFile = File(...)): | |
| """ | |
| Transcribe un archivo de audio y devuelve el texto transcrito. | |
| Parameters: | |
| :param audio: UploadFile: Archivo de audio a transcribir. | |
| Returns: | |
| JSONResponse: Respuesta JSON que contiene el texto transcrito. | |
| """ | |
| # Guardar el archivo de audio en wav con la libreria AudioSegment | |
| audio_segment = AudioSegment.from_file(audio.file) | |
| audio_segment.export("audio.wav", format="wav") | |
| return segment_and_transcribe_audio("audio.wav") | |
| # if __name__ == "__main__": | |
| # subprocess.run( | |
| # ["uvicorn", "app:app", "--host", "localhost", "--port", "7860", "--reload"] | |
| # ) | |