| | |
| | import argparse |
| | import re |
| | from typing import Dict |
| |
|
| | import torch |
| | from datasets import Audio, Dataset, load_dataset, load_metric |
| | from num2words import num2words as n2w |
| | from slugify import slugify |
| |
|
| | from transformers import AutoFeatureExtractor, AutoModelForCTC, pipeline, Wav2Vec2Processor, Wav2Vec2ProcessorWithLM, Wav2Vec2FeatureExtractor |
| | |
| |
|
| | from cardinal_numbers import convert_nums |
| |
|
| |
|
| | def log_results(result: Dataset, args: Dict[str, str]): |
| | """DO NOT CHANGE. This function computes and logs the result metrics.""" |
| |
|
| | log_outputs = args.log_outputs |
| | lm = "withLM" if args.use_lm else "noLM" |
| | model_id = args.model_id.replace("/", "_").replace(".", "") |
| | if args.filter: |
| | extra_args = [args.config, slugify(args.filter), args.split, lm] |
| | else: |
| | extra_args = [args.config, args.split, lm] |
| | dataset_id = "_".join([model_id] + args.dataset.split("/") + extra_args) |
| |
|
| | |
| | wer = load_metric("wer") |
| | cer = load_metric("cer") |
| |
|
| | |
| | wer_result = wer.compute(references=result["target"], predictions=result["prediction"]) |
| | cer_result = cer.compute(references=result["target"], predictions=result["prediction"]) |
| |
|
| | |
| | result_str = f"{dataset_id}\nWER: {wer_result}\nCER: {cer_result}" |
| | print(result_str) |
| |
|
| | with open(f"{dataset_id}_eval_results.txt", "w") as f: |
| | f.write(result_str) |
| | with open(f"{dataset_id}_eval_results.tsv", "w") as f: |
| | f.write("\t".join([args.model_id, args.dataset, args.config, args.filter, args.split, str(lm), str(wer_result), str(cer_result)])) |
| |
|
| | |
| | if log_outputs is not None: |
| | pred_file = f"log_{dataset_id}_predictions.txt" |
| | target_file = f"log_{dataset_id}_targets.txt" |
| |
|
| | with open(pred_file, "w") as p, open(target_file, "w") as t: |
| | |
| | def write_to_file(batch, i): |
| | p.write(f"{i}" + "\n") |
| | p.write(batch["prediction"] + "\n") |
| | t.write(f"{i}" + "\n") |
| | t.write(batch["target"] + "\n") |
| |
|
| | result.map(write_to_file, with_indices=True) |
| |
|
| |
|
| | def normalize_text(original_text: str, dataset: str) -> str: |
| | """DO ADAPT FOR YOUR USE CASE. this function normalizes the target text.""" |
| |
|
| | text = original_text.lower() |
| | if dataset.lower().endswith("fleurs"): |
| | replacements = ( |
| | (r"\be\.kr", "etter kristus fødsel"), |
| | (r"\bf\.kr", "før kristi fødsel"), |
| | (r"\bca[.]?\b", "circa"), |
| | (r"(\d)\s*km/t", r"\1 kilometer i timen"), |
| | (r"(\d)\s*km", r"\1 kilometer"), |
| | (r"(\d)\s*cm", r"\1 centimeter"), |
| | (r"(\d)\s*mm", r"\1 millimeter"), |
| | (r"kl\.", "klokka"), |
| | (r"f\.eks", "for eksempel"), |
| | ) |
| | for abrev, expasion in replacements: |
| | text = re.sub(abrev, expasion, text) |
| | text = re.sub(r'(\d+)[-–](\d+)', r'\1 til \2', text) |
| | text = re.sub(r'(\d{2}):00', r'\1', text) |
| | text = re.sub(r"(\d{2}):0(\d{1})", r"\1 null \2", text) |
| | text = re.sub(r"(\d{1,2}):(\d{1,2})", r"\1 \2", text) |
| | text = re.sub(r"(1[1-9])00", r"\1 hundre", text) |
| | text = re.sub(r"(1[1-9])0([1-9])", r"\1 null \2 ", text) |
| | text = re.sub(r"(1[1-9])([1-9]\d)", r"\1 \2 ", text) |
| | text = re.sub(r"(20)0([1-9])", r"\1 null \2 ", text) |
| | text = re.sub(r"(20)(\d{2})", r"\1 \2 ", text) |
| | text = re.sub(r"(\d{1,3})[.](\d{1,2})", r"\1 dot \2 ", text) |
| | text = re.sub(r"(\d{1,2})[ .](\d{3})", r"\1\2", text) |
| | text = re.sub(r'(\w+)-(\w+)', r'\1 \2', text) |
| | |
| | text = re.compile(r"-?0?[1-9][\d.]*").sub(lambda x: convert_nums(int(x.group(0)), nn=True), text.replace(".", "")) |
| |
|
| |
|
| | chars_to_ignore_regex = '[\,\?\.\!\-\;\:\"\“\%\‘\”\�\'\–\_\\\+\#\/]' |
| | text = re.sub(chars_to_ignore_regex, "", text) + " " |
| |
|
| | if dataset.lower().endswith("nst"): |
| | text = text.lower() |
| | text = text.replace("(...vær stille under dette opptaket...)", "") |
| | text = re.sub('[áàâ]', 'a', text) |
| | text = re.sub('[ä]', 'æ', text) |
| | text = re.sub('[éèëê]', 'e', text) |
| | text = re.sub('[íìïî]', 'i', text) |
| | text = re.sub('[óòöô]', 'o', text) |
| | text = re.sub('[ö]', 'ø', text) |
| | text = re.sub('[ç]', 'c', text) |
| | text = re.sub('[úùüû]', 'u', text) |
| | |
| | text = re.sub('\s+', ' ', text) |
| | elif dataset.lower().endswith("npsc"): |
| | text = re.sub('[áàâ]', 'a', text) |
| | text = re.sub('[ä]', 'æ', text) |
| | text = re.sub('[éèëê]', 'e', text) |
| | text = re.sub('[íìïî]', 'i', text) |
| | text = re.sub('[óòöô]', 'o', text) |
| | text = re.sub('[ö]', 'ø', text) |
| | text = re.sub('[ç]', 'c', text) |
| | text = re.sub('[úùüû]', 'u', text) |
| | text = re.sub('\s+', ' ', text) |
| | elif dataset.lower().endswith("fleurs"): |
| | text = re.sub('[áàâ]', 'a', text) |
| | text = re.sub('[ä]', 'æ', text) |
| | text = re.sub('[éèëê]', 'e', text) |
| | text = re.sub('[íìïî]', 'i', text) |
| | text = re.sub('[óòöô]', 'o', text) |
| | text = re.sub('[ö]', 'ø', text) |
| | text = re.sub('[ç]', 'c', text) |
| | text = re.sub('[úùüû]', 'u', text) |
| | text = re.sub('[«»]', '', text) |
| | text = re.sub('\s+', ' ', text) |
| | text = re.sub('<ee>', 'eee', text) |
| | text = re.sub('<qq>', 'qqq', text) |
| | text = re.sub('<mm>', 'mmm', text) |
| | text = re.sub('<inaudible>', 'xxx', text) |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | return text |
| |
|
| |
|
| | def main(args): |
| | |
| | dataset = load_dataset(args.dataset, args.config, split=args.split, use_auth_token=True) |
| | if args.filter: |
| | attribute, value = list(map(str.strip, args.filter.split(":"))) |
| | dataset = dataset.filter( |
| | lambda x: x[attribute] == value, |
| | desc=f"Filtering on {args.filter}", |
| | ) |
| | |
| | |
| |
|
| | |
| | feature_extractor = AutoFeatureExtractor.from_pretrained(args.model_id) |
| | sampling_rate = feature_extractor.sampling_rate |
| |
|
| | |
| | dataset = dataset.cast_column("audio", Audio(sampling_rate=sampling_rate)) |
| |
|
| | |
| | if args.device is None: |
| | args.device = 0 if torch.cuda.is_available() else -1 |
| | |
| |
|
| | model_instance = AutoModelForCTC.from_pretrained(args.model_id) |
| | if args.use_lm: |
| | processor = Wav2Vec2ProcessorWithLM.from_pretrained(args.model_id) |
| | decoder = processor.decoder |
| | else: |
| | processor = Wav2Vec2Processor.from_pretrained(args.model_id) |
| | decoder = None |
| | asr = pipeline( |
| | "automatic-speech-recognition", |
| | model=model_instance, |
| | tokenizer=processor.tokenizer, |
| | feature_extractor=processor.feature_extractor, |
| | decoder=decoder, |
| | device=args.device |
| | ) |
| |
|
| | |
| | |
| | |
| |
|
| | |
| |
|
| | |
| | def map_to_pred(batch): |
| | prediction = asr( |
| | batch["audio"]["array"], chunk_length_s=args.chunk_length_s, stride_length_s=args.stride_length_s |
| | ) |
| |
|
| | batch["prediction"] = prediction["text"] |
| | batch["target"] = normalize_text(batch[args.text_column], args.dataset) |
| | return batch |
| |
|
| | |
| | result = dataset.map(map_to_pred, remove_columns=dataset.column_names) |
| |
|
| | |
| | |
| | log_results(result, args) |
| |
|
| |
|
| | if __name__ == "__main__": |
| | parser = argparse.ArgumentParser() |
| |
|
| | parser.add_argument( |
| | "--model_id", type=str, required=True, help="Model identifier. Should be loadable with 🤗 Transformers" |
| | ) |
| | parser.add_argument( |
| | "--dataset", |
| | type=str, |
| | required=True, |
| | help="Dataset name to evaluate the `model_id`. Should be loadable with 🤗 Datasets", |
| | ) |
| | parser.add_argument( |
| | "--config", type=str, required=True, help="Config of the dataset. *E.g.* `'en'` for Common Voice" |
| | ) |
| | parser.add_argument( |
| | "--filter", type=str, default="", help="Simple filter on attributes. *E.g.* `region_of_youth:Troms` would pnly keep those samplesfor which the condition is met" |
| | ) |
| | parser.add_argument("--split", type=str, required=True, help="Split of the dataset. *E.g.* `'test'`") |
| | parser.add_argument( |
| | "--text_column", type=str, default="text", help="Column name containing the transcription." |
| | ) |
| | parser.add_argument( |
| | "--chunk_length_s", type=float, default=None, help="Chunk length in seconds. Defaults to 5 seconds." |
| | ) |
| | parser.add_argument( |
| | "--stride_length_s", type=float, default=None, help="Stride of the audio chunks. Defaults to 1 second." |
| | ) |
| | parser.add_argument( |
| | "--log_outputs", action="store_true", help="If defined, write outputs to log file for analysis." |
| | ) |
| | parser.add_argument( |
| | "--device", |
| | type=int, |
| | default=None, |
| | help="The device to run the pipeline on. -1 for CPU (default), 0 for the first GPU and so on.", |
| | ) |
| | parser.add_argument( |
| | "--use_lm", action="store_true", help="If defined, use included language model as the decoder." |
| | ) |
| | args = parser.parse_args() |
| |
|
| | main(args) |
| |
|