File size: 6,127 Bytes
2a8180e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import streamlit as st
import whisper
from pyannote.audio import Pipeline
import time
import os
import logging
# Default location for the Hugging Face model cache. setdefault() is used so
# that a TRANSFORMERS_CACHE value already present in the environment (for
# example on a different machine or in a container) is respected instead of
# being overwritten by this machine-specific path.
os.environ.setdefault(
    "TRANSFORMERS_CACHE",
    "C:\\Users\\Admin\\.cache\\Documents\\huggingface_cache",
)

# Directory (relative to the working directory) where uploaded audio is saved.
UPLOAD_FOLDER = 'upload'

# Set up logging to write to a file
logging.basicConfig(filename="audio_transcription.log",
                    level=logging.INFO,  # Adjust the log level as needed (e.g., DEBUG, INFO, WARNING)
                    format="%(asctime)s - %(levelname)s - %(message)s")
@st.cache_resource
def _load_models_or_raise():
    """Load the Whisper model and the pyannote diarization pipeline.

    This is the cached part of model loading. It raises instead of returning
    a failure value so that Streamlit only caches a successful load; a failed
    attempt is retried on the next script run.

    Returns:
        A ``(whisper_model, diarization_pipeline)`` tuple.

    Raises:
        ValueError: If either loader returns ``None``.
        Exception: Whatever the underlying loaders raise.
    """
    # Load Whisper model
    whisper_model = whisper.load_model("medium")  # Use "medium" or "large" for better accuracy
    if whisper_model is None:
        raise ValueError("Whisper model failed to load.")

    # Prefer a token supplied through the environment; fall back to the
    # original placeholder so existing setups behave as before.
    auth_token = (
        os.environ.get("HF_TOKEN")
        or os.environ.get("HUGGINGFACE_TOKEN")
        or "YOUR_TOKEN"
    )

    # Load PyAnnote diarization pipeline with token
    diarization_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization@2.1",
        use_auth_token=auth_token,
    )
    if diarization_pipeline is None:
        raise ValueError("Diarization model failed to load.")

    return whisper_model, diarization_pipeline


def load_models():
    """Return the transcription and diarization models, or ``(None, None)``.

    Delegates to the cached loader and converts any failure into the
    ``(None, None)`` result, after reporting the error to the log file,
    stdout and the Streamlit page.

    Returns:
        ``(whisper_model, diarization_pipeline)`` on success, otherwise
        ``(None, None)``.
    """
    try:
        return _load_models_or_raise()
    except Exception as e:
        # Top-level boundary: record the traceback, then degrade gracefully.
        logging.exception("Error loading models")
        print(f"Error loading models: {e}")
        st.error(f"Error loading models: {e}")
        return None, None

# Initialize models at module level so the functions below can use them as
# globals. load_models() returns (None, None) when loading fails, so both
# names may be None; process_audio() checks for that before using them.
whisper_model, diarization_pipeline = load_models()

# Function to handle file upload and save to the 'upload' directory
def save_uploaded_file(uploaded_file):
    """Write an uploaded file into ``UPLOAD_FOLDER`` and return its path.

    The stored name is ``<YYYYmmdd-HHMMSS>_<original file name>``. Any
    directory components in the client-supplied name are discarded so the
    file always lands inside ``UPLOAD_FOLDER``, and the folder is created
    when it does not exist yet.

    Args:
        uploaded_file: Object exposing a ``name`` string and a
            ``getbuffer()`` method returning the file's bytes.

    Returns:
        The path of the saved file.
    """
    # The folder is not guaranteed to exist on a fresh checkout.
    os.makedirs(UPLOAD_FOLDER, exist_ok=True)

    # Create a timestamp for the filename
    timestamp = time.strftime("%Y%m%d-%H%M%S")

    # Keep only the final path component; treat backslashes as separators
    # too so Windows-style names are handled on any platform.
    base_name = os.path.basename(uploaded_file.name.replace("\\", "/")) or "audio"
    file_name = f"{timestamp}_{base_name}"

    # Save the uploaded file to the 'upload' directory with the timestamped filename
    file_path = os.path.join(UPLOAD_FOLDER, file_name)
    with open(file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())

    return file_path


def _flatten_timed_words(segments):
    """Return ``(end_time, text)`` pairs for every timed unit in *segments*.

    Uses each segment's per-word entries when present; a segment without
    them contributes a single pair built from the segment itself.
    """
    timed = []
    for segment in segments:
        words = segment.get("words")
        if words:
            timed.extend((word["end"], word["word"]) for word in words)
        else:
            timed.append((segment["end"], segment["text"]))
    return timed


def _assign_words_to_speakers(timed_words, turns):
    """Group timed words under speaker turns, returning ``(speaker, text)`` pairs.

    *turns* is an iterable of ``(turn_end, speaker)`` pairs. Words are
    consumed in order; each turn receives the words that end no later than
    the turn does. Turns that receive no words are skipped, and words left
    over after the final turn are attributed to the last speaker seen.
    """
    assigned = []
    cursor = 0
    total = len(timed_words)
    last_speaker = None

    for turn_end, speaker in turns:
        last_speaker = speaker
        pieces = []
        while cursor < total and timed_words[cursor][0] <= turn_end:
            pieces.append(timed_words[cursor][1])
            cursor += 1
        # The transcribed pieces carry their own spacing, so join them as-is.
        spoken = "".join(pieces).strip()
        if spoken:
            assigned.append((speaker, spoken))

    if cursor < total:
        # Transcript timings can run slightly past the last diarized turn;
        # keep that text instead of dropping it.
        leftover = "".join(text for _, text in timed_words[cursor:]).strip()
        if leftover:
            fallback = last_speaker if last_speaker is not None else "UNKNOWN"
            assigned.append((fallback, leftover))

    return assigned


# Function to process audio from a file path
def process_audio(audio_file_path):
    """Transcribe an audio file and label the text with speakers.

    The whole file is transcribed with word-level timestamps, diarized, and
    the words are then grouped under the diarization turns in time order.

    Args:
        audio_file_path: Path of the audio file to process.

    Returns:
        ``(labeled_text, detected_language)`` where ``labeled_text`` is a
        list of ``"[speaker] text"`` strings, or ``(None, None)`` when the
        models are unavailable or processing fails. Errors are shown on the
        Streamlit page and written to the log; they are not raised.
    """
    if whisper_model is None or diarization_pipeline is None:
        st.error("Models are not loaded properly. Please check the logs.")
        return None, None

    try:
        # Log the start of processing
        logging.info(f"Started processing audio file: {audio_file_path}")

        # Load audio with Whisper's load_audio function. The audio is passed
        # to transcribe() untrimmed: pad_or_trim() would cut it to a single
        # 30-second window, while diarization below covers the whole file.
        audio = whisper.load_audio(audio_file_path)

        # Transcribe with Whisper and get word-level timestamps
        transcription = whisper_model.transcribe(audio, word_timestamps=True)
        detected_language = transcription["language"]

        # Log the detected language
        logging.info(f"Detected language: {detected_language}")

        # Perform speaker diarization
        diarization = diarization_pipeline({"uri": "audio", "audio": audio_file_path})
        turns = (
            (segment.end, speaker)
            for segment, _, speaker in diarization.itertracks(yield_label=True)
        )

        # Align the transcribed words with the speaker turns
        timed_words = _flatten_timed_words(transcription["segments"])
        labeled_text = []
        for speaker, spoken in _assign_words_to_speakers(timed_words, turns):
            labeled_segment = f"[{speaker}] {spoken}"
            labeled_text.append(labeled_segment)
            # Log each speaker's contribution
            logging.info(f"Speaker {speaker} spoke: {labeled_segment}")

        # Log the completion of processing
        logging.info(f"Processing completed for: {audio_file_path}")

        return labeled_text, detected_language

    except Exception as e:
        st.error(f"Error processing audio: {e}")
        # Log the error together with its traceback
        logging.exception(f"Error processing audio {audio_file_path}: {e}")
        print(f"Error processing audio: {e}")
        return None, None


# Streamlit App UI
st.title("Multilingual Audio Transcription with Speaker Labels")
st.write("Select an audio file from the 'upload' folder to transcribe and detect speakers.")

# Upload audio file
uploaded_file = st.file_uploader("Choose an audio file", type=["mp3", "wav", "m4a"])
if uploaded_file is not None:
    # Save the uploaded file
    audio_file_path = save_uploaded_file(uploaded_file)

    # Display file path for debugging purposes
    st.write(f"File uploaded successfully: {audio_file_path}")

    # Play the file back using the MIME type reported for the upload, since
    # mp3 and m4a files are accepted as well as wav; fall back to wav when
    # no type is available.
    audio_format = getattr(uploaded_file, "type", None) or "audio/wav"
    st.audio(audio_file_path, format=audio_format)

    with st.spinner("Processing audio..."):
        try:
            labeled_text, detected_language = process_audio(audio_file_path)
            # process_audio() reports its own errors and returns (None, None),
            # so results are only rendered when it succeeded.
            if labeled_text is not None:
                st.success("Processing complete!")

                # Display detected language
                st.subheader("Detected Language")
                st.write(f"**{detected_language}**")

                # Display speaker-labeled transcription
                st.subheader("Transcription with Speaker Labels")
                for line in labeled_text:
                    st.write(line)
        except Exception as e:
            st.error(f"An error occurred: {e}")

# Footer
st.markdown("---")
st.markdown(
    "Developed with  using [Whisper](https://github.com/openai/whisper) and "
    "[PyAnnote](https://github.com/pyannote/pyannote-audio)."
)