Spaces:
Sleeping
Sleeping
| # # src/preprocessing/transcript_generator.py | |
| # import azure.cognitiveservices.speech as speechsdk | |
| # import json | |
| # import re | |
| # import time | |
| # import logging | |
| # logger = logging.getLogger(__name__) | |
| # class TranscriptGenerator: | |
| # def __init__(self, speech_key: str, speech_region: str): | |
| # self.speech_config = speechsdk.SpeechConfig( | |
| # subscription=speech_key, | |
| # region=speech_region | |
| # ) | |
| # self.speech_config.request_word_level_timestamps = True | |
| # self.speech_config.output_format = speechsdk.OutputFormat.Detailed | |
| # def transcribe_audio(self, audio_file_path: str, output_text_file: str): | |
| # audio_config = speechsdk.audio.AudioConfig(filename=audio_file_path) | |
| # speech_recognizer = speechsdk.SpeechRecognizer( | |
| # speech_config=self.speech_config, | |
| # audio_config=audio_config | |
| # ) | |
| # all_results = [] | |
| # done = False | |
| # def handle_final_result(evt): | |
| # if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech: | |
| # result_json = json.loads(evt.result.json) | |
| # if 'NBest' in result_json and result_json['NBest']: | |
| # words = result_json['NBest'][0].get('Words', []) | |
| # all_results.extend(words) | |
| # def stop_cb(evt): | |
| # nonlocal done | |
| # done = True | |
| # speech_recognizer.recognized.connect(handle_final_result) | |
| # speech_recognizer.session_stopped.connect(stop_cb) | |
| # speech_recognizer.canceled.connect(stop_cb) | |
| # speech_recognizer.start_continuous_recognition() | |
| # start_time = time.time() | |
| # while not done and time.time() - start_time < 1800: # 30 min timeout | |
| # time.sleep(0.5) | |
| # speech_recognizer.stop_continuous_recognition() | |
| # # Process and save results | |
| # with open(output_text_file, "w", encoding="utf-8") as f: | |
| # f.write("start_time\tend_time\tspeaker\ttranscript\n") | |
| # current_sentence = [] | |
| # current_start = None | |
| # current_end = None | |
| # for word in all_results: | |
| # word_start = word['Offset'] / 10000000 | |
| # word_end = word_start + (word['Duration'] / 10000000) | |
| # word_text = word['Word'] | |
| # if not current_sentence: | |
| # current_start = word_start | |
| # current_end = word_end | |
| # current_sentence.append(word_text) | |
| # continue | |
| # # Sentence boundary detection | |
| # time_gap = word_start - current_end | |
| # is_punctuation = re.match(r'^[.!?]+$', word_text) | |
| # if time_gap > 1.5 or is_punctuation: | |
| # sentence_text = " ".join(current_sentence) | |
| # f.write(f"{current_start:.2f}\t{current_end:.2f}\tSPEAKER\t{sentence_text}\n") | |
| # current_sentence = [word_text] | |
| # current_start = word_start | |
| # current_end = word_end | |
| # else: | |
| # current_sentence.append(word_text) | |
| # current_end = word_end | |
| # if current_sentence: | |
| # sentence_text = " ".join(current_sentence) | |
| # f.write(f"{current_start:.2f}\t{current_end:.2f}\tSPEAKER\t{sentence_text}\n") | |
| # logger.info(f"Transcript saved to {output_text_file}") | |
| # return len(all_results) > 0 | |
| import time | |
| import re | |
| import os | |
| from faster_whisper import WhisperModel | |
| os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" | |
| class TranscriptGenerator: | |
| def __init__(self, model_size="base.en", compute_type="int8"): | |
| start_load = time.time() | |
| self.model = WhisperModel(model_size, compute_type=compute_type) | |
| self.model_load_time = time.time() - start_load | |
| def transcribe_audio(self, audio_file_path: str, output_text_file: str): | |
| print(os.path.abspath(audio_file_path)) | |
| start_transcribe = time.time() | |
| segments, info = self.model.transcribe(audio_file_path, word_timestamps=True, beam_size=5) | |
| transcription_time = time.time() - start_transcribe | |
| all_words = [] | |
| for segment in segments: | |
| if segment.words: | |
| all_words.extend(segment.words) | |
| with open(output_text_file, "w", encoding="utf-8") as f: | |
| f.write("start_time\tend_time\tspeaker\ttranscript\n") | |
| current_sentence = [] | |
| current_start = None | |
| current_end = None | |
| for word in all_words: | |
| word_start = word.start | |
| word_end = word.end | |
| word_text = word.word.strip() | |
| if not current_sentence: | |
| current_sentence.append(word_text) | |
| current_start = word_start | |
| current_end = word_end | |
| continue | |
| # Check sentence boundary | |
| time_gap = word_start - current_end | |
| is_punctuation = re.match(r'^[.!?]+$', word_text) | |
| if time_gap > 1.5 or is_punctuation: | |
| sentence = " ".join(current_sentence) | |
| f.write(f"{current_start:.2f}\t{current_end:.2f}\tSPEAKER\t{sentence}\n") | |
| current_sentence = [word_text] | |
| current_start = word_start | |
| current_end = word_end | |
| else: | |
| current_sentence.append(word_text) | |
| current_end = word_end | |
| if current_sentence: | |
| sentence = " ".join(current_sentence) | |
| f.write(f"{current_start:.2f}\t{current_end:.2f}\tSPEAKER\t{sentence}\n") | |
| file_written = os.path.exists(output_text_file) and os.path.getsize(output_text_file) > 100 # >100 bytes means not just header | |
| if not all_words: | |
| print(" No words recognized by the model.") | |
| if file_written: | |
| print(f" Transcript saved to {output_text_file}") | |
| else: | |
| print(f" Transcript file is empty or only contains header: {output_text_file}") | |
| return file_written | |