import math import os import sys import numpy as np import torch from faster_whisper import WhisperModel from pyannote.audio import Pipeline from pydub import AudioSegment from speechbrain.inference.classifiers import EncoderClassifier model_size = "large-v3" # Run on GPU with FP16 # model = WhisperModel(model_size, device="cpu", compute_type="float16") INDEX_TO_LANG = { 0: 'Abkhazian', 1: 'Afrikaans', 2: 'Amharic', 3: 'Arabic', 4: 'Assamese', 5: 'Azerbaijani', 6: 'Bashkir', 7: 'Belarusian', 8: 'Bulgarian', 9: 'Bengali', 10: 'Tibetan', 11: 'Breton', 12: 'Bosnian', 13: 'Catalan', 14: 'Cebuano', 15: 'Czech', 16: 'Welsh', 17: 'Danish', 18: 'German', 19: 'Greek', 20: 'English', 21: 'Esperanto', 22: 'Spanish', 23: 'Estonian', 24: 'Basque', 25: 'Persian', 26: 'Finnish', 27: 'Faroese', 28: 'French', 29: 'Galician', 30: 'Guarani', 31: 'Gujarati', 32: 'Manx', 33: 'Hausa', 34: 'Hawaiian', 35: 'Hindi', 36: 'Croatian', 37: 'Haitian', 38: 'Hungarian', 39: 'Armenian', 40: 'Interlingua', 41: 'Indonesian', 42: 'Icelandic', 43: 'Italian', 44: 'Hebrew', 45: 'Japanese', 46: 'Javanese', 47: 'Georgian', 48: 'Kazakh', 49: 'Central Khmer', 50: 'Kannada', 51: 'Korean', 52: 'Latin', 53: 'Luxembourgish', 54: 'Lingala', 55: 'Lao', 56: 'Lithuanian', 57: 'Latvian', 58: 'Malagasy', 59: 'Maori', 60: 'Macedonian', 61: 'Malayalam', 62: 'Mongolian', 63: 'Marathi', 64: 'Malay', 65: 'Maltese', 66: 'Burmese', 67: 'Nepali', 68: 'Dutch', 69: 'Norwegian Nynorsk', 70: 'Norwegian', 71: 'Occitan', 72: 'Panjabi', 73: 'Polish', 74: 'Pushto', 75: 'Portuguese', 76: 'Romanian', 77: 'Russian', 78: 'Sanskrit', 79: 'Scots', 80: 'Sindhi', 81: 'Sinhala', 82: 'Slovak', 83: 'Slovenian', 84: 'Shona', 85: 'Somali', 86: 'Albanian', 87: 'Serbian', 88: 'Sundanese', 89: 'Swedish', 90: 'Swahili', 91: 'Tamil', 92: 'Telugu', 93: 'Tajik', 94: 'Thai', 95: 'Turkmen', 96: 'Tagalog', 97: 'Turkish', 98: 'Tatar', 99: 'Ukrainian', 100: 'Urdu', 101: 'Uzbek', 102: 'Vietnamese', 103: 'Waray', 104: 'Yiddish', 105: 'Yoruba', 106: 'Chinese' } LANG_TO_INDEX = {v: k for k, v in INDEX_TO_LANG.items()} def identify_languages(file_path, languages: list[str] = ["Russian", "Belarusian", "Ukrainian", "Kazakh"]) -> dict[ str, float]: language_id = EncoderClassifier.from_hparams(source="speechbrain/lang-id-voxlingua107-ecapa") signal = language_id.load_audio(file_path) lang_scores, _, _, _ = language_id.classify_batch(signal) all_scores = {INDEX_TO_LANG[i]: 100 * math.exp(score) for i, score in enumerate(lang_scores[0])} selected_scores = {lang: float(all_scores[lang]) for lang in languages} return selected_scores def detect_language_local(file_path): language_scores = identify_languages(file_path) language_result = max(language_scores, key=language_scores.get) if language_result.lower() in ["russian", "belarusian", "ukrainian"]: selected_language = "ru" else: selected_language = "kk" return selected_language def transcribe_and_diarize_audio(filename, language): diarized_segments = _diarize_audio(filename) combined_diarized_segments = _combine_segments_with_same_speaker(diarized_segments) transcribed_segments = _transcribe_audio(filename, language) pure_text = "\n".join(segment.text for segment in transcribed_segments) segments = _combine_diarized_and_transcribed_segments( combined_diarized_segments, transcribed_segments, ) diarized_text = " ".join( "[%.1fs -> %.1fs] (%s) %s" % (segment["start"], segment["end"], segment["speaker"], segment["text"]) for segment in segments) return pure_text, diarized_text diarization_pipeline = Pipeline.from_pretrained( "pyannote/speaker-diarization-3.1", use_auth_token=os.getenv('HUGGING_FACE_TOKEN'), ) model_size = "large-v3" transcription_model = WhisperModel( model_size, device="cuda:0" if torch.cuda.is_available() else "cpu", # device="cpu", compute_type="int8", # compute_type="int8_float16", # compute_type="float32" ) def get_audio_length_in_minutes(file_path): audio = AudioSegment.from_file(file_path) duration = len(audio) return round(duration / 60000, 2) def _diarize_audio(filename): diarization_pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu")) # diarization_pipeline.to(torch.device("cpu")) diarization_result = diarization_pipeline(filename, max_speakers=2, min_speakers=2) diarized_segments = [] for turn, _, speaker in diarization_result.itertracks(yield_label=True): diarized_segments.append( { "segment": {"start": turn.start, "end": turn.end}, "speaker": speaker, } ) return diarized_segments def _combine_segments_with_same_speaker(segments): new_segments = [] prev_segment = cur_segment = segments[0] for i in range(1, len(segments)): cur_segment = segments[i] # check if we have changed speaker ("label") if cur_segment["speaker"] != prev_segment["speaker"] and i < len(segments): # add the start/end times for the super-segment to the new list new_segments.append( { "segment": { "start": prev_segment["segment"]["start"], "end": cur_segment["segment"]["start"], }, "speaker": prev_segment["speaker"], } ) prev_segment = segments[i] return new_segments def _transcribe_audio(filename, language): segments, _ = transcription_model.transcribe( filename, beam_size=20, language=language, ) return list(segments) def _combine_diarized_and_transcribed_segments(diarized_segments, transcribed_segments): # get the end timestamps for each chunk from the ASR output end_timestamps = np.array( [ (chunk.end if chunk.end is not None else sys.float_info.max) for chunk in transcribed_segments ] ) segmented_preds = [] # align the diarizer timestamps and the ASR timestamps for segment in diarized_segments: # get the diarizer end timestamp end_time = segment["segment"]["end"] # find the ASR end timestamp that is closest to the diarizer's end timestamp and cut the transcript to here upto_idx = np.argmin(np.abs(end_timestamps - end_time)) segmented_preds.append( { "speaker": segment["speaker"], "text": "".join( [chunk.text for chunk in transcribed_segments[: upto_idx + 1]] ), "start": transcribed_segments[0].start, "end": transcribed_segments[upto_idx].end, } ) # crop the transcribed_segmentss and timestamp lists according to the latest timestamp (for faster argmin) transcribed_segments = transcribed_segments[upto_idx + 1:] end_timestamps = end_timestamps[upto_idx + 1:] if len(end_timestamps) == 0: break return segmented_preds