| """Use a Stanza tokenizer to turn a text file into one tokenized paragraph per line | |
| For example, the output of this script is suitable for Glove | |
| Currently this *only* supports tokenization, no MWT splitting. | |
| It also would be beneficial to have an option to convert spaces into | |
| NBSP, underscore, or some other marker to make it easier to process | |
| languages such as VI which have spaces in them | |
| """ | |
import argparse
import io
import os
import re
import zipfile

import stanza
from stanza.models.common.utils import open_read_text, default_device
from stanza.pipeline.tokenize_processor import TokenizeProcessor
from stanza.utils.get_tqdm import get_tqdm

tqdm = get_tqdm()
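
# Input "documents" (paragraphs) are separated by one or more blank lines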
NEWLINE_SPLIT_RE = re.compile(r"\n\s*\n")

def tokenize_to_file(tokenizer, fin, fout, chunk_size=500):
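    """Tokenize everything read from fin and write one paragraph per line to fout.

    Paragraphs are split on blank lines and processed chunk_size documents
    at a time, so an entire file never needs to be tokenized in memory at once.
    """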
    raw_text = fin.read()
    documents = NEWLINE_SPLIT_RE.split(raw_text)
    for chunk_start in tqdm(range(0, len(documents), chunk_size), leave=False):
        chunk_end = min(chunk_start + chunk_size, len(documents))
        chunk = documents[chunk_start:chunk_end]
        in_docs = [stanza.Document([], text=d) for d in chunk]
        out_docs = tokenizer.bulk_process(in_docs)
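        # Each paragraph becomes one line of output: tokens are joined
        # with spaces, and a single space separates consecutive sentences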
        for document in out_docs:
            for sent_idx, sentence in enumerate(document.sentences):
                if sent_idx > 0:
                    fout.write(" ")
                fout.write(" ".join(x.text for x in sentence.tokens))
            fout.write("\n")

def main(args=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--lang", type=str, default="sd", help="Which language to use for tokenization")
    parser.add_argument("--tokenize_model_path", type=str, default=None, help="Specific tokenizer model to use")
    parser.add_argument("input_files", type=str, nargs="+", help="Which input files to tokenize")
    parser.add_argument("--output_file", type=str, default="glove.txt", help="Where to write the tokenized output")
    parser.add_argument("--model_dir", type=str, default=None, help="Where to get models for a Pipeline (None => default models dir)")
    parser.add_argument("--chunk_size", type=int, default=500, help="How many 'documents' to use in a chunk when tokenizing.  This is separate from the tokenizer batching - it limits how much memory gets used at once, since we don't need to store an entire file in memory at once")
    args = parser.parse_args(args=args)

    if os.path.exists(args.output_file):
        print("Cowardly refusing to overwrite existing output file %s" % args.output_file)
        return
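
    # Either load a specific tokenizer model directly as a standalone
    # TokenizeProcessor, or build a full Pipeline and reuse its tokenize processor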
    if args.tokenize_model_path:
        config = {"model_path": args.tokenize_model_path,
                  "check_requirements": False}
        tokenizer = TokenizeProcessor(config, pipeline=None, device=default_device())
    else:
        pipe = stanza.Pipeline(lang=args.lang, processors="tokenize", model_dir=args.model_dir)
        tokenizer = pipe.processors["tokenize"]
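
    # Zip archives are walked one member at a time; any other input file
    # is opened as text with open_read_text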
    with open(args.output_file, "w", encoding="utf-8") as fout:
        for filename in tqdm(args.input_files):
            if filename.endswith(".zip"):
                with zipfile.ZipFile(filename) as zin:
                    input_names = zin.namelist()
                    for input_name in tqdm(input_names, leave=False):
                        with zin.open(input_name) as fin:
                            fin = io.TextIOWrapper(fin, encoding='utf-8')
                            tokenize_to_file(tokenizer, fin, fout, chunk_size=args.chunk_size)
            else:
                with open_read_text(filename, encoding="utf-8") as fin:
                    tokenize_to_file(tokenizer, fin, fout, chunk_size=args.chunk_size)

if __name__ == '__main__':
    main()