# NOTE(review): removed page-scrape artifacts that were here (status lines,
# commit-hash gutter, column numbering) — they were not valid Python and
# broke parsing of this module.
# NOTE: legacy Azure Speech SDK implementation kept below (commented out) for
# reference; superseded by the faster-whisper version at the end of the file.
# # src/preprocessing/transcript_generator.py
# import azure.cognitiveservices.speech as speechsdk
# import json
# import re
# import time
# import logging
# logger = logging.getLogger(__name__)
# class TranscriptGenerator:
# def __init__(self, speech_key: str, speech_region: str):
# self.speech_config = speechsdk.SpeechConfig(
# subscription=speech_key,
# region=speech_region
# )
# self.speech_config.request_word_level_timestamps = True
# self.speech_config.output_format = speechsdk.OutputFormat.Detailed
# def transcribe_audio(self, audio_file_path: str, output_text_file: str):
# audio_config = speechsdk.audio.AudioConfig(filename=audio_file_path)
# speech_recognizer = speechsdk.SpeechRecognizer(
# speech_config=self.speech_config,
# audio_config=audio_config
# )
# all_results = []
# done = False
# def handle_final_result(evt):
# if evt.result.reason == speechsdk.ResultReason.RecognizedSpeech:
# result_json = json.loads(evt.result.json)
# if 'NBest' in result_json and result_json['NBest']:
# words = result_json['NBest'][0].get('Words', [])
# all_results.extend(words)
# def stop_cb(evt):
# nonlocal done
# done = True
# speech_recognizer.recognized.connect(handle_final_result)
# speech_recognizer.session_stopped.connect(stop_cb)
# speech_recognizer.canceled.connect(stop_cb)
# speech_recognizer.start_continuous_recognition()
# start_time = time.time()
# while not done and time.time() - start_time < 1800: # 30 min timeout
# time.sleep(0.5)
# speech_recognizer.stop_continuous_recognition()
# # Process and save results
# with open(output_text_file, "w", encoding="utf-8") as f:
# f.write("start_time\tend_time\tspeaker\ttranscript\n")
# current_sentence = []
# current_start = None
# current_end = None
# for word in all_results:
# word_start = word['Offset'] / 10000000
# word_end = word_start + (word['Duration'] / 10000000)
# word_text = word['Word']
# if not current_sentence:
# current_start = word_start
# current_end = word_end
# current_sentence.append(word_text)
# continue
# # Sentence boundary detection
# time_gap = word_start - current_end
# is_punctuation = re.match(r'^[.!?]+$', word_text)
# if time_gap > 1.5 or is_punctuation:
# sentence_text = " ".join(current_sentence)
# f.write(f"{current_start:.2f}\t{current_end:.2f}\tSPEAKER\t{sentence_text}\n")
# current_sentence = [word_text]
# current_start = word_start
# current_end = word_end
# else:
# current_sentence.append(word_text)
# current_end = word_end
# if current_sentence:
# sentence_text = " ".join(current_sentence)
# f.write(f"{current_start:.2f}\t{current_end:.2f}\tSPEAKER\t{sentence_text}\n")
# logger.info(f"Transcript saved to {output_text_file}")
# return len(all_results) > 0
import time
import re
import os
from faster_whisper import WhisperModel
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
class TranscriptGenerator:
    """Generate a tab-separated transcript from an audio file via faster-whisper.

    Output file format (one sentence per row, tab-delimited):
        start_time<TAB>end_time<TAB>speaker<TAB>transcript

    No speaker diarization is performed; every row uses the literal
    placeholder "SPEAKER".
    """

    # A silence longer than this many seconds between words starts a new sentence.
    SENTENCE_GAP_SECONDS = 1.5
    # A token consisting solely of terminal punctuation also ends a sentence.
    # Hoisted out of the per-word loop (was re.match on every word).
    _SENTENCE_END_RE = re.compile(r'^[.!?]+$')

    def __init__(self, model_size="base.en", compute_type="int8"):
        """Load the Whisper model and record the load time.

        Args:
            model_size: faster-whisper model identifier (e.g. "base.en").
            compute_type: ctranslate2 compute/quantization type (e.g. "int8").
        """
        start_load = time.time()
        self.model = WhisperModel(model_size, compute_type=compute_type)
        # Seconds spent loading the model; exposed for callers that report stats.
        self.model_load_time = time.time() - start_load

    def transcribe_audio(self, audio_file_path: str, output_text_file: str) -> bool:
        """Transcribe an audio file and write a TSV transcript.

        Args:
            audio_file_path: path to the input audio file.
            output_text_file: path of the TSV transcript to (over)write.

        Returns:
            True if at least one sentence row was written (i.e. the file
            contains more than just the header), False otherwise.
        """
        print(os.path.abspath(audio_file_path))
        start_transcribe = time.time()
        # `segments` is a lazy generator: the model only runs while it is
        # consumed, so the timer must span the consumption loop below
        # (previously it measured only the near-instant transcribe() call).
        segments, _info = self.model.transcribe(
            audio_file_path, word_timestamps=True, beam_size=5
        )
        all_words = []
        for segment in segments:
            if segment.words:
                all_words.extend(segment.words)
        # Exposed for callers that want timing stats, mirroring model_load_time.
        self.transcription_time = time.time() - start_transcribe

        with open(output_text_file, "w", encoding="utf-8") as f:
            f.write("start_time\tend_time\tspeaker\ttranscript\n")
            rows_written = self._write_sentences(f, all_words)

        if not all_words:
            print(" No words recognized by the model.")
        if rows_written:
            print(f" Transcript saved to {output_text_file}")
        else:
            print(f" Transcript file is empty or only contains header: {output_text_file}")
        # Fix: the previous ">100 bytes on disk" heuristic misclassified
        # short-but-valid transcripts as empty; count rows actually written.
        return rows_written > 0

    def _write_sentences(self, f, words):
        """Group word objects into sentences and write one TSV row per sentence.

        A sentence ends on a silence gap longer than SENTENCE_GAP_SECONDS or
        on a standalone punctuation token ([.!?]+).

        Args:
            f: open text file handle, positioned after the header row.
            words: iterable of objects with `.word`, `.start`, `.end`
                attributes (faster-whisper word timestamps).

        Returns:
            Number of sentence rows written.
        """
        rows = 0
        sentence_words = []
        sent_start = sent_end = None
        for word in words:
            text = word.word.strip()
            if not sentence_words:
                # First word of a new sentence.
                sentence_words = [text]
                sent_start, sent_end = word.start, word.end
                continue
            gap = word.start - sent_end
            if gap > self.SENTENCE_GAP_SECONDS or self._SENTENCE_END_RE.match(text):
                f.write(f"{sent_start:.2f}\t{sent_end:.2f}\tSPEAKER\t{' '.join(sentence_words)}\n")
                rows += 1
                sentence_words = [text]
                sent_start, sent_end = word.start, word.end
            else:
                sentence_words.append(text)
                sent_end = word.end
        # Flush the trailing sentence, if any.
        if sentence_words:
            f.write(f"{sent_start:.2f}\t{sent_end:.2f}\tSPEAKER\t{' '.join(sentence_words)}\n")
            rows += 1
        return rows