# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import os
import typing as tp
import unittest
from tempfile import TemporaryDirectory

from fairseq.binarizer import BinarizeSummary, FileBinarizer, VocabularyDatasetBinarizer
from fairseq.data import Dictionary, indexed_dataset
from tests.utils import make_data, sizes
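

# Helper used by every test below: collect each token produced by
# tests.utils.make_data into a fairseq Dictionary so that sentences can be
# mapped to indices and decoded back.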
def build_vocab(data: tp.List[tp.List[str]]) -> Dictionary:
    d = Dictionary()
    for s in data:
        for token in s:
            d.add_symbol(token)
    d.finalize()
    return d
class TestBinarizer(unittest.TestCase):
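    # Shared assertions: check the returned BinarizeSummary against the raw
    # data, then re-open the binarized dataset from disk and verify that its
    # contents decode back to the original sentences with matching sizes.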
    def compare_ds_data(self, summary, data, prefix, impl, vocab):
        self.assertEqual(summary.num_seq, len(data))
        self.assertEqual(summary.num_tok, sum(len(s) for s in data))

        dataset = indexed_dataset.make_dataset(prefix, impl)
        self.assertEqual(len(dataset), len(data))
        decoded = [vocab.string(dataset[i]).split() for i in range(len(dataset))]
        self.assertEqual(decoded, data)
        data_sizes = [i.item() for i in dataset.sizes]
        self.assertEqual(data_sizes, sizes(data))

    def test_can_binarize_line(self):
        data = make_data(length=1)
        vocab = build_vocab(data)

        binarizer = VocabularyDatasetBinarizer(
            vocab,
        )

        sentence = data[0]
        summary = BinarizeSummary()

        tensor = binarizer.binarize_line(
            " ".join(sentence),
            summary,
        )
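
        # VocabularyDatasetBinarizer appends an EOS token by default, which
        # accounts for the "+ 1" on top of the raw sentence length below.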
        self.assertEqual(len(tensor), len(sentence) + 1)
        self.assertEqual(summary.num_tok, len(sentence) + 1)
        self.assertEqual(summary.num_seq, 1)

    def test_can_binarize_file_chunk(self):
        # test without the multiprocessing logic
        with TemporaryDirectory() as dirname:
            raw_file = os.path.join(dirname, "raw1")
            prefix = os.path.join(dirname, "test1")
            impl = "mmap"

            data = make_data(out_file=raw_file)
            vocab = build_vocab(data)

            binarizer = VocabularyDatasetBinarizer(
                vocab,
                append_eos=False,
            )
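
            # append_eos=False keeps every binarized size equal to the raw
            # token count that compare_ds_data checks against. The private
            # single-chunk helper is driven directly here; offsets of 0 and
            # -1 appear to make one chunk span the whole file.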
            summary = FileBinarizer._binarize_chunk_and_finalize(
                binarizer,
                raw_file,
                offset_start=0,
                offset_end=-1,
                output_prefix=prefix,
                dataset_impl=impl,
                vocab_size=len(vocab),
            )

            self.compare_ds_data(summary, data, prefix, impl, vocab)
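
    # multiprocess_dataset is expected to split the raw file into per-worker
    # chunks, binarize them in parallel, and merge the partial outputs into a
    # single dataset; one and three workers should both reproduce `data`.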
    def test_can_multiprocess(self):
        with TemporaryDirectory() as dirname:
            raw_file = os.path.join(dirname, "raw1")
            prefix = os.path.join(dirname, "test1")
            impl = "mmap"

            data = make_data(out_file=raw_file)
            vocab = build_vocab(data)

            binarizer = VocabularyDatasetBinarizer(
                vocab,
                append_eos=False,
            )

            # with one worker
            summary = FileBinarizer.multiprocess_dataset(
                raw_file,
                impl,
                binarizer,
                output_prefix=prefix,
                vocab_size=len(vocab),
                num_workers=1,
            )

            self.compare_ds_data(summary, data, prefix, impl, vocab)

            # with multiple workers
            prefix_multi = os.path.join(dirname, "test2")
            summary = FileBinarizer.multiprocess_dataset(
                raw_file,
                impl,
                binarizer,
                output_prefix=prefix_multi,
                vocab_size=len(vocab),
                num_workers=3,
            )

            self.compare_ds_data(summary, data, prefix_multi, impl, vocab)