|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
from dataclasses import dataclass |
|
|
from typing import Any, Literal, Optional, Union |
|
|
|
|
|
from datasets import load_dataset |
|
|
|
|
|
from ...config.data_args import DataArguments |
|
|
from ...extras.types import DatasetInfo, HFDataset |
|
|
|
|
|
|
|
|
@dataclass
class DataLoaderPlugin:
    """Plugin for loading dataset."""

    args: DataArguments
    """Data arguments."""

    def _get_builder_name(self, path: str) -> Literal["arrow", "csv", "json", "parquet", "text"]:
        """Get dataset builder name.

        Args:
            path (str): Dataset path.

        Returns:
            Literal["arrow", "csv", "json", "parquet", "text"]: Dataset builder name.
        """
        ext = os.path.splitext(path)[-1][1:]
        # Map alias extensions onto the builder names `load_dataset` expects.
        # An explicit lookup avoids the corner cases of chained `str.replace`
        # (which would also rewrite extensions merely *containing* "jsonl"/"txt").
        return {"jsonl": "json", "txt": "text"}.get(ext, ext)

    def auto_load_data(self, dataset_info: DatasetInfo) -> HFDataset:
        """Load a dataset described by `dataset_info`.

        Args:
            dataset_info (DatasetInfo): Dataset description; may carry
                "dataset_dir", "split", "streaming" and "file_name" keys.

        Returns:
            HFDataset: Loaded dataset.

        Raises:
            NotImplementedError: If `dataset_info` has no "file_name" key
                (only local files/directories are supported here).
        """
        dataset_dir = dataset_info.get("dataset_dir", self.args.dataset_dir)
        split = dataset_info.get("split", "train")
        streaming = dataset_info.get("streaming", False)
        if "file_name" in dataset_info:
            filepath = os.path.join(dataset_dir, dataset_info["file_name"])
            return self.load_data_from_file(filepath, split, streaming)
        else:
            raise NotImplementedError()

    def load_data_from_file(self, filepath: str, split: str, streaming: bool) -> HFDataset:
        """Load a dataset from a local file or directory.

        Args:
            filepath (str): Path to a data file, or a directory of data files.
            split (str): Dataset split to load, e.g. "train".
            streaming (bool): If True, convert the result to an iterable dataset.

        Returns:
            HFDataset: Loaded dataset.

        Raises:
            ValueError: If `filepath` is neither a file nor a directory.
        """
        if os.path.isdir(filepath):
            # `os.listdir` order is arbitrary and platform-dependent; sort so the
            # builder is inferred from the same file deterministically on every run.
            filetype = self._get_builder_name(sorted(os.listdir(filepath))[0])
            dataset = load_dataset(filetype, data_dir=filepath, split=split)
        elif os.path.isfile(filepath):
            filetype = self._get_builder_name(filepath)
            dataset = load_dataset(filetype, data_files=filepath, split=split)
        else:
            raise ValueError(f"Can not load dataset from {filepath}.")

        if streaming:
            dataset = dataset.to_iterable_dataset()

        return dataset
|
|
|
|
|
|
|
|
@dataclass
class DataIndexPlugin:
    """Plugin for adjusting dataset index."""

    def adjust_data_index(
        self, data_index: list[tuple[str, int]], size: Optional[int], weight: Optional[float]
    ) -> list[tuple[str, int]]:
        """Adjust dataset index by size and weight.

        Args:
            data_index (list[tuple[str, int]]): List of (dataset_name, sample_index).
            size (Optional[int]): Desired dataset size.
            weight (Optional[float]): Desired dataset weight.

        Returns:
            list[tuple[str, int]]: Adjusted dataset index.
        """
        adjusted = data_index
        # Apply the size adjustment first, then the weight adjustment;
        # each step is skipped when its argument is None.
        if size is not None:
            adjusted = self.adjust_by_size(adjusted, size)

        if weight is not None:
            adjusted = self.adjust_by_weight(adjusted, weight)

        return adjusted

    def adjust_by_size(self, data_index: list[tuple[str, int]], size: int) -> list[tuple[str, int]]:
        """Resize the dataset index to `size` samples. Hook for subclasses."""
        raise NotImplementedError()

    def adjust_by_weight(self, data_index: list[tuple[str, int]], weight: float) -> list[tuple[str, int]]:
        """Reweight the dataset index by `weight`. Hook for subclasses."""
        raise NotImplementedError()
|
|
|
|
|
|
|
|
@dataclass
class DataSelectorPlugin:
    """Plugin for selecting dataset samples."""

    data_index: list[tuple[str, int]]
    """List of (dataset_name, sample_index)"""

    def select(self, index: Union[slice, list[int], Any]) -> Union[tuple[str, int], list[tuple[str, int]]]:
        """Select dataset samples.

        Args:
            index (Union[slice, list[int], Any]): Index of dataset samples.

        Returns:
            Union[tuple[str, int], list[tuple[str, int]]]: Selected dataset samples.

        Raises:
            ValueError: If `index` is not a slice, list or integer.
        """
        if isinstance(index, slice):
            # Resolve the slice against the index length so negative bounds
            # and omitted start/stop/step behave like ordinary list slicing.
            return [self.data_index[i] for i in range(*index.indices(len(self.data_index)))]
        elif isinstance(index, list):
            return [self.data_index[i] for i in index]
        elif isinstance(index, int):
            # Scalar index: return a single (dataset_name, sample_index) pair,
            # matching the tuple[str, int] branch of the declared return type.
            return self.data_index[index]
        else:
            raise ValueError(f"Invalid index type {type(index)}.")
|
|
|