Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """AudioSpeechSentimentAnalysis_JRMDIOUF.ipynb | |
| Automatically generated by Colab. | |
| Original file is located at | |
| https://colab.research.google.com/drive/1tizgeMs7DXaZPQO3V253paATKev0ra0m | |
| """ | |
| #!pip install transformers | |
| #!pip install wandb | |
| import os | |
| os.environ["CUDA_LAUNCH_BLOCKING"] = "1" | |
| import pickle | |
| import re | |
| from typing import DefaultDict | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pandas as pd | |
| import seaborn as sns | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| import torchaudio | |
| import torchaudio.functional as F | |
| import wandb | |
| # from google.colab import userdata | |
| # from huggingface_hub import login | |
| from sklearn.metrics import ( | |
| accuracy_score, | |
| confusion_matrix, | |
| precision_score, | |
| recall_score, | |
| ) | |
| from torch.utils.data import DataLoader, Dataset, Subset | |
| from transformers import AutoTokenizer, BertModel, Wav2Vec2ForCTC, Wav2Vec2Processor | |
| """hf_token = userdata.get("HF_TOKEN") | |
| wandb_token = userdata.get("WAND_TOKEN")""" | |
| # Commented out IPython magic to ensure Python compatibility. | |
| # %env HF_TOKEN_ENV=$hf_token | |
| """!wget -nc --header "Authorization: Bearer ${HF_TOKEN_ENV}" https://huggingface.co/datasets/asapp/slue/resolve/main/data/voxceleb/dev.tsv | |
| !wget -nc --header "Authorization: Bearer ${HF_TOKEN_ENV}" https://huggingface.co/datasets/asapp/slue/resolve/main/data/voxceleb/fine-tune.tsv | |
| !wget -nc --header "Authorization: Bearer ${HF_TOKEN_ENV}" https://huggingface.co/datasets/asapp/slue/resolve/main/data/voxceleb/test.tsv | |
| !wget -nc --header "Authorization: Bearer ${HF_TOKEN_ENV}" https://huggingface.co/datasets/asapp/slue/resolve/main/data/voxceleb/audio/dev.zip | |
| !wget -nc --header "Authorization: Bearer ${HF_TOKEN_ENV}" https://huggingface.co/datasets/asapp/slue/resolve/main/data/voxceleb/audio/fine-tune.zip | |
| !wget -nc --header "Authorization: Bearer ${HF_TOKEN_ENV}" https://huggingface.co/datasets/asapp/slue/resolve/main/data/voxceleb/audio/test.zip | |
| if not os.path.exists("dev_raw"): | |
| print("dev_raw folder not found. Unzipping dev.zip...") | |
| !unzip -q dev.zip | |
| else: | |
| print("dev_raw folder already exists. Skipping unzip.") | |
| if not os.path.exists("fine-tune_raw"): | |
| print("fine-tune_raw folder not found. Unzipping fine-tune.zip...") | |
| !unzip -q fine-tune.zip | |
| else: | |
| print("fine-tune_raw folder already exists. Skipping unzip.") | |
| if not os.path.exists("test_raw"): | |
| print("test_raw folder not found. Unzipping test.zip...") | |
| !unzip -q test.zip | |
| else: | |
| print("test_raw folder already exists. Skipping unzip.")""" | |
# --- Runtime device ---------------------------------------------------------
# Use the first CUDA GPU when one is available, otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# --- Training hyper-parameters ----------------------------------------------
NUM_EPOCHS = 5
BATCH_SIZE = 16

# --- Artifacts written by the text (BERT) side of the pipeline --------------
SAVED_CUSTOM_BERT_TOKEN_MAX_LEN_PATH = "max_len.pkl"  # pickled padding length
SAVED_CUSTOM_BERT_TOKENIZER_DIR = "bert_tokenizer_local"  # tokenizer save dir
SAVED_CUSTOM_BERT_MODEL_PATH = "custom_bert_model.bin"  # classifier state dict
SAVED_TARGET_CAT_PATH = "categories.bin"  # ordered class names

# --- Dataset files ------------------------------------------------------------
TRAIN_DS_PATH = "fine-tune.tsv"
TEST_DS_PATH = "test.tsv"

# --- Text model -----------------------------------------------------------------
BERT_BASE_MODEL = "google-bert/bert-base-uncased"
INTERMEDIATE_CUSTOM_BERT_LAYER_SIZE = 30

# --- Speech-to-text model -----------------------------------------------------
SAVED_AUDIO_MODEL_DIR_PATH = "wav2vec2_local"  # local cache directory
AUDIO_BASE_MODEL = "facebook/wav2vec2-base-960h"
# File names looked up inside the local cache directory to decide whether a
# previously saved processor / model can be reused.
PROCESSOR_NAME = "preprocessor_config.json"
MODEL_NAME = "config.json"

# --- Labels ---------------------------------------------------------------------
# Only rows carrying one of these sentiment labels are kept by the dataset.
SENTIMENT_MODALITIES = ["Neutral", "Positive", "Negative"]
class CustomBertDataset(Dataset):
    """Sentiment dataset pairing each transcript with its label and audio path.

    The TSV file is read once at construction time. The first line is skipped
    as a header; for every other non-blank line the fields (separated by runs
    of tab characters) are used as follows: field 0 is the audio file id,
    field 1 the text and field 4 the sentiment label. Rows whose label is not
    listed in ``SENTIMENT_MODALITIES`` are dropped.

    Construction has side effects: the ordered class names, the tokenizer and
    the computed ``max_len`` are written to disk so that inference code can
    reload them later.

    Args:
        file_path: Path of the TSV file to read.
        audio_folder: Directory expected to hold one ``<id>.flac`` per row.
        model_path: Name or path given to ``AutoTokenizer.from_pretrained``.
        saved_target_cats_path: File receiving the ordered class names.
        saved_max_len_path: File receiving the pickled ``max_len``.

    Raises:
        ValueError: If the file contains no data row.
        IndexError: If a data row has fewer than five tab-separated fields.
    """

    def __init__(
        self,
        file_path,
        audio_folder,
        model_path=BERT_BASE_MODEL,
        saved_target_cats_path=SAVED_TARGET_CAT_PATH,
        saved_max_len_path=SAVED_CUSTOM_BERT_TOKEN_MAX_LEN_PATH,
    ):
        self.model_path = model_path
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_path)

        # Read through a context manager so the handle is always closed.
        with open(file_path, encoding="utf-8") as tsv_file:
            raw_lines = tsv_file.readlines()

        # Split every data line once and keep (text, label, file id).
        rows = []
        for line_index, line in enumerate(raw_lines):
            if line_index == 0 or line == "\n":
                continue  # header line or blank line
            fields = re.split(r"\t+", line.replace("\n", ""))
            rows.append([fields[1], fields[4], fields[0]])
        if not rows:
            raise ValueError(f"No data rows found in {file_path}")

        self.lines = np.array(rows)
        self.audio_files_id = self.lines[:, 2]

        # Keep only the rows labelled with a supported sentiment; the three
        # derived sequences below stay index-aligned with each other.
        kept_rows = [
            (text, cat, file_id)
            for text, cat, file_id in self.lines
            if cat in SENTIMENT_MODALITIES
        ]
        self.corpus = [text.lower() for text, _, _ in kept_rows]
        self.elem_cats = [str(cat) for _, cat, _ in kept_rows]
        self.audio_files = np.array(
            [
                os.path.join(audio_folder, f"{file_id}.flac")
                for _, _, file_id in kept_rows
            ]
        )

        # The label -> index mapping is built from the fixed list of supported
        # labels rather than from the labels present in this particular file,
        # so that every split (train, test, ...) shares the same class indices
        # even when one of them lacks a class.
        self.unique_cats = sorted(SENTIMENT_MODALITIES)
        self.num_class = len(self.unique_cats)
        self.cats_dict = {cat: i for i, cat in enumerate(self.unique_cats)}
        self.targets = np.array([self.cats_dict[cat] for cat in self.elem_cats])

        torch.save(self.unique_cats, saved_target_cats_path)
        self.tokenizer.save_pretrained(SAVED_CUSTOM_BERT_TOKENIZER_DIR)

        # Padding length: the longest tokenized sentence, capped at 512
        # (presumably the maximum sequence length of the BERT checkpoint --
        # TODO confirm against the model configuration).
        longest = max(
            (
                len(self.tokenizer.encode(sent, add_special_tokens=True))
                for sent in self.corpus
            ),
            default=0,
        )
        self.max_len = min(longest, 512)

        print(f"Max length : {self.max_len}")
        print(f"Nombre de classes : {self.num_class}")
        print(f"Exemples de targets : {np.unique(self.targets)}")

        # Persist max_len so inference can pad/truncate the same way.
        # NOTE(review): every instance writes to the same default paths, so
        # the most recently constructed dataset determines the saved values.
        with open(saved_max_len_path, "wb") as f:
            pickle.dump(self.max_len, f)
        print(f"max_len saved to {saved_max_len_path}")

    def __len__(self):
        """Return the number of rows kept after label filtering."""
        return len(self.elem_cats)

    def __getitem__(self, idx):
        """Return ``(input_ids, attention_mask, target, audio_path)`` for ``idx``.

        The text is tokenized on the fly and padded/truncated to
        ``self.max_len``.

        Raises:
            ValueError: If the stored target is outside ``[0, num_class - 1]``.
        """
        text = self.corpus[idx]
        target = self.targets[idx]

        # Sanity check: the target must lie between 0 and num_class - 1.
        if target < 0 or target >= self.num_class:
            raise ValueError(
                f"Target out of bounds: {target} not in [0, {self.num_class - 1}]"
            )

        encoded_input = self.tokenizer.encode_plus(
            text,
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        # The tokenizer returns a leading batch axis of size 1; drop it so the
        # DataLoader can add its own batch axis.
        return (
            encoded_input["input_ids"].squeeze(0),
            encoded_input["attention_mask"].squeeze(0),
            torch.tensor(target, dtype=torch.long),
            self.audio_files[idx],
        )
class CustomBertModel(nn.Module):
    """Frozen BERT encoder followed by a single trainable linear classifier.

    Args:
        num_class: Number of output classes of the linear head.
        model_path: Name or path given to ``BertModel.from_pretrained``.
    """

    def __init__(self, num_class, model_path=BERT_BASE_MODEL):
        super().__init__()
        self.model_path = model_path
        self.num_class = num_class

        self.bert = BertModel.from_pretrained(model_path)
        # The encoder is not fine-tuned: exclude all of its weights from
        # gradient computation so only the head below is trained.
        for encoder_param in self.bert.parameters():
            encoder_param.requires_grad = False

        encoder_width = self.bert.config.hidden_size
        self.proj_lin = nn.Linear(encoder_width, num_class)

    def forward(self, input_ids, attention_mask):
        """Return the classifier output computed from the first token's state."""
        encoder_output = self.bert(
            input_ids=input_ids, attention_mask=attention_mask
        )
        # Hidden state at sequence position 0 for every item of the batch.
        first_token_state = encoder_output.last_hidden_state[:, 0, :]
        return self.proj_lin(first_token_state)
def train_step(
    model,
    train_dataloader,
    loss_fn,
    optimizer,
    num_epochs=None,
    saved_model_path=None,
):
    """Train ``model`` over ``train_dataloader`` and save its state dict.

    Each batch is expected to be indexable, with the input ids at position 0,
    the attention mask at position 1 and the targets at position 2. The loss
    of every step is printed and, when a Weights & Biases run is active, also
    logged to it.

    Args:
        model: Module called as ``model(input_ids, attention_mask)``.
        train_dataloader: Iterable of training batches.
        loss_fn: Callable computing the loss from ``(output, target)``.
        optimizer: Optimizer updating the trainable parameters of ``model``.
        num_epochs: Number of passes over the data; ``NUM_EPOCHS`` when None.
        saved_model_path: Destination of the state dict;
            ``SAVED_CUSTOM_BERT_MODEL_PATH`` when None.
    """
    if num_epochs is None:
        num_epochs = NUM_EPOCHS
    if saved_model_path is None:
        saved_model_path = SAVED_CUSTOM_BERT_MODEL_PATH

    num_iterations = len(train_dataloader)
    for epoch in range(num_epochs):
        print(f"Training Epoch n° {epoch + 1}")
        model.train()
        for step, batch in enumerate(train_dataloader):
            input_ids, attention_mask, target = batch[0], batch[1], batch[2]

            output = model(input_ids.to(device), attention_mask.to(device))
            loss = loss_fn(output, target.to(device))

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Report a plain float so no autograd graph is kept alive.
            loss_value = loss.item()
            # Log through the active global run (if any) instead of relying
            # on a module-level ``run`` variable that only exists when this
            # file is executed as a script.
            if wandb.run is not None:
                wandb.log({"Training loss": loss_value})
            print(
                f"Epoch {epoch + 1} | step {step + 1} / {num_iterations} "
                f"| loss : {loss_value}"
            )

    # Save model
    torch.save(model.state_dict(), saved_model_path)
    print(f"Custom BERT Model saved at {saved_model_path}")
def eval_step(
    test_dataloader,
    loss_fn,
    num_class,
    saved_model_path=SAVED_CUSTOM_BERT_MODEL_PATH,
    saved_target_cats_path=SAVED_TARGET_CAT_PATH,
):
    """Evaluate the saved text classifier on the reference transcripts.

    A fresh ``CustomBertModel`` is built, its weights are loaded from
    ``saved_model_path`` and it is run over ``test_dataloader``. The loss of
    each batch is printed (and logged to the active Weights & Biases run, if
    any); the overall accuracy is printed and a confusion matrix heatmap is
    displayed.

    Args:
        test_dataloader: Iterable of batches with input ids at position 0,
            attention mask at position 1 and targets at position 2.
        loss_fn: Callable computing the loss from ``(output, target)``.
        num_class: Number of classes used to build the model.
        saved_model_path: File holding the model state dict.
        saved_target_cats_path: File holding the ordered class names.
    """
    y_pred = []
    y_true = []
    num_iterations = len(test_dataloader)

    # Load the saved model. ``map_location`` lets a checkpoint written on a
    # GPU be restored on a CPU-only machine.
    # NOTE(review): weights_only=False unpickles arbitrary objects -- only
    # load checkpoints from a trusted source.
    saved_model = CustomBertModel(num_class)
    saved_model.load_state_dict(
        torch.load(saved_model_path, weights_only=False, map_location=device)
    )
    saved_model = saved_model.to(device)
    saved_model.eval()  # Set the model to evaluation mode
    print(f"Model loaded from path :{saved_model_path}")

    with torch.no_grad():
        for step, batch in enumerate(test_dataloader):
            input_ids, attention_mask, target = batch[0], batch[1], batch[2]

            output = saved_model(input_ids.to(device), attention_mask.to(device))
            loss_value = loss_fn(output, target.to(device)).item()

            # Log through the active global run (if any) rather than a
            # module-level ``run`` variable that may not exist.
            if wandb.run is not None:
                wandb.log({"Eval loss": loss_value})
            print(f"Step {step + 1} / {num_iterations} | Eval loss : {loss_value}")

            y_pred.extend(output.argmax(dim=1).cpu().tolist())
            y_true.extend(target.cpu().tolist())

    class_labels = torch.load(saved_target_cats_path, weights_only=False)
    true_labels = [class_labels[i] for i in y_true]
    pred_labels = [class_labels[i] for i in y_pred]
    print(f"Accuracy : {accuracy_score(true_labels, pred_labels)}")

    cm = confusion_matrix(true_labels, pred_labels, labels=class_labels)
    df_cm = pd.DataFrame(cm, index=class_labels, columns=class_labels)
    # Start from a new figure so this heatmap is never drawn on top of a
    # previous plot that is still the current figure.
    plt.figure()
    sns.heatmap(df_cm, annot=True, fmt="d")
    plt.title("Confusion Matrix for Sentiment analysis dataset")
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.show()
def eval_pipeline_step(
    test_dataloader,
    loss_fn,
    num_class,
    audio_model_dir=SAVED_AUDIO_MODEL_DIR_PATH,
    audio_model_name=MODEL_NAME,
    audio_processor_name=PROCESSOR_NAME,
    saved_model_path=SAVED_CUSTOM_BERT_MODEL_PATH,
    saved_target_cats_path=SAVED_TARGET_CAT_PATH,
):
    """Evaluate the full audio -> transcript -> sentiment pipeline.

    For every audio path of every batch, the recording is transcribed with
    Wav2Vec2, the lowercased transcript is tokenized with the dataset's
    tokenizer and classified by the saved ``CustomBertModel``. The loss of
    each batch is printed (and logged to the active Weights & Biases run, if
    any); the overall accuracy is printed and a confusion matrix heatmap is
    displayed.

    Args:
        test_dataloader: Iterable of batches with targets at position 2 and
            audio file paths at position 3. Its ``dataset`` must expose
            ``tokenizer`` and ``max_len``.
        loss_fn: Callable computing the loss from ``(output, target)``.
        num_class: Number of classes used to build the classifier.
        audio_model_dir: Local directory used to cache the Wav2Vec2 files.
        audio_model_name: File name proving the model is cached locally.
        audio_processor_name: File name proving the processor is cached.
        saved_model_path: File holding the classifier state dict.
        saved_target_cats_path: File holding the ordered class names.
    """
    y_pred = []
    y_true = []
    num_iterations = len(test_dataloader)

    # Load the saved classifier. ``map_location`` lets a checkpoint written
    # on a GPU be restored on a CPU-only machine.
    # NOTE(review): weights_only=False unpickles arbitrary objects -- only
    # load checkpoints from a trusted source.
    saved_model = CustomBertModel(num_class)
    saved_model.load_state_dict(
        torch.load(saved_model_path, weights_only=False, map_location=device)
    )
    saved_model = saved_model.to(device)
    saved_model.eval()  # Set the model to evaluation mode
    print(f"Model loaded from path :{saved_model_path}")

    # Reuse the local Wav2Vec2 copy when both marker files are present,
    # otherwise download it and cache it for the next call.
    processor_path = os.path.join(audio_model_dir, audio_processor_name)
    model_path = os.path.join(audio_model_dir, audio_model_name)
    if (
        os.path.exists(audio_model_dir)
        and os.path.exists(processor_path)
        and os.path.exists(model_path)
    ):
        print("Local Wav2Vec2 processor and model found. Loading from local directory.")
        audio_processor = Wav2Vec2Processor.from_pretrained(audio_model_dir)
        audio_model = Wav2Vec2ForCTC.from_pretrained(audio_model_dir)
    else:
        print(
            "Local Wav2Vec2 processor and model not found. Downloading from Hugging Face Hub."
        )
        audio_processor = Wav2Vec2Processor.from_pretrained(AUDIO_BASE_MODEL)
        audio_model = Wav2Vec2ForCTC.from_pretrained(AUDIO_BASE_MODEL)
        audio_processor.save_pretrained(audio_model_dir)
        audio_model.save_pretrained(audio_model_dir)
        print(f"Wav2Vec2 processor and model downloaded and saved to {audio_model_dir}")

    audio_model = audio_model.to(device)
    audio_model.eval()

    # Loop-invariant values, looked up once.
    sample_rate = torchaudio.pipelines.WAV2VEC2_ASR_BASE_960H.sample_rate
    tokenizer = test_dataloader.dataset.tokenizer
    max_len = test_dataloader.dataset.max_len

    with torch.no_grad():
        for step, batch in enumerate(test_dataloader):
            target = batch[2]
            audio_file_paths = batch[3]
            encoded_inputs = []
            attention_masks = []

            for audio_file in audio_file_paths:
                waveform, sr = torchaudio.load(audio_file)
                if sr != sample_rate:
                    print("Resampling")
                    resampler = torchaudio.transforms.Resample(
                        orig_freq=sr, new_freq=sample_rate
                    )
                    waveform = resampler(waveform)

                # Average the channel axis so the processor always receives a
                # single 1-D signal; a multi-channel file would otherwise be
                # interpreted as several separate recordings. For a
                # single-channel file this leaves the samples unchanged.
                mono_waveform = waveform.mean(dim=0)
                input_values = audio_processor(
                    mono_waveform.numpy(),
                    sampling_rate=sample_rate,
                    return_tensors="pt",
                ).input_values.to(device)

                logits = audio_model(input_values).logits
                predicted_ids = torch.argmax(logits, dim=-1)
                # Decoding happens on the CPU.
                transcript = audio_processor.decode(predicted_ids[0].cpu().numpy())
                transcript = (transcript or "").lower()

                encoded_input = tokenizer.encode_plus(
                    transcript,
                    max_length=max_len,
                    padding="max_length",
                    truncation=True,
                    return_tensors="pt",
                )
                encoded_inputs.append(encoded_input["input_ids"].squeeze(0))
                attention_masks.append(encoded_input["attention_mask"].squeeze(0))

            text_input = torch.stack(encoded_inputs)
            attention = torch.stack(attention_masks)
            output = saved_model(text_input.to(device), attention.to(device))
            loss_value = loss_fn(output, target.to(device)).item()

            # Log through the active global run (if any) rather than a
            # module-level ``run`` variable that may not exist.
            if wandb.run is not None:
                wandb.log({"Pipeline Eval loss": loss_value})
            print(
                f"Step {step + 1} / {num_iterations} "
                f"| Pipeline Eval loss : {loss_value}"
            )

            y_pred.extend(output.argmax(dim=1).cpu().tolist())
            y_true.extend(target.cpu().tolist())

    class_labels = torch.load(saved_target_cats_path, weights_only=False)
    true_labels = [class_labels[i] for i in y_true]
    pred_labels = [class_labels[i] for i in y_pred]
    print(f"Pipeline Accuracy : {accuracy_score(true_labels, pred_labels)}")

    cm = confusion_matrix(true_labels, pred_labels, labels=class_labels)
    df_cm = pd.DataFrame(cm, index=class_labels, columns=class_labels)
    # Start from a new figure so this heatmap is never drawn on top of a
    # previous plot that is still the current figure.
    plt.figure()
    sns.heatmap(df_cm, annot=True, fmt="d")
    plt.title("Confusion Matrix for Sentiment analysis Pipeline")
    plt.xlabel("Predicted Label")
    plt.ylabel("True Label")
    plt.show()
def get_audio_sentiment(
    input_audio_path,
    num_class=len(SENTIMENT_MODALITIES),
    audio_model_dir=SAVED_AUDIO_MODEL_DIR_PATH,
    audio_model_name=MODEL_NAME,
    audio_processor_name=PROCESSOR_NAME,
    saved_model_path=SAVED_CUSTOM_BERT_MODEL_PATH,
    saved_target_cats_path=SAVED_TARGET_CAT_PATH,
    tokenizer_save_directory=SAVED_CUSTOM_BERT_TOKENIZER_DIR,
    saved_max_len_path=SAVED_CUSTOM_BERT_TOKEN_MAX_LEN_PATH,
):
    """Return the predicted sentiment label for one audio file.

    The recording is transcribed with Wav2Vec2, the lowercased transcript is
    tokenized with the saved tokenizer and classified by the saved
    ``CustomBertModel``. All models are loaded again on every call.

    Args:
        input_audio_path: Path of the audio file to analyse.
        num_class: Number of classes used to build the classifier.
        audio_model_dir: Local directory used to cache the Wav2Vec2 files.
        audio_model_name: File name proving the model is cached locally.
        audio_processor_name: File name proving the processor is cached.
        saved_model_path: File holding the classifier state dict.
        saved_target_cats_path: File holding the ordered class names.
        tokenizer_save_directory: Directory holding the saved tokenizer.
        saved_max_len_path: File holding the pickled padding length.

    Returns:
        The entry of the saved class-name list selected by the highest
        classifier output.
    """
    # Load the saved classifier onto the current device.
    # NOTE(review): weights_only=False unpickles arbitrary objects -- only
    # load checkpoints from a trusted source.
    saved_model = CustomBertModel(num_class)
    saved_model.load_state_dict(
        torch.load(saved_model_path, weights_only=False, map_location=device)
    )
    saved_model = saved_model.to(device)
    saved_model.eval()  # Set the model to evaluation mode
    print(f"Model loaded from path :{saved_model_path}")

    loaded_tokenizer = AutoTokenizer.from_pretrained(tokenizer_save_directory)
    # NOTE(review): pickle.load executes arbitrary code from the file -- the
    # max_len file must come from a trusted source.
    with open(saved_max_len_path, "rb") as f:
        max_len = pickle.load(f)

    # Reuse the local Wav2Vec2 copy when both marker files are present,
    # otherwise download it and cache it for the next call.
    processor_path = os.path.join(audio_model_dir, audio_processor_name)
    model_path = os.path.join(audio_model_dir, audio_model_name)
    if (
        os.path.exists(audio_model_dir)
        and os.path.exists(processor_path)
        and os.path.exists(model_path)
    ):
        print("Local Wav2Vec2 processor and model found. Loading from local directory.")
        audio_processor = Wav2Vec2Processor.from_pretrained(audio_model_dir)
        audio_model = Wav2Vec2ForCTC.from_pretrained(audio_model_dir)
    else:
        print(
            "Local Wav2Vec2 processor and model not found. Downloading from Hugging Face Hub."
        )
        audio_processor = Wav2Vec2Processor.from_pretrained(AUDIO_BASE_MODEL)
        audio_model = Wav2Vec2ForCTC.from_pretrained(AUDIO_BASE_MODEL)
        audio_processor.save_pretrained(audio_model_dir)
        audio_model.save_pretrained(audio_model_dir)
        print(f"Wav2Vec2 processor and model downloaded and saved to {audio_model_dir}")

    audio_model = audio_model.to(device)
    audio_model.eval()

    sample_rate = torchaudio.pipelines.WAV2VEC2_ASR_BASE_960H.sample_rate

    with torch.no_grad():
        waveform, sr = torchaudio.load(input_audio_path)
        if sr != sample_rate:
            print("Resampling")
            resampler = torchaudio.transforms.Resample(
                orig_freq=sr, new_freq=sample_rate
            )
            waveform = resampler(waveform)

        # Average the channel axis so the processor always receives a single
        # 1-D signal; a multi-channel file would otherwise be interpreted as
        # several separate recordings. For a single-channel file this leaves
        # the samples unchanged.
        mono_waveform = waveform.mean(dim=0)
        input_values = audio_processor(
            mono_waveform.numpy(), sampling_rate=sample_rate, return_tensors="pt"
        ).input_values.to(device)

        logits = audio_model(input_values).logits
        predicted_ids = torch.argmax(logits, dim=-1)
        # Decoding happens on the CPU.
        transcript = audio_processor.decode(predicted_ids[0].cpu().numpy())
        transcript = (transcript or "").lower()

        # The tokenizer already returns tensors with a leading batch axis of
        # size 1, which is what the classifier expects for a single item.
        encoded_input = loaded_tokenizer.encode_plus(
            transcript,
            max_length=max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        output = saved_model(
            encoded_input["input_ids"].to(device),
            encoded_input["attention_mask"].to(device),
        )

    class_labels = torch.load(saved_target_cats_path, weights_only=False)
    return class_labels[int(output.argmax(dim=1)[0])]
| # Login using e.g. `huggingface-cli login` to access this dataset | |
| # global_train_ds = load_dataset("asapp/slue-voxceleb", streaming=True, token='jrmd_hf_token') | |
| # global_train_ds = load_dataset('asapp/slue',token='jrmd_hf_token') | |
| # global_train_ds = load_dataset('voxceleb',token='jrmd_hf_token') | |
| # global_test_ds = load_dataset("asapp/slue", "voxceleb", split="test", token='jrmd_hf_token') | |
| # Get torchaudio pipeline components | |
| """bundle = torchaudio.pipelines.WAV2VEC2_ASR_BASE_960H | |
| #model = bundle.get_model() | |
| #labels = bundle.get_labels() | |
| sample_rate = bundle.sample_rate""" | |
| """waveform, sr = torchaudio.load("/content/dev_raw/id10012_0AXjxNXiEzo_00001.flac") | |
| # Resample if sr != sample_rate (or model_hf.config.sampling_rate) | |
| if sr != sample_rate: | |
| print("Resampling") | |
| resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=sample_rate) | |
| waveform = resampler(waveform)""" | |
| # Using torchaudio pipeline - Manual Greedy Decoding | |
| """with torch.no_grad(): | |
| emission = model(waveform)""" | |
| # Assuming emission is log-probabilities or logits | |
| # Perform greedy decoding: get the index of the max probability at each time step | |
| # predicted_ids_torchaudio = torch.argmax(emission[0], dim=-1) | |
| # Process the predicted IDs: remove consecutive duplicates and blank tokens | |
| # Assuming the blank token is at index 0 (which is common for CTC, check labels if unsure) | |
| """processed_ids_torchaudio = [] | |
| for id in predicted_ids_torchaudio[0]: # emission has shape (batch_size, num_frames, num_labels) | |
| if id.item() != 0 and (len(processed_ids_torchaudio) == 0 or id.item() != processed_ids_torchaudio[-1]): | |
| processed_ids_torchaudio.append(id.item())""" | |
| """# Convert token IDs to transcript using labels | |
| #transcript = "".join([labels[id] for id in processed_ids_torchaudio]) | |
| # Using Hugging Face transformers | |
| # Note: processor and model_hf are defined in cell DnJDG6P3BTjZ | |
| # To make this cell fully self-contained, you might want to include their definitions here as well. | |
| # For now, assuming they are defined in a previously executed cell. | |
| processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base-960h") | |
| model_hf = Wav2Vec2ForCTC.from_pretrained("facebook/wav2vec2-base-960h") | |
| # Load and resample waveform | |
| waveform, sr = torchaudio.load("/content/dev_raw/id10012_0AXjxNXiEzo_00001.flac") | |
| if sr != sample_rate: | |
| print("Resampling") | |
| resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=sample_rate) | |
| waveform = resampler(waveform) | |
| input_values = processor(waveform.squeeze().numpy(), sampling_rate=sample_rate, return_tensors="pt").input_values | |
| with torch.no_grad(): | |
| logits = model_hf(input_values).logits | |
| predicted_ids_hf = torch.argmax(logits, dim=-1) | |
| transcript_hf = processor.decode(predicted_ids_hf[0]) | |
| #print("Torchaudio Transcript:", transcript) | |
| print("Hugging Face Transcript:", transcript_hf)""" | |
if __name__ == "__main__":
    # Script entry point: train the text classifier, evaluate it on reference
    # transcripts and on Wav2Vec2 transcripts, then run one inference demo.

    # The API key is read from the environment; when it is absent, wandb
    # falls back to its own credential lookup.
    wandb.login(key=os.environ.get("WANDB_API_KEY"))
    # Kept as a module-level name because other functions of this file may
    # refer to ``run`` directly.
    run = wandb.init(project="DIT-Wav2Vec-Bert-Sentiment-Analysis-project")

    # Both datasets write their class names and max_len to the same default
    # files. The training dataset is built last so that the artifacts left on
    # disk -- the ones reloaded at inference time -- describe the training
    # data rather than the test data.
    bert_test_dataset = CustomBertDataset(TEST_DS_PATH, "test_raw")
    bert_train_dataset = CustomBertDataset(TRAIN_DS_PATH, "fine-tune_raw")
    print(f"Size of bert dataset : {len(bert_train_dataset)}")

    train_dataloader = DataLoader(
        bert_train_dataset, batch_size=BATCH_SIZE, shuffle=True
    )
    test_dataloader = DataLoader(
        bert_test_dataset, batch_size=BATCH_SIZE, shuffle=False
    )

    our_bert_model = CustomBertModel(bert_train_dataset.num_class)
    our_bert_model = our_bert_model.to(device)
    loss_fn = nn.CrossEntropyLoss()
    # Only parameters that still require gradients are handed to the
    # optimizer.
    optimizer = optim.SGD(
        filter(lambda p: p.requires_grad, our_bert_model.parameters()), lr=0.01
    )

    train_step(our_bert_model, train_dataloader, loss_fn, optimizer)
    eval_step(test_dataloader, loss_fn, bert_train_dataset.num_class)
    eval_pipeline_step(test_dataloader, loss_fn, bert_train_dataset.num_class)

    # Inference demo on a sample recording; skipped when the file is absent
    # so a missing sample does not make the whole run end with an error.
    test_inference_audio_path = "/content/dev_raw/id10012_0AXjxNXiEzo_00001.flac"
    if os.path.exists(test_inference_audio_path):
        print(get_audio_sentiment(test_inference_audio_path))
    else:
        print(
            f"Sample audio not found, skipping inference demo: "
            f"{test_inference_audio_path}"
        )

    # Close the run so buffered data is uploaded before the process exits.
    run.finish()