# Copyright (c) Facebook, Inc. and its affiliates. # # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. import mmap import os import shutil import struct import typing as tp from functools import lru_cache import numpy as np import torch from fairseq.data import indexed_dataset from fairseq.data.huffman import HuffmanCoder from fairseq.file_io import PathManager class HuffmanMMapIndex: """ keep an index of the offsets in the huffman binary file. First a header, then the list of sizes (num tokens) for each instance and finally the addresses of each instance. """ _HDR_MAGIC = b"HUFFIDX\x00\x00" _VERSION = 1 @classmethod def writer(cls, path: str, data_len: int): class _Writer: def __enter__(self): self._file = open(path, "wb") # write header (magic + version) self._file.write(cls._HDR_MAGIC) self._file.write(struct.pack(" None: self._path_prefix = path_prefix self._coder = coder self._sizes = [] self._ptrs = [] self._data_len = 0 def open(self): self._coder.to_file(vocab_file_path(self._path_prefix)) self._data_file = open(indexed_dataset.data_file_path(self._path_prefix), "wb") def __enter__(self) -> "HuffmanMMapIndexedDatasetBuilder": self.open() return self def add_item(self, tokens: tp.List[str]) -> None: """ add a list of tokens to the dataset, they will compressed with the provided coder before being written to file. """ encoded = self._coder.encode(tokens) code_len = len(encoded) last_ptr = 0 if len(self._ptrs) > 0: last_ptr = self._ptrs[-1] self._sizes.append(len(tokens)) self._ptrs.append(last_ptr + code_len) self._data_len += code_len self._data_file.write(encoded) def append(self, other_dataset_path_prefix: str) -> None: """ append an existing dataset. Beware, if it wasn't built with the same coder, you are in trouble. """ other_index = HuffmanMMapIndex( indexed_dataset.index_file_path(other_dataset_path_prefix) ) for (ptr, size) in other_index: self._ptrs.append(ptr + self._data_len) self._sizes.append(size) # Concatenate data with open(indexed_dataset.data_file_path(other_dataset_path_prefix), "rb") as f: shutil.copyfileobj(f, self._data_file) self._data_len += other_index.data_len def close(self): self._data_file.close() with HuffmanMMapIndex.writer( indexed_dataset.index_file_path(self._path_prefix), self._data_len ) as index: index.write(self._sizes, self._ptrs) def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.close()