Spaces:
Runtime error
Runtime error
| import torchdata.datapipes as dp | |
| import os | |
| import tarfile | |
| from torchdata.datapipes.iter import TarArchiveLoader | |
| from typing import cast, IO, Iterable, Iterator, Optional, Tuple, Dict | |
| from torchdata.datapipes import functional_datapipe | |
| from io import BufferedIOBase | |
| from torchdata.datapipes.utils import StreamWrapper | |
| from torchdata.datapipes.utils.common import validate_pathname_binary_tuple | |
| import warnings | |
| from torchdata.datapipes.iter import IterDataPipe | |
| import json | |
| class TarArchiveLoaderWoException(TarArchiveLoader): | |
| def __iter__(self) -> Iterator[Tuple[str, BufferedIOBase]]: | |
| for data in self.datapipe: | |
| validate_pathname_binary_tuple(data) | |
| pathname, data_stream = data | |
| try: | |
| if isinstance(data_stream, StreamWrapper) and isinstance(data_stream.file_obj, tarfile.TarFile): | |
| tar = data_stream.file_obj | |
| else: | |
| reading_mode = (self.mode if hasattr(data_stream, "seekable") and data_stream.seekable() else | |
| self.mode.replace(":", "|")) | |
| # typing.cast is used here to silence mypy's type checker | |
| tar = tarfile.open(fileobj=cast(Optional[IO[bytes]], data_stream), mode=reading_mode) | |
| for tarinfo in tar: | |
| if not tarinfo.isfile(): | |
| continue | |
| extracted_fobj = tar.extractfile(tarinfo) | |
| if extracted_fobj is None: | |
| warnings.warn(f"failed to extract file {tarinfo.name} from source tarfile {pathname}") | |
| raise tarfile.ExtractError | |
| inner_pathname = os.path.normpath(os.path.join(pathname, tarinfo.name)) | |
| yield inner_pathname, StreamWrapper(extracted_fobj, data_stream, | |
| name=inner_pathname) # type: ignore[misc] | |
| except Exception as e: | |
| warnings.warn(f"Unable to extract files from corrupted tarfile stream {pathname} due to: {e}, abort!") | |
| # raise e | |
| finally: | |
| if isinstance(data_stream, StreamWrapper): | |
| data_stream.autoclose() | |
| class JsonlParserIterDataPipe(IterDataPipe[Tuple[str, Dict]]): | |
| def __init__(self, source_datapipe: IterDataPipe[Tuple[str, IO]], **kwargs) -> None: | |
| self.source_datapipe: IterDataPipe[Tuple[str, IO]] = source_datapipe | |
| self.kwargs = kwargs | |
| def __iter__(self) -> Iterator[Tuple[str, Dict]]: | |
| for file_name, stream in self.source_datapipe: | |
| for idx, line in enumerate(stream): | |
| if line.strip() != '': | |
| try: | |
| yield f'{file_name}_line{idx}', json.loads(line) | |
| except Exception as e: | |
| warnings.warn(f"Error occured when parsing string to json due to: {e} abort!") | |