File size: 6,210 Bytes
0e8b4d0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d8fd28f
0e8b4d0
 
 
d8fd28f
0e8b4d0
d8fd28f
 
0e8b4d0
 
 
 
d8fd28f
 
0e8b4d0
 
 
 
 
 
 
 
 
 
d8fd28f
 
0e8b4d0
d8fd28f
 
 
 
0e8b4d0
 
 
 
d8fd28f
 
0e8b4d0
d8fd28f
 
 
 
0e8b4d0
d8fd28f
 
 
 
0e8b4d0
 
d8fd28f
 
 
 
 
 
 
 
0e8b4d0
 
 
 
 
 
c871931
0e8b4d0
c871931
0e8b4d0
c871931
0e8b4d0
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# # src/preprocessing/transcript_generator.py
# import azure.cognitiveservices.speech as speechsdk
# import json
# import re
# import time
# import logging

# logger = logging.getLogger(__name__)

# class TranscriptGenerator:
#     def __init__(self, speech_key: str, speech_region: str):
#         self.speech_config = speechsdk.SpeechConfig(
#             subscription=speech_key,
#             region=speech_region
#         )
#         self.speech_config.request_word_level_timestamps = True
#         self.speech_config.output_format = speechsdk.OutputFormat.Detailed

#     def transcribe_audio(self, audio_file_path: str, output_text_file: str):
#         audio_config = speechsdk.audio.AudioConfig(filename=audio_file_path)
#         speech_recognizer = speechsdk.SpeechRecognizer(
#             speech_config=self.speech_config, 
#             audio_config=audio_config
#         )

#         all_results = []
#         done = False

#         def handle_final_result(evt):
#             if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech:
#                 result_json = json.loads(evt.result.json)
#                 if 'NBest' in result_json and result_json['NBest']:
#                     words = result_json['NBest'][0].get('Words', [])
#                     all_results.extend(words)

#         def stop_cb(evt):
#             nonlocal done
#             done = True

#         speech_recognizer.recognized.connect(handle_final_result)
#         speech_recognizer.session_stopped.connect(stop_cb)
#         speech_recognizer.canceled.connect(stop_cb)

#         speech_recognizer.start_continuous_recognition()
#         start_time = time.time()
#         while not done and time.time() - start_time < 1800:  # 30 min timeout
#             time.sleep(0.5)
#         speech_recognizer.stop_continuous_recognition()

#         # Process and save results
#         with open(output_text_file, "w", encoding="utf-8") as f:
#             f.write("start_time\tend_time\tspeaker\ttranscript\n")
#             current_sentence = []
#             current_start = None
#             current_end = None

#             for word in all_results:
#                 word_start = word['Offset'] / 10000000
#                 word_end = word_start + (word['Duration'] / 10000000)
#                 word_text = word['Word']

#                 if not current_sentence:
#                     current_start = word_start
#                     current_end = word_end
#                     current_sentence.append(word_text)
#                     continue

#                 # Sentence boundary detection
#                 time_gap = word_start - current_end
#                 is_punctuation = re.match(r'^[.!?]+$', word_text)

#                 if time_gap > 1.5 or is_punctuation:
#                     sentence_text = " ".join(current_sentence)
#                     f.write(f"{current_start:.2f}\t{current_end:.2f}\tSPEAKER\t{sentence_text}\n")
#                     current_sentence = [word_text]
#                     current_start = word_start
#                     current_end = word_end
#                 else:
#                     current_sentence.append(word_text)
#                     current_end = word_end

#             if current_sentence:
#                 sentence_text = " ".join(current_sentence)
#                 f.write(f"{current_start:.2f}\t{current_end:.2f}\tSPEAKER\t{sentence_text}\n")

#         logger.info(f"Transcript saved to {output_text_file}")
#         return len(all_results) > 0


import time
import re
import os
# Third-party: faster-whisper (CTranslate2-based Whisper reimplementation).
from faster_whisper import WhisperModel

# NOTE(review): works around the "duplicate OpenMP runtime" (libiomp) abort
# that can occur when multiple OpenMP copies are loaded — a known issue when
# ctranslate2 and numpy/MKL coexist on some platforms. Confirm it is still
# required in the deployment environment; it silences a real diagnostic.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

class TranscriptGenerator:
    """Generate tab-separated transcripts from an audio file using faster-whisper.

    Output file format (one line per sentence):
        start_time<TAB>end_time<TAB>speaker<TAB>transcript
    Sentence boundaries are inferred from inter-word silence gaps or
    standalone punctuation tokens. The speaker column is always "SPEAKER"
    (no diarization is performed).
    """

    # Silence gap (seconds) between consecutive words that starts a new sentence.
    SENTENCE_GAP_SECONDS = 1.5

    def __init__(self, model_size: str = "base.en", compute_type: str = "int8"):
        """Load the Whisper model and record how long loading took.

        Args:
            model_size: faster-whisper model identifier (e.g. "base.en").
            compute_type: quantization/compute type passed to WhisperModel.
        """
        start_load = time.time()
        self.model = WhisperModel(model_size, compute_type=compute_type)
        self.model_load_time = time.time() - start_load

    def transcribe_audio(self, audio_file_path: str, output_text_file: str) -> bool:
        """Transcribe an audio file and write a TSV transcript.

        Args:
            audio_file_path: path to the input audio file.
            output_text_file: path the TSV transcript is written to
                (always created; contains at least the header line).

        Returns:
            True if at least one sentence was written, False if the model
            produced no usable words.
        """
        print(os.path.abspath(audio_file_path))
        start_transcribe = time.time()
        segments, _info = self.model.transcribe(
            audio_file_path, word_timestamps=True, beam_size=5
        )

        # faster-whisper returns segments lazily; collecting the words here
        # is what actually drives the transcription.
        all_words = []
        for segment in segments:
            if segment.words:
                all_words.extend(segment.words)

        # Timing stat for callers, mirroring self.model_load_time.
        self.transcription_time = time.time() - start_transcribe

        sentences_written = 0
        with open(output_text_file, "w", encoding="utf-8") as f:
            f.write("start_time\tend_time\tspeaker\ttranscript\n")

            current_sentence = []
            current_start = None
            current_end = None

            for word in all_words:
                word_start = word.start
                word_end = word.end
                word_text = word.word.strip()
                if not word_text:
                    # Skip whitespace-only tokens so they don't become
                    # empty entries in the joined sentence.
                    continue

                if not current_sentence:
                    current_sentence.append(word_text)
                    current_start = word_start
                    current_end = word_end
                    continue

                # Sentence boundary: a long silence, or a token that is
                # nothing but terminal punctuation ('.', '!', '?').
                time_gap = word_start - current_end
                is_punctuation = re.match(r'^[.!?]+$', word_text)

                if time_gap > self.SENTENCE_GAP_SECONDS or is_punctuation:
                    sentence = " ".join(current_sentence)
                    f.write(f"{current_start:.2f}\t{current_end:.2f}\tSPEAKER\t{sentence}\n")
                    sentences_written += 1
                    current_sentence = [word_text]
                    current_start = word_start
                    current_end = word_end
                else:
                    current_sentence.append(word_text)
                    current_end = word_end

            # Flush the trailing sentence.
            if current_sentence:
                sentence = " ".join(current_sentence)
                f.write(f"{current_start:.2f}\t{current_end:.2f}\tSPEAKER\t{sentence}\n")
                sentences_written += 1

        # BUGFIX: success was previously decided by a magic ">100 bytes"
        # file-size check, which reported valid short transcripts (e.g. a
        # two-sentence file of ~92 bytes) as failures. Count what we wrote.
        file_written = sentences_written > 0

        if not all_words:
            print(" No words recognized by the model.")
        if file_written:
            print(f" Transcript saved to {output_text_file}")
        else:
            print(f" Transcript file is empty or only contains header: {output_text_file}")

        return file_written