| | import streamlit as st |
| | import whisper |
| | from pyannote.audio import Pipeline |
| | import time |
| | import os |
| | import logging |
# NOTE(review): machine-specific Windows path — consider reading this from
# configuration instead of hard-coding a user's home directory. TODO confirm.
os.environ["TRANSFORMERS_CACHE"] = "C:\\Users\\Admin\\.cache\\Documents\\huggingface_cache"

# Folder where uploaded audio files are persisted by save_uploaded_file().
UPLOAD_FOLDER = 'upload'
# Create it up front so the first upload doesn't fail with FileNotFoundError.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Log to a file; every processing step below reports through this logger.
logging.basicConfig(filename="audio_transcription.log",
                    level=logging.INFO,
                    format="%(asctime)s - %(levelname)s - %(message)s")
@st.cache_resource
def load_models():
    """Load and cache the Whisper ASR model and the pyannote diarization pipeline.

    Cached across Streamlit reruns by ``@st.cache_resource`` so the heavy
    models are only loaded once per process.

    Returns:
        tuple: ``(whisper_model, diarization_pipeline)`` on success, or
        ``(None, None)`` if either model fails to load. Failures are logged
        and shown in the UI; this function never raises.
    """
    try:
        whisper_model = whisper.load_model("medium")
        if whisper_model is None:
            raise ValueError("Whisper model failed to load.")

        # Read the Hugging Face token from the environment rather than
        # committing a credential to source; the placeholder fallback keeps
        # the original behavior when HF_TOKEN is unset.
        diarization_pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization@2.1",
            use_auth_token=os.environ.get("HF_TOKEN", "YOUR_TOKEN"))

        if diarization_pipeline is None:
            raise ValueError("Diarization model failed to load.")

        return whisper_model, diarization_pipeline

    except Exception as e:
        # Log with full traceback (replaces the former print-to-stdout) and
        # surface the error in the Streamlit UI.
        logging.exception("Error loading models: %s", e)
        st.error(f"Error loading models: {e}")
        return None, None
| |
|
# Load both models once at startup; (None, None) here means the UI will
# refuse to process audio (see process_audio's guard).
whisper_model, diarization_pipeline = load_models()
|
| | |
def save_uploaded_file(uploaded_file, folder=None):
    """Persist an uploaded file to disk with a timestamped file name.

    Args:
        uploaded_file: Streamlit ``UploadedFile`` (needs ``.name`` and
            ``.getbuffer()``).
        folder (str | None): Destination directory; defaults to
            ``UPLOAD_FOLDER`` to preserve the original behavior.

    Returns:
        str: Path of the saved file.
    """
    dest = UPLOAD_FOLDER if folder is None else folder
    # Ensure the destination exists (fresh checkout / first run).
    os.makedirs(dest, exist_ok=True)

    # Timestamp prefix keeps repeated uploads of the same file from colliding.
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    file_name = f"{timestamp}_{uploaded_file.name}"
    file_path = os.path.join(dest, file_name)
    with open(file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())

    return file_path
| |
|
| |
|
| | |
def process_audio(audio_file_path):
    """Transcribe an audio file and label the transcript with speaker turns.

    Runs Whisper transcription (with per-segment timestamps) and pyannote
    speaker diarization, then assigns each transcription segment to the
    diarized speaker turn it falls inside.

    Args:
        audio_file_path (str): Path to the audio file on disk.

    Returns:
        tuple: ``(labeled_text, detected_language)`` where ``labeled_text``
        is a list of ``"[SPEAKER] ..."`` strings, or ``(None, None)`` when
        models are unavailable or processing fails.
    """
    if whisper_model is None or diarization_pipeline is None:
        st.error("Models are not loaded properly. Please check the logs.")
        return None, None

    try:
        logging.info(f"Started processing audio file: {audio_file_path}")

        # Pass the file path straight to transcribe(): Whisper chunks long
        # audio internally. The previous load_audio + pad_or_trim pipeline
        # truncated every file to its first 30 seconds.
        transcription = whisper_model.transcribe(audio_file_path, word_timestamps=True)
        word_timestamps = transcription["segments"]
        detected_language = transcription["language"]

        logging.info(f"Detected language: {detected_language}")

        # Speaker diarization over the same file.
        diarization = diarization_pipeline({"uri": "audio", "audio": audio_file_path})

        labeled_text = []

        # Walk diarization turns in order, consuming transcription segments
        # whose end time falls inside the current turn. Segments are consumed
        # exactly once (index only moves forward).
        current_word_index = 0
        for segment, _, speaker in diarization.itertracks(yield_label=True):
            end = segment.end
            labeled_segment = f"[{speaker}] "

            while current_word_index < len(word_timestamps):
                word_info = word_timestamps[current_word_index]
                word_end = word_info["end"]
                word_text = word_info["text"]

                if word_end <= end:
                    labeled_segment += word_text + " "
                    current_word_index += 1
                else:
                    # Segment extends past this turn; revisit it on the next turn.
                    break

            labeled_text.append(labeled_segment.strip())
            logging.info(f"Speaker {speaker} spoke: {labeled_segment.strip()}")

        logging.info(f"Processing completed for: {audio_file_path}")

        return labeled_text, detected_language

    except Exception as e:
        st.error(f"Error processing audio: {e}")
        # logging.exception records the traceback (replaces the former print).
        logging.exception(f"Error processing audio {audio_file_path}: {e}")
        return None, None
| |
|
| |
|
| | |
# --- Streamlit UI -----------------------------------------------------------
st.title("Multilingual Audio Transcription with Speaker Labels")
st.write("Select an audio file from the 'upload' folder to transcribe and detect speakers.")

uploaded_file = st.file_uploader("Choose an audio file", type=["mp3", "wav", "m4a"])
if uploaded_file is not None:
    audio_file_path = save_uploaded_file(uploaded_file)
    st.write(f"File uploaded successfully: {audio_file_path}")

    # Use a MIME type matching the uploaded file; it was hard-coded to
    # "audio/wav" even for mp3/m4a uploads.
    extension = os.path.splitext(audio_file_path)[1].lstrip(".").lower()
    mime_type = {"mp3": "audio/mpeg", "m4a": "audio/mp4"}.get(extension, "audio/wav")
    st.audio(audio_file_path, format=mime_type)

    with st.spinner("Processing audio..."):
        try:
            labeled_text, detected_language = process_audio(audio_file_path)
            # process_audio returns (None, None) on failure and reports its
            # own error, so only render results on success.
            if labeled_text is not None:
                st.success("Processing complete!")

                st.subheader("Detected Language")
                st.write(f"**{detected_language}**")

                st.subheader("Transcription with Speaker Labels")
                for line in labeled_text:
                    st.write(line)
        except Exception as e:
            st.error(f"An error occurred: {e}")

st.markdown("---")
st.markdown(
    "Developed using [Whisper](https://github.com/openai/whisper) and "
    "[PyAnnote](https://github.com/pyannote/pyannote-audio)."
)
| |
|