Instructions to use QuanHoangNgoc/test_conformer with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- NeMo
How to use QuanHoangNgoc/test_conformer with NeMo:
# tag did not correspond to a valid NeMo domain.
- Notebooks
- Google Colab
- Kaggle
| import copy | |
| import csv | |
| import glob | |
| import json | |
| import logging | |
| import logging as log | |
| import os | |
| import random | |
| import re | |
| import shutil | |
| import string | |
| import sys | |
| import unicodedata | |
| import jiwer | |
| import lightning.pytorch as pl | |
| import nemo | |
| import nemo.collections.asr as nemo_asr | |
| import numpy as np | |
| import torch | |
| from datasets import load_dataset | |
| from jiwer import wer | |
| from lightning.pytorch.callbacks import Callback, EarlyStopping, ModelCheckpoint | |
| from lightning.pytorch.utilities.model_summary import ModelSummary | |
| from omegaconf import OmegaConf | |
| from scipy.io import wavfile | |
| # * Have first: V0 -> this | |
| from v0_import.import_scr import push_file_to_hub | |
class LossLogger(Callback):
    """Lightning callback that records per-epoch metrics and plots them.

    At the end of every training epoch it stores the ``train_loss`` metric,
    the learning rate of the first parameter group of the first optimizer and
    ``trainer.global_step``. At the end of every validation epoch it stores
    ``val_loss`` and ``val_wer``. Every ``num_plot`` epochs (and once more when
    training ends) the collected series are drawn on a 2x2 figure.

    NOTE(review): nothing in this class appends to ``train_wer``, so the
    "Training WER" curve stays empty unless it is filled from outside.

    Args:
        exp_dir: Directory in which the plot images are written.
    """

    def __init__(self, exp_dir):
        super().__init__()
        self.train_losses = []
        self.val_losses = []
        self.train_wer = []
        self.val_wer = []
        self.lr_list = []  # learning rate sampled at each train-epoch end
        self.step_list = []  # trainer.global_step sampled at each train-epoch end
        self.num_last = 100  # how many of the most recent records are plotted
        self.num_plot = 100  # plot every `num_plot` epochs
        self.allow_show_plot = False  # True: plt.show(); False: save PNG and push it
        self.exp_dir = exp_dir

    def on_train_epoch_end(self, trainer, pl_module):
        """Record train loss, learning rate and global step; plot periodically."""
        train_loss = trainer.callback_metrics.get('train_loss')
        epoch_idx = trainer.current_epoch
        lr = trainer.optimizers[0].param_groups[0]['lr']
        optimize_step = trainer.global_step

        log.info(f"Epoch {epoch_idx} ended." + "=" * 100)

        if train_loss is not None:
            loss_value = train_loss.item()  # convert once, reuse for list and log
            self.train_losses.append(loss_value)
            self.lr_list.append(lr)
            self.step_list.append(optimize_step)
            log.info(
                f"Train Loss: {loss_value}, lr: {lr}, step: {optimize_step}")

        # Epoch 0 is skipped so that the first plot contains some history.
        if epoch_idx != 0 and epoch_idx % self.num_plot == 0:
            self._plot_train()

    def on_validation_epoch_end(self, trainer, pl_module):
        """Record ``val_loss`` and ``val_wer`` when the trainer reports them."""
        val_loss = trainer.callback_metrics.get('val_loss')
        val_wer = trainer.callback_metrics.get('val_wer')

        if val_loss is not None:
            loss_value = val_loss.item()
            self.val_losses.append(loss_value)
            log.info(f"Validation Loss: {loss_value}")

        if val_wer is not None:
            wer_value = val_wer.item()
            self.val_wer.append(wer_value)
            log.info(f"Validation WER: {wer_value}")

    @staticmethod
    def _draw_panel(plt, position, curves, ylabel, title):
        """Draw one panel of the 2x2 grid; ``curves`` is ``[(values, label), ...]``."""
        plt.subplot(2, 2, position)
        for values, label in curves:
            plt.plot(values, label=label, linewidth=1)
        plt.xlabel('Epoch')
        plt.ylabel(ylabel)
        plt.legend()
        plt.title(title)
        plt.grid(True, linestyle='--', alpha=0.6)

    def _plot_train(self):
        """Plot the last ``num_last`` records of every series.

        The figure is either shown (``allow_show_plot``) or saved to
        ``exp_dir`` as ``training_process_<number of val_wer records>.png`` and
        handed to ``push_file_to_hub``. The figure is always closed afterwards
        so that repeated calls do not accumulate open figures.
        """
        import matplotlib.pyplot as plt

        num = self.num_last
        fig = plt.figure(figsize=(20, 16))
        try:
            self._draw_panel(
                plt, 1,
                [(self.train_losses[-num:], 'Training Loss'),
                 (self.val_losses[-num:], 'Validation Loss')],
                'Loss', 'Training and Validation Loss')
            self._draw_panel(
                plt, 2,
                [(self.train_wer[-num:], 'Training WER'),
                 (self.val_wer[-num:], 'Validation WER')],
                'WER', 'Training and Validation WER')
            self._draw_panel(
                plt, 3,
                [(self.lr_list[-num:], 'Learning rate')],
                'LR', 'Learning Rate Schedule')
            self._draw_panel(
                plt, 4,
                [(self.step_list[-num:], 'Optimize step')],
                'Step', 'Step Optimization')
            plt.tight_layout()

            if self.allow_show_plot:
                plt.show()
            else:
                # Make sure the target directory exists before saving.
                os.makedirs(self.exp_dir, exist_ok=True)
                plot_png = os.path.join(
                    self.exp_dir, f"training_process_{len(self.val_wer)}.png")
                plt.savefig(plot_png)
                push_file_to_hub(plot_png)
        finally:
            # Without this every call leaves a figure alive in pyplot's registry.
            plt.close(fig)

    def on_train_end(self, trainer, pl_module):
        """Plot the full validation history once training has finished."""
        self.num_last = len(self.val_wer)
        self._plot_train()
# ---- Experiment settings (edit per run) --------------------------------------
config_path = "v2_run/Conformer_nemo/configs/conformer.yaml"  # ! NOTE: Setting
res_exp_dir = "test_conformer"  # ? NOTE: Setting; all run outputs go here
src_folder = "v2_run/Conformer_nemo"  # code folder that gets snapshotted
dst_folder = os.path.join(res_exp_dir, "code-folder")

# Create the experiment directory, then copy the code folder into it so the
# run keeps a copy of the code it was started with.
os.makedirs(res_exp_dir, exist_ok=True)
shutil.copytree(src_folder, dst_folder, dirs_exist_ok=True)
log.info(f"Copied code to {dst_folder}")
def write_txt_exp_dir(name, var, exp_dir=None):
    """Write ``str(var)`` to a UTF-8 text file inside the experiment directory.

    Args:
        name: File name (relative to the target directory).
        var: Any object; its ``str()`` representation is written.
        exp_dir: Target directory. When ``None`` (the default) the module-level
            ``res_exp_dir`` is used, which is the original behaviour.
    """
    target_dir = res_exp_dir if exp_dir is None else exp_dir
    path = os.path.join(target_dir, name)
    # The context manager closes the file; no explicit close() is needed.
    with open(path, "w", encoding="utf-8") as f:
        f.write(str(var))
| # ============================================================================== | |
def create_time_callbacks(num_keep, min_stop, max_hour):
    """Build the training-time budget string and the callback list.

    Args:
        num_keep: Passed to ``EarlyStopping`` as ``patience``.
        min_stop: Passed to ``EarlyStopping`` as ``stopping_threshold`` for
            the ``val_wer`` metric.
        max_hour: Value inserted into the second field of the time string
            ``"00:<max_hour>:02:00"``.

    Returns:
        ``(max_time_training, callback_list)`` where the list holds a
        ``LossLogger``, the early-stopping callback and the checkpoint
        callback, in that order.
    """
    # Early stopping watches val_wer (lower is better).
    stop_kwargs = dict(
        monitor="val_wer",
        mode="min",
        stopping_threshold=min_stop,
        patience=num_keep,
        verbose=True,
    )
    stopper = EarlyStopping(**stop_kwargs)

    # Keep only the best checkpoints ranked by val_wer; no "last" checkpoint.
    best_k = 5
    keep_last = False
    ckpt_kwargs = dict(
        dirpath=f"{res_exp_dir}/ckpts",
        filename="epoch{epoch}-{val_wer:.4f}",
        monitor="val_wer",
        mode="min",
        save_top_k=best_k,
        save_last=keep_last,
    )
    checkpointer = ModelCheckpoint(**ckpt_kwargs)

    # Example of a full value: "00:09:02:00".
    time_budget = f"00:{max_hour}:02:00"

    callbacks = [LossLogger(res_exp_dir), stopper, checkpointer]
    return time_budget, callbacks
def create_new_trainer(epochs, min_stop, max_hour="09"):
    """Create a single-GPU ``pl.Trainer`` with the experiment's callbacks.

    The keyword arguments given to the trainer are also dumped to
    ``args_trainer.txt`` through ``write_txt_exp_dir``.

    Args:
        epochs: Used as ``max_epochs``.
        min_stop: Forwarded to ``create_time_callbacks`` (early-stop threshold).
        max_hour: Hour field of the time budget.
            NOTE(review): this argument is overwritten with ``"00"`` on the
            first line of the body, so the caller's value is never used and the
            resulting ``max_time`` is ``"00:00:02:00"``. Confirm whether that
            hard-coded setting is still intended.

    Returns:
        The configured ``pl.Trainer``.
    """
    # NOTE: Setting
    max_hour = "00"  # ! Must edit when run: Conformer
    log.info(f"Hour to train is {max_hour}")

    patience_epochs = 500
    precision = 'bf16'  # ! Use AMP: Difference with root version
    grad_accum = 1
    show_progress_bar = False  # progress bar off to keep the log short

    log.info(f"Precision to train is {precision}")
    log.info(
        f"Grad batch size to train is {16} x {grad_accum}")  # ! Bsize

    max_time_training, callback_list = create_time_callbacks(
        num_keep=patience_epochs, min_stop=min_stop, max_hour=max_hour)

    # Key order is kept stable because the dict is written to disk as text.
    trainer_dict = dict(
        # Hardware
        precision=precision,
        devices=1,
        num_nodes=1,
        accelerator='gpu',
        strategy='auto',  # single device, no multi-GPU strategy
        # Training
        max_epochs=epochs,
        accumulate_grad_batches=grad_accum,
        gradient_clip_val=0.0,
        # Logging / validation cadence
        log_every_n_steps=100,
        val_check_interval=1.0,  # validate after each full epoch
        # Misc
        enable_progress_bar=show_progress_bar,
        num_sanity_val_steps=0,
        check_val_every_n_epoch=1,
        sync_batchnorm=True,
        benchmark=False,  # cudnn benchmarking disabled
        # Checkpointing, time budget and callbacks
        enable_checkpointing=True,
        max_time=max_time_training,
        callbacks=callback_list,
    )
    write_txt_exp_dir("args_trainer.txt", trainer_dict)

    return pl.Trainer(**trainer_dict)
| # ============================================================================== | |
| # The code below does not need to be edited. | |
def reload_nemo_from_avg(best_paths, nemo_model):
    """Average the weights of several checkpoints and load them into a model.

    Every checkpoint file is expected to contain a ``"state_dict"`` entry.
    Floating-point tensors are averaged element-wise over all checkpoints;
    non-floating tensors (int/bool) keep the value from the first checkpoint.

    Args:
        best_paths: Sized iterable of checkpoint file paths.
        nemo_model: Model exposing ``.to(device)`` and
            ``.load_state_dict(state_dict, strict=...)``.

    Returns:
        ``(nemo_model, avg_weights)``: the model (moved to CUDA when
        available, otherwise CPU) with the averaged weights loaded
        non-strictly, and the averaged state dict itself (CPU tensors).

    Raises:
        ValueError: If ``best_paths`` is empty.
    """
    # NOTE: weights_only=False unpickles arbitrary objects. It is kept because
    # loading with weights_only=True failed for these checkpoints; only use it
    # on checkpoint files you trust.
    w_only = False
    load_strict = False

    def average_checkpoints(paths):
        """Return the element-wise mean state dict of the given checkpoints."""
        paths = list(paths)
        if not paths:
            raise ValueError("best_paths is empty: no checkpoints to average")

        avg_state_dict = None
        for path in paths:
            ckpt = torch.load(path, map_location="cpu",
                              weights_only=w_only)["state_dict"]
            if avg_state_dict is None:
                # Clone so accumulation never aliases the loaded tensors.
                avg_state_dict = {k: v.clone() for k, v in ckpt.items()}
                continue
            for k, total in avg_state_dict.items():
                # Integer/bool buffers are left as taken from the first file.
                if torch.is_floating_point(total):
                    total.add_(ckpt[k])

        count = len(paths)
        for total in avg_state_dict.values():
            if torch.is_floating_point(total):
                total.div_(count)
        return avg_state_dict

    log.info(f"\n\nBest paths for AVG(model): {best_paths}")
    avg_weights = average_checkpoints(best_paths)

    # Assign averaged weights to the model.
    nemo_model = nemo_model.to("cuda" if torch.cuda.is_available() else "cpu")
    nemo_model.load_state_dict(avg_weights, strict=load_strict)
    return nemo_model, avg_weights
def save_model_to_path(nemo_model, avg_weights, nemo_model_path, avg_ckpt_path):
    """Persist the averaged weights and the model.

    Args:
        nemo_model: Model exposing ``save_to(path)``.
        avg_weights: State dict stored under the ``"state_dict"`` key.
        nemo_model_path: Destination passed to ``nemo_model.save_to``.
        avg_ckpt_path: Destination of the ``torch.save`` checkpoint file.
    """
    checkpoint = {"state_dict": avg_weights}
    torch.save(checkpoint, avg_ckpt_path)
    nemo_model.save_to(nemo_model_path)

    # Both files are written before anything is reported.
    messages = (
        f"\n\nSaved avg weights (.ckpt) at {avg_ckpt_path}",
        f"Saved averaged NeMo model at {nemo_model_path}",
    )
    for message in messages:
        log.info(message)
def nemo_inference_for_mfpath(nemo_model, mfpath):
    """Transcribe every entry of a JSON-lines manifest and compute the WER.

    Each non-blank line of ``mfpath`` is parsed as a JSON object that must
    provide the keys ``"text"`` (reference transcript) and
    ``"audio_filepath"``. The predictions and references are also written as
    a two-column CSV with a random 8-character name inside ``res_exp_dir``.

    Args:
        nemo_model: Model whose ``transcribe(path, verbose=False)`` returns a
            sequence whose first item has a ``.text`` attribute.
        mfpath: Path of the manifest file (UTF-8, one JSON object per line).

    Returns:
        The value of ``wer(references, predictions)``.
    """

    def save_gen_list(text_list, gt_list):
        """Write prediction/reference pairs to a randomly named CSV file."""
        random_name = ''.join(random.choices(
            string.ascii_lowercase + string.digits, k=8))
        file_path = os.path.join(res_exp_dir, f"{random_name}.csv")

        with open(file_path, mode="w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["Gen", "GT"])  # header
            writer.writerows(zip(text_list, gt_list))
        # Report only after the file has actually been written.
        log.info(f"Saved gen at {file_path}")

    with open(mfpath, "r", encoding="utf-8") as fin:
        # Blank lines (e.g. a trailing newline) are skipped instead of
        # crashing json.loads.
        data = [json.loads(line) for line in fin if line.strip()]
    log.info(f"\n\nLoaded {len(data)} entries from {mfpath}")

    references = []
    predictions = []
    with torch.no_grad():
        for entry in data:  # Limit data here if needed
            pred = nemo_model.transcribe(
                entry['audio_filepath'], verbose=False)[0].text
            references.append(entry['text'])
            predictions.append(pred)

    # Compute WER (reference first, hypothesis second).
    wer_score = wer(references, predictions)
    log.info(f"WER: {wer_score}")

    # Save predictions next to their references.
    save_gen_list(text_list=predictions, gt_list=references)
    return wer_score