github-actions[bot] committed on
Commit
bf1f6d4
·
1 Parent(s): 5445ab9

Auto-sync from demo at Wed Feb 4 10:34:13 UTC 2026

Browse files
graphgen/models/__init__.py CHANGED
@@ -33,6 +33,7 @@ if TYPE_CHECKING:
33
  )
34
  from .reader import (
35
  CSVReader,
 
36
  JSONReader,
37
  ParquetReader,
38
  PDFReader,
@@ -92,6 +93,7 @@ _import_map = {
92
  "PickleReader": ".reader",
93
  "RDFReader": ".reader",
94
  "TXTReader": ".reader",
 
95
  # Searcher
96
  "NCBISearch": ".searcher.db.ncbi_searcher",
97
  "RNACentralSearch": ".searcher.db.rnacentral_searcher",
 
33
  )
34
  from .reader import (
35
  CSVReader,
36
+ HuggingFaceReader,
37
  JSONReader,
38
  ParquetReader,
39
  PDFReader,
 
93
  "PickleReader": ".reader",
94
  "RDFReader": ".reader",
95
  "TXTReader": ".reader",
96
+ "HuggingFaceReader": ".reader",
97
  # Searcher
98
  "NCBISearch": ".searcher.db.ncbi_searcher",
99
  "RNACentralSearch": ".searcher.db.rnacentral_searcher",
graphgen/models/reader/__init__.py CHANGED
@@ -1,4 +1,5 @@
1
  from .csv_reader import CSVReader
 
2
  from .json_reader import JSONReader
3
  from .parquet_reader import ParquetReader
4
  from .pdf_reader import PDFReader
 
1
  from .csv_reader import CSVReader
2
+ from .huggingface_reader import HuggingFaceReader
3
  from .json_reader import JSONReader
4
  from .parquet_reader import ParquetReader
5
  from .pdf_reader import PDFReader
graphgen/models/reader/huggingface_reader.py ADDED
@@ -0,0 +1,201 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
"""
Hugging Face Datasets Reader
This module provides a reader for accessing datasets from Hugging Face Hub.
"""

from typing import TYPE_CHECKING, List, Optional, Union

from graphgen.bases.base_reader import BaseReader

if TYPE_CHECKING:
    import numpy as np
    import ray
    from ray.data import Dataset


class HuggingFaceReader(BaseReader):
    """
    Reader for Hugging Face Datasets.

    Supports loading datasets from the Hugging Face Hub.
    Can specify a dataset by name and optional subset/split.

    Columns:
    - type: The type of the document (e.g., "text", "image", etc.)
    - if type is "text", "content" column must be present (or specify via text_column).

    Example:
        reader = HuggingFaceReader(text_column="text")
        ds = reader.read("wikitext")
        # or with split and subset
        ds = reader.read("wikitext:wikitext-103-v1:train")
    """

    def __init__(
        self,
        text_column: str = "content",
        modalities: Optional[list] = None,
        cache_dir: Optional[str] = None,
        trust_remote_code: bool = False,
    ):
        """
        Initialize HuggingFaceReader.

        :param text_column: Column name containing text content
        :param modalities: List of supported modalities
        :param cache_dir: Directory to cache downloaded datasets
        :param trust_remote_code: Whether to trust remote code in datasets
        """
        super().__init__(text_column=text_column, modalities=modalities)
        self.cache_dir = cache_dir
        self.trust_remote_code = trust_remote_code

    def read(
        self,
        input_path: Union[str, List[str]],
        split: Optional[str] = None,
        subset: Optional[str] = None,
        streaming: bool = False,
        limit: Optional[int] = None,
    ) -> "Dataset":
        """
        Read dataset from Hugging Face Hub.

        :param input_path: Dataset identifier(s) from Hugging Face Hub
                          Format: "dataset_name" or "dataset_name:subset:split"
                          Example: "wikitext" or "wikitext:wikitext-103-v1:train"
        :param split: Specific split to load (overrides split in path)
        :param subset: Specific subset/configuration to load (overrides subset in path)
        :param streaming: Whether to stream the dataset instead of downloading
        :param limit: Maximum number of samples to load
        :return: Ray Dataset containing the data
        :raises ImportError: if the optional 'datasets' dependency is missing
        :raises ValueError: if an empty list of dataset identifiers is given
        """
        # Lazy import: 'datasets' is an optional dependency of this reader only.
        try:
            import datasets as hf_datasets
        except ImportError as exc:
            raise ImportError(
                "The 'datasets' package is required to use HuggingFaceReader. "
                "Please install it with: pip install datasets"
            ) from exc

        if isinstance(input_path, list):
            # Fix: an empty list previously crashed with IndexError on all_dss[0];
            # fail fast with a clear message instead.
            if not input_path:
                raise ValueError("No dataset identifiers provided to read().")

            # Handle multiple datasets
            all_dss = []
            for path in input_path:
                ds = self._load_single_dataset(
                    path,
                    split=split,
                    subset=subset,
                    streaming=streaming,
                    limit=limit,
                    hf_datasets=hf_datasets,
                )
                all_dss.append(ds)

            if len(all_dss) == 1:
                combined_ds = all_dss[0]
            else:
                combined_ds = all_dss[0].union(*all_dss[1:])
        else:
            combined_ds = self._load_single_dataset(
                input_path,
                split=split,
                subset=subset,
                streaming=streaming,
                limit=limit,
                hf_datasets=hf_datasets,
            )

        # Validate and filter using hooks presumably inherited from BaseReader
        # (_validate_batch / _should_keep_item are not defined in this module —
        # NOTE(review): confirm they exist on BaseReader).
        combined_ds = combined_ds.map_batches(
            self._validate_batch, batch_format="pandas"
        )
        combined_ds = combined_ds.filter(self._should_keep_item)

        return combined_ds

    def _load_single_dataset(
        self,
        dataset_path: str,
        split: Optional[str] = None,
        subset: Optional[str] = None,
        streaming: bool = False,
        limit: Optional[int] = None,
        hf_datasets=None,
    ) -> "Dataset":
        """
        Load a single dataset from Hugging Face Hub.

        :param dataset_path: Dataset path, can include subset and split
                             ("dataset_name:subset:split")
        :param split: Override split (defaults to "train" if absent everywhere)
        :param subset: Override subset
        :param streaming: Whether to stream
        :param limit: Max samples
        :param hf_datasets: Imported datasets module
        :return: Ray Dataset
        """
        import numpy as np
        import ray

        # Parse dataset path format: "dataset_name:subset:split"
        parts = dataset_path.split(":")
        dataset_name = parts[0]
        parsed_subset = parts[1] if len(parts) > 1 else None
        parsed_split = parts[2] if len(parts) > 2 else None

        # Explicit parameters win over values parsed from the path.
        final_subset = subset or parsed_subset
        final_split = split or parsed_split or "train"

        # Load dataset from Hugging Face
        load_kwargs = {
            "cache_dir": self.cache_dir,
            "trust_remote_code": self.trust_remote_code,
            "streaming": streaming,
        }

        if final_subset:
            load_kwargs["name"] = final_subset

        hf_dataset = hf_datasets.load_dataset(
            dataset_name, split=final_split, **load_kwargs
        )

        # Apply limit before converting to Ray dataset for memory efficiency.
        # (limit=0 is falsy and therefore treated as "no limit", like None.)
        if limit:
            if streaming:
                hf_dataset = hf_dataset.take(limit)
            else:
                hf_dataset = hf_dataset.select(range(limit))

        # Convert to Ray dataset using lazy evaluation
        ray_ds = ray.data.from_huggingface(hf_dataset)

        # Define batch processing function for lazy evaluation
        def _process_batch(batch: dict[str, "np.ndarray"]) -> dict[str, "np.ndarray"]:
            """
            Process a batch of data to add type field and rename text column.

            :param batch: A dictionary with column names as keys and numpy arrays
            :return: Processed batch dictionary with numpy arrays
            """
            if not batch:
                return {}

            # All columns share the same length; use any one to size the batch.
            num_rows = len(next(iter(batch.values())))

            # Add type field if not present
            if "type" not in batch:
                batch["type"] = np.array(["text"] * num_rows)

            # Rename text_column to 'content' if different
            if self.text_column != "content" and self.text_column in batch:
                batch["content"] = batch.pop(self.text_column)

            return batch

        # Apply post-processing using map_batches for distributed lazy evaluation
        ray_ds = ray_ds.map_batches(_process_batch)

        return ray_ds
graphgen/operators/read/read.py CHANGED
@@ -4,6 +4,7 @@ from typing import TYPE_CHECKING, Any, List, Optional, Union
4
  from graphgen.common.init_storage import init_storage
5
  from graphgen.models import (
6
  CSVReader,
 
7
  JSONReader,
8
  ParquetReader,
9
  PDFReader,
@@ -51,6 +52,103 @@ def _build_reader(suffix: str, cache_dir: str | None, **reader_kwargs):
51
  return reader_cls(**reader_kwargs)
52
 
53
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54
  def read(
55
  input_path: Union[str, List[str]],
56
  allowed_suffix: Optional[List[str]] = None,
@@ -63,8 +161,11 @@ def read(
63
  ) -> "ray.data.Dataset":
64
  """
65
  Unified entry point to read files of multiple types using Ray Data.
 
66
 
67
- :param input_path: File or directory path(s) to read from
 
 
68
  :param allowed_suffix: List of allowed file suffixes (e.g., ['pdf', 'txt'])
69
  :param working_dir: Directory to cache intermediate files (PDF processing)
70
  :param kv_backend: Backend for key-value storage
@@ -76,80 +177,53 @@ def read(
76
  """
77
  import ray
78
 
79
- input_path_cache = init_storage(
80
- backend=kv_backend, working_dir=working_dir, namespace="input_path"
81
- )
 
 
 
 
 
 
 
 
 
 
 
 
82
  read_storage = init_storage(
83
  backend=kv_backend, working_dir=working_dir, namespace="read"
84
  )
85
- try:
86
- # 1. Scan all paths to discover files
87
- logger.info("[READ] Scanning paths: %s", input_path)
88
- with ParallelFileScanner(
89
- input_path_cache=input_path_cache,
90
- allowed_suffix=allowed_suffix,
91
- rescan=False,
92
- max_workers=parallelism if parallelism > 0 else 1,
93
- ) as scanner:
94
- all_files = []
95
- scan_results = scanner.scan(input_path, recursive=recursive)
96
-
97
- for result in scan_results.values():
98
- all_files.extend(result.get("files", []))
99
-
100
- logger.info("[READ] Found %d files to process", len(all_files))
101
-
102
- if not all_files:
103
- raise ValueError("No files found to read.")
104
-
105
- # 2. Group files by suffix to use appropriate reader
106
- files_by_suffix = {}
107
- for file_info in all_files:
108
- suffix = Path(file_info["path"]).suffix.lower().lstrip(".")
109
- if allowed_suffix and suffix not in [
110
- s.lower().lstrip(".") for s in allowed_suffix
111
- ]:
112
- continue
113
- files_by_suffix.setdefault(suffix, []).append(file_info["path"])
114
 
115
- # 3. Create read tasks
116
- read_tasks = []
117
- for suffix, file_paths in files_by_suffix.items():
118
- reader = _build_reader(suffix, working_dir, **reader_kwargs)
119
- ds = reader.read(file_paths)
120
- read_tasks.append(ds)
121
-
122
- # 4. Combine all datasets
123
- if not read_tasks:
124
- raise ValueError("No datasets created from the provided files.")
125
-
126
- if len(read_tasks) == 1:
127
- combined_ds = read_tasks[0]
128
- else:
129
- combined_ds = read_tasks[0].union(*read_tasks[1:])
130
-
131
- if read_nums is not None:
132
- combined_ds = combined_ds.limit(read_nums)
133
-
134
- def add_trace_id(batch):
135
- batch["_trace_id"] = batch.apply(
136
- lambda row: compute_dict_hash(row, prefix="read-"), axis=1
137
  )
138
- records = batch.to_dict(orient="records")
139
- data_to_upsert = {record["_trace_id"]: record for record in records}
140
- read_storage.upsert(data_to_upsert)
141
- read_storage.index_done_callback()
142
- return batch
143
-
144
- combined_ds = combined_ds.map_batches(add_trace_id, batch_format="pandas")
145
 
146
- # sample record
147
- for i, item in enumerate(combined_ds.take(1)):
148
- logger.debug("[READ] Sample record %d: %s", i, item)
149
 
150
- logger.info("[READ] Successfully read files from %s", input_path)
151
- return combined_ds
152
 
153
  except Exception as e:
154
- logger.error("[READ] Failed to read files from %s: %s", input_path, e)
155
  raise
 
4
  from graphgen.common.init_storage import init_storage
5
  from graphgen.models import (
6
  CSVReader,
7
+ HuggingFaceReader,
8
  JSONReader,
9
  ParquetReader,
10
  PDFReader,
 
52
  return reader_cls(**reader_kwargs)
53
 
54
 
55
def _process_huggingface_datasets(hf_uris: List[str], reader_kwargs: dict) -> list:
    """Process HuggingFace datasets and return list of Ray datasets.

    :param hf_uris: URIs of the form "huggingface://dataset_name:subset:split"
    :param reader_kwargs: Keyword arguments forwarded to HuggingFaceReader
    :return: One Ray dataset per URI, in input order
    """
    logger.info("[READ] Processing HuggingFace datasets: %s", hf_uris)
    hf_reader = HuggingFaceReader(**reader_kwargs)
    read_tasks = []
    for hf_uri in hf_uris:
        # Parse URI format: "huggingface://dataset_name:subset:split".
        # Fix: str.replace would strip the scheme anywhere in the string;
        # removeprefix only strips it at the start.
        uri_part = hf_uri.removeprefix("huggingface://")
        ds = hf_reader.read(uri_part)
        read_tasks.append(ds)
    logger.info("[READ] Successfully loaded %d HuggingFace dataset(s)", len(hf_uris))
    return read_tasks
67
+
68
+
69
def _process_local_files(
    local_paths: List[str],
    allowed_suffix: Optional[List[str]],
    kv_backend: str,
    working_dir: str,
    parallelism: int,
    recursive: bool,
    reader_kwargs: dict,
) -> list:
    """Process local files and return list of Ray datasets.

    :param local_paths: File or directory paths to scan
    :param allowed_suffix: If non-empty, only files with these suffixes are kept
    :param kv_backend: Backend for the scan-result key-value cache
    :param working_dir: Directory for cache storage and reader intermediates
    :param parallelism: Scanner worker count (values <= 0 fall back to 1)
    :param recursive: Whether to recurse into directories
    :param reader_kwargs: Extra keyword arguments for each suffix reader
    :return: One Ray dataset per distinct file suffix found
    """
    logger.info("[READ] Scanning local paths: %s", local_paths)
    read_tasks = []
    input_path_cache = init_storage(
        backend=kv_backend, working_dir=working_dir, namespace="input_path"
    )
    # Perf fix: normalize the allowed suffixes once into a set instead of
    # rebuilding a list (O(n) membership) inside the per-file loop.
    # An empty/None allowed_suffix means "no filtering", as before.
    allowed = (
        {s.lower().lstrip(".") for s in allowed_suffix} if allowed_suffix else None
    )
    with ParallelFileScanner(
        input_path_cache=input_path_cache,
        allowed_suffix=allowed_suffix,
        rescan=False,
        max_workers=parallelism if parallelism > 0 else 1,
    ) as scanner:
        all_files = []
        scan_results = scanner.scan(local_paths, recursive=recursive)

        for result in scan_results.values():
            all_files.extend(result.get("files", []))

        logger.info("[READ] Found %d files to process", len(all_files))

        if all_files:
            # Group files by suffix to use appropriate reader
            files_by_suffix = {}
            for file_info in all_files:
                suffix = Path(file_info["path"]).suffix.lower().lstrip(".")
                if allowed is not None and suffix not in allowed:
                    continue
                files_by_suffix.setdefault(suffix, []).append(file_info["path"])

            # Create read tasks for files
            for suffix, file_paths in files_by_suffix.items():
                reader = _build_reader(suffix, working_dir, **reader_kwargs)
                ds = reader.read(file_paths)
                read_tasks.append(ds)

    return read_tasks
116
+
117
+
118
def _combine_datasets(
    read_tasks: list,
    read_nums: Optional[int],
    read_storage,
    input_path: Union[str, List[str]],
) -> "ray.data.Dataset":
    """Combine datasets and apply post-processing.

    Unions the per-source datasets, optionally truncates to ``read_nums``
    rows, attaches a content-hash trace id to every row, and persists each
    batch into ``read_storage`` as a side effect of the Ray pipeline.
    """
    if len(read_tasks) == 1:
        combined_ds = read_tasks[0]
    else:
        first, *others = read_tasks
        combined_ds = first.union(*others)

    if read_nums is not None:
        combined_ds = combined_ds.limit(read_nums)

    def add_trace_id(batch):
        # Stable per-row hash doubles as the key-value store key.
        batch["_trace_id"] = batch.apply(
            lambda row: compute_dict_hash(row, prefix="read-"), axis=1
        )
        rows = batch.to_dict(orient="records")
        read_storage.upsert({row["_trace_id"]: row for row in rows})
        read_storage.index_done_callback()
        return batch

    combined_ds = combined_ds.map_batches(add_trace_id, batch_format="pandas")

    # sample record
    for i, item in enumerate(combined_ds.take(1)):
        logger.debug("[READ] Sample record %d: %s", i, item)

    logger.info("[READ] Successfully read data from %s", input_path)
    return combined_ds
150
+
151
+
152
  def read(
153
  input_path: Union[str, List[str]],
154
  allowed_suffix: Optional[List[str]] = None,
 
161
  ) -> "ray.data.Dataset":
162
  """
163
  Unified entry point to read files of multiple types using Ray Data.
164
+ Supports both local files and Hugging Face datasets.
165
 
166
+ :param input_path: File or directory path(s) to read from, or HuggingFace dataset URIs
167
+ Format for HuggingFace: "huggingface://dataset_name:subset:split"
168
+ Example: "huggingface://wikitext:wikitext-103-v1:train"
169
  :param allowed_suffix: List of allowed file suffixes (e.g., ['pdf', 'txt'])
170
  :param working_dir: Directory to cache intermediate files (PDF processing)
171
  :param kv_backend: Backend for key-value storage
 
177
  """
178
  import ray
179
 
180
+ # Convert single input_path to list for uniform processing
181
+ if isinstance(input_path, str):
182
+ input_paths = [input_path]
183
+ else:
184
+ input_paths = input_path
185
+
186
+ # Separate HuggingFace URIs from local file paths
187
+ hf_uris = []
188
+ local_paths = []
189
+ for path in input_paths:
190
+ if isinstance(path, str) and path.startswith("huggingface://"):
191
+ hf_uris.append(path)
192
+ else:
193
+ local_paths.append(path)
194
+
195
  read_storage = init_storage(
196
  backend=kv_backend, working_dir=working_dir, namespace="read"
197
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
198
 
199
+ try:
200
+ read_tasks = []
201
+
202
+ # 1. Process HuggingFace datasets if any
203
+ if hf_uris:
204
+ read_tasks.extend(_process_huggingface_datasets(hf_uris, reader_kwargs))
205
+
206
+ # 2. Process local file paths if any
207
+ if local_paths:
208
+ read_tasks.extend(
209
+ _process_local_files(
210
+ local_paths,
211
+ allowed_suffix,
212
+ kv_backend,
213
+ working_dir,
214
+ parallelism,
215
+ recursive,
216
+ reader_kwargs,
 
 
 
 
217
  )
218
+ )
 
 
 
 
 
 
219
 
220
+ # 3. Validate we have at least one dataset
221
+ if not read_tasks:
222
+ raise ValueError("No datasets created from the provided input paths.")
223
 
224
+ # 4. Combine and process datasets
225
+ return _combine_datasets(read_tasks, read_nums, read_storage, input_path)
226
 
227
  except Exception as e:
228
+ logger.error("[READ] Failed to read data from %s: %s", input_path, e)
229
  raise