| | |
| | import torch |
| | import torch.distributed as dist |
| | import sys |
| | sys.path.append("..") |
| |
|
| | import pytest |
| | import glob |
| | import tqdm |
| | import os |
| | import argparse |
| | import stanza |
| | import json |
| | from transformers import AutoTokenizer |
| |
|
def chunk_text(text, tokenizer, max_length=512):
    """Split ``text`` into pieces of at most ``max_length`` tokenizer tokens.

    With a tokenizer that supports ``return_offsets_mapping`` (HuggingFace
    "fast" tokenizers), the pieces are slices of the ORIGINAL string, cut at
    the start offset of every ``max_length``-th token. Casing, accents and
    spacing therefore survive unchanged, and the pieces concatenate back to
    ``text``. Decoding token ids instead would pass the text through the
    tokenizer's normalizer (an uncased model lowercases and strips accents,
    and wordpiece detokenization respaces punctuation). That would change
    what the downstream tagger and lemmatizer see.

    Args:
        text: The string to split.
        tokenizer: A HuggingFace-style tokenizer; called as
            ``tokenizer(text, ...)`` and indexed by ``'offset_mapping'`` /
            ``'input_ids'``.
        max_length: Maximum number of tokens per piece (must be >= 1).

    Returns:
        A list of strings; an empty list when ``text`` yields no tokens.

    Raises:
        ValueError: If ``max_length`` is smaller than 1.
    """
    if max_length < 1:
        raise ValueError(f"max_length must be positive, got {max_length}")

    try:
        offsets = tokenizer(
            text, add_special_tokens=False, return_offsets_mapping=True
        )['offset_mapping']
    except NotImplementedError:
        # Slow tokenizers cannot report offsets: fall back to decoding ids
        # (lossy, but keeps such tokenizers usable).
        token_ids = tokenizer(text, add_special_tokens=False)['input_ids']
        return [
            tokenizer.decode(token_ids[i:i + max_length], skip_special_tokens=True)
            for i in range(0, len(token_ids), max_length)
        ]

    if not offsets:
        return []

    # The first piece starts at 0 so leading whitespace is kept; every later
    # piece starts where its first token starts, and each piece ends where
    # the next one begins (the last one runs to the end of the text).
    starts = [0] + [offsets[i][0] for i in range(max_length, len(offsets), max_length)]
    ends = starts[1:] + [len(text)]
    return [text[start:end] for start, end in zip(starts, ends)]
| |
|
def init_distributed_mode():
    """Initialise the NCCL process group and bind this process to its GPU.

    Returns:
        The *global* rank of this process, which callers use to select their
        share of the work.

    The CUDA device is chosen from the *local* rank instead of the global
    one. On a multi-node job, global ranks exceed the number of GPUs on a
    node, so ``torch.cuda.set_device(rank)`` would fail on all nodes but the
    first. ``LOCAL_RANK`` is exported by launchers such as ``torchrun``. When
    it is absent, ``rank % device_count`` is used.
    """
    dist.init_process_group(backend='nccl')
    rank = dist.get_rank()

    local_rank = os.environ.get("LOCAL_RANK")
    if local_rank is not None:
        device = int(local_rank)
    else:
        device = rank % torch.cuda.device_count()
    torch.cuda.set_device(device)
    return rank
| |
|
def _shard_lines(lines, rank, world_size):
    """Return the contiguous slice of ``lines`` assigned to ``rank``.

    Each rank gets ceil(len(lines) / world_size) consecutive lines; trailing
    ranks may get a shorter or empty slice.
    """
    per_rank = (len(lines) + world_size - 1) // world_size
    start = rank * per_rank
    return lines[start:min(start + per_rank, len(lines))]


def _batch_texts(lines, batch_size):
    """Join every ``batch_size`` consecutive lines into one space-separated string."""
    if batch_size < 1:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    return [" ".join(lines[i:i + batch_size]) for i in range(0, len(lines), batch_size)]


def _annotate_sentence(sent, nlp2, parse):
    """Build the JSON-serialisable annotation dict for one Stanza sentence.

    Each word is paired with the token that contains it (``token.words``).
    Zipping ``sent.tokens`` with ``sent.words`` misaligns every following
    word as soon as one token expands into several words, and then drops the
    tail of the sentence.
    """
    word_annotations = [
        {
            'id': word.id,
            'text': word.text,
            'lemma': word.lemma,
            'upos': word.upos,
            'xpos': word.xpos,
            'feats': word.feats,
            # Character span of the enclosing token in the chunk text.
            'start_char': token.start_char,
            'end_char': token.end_char,
        }
        for token in sent.tokens
        for word in token.words
    ]
    sa = {
        'sent_text': sent.text,
        'word_annotations': word_annotations,
    }
    if parse:
        sa['constituency_parse'] = __get_constituency_parse(sent, nlp2)
    return sa


def process_single_file(file, rank, tokenizer, nlp1, nlp2, batch_size=None, parse=None):
    """Annotate this rank's share of ``file`` and write it to a per-rank JSON file.

    The file's lines are split into contiguous shards, one per rank in the
    process group. This rank's shard is grouped into batches of
    ``batch_size`` lines, and each batch is split by ``chunk_text``. Every
    chunk is run through ``nlp1``, plus the constituency parser ``nlp2``
    when ``parse`` is true.

    Args:
        file: Open text file object (must expose ``readlines()`` and ``.name``).
        rank: Global rank of this process; selects the shard of lines.
        tokenizer: Tokenizer passed to ``chunk_text``.
        nlp1: Stanza pipeline providing tokenization, POS and lemmas.
        nlp2: Stanza pipeline used for constituency parses, or None.
        batch_size: Lines per batch. Defaults to the module-level
            ``BATCH_SIZE`` if defined (the ``__main__`` block sets it),
            otherwise 1000.
        parse: Whether to add constituency parses. Defaults to
            ``nlp2 is not None``.

    Returns:
        The path of the JSON file written: ``<input stem>_rank<rank>.json``.
        It holds a list with one ``{'sent_annotations': [...]}`` entry per chunk.
    """
    if batch_size is None:
        batch_size = globals().get('BATCH_SIZE', 1000)
    if parse is None:
        parse = nlp2 is not None

    print(f"GPU {rank}: Processing {file.name}")
    lines = file.readlines()

    gpu_lines = _shard_lines(lines, rank, dist.get_world_size())
    text_batches = _batch_texts(gpu_lines, batch_size)

    line_annotations = []
    for text in tqdm.tqdm(text_batches, desc=f"GPU {rank}"):
        for chunk in chunk_text(text, tokenizer):
            doc = nlp1(chunk)
            sent_annotations = [_annotate_sentence(sent, nlp2, parse) for sent in doc.sentences]
            line_annotations.append({'sent_annotations': sent_annotations})

    temp_filename = os.path.splitext(file.name)[0] + f'_rank{rank}.json'
    with open(temp_filename, "w") as outfile:
        json.dump(line_annotations, outfile, indent=4)

    return temp_filename
| |
|
def merge_files(temp_files, output_file):
    """Concatenate the JSON lists stored in ``temp_files`` into ``output_file``.

    Each input file must contain a JSON array. The arrays are concatenated
    in the order given and written to ``output_file`` with ``indent=4``.
    The input files are deleted only after the merged file has been written,
    so a failure while writing leaves them on disk for a retry.

    Args:
        temp_files: Paths of the per-rank JSON files to merge.
        output_file: Path of the merged JSON file to create/overwrite.
    """
    merged_data = []
    for path in temp_files:
        with open(path, "r") as infile:
            merged_data.extend(json.load(infile))

    with open(output_file, "w") as outfile:
        json.dump(merged_data, outfile, indent=4)

    for path in temp_files:
        os.remove(path)
| |
|
def run_on_gpu(rank, args, tokenizer, nlp1, nlp2):
    """Annotate every input file using all ranks, merging the results on rank 0.

    For each file in ``args.path``, every rank calls ``process_single_file``,
    which annotates that rank's shard of the file's lines and returns the
    path of the per-rank JSON it wrote. The paths from all ranks are
    collected with ``dist.all_gather_object``. Rank 0 then merges the shards,
    in rank order, into ``<input stem>_merged.json``; ``merge_files`` deletes
    the per-rank files after writing the merged one.

    Every rank must call this with the same ``args.path`` so the collective
    calls match up.
    """
    print(f"Running on Rank {rank}, using GPU {torch.cuda.current_device()}")
    world_size = dist.get_world_size()

    for file in args.path:
        temp_file = process_single_file(file, rank, tokenizer, nlp1, nlp2)

        # A collective call: it returns only after every rank has contributed
        # its (already closed) temp file, so rank 0 can safely read them all.
        temp_files = [None] * world_size
        dist.all_gather_object(temp_files, temp_file)

        if rank == 0:
            final_output = os.path.splitext(file.name)[0] + '_merged.json'
            merge_files(temp_files, final_output)

    # Keep ranks together until rank 0 has finished the last merge.
    dist.barrier()
| |
|
def __get_constituency_parse(sent, nlp):
    """Return a bracketed constituency parse for one sentence, or None.

    ``nlp`` is applied to ``sent.text``. Because it re-tokenises that text,
    it may yield several sentences. Their trees are joined with spaces
    under a single ``(ROOT ...)`` node.

    Parsing is best-effort: any ``Exception`` raised by ``nlp`` makes this
    return None. ``Exception`` is caught rather than using a bare
    ``except`` so that ``KeyboardInterrupt`` / ``SystemExit`` still stop a
    long run.
    """
    try:
        parse_doc = nlp(sent.text)
    except Exception:
        return None
    parse_trees = [str(parsed.constituency) for parsed in parse_doc.sentences]
    return "(ROOT " + " ".join(parse_trees) + ")"
| |
|
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog='Tag BabyLM dataset',
        description='Tag BabyLM dataset using Stanza')
    parser.add_argument('path', type=argparse.FileType('r'),
                        nargs='+', help="Path to file(s)")
    parser.add_argument('-p', '--parse', action='store_true',
                        help="Include constituency parse")
    # Module-level names: the processing functions may look up `args` and
    # `BATCH_SIZE` as globals.
    args = parser.parse_args()

    rank = init_distributed_mode()

    BATCH_SIZE = 1000
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    nlp1 = stanza.Pipeline(lang='en', processors='tokenize,pos,lemma', package="default_accurate", use_gpu=True)

    # The constituency pipeline is only built when parses were requested.
    nlp2 = None
    if args.parse:
        nlp2 = stanza.Pipeline(lang='en', processors='tokenize,pos,constituency', package="default_accurate", use_gpu=True)

    run_on_gpu(rank, args, tokenizer, nlp1, nlp2)

    # Release the NCCL process group created by init_distributed_mode().
    dist.destroy_process_group()