"""Streamlit app: multilingual audio transcription with speaker labels.

Transcribes an uploaded audio file with OpenAI Whisper and labels each
diarization segment with its speaker using pyannote.audio.
"""
import streamlit as st
import whisper
from pyannote.audio import Pipeline
import time
import os
import logging

# Redirect the Hugging Face cache; must be set before any model download.
os.environ["TRANSFORMERS_CACHE"] = "C:\\Users\\Admin\\.cache\\Documents\\huggingface_cache"

# Directory where uploaded audio files are stored.
UPLOAD_FOLDER = 'upload'
# Bug fix: create the upload directory up front so saving an uploaded
# file cannot fail with FileNotFoundError on a fresh checkout.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Set up logging to write to a file.
logging.basicConfig(filename="audio_transcription.log",
                    level=logging.INFO,  # Adjust as needed (DEBUG, INFO, WARNING)
                    format="%(asctime)s - %(levelname)s - %(message)s")
@st.cache_resource
def load_models():
    """Load and cache the Whisper and pyannote models (once per session).

    Returns:
        tuple: ``(whisper_model, diarization_pipeline)``, or ``(None, None)``
        if either model fails to load; the error is shown in the UI and
        written to the log file.
    """
    try:
        # Whisper ASR model; "medium"/"large" trade speed for accuracy.
        whisper_model = whisper.load_model("medium")
        if whisper_model is None:
            raise ValueError("Whisper model failed to load.")
        # pyannote diarization pipeline. Fix: read the Hugging Face token
        # from the environment instead of hard-coding a secret in source;
        # the original placeholder is kept as the fallback.
        diarization_pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization@2.1",
            use_auth_token=os.environ.get("HF_TOKEN", "YOUR_TOKEN"))
        if diarization_pipeline is None:
            raise ValueError("Diarization model failed to load.")
        return whisper_model, diarization_pipeline
    except Exception as e:
        # Log instead of print, consistent with the rest of the file.
        logging.error(f"Error loading models: {e}")
        st.error(f"Error loading models: {e}")
        return None, None
# Initialize models once at import time; load_models() is cached by
# Streamlit and returns (None, None) when loading fails.
whisper_model, diarization_pipeline = load_models()
# Function to handle file upload and save to the 'upload' directory
def save_uploaded_file(uploaded_file):
    """Persist a Streamlit upload to disk under a timestamped name.

    Args:
        uploaded_file: object exposing ``.name`` and ``.getbuffer()``
            (Streamlit's UploadedFile).

    Returns:
        str: path of the saved file, ``upload/<timestamp>_<original name>``.
    """
    # Bug fix: make sure the target directory exists before writing.
    os.makedirs(UPLOAD_FOLDER, exist_ok=True)
    # Timestamp prefix keeps repeated uploads of the same file distinct.
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    file_name = f"{timestamp}_{uploaded_file.name}"
    file_path = os.path.join(UPLOAD_FOLDER, file_name)
    with open(file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())
    return file_path
# Function to process audio from a file path
def process_audio(audio_file_path):
    """Transcribe an audio file and label each segment with its speaker.

    Args:
        audio_file_path: path to a local audio file readable by Whisper.

    Returns:
        tuple: (labeled_text, detected_language) where labeled_text is a
        list of "[SPEAKER] words..." strings, or (None, None) if the
        models are unavailable or processing fails.
    """
    if whisper_model is None or diarization_pipeline is None:
        st.error("Models are not loaded properly. Please check the logs.")
        return None, None
    try:
        # Log the start of processing
        logging.info(f"Started processing audio file: {audio_file_path}")
        # Load audio with Whisper's load_audio function
        audio = whisper.load_audio(audio_file_path)
        # NOTE(review): pad_or_trim pads/cuts to Whisper's fixed window
        # (30s) — audio longer than that is truncated here, while the
        # diarization below runs on the full file. Confirm intended.
        audio = whisper.pad_or_trim(audio)  # Ensure the audio is the correct length
        # Transcribe with Whisper and get word-level timestamps
        transcription = whisper_model.transcribe(audio, word_timestamps=True)
        text = transcription["text"]
        # NOTE(review): despite the name, "segments" holds segment-level
        # entries (each with start/end/text), not individual words.
        word_timestamps = transcription["segments"]  # This contains the word timings
        detected_language = transcription["language"]
        # Log the detected language
        logging.info(f"Detected language: {detected_language}")
        # Perform speaker diarization on the original file path
        diarization = diarization_pipeline({"uri": "audio", "audio": audio_file_path})
        # To store speaker-labeled text
        labeled_text = []
        # Process diarization and align it with transcribed words.
        # current_word_index advances monotonically across diarization
        # turns, so each transcribed segment is consumed exactly once.
        current_word_index = 0
        for segment, _, speaker in diarization.itertracks(yield_label=True):
            start = segment.start
            end = segment.end
            labeled_segment = f"[{speaker}] "
            # Add words to the labeled segment based on the diarization timestamps
            while current_word_index < len(word_timestamps):
                word_info = word_timestamps[current_word_index]
                word_start = word_info["start"]
                word_end = word_info["end"]
                word_text = word_info["text"]
                # Check if the word's timing falls within the diarization segment
                if word_end <= end:
                    labeled_segment += word_text + " "
                    current_word_index += 1
                else:
                    break  # Exit when we've processed all words within the diarization segment
            labeled_text.append(labeled_segment.strip())
            # Log each speaker's contribution
            logging.info(f"Speaker {speaker} spoke: {labeled_segment.strip()}")
        # Log the completion of processing
        logging.info(f"Processing completed for: {audio_file_path}")
        return labeled_text, detected_language
    except Exception as e:
        st.error(f"Error processing audio: {e}")
        # Log the error
        logging.error(f"Error processing audio {audio_file_path}: {e}")
        print(f"Error processing audio: {e}")
        return None, None
# Streamlit App UI — executed top-to-bottom on every rerun; the order of
# these calls determines the rendered layout.
st.title("Multilingual Audio Transcription with Speaker Labels")
st.write("Select an audio file from the 'upload' folder to transcribe and detect speakers.")
# Upload audio file (only these extensions are accepted by the widget)
uploaded_file = st.file_uploader("Choose an audio file", type=["mp3", "wav", "m4a"])
if uploaded_file is not None:
    # Save the uploaded file to disk so Whisper/pyannote can read a path
    audio_file_path = save_uploaded_file(uploaded_file)
    # Display file path for debugging purposes
    st.write(f"File uploaded successfully: {audio_file_path}")
    st.audio(audio_file_path, format="audio/wav")
    with st.spinner("Processing audio..."):
        try:
            # process_audio returns (None, None) on failure and has
            # already reported the error to the UI in that case.
            labeled_text, detected_language = process_audio(audio_file_path)
            if labeled_text is not None:
                st.success("Processing complete!")
                # Display detected language
                st.subheader("Detected Language")
                st.write(f"**{detected_language}**")
                # Display speaker-labeled transcription
                st.subheader("Transcription with Speaker Labels")
                for line in labeled_text:
                    st.write(line)
        except Exception as e:
            st.error(f"An error occurred: {e}")
# Footer with attribution links
st.markdown("---")
st.markdown(
    # Fix: "Developed with using" was broken English (likely a stripped
    # emoji between "with" and "using").
    "Developed using [Whisper](https://github.com/openai/whisper) and "
    "[PyAnnote](https://github.com/pyannote/pyannote-audio)."
)