# graphgen/models/reader/huggingface_reader.py
"""
Hugging Face Datasets Reader
This module provides a reader for accessing datasets from Hugging Face Hub.
"""
from typing import TYPE_CHECKING, List, Optional, Union
from graphgen.bases.base_reader import BaseReader

if TYPE_CHECKING:
    import numpy as np
    import ray
    from ray.data import Dataset


class HuggingFaceReader(BaseReader):
    """
    Reader for Hugging Face datasets.

    Supports loading datasets from the Hugging Face Hub. A dataset is
    specified by name, with an optional subset and split.

    Expected columns:
        - type: The type of the document (e.g., "text", "image", etc.)
        - If type is "text", a "content" column must be present
          (or name the column via ``text_column``).

    Example:
        reader = HuggingFaceReader(text_column="text")
        ds = reader.read("wikitext")
        # or with subset and split
        ds = reader.read("wikitext:wikitext-103-v1:train")
    """
    def __init__(
        self,
        text_column: str = "content",
        modalities: Optional[list] = None,
        cache_dir: Optional[str] = None,
        trust_remote_code: bool = False,
    ):
        """
        Initialize HuggingFaceReader.

        :param text_column: Column name containing the text content
        :param modalities: List of supported modalities
        :param cache_dir: Directory used to cache downloaded datasets
        :param trust_remote_code: Whether to trust remote code bundled with a dataset
        """
        super().__init__(text_column=text_column, modalities=modalities)
        self.cache_dir = cache_dir
        self.trust_remote_code = trust_remote_code

    def read(
        self,
        input_path: Union[str, List[str]],
        split: Optional[str] = None,
        subset: Optional[str] = None,
        streaming: bool = False,
        limit: Optional[int] = None,
    ) -> "Dataset":
        """
        Read one or more datasets from the Hugging Face Hub.

        :param input_path: Dataset identifier(s) from the Hugging Face Hub.
            Format: "dataset_name" or "dataset_name:subset:split"
            Example: "wikitext" or "wikitext:wikitext-103-v1:train"
        :param split: Specific split to load (overrides the split in the path)
        :param subset: Specific subset/configuration to load (overrides the subset in the path)
        :param streaming: Whether to stream the dataset instead of downloading it
        :param limit: Maximum number of samples to load
        :return: Ray Dataset containing the data
        """
        try:
            import datasets as hf_datasets
        except ImportError as exc:
            raise ImportError(
                "The 'datasets' package is required to use HuggingFaceReader. "
                "Please install it with: pip install datasets"
            ) from exc

        if isinstance(input_path, list):
            # Load each dataset separately, then union them into one Ray Dataset
            all_dss = []
            for path in input_path:
                ds = self._load_single_dataset(
                    path,
                    split=split,
                    subset=subset,
                    streaming=streaming,
                    limit=limit,
                    hf_datasets=hf_datasets,
                )
                all_dss.append(ds)
            if len(all_dss) == 1:
                combined_ds = all_dss[0]
            else:
                combined_ds = all_dss[0].union(*all_dss[1:])
        else:
            combined_ds = self._load_single_dataset(
                input_path,
                split=split,
                subset=subset,
                streaming=streaming,
                limit=limit,
                hf_datasets=hf_datasets,
            )

        # Validate rows and drop items that should not be kept
        combined_ds = combined_ds.map_batches(
            self._validate_batch, batch_format="pandas"
        )
        combined_ds = combined_ds.filter(self._should_keep_item)
        return combined_ds

    def _load_single_dataset(
        self,
        dataset_path: str,
        split: Optional[str] = None,
        subset: Optional[str] = None,
        streaming: bool = False,
        limit: Optional[int] = None,
        hf_datasets=None,
    ) -> "Dataset":
        """
        Load a single dataset from the Hugging Face Hub.

        :param dataset_path: Dataset path; may include a subset and split
            (e.g., "wikitext:wikitext-103-v1:train")
        :param split: Override split
        :param subset: Override subset
        :param streaming: Whether to stream
        :param limit: Max samples
        :param hf_datasets: Imported ``datasets`` module
        :return: Ray Dataset
        """
        import numpy as np
        import ray

        # Parse dataset path format: "dataset_name:subset:split"
        parts = dataset_path.split(":")
        dataset_name = parts[0]
        parsed_subset = parts[1] if len(parts) > 1 else None
        parsed_split = parts[2] if len(parts) > 2 else None

        # Explicit parameters take precedence over values parsed from the path
        final_subset = subset or parsed_subset
        final_split = split or parsed_split or "train"

        # Load the dataset from Hugging Face
        load_kwargs = {
            "cache_dir": self.cache_dir,
            "trust_remote_code": self.trust_remote_code,
            "streaming": streaming,
        }
        if final_subset:
            load_kwargs["name"] = final_subset
        hf_dataset = hf_datasets.load_dataset(
            dataset_name, split=final_split, **load_kwargs
        )

        # Apply the limit before converting to a Ray dataset for memory efficiency
        if limit:
            if streaming:
                hf_dataset = hf_dataset.take(limit)
            else:
                hf_dataset = hf_dataset.select(range(limit))

        # Convert to a Ray dataset using lazy evaluation
        ray_ds = ray.data.from_huggingface(hf_dataset)

        def _process_batch(batch: dict[str, "np.ndarray"]) -> dict[str, "np.ndarray"]:
            """
            Add the "type" field and rename the text column for a batch of data.

            :param batch: A dictionary with column names as keys and numpy arrays as values
            :return: Processed batch dictionary with numpy arrays
            """
            if not batch:
                return {}
            # Get the number of rows in the batch
            num_rows = len(next(iter(batch.values())))
            # Add the type field if not present
            if "type" not in batch:
                batch["type"] = np.array(["text"] * num_rows)
            # Rename text_column to "content" if it differs
            if self.text_column != "content" and self.text_column in batch:
                batch["content"] = batch.pop(self.text_column)
            return batch

        # Apply post-processing with map_batches for distributed lazy evaluation
        ray_ds = ray_ds.map_batches(_process_batch)
        return ray_ds
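

# A minimal usage sketch, not part of the reader itself. "wikitext" and its
# "wikitext-103-v1" config are real Hub identifiers taken from the class
# docstring, but the exact rows returned depend on the installed `datasets`
# and `ray` versions; treat this as an illustration, not a guaranteed output.
if __name__ == "__main__":
    reader = HuggingFaceReader(text_column="text")
    # Stream a small sample so the demo does not download the full dataset
    ds = reader.read("wikitext:wikitext-103-v1:train", streaming=True, limit=4)
    # Dataset.take() materializes a few rows; each row is a plain dict
    for row in ds.take(2):
        print(row["type"], row["content"][:80])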