# Recording-QC-Bot / src/preprocessing/transcript_generator.py
# Author: varund2003 — "cleaned transcript script" (commit c871931)
# # src/preprocessing/transcript_generator.py
# import azure.cognitiveservices.speech as speechsdk
# import json
# import re
# import time
# import logging
# logger = logging.getLogger(__name__)
# class TranscriptGenerator:
# def __init__(self, speech_key: str, speech_region: str):
# self.speech_config = speechsdk.SpeechConfig(
# subscription=speech_key,
# region=speech_region
# )
# self.speech_config.request_word_level_timestamps = True
# self.speech_config.output_format = speechsdk.OutputFormat.Detailed
# def transcribe_audio(self, audio_file_path: str, output_text_file: str):
# audio_config = speechsdk.audio.AudioConfig(filename=audio_file_path)
# speech_recognizer = speechsdk.SpeechRecognizer(
# speech_config=self.speech_config,
# audio_config=audio_config
# )
# all_results = []
# done = False
# def handle_final_result(evt):
# if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech:
# result_json = json.loads(evt.result.json)
# if 'NBest' in result_json and result_json['NBest']:
# words = result_json['NBest'][0].get('Words', [])
# all_results.extend(words)
# def stop_cb(evt):
# nonlocal done
# done = True
# speech_recognizer.recognized.connect(handle_final_result)
# speech_recognizer.session_stopped.connect(stop_cb)
# speech_recognizer.canceled.connect(stop_cb)
# speech_recognizer.start_continuous_recognition()
# start_time = time.time()
# while not done and time.time() - start_time < 1800: # 30 min timeout
# time.sleep(0.5)
# speech_recognizer.stop_continuous_recognition()
# # Process and save results
# with open(output_text_file, "w", encoding="utf-8") as f:
# f.write("start_time\tend_time\tspeaker\ttranscript\n")
# current_sentence = []
# current_start = None
# current_end = None
# for word in all_results:
# word_start = word['Offset'] / 10000000
# word_end = word_start + (word['Duration'] / 10000000)
# word_text = word['Word']
# if not current_sentence:
# current_start = word_start
# current_end = word_end
# current_sentence.append(word_text)
# continue
# # Sentence boundary detection
# time_gap = word_start - current_end
# is_punctuation = re.match(r'^[.!?]+$', word_text)
# if time_gap > 1.5 or is_punctuation:
# sentence_text = " ".join(current_sentence)
# f.write(f"{current_start:.2f}\t{current_end:.2f}\tSPEAKER\t{sentence_text}\n")
# current_sentence = [word_text]
# current_start = word_start
# current_end = word_end
# else:
# current_sentence.append(word_text)
# current_end = word_end
# if current_sentence:
# sentence_text = " ".join(current_sentence)
# f.write(f"{current_start:.2f}\t{current_end:.2f}\tSPEAKER\t{sentence_text}\n")
# logger.info(f"Transcript saved to {output_text_file}")
# return len(all_results) > 0
import time
import re
import os
from faster_whisper import WhisperModel
# NOTE(review): workaround for the Intel OpenMP "duplicate libomp" abort
# (OMP Error #15) that can occur when faster-whisper/ctranslate2 and another
# library each ship their own OpenMP runtime — presumably required on the
# deployment machine; confirm before removing.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
class TranscriptGenerator:
    """Generate a tab-separated transcript from an audio file.

    Uses faster-whisper with word-level timestamps, then groups words into
    sentences at long silence gaps or standalone punctuation tokens. The
    output file format is one header row plus one TSV row per sentence:
    ``start_time\tend_time\tspeaker\ttranscript`` (no diarization is
    performed — the speaker column is always the placeholder "SPEAKER").
    """

    # Silence (seconds) between consecutive words that starts a new sentence.
    SENTENCE_GAP_SECONDS = 1.5

    def __init__(self, model_size="base.en", compute_type="int8"):
        """Load the Whisper model.

        Args:
            model_size: faster-whisper model identifier (e.g. "base.en").
            compute_type: ctranslate2 quantization/compute type (e.g. "int8").

        The wall-clock load time is recorded in ``self.model_load_time``.
        """
        start_load = time.time()
        self.model = WhisperModel(model_size, compute_type=compute_type)
        self.model_load_time = time.time() - start_load
        # Wall-clock seconds of the most recent transcribe_audio() call
        # (None until transcribe_audio has run). Previously this was computed
        # but discarded.
        self.transcription_time = None

    @staticmethod
    def _group_words(words, gap_threshold=SENTENCE_GAP_SECONDS):
        """Group word objects into sentences.

        Args:
            words: iterable of objects with ``.start``, ``.end`` (seconds)
                and ``.word`` (text) attributes, as yielded by faster-whisper.
            gap_threshold: silence gap (seconds) that forces a sentence break.

        Returns:
            List of ``(start_time, end_time, text)`` tuples. A new sentence
            begins when the gap since the previous word exceeds
            ``gap_threshold`` or the word is a standalone punctuation token.
        """
        punctuation = re.compile(r'^[.!?]+$')  # hoisted out of the loop
        sentences = []
        current_words = []
        current_start = current_end = None
        for word in words:
            text = word.word.strip()
            if not current_words:
                # First word of a new sentence.
                current_words = [text]
                current_start, current_end = word.start, word.end
                continue
            gap = word.start - current_end
            if gap > gap_threshold or punctuation.match(text):
                # Close the current sentence; this word starts the next one.
                sentences.append((current_start, current_end, " ".join(current_words)))
                current_words = [text]
                current_start, current_end = word.start, word.end
            else:
                current_words.append(text)
                current_end = word.end
        if current_words:
            sentences.append((current_start, current_end, " ".join(current_words)))
        return sentences

    def transcribe_audio(self, audio_file_path: str, output_text_file: str) -> bool:
        """Transcribe ``audio_file_path`` and write a TSV transcript.

        Args:
            audio_file_path: path to the input audio file.
            output_text_file: path the TSV transcript is written to (the file
                is created/overwritten even when nothing is recognized, so it
                always contains at least the header row).

        Returns:
            True when at least one sentence was written, False otherwise.
        """
        print(os.path.abspath(audio_file_path))
        start_transcribe = time.time()
        # word_timestamps=True is required for the sentence grouping below.
        segments, _info = self.model.transcribe(
            audio_file_path, word_timestamps=True, beam_size=5
        )
        # `segments` is a lazy generator: consuming it here performs the
        # actual transcription, so the timing must be taken AFTER this loop
        # (the old code stopped the clock before iterating and measured ~0s).
        all_words = [w for segment in segments if segment.words for w in segment.words]
        self.transcription_time = time.time() - start_transcribe

        sentences = self._group_words(all_words)
        with open(output_text_file, "w", encoding="utf-8") as f:
            f.write("start_time\tend_time\tspeaker\ttranscript\n")
            for sent_start, sent_end, text in sentences:
                f.write(f"{sent_start:.2f}\t{sent_end:.2f}\tSPEAKER\t{text}\n")

        # Bug fix: the old success check required the output file to exceed a
        # magic 100 bytes, which misreported short-but-valid transcripts
        # (header is 40 bytes; one short sentence can stay under 100) as
        # empty. Success now reflects whether any sentence was written.
        if not all_words:
            print(" No words recognized by the model.")
        if sentences:
            print(f" Transcript saved to {output_text_file}")
        else:
            print(f" Transcript file is empty or only contains header: {output_text_file}")
        return bool(sentences)