import copy
import csv
import glob
import json
import logging
import logging as log
import os
import random
import re
import shutil
import string
import sys
import unicodedata

import jiwer
import lightning.pytorch as pl
import nemo
import nemo.collections.asr as nemo_asr
import numpy as np
import torch
from datasets import load_dataset
from jiwer import wer
from lightning.pytorch.callbacks import Callback, EarlyStopping, ModelCheckpoint
from lightning.pytorch.utilities.model_summary import ModelSummary
from omegaconf import OmegaConf
from scipy.io import wavfile

# * Have first: V0 -> this
from v0_import.import_scr import push_file_to_hub


class LossLogger(Callback):
    """Callback that records per-epoch metrics and periodically plots them.

    The callback reads ``train_loss``, ``val_loss`` and ``val_wer`` from
    ``trainer.callback_metrics``, together with the learning rate of the
    first optimizer and the trainer's global step.  A 2x2 figure is drawn
    every ``num_plot`` epochs and once more when training ends.
    """

    def __init__(self, exp_dir):
        """Create empty metric histories.

        Args:
            exp_dir: Directory in which the plot images are written.
        """
        super().__init__()
        self.train_losses = []
        self.val_losses = []
        # NOTE: nothing in this class appends to train_wer, so the
        # "Training WER" curve stays empty unless it is filled externally.
        self.train_wer = []
        self.val_wer = []
        self.lr_list = []  # Learning rate recorded at each epoch end
        self.step_list = []  # Global step recorded at each epoch end
        self.num_last = 100  # How many trailing entries are plotted
        self.num_plot = 100  # Plot every `num_plot` epochs
        self.allow_show_plot = False  # True: plt.show(); False: save + upload
        self.exp_dir = exp_dir

    def on_train_epoch_end(self, trainer, pl_module):
        """Record train loss, learning rate and global step for the epoch."""
        train_loss = trainer.callback_metrics.get('train_loss')
        epoch_idx = trainer.current_epoch
        lr = trainer.optimizers[0].param_groups[0]['lr']
        optimize_step = trainer.global_step
        log.info(f"Epoch {epoch_idx} ended." + "=" * 100)

        if train_loss is not None:
            loss_value = train_loss.item()
            self.train_losses.append(loss_value)
            # lr and step are only recorded together with a loss so the
            # three histories stay the same length.
            self.lr_list.append(lr)
            self.step_list.append(optimize_step)
            log.info(
                f"Train Loss: {loss_value}, lr: {lr}, step: {optimize_step}")

        if epoch_idx != 0 and epoch_idx % self.num_plot == 0:
            self._plot_train()

    def on_validation_epoch_end(self, trainer, pl_module):
        """Record validation loss and WER when they are available."""
        val_loss = trainer.callback_metrics.get('val_loss')
        val_wer = trainer.callback_metrics.get('val_wer')

        if val_loss is not None:
            self.val_losses.append(val_loss.item())
            log.info(f"Validation Loss: {val_loss.item()}")

        if val_wer is not None:
            self.val_wer.append(val_wer.item())
            log.info(f"Validation WER: {val_wer.item()}")

    def _plot_train(self):
        """Draw loss, WER, learning-rate and step curves in one figure.

        Only the last ``self.num_last`` entries of every history are drawn.
        When ``allow_show_plot`` is false the figure is saved under
        ``exp_dir``, closed, and the PNG is passed to ``push_file_to_hub``.
        """
        import matplotlib.pyplot as plt

        num = self.num_last

        def draw_panel(position, curves, y_label, title):
            # One cell of the 2x2 grid; `curves` is a list of (values, label).
            plt.subplot(2, 2, position)
            for values, label in curves:
                plt.plot(values[-num:], label=label, linewidth=1)
            plt.xlabel('Epoch')
            plt.ylabel(y_label)
            plt.legend()
            plt.title(title)
            plt.grid(True, linestyle='--', alpha=0.6)

        fig = plt.figure(figsize=(20, 16))

        draw_panel(
            1,
            [(self.train_losses, 'Training Loss'),
             (self.val_losses, 'Validation Loss')],
            'Loss',
            'Training and Validation Loss',
        )
        draw_panel(
            2,
            [(self.train_wer, 'Training WER'),
             (self.val_wer, 'Validation WER')],
            'WER',
            'Training and Validation WER',
        )
        draw_panel(
            3,
            [(self.lr_list, 'Learning rate')],
            'LR',
            'Learning Rate Schedule',
        )
        draw_panel(
            4,
            [(self.step_list, 'Optimize step')],
            'Step',
            'Step Optimization',
        )

        plt.tight_layout()

        if self.allow_show_plot:
            plt.show()
        else:
            plot_png = os.path.join(
                self.exp_dir, f"training_process_{len(self.val_wer)}.png")
            try:
                plt.savefig(plot_png)
            finally:
                # pyplot keeps every figure alive until it is closed; without
                # this each periodic plot would stay in memory for the run.
                plt.close(fig)
            push_file_to_hub(plot_png)

    def on_train_end(self, trainer, pl_module):
        """Plot the full history once training has finished."""
        # Widen the plotting window to every recorded validation point.
        self.num_last = len(self.val_wer)
        self._plot_train()


# Experiment settings.  The statements below run at import time: they create
# the experiment directory and copy the source folder into it.
config_path = "v2_run/Conformer_nemo/configs/conformer.yaml"  # NOTE: Setting
res_exp_dir = "test_conformer"  # NOTE: Setting
os.makedirs(res_exp_dir, exist_ok=True)

src_folder = "v2_run/Conformer_nemo"
dst_folder = os.path.join(res_exp_dir, "code-folder")
shutil.copytree(src_folder, dst_folder, dirs_exist_ok=True)
log.info(f"Copied code to {dst_folder}")


def write_txt_exp_dir(name, var):
    """Write ``str(var)`` to the file ``name`` inside ``res_exp_dir``.

    Args:
        name: File name, relative to the experiment directory.
        var: Any object; its ``str()`` representation is stored.
    """
    path = os.path.join(res_exp_dir, name)
    # The context manager closes the file; no explicit close() is needed.
    with open(path, "w", encoding="utf-8") as f:
        f.write(str(var))


# ==============================================================================


def create_time_callbacks(num_keep, min_stop, max_hour):
    """Build the training time limit and the list of trainer callbacks.

    Args:
        num_keep: ``patience`` passed to ``EarlyStopping``.
        min_stop: ``stopping_threshold`` passed to ``EarlyStopping``.
        max_hour: Hour field (string) of the returned time-limit string.

    Returns:
        A tuple ``(max_time_training, callback_list)`` where the first item
        is the string ``"00:<max_hour>:02:00"`` and the second contains a
        ``LossLogger``, the early-stopping callback and the checkpoint
        callback, in that order.
    """
    early_stop_callback = EarlyStopping(
        monitor="val_wer",  # Metric to monitor
        mode="min",  # Lower is better
        stopping_threshold=min_stop,  # Threshold on the monitored val_wer
        patience=num_keep,  # Patience before stopping on no improvement
        verbose=True
    )

    # Keep the top `num_avg` checkpoints ranked by val_wer.
    num_avg = 5
    save_last = False
    checkpoint_callback = ModelCheckpoint(
        dirpath=f"{res_exp_dir}/ckpts",  # Dir of ckpts
        filename="epoch{epoch}-{val_wer:.4f}",
        monitor="val_wer",  # Ranked by validation WER
        mode="min",
        save_top_k=num_avg,  # Only keep the best `num_avg`
        save_last=save_last,  # Do not additionally save the last epoch
    )

    # Example of a full value: "00:09:00:00"
    max_time_training = f"00:{max_hour}:02:00"
    callback_list = [LossLogger(res_exp_dir),
                     early_stop_callback,
                     checkpoint_callback]  # Difference with root version
    return max_time_training, callback_list


def create_new_trainer(epochs, min_stop, max_hour="09"):  # NOTE: Setting
    """Create a single-GPU ``pl.Trainer`` and dump its arguments to disk.

    Args:
        epochs: Value used for ``max_epochs``.
        min_stop: Early-stopping threshold on ``val_wer``.
        max_hour: Hour field of the training time limit.  It is currently
            overwritten with ``"00"`` inside the function (see below).

    Returns:
        The configured ``pl.Trainer``.
    """
    # NOTE(review): this deliberately overrides the argument, so the caller's
    # value is ignored.  Must be edited before a real Conformer run.
    max_hour = "00"
    log.info(f"Hour to train is {max_hour}")

    setting = {
        'num_keep': 500,
        'precision': 'bf16',  # Use AMP: Difference with root version
        'accumulate_grad_batches': 1,
        'max_hour': max_hour,
        'enable_progress_bar': False,  # Progress bar off for shorter logs
    }
    log.info(f"Precision to train is {setting['precision']}")
    log.info(
        f"Grad batch size to train is {16} x {setting['accumulate_grad_batches']}")  # Bsize

    # Create callbacks
    max_time_training, callback_list = create_time_callbacks(
        num_keep=setting['num_keep'], min_stop=min_stop, max_hour=max_hour)

    # Training args
    trainer_dict = {
        # Hardware
        'precision': setting['precision'],  # Trade-off
        'devices': 1,
        'num_nodes': 1,
        'accelerator': 'gpu',
        'strategy': 'auto',  # Must: no multi gpu
        # Training
        'max_epochs': epochs,
        'accumulate_grad_batches': setting['accumulate_grad_batches'],
        'gradient_clip_val': 0.0,
        # Prediction monitor
        'log_every_n_steps': 100,  # Logging inside a training epoch
        'val_check_interval': 1.0,  # Compute wer after 1.0 epoch
        # Not related to training quality
        'enable_progress_bar': setting['enable_progress_bar'],
        'num_sanity_val_steps': 0,
        'check_val_every_n_epoch': 1,
        'sync_batchnorm': True,
        # If True, enables cudnn benchmarking for faster training.
        'benchmark': False,
        # Saving and callback: New setting for callbacks
        'enable_checkpointing': True,
        'max_time': max_time_training,
        'callbacks': callback_list,
    }
    write_txt_exp_dir("args_trainer.txt", trainer_dict)

    trainer = pl.Trainer(**trainer_dict)
    return trainer


# ==============================================================================
# Don't need to edit, please..
def reload_nemo_from_avg(best_paths, nemo_model):
    """Average several checkpoints and load the result into ``nemo_model``.

    Each file in ``best_paths`` is loaded on CPU and its ``"state_dict"``
    entry is read.  Floating-point tensors are averaged element-wise over
    all checkpoints; every other tensor keeps the value found in the first
    checkpoint.

    Args:
        best_paths: Non-empty sequence of checkpoint file paths.
        nemo_model: Model exposing ``to`` and ``load_state_dict``.

    Returns:
        A tuple ``(nemo_model, avg_weights)``: the model (moved to CUDA when
        available, otherwise CPU) with the averaged weights loaded
        non-strictly, and the averaged state dict itself.

    Raises:
        ValueError: If ``best_paths`` is empty.
    """
    # NOTE: weights_only stays False because loading failed with True.
    # SECURITY: this unpickles the file, so only load trusted checkpoints.
    w_only = False
    load_strict = False

    def average_checkpoints(paths):
        # Fail early with a clear message instead of crashing on None below.
        if not paths:
            raise ValueError("best_paths is empty: no checkpoints to average")

        avg_state_dict = None
        for path in paths:
            ckpt = torch.load(path, map_location="cpu",
                              weights_only=w_only)["state_dict"]
            if avg_state_dict is None:
                # Clone so the running sum never aliases loaded tensors.
                avg_state_dict = {k: v.clone() for k, v in ckpt.items()}
                continue
            for key, running in avg_state_dict.items():
                # int/bool tensors are left as they were in the first ckpt.
                if torch.is_floating_point(running):
                    running.add_(ckpt[key])

        for running in avg_state_dict.values():
            if torch.is_floating_point(running):
                running.div_(len(paths))
        return avg_state_dict

    # Average
    log.info(f"\n\nBest paths for AVG(model): {best_paths}")
    avg_weights = average_checkpoints(best_paths)

    # Assign averaged weights to the model
    nemo_model = nemo_model.to("cuda" if torch.cuda.is_available() else "cpu")
    nemo_model.load_state_dict(avg_weights, strict=load_strict)
    return nemo_model, avg_weights


def save_model_to_path(nemo_model, avg_weights, nemo_model_path, avg_ckpt_path):
    """Persist the averaged weights and the model.

    Args:
        nemo_model: Model exposing ``save_to``.
        avg_weights: State dict stored under the ``"state_dict"`` key.
        nemo_model_path: Destination passed to ``nemo_model.save_to``.
        avg_ckpt_path: Destination of the ``torch.save`` checkpoint file.
    """
    torch.save({"state_dict": avg_weights}, avg_ckpt_path)
    nemo_model.save_to(nemo_model_path)
    log.info(f"\n\nSaved avg weights (.ckpt) at {avg_ckpt_path}")
    log.info(f"Saved averaged NeMo model at {nemo_model_path}")


def nemo_inference_for_mfpath(nemo_model, mfpath):
    """Transcribe every manifest entry and return the corpus WER.

    The manifest is read as one JSON object per line; each object must have
    the keys ``text`` and ``audio_filepath``.  Predictions and references
    are also written to a randomly named CSV file inside ``res_exp_dir``.

    Args:
        nemo_model: Model exposing ``transcribe``.
        mfpath: Path of the JSON-lines manifest.

    Returns:
        The value returned by ``wer(references, predictions)``.
    """

    def save_gen_list(text_list, gt_list):
        # Random 8-character name so repeated runs do not overwrite a file.
        random_name = ''.join(random.choices(
            string.ascii_lowercase + string.digits, k=8))
        file_path = os.path.join(res_exp_dir, f"{random_name}.csv")

        with open(file_path, mode="w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["Gen", "GT"])  # header
            writer.writerows(zip(text_list, gt_list))
        log.info(f"Saved gen at {file_path}")

    with open(mfpath, "r", encoding="utf-8") as fin:
        # Blank lines (e.g. a trailing empty line) are not JSON; skip them.
        data = [json.loads(line) for line in fin if line.strip()]
    log.info(f"\n\nLoaded {len(data)} entries from {mfpath}")

    references = []
    predictions = []
    # One no_grad context for the whole pass instead of one per utterance.
    with torch.no_grad():
        for entry in data:  # Limit data here if needed
            ref = entry['text']
            audio_path = entry['audio_filepath']
            pred = nemo_model.transcribe(audio_path, verbose=False)[0].text
            references.append(ref)
            predictions.append(pred)

    # Compute WER
    wer_score = wer(references, predictions)
    log.info(f"WER: {wer_score}")

    # Save predictions next to their references
    save_gen_list(text_list=predictions, gt_list=references)
    return wer_score