import shutil
import struct
from functools import lru_cache
from typing import Union

import numpy as np
import torch

from fairseq.data.fasta_dataset import FastaDataset
from fairseq.dataclass.constants import DATASET_IMPL_CHOICES
from fairseq.file_io import PathManager

from . import FairseqDataset


def best_fitting_int_dtype(
    max_int_to_represent,
) -> Union[np.uint16, np.uint32, np.int64]:
    """Return the smallest integer dtype that can represent ``max_int_to_represent``."""
    if max_int_to_represent is None:
        return np.uint32  # safe default when the vocabulary size is unknown
    elif max_int_to_represent < 65500:
        return np.uint16
    elif max_int_to_represent < 4294967295:
        return np.uint32
    else:
        # np.uint64 is avoided: it saves no space over int64 and its type
        # promotion behaves unexpectedly in numpy.
        return np.int64


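# Illustrative examples (not part of the module API): a 32k-symbol vocabulary
# fits in two bytes per token, while an unknown size falls back to uint32.
#
#   >>> best_fitting_int_dtype(32000)
#   <class 'numpy.uint16'>
#   >>> best_fitting_int_dtype(None)
#   <class 'numpy.uint32'>

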
def get_available_dataset_impl():
    return list(map(str, DATASET_IMPL_CHOICES))


def infer_dataset_impl(path):
    if IndexedRawTextDataset.exists(path):
        return "raw"
    elif IndexedDataset.exists(path):
        with open(index_file_path(path), "rb") as f:
            magic = f.read(8)
            if magic == IndexedDataset._HDR_MAGIC:
                return "cached"
            elif magic == MMapIndexedDataset.Index._HDR_MAGIC[:8]:
                return "mmap"
            else:
                return None
    elif FastaDataset.exists(path):
        return "fasta"
    else:
        return None


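# Sketch of the usual dispatch pattern, assuming a hypothetical binarized
# prefix "data-bin/train" for which .idx/.bin files exist on disk:
#
#   >>> impl = infer_dataset_impl("data-bin/train")  # e.g. "mmap"
#   >>> ds = make_dataset("data-bin/train", impl)

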
def make_builder(out_file, impl, vocab_size=None):
    if impl == "mmap":
        return MMapIndexedDatasetBuilder(
            out_file, dtype=best_fitting_int_dtype(vocab_size)
        )
    elif impl == "fasta":
        raise NotImplementedError
    else:
        return IndexedDatasetBuilder(out_file)


def make_dataset(path, impl, fix_lua_indexing=False, dictionary=None):
    if impl == "raw" and IndexedRawTextDataset.exists(path):
        assert dictionary is not None
        return IndexedRawTextDataset(path, dictionary)
    elif impl == "lazy" and IndexedDataset.exists(path):
        return IndexedDataset(path, fix_lua_indexing=fix_lua_indexing)
    elif impl == "cached" and IndexedDataset.exists(path):
        return IndexedCachedDataset(path, fix_lua_indexing=fix_lua_indexing)
    elif impl == "mmap" and MMapIndexedDataset.exists(path):
        return MMapIndexedDataset(path)
    elif impl == "fasta" and FastaDataset.exists(path):
        from fairseq.data.fasta_dataset import EncodedFastaDataset

        return EncodedFastaDataset(path, dictionary)
    return None


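# Minimal loading sketch, assuming a hypothetical prefix "data-bin/train" for
# which "data-bin/train.idx" and "data-bin/train.bin" exist:
#
#   >>> ds = make_dataset("data-bin/train", impl="mmap")
#   >>> ds[0]  # a 1-D torch.LongTensor of token ids

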
def dataset_exists(path, impl):
    if impl == "raw":
        return IndexedRawTextDataset.exists(path)
    elif impl == "mmap":
        return MMapIndexedDataset.exists(path)
    else:
        return IndexedDataset.exists(path)


def read_longs(f, n):
    a = np.empty(n, dtype=np.int64)
    f.readinto(a)
    return a


def write_longs(f, a):
    f.write(np.array(a, dtype=np.int64))


_code_to_dtype = {
    1: np.uint8,
    2: np.int8,
    3: np.int16,
    4: np.int32,
    5: np.int64,
    6: np.float32,  # np.float was removed from numpy; this code is a 4-byte float
    7: np.double,
    8: np.uint16,
    9: np.uint32,
    10: np.uint64,
}


def _dtype_header_code(dtype) -> int:
    for code, candidate in _code_to_dtype.items():
        if candidate == dtype:
            return code
    raise ValueError(dtype)


def index_file_path(prefix_path):
    return prefix_path + ".idx"


def data_file_path(prefix_path):
    return prefix_path + ".bin"


class IndexedDataset(FairseqDataset):
    """Loader for TorchNet IndexedDataset"""

    _HDR_MAGIC = b"TNTIDX\x00\x00"

    def __init__(self, path, fix_lua_indexing=False):
        super().__init__()
        self.path = path
        self.fix_lua_indexing = fix_lua_indexing
        self.data_file = None
        self.read_index(path)

    def read_index(self, path):
        with open(index_file_path(path), "rb") as f:
            magic = f.read(8)
            assert magic == self._HDR_MAGIC, (
                "Index file doesn't match expected format. "
                "Make sure that --dataset-impl is configured properly."
            )
            version = f.read(8)
            assert struct.unpack("<Q", version) == (1,)
            code, self.element_size = struct.unpack("<QQ", f.read(16))
            self.dtype = _code_to_dtype[code]
            self._len, self.s = struct.unpack("<QQ", f.read(16))
            self.dim_offsets = read_longs(f, self._len + 1)
            self.data_offsets = read_longs(f, self._len + 1)
            self.sizes = read_longs(f, self.s)

    def read_data(self, path):
        self.data_file = open(data_file_path(path), "rb", buffering=0)

    def check_index(self, i):
        if i < 0 or i >= self._len:
            raise IndexError("index out of range")

    def __del__(self):
        if self.data_file:
            self.data_file.close()

    @lru_cache(maxsize=8)
    def __getitem__(self, i) -> torch.Tensor:
        if not self.data_file:
            self.read_data(self.path)
        self.check_index(i)
        tensor_size = self.sizes[self.dim_offsets[i] : self.dim_offsets[i + 1]]
        a = np.empty(tensor_size, dtype=self.dtype)
        self.data_file.seek(self.data_offsets[i] * self.element_size)
        self.data_file.readinto(a)
        item = torch.from_numpy(a).long()
        if self.fix_lua_indexing:
            item -= 1  # subtract 1 to convert from Lua's 1-based indexing
        return item

    def __len__(self):
        return self._len

    def num_tokens(self, index):
        return self.sizes[index]

    def size(self, index):
        return self.sizes[index]

    @staticmethod
    def exists(path):
        return PathManager.exists(index_file_path(path)) and PathManager.exists(
            data_file_path(path)
        )

    @property
    def supports_prefetch(self):
        return False


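# On-disk layout of the "TNTIDX" index file, as read by read_index() above and
# written by IndexedDatasetBuilder.finalize() below (all fields little-endian):
#
#   magic         8 bytes   b"TNTIDX\x00\x00"
#   version       8 bytes   uint64, always 1
#   dtype code    8 bytes   uint64, key into _code_to_dtype
#   element size  8 bytes   uint64, bytes per element
#   len, s        16 bytes  uint64 item count, uint64 number of size entries
#   dim_offsets   (len + 1) int64 values
#   data_offsets  (len + 1) int64 values, in units of element_size
#   sizes         s int64 values

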
class IndexedCachedDataset(IndexedDataset):
    def __init__(self, path, fix_lua_indexing=False):
        super().__init__(path, fix_lua_indexing=fix_lua_indexing)
        self.cache = None
        self.cache_index = {}

    @property
    def supports_prefetch(self):
        return True

    def prefetch(self, indices):
        if all(i in self.cache_index for i in indices):
            return
        if not self.data_file:
            self.read_data(self.path)
        indices = sorted(set(indices))
        total_size = 0
        for i in indices:
            total_size += self.data_offsets[i + 1] - self.data_offsets[i]
        self.cache = np.empty(total_size, dtype=self.dtype)
        ptx = 0
        self.cache_index.clear()
        for i in indices:
            self.cache_index[i] = ptx
            size = self.data_offsets[i + 1] - self.data_offsets[i]
            a = self.cache[ptx : ptx + size]
            self.data_file.seek(self.data_offsets[i] * self.element_size)
            self.data_file.readinto(a)
            ptx += size
        if self.data_file:
            # close the data file after prefetch so this object stays picklable
            self.data_file.close()
            self.data_file = None

    @lru_cache(maxsize=8)
    def __getitem__(self, i):
        self.check_index(i)
        tensor_size = self.sizes[self.dim_offsets[i] : self.dim_offsets[i + 1]]
        a = np.empty(tensor_size, dtype=self.dtype)
        ptx = self.cache_index[i]
        np.copyto(a, self.cache[ptx : ptx + a.size])
        item = torch.from_numpy(a).long()
        if self.fix_lua_indexing:
            item -= 1  # subtract 1 to convert from Lua's 1-based indexing
        return item


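# Typical prefetch pattern (a sketch; fairseq's iterators do this for you when
# supports_prefetch is True): pull the epoch's items into the contiguous cache
# before indexing. "data-bin/train" is a hypothetical prefix.
#
#   >>> ds = IndexedCachedDataset("data-bin/train")
#   >>> ds.prefetch(range(len(ds)))
#   >>> ds[0]

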
class IndexedRawTextDataset(FairseqDataset):
    """Takes a text file as input and binarizes it in memory at instantiation.
    Original lines are also kept in memory."""

    def __init__(self, path, dictionary, append_eos=True, reverse_order=False):
        self.tokens_list = []
        self.lines = []
        self.sizes = []
        self.append_eos = append_eos
        self.reverse_order = reverse_order
        self.read_data(path, dictionary)
        self.size = len(self.tokens_list)

    def read_data(self, path, dictionary):
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                self.lines.append(line.strip("\n"))
                tokens = dictionary.encode_line(
                    line,
                    add_if_not_exist=False,
                    append_eos=self.append_eos,
                    reverse_order=self.reverse_order,
                ).long()
                self.tokens_list.append(tokens)
                self.sizes.append(len(tokens))
        self.sizes = np.array(self.sizes)

    def check_index(self, i):
        if i < 0 or i >= self.size:
            raise IndexError("index out of range")

    @lru_cache(maxsize=8)
    def __getitem__(self, i):
        self.check_index(i)
        return self.tokens_list[i]

    def get_original_text(self, i):
        self.check_index(i)
        return self.lines[i]

    def __del__(self):
        pass

    def __len__(self):
        return self.size

    def num_tokens(self, index):
        return self.sizes[index]

    def size(self, index):
        return self.sizes[index]

    @staticmethod
    def exists(path):
        return PathManager.exists(path)


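# Usage sketch, assuming a fairseq Dictionary instance and a hypothetical
# plain-text file "train.txt" with one sentence per line:
#
#   >>> ds = IndexedRawTextDataset("train.txt", dictionary)
#   >>> ds.get_original_text(0)  # the raw line
#   >>> ds[0]                    # its binarized torch.LongTensor

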
class IndexedDatasetBuilder:
    element_sizes = {
        np.uint8: 1,
        np.int8: 1,
        np.int16: 2,
        np.int32: 4,
        np.int64: 8,
        np.float32: 4,  # np.float was removed from numpy; 4-byte float
        np.double: 8,
    }

    def __init__(self, out_file, dtype=np.int32):
        self.out_file = open(out_file, "wb")
        self.dtype = dtype
        self.data_offsets = [0]
        self.dim_offsets = [0]
        self.sizes = []
        self.element_size = self.element_sizes[self.dtype]

    def add_item(self, tensor):
        # +1 for Lua compatibility; IndexedDataset(fix_lua_indexing=True)
        # subtracts it back out when reading.
        nbytes = self.out_file.write(np.array(tensor.numpy() + 1, dtype=self.dtype))
        self.data_offsets.append(self.data_offsets[-1] + nbytes // self.element_size)
        for s in tensor.size():
            self.sizes.append(s)
        self.dim_offsets.append(self.dim_offsets[-1] + len(tensor.size()))

    def merge_file_(self, another_file):
        index = IndexedDataset(another_file)
        assert index.dtype == self.dtype

        begin = self.data_offsets[-1]
        for offset in index.data_offsets[1:]:
            self.data_offsets.append(begin + offset)
        self.sizes.extend(index.sizes)
        begin = self.dim_offsets[-1]
        for dim_offset in index.dim_offsets[1:]:
            self.dim_offsets.append(begin + dim_offset)

        with open(data_file_path(another_file), "rb") as f:
            while True:
                data = f.read(1024)
                if data:
                    self.out_file.write(data)
                else:
                    break

    def finalize(self, index_file):
        self.out_file.close()
        index = open(index_file, "wb")
        index.write(b"TNTIDX\x00\x00")
        index.write(struct.pack("<Q", 1))
        index.write(
            struct.pack("<QQ", _dtype_header_code(self.dtype), self.element_size)
        )
        index.write(struct.pack("<QQ", len(self.data_offsets) - 1, len(self.sizes)))
        write_longs(index, self.dim_offsets)
        write_longs(index, self.data_offsets)
        write_longs(index, self.sizes)
        index.close()


def _warmup_mmap_file(path):
    # Sequentially read the file in 100 MB chunks to pull its pages into the
    # OS cache before memory-mapping it.
    with open(path, "rb") as stream:
        while stream.read(100 * 1024 * 1024):
            pass


class MMapIndexedDataset(torch.utils.data.Dataset):
    class Index:
        _HDR_MAGIC = b"MMIDIDX\x00\x00"

        @classmethod
        def writer(cls, path, dtype):
            class _Writer:
                def __enter__(self):
                    self._file = open(path, "wb")

                    self._file.write(cls._HDR_MAGIC)
                    self._file.write(struct.pack("<Q", 1))
                    self._file.write(struct.pack("<B", _dtype_header_code(dtype)))

                    return self

                @staticmethod
                def _get_pointers(sizes):
                    dtype_size = dtype().itemsize
                    address = 0
                    pointers = []

                    for size in sizes:
                        pointers.append(address)
                        address += size * dtype_size

                    return pointers

                def write(self, sizes):
                    pointers = self._get_pointers(sizes)

                    self._file.write(struct.pack("<Q", len(sizes)))

                    sizes = np.array(sizes, dtype=np.int32)
                    self._file.write(sizes.tobytes(order="C"))
                    del sizes

                    pointers = np.array(pointers, dtype=np.int64)
                    self._file.write(pointers.tobytes(order="C"))
                    del pointers

                def __exit__(self, exc_type, exc_val, exc_tb):
                    self._file.close()

            return _Writer()

        def __init__(self, path):
            with open(path, "rb") as stream:
                magic_test = stream.read(9)
                assert self._HDR_MAGIC == magic_test, (
                    "Index file doesn't match expected format. "
                    "Make sure that --dataset-impl is configured properly."
                )
                version = struct.unpack("<Q", stream.read(8))
                assert (1,) == version

                (dtype_code,) = struct.unpack("<B", stream.read(1))
                self._dtype = _code_to_dtype[dtype_code]
                self._dtype_size = self._dtype().itemsize

                self._len = struct.unpack("<Q", stream.read(8))[0]
                offset = stream.tell()

            _warmup_mmap_file(path)

            self._bin_buffer_mmap = np.memmap(path, mode="r", order="C")
            self._bin_buffer = memoryview(self._bin_buffer_mmap)
            self._sizes = np.frombuffer(
                self._bin_buffer, dtype=np.int32, count=self._len, offset=offset
            )
            self._pointers = np.frombuffer(
                self._bin_buffer,
                dtype=np.int64,
                count=self._len,
                offset=offset + self._sizes.nbytes,
            )

        def __del__(self):
            self._bin_buffer_mmap._mmap.close()
            del self._bin_buffer_mmap

        @property
        def dtype(self):
            return self._dtype

        @property
        def sizes(self):
            return self._sizes

        @lru_cache(maxsize=8)
        def __getitem__(self, i):
            return self._pointers[i], self._sizes[i]

        def __len__(self):
            return self._len

    def __init__(self, path):
        super().__init__()

        self._path = None
        self._index = None
        self._bin_buffer = None

        self._do_init(path)

    def __getstate__(self):
        return self._path

    def __setstate__(self, state):
        self._do_init(state)

    def _do_init(self, path):
        self._path = path
        self._index = self.Index(index_file_path(self._path))

        _warmup_mmap_file(data_file_path(self._path))
        self._bin_buffer_mmap = np.memmap(
            data_file_path(self._path), mode="r", order="C"
        )
        self._bin_buffer = memoryview(self._bin_buffer_mmap)

    def __del__(self):
        self._bin_buffer_mmap._mmap.close()
        del self._bin_buffer_mmap
        del self._index

    def __len__(self):
        return len(self._index)

    @lru_cache(maxsize=8)
    def __getitem__(self, i):
        ptr, size = self._index[i]
        np_array = np.frombuffer(
            self._bin_buffer, dtype=self._index.dtype, count=size, offset=ptr
        )
        if self._index.dtype != np.int64:
            np_array = np_array.astype(np.int64)

        return torch.from_numpy(np_array)

    @property
    def sizes(self):
        return self._index.sizes

    @property
    def supports_prefetch(self):
        return False

    @staticmethod
    def exists(path):
        return PathManager.exists(index_file_path(path)) and PathManager.exists(
            data_file_path(path)
        )


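# On-disk layout of the "MMIDIDX" index file, as written by
# MMapIndexedDataset.Index.writer() and parsed by Index.__init__() above
# (all fields little-endian):
#
#   magic       9 bytes  b"MMIDIDX\x00\x00"
#   version     8 bytes  uint64, always 1
#   dtype code  1 byte   uint8, key into _code_to_dtype
#   count       8 bytes  uint64 number of items
#   sizes       count int32 values (length of each item)
#   pointers    count int64 values (byte offset of each item in the .bin file)

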
def get_indexed_dataset_to_local(path) -> str:
    local_index_path = PathManager.get_local_path(index_file_path(path))
    local_data_path = PathManager.get_local_path(data_file_path(path))

    assert local_index_path.endswith(".idx") and local_data_path.endswith(".bin"), (
        "PathManager.get_local_path does not return files with expected patterns: "
        f"{local_index_path} and {local_data_path}"
    )

    local_path = local_data_path[:-4]
    assert local_path == local_index_path[:-4]
    return local_path


class MMapIndexedDatasetBuilder:
    def __init__(self, out_file, dtype=np.int64):
        self._data_file = open(out_file, "wb")
        self._dtype = dtype
        self._sizes = []

    def add_item(self, tensor):
        np_array = np.array(tensor.numpy(), dtype=self._dtype)
        self._data_file.write(np_array.tobytes(order="C"))
        self._sizes.append(np_array.size)

    def merge_file_(self, another_file):
        # Concatenate index
        index = MMapIndexedDataset.Index(index_file_path(another_file))
        assert index.dtype == self._dtype

        for size in index.sizes:
            self._sizes.append(size)

        # Concatenate data
        with open(data_file_path(another_file), "rb") as f:
            shutil.copyfileobj(f, self._data_file)

    def finalize(self, index_file):
        self._data_file.close()

        with MMapIndexedDataset.Index.writer(index_file, self._dtype) as index:
            index.write(self._sizes)


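# End-to-end sketch (hypothetical paths and token ids; "data-bin/train" is a
# placeholder, not part of this module):
#
#   >>> builder = make_builder("data-bin/train.bin", impl="mmap", vocab_size=32000)
#   >>> builder.add_item(torch.LongTensor([4, 5, 6, 2]))
#   >>> builder.finalize("data-bin/train.idx")
#   >>> ds = MMapIndexedDataset("data-bin/train")
#   >>> ds[0]
#   tensor([4, 5, 6, 2])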