Spaces:
Running
Running
| """ | |
| Hugging Face Datasets Reader | |
| This module provides a reader for accessing datasets from Hugging Face Hub. | |
| """ | |
| from typing import TYPE_CHECKING, List, Optional, Union | |
| from graphgen.bases.base_reader import BaseReader | |
| if TYPE_CHECKING: | |
| import numpy as np | |
| import ray | |
| from ray.data import Dataset | |
class HuggingFaceReader(BaseReader):
    """
    Reader for Hugging Face Datasets.

    Supports loading datasets from the Hugging Face Hub. A dataset is
    identified by its Hub name, optionally combined with a subset
    (configuration) and a split — either via explicit keyword arguments
    or encoded in the path as ``"dataset_name:subset:split"``.

    Columns:
        - type: The type of the document (e.g., "text", "image", etc.);
          filled with "text" when the source dataset has no such column.
        - if type is "text", "content" column must be present (or specify
          the source column via text_column, which is renamed to "content").

    Example:
        reader = HuggingFaceReader(text_column="text")
        ds = reader.read("wikitext")
        # or with split and subset
        ds = reader.read("wikitext:wikitext-103-v1:train")
    """

    def __init__(
        self,
        text_column: str = "content",
        modalities: Optional[list] = None,
        cache_dir: Optional[str] = None,
        trust_remote_code: bool = False,
    ):
        """
        Initialize HuggingFaceReader.

        :param text_column: Column name containing text content; renamed to
            "content" during post-processing when different.
        :param modalities: List of supported modalities (forwarded to BaseReader)
        :param cache_dir: Directory to cache downloaded datasets
        :param trust_remote_code: Whether to trust remote code in datasets
            (passed through to ``datasets.load_dataset``)
        """
        super().__init__(text_column=text_column, modalities=modalities)
        self.cache_dir = cache_dir
        self.trust_remote_code = trust_remote_code

    @staticmethod
    def _parse_dataset_path(dataset_path: str):
        """
        Split a ``"name[:subset[:split]]"`` identifier into its components.

        :param dataset_path: Dataset identifier, possibly with subset/split.
        :return: Tuple of (name, subset or None, split or None).
        """
        parts = dataset_path.split(":")
        name = parts[0]
        subset = parts[1] if len(parts) > 1 else None
        split = parts[2] if len(parts) > 2 else None
        return name, subset, split

    def read(
        self,
        input_path: Union[str, List[str]],
        split: Optional[str] = None,
        subset: Optional[str] = None,
        streaming: bool = False,
        limit: Optional[int] = None,
    ) -> "Dataset":
        """
        Read dataset from Hugging Face Hub.

        :param input_path: Dataset identifier(s) from Hugging Face Hub
            Format: "dataset_name" or "dataset_name:subset:split"
            Example: "wikitext" or "wikitext:wikitext-103-v1:train"
        :param split: Specific split to load (overrides split in path)
        :param subset: Specific subset/configuration to load (overrides subset in path)
        :param streaming: Whether to stream the dataset instead of downloading
        :param limit: Maximum number of samples to load (applied per dataset
            when a list of paths is given)
        :return: Ray Dataset containing the data
        :raises ImportError: if the 'datasets' package is not installed
        :raises ValueError: if input_path is an empty list
        """
        try:
            import datasets as hf_datasets
        except ImportError as exc:
            raise ImportError(
                "The 'datasets' package is required to use HuggingFaceReader. "
                "Please install it with: pip install datasets"
            ) from exc

        if isinstance(input_path, list):
            # Fail fast with a clear message instead of an opaque
            # IndexError on all_dss[0] below.
            if not input_path:
                raise ValueError(
                    "input_path must contain at least one dataset path"
                )
            # Handle multiple datasets
            all_dss = [
                self._load_single_dataset(
                    path,
                    split=split,
                    subset=subset,
                    streaming=streaming,
                    limit=limit,
                    hf_datasets=hf_datasets,
                )
                for path in input_path
            ]
            if len(all_dss) == 1:
                combined_ds = all_dss[0]
            else:
                combined_ds = all_dss[0].union(*all_dss[1:])
        else:
            combined_ds = self._load_single_dataset(
                input_path,
                split=split,
                subset=subset,
                streaming=streaming,
                limit=limit,
                hf_datasets=hf_datasets,
            )

        # Validate schema and drop rows the reader cannot handle
        # (both hooks are provided by BaseReader).
        combined_ds = combined_ds.map_batches(
            self._validate_batch, batch_format="pandas"
        )
        combined_ds = combined_ds.filter(self._should_keep_item)
        return combined_ds

    def _load_single_dataset(
        self,
        dataset_path: str,
        split: Optional[str] = None,
        subset: Optional[str] = None,
        streaming: bool = False,
        limit: Optional[int] = None,
        hf_datasets=None,
    ) -> "Dataset":
        """
        Load a single dataset from Hugging Face Hub.

        :param dataset_path: Dataset path, can include subset and split
            ("dataset_name:subset:split")
        :param split: Override split (defaults to "train" when absent everywhere)
        :param subset: Override subset
        :param streaming: Whether to stream
        :param limit: Max samples
        :param hf_datasets: Imported datasets module
        :return: Ray Dataset
        """
        import numpy as np
        import ray

        # Parse dataset path format: "dataset_name:subset:split";
        # explicit parameters take precedence over path components.
        dataset_name, parsed_subset, parsed_split = self._parse_dataset_path(
            dataset_path
        )
        final_subset = subset or parsed_subset
        final_split = split or parsed_split or "train"

        # Load dataset from Hugging Face
        load_kwargs = {
            "cache_dir": self.cache_dir,
            "trust_remote_code": self.trust_remote_code,
            "streaming": streaming,
        }
        if final_subset:
            load_kwargs["name"] = final_subset
        hf_dataset = hf_datasets.load_dataset(
            dataset_name, split=final_split, **load_kwargs
        )

        # Apply limit before converting to Ray dataset for memory efficiency
        if limit:
            if streaming:
                hf_dataset = hf_dataset.take(limit)
            else:
                # select() raises on out-of-range indices, so clamp the
                # limit to the actual dataset size.
                hf_dataset = hf_dataset.select(range(min(limit, len(hf_dataset))))

        # Convert to Ray dataset using lazy evaluation
        ray_ds = ray.data.from_huggingface(hf_dataset)

        # Define batch processing function for lazy evaluation
        def _process_batch(batch: dict[str, "np.ndarray"]) -> dict[str, "np.ndarray"]:
            """
            Process a batch of data to add type field and rename text column.

            :param batch: A dictionary with column names as keys and numpy arrays
            :return: Processed batch dictionary with numpy arrays
            """
            if not batch:
                return {}
            # All columns have the same length; use any of them for the row count.
            num_rows = len(next(iter(batch.values())))
            # Add type field if not present
            if "type" not in batch:
                batch["type"] = np.array(["text"] * num_rows)
            # Rename text_column to 'content' if different
            if self.text_column != "content" and self.text_column in batch:
                batch["content"] = batch.pop(self.text_column)
            return batch

        # Apply post-processing using map_batches for distributed lazy evaluation
        ray_ds = ray_ds.map_batches(_process_batch)
        return ray_ds