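# WhisperX-based automatic speech recognition with speaker diarization,
# intended for a Hugging Face Space. Supports one-shot file transcription
# and streamed (sample_rate, int16 samples) chunks, labeling speaker turns.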
import os
import time

import numpy as np
import whisperx
from scipy.signal import resample


class WhisperAutomaticSpeechRecognizer:
    # device = "cuda"
    # compute_type = "int8"  # change to "float16" if more GPU memory is available
    device = "cpu"
    compute_type = "float32"
    batch_size = 4
    model = whisperx.load_model(
        "medium",
        device,
        language="en",
        compute_type=compute_type,
        asr_options={
            "max_new_tokens": 448,  # adjust to the deployment as needed
            "clip_timestamps": True,
            "hallucination_silence_threshold": 0.2,  # adjust as needed
        },
    )
    diarize_model = whisperx.DiarizationPipeline(
        # use_auth_token=os.environ.get("HF_TOKEN"), device="cuda"
        use_auth_token=os.environ.get("HF_TOKEN"),
        device=device,
    )
    existing_speaker = None

    @staticmethod
    def downsample_audio_scipy(audio: np.ndarray, original_rate, target_rate=16000):
        if original_rate == target_rate:
            return audio
        # Mix multi-channel audio down to mono before resampling.
        if audio.ndim > 1:
            audio = np.mean(audio, axis=1)
        if audio.ndim != 1:
            raise ValueError("Input audio must have only one channel.")
        # Length of the signal after resampling to the target rate.
        num_samples = int(len(audio) * target_rate / original_rate)
        return resample(audio, num_samples)

    @staticmethod
    def transcribe_with_diarization_file(filepath: str):
        # Load the file at 16 kHz and run the non-streaming path.
        audio = whisperx.load_audio(filepath, 16000)
        return WhisperAutomaticSpeechRecognizer.transcribe_with_diarization(
            (16000, audio), None, "", False
        )

    @staticmethod
    def transcribe_with_diarization(stream, full_stream, full_transcript, streaming=True):
        # full_stream is unused here but kept for the original call signature.
        start_time = time.time()
        sr, y = stream
        if streaming:
            # Streamed chunks arrive as int16 PCM: resample to 16 kHz and
            # normalize to the [-1, 1] float range the model expects.
            y = WhisperAutomaticSpeechRecognizer.downsample_audio_scipy(y, sr)
            y = y.astype(np.float32)
            y /= 32768.0
        if full_transcript is None:
            full_transcript = ""
        transcribe_result = WhisperAutomaticSpeechRecognizer.model.transcribe(
            y, batch_size=WhisperAutomaticSpeechRecognizer.batch_size
        )
        diarize_segments = WhisperAutomaticSpeechRecognizer.diarize_model(y)
        diarize_result = whisperx.assign_word_speakers(
            diarize_segments, transcribe_result
        )
        new_transcript = ""
        default_first_speaker = "SPEAKER_00"
        for segment in diarize_result["segments"]:
            # Segments without a detected speaker fall back to the default label.
            current_speaker = segment.get("speaker", default_first_speaker)
            if WhisperAutomaticSpeechRecognizer.existing_speaker is None:
                WhisperAutomaticSpeechRecognizer.existing_speaker = current_speaker
                new_transcript += f"\n {WhisperAutomaticSpeechRecognizer.existing_speaker} - "
            if (
                current_speaker != WhisperAutomaticSpeechRecognizer.existing_speaker
                and current_speaker != default_first_speaker
            ):
                # Speaker changed: start a new labeled line.
                WhisperAutomaticSpeechRecognizer.existing_speaker = current_speaker
                new_transcript += f"\n {WhisperAutomaticSpeechRecognizer.existing_speaker} - "
            new_transcript += segment["text"]
        full_transcript += new_transcript
        end_time = time.time()
        if streaming:
            # Pace the streaming loop to roughly 5-second chunks;
            # never pass a negative duration to sleep.
            time.sleep(max(0.0, 5 - (end_time - start_time)))
        return full_transcript, stream, full_transcript
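

# A minimal usage sketch, not part of the original Space: "sample.wav" is a
# hypothetical local file, and HF_TOKEN must be set in the environment for the
# pyannote-based diarization pipeline to download its models.
if __name__ == "__main__":
    transcript, _, _ = WhisperAutomaticSpeechRecognizer.transcribe_with_diarization_file(
        "sample.wav"  # hypothetical path; replace with a real audio file
    )
    print(transcript)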