| import librosa |
| from transformers import Wav2Vec2ForCTC, AutoProcessor |
| import torch |
| import numpy as np |
| from pathlib import Path |
| import concurrent.futures |
|
|
| ASR_SAMPLING_RATE = 16_000 |
| CHUNK_LENGTH_S = 60 |
| MAX_CONCURRENT_CHUNKS = 4 |
|
|
| ASR_LANGUAGES = {} |
| with open(f"data/asr/all_langs.tsv") as f: |
| for line in f: |
| iso, name = line.split(" ", 1) |
| ASR_LANGUAGES[iso.strip()] = name.strip() |
|
|
| MODEL_ID = "facebook/mms-1b-all" |
|
|
| processor = AutoProcessor.from_pretrained(MODEL_ID) |
| model = Wav2Vec2ForCTC.from_pretrained(MODEL_ID) |
|
|
| def load_audio(audio_data): |
| if isinstance(audio_data, tuple): |
| sr, audio_samples = audio_data |
| audio_samples = (audio_samples / 32768.0).astype(np.float32) |
| if sr != ASR_SAMPLING_RATE: |
| audio_samples = librosa.resample( |
| audio_samples, orig_sr=sr, target_sr=ASR_SAMPLING_RATE |
| ) |
| elif isinstance(audio_data, np.ndarray): |
| audio_samples = audio_data |
| elif isinstance(audio_data, str): |
| audio_samples = librosa.load(audio_data, sr=ASR_SAMPLING_RATE, mono=True)[0] |
| else: |
| raise ValueError(f"Invalid Audio Input Instance: {type(audio_data)}") |
| return audio_samples |
|
|
| def process_chunk(chunk, device): |
| inputs = processor(chunk, sampling_rate=ASR_SAMPLING_RATE, return_tensors="pt").to(device) |
| with torch.no_grad(): |
| outputs = model(**inputs).logits |
| ids = torch.argmax(outputs, dim=-1)[0] |
| return processor.decode(ids) |
|
|
| def transcribe(audio_data=None, lang="eng (English)"): |
| if audio_data is None or (isinstance(audio_data, np.ndarray) and audio_data.size == 0): |
| return "<<ERROR: Empty Audio Input>>" |
| |
| try: |
| audio_samples = load_audio(audio_data) |
| except Exception as e: |
| return f"<<ERROR: {str(e)}>>" |
|
|
| lang_code = lang.split()[0] |
| processor.tokenizer.set_target_lang(lang_code) |
| model.load_adapter(lang_code) |
|
|
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| model.to(device) |
|
|
| chunk_length = int(CHUNK_LENGTH_S * ASR_SAMPLING_RATE) |
| chunks = [audio_samples[i:i+chunk_length] for i in range(0, len(audio_samples), chunk_length)] |
|
|
| transcriptions = [] |
| |
| with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENT_CHUNKS) as executor: |
| future_to_chunk = {executor.submit(process_chunk, chunk, device): chunk for chunk in chunks} |
| for future in concurrent.futures.as_completed(future_to_chunk): |
| transcriptions.append(future.result()) |
|
|
| return " ".join(transcriptions) |
|
|
| |
| ASR_EXAMPLES = [ |
| ["upload/english.mp3", "eng (English)"], |
| |
| |
| ] |
|
|